Skip to content

Creating Custom Events for Run Iteration#

Open In Colab Open on GitHub

AG2’s run_iter() lets you yield specific event types using yield_on. While AG2 provides many built-in events (TextEvent, ToolCallEvent, etc.), you can also create custom events for your specific use cases.

Custom events are useful for: - Workflow checkpoints - Pause at specific stages for validation - Progress tracking - Report status from within tools - Custom gating - Implement domain-specific pause conditions

Requirements

Install AG2:

pip install ag2[openai]

For more information, please refer to the installation guide.

Creating a Custom Event#

Custom events require three things:

  1. Inherit from BaseEvent - The base class for all AG2 events
  2. Use the @wrap_event decorator - Wraps the event for serialization
  3. Class name must end with Event - Enforced by the decorator

Let’s create a custom event for tracking data pipeline stages:

from collections.abc import Callable
from typing import Any

from autogen.events.base_event import BaseEvent, resolve_print_callable, wrap_event

@wrap_event
class PipelineStageEvent(BaseEvent):
    """Custom event emitted when a data pipeline stage completes."""

    stage_name: str
    records_processed: int
    validation_passed: bool

    def print(self, f: Callable[..., Any] | None = None) -> None:
        """Optional: Define how the event prints to console."""
        f = resolve_print_callable(f)
        status = "PASSED" if self.validation_passed else "FAILED"
        f(f"[Pipeline] {self.stage_name}: {self.records_processed} records - {status}", flush=True)

Emitting Custom Events#

To emit a custom event from within a tool or custom code, use IOStream.get_default().send():

from autogen.io.base import IOStream

def emit_pipeline_event(stage: str, records: int, passed: bool) -> None:
    """Helper to emit a pipeline stage event."""
    IOStream.get_default().send(
        PipelineStageEvent(
            stage_name=stage,
            records_processed=records,
            validation_passed=passed,
        )
    )

Example: Data Pipeline with Validation Gates#

Let’s build a realistic example: an AI agent that processes data through multiple pipeline stages. We’ll use custom events to yield at each stage for human validation.

First, let’s set up our LLM configuration:

import os

from dotenv import load_dotenv

from autogen import ConversableAgent

load_dotenv("../.env.local")

llm_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.environ.get("OPENAI_API_KEY"),
        }
    ]
}

Now let’s create a tool that simulates a data pipeline. The tool emits our custom PipelineStageEvent at each stage:

import random

from autogen.tools import tool

@tool(description="Process data through a multi-stage pipeline: extract -> transform -> validate -> load")
def run_data_pipeline(source_name: str, record_count: int) -> str:
    """Simulates a data pipeline with multiple stages."""
    results = []

    # Stage 1: Extract
    extracted = record_count
    emit_pipeline_event("extract", extracted, True)
    results.append(f"Extracted {extracted} records from {source_name}")

    # Stage 2: Transform
    transformed = int(extracted * 0.95)  # Some records filtered
    emit_pipeline_event("transform", transformed, True)
    results.append(f"Transformed {transformed} records (filtered invalid)")

    # Stage 3: Validate
    validation_passed = random.random() > 0.3  # 70% chance of passing
    emit_pipeline_event("validate", transformed, validation_passed)
    if not validation_passed:
        results.append("VALIDATION FAILED - data quality issues detected")
        return "\n".join(results)
    results.append(f"Validated {transformed} records - all checks passed")

    # Stage 4: Load
    emit_pipeline_event("load", transformed, True)
    results.append(f"Loaded {transformed} records to destination")

    return "\n".join(results)

Now let’s create an agent with this tool and run it with run_iter(), yielding on our custom event:

from autogen.events.agent_events import InputRequestEvent, TerminationEvent, TextEvent, ToolCallEvent

# Create the data engineer agent
data_engineer = ConversableAgent(
    "DataEngineer",
    system_message="""You are a data engineer. When asked to process data, use the run_data_pipeline tool.
After processing, report the results. Say DONE when finished.""",
    is_termination_msg=lambda x: "DONE" in x.get("content", ""),
    llm_config=llm_config,
    functions=[run_data_pipeline],
)

# Run iteration, yielding on our custom PipelineStageEvent
for event in data_engineer.run_iter(
    message="Process 1000 records from the 'sales_data' source",
    max_turns=3,
    yield_on=[PipelineStageEvent, TextEvent, ToolCallEvent, TerminationEvent],
):
    # Handle input requests
    if isinstance(event, InputRequestEvent):
        user_input = input("  Input requested: ")
        event.content.respond(user_input)
        continue

    # Handle our custom pipeline event
    if isinstance(event, PipelineStageEvent):
        stage = event.content.stage_name
        records = event.content.records_processed
        passed = event.content.validation_passed
        status = "PASSED" if passed else "FAILED"
        print(f"\n[PIPELINE STAGE] {stage.upper()}")
        print(f"  Records: {records}")
        print(f"  Status: {status}")

        # You could add human approval here:
        # if stage == "validate" and not passed:
        #     approval = input("  Continue despite validation failure? (y/n): ")
        #     if approval.lower() != 'y':
        #         break  # Abort the pipeline
        continue

    # Handle other events
    if isinstance(event, ToolCallEvent):
        for tool_call in event.content.tool_calls:
            print(f"\n[TOOL CALL] {tool_call.function.name}")
    elif isinstance(event, TextEvent):
        content = str(event.content.content)[:150]
        print(f"\n[TEXT] {content}")

print("\n--- Pipeline run completed! ---")

How the Event Wrapper Works#

The @wrap_event decorator transforms your event class:

  1. Adds a type field - Converted from class name (e.g., PipelineStageEvent"pipeline_stage")
  2. Wraps in a content structure - Your fields are accessed via event.content.<field>
  3. Enables serialization - Events can be serialized for logging or transmission

Example structure after wrapping:

# Before wrapping (your class)
class PipelineStageEvent(BaseEvent):
    stage_name: str
    records_processed: int
    validation_passed: bool

# After wrapping (what you receive)
event.type           # "pipeline_stage"
event.content.stage_name
event.content.records_processed
event.content.validation_passed

Summary#

To create custom events for run iteration:

  1. Define the event class:
    • Inherit from BaseEvent
    • Decorate with @wrap_event
    • Name must end with Event
    • Define fields using Pydantic-style type hints
  2. Emit the event:
    • Use IOStream.get_default().send(YourEvent(...))
    • Emit from tools, hooks, or custom agent code
  3. Yield on the event:
    • Use run_iter() with yield_on=[YourEvent, ...]
    • Check for it with isinstance(event, YourEvent)
    • Access fields via event.content.<field>

This pattern enables powerful workflows like:

  • Validation gates in data pipelines
  • Human approval at critical checkpoints
  • Progress monitoring for long-running tasks
  • Custom logging and analytics