Skip to content

Tasks

A Task is a framework-core wrapper any Agent can use to give a unit of work a trackable lifecycle. While the task is active, the framework emits TaskStarted, TaskProgress, TaskCompleted, TaskFailed, and TaskExpired events on a stream — so observers (UIs, watchers, mirrors, test harnesses) can follow along without participating in execution.

Note

Tasks are agent-owned. The framework does not assign or schedule them. Standalone usage requires no observers — events fly past harmlessly if nothing subscribes.

When to Use a Task#

Use a Task whenever a unit of work has a beginning, an end, and observable progress that you want to surface beyond your own function's return value:

  • Long-running pipelines where downstream consumers want progress checkpoints.
  • HITL approvals where a UI needs to know the task is waiting on a human.
  • Test harnesses that assert a sequence of lifecycle events.

For lightweight LLM-driven sub-agent delegation see Sub-task Delegation — that's a different feature that wraps an Agent in a run_subtask tool.

Quick Start#

from autogen.beta import Agent
from autogen.beta.config import AnthropicConfig

agent = Agent("indexer", config=AnthropicConfig(model="claude-sonnet-4-6"))

async with agent.task("index documents") as task:
    await task.progress({"stage": "discover", "files": 12})
    await task.progress({"stage": "index", "indexed": 12})
    await task.complete({"indexed": 12})

print(task.state)        # TaskState.COMPLETED
print(task.metadata.result)  # {'indexed': 12}

The async with block opens the lifecycle. On clean exit the task auto-completes with result=None if you didn't call complete() or fail() yourself.

Lifecycle States#

CREATED -> RUNNING -> COMPLETED   (terminal, success)
                   -> FAILED      (terminal, exception or explicit fail)
                   -> EXPIRED     (terminal, TTL elapsed)
