Tasks
A Task is a framework-core wrapper any Agent can use to give a unit of work a trackable lifecycle. While the task is active, the framework emits TaskStarted, TaskProgress, TaskCompleted, TaskFailed, and TaskExpired events on a stream — so observers (UIs, watchers, mirrors, test harnesses) can follow along without participating in execution.
Note
Tasks are agent-owned. The framework does not assign or schedule them. Standalone usage requires no observers — events fly past harmlessly if nothing subscribes.
When to Use a Task
Use a Task whenever a unit of work has a beginning, an end, and observable progress that you want to surface beyond your own function's return value:
- Long-running pipelines where downstream consumers want progress checkpoints.
- HITL approvals where a UI needs to know the task is waiting on a human.
- Test harnesses that assert a sequence of lifecycle events.
For lightweight LLM-driven sub-agent delegation, see Sub-task Delegation. That is a different feature, which wraps an Agent in a run_subtask tool.
Quick Start
The async with block opens the lifecycle. On clean exit the task auto-completes with result=None if you didn't call complete() or fail() yourself.
Lifecycle States
    CREATED -> RUNNING -> COMPLETED  (terminal, success)
                       -> FAILED     (terminal, exception or explicit fail)
                       -> EXPIRED    (terminal, TTL elapsed)
| State | Meaning |
|---|---|
| CREATED | The Task object exists but __aenter__ has not run. task.task_id, task.metadata, and task.context raise RuntimeError. |
| RUNNING | Inside the async with block. Progress events allowed. |
| COMPLETED | Reached complete() or clean block exit. |
| FAILED | Reached fail() or block exited via exception. |
| EXPIRED | TTL elapsed; emitted by an external observer (e.g. a network hub's TTL sweeper). |
The terminal states are immutable — once set, further complete() / fail() / progress() calls are silent no-ops. The set is exported as autogen.beta.task.TERMINAL_TASK_STATES.
API Reference
Agent.task(...)
    agent.task(
        title: str,
        *,
        description: str = "",
        payload: dict[str, Any] | None = None,
        capability: str | None = None,
        ttl_seconds: int | None = None,
        context: ConversationContext | None = None,
    ) -> Task
| Parameter | Type | Description |
|---|---|---|
| title | str | Short objective shown on every event. |
| description | str | Optional longer description. |
| payload | dict[str, Any] \| None | Initial payload merged into TaskSpec. |
| capability | str \| None | Tags the task with a capability name; used by network mirrors. |
| ttl_seconds | int \| None | Sets metadata.expires_at. The Task does not self-expire; an external observer must call task.expire() when the TTL elapses. |
| context | ConversationContext \| None | If supplied, events flow on context.stream and ag2.task is stamped into context.dependencies for the duration of the block. If omitted, the Task creates a private MemoryStream on entry. |
Returns an unentered Task. Use as async with agent.task(...) as task:.
Task instance methods
| Method | Description |
|---|---|
| await task.progress(payload) | Emits TaskProgress; merges payload into metadata.progress and stamps last_progress_at. No-op if already terminal. |
| await task.complete(result=None) | Terminal. Emits TaskCompleted; sets metadata.result and state = COMPLETED. |
| await task.fail(error) | Terminal. Accepts a string (wrapped in RuntimeError) or any BaseException. Emits TaskFailed; sets state = FAILED. |
| await task.expire() | Terminal. Emits TaskExpired; sets state = EXPIRED. Called by external TTL observers. |
Properties
| Property | Available before __aenter__? |
|---|---|
| task.state | Yes; returns TaskState.CREATED. |
| task.task_id | No; raises RuntimeError. |
| task.metadata | No; raises RuntimeError. |
| task.context | No; raises RuntimeError. |
Bound Context vs. Standalone
Passing a ConversationContext shares the stream with the rest of your agent's run, so observers and middleware already attached to that stream see the lifecycle events.
Without a context, the Task creates a private stream on entry. Events still fire — but only observers attached to that private stream see them. Useful for one-off background work that doesn't need to surface anywhere.
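The difference between the two modes can be sketched without the framework. The code below is a minimal illustration, not the library's implementation: SketchStream, SketchContext, and SketchTask are hypothetical stand-ins that model only the stream selection and the ag2.task stamp described in this page.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class SketchStream:
    """Hypothetical stand-in for a stream; it only records event names."""
    events: list = field(default_factory=list)


@dataclass
class SketchContext:
    """Hypothetical stand-in for ConversationContext."""
    stream: SketchStream = field(default_factory=SketchStream)
    dependencies: dict = field(default_factory=dict)


class SketchTask:
    """Stand-in that models only stream selection, not the real Task."""

    def __init__(self, title: str, context: Optional[SketchContext] = None) -> None:
        self.title = title
        self._context = context
        self.stream: Optional[SketchStream] = None

    async def __aenter__(self) -> "SketchTask":
        if self._context is not None:
            # Bound: share the caller's stream and stamp the active task.
            self.stream = self._context.stream
            self._context.dependencies["ag2.task"] = self
        else:
            # Standalone: a private stream created on entry.
            self.stream = SketchStream()
        self.stream.events.append("TaskStarted")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        self.stream.events.append("TaskFailed" if exc_type else "TaskCompleted")
        if self._context is not None:
            # The stamp lasts only for the duration of the block.
            self._context.dependencies.pop("ag2.task", None)
        return False  # never swallow exceptions


async def demo():
    ctx = SketchContext()
    async with SketchTask("bound", context=ctx) as bound:
        stamped = ctx.dependencies.get("ag2.task") is bound
    async with SketchTask("standalone") as solo:
        pass
    return ctx, solo, stamped


if __name__ == "__main__":
    ctx, solo, stamped = asyncio.run(demo())
    print(ctx.stream.events, solo.stream.events, stamped)
```

In the bound case an observer already attached to ctx.stream would see both lifecycle events; in the standalone case the events exist only on the task's own stream.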
Auto-Complete and Auto-Fail
The async with block has these guarantees:
- Clean exit, no terminal call -> auto complete(result=None).
- Exception inside the block -> auto fail(exc), then the exception propagates.
- Already terminal at exit time -> nothing further happens.
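These three guarantees map naturally onto an async context manager's exit hook. The following is a rough sketch under that assumption; SketchTask and SketchState are hypothetical stand-ins written for illustration and do not reproduce the framework's Task class or its event emission.

```python
import asyncio
from enum import Enum
from typing import Any, Optional, Union


class SketchState(Enum):
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


TERMINAL = {SketchState.COMPLETED, SketchState.FAILED}


class SketchTask:
    """Hypothetical stand-in; not the framework's Task class."""

    def __init__(self) -> None:
        self.state = SketchState.CREATED
        self.result: Any = None
        self.error: Optional[BaseException] = None

    async def complete(self, result: Any = None) -> None:
        if self.state in TERMINAL:
            return  # terminal states are immutable
        self.result = result
        self.state = SketchState.COMPLETED

    async def fail(self, error: Union[str, BaseException]) -> None:
        if self.state in TERMINAL:
            return
        # A plain string is wrapped in RuntimeError, as the method table says.
        self.error = RuntimeError(error) if isinstance(error, str) else error
        self.state = SketchState.FAILED

    async def __aenter__(self) -> "SketchTask":
        self.state = SketchState.RUNNING
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if self.state not in TERMINAL:
            if exc is None:
                await self.complete(None)  # clean exit -> auto-complete
            else:
                await self.fail(exc)       # exception -> auto-fail
        return False  # let any exception propagate


async def clean_exit() -> SketchTask:
    async with SketchTask() as task:
        pass
    return task


async def raising_exit():
    task, propagated = SketchTask(), False
    try:
        async with task:
            raise ValueError("boom")
    except ValueError:
        propagated = True
    return task, propagated


async def explicit_complete() -> SketchTask:
    async with SketchTask() as task:
        await task.complete({"rows": 3})
        await task.fail("too late")  # silent no-op: already terminal
    return task
```

Returning False from __aexit__ is what keeps the original exception visible to the caller after the failure has been recorded.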
Observing the Lifecycle
Subscribe directly on the bound stream to capture every lifecycle event in order.
Note
TaskProgress is marked transient — it is delivered live to subscribers but not persisted to the stream's storage. Subscribe before the events fire to capture them. TaskStarted, TaskCompleted, TaskFailed, and TaskExpired are persisted normally.
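To see why subscription order matters, consider a toy model of a stream that delivers every event live but stores only non-transient ones. SketchEvent and SketchStream below are hypothetical stand-ins invented for this illustration; the real stream API may differ.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical event with a transient flag."""
    name: str
    transient: bool = False


@dataclass
class SketchStream:
    """Hypothetical stream: live delivery plus storage for persisted events."""
    storage: list = field(default_factory=list)
    subscribers: list = field(default_factory=list)

    def subscribe(self, callback: Callable[[SketchEvent], None]) -> None:
        self.subscribers.append(callback)

    def send(self, event: SketchEvent) -> None:
        # Every current subscriber sees the event live.
        for callback in self.subscribers:
            callback(event)
        # Only non-transient events reach storage.
        if not event.transient:
            self.storage.append(event)


def demo():
    stream = SketchStream()
    live = []
    stream.subscribe(lambda event: live.append(event.name))  # subscribed early

    stream.send(SketchEvent("TaskStarted"))
    stream.send(SketchEvent("TaskProgress", transient=True))
    stream.send(SketchEvent("TaskCompleted"))

    # A late reader can only replay what was stored.
    replayed = [event.name for event in stream.storage]
    return live, replayed


if __name__ == "__main__":
    print(demo())
```

The early subscriber receives all three events, while a reader that arrives afterwards and replays storage never sees TaskProgress.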
Reading the Active Task with TaskInject
Inside an async with agent.task(...) block, the framework stamps the active Task into context.dependencies["ag2.task"]. Two ways to read it:
Direct access
TaskInject annotation
TaskInject is a fast_depends-resolvable annotation that injects the active Task into any function the dependency-injection machinery resolves — most usefully a @tool body.
The injection has default=None, so always treat task as possibly None and null-check before use.
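The null-check pattern might look like the sketch below. It deliberately avoids the real dependency-injection machinery: the task parameter uses a plain default of None as a stand-in for the TaskInject annotation, and SketchTask and fetch_rows are hypothetical names introduced only for this example.

```python
import asyncio
from typing import Any, Optional


class SketchTask:
    """Hypothetical stand-in that only records progress payloads."""

    def __init__(self) -> None:
        self.progress_log: list = []

    async def progress(self, payload: dict) -> None:
        self.progress_log.append(payload)


async def fetch_rows(table: str, task: Optional[SketchTask] = None) -> str:
    # In real code `task` would be declared with the TaskInject annotation;
    # a plain default of None stands in for it here (assumption).
    if task is not None:
        # Only report progress when a task is actually active.
        await task.progress({"table": table, "stage": "fetching"})
    return f"rows from {table}"


if __name__ == "__main__":
    active = SketchTask()
    print(asyncio.run(fetch_rows("orders", task=active)), active.progress_log)
    print(asyncio.run(fetch_rows("orders")))  # no active task: still works
```

Written this way, the same function body works both inside and outside an async with agent.task(...) block.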
TTL and Expiry
Setting ttl_seconds=N populates metadata.expires_at but does not start a timer. The Task primitive itself never self-expires — that's by design, so a standalone Task with no observer doesn't spawn a background task. Instead, an external observer (e.g. a network hub's TTL sweeper, a periodic watch) checks expires_at and calls task.expire() when due.
For self-contained TTL behaviour, wire up a sweeper in your application:
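A minimal sketch of such a sweeper follows. It makes several assumptions that are not confirmed by this page: that expires_at is an ISO-8601 string with a UTC offset, that the application keeps its own collection of live tasks, and that a string state is enough for illustration. SketchTask, SketchMetadata, sweep_once, and run_sweeper are hypothetical names, not framework APIs.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


@dataclass
class SketchMetadata:
    expires_at: Optional[str] = None  # assumed ISO-8601 string


class SketchTask:
    """Hypothetical stand-in exposing only metadata.expires_at and expire()."""

    def __init__(self, expires_at: Optional[datetime]) -> None:
        self.metadata = SketchMetadata(expires_at.isoformat() if expires_at else None)
        self.state = "RUNNING"

    async def expire(self) -> None:
        if self.state == "RUNNING":  # terminal states stay untouched
            self.state = "EXPIRED"


async def sweep_once(tasks: Iterable[SketchTask], now: datetime) -> int:
    """Expire every running task whose deadline has passed; return the count."""
    expired = 0
    for task in tasks:
        deadline = task.metadata.expires_at
        if task.state != "RUNNING" or deadline is None:
            continue
        if datetime.fromisoformat(deadline) <= now:
            await task.expire()
            expired += 1
    return expired


async def run_sweeper(tasks, interval_seconds: float, stop: asyncio.Event) -> None:
    """Sweep periodically until `stop` is set."""
    while not stop.is_set():
        await sweep_once(tasks, datetime.now(timezone.utc))
        try:
            # Sleep for the interval, but wake immediately on stop.
            await asyncio.wait_for(stop.wait(), timeout=interval_seconds)
        except asyncio.TimeoutError:
            pass


async def demo() -> str:
    stop = asyncio.Event()
    overdue = SketchTask(datetime.now(timezone.utc) - timedelta(seconds=1))
    runner = asyncio.create_task(run_sweeper([overdue], 0.01, stop))
    await asyncio.sleep(0.05)
    stop.set()
    await runner
    return overdue.state


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping sweep_once separate from the loop makes the expiry rule testable with a fixed clock, and the background loop is started only by applications that opt in.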
TaskSpec and TaskMetadata
Two small dataclasses surface around a Task:
- TaskSpec: what the task is doing. Fields: title, description, payload, optional capability. Created by Agent.task(...).
- TaskMetadata: mutable lifecycle record updated on each transition. Fields: task_id, owner_id, spec, state, ISO-8601 timestamps, progress, result, error, optional session_id.
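As a rough picture of the two records, the sketch below mirrors the listed fields. The field types and defaults are assumptions, only the two timestamp fields named elsewhere on this page (expires_at and last_progress_at) are included, and SketchTaskSpec, SketchTaskMetadata, and record_progress are hypothetical names rather than the framework's definitions.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class SketchTaskSpec:
    """What the task is doing (field types are assumptions)."""
    title: str
    description: str = ""
    payload: dict = field(default_factory=dict)
    capability: Optional[str] = None


@dataclass
class SketchTaskMetadata:
    """Mutable lifecycle record (field types are assumptions)."""
    task_id: str
    owner_id: str
    spec: SketchTaskSpec
    state: str = "CREATED"
    expires_at: Optional[str] = None        # ISO-8601
    last_progress_at: Optional[str] = None  # ISO-8601
    progress: dict = field(default_factory=dict)
    result: Any = None
    error: Optional[str] = None
    session_id: Optional[str] = None


def record_progress(meta: SketchTaskMetadata, payload: dict, now: datetime) -> None:
    """Merge a progress payload and stamp the time, as progress() is described to do."""
    meta.progress.update(payload)
    meta.last_progress_at = now.isoformat()


if __name__ == "__main__":
    spec = SketchTaskSpec(title="Import orders", capability="etl")
    meta = SketchTaskMetadata(task_id="t-1", owner_id="agent-a", spec=spec)
    record_progress(meta, {"rows": 100}, datetime.now(timezone.utc))
    print(meta)
```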