Creating Custom Events for Run Iteration#
AG2’s `run_iter()` lets you yield specific event types using `yield_on`. While AG2 provides many built-in events (`TextEvent`, `ToolCallEvent`, etc.), you can also create custom events for your specific use cases.
Custom events are useful for:

- **Workflow checkpoints** - Pause at specific stages for validation
- **Progress tracking** - Report status from within tools
- **Custom gating** - Implement domain-specific pause conditions
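Conceptually, `yield_on` acts as an `isinstance` filter over the event stream: only events matching the requested types surface in your loop. The following self-contained sketch (plain Python stand-ins, not AG2's actual implementation) illustrates the idea:

```python
# Illustrative sketch of the yield_on idea (plain Python, not AG2's
# actual implementation): only events matching the requested types
# are surfaced to the caller's loop.
class BaseEvent: ...

class TextEvent(BaseEvent):
    def __init__(self, text):
        self.text = text

class CheckpointEvent(BaseEvent):
    def __init__(self, name):
        self.name = name

def run_iter(events, yield_on):
    for event in events:
        if isinstance(event, tuple(yield_on)):
            yield event

stream = [TextEvent("hi"), CheckpointEvent("validate"), TextEvent("done")]
checkpoints = [e.name for e in run_iter(stream, yield_on=[CheckpointEvent])]
print(checkpoints)  # ['validate']
```

Because the filter is type-based, subscribing to a custom event is just a matter of adding its class to the `yield_on` list.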
Creating a Custom Event#
Custom events require three things:

- Inherit from `BaseEvent` - the base class for all AG2 events
- Use the `@wrap_event` decorator - wraps the event for serialization
- Class name must end with `Event` - enforced by the decorator
Let’s create a custom event for tracking data pipeline stages:
```python
from collections.abc import Callable
from typing import Any

from autogen.events.base_event import BaseEvent, resolve_print_callable, wrap_event


@wrap_event
class PipelineStageEvent(BaseEvent):
    """Custom event emitted when a data pipeline stage completes."""

    stage_name: str
    records_processed: int
    validation_passed: bool

    def print(self, f: Callable[..., Any] | None = None) -> None:
        """Optional: Define how the event prints to console."""
        f = resolve_print_callable(f)
        status = "PASSED" if self.validation_passed else "FAILED"
        f(f"[Pipeline] {self.stage_name}: {self.records_processed} records - {status}", flush=True)
```
Emitting Custom Events#
To emit a custom event from within a tool or custom code, use `IOStream.get_default().send()`:
```python
from autogen.io.base import IOStream


def emit_pipeline_event(stage: str, records: int, passed: bool) -> None:
    """Helper to emit a pipeline stage event."""
    IOStream.get_default().send(
        PipelineStageEvent(
            stage_name=stage,
            records_processed=records,
            validation_passed=passed,
        )
    )
```
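Because events flow through a swappable, process-wide default stream, helpers like this are easy to exercise without running an agent. The sketch below uses a simplified stand-in (not AG2's actual `IOStream`) to show the capture pattern:

```python
# Simplified stand-in for the default-stream pattern (not AG2's actual
# IOStream): events go to a process-wide default sink, so a test can
# install a capturing stream and inspect what was emitted.
class CapturingStream:
    _default = None

    def __init__(self):
        self.sent = []

    @classmethod
    def get_default(cls):
        if cls._default is None:
            cls._default = cls()
        return cls._default

    def send(self, event):
        self.sent.append(event)


# A helper written against get_default() needs no changes to be captured:
def emit(stage, records, passed):
    CapturingStream.get_default().send(
        {"stage_name": stage, "records_processed": records, "validation_passed": passed}
    )

emit("extract", 1000, True)
print(CapturingStream.get_default().sent[0]["stage_name"])  # extract
```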
Example: Data Pipeline with Validation Gates#
Let’s build a realistic example: an AI agent that processes data through multiple pipeline stages. We’ll use custom events to yield at each stage for human validation.
First, let’s set up our LLM configuration:
```python
import os

from dotenv import load_dotenv

from autogen import ConversableAgent

load_dotenv("../.env.local")

llm_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.environ.get("OPENAI_API_KEY"),
        }
    ]
}
```
Now let’s create a tool that simulates a data pipeline. The tool emits our custom `PipelineStageEvent` at each stage:
```python
import random

from autogen.tools import tool


@tool(description="Process data through a multi-stage pipeline: extract -> transform -> validate -> load")
def run_data_pipeline(source_name: str, record_count: int) -> str:
    """Simulates a data pipeline with multiple stages."""
    results = []

    # Stage 1: Extract
    extracted = record_count
    emit_pipeline_event("extract", extracted, True)
    results.append(f"Extracted {extracted} records from {source_name}")

    # Stage 2: Transform
    transformed = int(extracted * 0.95)  # Some records filtered
    emit_pipeline_event("transform", transformed, True)
    results.append(f"Transformed {transformed} records (filtered invalid)")

    # Stage 3: Validate
    validation_passed = random.random() > 0.3  # 70% chance of passing
    emit_pipeline_event("validate", transformed, validation_passed)
    if not validation_passed:
        results.append("VALIDATION FAILED - data quality issues detected")
        return "\n".join(results)
    results.append(f"Validated {transformed} records - all checks passed")

    # Stage 4: Load
    emit_pipeline_event("load", transformed, True)
    results.append(f"Loaded {transformed} records to destination")

    return "\n".join(results)
```
Now let’s create an agent with this tool and run it with `run_iter()`, yielding on our custom event:
```python
from autogen.events.agent_events import InputRequestEvent, TerminationEvent, TextEvent, ToolCallEvent

# Create the data engineer agent
data_engineer = ConversableAgent(
    "DataEngineer",
    system_message="""You are a data engineer. When asked to process data, use the run_data_pipeline tool.
After processing, report the results. Say DONE when finished.""",
    # Guard against a None content field before checking for the keyword
    is_termination_msg=lambda x: "DONE" in (x.get("content") or ""),
    llm_config=llm_config,
    functions=[run_data_pipeline],
)

# Run iteration, yielding on our custom PipelineStageEvent
for event in data_engineer.run_iter(
    message="Process 1000 records from the 'sales_data' source",
    max_turns=3,
    yield_on=[PipelineStageEvent, TextEvent, ToolCallEvent, TerminationEvent],
):
    # Handle input requests
    if isinstance(event, InputRequestEvent):
        user_input = input(" Input requested: ")
        event.content.respond(user_input)
        continue

    # Handle our custom pipeline event
    if isinstance(event, PipelineStageEvent):
        stage = event.content.stage_name
        records = event.content.records_processed
        passed = event.content.validation_passed
        status = "PASSED" if passed else "FAILED"
        print(f"\n[PIPELINE STAGE] {stage.upper()}")
        print(f"  Records: {records}")
        print(f"  Status: {status}")

        # You could add human approval here:
        # if stage == "validate" and not passed:
        #     approval = input("  Continue despite validation failure? (y/n): ")
        #     if approval.lower() != 'y':
        #         break  # Abort the pipeline
        continue

    # Handle other events
    if isinstance(event, ToolCallEvent):
        for tool_call in event.content.tool_calls:
            print(f"\n[TOOL CALL] {tool_call.function.name}")
    elif isinstance(event, TextEvent):
        content = str(event.content.content)[:150]
        print(f"\n[TEXT] {content}")

print("\n--- Pipeline run completed! ---")
```
print("\n--- Pipeline run completed! ---")
How the Event Wrapper Works#
The `@wrap_event` decorator transforms your event class:

- Adds a `type` field - converted from the class name (e.g., `PipelineStageEvent` → `"pipeline_stage"`)
- Wraps fields in a content structure - your fields are accessed via `event.content.<field>`
- Enables serialization - events can be serialized for logging or transmission
Example structure after wrapping:
```python
# Before wrapping (your class)
class PipelineStageEvent(BaseEvent):
    stage_name: str
    records_processed: int
    validation_passed: bool

# After wrapping (what you receive)
event.type  # "pipeline_stage"
event.content.stage_name
event.content.records_processed
event.content.validation_passed
```
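The `type` string is derived from the class name by dropping the required `Event` suffix and snake_casing what remains. Here is a self-contained sketch of that conversion (illustrative only, not AG2's actual code):

```python
import re

def event_type_from_class_name(name: str) -> str:
    # Illustrative sketch (not AG2's actual implementation): strip the
    # required "Event" suffix, then convert CamelCase to snake_case by
    # inserting an underscore before each interior capital letter.
    base = name.removesuffix("Event")
    return re.sub(r"(?<!^)(?=[A-Z])", "_", base).lower()

print(event_type_from_class_name("PipelineStageEvent"))  # pipeline_stage
print(event_type_from_class_name("ToolCallEvent"))       # tool_call
```

This is why the decorator enforces the `Event` suffix: it guarantees every class name maps cleanly to a distinct type string.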
Summary#
To create custom events for run iteration:
1. Define the event class:
   - Inherit from `BaseEvent`
   - Decorate with `@wrap_event`
   - Name must end with `Event`
   - Define fields using Pydantic-style type hints
2. Emit the event:
   - Use `IOStream.get_default().send(YourEvent(...))`
   - Emit from tools, hooks, or custom agent code
3. Yield on the event:
   - Use `run_iter()` with `yield_on=[YourEvent, ...]`
   - Check for it with `isinstance(event, YourEvent)`
   - Access fields via `event.content.<field>`
This pattern enables powerful workflows like:
- Validation gates in data pipelines
- Human approval at critical checkpoints
- Progress monitoring for long-running tasks
- Custom logging and analytics