Agents

Agents are the central primitive in AG2. They maintain state, interact with models, execute tools, and handle user interactions through a clean, conversation-focused API.

Core Communication Primitives

The API is built around a few simple methods:

  • Agent.ask(...) initiates a new turn and blocks until it finishes, returning an AgentReply.
  • AgentReply.ask(...) continues an existing conversation, preserving its context and history.
  • Agent.run(...) / AgentReply.run(...) are the observable counterparts: they open a turn you can watch live and drive on demand, returning an AgentRun handle. See Watching a Turn Live with run below.

The final result of a turn is stored in reply.response; use reply.body for its text.

Basic Communication Example

The following snippet starts a conversation and then continues it:

from ag2 import Agent
from ag2.config import OpenAIConfig

agent = Agent(
    "assistant",
    prompt="You are a helpful assistant.",
    config=OpenAIConfig("gpt-4o-mini"),
)

# Start a new conversation
reply = await agent.ask("Give me one sentence about AG2.")
print(reply.body)

# Continue the exact same conversation context
next_turn = await reply.ask("Now make it shorter.")
print(next_turn.body)

...

Empowering Agents with Tools

Agents can use Python functions as tools. When you pass a list of @tool-decorated functions to an agent, it manages the whole execution lifecycle: receiving the model's tool-call request, running the function, and returning the result to the model.

from ag2 import Agent, tool
from ag2.config import OpenAIConfig

@tool
async def echo(text: str) -> str:
    """Useful for repeating exactly what was given."""
    return f"echo: {text}"

agent = Agent(
    "assistant",
    prompt="Use tools when helpful.",
    config=OpenAIConfig("gpt-4o-mini"),
    tools=[echo],
)

reply = await agent.ask("Call the echo tool with 'hello'.")
print(reply.body)
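To make the lifecycle concrete, here is a minimal, library-free sketch of the dispatch step that sits between "the model requests a tool" and "the result goes back to the model". It is an illustration only and not AG2's internals: the TOOLS registry, the run_tool_call helper, and the dictionary shapes are all hypothetical.

```python
import json


def echo(text: str) -> str:
    """Plain function standing in for a @tool-decorated one."""
    return f"echo: {text}"


# Hypothetical registry mapping tool names to callables.
TOOLS = {"echo": echo}


def run_tool_call(call: dict) -> dict:
    """Execute one model-requested tool call and wrap the result as a message."""
    func = TOOLS[call["name"]]            # look up the requested tool
    args = json.loads(call["arguments"])  # models usually send arguments as JSON text
    return {"role": "tool", "name": call["name"], "content": func(**args)}


# A tool call shaped the way a model might request it (assumed shape).
request = {"name": "echo", "arguments": '{"text": "hello"}'}
result = run_tool_call(request)
```

An agent repeats this step for every tool call the model emits, then sends the resulting messages back so the model can write its final answer.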

Adding Human-in-the-Loop (HITL)

Sometimes an agent needs human guidance. Pass a hitl_hook to the agent to handle HumanInputRequest events. This is especially useful inside tools, where context.input(...) can ask for confirmation before a sensitive action.

from ag2 import Agent, Context, tool
from ag2.config import OpenAIConfig
from ag2.events import HumanInputRequest, HumanMessage

@tool
async def ask_human(context: Context) -> str:
    # Pauses agent execution to await human input
    answer = await context.input("Please provide confirmation:")
    return f"Human said: {answer}"

# Define how your application handles the input request
def hitl_hook(event: HumanInputRequest) -> HumanMessage:
    # Here you could block and wait for UI/CLI input.
    # We return a static response for demonstration.
    return HumanMessage(content="confirmed")

agent = Agent(
    "assistant",
    prompt="Use ask_human when needed.",
    config=OpenAIConfig("gpt-4o-mini"),
    tools=[ask_human],
    hitl_hook=hitl_hook,
)

reply = await agent.ask("Request confirmation through the tool.")
print(reply.body)

Observing Agent Actions

Need to know exactly what the agent is doing? Pass a MemoryStream when calling ask(). You can attach event subscribers to log actions, save history to a database, or update a user interface in real time.

from ag2 import Agent, MemoryStream
from ag2.events import BaseEvent, ToolCallEvent
from ag2.config import OpenAIConfig

stream = MemoryStream()

# Listen to everything
@stream.subscribe()
async def on_any_event(event: BaseEvent) -> None:
    print(f"Event occurred: {event}")

# Only listen to specific events
@stream.where(ToolCallEvent).subscribe()
async def on_tool_call(event: ToolCallEvent) -> None:
    print("Agent requested tool:", event.name)

agent = Agent(
    "assistant",
    prompt="You are a helpful assistant.",
    config=OpenAIConfig("gpt-4o-mini"),
)

# Stream captures all events during the ask
reply = await agent.ask(
    "Give me one sentence about AG2.",
    stream=stream
)

Watching a Turn Live with run

ask() blocks until the whole turn is done. When you want to watch a turn unfold — stream tokens to a UI as they arrive, or steer the agent mid-turn — use run() instead. It returns an AgentRun async context manager.

The turn does not advance on its own. Call run.start() to drive it in the background, then read run.stream.join() — an async iterator of live events — at the same time. await run.result() returns the turn's final AgentReply.

Iterating over events while the turn runs

from ag2 import Agent
from ag2.config import OpenAIConfig

agent = Agent(
    "assistant",
    config=OpenAIConfig("gpt-4o-mini", streaming=True),
)

async with agent.run("Tell me about Paris in two sentences.") as run:
    run.start()  # drive the turn in the background

    with run.stream.join() as events:
        async for event in events:
            print(event)  # every event the turn emits

    reply = await run.result()

print("\n", reply.body)

How it behaves:

  • run.start() drives the turn in a scope-owned background task; it returns immediately (it is not awaited). Without it, run.stream.join() would block forever — nothing advances the turn.
  • run.stream.join() yields every event the turn emits.
  • Token streaming needs streaming=True on the model config (as above); the model then emits ModelMessageChunk events as it generates.
  • await run.result() returns the same AgentReply the started turn produced (idempotent, re-raising the same failure on retry). Leaving the block without awaiting it cancels a still-running turn.
  • To drive inline instead, skip start() and await run.result() directly; cancelling that await — e.g. await asyncio.wait_for(run.result(), timeout=5) — cancels the turn.
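The relationship between driving a turn and observing it can be modelled with nothing but asyncio. The sketch below is a toy stand-in, not AG2's AgentRun: the ToyRun class, its queue-based event feed, and the None sentinel are assumptions made for illustration. It shows why start() returns immediately, why events only flow once something drives the turn, and why awaiting the result twice is harmless.

```python
import asyncio


class ToyRun:
    """Toy stand-in for an observable turn (hypothetical, not AG2's AgentRun)."""

    def __init__(self, steps):
        self._steps = list(steps)
        self._events = asyncio.Queue()
        self._task = None

    async def _drive(self):
        # Emit one event per step, then a sentinel that ends iteration.
        for step in self._steps:
            await asyncio.sleep(0)  # yield control, as a real model call would
            self._events.put_nowait(step)
        self._events.put_nowait(None)
        return " ".join(self._steps)

    def start(self):
        # Schedule the driver as a background task; returns immediately.
        if self._task is None:
            self._task = asyncio.ensure_future(self._drive())

    async def events(self):
        # Yield events until the sentinel arrives.
        while True:
            event = await self._events.get()
            if event is None:
                return
            yield event

    async def result(self):
        # Drive inline if start() was never called. Awaiting a finished
        # task again returns the same value, so repeated calls agree.
        self.start()
        return await self._task


async def demo():
    run = ToyRun(["Paris", "is", "lovely"])
    run.start()                                     # drive in the background
    seen = [event async for event in run.events()]  # observe concurrently
    first = await run.result()
    second = await run.result()                     # same value, no re-run
    return seen, first, second
```

In this toy, calling events() without start() or result() would wait on an empty queue forever, which mirrors the first bullet above.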

Do not time out individual join() items

Consume run.stream.join() with a plain async for, or use join(max_events=N) when you know how many events you need. Avoid wrapping individual pulls such as await asyncio.wait_for(events.__anext__(), timeout=...): cancelling one __anext__() closes the current iterator, so later events will not be yielded by that iterator. If you need a time limit, put one asyncio.timeout(...) around the whole loop instead of around each item.
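The effect is easy to reproduce with a plain async generator, which behaves the same way when a pull is cancelled. In this sketch fake_events is a hypothetical stand-in for the live iterator, and asyncio.wait_for around a consuming coroutine plays the role of the whole-loop limit (on Python 3.11+, async with asyncio.timeout(...) around the loop has the same effect).

```python
import asyncio


async def fake_events():
    """Hypothetical stand-in for a live event iterator."""
    for i in range(3):
        await asyncio.sleep(0.05)
        yield f"event-{i}"


async def per_item_timeout():
    """Anti-pattern: time out a single pull, then try to keep iterating."""
    events = fake_events()
    try:
        await asyncio.wait_for(events.__anext__(), timeout=0.01)
    except asyncio.TimeoutError:
        pass  # the cancelled pull has already finished the generator
    # Nothing is left to iterate: the list comes back empty.
    return [event async for event in events]


async def whole_loop_timeout(limit):
    """Preferred: one time limit around the entire loop."""
    seen = []

    async def consume():
        async for event in fake_events():
            seen.append(event)

    try:
        await asyncio.wait_for(consume(), timeout=limit)
    except asyncio.TimeoutError:
        pass  # keep whatever arrived before the deadline
    return seen
```

With the per-item pattern, a single slow event silently ends the stream. With the whole-loop pattern, every event that arrives before the deadline is kept.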

AgentReply.run(...) is the same thing for a continuation: it watches a follow-up turn on an existing conversation, just as AgentReply.ask(...) continues one with a blocking call.

ask is run you don't have to drive

await agent.ask(...) is exactly run() + await result() rolled into one call. Reach for ask when you just want the answer, and run when you need to observe or steer the turn.

Feeding the turn while it runs

A running turn keeps an inbox. Call run.enqueue(...) to push a follow-up message into it — from a concurrent task or while iterating events — and the turn consumes it at its next model call, without starting a new turn. Here we wait for the first tool result, then steer the turn:

from ag2 import Agent, tool
from ag2.events import ToolResultEvent
from ag2.config import OpenAIConfig

@tool
async def search(query: str) -> str:
    """Looks up a query."""
    return f"results for {query}"

agent = Agent(
    "assistant",
    prompt="Use the search tool when helpful.",
    config=OpenAIConfig("gpt-4o-mini"),
    tools=[search],
)

async with agent.run("Search for AG2.") as run:
    run.start()

    with run.stream.where(ToolResultEvent).join(max_events=1) as results:
        async for _ in results:
            run.enqueue("Now summarize the result in one line.")  # lands at the next model call

    reply = await run.result()

print(await reply.content())

enqueue is non-blocking — it only appends to the inbox. When the message is consumed depends on timing:

  • before result() → merged into the turn's first model call;
  • while the turn is running → consumed by that same turn's next or final model call;
  • after the turn finishes → waits for the next turn on this stream.
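These timing rules follow from a simple model: enqueue only appends, and each model call drains whatever is in the inbox at that moment. The sketch below is a toy illustration of that model, not AG2 code; ToyTurn, its inbox deque, and model_call are hypothetical names.

```python
from collections import deque


class ToyTurn:
    """Toy model of a turn with an inbox (hypothetical, not an AG2 class)."""

    def __init__(self, prompt):
        self.inbox = deque()
        self.pending = [prompt]  # messages waiting for the next model call
        self.calls = []          # what each simulated model call received

    def enqueue(self, message):
        # Non-blocking: only appends to the inbox.
        self.inbox.append(message)

    def model_call(self):
        # Drain the inbox, then hand everything pending to the "model".
        while self.inbox:
            self.pending.append(self.inbox.popleft())
        self.calls.append(list(self.pending))
        self.pending.clear()


turn = ToyTurn("Search for AG2.")
turn.enqueue("Prefer official docs.")  # enqueued before the first call
turn.model_call()                      # first call sees both messages
turn.enqueue("Now summarize the result in one line.")
turn.model_call()                      # next call picks up the follow-up
turn.enqueue("One more thing.")        # nothing consumes this in the toy turn
```

After this runs, turn.calls holds two entries, one per simulated model call, and the last message is still sitting in turn.inbox.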

Building with an AI coding assistant?

See Coding with AI Assistants to set up Claude Code, Cursor, Copilot, or another assistant with AG2 skills and project rules so it writes against the current ag2 API.