async def dispatch(
self,
data: RunAgentInput,
*,
context: dict[str, Any] | None = None,
accept: str | None = None,
) -> AsyncIterator[str]:
state = ContextVariables()
# add agent initial context
state.update(self.__agent.context_variables.to_dict())
# add frontend-passed context
state.update(data.state or {})
# add manual-passed context
state.update(context or {})
client_tools = []
client_tools_names: set[str] = set()
for t in data.tools:
func = t.model_dump(exclude_none=True)
client_tools.append({
"type": "function",
"function": func,
})
client_tools_names.add(func["name"])
message = RequestMessage(
messages=[m.model_dump(exclude_none=True) for m in data.messages],
context=state.data,
client_tools=client_tools,
)
# EventEncoder typed incompletely, so we need to ignore the type error
encoder = EventEncoder(accept=accept) # type: ignore[arg-type]
try:
yield encoder.encode(
RunStartedEvent(
thread_id=data.thread_id,
run_id=data.run_id,
timestamp=_get_timestamp(),
)
)
if state.data != data.state:
yield encoder.encode(
StateSnapshotEvent(
snapshot=state.data,
timestamp=_get_timestamp(),
)
)
async for response in self.service(message):
msg_id = str(uuid4())
if ctx := response.context:
yield encoder.encode(
StateSnapshotEvent(
snapshot=ctx,
timestamp=_get_timestamp(),
)
)
if msg := response.message:
content = msg.get("content", "")
has_tool_result = False
for tool_response in msg.get("tool_responses", []):
has_tool_result = True
yield encoder.encode(
ToolCallResultEvent(
tool_call_id=tool_response["tool_call_id"],
content=tool_response["content"],
message_id=msg_id,
timestamp=_get_timestamp(),
role="tool",
)
)
yield encoder.encode(
ToolCallEndEvent(
tool_call_id=tool_response["tool_call_id"],
timestamp=_get_timestamp(),
)
)
has_tool, has_local_tool = False, False
for tool_call in msg.get("tool_calls", []):
func = tool_call["function"]
if (name := func.get("name")) in client_tools_names:
has_local_tool = True
yield encoder.encode(
ToolCallChunkEvent(
parent_message_id=msg_id,
tool_call_id=tool_call.get("id"),
tool_call_name=name,
delta=func.get("arguments"),
timestamp=_get_timestamp(),
)
)
else:
has_tool = True
yield encoder.encode(
ToolCallStartEvent(
tool_call_id=tool_call.get("id"),
tool_call_name=name,
timestamp=_get_timestamp(),
)
)
yield encoder.encode(
ToolCallArgsEvent(
tool_call_id=tool_call.get("id"),
delta=func.get("arguments"),
timestamp=_get_timestamp(),
)
)
if (
content
and
# do not send tool result as chat message
not has_tool_result
and
# message attached to `tool_calls` mostly thinking block
# than refular text message
(not has_tool or has_local_tool)
):
yield encoder.encode(
TextMessageChunkEvent(
message_id=msg_id,
delta=content,
timestamp=_get_timestamp(),
)
)
except Exception as e:
yield encoder.encode(
RunErrorEvent(
message=repr(e),
timestamp=_get_timestamp(),
)
)
raise e
else:
yield encoder.encode(
RunFinishedEvent(
thread_id=data.thread_id,
run_id=data.run_id,
timestamp=_get_timestamp(),
)
)