Agent(name: str, prompt: PromptType | Iterable[PromptType] = ..., *, config: ModelConfig | None = ..., hitl_hook: HumanHook | None = ..., tools: Iterable[Callable[..., Any] | Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., response_schema: type[TResult], plugins: Iterable[Plugin] = ..., knowledge: KnowledgeConfig | None = ..., tasks: TaskConfig | Literal[False] = ..., assembly: Iterable[AssemblyPolicy] = ...)
Agent(name: str, prompt: PromptType | Iterable[PromptType] = ..., *, config: ModelConfig | None = ..., hitl_hook: HumanHook | None = ..., tools: Iterable[Callable[..., Any] | Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., response_schema: ResponseProto[TResult], plugins: Iterable[Plugin] = ..., knowledge: KnowledgeConfig | None = ..., tasks: TaskConfig | Literal[False] = ..., assembly: Iterable[AssemblyPolicy] = ...)
Agent(name: str, prompt: PromptType | Iterable[PromptType] = ..., *, config: ModelConfig | None = ..., hitl_hook: HumanHook | None = ..., tools: Iterable[Callable[..., Any] | Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., response_schema: UnionType, plugins: Iterable[Plugin] = ..., knowledge: KnowledgeConfig | None = ..., tasks: TaskConfig | Literal[False] = ..., assembly: Iterable[AssemblyPolicy] = ...)
Agent(name: str, prompt: PromptType | Iterable[PromptType] = ..., *, config: ModelConfig | None = ..., hitl_hook: HumanHook | None = ..., tools: Iterable[Callable[..., Any] | Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., response_schema: None = ..., plugins: Iterable[Plugin] = ..., knowledge: KnowledgeConfig | None = ..., tasks: TaskConfig | Literal[False] = ..., assembly: Iterable[AssemblyPolicy] = ...)
Agent(name, prompt=(), *, config=None, hitl_hook=None, tools=(), middleware=(), observers=(), dependencies=None, variables=None, response_schema=None, plugins=(), knowledge=None, tasks=False, assembly=())
Bases: Generic[TResult]
The agentic unit of autogen.beta.
An Agent runs a model loop, invokes tools, honours middleware, surfaces events through observers, and optionally runs the harness primitives (assembly, compaction, aggregation, knowledge store, subtask spawning).
A bare Agent(name, config=cfg) has zero harness middleware and behaves exactly like a plain LLM loop. Harness features are opt-in:
- assembly= — assembly policies (e.g. ConversationPolicy, SlidingWindow, AlertPolicy). When non-empty, AssemblerMiddleware and _HaltCheckMiddleware are wired in.
- knowledge=KnowledgeConfig(store=...) — persistent knowledge store, compaction, aggregation.
- tasks=TaskConfig(...) — opt in to the auto-injected run_subtask / run_subtasks tools, and override the LLM config / prompt / tool-inheritance rules for sub-task Agents. Defaults to False (sub-task tools disabled).
Source code in autogen/beta/agent.py
| def __init__(
self,
name: str,
prompt: PromptType | Iterable[PromptType] = (),
*,
config: ModelConfig | None = None,
hitl_hook: HumanHook | None = None,
tools: Iterable[Callable[..., Any] | Tool] = (),
middleware: Iterable[MiddlewareFactory] = (),
observers: Iterable[Observer] = (),
dependencies: dict[Any, Any] | None = None,
variables: dict[Any, Any] | None = None,
response_schema: (ResponseProto[TResult] | type[TResult] | types.UnionType | None) = None,
plugins: Iterable["Plugin"] = (),
knowledge: KnowledgeConfig | None = None,
tasks: TaskConfig | Literal[False] = False,
assembly: Iterable[AssemblyPolicy] = (),
):
# Build the agent: store the basic settings, register tools, prompts and
# plugins, then configure the opt-in features (sub-task tools, knowledge
# store, assembly policies). Statement order matters below.
self.name = name
self.config = config
# ``None`` (or an empty mapping) becomes a fresh empty dict.
self._agent_dependencies = dependencies or {}
self._agent_variables = variables or {}
# Materialise the iterables as lists: add_middleware / insert_middleware /
# add_observer mutate these lists after construction.
self._middleware = list(middleware)
self._observers = list(observers)
self._serializer: SerializerProto = PydanticSerializer(
pydantic_config={"arbitrary_types_allowed": True},
use_fastdepends_errors=False,
)
# The provider and the tools list must exist before the loop below:
# add_tool() reads self.dependency_provider and appends to self.tools.
self.dependency_provider = Provider()
self.tools: list[FunctionTool] = []
for t in tools:
self.add_tool(t)
# Only wrap a hook that was actually supplied; otherwise leave it unset.
self._hitl_hook = wrap_hitl(hitl_hook) if hitl_hook else None
self.__tool_executor = ToolExecutor(self._serializer)
self._system_prompt: list[str] = []
self._dynamic_prompt: list[Callable[[ModelRequest, Context], Awaitable[str]]] = []
self._response_schema = ResponseSchema.ensure_schema(response_schema)
# A single string or callable is treated as a one-element prompt list.
if isinstance(prompt, str) or callable(prompt):
prompt = [prompt]
# Strings are static system-prompt entries; every other entry is wrapped
# as a dynamic prompt hook.
for p in prompt:
if isinstance(p, str):
self._system_prompt.append(p)
else:
self._dynamic_prompt.append(_wrap_prompt_hook(p))
# Plugins register themselves against this agent, after tools and prompts
# have been set up.
for p in plugins:
p.register(self)
# Task spawning. ``tasks=False`` (the default) means no auto-injected
# ``run_subtask`` / ``run_subtasks`` tools. Pass ``tasks=TaskConfig(...)``
# to opt in.
if tasks is False:
self._task_config: TaskConfig | None = None
else:
self._task_config = tasks
# Sub-task tools are built only when a TaskConfig was supplied.
self._subtask_tools: list[Tool] = _build_subtask_tools(self) if self._task_config is not None else []
# Knowledge store + compaction/aggregation strategies
# Without a KnowledgeConfig: no store, bootstrap or strategies; the
# expose_tool / write_event_log flags default to True; default trigger
# objects are created.
kc = knowledge
self._knowledge_store = kc.store if kc else None
self._knowledge_expose_tool = kc.expose_tool if kc else True
self._knowledge_write_event_log = kc.write_event_log if kc else True
self._bootstrap = kc.bootstrap if kc else None
self._bootstrap_done: bool = False
# No lock is created here; NOTE(review): presumably created lazily where
# the bootstrap runs — confirm against the rest of the file.
self._bootstrap_lock: asyncio.Lock | None = None
self._compact_strategy = kc.compact if kc else None
self._compact_trigger = kc.compact_trigger if kc and kc.compact_trigger else CompactTrigger()
self._aggregate_strategy = kc.aggregate if kc else None
self._aggregate_trigger = kc.aggregate_trigger if kc and kc.aggregate_trigger else AggregateTrigger()
# The knowledge tool is built only when a (truthy) store exists and
# expose_tool is set.
self._knowledge_tools: list[Tool] = (
[_make_knowledge_tool(self._knowledge_store)]
if self._knowledge_store and self._knowledge_expose_tool
else []
)
# Assembly policies (empty by default; bare Agent has no harness).
self._policies: list[AssemblyPolicy] = list(assembly)
# Each item yielded by validate_order is logged as a warning; construction
# continues regardless.
if self._policies:
for w in AssemblerMiddleware.validate_order(self._policies):
logger.warning("Assembly policy ordering: %s", w)
|
config instance-attribute
dependency_provider instance-attribute
dependency_provider = Provider()
hitl_hook
Source code in autogen/beta/agent.py
def hitl_hook(self, func: HumanHook) -> HumanHook:
    """Install ``func`` as this agent's human-in-the-loop hook.

    If a hook is already installed, a ``RuntimeWarning`` is emitted and the
    new hook replaces the old one. The wrapped form (``wrap_hitl(func)``) is
    stored on the agent; the original ``func`` is returned unchanged.
    """
    already_installed = self._hitl_hook is not None
    if already_installed:
        # stacklevel=2 attributes the warning to the caller of this method.
        warnings.warn(
            "You already set HITL hook, provided value overrides it",
            category=RuntimeWarning,
            stacklevel=2,
        )

    self._hitl_hook = wrap_hitl(func)
    return func
|
prompt
prompt(func: None = None) -> Callable[[PromptHook], PromptHook]
prompt(func: PromptHook) -> PromptHook
Source code in autogen/beta/agent.py
def prompt(
    self,
    func: PromptHook | None = None,
) -> PromptHook | Callable[[PromptHook], PromptHook]:
    """Register a dynamic prompt hook, either directly or as a decorator.

    Args:
        func: The hook to register. When omitted (``None``), a decorator is
            returned instead, so both ``@agent.prompt`` and
            ``@agent.prompt()`` work.

    Returns:
        ``func`` itself when it was supplied, otherwise a decorator that
        registers the function it receives and returns it unchanged.
    """

    def wrapper(f: PromptHook) -> PromptHook:
        # Store the wrapped hook, but hand the original function back.
        self._dynamic_prompt.append(_wrap_prompt_hook(f))
        return f

    # Test identity against None, not truthiness: a callable object that
    # defines ``__bool__``/``__len__`` can be falsy and would otherwise be
    # mistaken for "no argument given" and never registered.
    if func is not None:
        return wrapper(func)
    return wrapper
|
add_middleware
Append middleware as the innermost wrapper in the chain.
The added middleware is called last on turn entry and first on turn exit, executing closer to the LLM call than any middleware already registered.
Source code in autogen/beta/agent.py
def add_middleware(self, m: MiddlewareFactory) -> "Agent[TResult]":
    """Register ``m`` as the innermost middleware of the chain.

    It is placed after every middleware already registered, so it is called
    last on turn entry and first on turn exit — closest to the LLM call.
    Returns the agent itself so calls can be chained.
    """
    chain = self._middleware
    chain.append(m)
    return self
|
insert_middleware
Insert middleware as the outermost wrapper in the chain.
The inserted middleware is called first on turn entry and last on turn exit, executing before all middleware already registered on the agent.
Source code in autogen/beta/agent.py
def insert_middleware(self, m: MiddlewareFactory) -> "Agent[TResult]":
    """Register ``m`` as the outermost middleware of the chain.

    It is placed ahead of every middleware already registered, so it is
    called first on turn entry and last on turn exit. Returns the agent
    itself so calls can be chained.
    """
    # Slice assignment at position 0 prepends in place.
    self._middleware[:0] = [m]
    return self
|
add_tool
Source code in autogen/beta/agent.py
def add_tool(self, t: Callable[..., Any] | Tool) -> "Agent[TResult]":
    """Register a tool on this agent.

    ``t`` is passed through ``FunctionTool.ensure_tool`` together with the
    agent's dependency provider, and the result is appended to
    ``self.tools``. Returns the agent itself so calls can be chained.
    """
    ensured = FunctionTool.ensure_tool(t, provider=self.dependency_provider)
    self.tools.append(ensured)
    return self
|
add_observer
Register an observer (before calling ask()).
Source code in autogen/beta/agent.py
def add_observer(self, observer: Observer) -> "Agent[TResult]":
    """Register an observer (before calling ask()).

    Args:
        observer: The observer to append to the agent's observer list.

    Returns:
        The agent itself, so registration calls can be chained. Callers
        that ignore the return value are unaffected.
    """
    self._observers.append(observer)
    return self
|
add_policy
Append an assembly policy to this agent's chain.
Policies run in order; a newly added policy runs after existing ones. Construction-time ordering validation (warning on suspicious sequences) only runs over policies passed via assembly= — late additions skip the check, so callers should be confident in the ordering they introduce.
Source code in autogen/beta/agent.py
def add_policy(self, policy: AssemblyPolicy) -> "Agent[TResult]":
    """Add one more assembly policy at the end of this agent's chain.

    Policies run in order, so the new policy runs after those already
    present. The ordering validation performed at construction time
    (warnings on suspicious sequences) covers only policies passed via
    ``assembly=``; nothing is validated here, so the caller is responsible
    for the resulting order.

    Returns the agent itself so calls can be chained.
    """
    policies = self._policies
    policies.append(policy)
    return self
|
task
task(title, *, description='', payload=None, capability=None, ttl_seconds=None, context=None)
Create a Task whose lifecycle this Agent owns.
Returns an unentered Task; use as async with agent.task(...). Events flow on context.stream if a ConversationContext is supplied; else the Task creates a private MemoryStream on entry and events fire on it (only observers attached to that private stream see them).
Inside the async with block, ag2.task is stamped into context.dependencies so any tool annotated with TaskInject resolves to this Task.
capability tags the task with a capability name. When the agent is registered with the network, the TaskMirror calls Hub.record_observation on the terminal event so the matching Resume.observed[capability] track record updates.
Source code in autogen/beta/agent.py
def task(
    self,
    title: str,
    *,
    description: str = "",
    payload: dict[str, Any] | None = None,
    capability: str | None = None,
    ttl_seconds: int | None = None,
    context: Context | None = None,
) -> Task:
    """Build a ``Task`` owned by this Agent.

    The returned ``Task`` has not been entered yet; use it as
    ``async with agent.task(...)``.

    Events flow on ``context.stream`` when a ``ConversationContext`` is
    supplied. Otherwise the Task creates a private ``MemoryStream`` on
    entry and fires its events there, so only observers attached to that
    private stream see them.

    Inside the ``async with`` block, ``ag2.task`` is stamped into
    ``context.dependencies`` so any tool annotated with ``TaskInject``
    resolves to this Task.

    ``capability`` tags the task with a capability name. When the agent
    is registered with the network, the ``TaskMirror`` calls
    ``Hub.record_observation`` on the terminal event so the matching
    ``Resume.observed[capability]`` track record updates.
    """
    # A non-empty payload is shallow-copied; ``None`` or an empty mapping
    # becomes a fresh empty dict.
    if payload:
        spec_payload = dict(payload)
    else:
        spec_payload = {}

    spec = TaskSpec(
        title=title,
        description=description,
        payload=spec_payload,
        capability=capability,
    )
    # The agent's name is used as the task's owner id.
    return Task(owner_id=self.name, spec=spec, context=context, ttl_seconds=ttl_seconds)
|
tool(function: Callable[..., Any], *, name: str | None = None, description: str | None = None, schema: FunctionParameters | None = None, sync_to_thread: bool = True, middleware: Iterable[ToolMiddleware] = ()) -> Tool
tool(function: None = None, *, name: str | None = None, description: str | None = None, schema: FunctionParameters | None = None, sync_to_thread: bool = True, middleware: Iterable[ToolMiddleware] = ()) -> Callable[[Callable[..., Any]], Tool]
tool(function=None, *, name=None, description=None, schema=None, sync_to_thread=True, middleware=())
Source code in autogen/beta/agent.py
def tool(
    self,
    function: Callable[..., Any] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    schema: FunctionParameters | None = None,
    sync_to_thread: bool = True,
    middleware: Iterable[ToolMiddleware] = (),
) -> Tool | Callable[[Callable[..., Any]], Tool]:
    """Create a tool from a function and register it on this agent.

    Usable directly (``agent.tool(fn, ...)``) or as a decorator, with or
    without arguments (``@agent.tool`` / ``@agent.tool(name=...)``).

    Args:
        function: The callable to turn into a tool. When omitted, a
            decorator is returned instead.
        name, description, schema, sync_to_thread, middleware: Forwarded
            unchanged to the module-level ``tool`` factory.

    Returns:
        The created ``Tool`` when ``function`` was supplied, otherwise a
        decorator that creates, registers and returns the ``Tool``.
    """

    def make_tool(f: Callable[..., Any]) -> Tool:
        # ``tool`` here is the module-level factory, not this method.
        t = tool(
            f,
            name=name,
            description=description,
            schema=schema,
            sync_to_thread=sync_to_thread,
            middleware=middleware,
        )
        self.add_tool(t)
        return t

    # Test identity against None, not truthiness: a callable object that
    # defines ``__bool__``/``__len__`` can be falsy and would otherwise be
    # mistaken for "no argument given", returning the decorator instead of
    # a registered tool.
    if function is not None:
        return make_tool(function)
    return make_tool
|
observer
observer(condition=None, callback=None)
Source code in autogen/beta/agent.py
def observer(
    self,
    condition: ClassInfo | Condition | None = None,
    callback: Callable[..., Any] | None = None,
) -> Callable[..., Any] | Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Register a callback as an observer, directly or as a decorator.

    With ``callback`` supplied, an observer is built from ``condition`` and
    the callback via ``observer_factory``, appended to the agent's
    observers, and the callback is returned unchanged. Without it, a
    decorator doing the same for the decorated function is returned.
    """

    def register(func: Callable[..., Any]) -> Callable[..., Any]:
        self._observers.append(observer_factory(condition, func))
        return func

    if callback is None:
        return register
    return register(callback)
|
ask async
ask(*msg: str | Input, stream: Stream | None = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., prompt: Iterable[str] = ..., config: ModelConfig | None = ..., tools: Iterable[Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., response_schema: type[T2], hitl_hook: HumanHook | None = ...) -> AgentReply[T2, TResult]
ask(msg: str | Input, *, stream: Stream | None = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., prompt: Iterable[str] = ..., config: ModelConfig | None = ..., tools: Iterable[Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., response_schema: ResponseProto[T2], hitl_hook: HumanHook | None = ...) -> AgentReply[T2, TResult]
ask(msg: str | Input, *, stream: Stream | None = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., prompt: Iterable[str] = ..., config: ModelConfig | None = ..., tools: Iterable[Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., response_schema: None, hitl_hook: HumanHook | None = ...) -> AgentReply[str, TResult]
ask(msg: str | Input, *, stream: Stream | None = ..., dependencies: dict[Any, Any] | None = ..., variables: dict[Any, Any] | None = ..., prompt: Iterable[str] = ..., config: ModelConfig | None = ..., tools: Iterable[Tool] = ..., middleware: Iterable[MiddlewareFactory] = ..., observers: Iterable[Observer] = ..., hitl_hook: HumanHook | None = ...) -> AgentReply[TResult, TResult]
ask(*msg, stream=None, dependencies=None, variables=None, prompt=(), config=None, tools=(), middleware=(), observers=(), response_schema=omit, hitl_hook=None)
Source code in autogen/beta/agent.py
| async def ask(
self,
*msg: str | Input,
stream: Stream | None = None,
dependencies: dict[Any, Any] | None = None,
variables: dict[Any, Any] | None = None,
prompt: Iterable[str] = (),
config: ModelConfig | None = None,
tools: Iterable[Tool] = (),
middleware: Iterable[MiddlewareFactory] = (),
observers: Iterable[Observer] = (),
response_schema: Omittable[ResponseProto[Any] | type | None] = omit,
hitl_hook: HumanHook | None = None,
) -> "AgentReply[Any, Any]":
# Run one request: resolve the model config, build a per-call Context,
# assemble the prompt, then delegate to self._execute and return its result.
#
# Raises ConfigNotProvidedError when neither the call nor the agent
# provides a config.
# A per-call config takes precedence over the agent-level one.
config = config or self.config
if not config:
raise ConfigNotProvidedError()
client = config.create()
# Use the caller's stream, or a fresh MemoryStream when none is given.
stream = stream or MemoryStream()
# All positional messages are passed together (as the ``msg`` tuple).
initial_event = ModelRequest.ensure_request(msg)
# Call-level dependencies/variables are merged over the agent-level ones;
# on a key clash the call-level value (right-hand side of ``|``) wins.
context = Context(
stream,
prompt=list(prompt),
dependencies=self._agent_dependencies | (dependencies or {}),
variables=self._agent_variables | (variables or {}),
dependency_provider=self.dependency_provider,
)
# With no per-call prompt, fall back to the agent's static system prompt.
# NOTE(review): indentation is lost in this listing, so it is not visible
# whether the dynamic-prompt loop below is nested under this ``if`` or
# always runs — confirm against the real file.
if not context.prompt:
context.prompt.extend(self._system_prompt)
# Each dynamic prompt hook is awaited with the request and the context;
# its result is appended to the prompt.
for dp in self._dynamic_prompt:
p = await dp(initial_event, context)
context.prompt.append(p)
# ``hitl_hook`` and ``response_schema`` are forwarded exactly as received
# (``response_schema`` defaults to the ``omit`` marker, not ``None``).
# NOTE(review): presumably _execute falls back to the agent-level hook and
# schema when these are None / omitted — confirm in _execute.
return await self._execute(
initial_event,
context=context,
client=client,
hitl_hook=hitl_hook,
additional_tools=tools,
additional_middleware=middleware,
additional_observers=observers,
response_schema=response_schema,
)
|
as_tool(*, description, name=None, stream=None, middleware=())
Source code in autogen/beta/agent.py
def as_tool(
    self,
    *,
    description: str,
    name: str | None = None,
    stream: StreamFactory | None = None,
    middleware: Iterable[ToolMiddleware] = (),
) -> FunctionTool:
    """Return a ``FunctionTool`` built from this agent by ``subagent_tool``.

    All keyword arguments are forwarded to ``subagent_tool`` unchanged;
    ``description`` is required, the rest are optional.
    """
    return subagent_tool(self, description=description, name=name, stream=stream, middleware=middleware)
|
as_conversable
Source code in autogen/beta/agent.py
def as_conversable(self) -> "ConversableAdapter":
    """Return a ``ConversableAdapter`` wrapping this agent."""
    # Imported at call time on purpose: ``conversable`` itself imports
    # ``Agent`` from this module, so a module-level import would be circular.
    from .conversable import ConversableAdapter as _Adapter

    return _Adapter(self)
|