
Watches

A Watch is a reactive trigger primitive. You arm it on a Stream, and it fires a callback when its condition is met — where "condition" can be an event match, a count, a time window, a schedule, or a composition of other watches.

Watches are the mechanism behind trigger-driven Observers, but they can be used standalone for custom reactive logic on any Stream.

When to use a Watch

Use a Watch when a simple stream.subscribe(...) is not enough — i.e. when you need buffering, timing, ordering, or composition:

You need | Use
Run a callback on every matching event | stream.subscribe(fn, condition=...) (no Watch needed)
Fire every N events | CadenceWatch(n=N, condition=...)
Collect events over a time window | CadenceWatch(max_wait=seconds, condition=...)
Fire on "N events OR T seconds, whichever first" | CadenceWatch(n=N, max_wait=seconds, condition=...)
Fire once after a delay | DelayWatch(seconds)
Fire on a schedule | IntervalWatch(seconds) or CronWatch(expr)
Wait for two separate events in any order | AllOf(w1, w2)
Wait for an ordered sequence | Sequence(w1, w2, ...)
Fire on the earliest of several triggers | AnyOf(w1, w2, ...)

Anatomy of a Watch

Every Watch implements the same Watch protocol (importable from autogen.beta):

from typing import Protocol

class Watch(Protocol):
    @property
    def id(self) -> str: ...
    @property
    def is_armed(self) -> bool: ...

    def arm(self, stream, callback) -> None: ...
    def disarm(self) -> None: ...

The callback signature is uniform across all Watch kinds:

from autogen.beta import Context
from autogen.beta.events import BaseEvent

async def callback(events: list[BaseEvent], ctx: Context) -> None: ...

For event-driven watches (EventWatch, CadenceWatch, Sequence), events contains the matched events. For time-driven watches (DelayWatch, IntervalWatch, CronWatch), events is empty — the trigger is the timer itself.

Event-driven Watches

EventWatch

Fires immediately on each matching event.

from autogen.beta import EventWatch, MemoryStream
from autogen.beta.events import ModelResponse

stream = MemoryStream()
watch = EventWatch(ModelResponse)

async def on_response(events, ctx):
    print(f"Model responded: {events[0].content}")

watch.arm(stream, on_response)

EventWatch also supports field conditions:

from autogen.beta import EventWatch
from autogen.beta.events import ToolCallEvent

watch = EventWatch(ToolCallEvent.name == "search")

CadenceWatch

Buffers matching events and fires once the buffer holds n events, once max_wait seconds have elapsed since the first buffered event, or on whichever of the two comes first when both are set. At least one of n and max_wait is required.

from autogen.beta import CadenceWatch
from autogen.beta.events import ModelResponse

# Count-only: fire every 5 model responses
batch = CadenceWatch(n=5, condition=ModelResponse)

# Time-only: fire 60 seconds after the first buffered event, with whatever has accumulated
window = CadenceWatch(max_wait=60.0, condition=ModelResponse)

# Size OR time: fire at 5 events OR 60 seconds after the first event, whichever first
hybrid = CadenceWatch(n=5, max_wait=60.0, condition=ModelResponse)

async def summarize(events, ctx):
    print(f"Batched {len(events)} responses")

The timer starts on the first buffered event in a cadence — a quiet stream produces no firings. Any unfilled buffer at disarm-time is discarded.
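
The buffering rules above can be modelled in a few lines of plain asyncio. The sketch below is a hypothetical stand-in, not the library's CadenceWatch: the class name CadenceBuffer and its push and close methods are invented for illustration, and the callback takes only the batch (no ctx). It shows the timer starting on the first buffered event, the count limit pre-empting the timer, and the leftover buffer being dropped on close.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Callback = Callable[[list[Any]], Awaitable[None]]


class CadenceBuffer:
    """Toy model of 'n events OR max_wait seconds' batching (hypothetical)."""

    def __init__(self, callback: Callback, n: Optional[int] = None,
                 max_wait: Optional[float] = None) -> None:
        if n is None and max_wait is None:
            raise ValueError("at least one of n and max_wait is required")
        self._callback = callback
        self._n = n
        self._max_wait = max_wait
        self._buffer: list[Any] = []
        self._timer: Optional[asyncio.Task] = None

    async def push(self, event: Any) -> None:
        self._buffer.append(event)
        # The timer starts on the first buffered event, not at construction,
        # so a quiet stream never fires.
        if len(self._buffer) == 1 and self._max_wait is not None:
            self._timer = asyncio.create_task(self._expire())
        if self._n is not None and len(self._buffer) >= self._n:
            await self._flush(cancel_timer=True)

    async def _expire(self) -> None:
        await asyncio.sleep(self._max_wait)
        await self._flush(cancel_timer=False)

    async def _flush(self, cancel_timer: bool) -> None:
        if cancel_timer and self._timer is not None:
            self._timer.cancel()
        self._timer = None
        batch, self._buffer = self._buffer, []
        if batch:
            await self._callback(batch)

    def close(self) -> None:
        """Mirror the documented disarm behaviour: drop any unfilled buffer."""
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        self._buffer.clear()


async def demo() -> list[list[int]]:
    batches: list[list[int]] = []

    async def on_batch(events: list[int]) -> None:
        batches.append(events)

    buf = CadenceBuffer(on_batch, n=3, max_wait=0.05)
    for i in range(4):        # 0, 1, 2 fill the buffer; 3 starts a new cadence
        await buf.push(i)
    await asyncio.sleep(0.2)  # give the timer time to flush the leftover event
    buf.close()
    return batches
```

Running asyncio.run(demo()) yields one batch triggered by the count and one triggered by the timer.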

Time-driven Watches

DelayWatch

Fires exactly once after a delay, then auto-disarms.

from autogen.beta import DelayWatch, MemoryStream

stream = MemoryStream()
watch = DelayWatch(30.0)

async def timeout_guard(events, ctx):
    # events is always [] for time-driven watches
    print("30 seconds elapsed")

watch.arm(stream, timeout_guard)

IntervalWatch

Fires periodically at a fixed interval until disarmed.

from autogen.beta import IntervalWatch

watch = IntervalWatch(60.0)

async def heartbeat(events, ctx):
    print("tick")
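
To make "fires periodically until disarmed" concrete, here is a minimal asyncio sketch of a repeating timer. It is an illustration under stated assumptions rather than the library's IntervalWatch: ToyInterval is a hypothetical name, its arm method takes only a callback (no stream), and the callback receives only the empty events list.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class ToyInterval:
    """Hypothetical repeating timer with arm/disarm semantics."""

    def __init__(self, seconds: float) -> None:
        self._seconds = seconds
        self._task: Optional[asyncio.Task] = None

    @property
    def is_armed(self) -> bool:
        return self._task is not None

    def arm(self, callback: Callable[[list], Awaitable[None]]) -> None:
        self.disarm()  # re-arming replaces any running timer
        self._task = asyncio.create_task(self._run(callback))

    def disarm(self) -> None:
        if self._task is not None:
            self._task.cancel()  # cancels the pending sleep as well
            self._task = None

    async def _run(self, callback: Callable[[list], Awaitable[None]]) -> None:
        while True:
            await asyncio.sleep(self._seconds)
            await callback([])  # time-driven: there are no matched events


async def demo() -> tuple[int, int]:
    ticks = 0

    async def heartbeat(events: list) -> None:
        nonlocal ticks
        ticks += 1

    timer = ToyInterval(0.02)
    timer.arm(heartbeat)
    await asyncio.sleep(0.1)
    timer.disarm()
    at_disarm = ticks
    await asyncio.sleep(0.05)  # no further ticks should arrive after disarm
    return at_disarm, ticks
```

The tick count returned by demo() stops growing once disarm() has been called.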

CronWatch

Fires on a standard 5-field cron expression.

from autogen.beta import CronWatch

watch = CronWatch("0 9 * * MON")  # every Monday at 9am

Supports *, */n, a-b, comma lists, and SUN-SAT day-of-week names.
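
As a rough illustration of what those field forms mean, the sketch below expands each cron field into the set of integers it matches. It is not the parser CronWatch uses. The helper names expand_field and parse_cron are hypothetical, and numbering Sunday as 0 follows the usual cron convention, which is an assumption about the library here.

```python
from typing import Optional

# Assumption: conventional cron numbering, Sunday = 0 ... Saturday = 6.
DAY_NAMES = {"SUN": 0, "MON": 1, "TUE": 2, "WED": 3, "THU": 4, "FRI": 5, "SAT": 6}


def expand_field(field: str, lo: int, hi: int,
                 names: Optional[dict] = None) -> set:
    """Expand one cron field ('*', '*/n', 'a-b', comma lists) into integers."""

    def to_int(token: str) -> int:
        token = token.upper()
        if names and token in names:
            return names[token]
        return int(token)

    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            first, last = base.split("-", 1)
            start, end = to_int(first), to_int(last)
        else:
            start = end = to_int(base)
        if not (lo <= start <= end <= hi):
            raise ValueError(f"field {part!r} is outside {lo}-{hi}")
        values.update(range(start, end + 1, step))
    return values


def parse_cron(expr: str) -> dict:
    """Split a 5-field expression and expand every field."""
    minute, hour, day_of_month, month, day_of_week = expr.split()
    return {
        "minute": expand_field(minute, 0, 59),
        "hour": expand_field(hour, 0, 23),
        "day_of_month": expand_field(day_of_month, 1, 31),
        "month": expand_field(month, 1, 12),
        "day_of_week": expand_field(day_of_week, 0, 6, DAY_NAMES),
    }
```

With this model, "0 9 * * MON" expands to minute 0, hour 9, every day of month, every month, and day-of-week 1.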

Composite Watches

AllOf

Fires once when every sub-watch has fired at least once.

from autogen.beta import AllOf, EventWatch
from autogen.beta.events import ModelResponse, ToolCallEvent

watch = AllOf(
    EventWatch(ModelResponse),
    EventWatch(ToolCallEvent),
)

async def both_seen(events, ctx):
    # Gets the combined events from all sub-watches
    print(f"Saw both types, got {len(events)} events total")

After firing, the gate resets — both sub-watches must fire again for the next firing.
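
The gate-and-reset behaviour can be pictured with a small synchronous model. This is a hypothetical sketch, not the AllOf implementation: AllOfGate and report are invented names, sub-watches are identified by plain strings, and accumulating events from repeated firings of the same sub-watch is an assumption the text above does not confirm.

```python
from typing import Any, Optional


class AllOfGate:
    """Toy gate that opens once every named sub-watch has reported."""

    def __init__(self, names: list) -> None:
        self._names = list(names)
        self._pending: dict = {}  # sub-watch name -> events seen so far

    def report(self, name: str, events: list) -> Optional[list]:
        """Record a sub-watch firing; return combined events when the gate opens."""
        if name not in self._names:
            raise KeyError(name)
        # Assumption: repeated firings of one sub-watch accumulate.
        self._pending.setdefault(name, []).extend(events)
        if len(self._pending) < len(self._names):
            return None  # still waiting for at least one sub-watch
        combined: list[Any] = [e for n in self._names for e in self._pending[n]]
        self._pending = {}  # reset: every sub-watch must fire again
        return combined


gate = AllOfGate(["response", "tool_call"])
first = gate.report("response", ["r1"])     # gate still closed
second = gate.report("response", ["r2"])    # same sub-watch again, still closed
fired = gate.report("tool_call", ["t1"])    # both seen, gate opens
after = gate.report("tool_call", ["t2"])    # gate has reset, closed again
```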

AnyOf

Fires on any sub-watch, every time.

from autogen.beta import AnyOf, EventWatch
from autogen.beta.events.alert import ObserverAlert

watch = AnyOf(
    EventWatch(ObserverAlert.severity == "critical"),
    EventWatch(ObserverAlert.severity == "fatal"),
)

Sequence

Fires when sub-watches trigger in order. Each sub-watch is armed only after the previous has fired.

from autogen.beta import Sequence, EventWatch
from autogen.beta.events import ModelRequest, ModelResponse

watch = Sequence(
    EventWatch(ModelRequest),
    EventWatch(ModelResponse),
)

async def round_trip(events, ctx):
    # events contains one ModelRequest followed by one ModelResponse
    print("round-trip complete")

After the last sub-watch fires, the sequence resets.
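
Because only the current step is armed, an event that matches a later step is ignored until its turn comes. The following sketch models that with plain predicates. SequenceTracker and feed are hypothetical names, and the real Sequence composes Watch objects rather than predicate functions.

```python
from typing import Any, Callable, Optional


class SequenceTracker:
    """Toy ordered matcher: step k is only checked after step k-1 matched."""

    def __init__(self, *steps: Callable[[Any], bool]) -> None:
        self._steps = steps
        self._index = 0
        self._matched: list = []

    def feed(self, event: Any) -> Optional[list]:
        """Offer one event; return the matched events when the sequence completes."""
        if not self._steps[self._index](event):
            return None  # later steps are not armed yet, so this is ignored
        self._matched.append(event)
        self._index += 1
        if self._index < len(self._steps):
            return None
        # Last step matched: hand back the events and start over.
        fired, self._matched, self._index = self._matched, [], 0
        return fired


tracker = SequenceTracker(
    lambda e: e == "request",
    lambda e: e == "response",
)
early = tracker.feed("response")     # out of order, ignored
started = tracker.feed("request")    # step 1 matched
done = tracker.feed("response")      # step 2 matched, sequence fires
again = tracker.feed("response")     # sequence has reset, ignored
```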

Emitting events from a callback

A Watch callback can send events back onto the stream — this is how trigger-driven Observers emit alerts:

from autogen.beta import DelayWatch, MemoryStream
from autogen.beta.events.alert import ObserverAlert, Severity

stream = MemoryStream()
watch = DelayWatch(30.0)

async def timeout_alert(events, ctx):
    # Publish an alert back onto the stream the watch is armed on
    await ctx.send(ObserverAlert(
        source="timeout-watch",
        severity=Severity.WARNING,
        message="Agent has been running for >30s.",
    ))

watch.arm(stream, timeout_alert)

Tip

Emitting events from within a callback means other subscribers and watches can react. This is the foundation for composing reactive workflows.
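
The chaining described in the tip can be seen in a self-contained toy. Everything here is a hypothetical stand-in: ToyStream, Tick, and Alert are not library types, and the stream itself is passed as ctx so that ctx.send reads the same way as in the example above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

Handler = Callable[[list, "ToyStream"], Awaitable[None]]


@dataclass
class Tick:
    n: int


@dataclass
class Alert:
    message: str


class ToyStream:
    """Hypothetical in-memory stream with type-filtered subscribers."""

    def __init__(self) -> None:
        self._subscribers: list = []  # (event_type, handler) pairs

    def subscribe(self, handler: Handler, event_type: type) -> None:
        self._subscribers.append((event_type, handler))

    async def send(self, event: Any) -> None:
        for event_type, handler in list(self._subscribers):
            if isinstance(event, event_type):
                await handler([event], self)  # the stream doubles as ctx


async def demo() -> list:
    seen: list = []
    stream = ToyStream()

    async def on_tick(events: list, ctx: ToyStream) -> None:
        # React to one event by emitting another onto the same stream.
        await ctx.send(Alert(message=f"tick {events[0].n} observed"))

    async def on_alert(events: list, ctx: ToyStream) -> None:
        seen.append(events[0].message)

    stream.subscribe(on_tick, Tick)
    stream.subscribe(on_alert, Alert)
    await stream.send(Tick(n=1))
    return seen
```

The second subscriber never sees the Tick directly. It only reacts to the Alert that the first callback emitted.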

Arming and lifecycle

A Watch is a stateful object. Calling arm() on an already-armed Watch will first disarm it, so re-arming is safe. disarm() cleans up any subscriptions and cancels any timers.
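
One way to get "re-arming is safe" is for arm() to call disarm() before subscribing. The sketch below shows that pattern on toy classes. ToyStream, ToyWatch, and the unsubscribe-handle design are assumptions made for illustration, not a description of how the library tracks subscriptions.

```python
from typing import Callable, Optional


class ToyStream:
    """Hypothetical stream whose subscribe() returns an unsubscribe handle."""

    def __init__(self) -> None:
        self.subscribers: list = []

    def subscribe(self, callback: Callable) -> Callable[[], None]:
        self.subscribers.append(callback)

        def unsubscribe() -> None:
            if callback in self.subscribers:
                self.subscribers.remove(callback)

        return unsubscribe


class ToyWatch:
    """Hypothetical watch showing idempotent arm/disarm."""

    def __init__(self) -> None:
        self._unsubscribe: Optional[Callable[[], None]] = None

    @property
    def is_armed(self) -> bool:
        return self._unsubscribe is not None

    def arm(self, stream: ToyStream, callback: Callable) -> None:
        if self.is_armed:
            self.disarm()  # drop the old subscription before adding a new one
        self._unsubscribe = stream.subscribe(callback)

    def disarm(self) -> None:
        if self._unsubscribe is not None:
            self._unsubscribe()
            self._unsubscribe = None


def on_event(events: list, ctx: object) -> None:
    pass


stream = ToyStream()
watch = ToyWatch()
watch.arm(stream, on_event)
watch.arm(stream, on_event)  # re-arm: still exactly one subscription
armed_count = len(stream.subscribers)
watch.disarm()
watch.disarm()               # disarming twice is harmless
```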

Most of the time you won't call arm() / disarm() directly — you'll hand the Watch to a BaseObserver, which manages the lifecycle against the agent's stream.

Next steps

  • Use Watches inside a BaseObserver for agent-scoped monitoring.
  • See Stream for event publishing and subscription mechanics.