Skip to content

Streaming Updates

createTask / Task returns an object that supports streaming updates as the task executes.

createTask is both a Promise and an AsyncIterable. Use for await to stream updates:

import { createTask } from '@requence/task'
const task = createTask({
taskTemplate: 'my-template',
input: { data: [1, 2, 3] },
})
for await (const update of task) {
switch (update.type) {
case 'taskStart':
console.log(`Task ${update.taskId} started`)
break
case 'nodeStart':
console.log(`Node ${update.node.alias ?? update.node.id} started`)
break
case 'nodeUpdate':
console.log(`Node output:`, update.data)
break
case 'nodeError':
console.log(`Node error:`, update.error)
break
case 'nodeEnd':
console.log(`Node ${update.node.alias ?? update.node.id} finished`)
break
case 'taskEnd':
console.log(`Task completed:`, update.context.result)
break
case 'taskError':
console.log(`Task failed:`, update.reason)
break
case 'taskAborted':
console.log(`Task aborted:`, update.reason)
break
}
}

You can also provide an onUpdate callback as the second argument:

const result = await createTask(
{
taskTemplate: 'my-template',
input: {},
},
(update) => {
console.log(`[${update.type}]`, update.timestamp)
},
)

This approach lets you stream updates while still awaiting the final result.

TypeDescription
taskStartTask execution has begun. Contains input and taskId.
nodeStartA node has started processing. Contains node info (id, type, alias).
nodeUpdateA node has produced output. Contains data.
nodeErrorA node encountered an error. Contains error message.
nodeDeferA node has been deferred (e.g., waiting for retry).
nodeEndA node has finished processing.
taskEndThe entire task completed successfully. Contains final result.
taskErrorThe task failed. Contains reason.
taskAbortedThe task was aborted. Contains reason.

Every update includes a context object with the current state of the task:

update.context.input // The task input
update.context.taskId // The task ID
update.context.result // Current accumulated result (partial until taskEnd)
update.context.getNodeData(alias) // Output from a specific node
update.context.getNodeError(alias) // Error from a specific node