Watches
A Watch is a reactive trigger primitive. You arm it on a Stream, and it fires a callback when its condition is met — where "condition" can be an event match, a count, a time window, a schedule, or a composition of other watches.
Watches are the mechanism behind trigger-driven Observers, but they can be used standalone for custom reactive logic on any Stream.
When to use a Watch
Use a Watch when a simple stream.subscribe(...) is not enough — i.e. when you need buffering, timing, ordering, or composition:
| You need | Use |
|---|---|
| Run a callback on every matching event | stream.subscribe(fn, condition=...) (no Watch needed) |
| Fire every N events | CadenceWatch(n=N, condition=...) |
| Collect events over a time window | CadenceWatch(max_wait=seconds, condition=...) |
| Fire on "N events OR T seconds, whichever first" | CadenceWatch(n=N, max_wait=seconds, condition=...) |
| Fire once after a delay | DelayWatch(seconds) |
| Fire on a schedule | IntervalWatch(seconds) or CronWatch(expr) |
| Wait for two separate events in any order | AllOf(w1, w2) |
| Wait for an ordered sequence | Sequence(w1, w2, ...) |
| Fire on the earliest of several triggers | AnyOf(w1, w2, ...) |
Anatomy of a Watch
Every Watch implements the same Watch protocol, importable from autogen.beta.
The callback signature is uniform across all Watch kinds.
For event-driven watches (EventWatch, CadenceWatch, Sequence), events contains the matched events. For time-driven watches (DelayWatch, IntervalWatch, CronWatch), events is empty — the trigger is the timer itself.
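As a rough illustration of the shape described above, the sketch below defines a stand-in protocol and a callback. It is not the definition shipped in autogen.beta: the names `WatchLike`, `WatchCallback`, and `on_fire` are hypothetical, and the sketch assumes a synchronous callback that receives only the list of events, whereas the real signature may be async or take further arguments.

```python
from typing import Any, Callable, Protocol, Sequence, runtime_checkable

# Hypothetical aliases for illustration; the real types live in autogen.beta.
Event = Any
WatchCallback = Callable[[Sequence[Event]], None]


@runtime_checkable
class WatchLike(Protocol):
    """Stand-in for the Watch protocol: arm against a stream, later disarm."""

    def arm(self, stream: Any, callback: WatchCallback) -> None: ...

    def disarm(self) -> None: ...


def on_fire(events: Sequence[Event]) -> None:
    # Event-driven watches pass the matched events; time-driven watches pass none.
    if events:
        print(f"fired with {len(events)} event(s)")
    else:
        print("fired by a timer")
```

Because the callback shape is the same for every kind of Watch, one handler such as `on_fire` can serve both event-driven and time-driven triggers by checking whether `events` is empty.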
Event-driven Watches
EventWatch
Fires immediately on each matching event.
EventWatch also supports field conditions.
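The behaviour of an immediate, condition-filtered trigger can be sketched without the library. Everything below is a hypothetical stand-in (`MiniStream`, `MiniEventWatch`, `field_equals`), with events modelled as plain dicts and a field condition modelled as a predicate over dict keys; the actual EventWatch constructor and condition syntax may differ.

```python
from typing import Any, Callable, Optional


class MiniStream:
    """Hypothetical in-memory stream that delivers each event to all subscribers."""

    def __init__(self) -> None:
        self._subscribers: list = []

    def subscribe(self, fn: Callable[[Any], None]) -> Callable[[Any], None]:
        self._subscribers.append(fn)
        return fn

    def unsubscribe(self, fn: Callable[[Any], None]) -> None:
        self._subscribers.remove(fn)

    def send(self, event: Any) -> None:
        # Iterate over a copy so handlers may subscribe or unsubscribe safely.
        for fn in list(self._subscribers):
            fn(event)


def field_equals(**expected: Any) -> Callable[[dict], bool]:
    """Build a predicate matching dict events whose fields equal `expected`."""
    return lambda event: all(event.get(k) == v for k, v in expected.items())


class MiniEventWatch:
    """Illustrative watch: fires the callback once per matching event."""

    def __init__(self, condition: Callable[[dict], bool]) -> None:
        self._condition = condition
        self._stream: Optional[MiniStream] = None
        self._handler: Optional[Callable[[Any], None]] = None

    def arm(self, stream: MiniStream, callback: Callable[[list], None]) -> None:
        self.disarm()  # re-arming replaces any existing subscription

        def handler(event: dict) -> None:
            if self._condition(event):
                callback([event])

        self._stream = stream
        self._handler = stream.subscribe(handler)

    def disarm(self) -> None:
        if self._stream is not None and self._handler is not None:
            self._stream.unsubscribe(self._handler)
        self._stream = None
        self._handler = None


stream = MiniStream()
fired: list = []
watch = MiniEventWatch(field_equals(kind="error"))
watch.arm(stream, fired.append)
stream.send({"kind": "info"})                  # ignored: condition not met
stream.send({"kind": "error", "msg": "boom"})  # fires with one matched event
```

After the two sends, `fired` holds a single one-element list containing the error event.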
CadenceWatch
Buffers matching events and fires once the buffer holds n events, once max_wait seconds have elapsed since the first buffered event, or on whichever comes first when both are set. At least one of n and max_wait is required.
The timer starts when the first event of a cadence is buffered, so a quiet stream produces no firings. A partially filled buffer is discarded when the Watch is disarmed.
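The "N events or T seconds, whichever first" rule can be modelled with a small buffer. This is a sketch of the semantics only: `CadenceBuffer`, `offer`, `tick`, and `discard` are hypothetical names, and the clock is passed in explicitly as `now` so the example stays deterministic, whereas a real CadenceWatch would rely on its own timer.

```python
from typing import Any, Callable, List, Optional


class CadenceBuffer:
    """Illustrative buffer: flush at n events or max_wait seconds after the first."""

    def __init__(
        self,
        callback: Callable[[List[Any]], None],
        n: Optional[int] = None,
        max_wait: Optional[float] = None,
    ) -> None:
        if n is None and max_wait is None:
            raise ValueError("at least one of n and max_wait is required")
        self._callback = callback
        self._n = n
        self._max_wait = max_wait
        self._buffer: List[Any] = []
        self._deadline: Optional[float] = None

    def offer(self, event: Any, now: float) -> None:
        self.tick(now)  # flush first if the previous deadline has passed
        if not self._buffer and self._max_wait is not None:
            # The timer starts with the first buffered event of a cadence.
            self._deadline = now + self._max_wait
        self._buffer.append(event)
        if self._n is not None and len(self._buffer) >= self._n:
            self._flush()

    def tick(self, now: float) -> None:
        # A deadline exists only while events are buffered, so a quiet
        # stream never reaches this flush.
        if self._deadline is not None and now >= self._deadline:
            self._flush()

    def discard(self) -> None:
        # Mirrors disarm: a partially filled buffer is dropped, not fired.
        self._buffer, self._deadline = [], None

    def _flush(self) -> None:
        events, self._buffer, self._deadline = self._buffer, [], None
        self._callback(events)


batches: list = []
buf = CadenceBuffer(batches.append, n=3, max_wait=10.0)
for t, name in enumerate("abc"):
    buf.offer(name, now=float(t))  # third event reaches n and fires
buf.offer("d", now=5.0)            # starts a new cadence, deadline at 15.0
buf.tick(now=14.0)                 # too early, nothing fires
buf.tick(now=15.0)                 # deadline reached, fires with ["d"]
buf.tick(now=100.0)                # empty buffer, nothing fires
```

The first batch is triggered by the count and the second by the deadline, which is why `batches` ends up with two entries.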
Time-driven Watches
DelayWatch
Fires exactly once after a delay, then auto-disarms.
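One way to picture "fire once, then auto-disarm" is an asyncio task that sleeps, clears its own handle, and calls back with an empty event list. `MiniDelayWatch` is a hypothetical stand-in, and its `arm(callback)` signature omits the stream argument for brevity; it is not the library's DelayWatch.

```python
import asyncio
from typing import Callable, Optional


class MiniDelayWatch:
    """Illustrative one-shot timer built on an asyncio task."""

    def __init__(self, seconds: float) -> None:
        self._seconds = seconds
        self._task: Optional[asyncio.Task] = None

    @property
    def armed(self) -> bool:
        return self._task is not None

    def arm(self, callback: Callable[[list], None]) -> None:
        self.disarm()  # re-arming restarts the delay
        self._task = asyncio.get_running_loop().create_task(self._run(callback))

    def disarm(self) -> None:
        if self._task is not None:
            self._task.cancel()
            self._task = None

    async def _run(self, callback: Callable[[list], None]) -> None:
        await asyncio.sleep(self._seconds)
        self._task = None  # auto-disarm before firing
        callback([])       # time-driven: there are no matched events


async def demo() -> tuple:
    fired: list = []
    done = asyncio.Event()

    def on_fire(events: list) -> None:
        fired.append(events)
        done.set()

    watch = MiniDelayWatch(0.01)
    watch.arm(on_fire)
    await asyncio.wait_for(done.wait(), timeout=1.0)
    return fired, watch.armed


fired, still_armed = asyncio.run(demo())
```

The demo waits on an `asyncio.Event` rather than a fixed sleep so that it finishes as soon as the watch has fired.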
IntervalWatch
Fires periodically at a fixed interval until disarmed.
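A periodic trigger can be approximated with a loop that sleeps and calls back until its task is cancelled. The helper `run_interval` below is hypothetical, and cancelling the task stands in for `disarm()`. Note that a plain sleep loop drifts by however long the callback takes, which a production implementation might correct for.

```python
import asyncio
from typing import Callable


async def run_interval(seconds: float, callback: Callable[[list], None]) -> None:
    """Call callback([]) every `seconds` until the surrounding task is cancelled."""
    while True:
        await asyncio.sleep(seconds)
        callback([])  # time-driven: the events list is empty


async def demo() -> list:
    fired: list = []
    done = asyncio.Event()

    def on_fire(events: list) -> None:
        fired.append(events)
        if len(fired) == 3:
            done.set()

    task = asyncio.create_task(run_interval(0.01, on_fire))
    await asyncio.wait_for(done.wait(), timeout=1.0)
    task.cancel()  # plays the role of disarm()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return fired


ticks = asyncio.run(demo())
```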
CronWatch
Fires on a standard 5-field cron expression.
Supports *, */n, a-b, comma lists, and SUN–SAT day-of-week names.
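To make the supported syntax concrete, here is a minimal sketch of how the listed field forms could be expanded and matched against a timestamp. `expand_field` and `cron_matches` are hypothetical helpers, not CronWatch internals. The sketch assumes day-of-week 0 means Sunday and requires all five fields to match, which is simpler than the classic cron rule that ORs day-of-month and day-of-week when both are restricted.

```python
from datetime import datetime
from typing import Dict, Optional, Set

DAY_NAMES = {"SUN": 0, "MON": 1, "TUE": 2, "WED": 3, "THU": 4, "FRI": 5, "SAT": 6}
# (min, max) for minute, hour, day-of-month, month, day-of-week
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def _value(token: str, names: Dict[str, int]) -> int:
    token = token.upper()
    return names[token] if token in names else int(token)


def expand_field(
    field: str, lo: int, hi: int, names: Optional[Dict[str, int]] = None
) -> Set[int]:
    """Expand one cron field (*, */n, a-b, comma lists) into the set it allows."""
    names = names or {}
    values: Set[int] = set()
    for part in field.split(","):
        if part == "*":
            values.update(range(lo, hi + 1))
        elif part.startswith("*/"):
            values.update(range(lo, hi + 1, int(part[2:])))
        elif "-" in part:
            start, end = part.split("-", 1)
            values.update(range(_value(start, names), _value(end, names) + 1))
        else:
            values.add(_value(part, names))
    out_of_range = sorted(v for v in values if not lo <= v <= hi)
    if out_of_range:
        raise ValueError(f"values {out_of_range} outside {lo}-{hi}")
    return values


def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if `when` satisfies every field of a 5-field expression."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields")
    # datetime.weekday() uses Monday=0; shift so that Sunday=0 as in cron.
    actual = [when.minute, when.hour, when.day, when.month, (when.weekday() + 1) % 7]
    for index, (field, (lo, hi)) in enumerate(zip(fields, FIELD_RANGES)):
        names = DAY_NAMES if index == 4 else None
        if actual[index] not in expand_field(field, lo, hi, names):
            return False
    return True


# 2024-01-01 was a Monday, 2024-01-06 a Saturday.
weekday_hit = cron_matches("*/15 9 * * MON-FRI", datetime(2024, 1, 1, 9, 30))
weekend_hit = cron_matches("*/15 9 * * MON-FRI", datetime(2024, 1, 6, 9, 30))
```

Here `weekday_hit` is true and `weekend_hit` is false, since the expression restricts firing to Monday through Friday.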
Composite Watches
AllOf
Fires once when every sub-watch has fired at least once.
After firing, the gate resets: every sub-watch must fire again before the next firing.
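The gate-and-reset behaviour can be sketched as a small state machine that records which sub-watches have fired. `AllOfGate` and `sub_fired` are hypothetical names. The sketch also assumes that the composite passes along every event collected from its sub-watches since the last firing; the source does not say what AllOf hands to its callback.

```python
from typing import Any, Callable, List, Set


class AllOfGate:
    """Illustrative gate: fires once every sub-watch index has reported."""

    def __init__(self, count: int, callback: Callable[[List[Any]], None]) -> None:
        self._count = count
        self._callback = callback
        self._fired: Set[int] = set()
        self._events: List[Any] = []

    def sub_fired(self, index: int, events: List[Any]) -> None:
        # Repeated firings of the same sub-watch count once towards the gate.
        self._fired.add(index)
        self._events.extend(events)
        if len(self._fired) == self._count:
            collected, self._events = self._events, []
            self._fired.clear()  # reset: all sub-watches must fire again
            self._callback(collected)


fires: list = []
gate = AllOfGate(2, fires.append)
gate.sub_fired(0, ["a1"])
gate.sub_fired(0, ["a2"])  # same sub-watch again, gate stays closed
gate.sub_fired(1, ["b1"])  # both have fired, gate opens and resets
gate.sub_fired(1, ["b2"])  # after the reset, one firing is not enough
```

Order does not matter here, which is the difference from Sequence: sub-watch 1 could have reported before sub-watch 0 with the same result.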
AnyOf
Fires every time any one of its sub-watches fires.
Sequence
Fires when sub-watches trigger in order. Each sub-watch is armed only after the previous has fired.
After the last sub-watch fires, the sequence resets.
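The ordering rule, where only the current step is listening, can be illustrated with a tracker that holds a position into a list of conditions. `SequenceTracker` and `offer` are hypothetical names, and each sub-watch is simplified to a predicate over a single event, so this sketch covers only sequences of EventWatch-like steps.

```python
from typing import Any, Callable, List


class SequenceTracker:
    """Illustrative ordered matcher: only the current step sees events."""

    def __init__(
        self,
        conditions: List[Callable[[Any], bool]],
        callback: Callable[[List[Any]], None],
    ) -> None:
        self._conditions = list(conditions)
        self._callback = callback
        self._position = 0
        self._matched: List[Any] = []

    def offer(self, event: Any) -> None:
        # Later steps are not "armed" yet, so only the current one is checked.
        if not self._conditions[self._position](event):
            return
        self._matched.append(event)
        self._position += 1
        if self._position == len(self._conditions):
            matched, self._matched = self._matched, []
            self._position = 0  # the sequence resets after the last step
            self._callback(matched)


completed: list = []
tracker = SequenceTracker(
    [lambda e: e == "login", lambda e: e == "purchase"],
    completed.append,
)
tracker.offer("purchase")  # ignored: step one (login) has not fired yet
tracker.offer("login")     # step one fires, step two becomes active
tracker.offer("login")     # ignored: step two is waiting for purchase
tracker.offer("purchase")  # step two fires, sequence completes
```

Only the login followed by the purchase counts; the early purchase is dropped because its step was not yet active.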
Emitting events from a callback
A Watch callback can send events back onto the stream; this is how trigger-driven Observers emit alerts.
Tip
Emitting events from within a callback means other subscribers and watches can react. This is the foundation for composing reactive workflows.
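The chaining described here can be shown with a toy stream in which one handler turns errors into alerts and a second handler reacts to those alerts. `MiniStream` and both handlers are hypothetical, and plain subscribers stand in for armed watches; the real Stream API for sending events from a callback may look different.

```python
from typing import Any, Callable, List


class MiniStream:
    """Hypothetical in-memory stream with synchronous delivery."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[dict], None]] = []

    def subscribe(self, fn: Callable[[dict], None]) -> None:
        self._subscribers.append(fn)

    def send(self, event: dict) -> None:
        for fn in list(self._subscribers):
            fn(event)


stream = MiniStream()
alerts: List[dict] = []


def alert_on_error(event: dict) -> None:
    # Emits a new event from inside the callback. The emitted event must not
    # match this handler's own condition, or it would trigger itself forever.
    if event.get("kind") == "error":
        stream.send({"kind": "alert", "source": event["msg"]})


def collect_alerts(event: dict) -> None:
    # A downstream reaction to the emitted alert.
    if event.get("kind") == "alert":
        alerts.append(event)


stream.subscribe(alert_on_error)
stream.subscribe(collect_alerts)
stream.send({"kind": "error", "msg": "disk full"})
```

After the single `send`, `alerts` contains one alert whose `source` is `"disk full"`, produced by the first handler and consumed by the second.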
Arming and lifecycle
A Watch is a stateful object. Calling arm() on an already-armed Watch will first disarm it, so re-arming is safe. disarm() cleans up any subscriptions and cancels any timers.
Most of the time you won't call arm() / disarm() directly — you'll hand the Watch to a BaseObserver, which manages the lifecycle against the agent's stream.
Next steps
- Use Watches inside a BaseObserver for agent-scoped monitoring.
- See Stream for event publishing and subscription mechanics.