Skip to content

make_tasks_tool

autogen.beta.network.client.tools.tasks.make_tasks_tool

make_tasks_tool(agent_client)

Return a closure-bound tasks tool.

Source code in autogen/beta/network/client/tools/tasks.py
def make_tasks_tool(agent_client: "AgentClient") -> object:
    """Return a closure-bound ``tasks`` tool.

    The returned object is the ``@tool``-decorated ``tasks`` coroutine. It
    multiplexes several task-related actions behind a single ``action``
    argument and reports failures as ``"Error: ..."`` strings rather than
    raising.

    Args:
        agent_client: Fallback client used whenever no ``client`` is injected
            into the tool call. Its ``_hub_client`` is used for hub queries
            and its ``agent_id`` is the owner filter for ``scope="own"``.

    Returns:
        The decorated ``tasks`` tool.
    """
    # States after which a task no longer changes; shared by the ``list``
    # filter and the ``wait`` poll loop so the two cannot drift apart.
    terminal_states = frozenset({"completed", "failed", "expired", "cancelled"})

    # NOTE(review): the inner docstring is presumably surfaced by ``@tool`` as
    # the tool description, so its wording is treated as runtime text and is
    # left untouched here.
    @tool
    async def tasks(
        action: Literal["progress", "complete", "list", "status", "wait", "cancel"],
        *,
        payload: dict | None = None,
        result: Any | None = None,
        task_id: str | None = None,
        scope: Literal["own", "all"] = "own",
        state: Literal["active", "all"] = "active",
        timeout: float = 300.0,
        poll_interval: float = 0.1,
        limit: int = 20,
        client: AgentClientInject = None,
        active_task: TaskInject = None,
    ) -> dict | list[dict] | str:
        """Task lifecycle.

        Active-task actions (require an open ``agent.task(...)`` block):
            ``progress``  args payload (dict)
            ``complete``  args result?

        Observation actions:
            ``list``    args scope="own"|"all", state="active"|"all", limit
            ``status``  args task_id
            ``wait``    args task_id, timeout=300, poll_interval=0.1
            ``cancel``  not implemented — returns an error placeholder
        """
        # An injected client wins; otherwise fall back to the closure-bound one.
        actual = client if client is not None else agent_client
        hub = actual._hub_client

        if action == "progress":
            if active_task is None:
                return "Error: progress requires an active `agent.task(...)` block"
            try:
                await active_task.progress(payload or {})
            except Exception as exc:
                # Tool boundary: report the failure as text instead of raising.
                return f"Error: progress failed: {exc}"
            return f"progress emitted on {active_task.task_id}"

        if action == "complete":
            if active_task is None:
                return "Error: complete requires an active `agent.task(...)` block"
            try:
                await active_task.complete(result=result)
            except Exception as exc:
                return f"Error: complete failed: {exc}"
            return f"completed {active_task.task_id}"

        if action == "list":
            # A non-positive limit can only mean "no rows". Without this guard
            # the post-append length check below would still let one row
            # through, and a negative ``limit * 4`` would be sent to the hub.
            if limit <= 0:
                return []
            owner_filter = actual.agent_id if scope == "own" else None
            # Over-fetch so that enough rows survive the terminal-state filter.
            metas = await hub.list_tasks(agent_id=owner_filter, limit=limit * 4)
            results: list[dict] = []
            for meta in metas:
                if state == "active" and meta.state.value in terminal_states:
                    continue
                results.append(_task_summary(meta))
                if len(results) >= limit:
                    break
            return results

        if action == "status":
            if not task_id:
                return "Error: status requires `task_id`"
            try:
                meta = await hub.get_task(task_id)
            except Exception:
                # NOTE(review): every hub failure is reported as "not found";
                # confirm whether transport errors should be distinguished.
                return f"Error: task {task_id!r} not found"
            return _task_summary(meta)

        if action == "wait":
            if not task_id:
                return "Error: wait requires `task_id`"
            loop = asyncio.get_running_loop()
            deadline = loop.time() + timeout
            # Poll first, then decide whether to sleep: the task is always
            # checked at least once (even with ``timeout <= 0``) and once more
            # right at the deadline.
            while True:
                try:
                    meta = await hub.get_task(task_id)
                except Exception:
                    return f"Error: task {task_id!r} not found"
                if meta.state.value in terminal_states:
                    return _task_summary(meta)
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                # Never sleep past the deadline.
                await asyncio.sleep(min(poll_interval, remaining))
            return f"Error: task {task_id!r} did not complete within {timeout}s"

        if action == "cancel":
            return "Error: tasks(action='cancel') is not implemented"

        return f"Error: unknown action {action!r}; choose from progress, complete, list, status, wait"

    return tasks