Task Observation

The bridge between the Task lifecycle primitive and the network's per-agent track record. When an Agent runs an agent.task(..., capability="X") inside a network turn, a TaskMirror forwards the lifecycle events to the hub. On terminal events with a capability tag, the hub updates the worker's Resume.observed[capability].

In short: agents earn a track record on the network by completing capability-tagged tasks. Other agents (and operators) read that track record off the worker's Resume.

The Mechanism#

TaskMirror is a stream subscriber that:

  1. Subscribes to TaskStarted, TaskProgress, TaskCompleted, TaskFailed, TaskExpired events on a stream.
  2. Forwards each as an ag2.task.* envelope to the hub via HubClient.
  3. On terminal events with spec.capability set, calls Hub.record_observation(...) so the hub's per-agent ObservedStat updates.
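The three steps above can be modeled in a few lines. A minimal sketch: everything here except the lifecycle event names (`FakeHub`, `mirror_event`, the envelope shape) is a hypothetical stand-in, not the library's API.

```python
# Illustrative model of the subscribe / forward / record flow.
# FakeHub and mirror_event are hypothetical stand-ins, not library API.
from dataclasses import dataclass, field

TERMINAL = {"TaskCompleted", "TaskFailed", "TaskExpired"}

@dataclass
class FakeHub:
    envelopes: list = field(default_factory=list)
    observed: dict = field(default_factory=dict)  # (owner, capability) -> count

    def forward(self, envelope):
        self.envelopes.append(envelope)

    def record_observation(self, owner_id, capability, outcome):
        key = (owner_id, capability)
        self.observed[key] = self.observed.get(key, 0) + 1

def mirror_event(hub, owner_id, event_type, capability=None):
    # Steps 1-2: every lifecycle event becomes an ag2.task.* envelope.
    hub.forward({"kind": f"ag2.task.{event_type}", "owner": owner_id})
    # Step 3: only capability-tagged *terminal* events touch Resume.observed.
    if event_type in TERMINAL and capability is not None:
        hub.record_observation(owner_id, capability, event_type)

hub = FakeHub()
mirror_event(hub, "bob", "TaskStarted", capability="research")
mirror_event(hub, "bob", "TaskCompleted", capability="research")
mirror_event(hub, "bob", "TaskCompleted")          # untagged: forwarded only
print(len(hub.envelopes), hub.observed[("bob", "research")])  # 3 1
```

Note the asymmetry: all three events are forwarded, but only the tagged terminal one updates the observed stats.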

The default handler auto-attaches a mirror for the duration of every LLM turn, so in the common case there is no wiring to do. A custom handler has to attach one itself:

from autogen.beta.network import TaskMirror
from autogen.beta.stream import MemoryStream

mirror = TaskMirror(
    hub_client=client._hub_client,
    owner_id=client.agent_id,
    session_id=metadata.session_id,
)
stream = MemoryStream()
sub_ids = mirror.attach(stream)
try:
    await client.agent.ask(text, stream=stream)
finally:
    mirror.detach(stream, sub_ids)

Capability Tagging#

agent.task(...) accepts a capability keyword:

async with agent.task(
    "survey: deployment patterns",
    capability="research",
    context=ctx,
) as task:
    await task.progress({"step": "gather"})
    # ... do work ...
    await task.complete({"items_found": 7})

ctx is the active Context (passed in by fast_depends to a tool body, or carried explicitly in scripts). Passing context=ctx matters: it makes the task fire its events on the LLM turn's stream, which is the stream the mirror is attached to.

capability is a free-form string. Common values: "research", "summarisation", "review", "code_review". Whatever names your application uses internally for capability roles, use them here.

If capability is None (the default), the mirror still forwards lifecycle envelopes to the hub, but doesn't update Resume.observed. The track record is opt-in.

ObservedStat#

@dataclass(slots=True)
class ObservedStat:
    n: int = 0                       # total terminal events seen
    completed: int = 0
    failed: int = 0
    expired: int = 0
    p50_latency_ms: int | None = None  # rolling median of started_at → completed_at

Read it off the worker's resume:

resume = await hub.get_resume(bob.agent_id)
stat = resume.observed.get("research")
if stat:
    print(f"completed={stat.completed}/{stat.n}  median_latency={stat.p50_latency_ms}ms")

The latency is computed from task_meta.started_at to the terminal event time, sourced from the hub's clock. With a MockClock you can construct deterministic latency values for testing.
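The deterministic-latency point can be sketched with a hand-rolled mock clock. This MockClock is a minimal assumption for illustration; the real MockClock's API may differ.

```python
# Sketch (assumption): a manually advanced millisecond clock, so
# started_at -> terminal-event deltas are exact rather than wall-clock noise.
from statistics import median

class MockClock:
    def __init__(self):
        self.now_ms = 0

    def advance(self, ms):
        self.now_ms += ms

clock = MockClock()
latencies = []
for work_ms in (120, 80, 400):
    started_at = clock.now_ms
    clock.advance(work_ms)                 # the task "runs" for exactly work_ms
    latencies.append(clock.now_ms - started_at)

print(int(median(latencies)))  # 120
```

Because the clock only moves when you advance it, the p50 you assert in a test is exactly the p50 the hub computes.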

What the Mirror Records#

Hub.record_observation(...) writes to:

  • Resume.observed[capability] — the per-capability ObservedStat for the owner.
  • AuditLog: an AUDIT_KIND_TASK_TERMINATED record is written for every terminal task, with capability, outcome, and latency.

Both updates happen inside the same hub transaction, so partial updates don't occur.
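The both-or-neither guarantee can be illustrated with a trivial copy-on-write store. TinyHubStore and record_terminal are illustrative assumptions, not the hub's real storage layer.

```python
# Sketch (assumption): work on a copy, publish atomically at the end.
# A failure mid-update leaves neither the stat bump nor the audit record.
import copy

class TinyHubStore:
    def __init__(self):
        self.state = {"observed": {}, "audit_log": []}

    def record_terminal(self, owner, capability, outcome, latency_ms):
        draft = copy.deepcopy(self.state)        # work on a copy...
        stats = draft["observed"].setdefault((owner, capability), {"n": 0})
        stats["n"] += 1
        if outcome == "explode":                 # simulate a mid-transaction failure
            raise RuntimeError("mid-transaction failure")
        draft["audit_log"].append({
            "kind": "AUDIT_KIND_TASK_TERMINATED",
            "owner": owner, "capability": capability,
            "outcome": outcome, "latency_ms": latency_ms,
        })
        self.state = draft                       # ...and publish atomically

store = TinyHubStore()
store.record_terminal("bob", "research", "completed", 120)
try:
    store.record_terminal("bob", "research", "explode", 0)
except RuntimeError:
    pass
# The failed call left neither a stat bump nor an audit record behind.
print(store.state["observed"][("bob", "research")]["n"], len(store.state["audit_log"]))  # 1 1
```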

Where TaskMirror Fits in the Default Handler#

Look at autogen.beta.network.client.handlers._process_text if you want the exact wiring. Sketch:

mirror = TaskMirror(
    hub_client=client._hub_client,
    owner_id=client.agent_id,
    session_id=metadata.session_id,
)
sub_ids = mirror.attach(stream)
try:
    reply = await client.agent.ask(
        current_text,
        stream=stream,
        dependencies=dependencies,
    )
finally:
    mirror.detach(stream, sub_ids)

Notably:

  • The mirror is attached per turn, not per agent. A new mirror is constructed and attached for each inbound envelope the handler processes.
  • The session_id lets the hub tie task observations back to the session that produced them — useful for governance and replay.
  • The mirror swallows errors when forwarding to the hub. A flaky hub connection should not crash the LLM turn.
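The error-swallowing behaviour in the last bullet can be sketched as a plain wrapper. forward_safely and flaky_send are hypothetical; the real mirror does this internally.

```python
# Sketch (assumption): hub failures are logged and dropped, never raised
# into the LLM turn that is running the task.
import logging

log = logging.getLogger("mirror")

def forward_safely(send, envelope):
    """Forward an envelope; a hub failure is logged, never propagated."""
    try:
        send(envelope)
        return True
    except Exception:
        log.warning("hub forward failed; dropping envelope", exc_info=True)
        return False

def flaky_send(envelope):
    raise ConnectionError("hub unreachable")

# The turn carries on even though the hub call blew up.
print(forward_safely(flaky_send, {"kind": "ag2.task.completed"}))  # False
print(forward_safely(lambda e: None, {"kind": "ag2.task.completed"}))  # True
```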

When to Skip Capability Tagging#

Not every agent.task(...) deserves a capability tag. Tag only when:

  • The task represents a capability you want to track in the agent's resume.
  • Failure / latency signals are operationally meaningful (driving routing, alerting, or peer ranking).

Untagged tasks still get full lifecycle observation in the audit log — just no Resume.observed update. Use them for internal book-keeping or sub-task delegation that doesn't represent an externally-visible capability.

Cross-Cutting Pattern#

A common pattern: an agent has multiple capability roles. Tag each tool's task with the right capability and inspect the resume to see which capabilities are well-exercised.

@worker.tool
async def research(topic: str, ctx: Context) -> str:
    async with worker.task(f"research: {topic}", capability="research", context=ctx) as t:
        # ...
    return f"researched {topic}"

@worker.tool
async def summarise(text: str, ctx: Context) -> str:
    async with worker.task("summarise", capability="summarisation", context=ctx) as t:
        # ...
    return f"summary: ..."

After a few sessions, worker.resume.observed will hold separate ObservedStat entries for "research" and "summarisation", each tracking its capability independently.