Skip to content

TaskMirror

autogen.beta.network.task_mirror.TaskMirror #

TaskMirror(*, hub_client=None, hub=None, owner_id, session_id=None)

Forwards an Agent's Task* events to the hub.

Construct one per AgentClient (the owner is the Agent's agent_id). Attach to a stream for the duration of a notify handler / Agent.ask call, then detach.

The mirror routes through a :class:HubClient so the same call sites work for both in-process and any future cross-process transport. Tests that hold a bare Hub can still pass it directly via the legacy hub= keyword for convenience.

Failures forwarding to the hub are swallowed — the mirror must never crash the agent's turn.

Source code in autogen/beta/network/task_mirror.py
def __init__(
    self,
    *,
    hub_client: "HubClient | None" = None,
    hub: "Hub | None" = None,
    owner_id: str,
    session_id: str | None = None,
) -> None:
    # __init__ stores params; subscription happens in attach().
    if hub_client is None and hub is None:
        raise TypeError("TaskMirror requires either hub_client= or hub= (legacy)")
    self._hub_client = hub_client
    self._hub = hub if hub is not None else (hub_client._hub if hub_client is not None else None)
    self._owner_id = owner_id
    self._session_id = session_id

attach #

attach(stream)

Subscribe to Task* events; returns sub ids for detach.

Source code in autogen/beta/network/task_mirror.py
def attach(self, stream: "Stream") -> list[object]:
    """Subscribe to ``Task*`` events; returns sub ids for ``detach``."""
    return [
        stream.where(TaskStarted).subscribe(self._on_started, sync_to_thread=False),
        stream.where(TaskProgress).subscribe(self._on_progress, sync_to_thread=False),
        stream.where(TaskCompleted).subscribe(self._on_completed, sync_to_thread=False),
        stream.where(TaskFailed).subscribe(self._on_failed, sync_to_thread=False),
        stream.where(TaskExpired).subscribe(self._on_expired, sync_to_thread=False),
    ]

detach #

detach(stream, sub_ids)

Unsubscribe the previously-attached subscriptions.

Source code in autogen/beta/network/task_mirror.py
def detach(self, stream: "Stream", sub_ids: list[object]) -> None:
    """Unsubscribe the previously-attached subscriptions."""
    for sid in sub_ids:
        with contextlib.suppress(Exception):
            stream.unsubscribe(sid)  # type: ignore[arg-type]