Task(*, owner_id, spec, context=None, ttl_seconds=None)
Lifecycle handle for a unit of work.
Created via Agent.task(...) or directly. Use as an async context manager::

    async with agent.task("research framework X") as task:
        await task.progress({"step": "search"})
        findings = await search(...)
        await task.complete(findings)
On clean exit, auto-completes with result=None if the user did not call complete() or fail(). On exception, auto-fails with the raised exception (the exception still propagates).
If a ConversationContext is passed, events flow on that stream, and ag2.task is stamped into context.dependencies for the duration of the async with block (so TaskInject resolves to the active task). With no context, the Task creates a private MemoryStream for standalone use: events still fire, but only observers attached to that private stream see them.
Source code in autogen/beta/task.py
def __init__(
    self,
    *,
    owner_id: str,
    spec: TaskSpec,
    context: ConversationContext | None = None,
    ttl_seconds: int | None = None,
) -> None:
    """Record the constructor arguments and reset lifecycle bookkeeping.

    Nothing else happens here: per the original author's note, side
    effects are deferred to ``__aenter__``.

    Args:
        owner_id: Stored as ``_owner_id``.
        spec: Stored as ``_spec``.
        context: Optional conversation context, stored as ``_context``.
        ttl_seconds: Optional time-to-live, stored as ``_ttl_seconds``.
    """
    # Caller-supplied configuration, kept verbatim.
    self._spec = spec
    self._owner_id = owner_id
    self._context = context
    self._ttl_seconds = ttl_seconds

    # Lifecycle state; stays ``None`` until it is populated elsewhere.
    self._metadata: TaskMetadata | None = None
    self._owns_context = False

    # NOTE(review): presumably remembers a pre-existing dependency entry so
    # it can be restored when the task exits; confirm against __aenter__.
    self._previous_dep: Any = None
    self._had_previous_dep = False
progress async
Emit TaskProgress; merges payload into metadata.progress.
No-op if the task is already in a terminal state.
Source code in autogen/beta/task.py
async def progress(self, payload: dict[str, Any]) -> None:
    """Record a progress update and publish a ``TaskProgress`` event.

    ``payload`` is merged into ``metadata.progress`` and
    ``metadata.last_progress_at`` is refreshed before the event is sent.
    The event carries a shallow copy of ``payload``.

    Does nothing when the task is already in a terminal state.

    Raises:
        RuntimeError: If the task metadata has not been initialised yet.
    """
    meta = self._metadata
    if meta is None:
        raise RuntimeError("Task.progress() called before __aenter__")
    if meta.state in TERMINAL_TASK_STATES:
        return

    meta.progress.update(payload)
    meta.last_progress_at = _now_iso()

    # Resolve the sender before building the event so attribute access
    # happens in the same order as a single nested call expression.
    send = self.context.send
    event = TaskProgress(
        task_id=meta.task_id,
        agent_name=self._owner_id,
        objective=self._spec.title,
        content="",
        payload=dict(payload),
    )
    await send(event)
complete async
Terminal: emit TaskCompleted; state ← COMPLETED.
No-op if already terminal.
Source code in autogen/beta/task.py
async def complete(self, result: Any = None) -> None:
    """Move the task to ``COMPLETED`` and publish ``TaskCompleted``.

    Stores ``result`` and the completion timestamp on the metadata, then
    sends the event, which also carries the id of the context's stream.

    Does nothing when the task is already in a terminal state.

    Raises:
        RuntimeError: If the task metadata has not been initialised yet.
    """
    meta = self._metadata
    if meta is None:
        raise RuntimeError("Task.complete() called before __aenter__")
    if meta.state in TERMINAL_TASK_STATES:
        return

    # The state flips before the send, so a repeated call becomes a no-op.
    meta.state = TaskState.COMPLETED
    meta.result = result
    meta.completed_at = _now_iso()

    send = self.context.send
    event = TaskCompleted(
        task_id=meta.task_id,
        agent_name=self._owner_id,
        objective=self._spec.title,
        result=result,
        task_stream=self.context.stream.id,
    )
    await send(event)
fail async
Terminal: emit TaskFailed; state ← FAILED.
Accepts a string (wrapped in RuntimeError) or any BaseException. No-op if already terminal.
Source code in autogen/beta/task.py
async def fail(self, error: str | BaseException) -> None:
    """Terminal: emit ``TaskFailed``; state ← FAILED.

    Accepts a string (wrapped in ``RuntimeError``) or any
    ``BaseException``. No-op if already terminal.

    ``metadata.error`` holds ``str(exc)``. When that is empty, as it is for
    exceptions raised without a message, the exception class name is stored
    instead so that a failed task never records an empty error. The emitted
    event always carries the exception object itself.

    Raises:
        RuntimeError: If the task metadata has not been initialised yet.
    """
    meta = self._metadata
    if meta is None:
        raise RuntimeError("Task.fail() called before __aenter__")
    if meta.state in TERMINAL_TASK_STATES:
        return

    exc: BaseException = RuntimeError(error) if isinstance(error, str) else error

    meta.state = TaskState.FAILED
    # str() of a message-less exception (e.g. ``asyncio.CancelledError()``
    # or ``TimeoutError()``) is "", which would leave a falsy error on a
    # FAILED task; fall back to the class name in that case.
    meta.error = str(exc) or type(exc).__name__
    meta.completed_at = _now_iso()

    send = self.context.send
    event = TaskFailed(
        task_id=meta.task_id,
        agent_name=self._owner_id,
        objective=self._spec.title,
        error=exc,
    )
    await send(event)
expire async
Terminal: emit TaskExpired; state ← EXPIRED.
Called by an external TTL observer (e.g. the network hub's TTL sweeper, mirrored back to the agent's stream). No-op if already terminal.
Source code in autogen/beta/task.py
async def expire(self) -> None:
    """Move the task to ``EXPIRED`` and publish ``TaskExpired``.

    Intended to be driven by an external TTL observer (for example the
    network hub's TTL sweeper, mirrored back to the agent's stream).

    Does nothing when the task is already in a terminal state.

    Raises:
        RuntimeError: If the task metadata has not been initialised yet.
    """
    meta = self._metadata
    if meta is None:
        raise RuntimeError("Task.expire() called before __aenter__")
    if meta.state in TERMINAL_TASK_STATES:
        return

    meta.state = TaskState.EXPIRED
    meta.completed_at = _now_iso()

    send = self.context.send
    event = TaskExpired(
        task_id=meta.task_id,
        agent_name=self._owner_id,
        objective=self._spec.title,
    )
    await send(event)