Skip to content

LiveAgent

autogen.beta.live.realtime.LiveAgent #

LiveAgent(name, prompt=(), *, config, hitl_hook=None, tools=(), middleware=(), observers=(), dependencies=None, variables=None, plugins=(), stream=None)

Realtime STT agent. Open a session via agent.run().

If stream is omitted, owns a fresh MemoryStream; otherwise binds to the supplied one. run() is an async context manager that yields the owned ConversationContext so peers (Player, Recorder) can share it.

prompt accepts the same shapes as Agent.prompt — a string, a PromptHook callable, or any iterable mixing both. Callable hooks are resolved once at session open against the ConversationContext (no ModelRequest — realtime is session-scoped, not request-scoped). The resulting iterable of strings is forwarded as instructions to the provider's session, which is responsible for joining them.

Source code in autogen/beta/live/realtime.py
def __init__(
    self,
    name: str,
    prompt: PromptType | Iterable[PromptType] = (),
    *,
    config: RealtimeConfig,
    hitl_hook: HumanHook | None = None,
    tools: Iterable[Callable[..., Any] | Tool] = (),
    middleware: Iterable[MiddlewareFactory] = (),
    observers: Iterable[Observer] = (),
    dependencies: dict[Any, Any] | None = None,
    variables: dict[Any, Any] | None = None,
    # response_schema
    plugins: Iterable[Plugin] = (),
    # knowledge
    # tasks
    # assembly
    stream: Stream | None = None,
) -> None:
    self.name = name

    self._config = config
    self._stream = stream

    self._agent_dependencies: dict[Any, Any] = dependencies or {}
    self._agent_variables: dict[Any, Any] = variables or {}
    self._hitl_hook = wrap_hitl(hitl_hook) if hitl_hook else None

    self._dependency_provider = Provider()
    self._tools: list[Tool] = []
    for t in tools:
        self.add_tool(t)

    self._middleware: list[MiddlewareFactory] = list(middleware)

    self._observers: list[Observer] = []
    for obs in observers:
        self.add_observer(obs)

    self._serializer: SerializerProto = PydanticSerializer(
        pydantic_config={"arbitrary_types_allowed": True},
        use_fastdepends_errors=False,
    )
    self._tool_executor = ToolExecutor(self._serializer)

    self._system_prompt: list[str] = []
    self._dynamic_prompt: list[Callable[[ModelRequest, ConversationContext], Awaitable[str]]] = []

    if isinstance(prompt, str) or callable(prompt):
        prompt = [prompt]
    for p in prompt:
        if isinstance(p, str):
            self._system_prompt.append(p)
        else:
            self._dynamic_prompt.append(_wrap_prompt_hook(p))

    for plg in plugins:
        plg.register(self)  # type: ignore[arg-type]

name instance-attribute #

name = name

hitl_hook #

hitl_hook(func)
Source code in autogen/beta/live/realtime.py
def hitl_hook(self, func: HumanHook) -> HumanHook:
    if self._hitl_hook is not None:
        warnings.warn(
            "You already set HITL hook, provided value overrides it",
            category=RuntimeWarning,
            stacklevel=2,
        )

    self._hitl_hook = wrap_hitl(func)
    return func

prompt #

prompt(func: None = None) -> Callable[[PromptHook], PromptHook]
prompt(func: PromptHook) -> PromptHook
prompt(func=None)
Source code in autogen/beta/live/realtime.py
def prompt(
    self,
    func: PromptHook | None = None,
) -> PromptHook | Callable[[PromptHook], PromptHook]:
    def wrapper(f: PromptHook) -> PromptHook:
        self._dynamic_prompt.append(_wrap_prompt_hook(f))
        return f

    if func:
        return wrapper(func)
    return wrapper

tool #

tool(function: Callable[..., Any], *, name: str | None = None, description: str | None = None, schema: FunctionParameters | None = None, sync_to_thread: bool = True, middleware: Iterable[ToolMiddleware] = ()) -> Tool
tool(function: None = None, *, name: str | None = None, description: str | None = None, schema: FunctionParameters | None = None, sync_to_thread: bool = True, middleware: Iterable[ToolMiddleware] = ()) -> Callable[[Callable[..., Any]], Tool]
tool(function=None, *, name=None, description=None, schema=None, sync_to_thread=True, middleware=())
Source code in autogen/beta/live/realtime.py
def tool(
    self,
    function: Callable[..., Any] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    schema: FunctionParameters | None = None,
    sync_to_thread: bool = True,
    middleware: Iterable[ToolMiddleware] = (),
) -> Tool | Callable[[Callable[..., Any]], Tool]:
    def make_tool(f: Callable[..., Any]) -> Tool:
        t = _tool(
            f,
            name=name,
            description=description,
            schema=schema,
            sync_to_thread=sync_to_thread,
            middleware=middleware,
        )
        self.add_tool(t)
        return t

    if function:
        return make_tool(function)

    return make_tool