State Meaning
CREATED The Task object exists but __aenter__ has not run. task.task_id and task.metadata raise.
RUNNING Inside the async with block. Progress events allowed.
COMPLETED Reached complete() or clean block exit.
FAILED Reached fail() or block exited via exception.
EXPIRED TTL elapsed; emitted by an external observer (e.g. a network hub's TTL sweeper).

The terminal states are immutable — once set, further complete() / fail() / progress() calls are silent no-ops. The set is exported as autogen.beta.task.TERMINAL_TASK_STATES.

API Reference#

Agent.task(...)#

agent.task(
    title: str,
    *,
    description: str = "",
    payload: dict[str, Any] | None = None,
    capability: str | None = None,
    ttl_seconds: int | None = None,
    context: ConversationContext | None = None,
) -> Task
Parameter Type Description
title str Short objective shown on every event.
description str Optional longer description.
payload dict[str, Any] \| None Initial payload merged into TaskSpec.
capability str \| None Tags the task with a capability name; used by network mirrors.
ttl_seconds int \| None Sets metadata.expires_at. The Task does not self-expire — an external observer must call task.expire() when the TTL elapses.
context ConversationContext \| None If supplied, events flow on context.stream and ag2.task is stamped into context.dependencies for the duration of the block. If omitted, the Task creates a private MemoryStream on entry.

Returns an unentered Task. Use as async with agent.task(...) as task:.

Task instance methods#

Method Description
await task.progress(payload) Emits TaskProgress; merges payload into metadata.progress and stamps last_progress_at. No-op if already terminal.
await task.complete(result=None) Terminal. Emits TaskCompleted; sets metadata.result and state = COMPLETED.
await task.fail(error) Terminal. Accepts a string (wrapped in RuntimeError) or any BaseException. Emits TaskFailed; sets state = FAILED.
await task.expire() Terminal. Emits TaskExpired; sets state = EXPIRED. Called by external TTL observers.

Properties#

Property Available before __aenter__?
task.state Yes — returns TaskState.CREATED.
task.task_id No — raises RuntimeError.
task.metadata No — raises RuntimeError.
task.context No — raises RuntimeError.

Bound Context vs. Standalone#

1
2
3
4
5
6
7
from autogen.beta.context import ConversationContext
from autogen.beta.stream import MemoryStream

ctx = ConversationContext(stream=MemoryStream())

async with agent.task("with-ctx", context=ctx) as task:
    ...

Passing a ConversationContext shares the stream with the rest of your agent's run, so observers and middleware already attached to that stream see the lifecycle events.

Without a context, the Task creates a private stream on entry. Events still fire — but only observers attached to that private stream see them. Useful for one-off background work that doesn't need to surface anywhere.

Auto-Complete and Auto-Fail#

The async with block has these guarantees:

  • Clean exit, no terminal call -> auto complete(result=None).
  • Exception inside the block -> auto fail(exc), then the exception propagates.
  • Already terminal at exit time -> nothing further happens.
1
2
3
4
5
6
try:
    async with agent.task("flaky") as task:
        raise ValueError("boom")
except ValueError as exc:
    print(task.state)            # TaskState.FAILED
    print(task.metadata.error)   # 'boom'

Observing the Lifecycle#

Subscribe directly on the bound stream to capture every lifecycle event in order.

from autogen.beta.events import TaskCompleted, TaskProgress, TaskStarted

stream = MemoryStream()
ctx = ConversationContext(stream=stream)

stream.subscribe(
    lambda ev: print(type(ev).__name__, getattr(ev, "payload", "")),
    sync_to_thread=False,
)

async with agent.task("watched", context=ctx) as task:
    await task.progress({"step": "fetch"})
    await task.complete({"ok": True})

Note

TaskProgress is marked transient — it is delivered live to subscribers but not persisted to the stream's storage. Subscribe before the events fire to capture them. TaskStarted, TaskCompleted, TaskFailed, and TaskExpired are persisted normally.

Reading the Active Task with TaskInject#

Inside an async with agent.task(...) block, the framework stamps the active Task into context.dependencies["ag2.task"]. Two ways to read it:

Direct access#

1
2
3
async with agent.task("work", context=ctx) as task:
    active = ctx.dependencies["ag2.task"]
    assert active is task

TaskInject annotation#

TaskInject is a fast_depends-resolvable annotation that injects the active Task into any function the dependency-injection machinery resolves — most usefully a @tool body.

1
2
3
4
5
6
7
8
9
from autogen.beta import tool
from autogen.beta.task import TaskInject

@tool
async def report(message: str, task: TaskInject) -> str:
    if task is None:
        return "no active task"
    await task.progress({"tool_message": message})
    return f"reported on task {task.task_id}"

The injection has default=None, so always treat task as possibly None and null-check before use.

TTL and Expiry#

Setting ttl_seconds=N populates metadata.expires_at but does not start a timer. The Task primitive itself never self-expires — that's by design, so a standalone Task with no observer doesn't spawn a background task. Instead, an external observer (e.g. a network hub's TTL sweeper, a periodic watch) checks expires_at and calls task.expire() when due.

For self-contained TTL behaviour, wire up a sweeper in your application:

1
2
3
4
5
6
async def sweep(task: Task, deadline: datetime) -> None:
    while task.state == TaskState.RUNNING:
        if datetime.now(timezone.utc) >= deadline:
            await task.expire()
            return
        await asyncio.sleep(1.0)

TaskSpec and TaskMetadata#

Two small dataclasses surface around a Task:

  • TaskSpec — what the task is doing: title, description, payload, optional capability. Created by Agent.task(...).
  • TaskMetadata — mutable lifecycle record updated on each transition: task_id, owner_id, spec, state, ISO-8601 timestamps, progress, result, error, optional session_id.
1
2
3
4
5
async with agent.task("survey", description="probe upstream", payload={"region": "us"}) as task:
    print(task.metadata.spec.title)        # 'survey'
    print(task.metadata.spec.payload)      # {'region': 'us'}
    print(task.metadata.owner_id)          # 'researcher'
    print(task.metadata.started_at)        # ISO 8601 string