Skip to content

TokenMonitor

autogen.beta.observer.token_monitor.TokenMonitor #

TokenMonitor(warn_threshold=50000, alert_threshold=100000, *, name='token-monitor')

Bases: BaseObserver

Tracks cumulative token usage and alerts when thresholds are exceeded.

Observes ModelResponse and TaskCompleted events to aggregate usage across the actor and all task sub-agents.

Parameters#

- warn_threshold: Total tokens at which a WARNING alert is emitted.
- alert_threshold: Total tokens at which a CRITICAL alert is emitted.
- name: Observer display name.

Source code in autogen/beta/observer/token_monitor.py
def __init__(
    self,
    warn_threshold: int = 50_000,
    alert_threshold: int = 100_000,
    *,
    name: str = "token-monitor",
) -> None:
    """Set up a token monitor with its two alert thresholds.

    Args:
        warn_threshold: Total tokens at which a WARNING alert is emitted.
        alert_threshold: Total tokens at which a CRITICAL alert is emitted.
        name: Observer display name, forwarded to the base observer.
    """
    # Only model responses and completed tasks are watched.
    usage_watch = EventWatch(ModelResponse | TaskCompleted)
    super().__init__(name, watch=usage_watch)

    # Running total plus one-shot flags so each alert level fires once.
    self._total_tokens: int = 0
    self._warned = self._alerted = False

    self._warn_threshold, self._alert_threshold = warn_threshold, alert_threshold

total_tokens property #

total_tokens

name instance-attribute #

name = name

process async #

process(events, ctx)
Source code in autogen/beta/observer/token_monitor.py
async def process(self, events: list[BaseEvent], ctx: Context) -> ObserverAlert | None:
    """Add token usage from ``events`` to the running total and report threshold crossings.

    Only ``ModelResponse`` and ``TaskCompleted`` events contribute; events
    without usage data (or without a ``total_tokens`` value) add nothing.

    Each alert level is emitted at most once until ``reset()`` is called.
    When the total reaches the critical threshold, the CRITICAL alert is
    returned and the WARNING alert is marked as consumed as well, so a
    lower-severity warning is never emitted after the critical one.

    Args:
        events: Batch of events to inspect.
        ctx: Context for the current run (not used by this observer).

    Returns:
        An ``ObserverAlert`` the first time a threshold is reached,
        otherwise ``None``.
    """
    for event in events:
        if not isinstance(event, (ModelResponse, TaskCompleted)):
            continue
        usage = event.usage
        if usage:
            self._total_tokens += int(usage.total_tokens or 0)

    if not self._alerted and self._total_tokens >= self._alert_threshold:
        self._alerted = True
        # CRITICAL supersedes WARNING: without this, a batch that jumps past
        # both thresholds at once would trigger a stale WARNING on the next
        # call, after the CRITICAL alert had already been delivered.
        self._warned = True
        return ObserverAlert(
            source=self.name,
            severity=Severity.CRITICAL,
            message=(
                f"Token usage critical: {self._total_tokens:,} tokens "
                f"(threshold: {self._alert_threshold:,}). "
                "Consider wrapping up to control costs."
            ),
        )

    if not self._warned and self._total_tokens >= self._warn_threshold:
        self._warned = True
        return ObserverAlert(
            source=self.name,
            severity=Severity.WARNING,
            message=(
                f"Token usage warning: {self._total_tokens:,} tokens "
                f"(threshold: {self._warn_threshold:,}). "
                "Be mindful of remaining budget."
            ),
        )

    return None

reset #

reset()

Reset counters for a fresh session.

Source code in autogen/beta/observer/token_monitor.py
def reset(self) -> None:
    """Return the monitor to its initial state for a fresh session.

    Zeroes the accumulated token total and clears both one-shot alert
    flags. The configured thresholds are left untouched.
    """
    self._total_tokens = 0
    self._warned = self._alerted = False

register #

register(stack, context)
Source code in autogen/beta/observer/observer.py
def register(self, stack: ExitStack, context: Context) -> None:
    """Arm this observer's watch on ``context`` and schedule teardown on ``stack``.

    Args:
        stack: Exit stack that receives ``self._disarm`` as a callback.
        context: Context whose ``stream`` the watch is armed on; it is also
            stored on the instance as ``self._ctx``.
    """
    # If the watch is still armed from an earlier registration, disarm it
    # before arming again below.
    if self._watch.is_armed:
        self._watch.disarm()
    self._ctx = context
    # self._on_watch is handed to the watch as its callback.
    self._watch.arm(context.stream, self._on_watch)
    # NOTE(review): _disarm is not shown here; presumably it disarms the
    # watch when the stack unwinds — confirm.
    stack.callback(self._disarm)