task_state_to_status_update(state, *, task_id, context_id, message=None, timestamp=None)
Build an A2ATaskStatusUpdate for a lifecycle transition.
Used by the executor to surface start_work/complete/failed transitions through the same A2AEvent layer that artifact updates use, instead of going through TaskUpdater directly.
timestamp populates TaskStatus.timestamp; pass None to leave the field unset on the wire (callers that want a real transition time should supply one, e.g. datetime.now(tz=UTC)).
Source code in autogen/beta/a2a/mappers/events.py
def task_state_to_status_update(
    state: TaskState,
    *,
    task_id: str,
    context_id: str,
    message: Message | None = None,
    timestamp: datetime | None = None,
) -> A2ATaskStatusUpdate:
    """Wrap a lifecycle transition in an ``A2ATaskStatusUpdate``.

    The executor calls this to report ``start_work``/``complete``/``failed``
    transitions via the A2AEvent layer shared with artifact updates,
    rather than talking to ``TaskUpdater`` directly.

    Args:
        state: Task state being transitioned to; set on both the
            ``TaskStatus`` and the returned wrapper.
        task_id: Identifier of the task the update belongs to.
        context_id: Identifier of the context the task runs in.
        message: Optional message attached to the status. Forwarded to
            ``TaskStatus`` only when it is not ``None``.
        timestamp: Optional value for ``TaskStatus.timestamp``. Forwarded
            only when it is not ``None``; callers that want a real
            transition time should pass e.g. ``datetime.now(tz=UTC)``.

    Returns:
        An ``A2ATaskStatusUpdate`` carrying the built
        ``TaskStatusUpdateEvent`` together with ``state``.
    """
    # Optional fields are dropped entirely when unset, so ``TaskStatus``
    # never receives an explicit ``None`` for them.
    optional_fields = {"message": message, "timestamp": timestamp}
    status_fields: dict[str, Any] = {
        "state": state,
        **{
            field_name: field_value
            for field_name, field_value in optional_fields.items()
            if field_value is not None
        },
    }
    event = TaskStatusUpdateEvent(
        task_id=task_id,
        context_id=context_id,
        status=TaskStatus(**status_fields),
    )
    return A2ATaskStatusUpdate(update=event, state=state)
|