Unwrap an A2A event back to the bare SDK protobuf for event_queue.
EventQueue.enqueue_event accepts the bare protobuf objects (not StreamResponse); the SDK request handler does the StreamResponse wrapping itself. We unwrap here so callers can keep working with typed AG2 events all the way up to the queue boundary.
Source code in autogen/beta/a2a/mappers/events.py
| def a2a_event_to_sdk(
event: A2AEvent,
) -> Task | Message | TaskStatusUpdateEvent | TaskArtifactUpdateEvent:
"""Unwrap an A2A event back to the bare SDK protobuf for ``event_queue``.
``EventQueue.enqueue_event`` accepts the bare protobuf objects (not
``StreamResponse``); the SDK request handler does the StreamResponse
wrapping itself. We unwrap here so callers can keep working with
typed AG2 events all the way up to the queue boundary.
"""
if isinstance(event, A2ATaskArtifactUpdate):
return event.update
if isinstance(event, A2ATaskStatusUpdate):
return event.update
if isinstance(event, A2AMessage):
return event.message
if isinstance(event, A2ATaskSnapshot):
return event.task
raise TypeError(f"Cannot unwrap {type(event).__name__} to A2A SDK type")
|