Skip to content

Task

autogen.beta.task.Task #

Task(*, owner_id, spec, context=None, ttl_seconds=None)

Lifecycle handle for a unit of work.

Created via Agent.task(...) or directly. Use as an async context manager::

async with agent.task("research framework X") as task:
    await task.progress({"step": "search"})
    findings = await search(...)
    await task.complete(findings)

On clean exit, auto-completes with result=None if the user did not call complete() or fail(). On exception, auto-fails with the raised exception (the exception still propagates).

If a ConversationContext is passed, events flow on that stream and ag2.task is stamped into context.dependencies for the duration of the async with block (so TaskInject resolves to the active task). With no context, the Task creates a private MemoryStream — standalone use; events still fire but only observers attached to that private stream see them.

Source code in autogen/beta/task.py
def __init__(
    self,
    *,
    owner_id: str,
    spec: TaskSpec,
    context: ConversationContext | None = None,
    ttl_seconds: int | None = None,
) -> None:
    # __init__ stores params; side effects happen in __aenter__.
    self._owner_id = owner_id
    self._spec = spec
    self._ttl_seconds = ttl_seconds
    self._context = context
    self._owns_context = False
    self._metadata: TaskMetadata | None = None
    self._had_previous_dep = False
    self._previous_dep: Any = None

task_id property #

task_id

state property #

state

metadata property #

metadata

context property #

context

progress async #

progress(payload)

Emit TaskProgress; merges payload into metadata.progress.

No-op if the task is already in a terminal state.

Source code in autogen/beta/task.py
async def progress(self, payload: dict[str, Any]) -> None:
    """Emit ``TaskProgress``; merges payload into ``metadata.progress``.

    No-op if the task is already in a terminal state.
    """
    if self._metadata is None:
        raise RuntimeError("Task.progress() called before __aenter__")
    if self._metadata.state in TERMINAL_TASK_STATES:
        return
    self._metadata.progress.update(payload)
    self._metadata.last_progress_at = _now_iso()
    await self.context.send(
        TaskProgress(
            task_id=self._metadata.task_id,
            agent_name=self._owner_id,
            objective=self._spec.title,
            content="",
            payload=dict(payload),
        )
    )

complete async #

complete(result=None)

Terminal: emit TaskCompleted; state ← COMPLETED.

No-op if already terminal.

Source code in autogen/beta/task.py
async def complete(self, result: Any = None) -> None:
    """Terminal: emit ``TaskCompleted``; state ← COMPLETED.

    No-op if already terminal.
    """
    if self._metadata is None:
        raise RuntimeError("Task.complete() called before __aenter__")
    if self._metadata.state in TERMINAL_TASK_STATES:
        return
    self._metadata.state = TaskState.COMPLETED
    self._metadata.result = result
    self._metadata.completed_at = _now_iso()
    await self.context.send(
        TaskCompleted(
            task_id=self._metadata.task_id,
            agent_name=self._owner_id,
            objective=self._spec.title,
            result=result,
            task_stream=self.context.stream.id,
        )
    )

fail async #

fail(error)

Terminal: emit TaskFailed; state ← FAILED.

Accepts a string (wrapped in RuntimeError) or any BaseException. No-op if already terminal.

Source code in autogen/beta/task.py
async def fail(self, error: str | BaseException) -> None:
    """Terminal: emit ``TaskFailed``; state ← FAILED.

    Accepts a string (wrapped in ``RuntimeError``) or any
    ``BaseException``. No-op if already terminal.
    """
    if self._metadata is None:
        raise RuntimeError("Task.fail() called before __aenter__")
    if self._metadata.state in TERMINAL_TASK_STATES:
        return
    if isinstance(error, str):
        exc: BaseException = RuntimeError(error)
    else:
        exc = error
    self._metadata.state = TaskState.FAILED
    self._metadata.error = str(exc)
    self._metadata.completed_at = _now_iso()
    await self.context.send(
        TaskFailed(
            task_id=self._metadata.task_id,
            agent_name=self._owner_id,
            objective=self._spec.title,
            error=exc,
        )
    )

expire async #

expire()

Terminal: emit TaskExpired; state ← EXPIRED.

Called by an external TTL observer (e.g. the network hub's TTL sweeper, mirrored back to the agent's stream). No-op if already terminal.

Source code in autogen/beta/task.py
async def expire(self) -> None:
    """Terminal: emit ``TaskExpired``; state ← EXPIRED.

    Called by an external TTL observer (e.g. the network hub's TTL
    sweeper, mirrored back to the agent's stream). No-op if already
    terminal.
    """
    if self._metadata is None:
        raise RuntimeError("Task.expire() called before __aenter__")
    if self._metadata.state in TERMINAL_TASK_STATES:
        return
    self._metadata.state = TaskState.EXPIRED
    self._metadata.completed_at = _now_iso()
    await self.context.send(
        TaskExpired(
            task_id=self._metadata.task_id,
            agent_name=self._owner_id,
            objective=self._spec.title,
        )
    )