Skip to content

EventCapture

autogen.beta.eval.runtime._capture.EventCapture #

EventCapture()

Observer that records every event emitted during one task.

Subscribes once to the agent's stream on first register and accumulates events into :attr:events for the runner to read after agent.ask(...) returns. Idempotent: if register is called again (re-entering _execute for a continuation turn), the existing subscription is left in place rather than duplicated.

Source code in autogen/beta/eval/runtime/_capture.py
def __init__(self) -> None:
    self.events: list[BaseEvent] = []
    self._subscribed = False

events instance-attribute #

events = []

register #

register(stack, context)
Source code in autogen/beta/eval/runtime/_capture.py
def register(self, stack: ExitStack, context: Context) -> None:
    # Subscribe directly (not via sub_scope) so the subscription
    # outlives this ExitStack and continues capturing events from
    # any follow-up ``reply.ask`` on the same stream. The unused
    # ``stack`` parameter is part of the Observer protocol contract.
    del stack
    if self._subscribed:
        return
    context.stream.subscribe(self._on_event, sync_to_thread=False)
    self._subscribed = True