add_tool #

add_tool(t)
Source code in autogen/beta/live/realtime.py
def add_tool(self, t: Callable[..., Any] | Tool) -> "LiveAgent":
    self._tools.append(FunctionTool.ensure_tool(t, provider=self._dependency_provider))
    return self

add_middleware #

add_middleware(m)

Append middleware as the innermost wrapper in the chain.

Source code in autogen/beta/live/realtime.py
def add_middleware(self, m: MiddlewareFactory) -> "LiveAgent":
    """Append middleware as the innermost wrapper in the chain."""
    self._middleware.append(m)
    return self

insert_middleware #

insert_middleware(m)

Insert middleware as the outermost wrapper in the chain.

Source code in autogen/beta/live/realtime.py
def insert_middleware(self, m: MiddlewareFactory) -> "LiveAgent":
    """Insert middleware as the outermost wrapper in the chain."""
    self._middleware.insert(0, m)
    return self

add_observer #

add_observer(observer)

Register an observer (before calling run()).

Source code in autogen/beta/live/realtime.py
def add_observer(self, observer: Observer) -> None:
    """Register an observer (before calling run())."""
    self._observers.append(observer)

run async #

run(*, dependencies=None, variables=None, prompt=(), config=None, tools=(), middleware=(), observers=(), hitl_hook=None)
Source code in autogen/beta/live/realtime.py
@asynccontextmanager
async def run(
    self,
    *,
    dependencies: dict[Any, Any] | None = None,
    variables: dict[Any, Any] | None = None,
    prompt: Iterable[str] = (),
    config: RealtimeConfig | None = None,
    tools: Iterable[Callable[..., Any] | Tool] = (),
    middleware: Iterable[MiddlewareFactory] = (),
    observers: Iterable[Observer] = (),
    hitl_hook: HumanHook | None = None,
) -> AsyncIterator[ConversationContext]:
    stream = self._stream if self._stream is not None else MemoryStream()

    context = ConversationContext(
        stream=stream,
        dependency_provider=self._dependency_provider,
        dependencies=self._agent_dependencies | (dependencies or {}),
        variables=self._agent_variables | (variables or {}),
    )

    active_config = config if config is not None else self._config
    active_hitl = wrap_hitl(hitl_hook) if hitl_hook else self._hitl_hook

    all_tools: list[Tool] = self._tools + [
        FunctionTool.ensure_tool(t, provider=self._dependency_provider) for t in tools
    ]
    all_observers: list[Observer] = self._observers + list(observers)

    initial_event = ModelRequest([])
    middleware_instances: list[BaseMiddleware] = [
        m(initial_event, context) for m in (*self._middleware, *middleware)
    ]

    async with AsyncExitStack() as s:
        if active_hitl is not None:
            s.enter_context(
                stream.where(HumanInputRequest).sub_scope(
                    active_hitl(middleware_instances),
                    interrupt=True,
                ),
            )

        all_schemas: list[ToolSchema] = []
        known_tools: set[str] = set()
        for t in all_tools:
            schemas = await t.schemas(context)
            all_schemas.extend(schemas)
            for schema in schemas:
                if isinstance(schema, FunctionToolSchema):
                    known_tools.add(schema.function.name)
                else:
                    known_tools.add(schema.type)

        if all_tools:
            self._tool_executor.register(
                s,
                context,
                tools=all_tools,
                known_tools=known_tools,
                middleware=middleware_instances,
            )

        instructions = list(prompt) if prompt else await self._resolve_instructions(context)

        # enter Provider session
        await s.enter_async_context(
            active_config.session(
                context,
                instructions=instructions,
                tools=all_schemas,
                serializer=self._serializer,
            )
        )

        for obs in all_observers:
            obs.register(s, context)

        for obs in all_observers:
            await context.send(ObserverStarted(name=getattr(obs, "name", type(obs).__name__)))

        try:
            yield context

        finally:
            for obs in all_observers:
                with suppress(Exception):
                    await context.send(
                        ObserverCompleted(name=getattr(obs, "name", type(obs).__name__)),
                    )