Streaming Updates
createTask (TypeScript) and Task (Python) return an object that supports streaming updates as the task executes.
Iterating Updates
The object returned by createTask is both a Promise and an AsyncIterable. Use for await to stream updates:

    import { createTask } from '@requence/task'

    const task = createTask({
      taskTemplate: 'my-template',
      input: { data: [1, 2, 3] },
    })

    for await (const update of task) {
      switch (update.type) {
        case 'taskStart':
          console.log(`Task ${update.taskId} started`)
          break
        case 'nodeStart':
          console.log(`Node ${update.node.alias ?? update.node.id} started`)
          break
        case 'nodeUpdate':
          console.log(`Node output:`, update.data)
          break
        case 'nodeError':
          console.log(`Node error:`, update.error)
          break
        case 'nodeEnd':
          console.log(`Node ${update.node.alias ?? update.node.id} finished`)
          break
        case 'taskEnd':
          console.log(`Task completed:`, update.context.result)
          break
        case 'taskError':
          console.log(`Task failed:`, update.reason)
          break
        case 'taskAborted':
          console.log(`Task aborted:`, update.reason)
          break
      }
    }

In Python, use the sync.updates generator for synchronous iteration:

    from requence.task import Task

    task = Task(
        task_template="my-template",
        input={"data": [1, 2, 3]},
    )

    for update in task.sync.updates:
        match update["type"]:
            case "taskStart":
                print(f"Task {update['taskId']} started")
            case "nodeStart":
                print(f"Node {update['node'].get('alias', update['node']['id'])} started")
            case "nodeUpdate":
                print(f"Node output: {update['data']}")
            case "nodeError":
                print(f"Node error: {update['error']}")
            case "taskEnd":
                print(f"Task completed: {update['context'].result}")
            case "taskError":
                print(f"Task failed: {update.get('reason')}")
            case "taskAborted":
                print(f"Task aborted: {update.get('reason')}")

For async usage, iterate over task.updates:

    async for update in task.updates:
        print(update["type"])

Using a Callback (TypeScript)
You can also provide an onUpdate callback as the second argument to createTask:

    const result = await createTask(
      {
        taskTemplate: 'my-template',
        input: {},
      },
      (update) => {
        console.log(`[${update.type}]`, update.timestamp)
      },
    )

This approach lets you stream updates while still awaiting the final result.
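The Python examples above expose updates only through iteration. A callback style similar to onUpdate can be approximated by wrapping the loop, as in the minimal sketch below. The helper name run_with_callback and the hand-written update dicts are hypothetical and not part of requence.task. The sketch assumes only the update shape shown earlier: a "type" key, an optional "reason", and a "context" whose result attribute holds the final result on taskEnd. Raising an exception on taskError or taskAborted is a choice made for this sketch, not documented library behavior.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterable


def run_with_callback(
    updates: Iterable[dict[str, Any]],
    on_update: Callable[[dict[str, Any]], None],
) -> Any:
    """Call on_update for every update and return the final result.

    Hypothetical helper: `updates` can be any iterable of update dicts,
    for example task.sync.updates.
    """
    for update in updates:
        on_update(update)
        if update["type"] == "taskEnd":
            return update["context"].result
        if update["type"] in ("taskError", "taskAborted"):
            # Sketch-level choice: surface failures as exceptions.
            raise RuntimeError(f"{update['type']}: {update.get('reason')}")
    raise RuntimeError("update stream ended without a terminal update")


# Hand-written stand-ins for real updates, for illustration only.
fake_updates = [
    {"type": "taskStart", "taskId": "t-1", "context": SimpleNamespace(result=None)},
    {"type": "nodeUpdate", "data": 6, "context": SimpleNamespace(result=None)},
    {"type": "taskEnd", "context": SimpleNamespace(result={"sum": 6})},
]

seen: list[str] = []
result = run_with_callback(fake_updates, lambda u: seen.append(u["type"]))
print(seen, result)
```

With a real task, the first argument would be task.sync.updates instead of the fake list.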
Update Types
| Type | Description |
|---|---|
| taskStart | Task execution has begun. Contains input and taskId. |
| nodeStart | A node has started processing. Contains node info (id, type, alias). |
| nodeUpdate | A node has produced output. Contains data. |
| nodeError | A node encountered an error. Contains error message. |
| nodeDefer | A node has been deferred (e.g., waiting for retry). |
| nodeEnd | A node has finished processing. |
| taskEnd | The entire task completed successfully. Contains final result. |
| taskError | The task failed. Contains reason. |
| taskAborted | The task was aborted. Contains reason. |
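
The type names in the table fall into node-level updates and task-level updates, three of which (taskEnd, taskError, taskAborted) end the task. The sketch below tallies a stream of updates along those lines. The function summarize and the sample list are hypothetical, and the sketch assumes that no further updates follow a terminal one, which the table does not state explicitly.

```python
from collections import Counter
from typing import Any, Iterable

# Type names copied from the table above.
NODE_TYPES = {"nodeStart", "nodeUpdate", "nodeError", "nodeDefer", "nodeEnd"}
TERMINAL_TYPES = {"taskEnd", "taskError", "taskAborted"}


def summarize(updates: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Count updates per type and report how the task finished."""
    counts: Counter[str] = Counter()
    outcome = None
    for update in updates:
        kind = update["type"]
        counts[kind] += 1
        if kind in TERMINAL_TYPES:
            outcome = kind
            break  # assumption: nothing follows a terminal update
    return {
        "outcome": outcome,
        "node_updates": sum(counts[k] for k in NODE_TYPES),
        "counts": dict(counts),
    }


# Illustrative sequence only; real updates carry more fields.
sample = [
    {"type": t}
    for t in [
        "taskStart", "nodeStart", "nodeUpdate", "nodeDefer",
        "nodeUpdate", "nodeEnd", "taskEnd",
    ]
]
summary = summarize(sample)
print(summary)
```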
Context on Updates
Every update includes a context object with the current state of the task:

TypeScript:

    update.context.input              // The task input
    update.context.taskId             // The task ID
    update.context.result             // Current accumulated result (partial until taskEnd)
    update.context.getNodeData(alias)  // Output from a specific node
    update.context.getNodeError(alias) // Error from a specific node

Python:

    update["context"].input                  # The task input
    update["context"].task_id                # The task ID
    update["context"].result                 # Current accumulated result
    update["context"].get_node_data(alias)   # Output from a specific node
    update["context"].get_node_error(alias)  # Error from a specific node
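
As a rough illustration of the per-node accessors, the sketch below builds a status report from a context object. FakeContext and node_report are hypothetical names introduced here. The sketch assumes that get_node_data and get_node_error return None when a node has produced no output or no error, which the listing above does not confirm.

```python
from typing import Any, Optional


class FakeContext:
    """Hypothetical stand-in exposing the two accessors listed above."""

    def __init__(self, data: dict[str, Any], errors: dict[str, str]) -> None:
        self._data = data
        self._errors = errors

    def get_node_data(self, alias: str) -> Optional[Any]:
        return self._data.get(alias)

    def get_node_error(self, alias: str) -> Optional[str]:
        return self._errors.get(alias)


def node_report(context: Any, aliases: list[str]) -> dict[str, str]:
    """Describe each node as failed, done, or pending."""
    report: dict[str, str] = {}
    for alias in aliases:
        error = context.get_node_error(alias)
        if error is not None:
            report[alias] = f"failed: {error}"
        elif context.get_node_data(alias) is not None:
            report[alias] = "done"
        else:
            # Assumption: no data and no error means the node has not run yet.
            report[alias] = "pending"
    return report


ctx = FakeContext(data={"fetch": [1, 2, 3]}, errors={"parse": "bad input"})
report = node_report(ctx, ["fetch", "parse", "store"])
print(report)
```

In real use, update["context"] from any update would take the place of the FakeContext instance.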