Skip to content

parse_message

autogen.beta.a2a.mappers.messages.parse_message #

parse_message(msg)

Decode an incoming Message into AG2-shaped buckets.

Walks the parts in order; extension data parts are routed to the matching bucket, everything else becomes an Input. Picks up context_update from Message.metadata if present.

Source code in autogen/beta/a2a/mappers/messages.py
def parse_message(msg: Message) -> ParsedMessage:
    """Decode an incoming ``Message`` into AG2-shaped buckets.

    Walks the parts in order; extension data parts are routed to the
    matching bucket, everything else becomes an ``Input``. Picks up
    ``context_update`` from ``Message.metadata`` if present.
    """
    parsed = ParsedMessage()
    for part in msg.parts:
        if is_data_part_with_mime(part, MIME_TOOL_SCHEMAS):
            parsed.tool_schemas.extend(payload_to_schemas(part_data_to_python(part)))
            continue
        if is_data_part_with_mime(part, MIME_TOOL_CALL):
            parsed.tool_calls.append(payload_to_call(part_data_to_python(part)))
            continue
        if is_data_part_with_mime(part, MIME_TOOL_RESULT):
            parsed.tool_results.extend(payload_to_results(part_data_to_python(part)))
            continue
        if is_data_part_with_mime(part, MIME_HISTORY):
            parsed.history_events.extend(payload_to_events(part_data_to_python(part)))
            continue
        parsed.inputs.append(part_to_input(part))
    parsed.context_update = extract_context_update(msg)
    return parsed