Skip to content

Agent

autogen.beta.agent.Agent #

Agent(name: str, prompt: PromptType | Iterable[PromptType] = ..., *, config: ModelConfig | None = ..., hitl_hook: HumanHook | None = ..., tools: Iterable[Callable[..., Any] | Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., response_schema: type[TResult], plugins: Iterable[Plugin] = ..., knowledge: KnowledgeConfig | None = ..., tasks: TaskConfig | Literal[False] = ..., assembly: Iterable[AssemblyPolicy] = ...)
Agent(name: str, prompt: PromptType | Iterable[PromptType] = ..., *, config: ModelConfig | None = ..., hitl_hook: HumanHook | None = ..., tools: Iterable[Callable[..., Any] | Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., response_schema: ResponseProto[TResult], plugins: Iterable[Plugin] = ..., knowledge: KnowledgeConfig | None = ..., tasks: TaskConfig | Literal[False] = ..., assembly: Iterable[AssemblyPolicy] = ...)
Agent(name: str, prompt: PromptType | Iterable[PromptType] = ..., *, config: ModelConfig | None = ..., hitl_hook: HumanHook | None = ..., tools: Iterable[Callable[..., Any] | Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., response_schema: UnionType, plugins: Iterable[Plugin] = ..., knowledge: KnowledgeConfig | None = ..., tasks: TaskConfig | Literal[False] = ..., assembly: Iterable[AssemblyPolicy] = ...)
Agent(name: str, prompt: PromptType | Iterable[PromptType] = ..., *, config: ModelConfig | None = ..., hitl_hook: HumanHook | None = ..., tools: Iterable[Callable[..., Any] | Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., response_schema: None = ..., plugins: Iterable[Plugin] = ..., knowledge: KnowledgeConfig | None = ..., tasks: TaskConfig | Literal[False] = ..., assembly: Iterable[AssemblyPolicy] = ...)
Agent(name, prompt=(), *, config=None, hitl_hook=None, tools=(), middleware=(), observers=(), dependencies=None, variables=None, response_schema=None, plugins=(), knowledge=None, tasks=False, assembly=())

Bases: Generic[TResult]

The agentic unit of autogen.beta.

An Agent runs a model loop, invokes tools, honours middleware, surfaces events through observers, and optionally runs the harness primitives (assembly, compaction, aggregation, knowledge store, subtask spawning).

A bare Agent(name, config=cfg) has zero harness middleware and behaves exactly like a plain LLM loop. Harness features are opt-in:

  • assembly= — assembly policies (e.g. ConversationPolicy, SlidingWindow, AlertPolicy). When non-empty, AssemblerMiddleware and _HaltCheckMiddleware are wired in.
  • knowledge=KnowledgeConfig(store=...) — persistent knowledge store, compaction, aggregation.
  • tasks=TaskConfig(...) — opt in to the auto-injected run_subtask / run_subtasks tools, and override the LLM config / prompt / tool-inheritance rules for sub-task Agents. Defaults to False (sub-task tools disabled).
Source code in autogen/beta/agent.py
def __init__(
    self,
    name: str,
    prompt: PromptType | Iterable[PromptType] = (),
    *,
    config: ModelConfig | None = None,
    hitl_hook: HumanHook | None = None,
    tools: Iterable[Callable[..., Any] | Tool] = (),
    middleware: Iterable[MiddlewareFactory] = (),
    observers: Iterable[Observer] = (),
    dependencies: dict[Any, Any] | None = None,
    variables: dict[Any, Any] | None = None,
    response_schema: (ResponseProto[TResult] | type[TResult] | types.UnionType | None) = None,
    plugins: Iterable["Plugin"] = (),
    knowledge: KnowledgeConfig | None = None,
    tasks: TaskConfig | Literal[False] = False,
    assembly: Iterable[AssemblyPolicy] = (),
):
    """Store the agent's configuration and wire up its optional features.

    Args:
        name: Stored as ``self.name``.
        prompt: A single prompt or an iterable of prompts. Strings are
            collected as static system-prompt entries; every other entry
            is wrapped with ``_wrap_prompt_hook`` and kept as a dynamic
            prompt hook.
        config: Stored as ``self.config``; may be ``None``.
        hitl_hook: Human-in-the-loop hook; wrapped with ``wrap_hitl`` when
            provided.
        tools: Each entry is registered through ``add_tool``.
        middleware: Copied into a private list.
        observers: Copied into a private list.
        dependencies: Agent-level dependencies; an empty dict when omitted.
        variables: Agent-level variables; an empty dict when omitted.
        response_schema: Normalised via ``ResponseSchema.ensure_schema``.
        plugins: ``register(self)`` is called on each plugin, in order.
        knowledge: Source of the knowledge store, bootstrap, compaction
            and aggregation settings. ``None`` leaves the store unset.
        tasks: ``False`` (default) disables the sub-task tools; a
            ``TaskConfig`` enables them.
        assembly: Assembly policies, copied into a private list. When
            non-empty, ordering warnings are logged.
    """
    self.name = name
    self.config = config

    # ``or {}`` also replaces an explicitly passed empty dict with a fresh one.
    self._agent_dependencies = dependencies or {}
    self._agent_variables = variables or {}

    # Copy so later add_/insert_ calls never mutate the caller's iterable.
    self._middleware = list(middleware)
    self._observers = list(observers)

    self._serializer: SerializerProto = PydanticSerializer(
        pydantic_config={"arbitrary_types_allowed": True},
        use_fastdepends_errors=False,
    )

    # ``add_tool`` reads ``self.dependency_provider`` and appends to
    # ``self.tools``, so both must exist before the loop below runs.
    self.dependency_provider = Provider()
    self.tools: list[FunctionTool] = []
    for t in tools:
        self.add_tool(t)

    self._hitl_hook = wrap_hitl(hitl_hook) if hitl_hook else None
    # Double leading underscore: Python name-mangles this attribute with
    # the enclosing class name.
    self.__tool_executor = ToolExecutor(self._serializer)

    self._system_prompt: list[str] = []
    self._dynamic_prompt: list[Callable[[ModelRequest, Context], Awaitable[str]]] = []

    self._response_schema = ResponseSchema.ensure_schema(response_schema)

    # A lone string or callable is treated as a one-element prompt list.
    if isinstance(prompt, str) or callable(prompt):
        prompt = [prompt]

    # Strings are static prompt text; anything else becomes a dynamic hook.
    for p in prompt:
        if isinstance(p, str):
            self._system_prompt.append(p)
        else:
            self._dynamic_prompt.append(_wrap_prompt_hook(p))

    # NOTE(review): plugins are registered here, before the task, knowledge
    # and assembly attributes below are assigned. A plugin whose
    # ``register`` touches e.g. ``self._policies`` (via ``add_policy``)
    # would find the attribute missing -- confirm this ordering is intended.
    for p in plugins:
        p.register(self)

    # Task spawning. ``tasks=False`` (the default) means no auto-injected
    # ``run_subtask`` / ``run_subtasks`` tools. Pass ``tasks=TaskConfig(...)``
    # to opt in.
    if tasks is False:
        self._task_config: TaskConfig | None = None
    else:
        self._task_config = tasks

    self._subtask_tools: list[Tool] = _build_subtask_tools(self) if self._task_config is not None else []

    # Knowledge store + compaction/aggregation strategies
    kc = knowledge
    self._knowledge_store = kc.store if kc else None
    # These two flags default to True without a config; the expose flag has
    # no effect in that case because the store itself is None (see below).
    self._knowledge_expose_tool = kc.expose_tool if kc else True
    self._knowledge_write_event_log = kc.write_event_log if kc else True
    self._bootstrap = kc.bootstrap if kc else None
    self._bootstrap_done: bool = False
    # Starts as None; presumably created lazily by the bootstrap code path
    # -- confirm against the rest of the class.
    self._bootstrap_lock: asyncio.Lock | None = None
    self._compact_strategy = kc.compact if kc else None
    # Fall back to a default-constructed trigger when none is configured.
    self._compact_trigger = kc.compact_trigger if kc and kc.compact_trigger else CompactTrigger()
    self._aggregate_strategy = kc.aggregate if kc else None
    self._aggregate_trigger = kc.aggregate_trigger if kc and kc.aggregate_trigger else AggregateTrigger()
    # The knowledge tool is built only when a store exists AND exposure is on.
    self._knowledge_tools: list[Tool] = (
        [_make_knowledge_tool(self._knowledge_store)]
        if self._knowledge_store and self._knowledge_expose_tool
        else []
    )

    # Assembly policies (empty by default; bare Agent has no harness).
    self._policies: list[AssemblyPolicy] = list(assembly)
    if self._policies:
        # Ordering problems are reported as log warnings, not raised.
        for w in AssemblerMiddleware.validate_order(self._policies):
            logger.warning("Assembly policy ordering: %s", w)

name instance-attribute #

name = name

config instance-attribute #

config = config

dependency_provider instance-attribute #

dependency_provider = Provider()

tools instance-attribute #

tools = []

hitl_hook #

hitl_hook(func)
Source code in autogen/beta/agent.py
def hitl_hook(self, func: HumanHook) -> HumanHook:
    """Install ``func`` as the agent's human-in-the-loop hook.

    Intended for decorator use: the function is returned unchanged, while
    its ``wrap_hitl``-wrapped form is stored on the agent. Replacing an
    already configured hook emits a ``RuntimeWarning``.
    """
    previous_hook = self._hitl_hook
    if previous_hook is not None:
        # stacklevel=2 attributes the warning to the caller's line.
        warnings.warn(
            "You already set HITL hook, provided value overrides it",
            category=RuntimeWarning,
            stacklevel=2,
        )

    wrapped_hook = wrap_hitl(func)
    self._hitl_hook = wrapped_hook
    return func

prompt #

prompt(func: None = None) -> Callable[[PromptHook], PromptHook]
prompt(func: PromptHook) -> PromptHook
prompt(func=None)
Source code in autogen/beta/agent.py
def prompt(
    self,
    func: PromptHook | None = None,
) -> PromptHook | Callable[[PromptHook], PromptHook]:
    """Register a dynamic prompt hook on this agent.

    Works both as a bare decorator (``@agent.prompt``) and as a decorator
    factory (``@agent.prompt()``).

    Args:
        func: The hook to register. When omitted, a decorator is returned
            that registers whatever function it is later applied to.

    Returns:
        ``func`` itself when it was supplied, otherwise the registering
        decorator.
    """

    def wrapper(f: PromptHook) -> PromptHook:
        self._dynamic_prompt.append(_wrap_prompt_hook(f))
        return f

    # Compare against None instead of relying on truthiness: a callable
    # object that defines ``__bool__``/``__len__`` may be falsy, and it
    # must still be registered rather than silently swapped for the
    # decorator.
    if func is not None:
        return wrapper(func)
    return wrapper

add_middleware #

add_middleware(m)

Append middleware as the innermost wrapper in the chain.

The added middleware is called last on turn entry and first on turn exit, executing closer to the LLM call than any middleware already registered.

Source code in autogen/beta/agent.py
def add_middleware(self, m: MiddlewareFactory) -> "Agent[TResult]":
    """Register ``m`` at the innermost position of the middleware chain.

    Being innermost, it runs after every previously registered middleware
    when a turn starts and before them when the turn finishes, i.e. it
    sits closest to the LLM call. Returns the agent for chaining.
    """
    chain = self._middleware
    chain.append(m)
    return self

insert_middleware #

insert_middleware(m)

Insert middleware as the outermost wrapper in the chain.

The inserted middleware is called first on turn entry and last on turn exit, executing before all middleware already registered on the agent.

Source code in autogen/beta/agent.py
def insert_middleware(self, m: MiddlewareFactory) -> "Agent[TResult]":
    """Register ``m`` at the outermost position of the middleware chain.

    Being outermost, it runs before every previously registered middleware
    when a turn starts and after them when the turn finishes. Returns the
    agent for chaining.
    """
    # Slice assignment at index 0 prepends in place.
    self._middleware[:0] = [m]
    return self

add_tool #

add_tool(t)
Source code in autogen/beta/agent.py
def add_tool(self, t: Callable[..., Any] | Tool) -> "Agent[TResult]":
    """Register a tool on this agent and return the agent for chaining.

    ``t`` is passed through ``FunctionTool.ensure_tool`` together with the
    agent's dependency provider before being stored in ``self.tools``.
    """
    ensured = FunctionTool.ensure_tool(t, provider=self.dependency_provider)
    self.tools.append(ensured)
    return self

add_observer #

add_observer(observer)

Register an observer (before calling ask()).

Source code in autogen/beta/agent.py
def add_observer(self, observer: Observer) -> "Agent[TResult]":
    """Register an observer (before calling ask()).

    Args:
        observer: The observer to append to this agent's observer list.

    Returns:
        The agent itself, so the call can be chained like
        ``add_middleware``, ``add_tool`` and ``add_policy``. Earlier
        versions returned ``None``; callers that ignore the result are
        unaffected.
    """
    self._observers.append(observer)
    return self

add_policy #

add_policy(policy)

Append an assembly policy to this agent's chain.

Policies run in order; a newly added policy runs after existing ones. Construction-time ordering validation (warning on suspicious sequences) only runs over policies passed via assembly= — late additions skip the check, so callers should be confident in the ordering they introduce.

Source code in autogen/beta/agent.py
def add_policy(self, policy: AssemblyPolicy) -> "Agent[TResult]":
    """Add ``policy`` to the end of the agent's assembly-policy list.

    Since policies execute in list order, the new one runs after all
    policies already present. Note that the ordering validation (which
    warns about suspicious sequences) happens only at construction time
    for policies given through ``assembly=``; nothing is checked here,
    so the caller is responsible for a sensible ordering.

    Returns the agent for chaining.
    """
    policies = self._policies
    policies.append(policy)
    return self

task #

task(title, *, description='', payload=None, capability=None, ttl_seconds=None, context=None)

Create a Task whose lifecycle this Agent owns.

Returns an unentered Task; use as async with agent.task(...). Events flow on context.stream if a ConversationContext is supplied; else the Task creates a private MemoryStream on entry and events fire on it (only observers attached to that private stream see them).

Inside the async with block, ag2.task is stamped into context.dependencies so any tool annotated with TaskInject resolves to this Task.

capability tags the task with a capability name. When the agent is registered with the network, the TaskMirror calls Hub.record_observation on the terminal event so the matching Resume.observed[capability] track record updates.

Source code in autogen/beta/agent.py
def task(
    self,
    title: str,
    *,
    description: str = "",
    payload: dict[str, Any] | None = None,
    capability: str | None = None,
    ttl_seconds: int | None = None,
    context: Context | None = None,
) -> Task:
    """Build a ``Task`` owned by this Agent.

    The returned ``Task`` has not been entered yet; drive it with
    ``async with agent.task(...)``. If a ``ConversationContext`` is
    passed, events are emitted on ``context.stream``. Otherwise the Task
    creates its own private ``MemoryStream`` when entered, and only
    observers attached to that private stream receive the events.

    While the ``async with`` block is active, ``ag2.task`` is placed in
    ``context.dependencies``, so tools annotated with ``TaskInject``
    receive this Task.

    ``capability`` labels the task with a capability name. For an agent
    registered with the network, the ``TaskMirror`` calls
    ``Hub.record_observation`` on the terminal event, updating the
    ``Resume.observed[capability]`` track record.
    """
    # Shallow-copy the caller's payload; None or an empty mapping both
    # become a fresh empty dict.
    if payload:
        task_payload = dict(payload)
    else:
        task_payload = {}

    spec = TaskSpec(
        title=title,
        description=description,
        payload=task_payload,
        capability=capability,
    )
    return Task(owner_id=self.name, spec=spec, context=context, ttl_seconds=ttl_seconds)

tool #

tool(function: Callable[..., Any], *, name: str | None = None, description: str | None = None, schema: FunctionParameters | None = None, sync_to_thread: bool = True, middleware: Iterable[ToolMiddleware] = ()) -> Tool
tool(function: None = None, *, name: str | None = None, description: str | None = None, schema: FunctionParameters | None = None, sync_to_thread: bool = True, middleware: Iterable[ToolMiddleware] = ()) -> Callable[[Callable[..., Any]], Tool]
tool(function=None, *, name=None, description=None, schema=None, sync_to_thread=True, middleware=())
Source code in autogen/beta/agent.py
def tool(
    self,
    function: Callable[..., Any] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    schema: FunctionParameters | None = None,
    sync_to_thread: bool = True,
    middleware: Iterable[ToolMiddleware] = (),
) -> Tool | Callable[[Callable[..., Any]], Tool]:
    """Create a tool from a function and register it on this agent.

    Works both as a bare decorator (``@agent.tool``) and as a decorator
    factory (``@agent.tool(name=...)``).

    Args:
        function: The callable to turn into a tool. When omitted, a
            decorator is returned instead.
        name: Forwarded unchanged to the tool factory.
        description: Forwarded unchanged to the tool factory.
        schema: Forwarded unchanged to the tool factory.
        sync_to_thread: Forwarded unchanged to the tool factory.
        middleware: Forwarded unchanged to the tool factory.

    Returns:
        The created tool when ``function`` was supplied, otherwise a
        decorator that creates and registers the tool.
    """

    def make_tool(f: Callable[..., Any]) -> Tool:
        # ``tool`` here resolves to the module-level name, not this method:
        # method names are not in scope inside a function body.
        t = tool(
            f,
            name=name,
            description=description,
            schema=schema,
            sync_to_thread=sync_to_thread,
            middleware=middleware,
        )
        self.add_tool(t)
        return t

    # Compare against None instead of relying on truthiness: a callable
    # object that defines ``__bool__``/``__len__`` may be falsy, and it
    # must still be registered rather than silently swapped for the
    # decorator.
    if function is not None:
        return make_tool(function)

    return make_tool

observer #

observer(condition: ClassInfo | Condition | None, callback: Callable[..., Any]) -> Callable[..., Any]
observer(condition: ClassInfo | Condition | None = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]
observer(condition=None, callback=None)
Source code in autogen/beta/agent.py
def observer(
    self,
    condition: ClassInfo | Condition | None = None,
    callback: Callable[..., Any] | None = None,
) -> Callable[..., Any] | Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Attach a callback as an observer, directly or as a decorator.

    With ``callback`` given, an observer is built from ``condition`` and
    the callback via ``observer_factory``, appended to the agent's
    observers, and the callback is returned. Without it, a decorator
    doing the same for the decorated function is returned.
    """

    def register(func: Callable[..., Any]) -> Callable[..., Any]:
        self._observers.append(observer_factory(condition, func))
        return func

    return register if callback is None else register(callback)

ask async #

ask(*msg: str | Input, stream: Stream | None = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., prompt: Iterable[str] = ..., config: ModelConfig | None = ..., tools: Iterable[Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., response_schema: type[T2], hitl_hook: HumanHook | None = ...) -> AgentReply[T2, TResult]
ask(msg: str | Input, *, stream: Stream | None = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., prompt: Iterable[str] = ..., config: ModelConfig | None = ..., tools: Iterable[Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., response_schema: ResponseProto[T2], hitl_hook: HumanHook | None = ...) -> AgentReply[T2, TResult]
ask(msg: str | Input, *, stream: Stream | None = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., prompt: Iterable[str] = ..., config: ModelConfig | None = ..., tools: Iterable[Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., response_schema: None, hitl_hook: HumanHook | None = ...) -> AgentReply[str, TResult]
ask(msg: str | Input, *, stream: Stream | None = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., prompt: Iterable[str] = ..., config: ModelConfig | None = ..., tools: Iterable[Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., hitl_hook: HumanHook | None = ...) -> AgentReply[TResult, TResult]
ask(*msg, stream=None, dependencies=None, variables=None, prompt=(), config=None, tools=(), middleware=(), observers=(), response_schema=omit, hitl_hook=None)
Source code in autogen/beta/agent.py
async def ask(
    self,
    *msg: str | Input,
    stream: Stream | None = None,
    dependencies: dict[Any, Any] | None = None,
    variables: dict[Any, Any] | None = None,
    prompt: Iterable[str] = (),
    config: ModelConfig | None = None,
    tools: Iterable[Tool] = (),
    middleware: Iterable[MiddlewareFactory] = (),
    observers: Iterable[Observer] = (),
    response_schema: Omittable[ResponseProto[Any] | type | None] = omit,
    hitl_hook: HumanHook | None = None,
) -> "AgentReply[Any, Any]":
    """Send a request through the agent and return the reply.

    Args:
        *msg: Message parts, converted to the initial request with
            ``ModelRequest.ensure_request``.
        stream: Stream the context is built on; a new ``MemoryStream`` is
            created when ``None``.
        dependencies: Per-call dependencies, merged over the agent-level
            ones (per-call values win on key conflicts).
        variables: Per-call variables, merged over the agent-level ones
            (per-call values win on key conflicts).
        prompt: Per-call prompt. When non-empty it is used instead of the
            agent's static and dynamic prompts. A single string is
            accepted and treated as one prompt entry.
        config: Model config for this call; falls back to ``self.config``.
        tools: Forwarded to ``_execute`` as additional tools.
        middleware: Forwarded to ``_execute`` as additional middleware.
        observers: Forwarded to ``_execute`` as additional observers.
        response_schema: Forwarded to ``_execute``; defaults to the
            ``omit`` sentinel.
        hitl_hook: Forwarded to ``_execute``.

    Returns:
        The result of ``self._execute``.

    Raises:
        ConfigNotProvidedError: If neither ``config`` nor ``self.config``
            is set.
    """
    config = config or self.config
    if not config:
        raise ConfigNotProvidedError()
    client = config.create()

    # Check for None explicitly: ``stream or MemoryStream()`` would discard
    # a caller-supplied stream that happens to be falsy (for example one
    # that reports zero length while empty).
    if stream is None:
        stream = MemoryStream()

    initial_event = ModelRequest.ensure_request(msg)

    # A bare string is itself an iterable of one-character strings, so
    # ``list(prompt)`` would split it into characters. Treat it as a single
    # prompt entry, mirroring how ``__init__`` handles a string prompt.
    call_prompt = [prompt] if isinstance(prompt, str) else list(prompt)

    context = Context(
        stream,
        prompt=call_prompt,
        dependencies=self._agent_dependencies | (dependencies or {}),
        variables=self._agent_variables | (variables or {}),
        dependency_provider=self.dependency_provider,
    )

    # Only fall back to the agent's own prompts when the call supplied none.
    if not context.prompt:
        context.prompt.extend(self._system_prompt)

        # Dynamic hooks are awaited one at a time, in registration order.
        for dp in self._dynamic_prompt:
            p = await dp(initial_event, context)
            context.prompt.append(p)

    return await self._execute(
        initial_event,
        context=context,
        client=client,
        hitl_hook=hitl_hook,
        additional_tools=tools,
        additional_middleware=middleware,
        additional_observers=observers,
        response_schema=response_schema,
    )

as_tool #

as_tool(*, description, name=None, stream=None, middleware=())
Source code in autogen/beta/agent.py
def as_tool(
    self,
    *,
    description: str,
    name: str | None = None,
    stream: StreamFactory | None = None,
    middleware: Iterable[ToolMiddleware] = (),
) -> FunctionTool:
    """Expose this agent as a tool.

    Delegates to ``subagent_tool``, passing the agent itself plus the
    given ``description``, ``name``, ``stream`` and ``middleware``
    unchanged.
    """
    agent_tool = subagent_tool(self, description=description, name=name, stream=stream, middleware=middleware)
    return agent_tool

as_conversable #

as_conversable()
Source code in autogen/beta/agent.py
def as_conversable(self) -> "ConversableAdapter":
    """Return a ``ConversableAdapter`` wrapping this agent."""
    # Imported lazily on purpose: the ``conversable`` module itself imports
    # ``Agent`` from here, so importing it at module level would be circular.
    from .conversable import ConversableAdapter

    adapter = ConversableAdapter(self)
    return adapter