parse_task_artifact

autogen.beta.a2a.mappers.events.parse_task_artifact

parse_task_artifact(artifact, *, task_id, context_id)

Wrap a polling-snapshot Artifact as a typed update event.

A2A polling re-reads task.artifacts (a cumulative list of finalised artifacts) rather than consuming an incremental TaskArtifactUpdateEvent stream. This function synthesises the streaming wire shape by wrapping the artifact in a TaskArtifactUpdateEvent and passing it to parse_artifact_update, so the same downstream code path can handle both transports. Each polled artifact is treated as a final, non-appended chunk (append=False, last_chunk=True), because polling never sees mid-stream chunks.

Source code in autogen/beta/a2a/mappers/events.py
def parse_task_artifact(
    artifact: Artifact,
    *,
    task_id: str,
    context_id: str,
) -> A2ATaskArtifactUpdate:
    """Wrap a polling-snapshot ``Artifact`` as a typed update event.

    A2A polling re-reads ``task.artifacts`` (a cumulative list of
    finalised artifacts) instead of an incremental
    ``TaskArtifactUpdateEvent`` stream. Synthesise the streaming wire
    shape so the same downstream code path can handle both transports.
    Each polled artifact is treated as a final, non-appended chunk —
    polling never sees mid-stream chunks.
    """
    update = TaskArtifactUpdateEvent(
        task_id=task_id,
        context_id=context_id,
        artifact=artifact,
        append=False,
        last_chunk=True,
    )
    return parse_artifact_update(update)
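
The following is a minimal, self-contained sketch of why the wrapping is useful. It does not import the real library: the Artifact and TaskArtifactUpdateEvent dataclasses below are simplified stand-ins that carry only the fields used above, and wrap_polled_artifact, handle_update, and store are hypothetical names introduced purely for illustration. It assumes a downstream handler that accumulates chunks per artifact, which may differ from what parse_artifact_update actually does.

```python
from dataclasses import dataclass, field


@dataclass
class Artifact:
    """Hypothetical stand-in for the A2A Artifact type."""
    artifact_id: str
    parts: list = field(default_factory=list)


@dataclass
class TaskArtifactUpdateEvent:
    """Hypothetical stand-in with only the fields used in the listing above."""
    task_id: str
    context_id: str
    artifact: Artifact
    append: bool
    last_chunk: bool


def wrap_polled_artifact(artifact, *, task_id, context_id):
    # Same field values as parse_task_artifact: a polled artifact is
    # always treated as one complete, final chunk.
    return TaskArtifactUpdateEvent(
        task_id=task_id,
        context_id=context_id,
        artifact=artifact,
        append=False,
        last_chunk=True,
    )


def handle_update(update, store):
    """Illustrative downstream handler shared by both transports."""
    key = update.artifact.artifact_id
    if update.append and key in store:
        # Streaming continuation: extend the parts collected so far.
        store[key].extend(update.artifact.parts)
    else:
        # First chunk, or a polled snapshot: replace whatever was stored.
        store[key] = list(update.artifact.parts)
    return update.last_chunk


store = {}

# Streaming transport: the artifact arrives as two incremental chunks.
handle_update(
    TaskArtifactUpdateEvent("t1", "c1", Artifact("a1", ["Hel"]),
                            append=False, last_chunk=False),
    store,
)
handle_update(
    TaskArtifactUpdateEvent("t1", "c1", Artifact("a1", ["lo"]),
                            append=True, last_chunk=True),
    store,
)

# Polling transport: the artifact arrives whole and is wrapped first.
polled_artifacts = [Artifact("a2", ["Hello"])]
for polled in polled_artifacts:
    done = handle_update(
        wrap_polled_artifact(polled, task_id="t1", context_id="c1"),
        store,
    )
```

Because the polled artifact is wrapped with append=False and last_chunk=True, the handler needs no polling-specific branch: it overwrites any stored value and reports the artifact as complete in a single call.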