@export_module("autogen")
def run_swarm(
    initial_agent: ConversableAgent,
    messages: Union[list[dict[str, Any]], str],
    agents: list[ConversableAgent],
    user_agent: Optional[UserProxyAgent] = None,
    swarm_manager_args: Optional[dict[str, Any]] = None,
    max_rounds: int = 20,
    context_variables: Optional[dict[str, Any]] = None,
    after_work: Optional[
        Union[
            AfterWorkOption,
            Callable[
                [ConversableAgent, list[dict[str, Any]], GroupChat], Union[AfterWorkOption, ConversableAgent, str]
            ],
        ]
    ] = AfterWorkOption.TERMINATE,
    exclude_transit_message: bool = True,
) -> RunResponseProtocol:
    """Start a swarm chat on a background thread and return its response handle.

    A fresh ``ThreadIOStream`` and a ``RunResponse`` wrapping it are created,
    ``initiate_swarm_chat`` is launched on a new thread, and the response is
    returned immediately without waiting for the chat to finish. The result
    attributes of the response are therefore filled in by the worker thread
    and may not be set yet when this function returns.

    Args:
        initial_agent: Forwarded unchanged to ``initiate_swarm_chat``.
        messages: Forwarded unchanged to ``initiate_swarm_chat``.
        agents: Forwarded unchanged to ``initiate_swarm_chat``.
        user_agent: Forwarded unchanged to ``initiate_swarm_chat``.
        swarm_manager_args: Forwarded unchanged to ``initiate_swarm_chat``.
        max_rounds: Forwarded unchanged to ``initiate_swarm_chat``.
        context_variables: Forwarded unchanged to ``initiate_swarm_chat``.
        after_work: Forwarded unchanged to ``initiate_swarm_chat``.
        exclude_transit_message: Forwarded unchanged to ``initiate_swarm_chat``.

    Returns:
        The ``RunResponse`` bound to the newly created stream. On success the
        worker stores the chat summary, chat history, returned context
        variables and last speaker on it. If the chat raises an ``Exception``,
        the worker sends an ``ErrorEvent`` carrying it on the response's
        stream instead of propagating it to the caller.
    """
    iostream = ThreadIOStream()
    response = RunResponse(iostream)

    # Everything the caller supplied is handed to initiate_swarm_chat verbatim.
    swarm_kwargs = {
        "initial_agent": initial_agent,
        "messages": messages,
        "agents": agents,
        "user_agent": user_agent,
        "swarm_manager_args": swarm_manager_args,
        "max_rounds": max_rounds,
        "context_variables": context_variables,
        "after_work": after_work,
        "exclude_transit_message": exclude_transit_message,
    }

    def stream_run() -> None:
        # Make the per-run stream the default for the duration of the chat.
        with IOStream.set_default(iostream):  # type: ignore[arg-type]
            try:
                outcome = initiate_swarm_chat(**swarm_kwargs)
                chat_result, final_context, final_speaker = outcome

                # Publish the results on the response object.
                response._summary = chat_result.summary
                response._messages = chat_result.chat_history
                response._context_variables = final_context
                response._last_speaker = final_speaker
            except Exception as exc:
                # Report the failure through the response's stream; nothing is
                # re-raised from the worker thread.
                response.iostream.send(ErrorEvent(error=exc))  # type: ignore[call-arg]

    worker = threading.Thread(target=stream_run)
    worker.start()

    return response