Decode a wire StreamResponse into a typed A2A event.
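Picks typed subclasses where possible: text-only artifacts become A2ATextArtifact, and tool-call+json artifacts become A2AToolCallArtifact. Unrecognised artifact shapes fall through to the base A2ATaskArtifactUpdate. A response whose payload is none of task, message, status_update, or artifact_update raises ValueError.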
Source code in autogen/beta/a2a/mappers/events.py
def parse_stream_response(response: StreamResponse) -> A2AEvent:
    """Decode a wire ``StreamResponse`` into a typed A2A event.

    Picks typed subclasses where possible — text-only artifacts become
    ``A2ATextArtifact``, ``tool-call+json`` artifacts become
    ``A2AToolCallArtifact``. Unrecognised artifact shapes fall through
    to the base ``A2ATaskArtifactUpdate``.
    """
    payload = response.WhichOneof("payload")
    if payload == "task":
        return A2ATaskSnapshot(task=response.task)
    if payload == "message":
        return A2AMessage(message=response.message)
    if payload == "status_update":
        update = response.status_update
        return A2ATaskStatusUpdate(update=update, state=update.status.state)
    if payload == "artifact_update":
        return parse_artifact_update(response.artifact_update)
    raise ValueError(f"Unexpected StreamResponse payload: {payload!r}")
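The dispatch above can be illustrated with a minimal, self-contained sketch. It does not import the real protobuf or autogen types: `FakeStreamResponse` and `classify_payload` are hypothetical stand-ins invented for this example, and the function returns the name of the event class that `parse_stream_response` would produce rather than the event itself. The only behavior assumed from protobuf is that `WhichOneof` returns the name of the populated oneof field, or `None` when no field is set.

```python
from dataclasses import dataclass
from typing import Any, Optional

# Field names of the "payload" oneof, as used by parse_stream_response.
PAYLOAD_FIELDS = ("task", "message", "status_update", "artifact_update")


@dataclass
class FakeStreamResponse:
    """Hypothetical stand-in for the wire StreamResponse (not the real class)."""

    task: Optional[Any] = None
    message: Optional[Any] = None
    status_update: Optional[Any] = None
    artifact_update: Optional[Any] = None

    def WhichOneof(self, group: str) -> Optional[str]:
        # Mimics protobuf: return the name of the populated field, or None.
        for name in PAYLOAD_FIELDS:
            if getattr(self, name) is not None:
                return name
        return None


def classify_payload(response: FakeStreamResponse) -> str:
    """Hypothetical helper: name the event type each payload maps to."""
    payload = response.WhichOneof("payload")
    if payload == "task":
        return "A2ATaskSnapshot"
    if payload == "message":
        return "A2AMessage"
    if payload == "status_update":
        return "A2ATaskStatusUpdate"
    if payload == "artifact_update":
        # The real function delegates to parse_artifact_update, which picks
        # A2ATextArtifact, A2AToolCallArtifact, or the base
        # A2ATaskArtifactUpdate depending on the artifact's shape.
        return "artifact event (chosen by parse_artifact_update)"
    raise ValueError(f"Unexpected StreamResponse payload: {payload!r}")


if __name__ == "__main__":
    print(classify_payload(FakeStreamResponse(message="hello")))
    print(classify_payload(FakeStreamResponse(task={"id": "t1"})))
    try:
        classify_payload(FakeStreamResponse())
    except ValueError as exc:
        print(exc)
```

In this sketch an empty response reaches the final `raise` because `WhichOneof` returns `None`, which matches none of the four branches. Callers that stream responses may want to catch `ValueError` around the decode step if they expect to meet payload types added later.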