Skip to content

build_grpc_server

autogen.beta.a2a.transports.grpc.build_grpc_server

build_grpc_server(*, agent_executor, agent_card, bind, extended_agent_card=None, extended_card_modifier=None, task_store=None, push_config_store=None, push_sender=None, options=())

Builds a grpc.aio.Server that exposes the A2A service; the caller is responsible for starting and awaiting it.

Source code in autogen/beta/a2a/transports/grpc.py
def build_grpc_server(
    *,
    agent_executor: AgentExecutor,
    agent_card: AgentCard,
    bind: str,
    extended_agent_card: AgentCard | None = None,
    extended_card_modifier: ExtendedCardModifier | None = None,
    task_store: TaskStore | None = None,
    push_config_store: PushNotificationConfigStore | None = None,
    push_sender: PushNotificationSender | None = None,
    options: Sequence[tuple[str, Any]] = (),
) -> grpc.aio.Server:
    """Build a ``grpc.aio.Server`` that exposes the A2A service.

    The server is returned unstarted; the caller starts and awaits it.

    Args:
        agent_executor: Forwarded to ``build_default_handler``.
        agent_card: Base card; a copy produced by
            ``clone_card_with_capabilities`` is what the handler receives.
        bind: Address handed to ``server.add_insecure_port``.
        extended_agent_card: Optional extended card. Its presence sets the
            ``extended`` flag when cloning ``agent_card``.
        extended_card_modifier: Forwarded to ``build_default_handler``.
        task_store: Forwarded to ``build_default_handler``.
        push_config_store: Optional push-notification config store. Its
            presence sets the ``push`` flag when cloning ``agent_card``.
        push_sender: Forwarded to ``build_default_handler``.
        options: Channel options for ``grpc.aio.server``; passed as a list,
            or as ``None`` when empty.

    Returns:
        The configured server with the A2A servicer registered and an
        insecure port added for ``bind``.
    """
    # Capability flags are derived purely from which optional pieces were supplied.
    has_extended_card = extended_agent_card is not None
    has_push_store = push_config_store is not None
    advertised_card = clone_card_with_capabilities(
        agent_card,
        extended=has_extended_card,
        push=has_push_store,
    )

    request_handler = build_default_handler(
        agent_executor=agent_executor,
        agent_card=advertised_card,
        extended_agent_card=extended_agent_card,
        extended_card_modifier=extended_card_modifier,
        task_store=task_store,
        push_config_store=push_config_store,
        push_sender=push_sender,
    )

    # An empty options sequence is passed through as None rather than [].
    if options:
        server_options = list(options)
    else:
        server_options = None
    grpc_server = grpc.aio.server(options=server_options)

    a2a_pb2_grpc.add_A2AServiceServicer_to_server(
        GrpcHandler(request_handler),
        grpc_server,
    )
    # NOTE(review): the port is insecure (no TLS credentials are configured here).
    grpc_server.add_insecure_port(bind)
    return grpc_server