TaskMirror(*, hub_client=None, hub=None, owner_id, session_id=None)
Forwards an Agent's Task* events to the hub.
Construct one per AgentClient (the owner is the Agent's agent_id). Attach to a stream for the duration of a notify handler / Agent.ask call, then detach.
The mirror routes through a :class:HubClient so the same call sites work for both in-process and any future cross-process transport. Tests that hold a bare Hub can still pass it directly via the legacy hub= keyword for convenience.
Failures forwarding to the hub are swallowed — the mirror must never crash the agent's turn.
Source code in autogen/beta/network/task_mirror.py
| def __init__(
self,
*,
hub_client: "HubClient | None" = None,
hub: "Hub | None" = None,
owner_id: str,
session_id: str | None = None,
) -> None:
# __init__ stores params; subscription happens in attach().
if hub_client is None and hub is None:
raise TypeError("TaskMirror requires either hub_client= or hub= (legacy)")
self._hub_client = hub_client
self._hub = hub if hub is not None else (hub_client._hub if hub_client is not None else None)
self._owner_id = owner_id
self._session_id = session_id
|
attach
Subscribe to Task* events; returns sub ids for detach.
Source code in autogen/beta/network/task_mirror.py
| def attach(self, stream: "Stream") -> list[object]:
"""Subscribe to ``Task*`` events; returns sub ids for ``detach``."""
return [
stream.where(TaskStarted).subscribe(self._on_started, sync_to_thread=False),
stream.where(TaskProgress).subscribe(self._on_progress, sync_to_thread=False),
stream.where(TaskCompleted).subscribe(self._on_completed, sync_to_thread=False),
stream.where(TaskFailed).subscribe(self._on_failed, sync_to_thread=False),
stream.where(TaskExpired).subscribe(self._on_expired, sync_to_thread=False),
]
|
detach
Unsubscribe the previously-attached subscriptions.
Source code in autogen/beta/network/task_mirror.py
| def detach(self, stream: "Stream", sub_ids: list[object]) -> None:
"""Unsubscribe the previously-attached subscriptions."""
for sid in sub_ids:
with contextlib.suppress(Exception):
stream.unsubscribe(sid) # type: ignore[arg-type]
|