def build_grpc_server(
*,
agent_executor: AgentExecutor,
agent_card: AgentCard,
bind: str,
extended_agent_card: AgentCard | None = None,
extended_card_modifier: ExtendedCardModifier | None = None,
task_store: TaskStore | None = None,
push_config_store: PushNotificationConfigStore | None = None,
push_sender: PushNotificationSender | None = None,
options: Sequence[tuple[str, Any]] = (),
) -> grpc.aio.Server:
"""``grpc.aio.Server`` exposing A2A service; caller starts/awaits it."""
agent_card = clone_card_with_capabilities(
agent_card,
extended=extended_agent_card is not None,
push=push_config_store is not None,
)
handler = build_default_handler(
agent_executor=agent_executor,
agent_card=agent_card,
extended_agent_card=extended_agent_card,
extended_card_modifier=extended_card_modifier,
task_store=task_store,
push_config_store=push_config_store,
push_sender=push_sender,
)
server = grpc.aio.server(options=list(options) if options else None)
a2a_pb2_grpc.add_A2AServiceServicer_to_server(GrpcHandler(handler), server)
server.add_insecure_port(bind)
return server