SlackRetrieveRepliesTool

autogen.tools.experimental.SlackRetrieveRepliesTool #

SlackRetrieveRepliesTool(*, bot_token, channel_id)

Bases: Tool

Retrieves replies to a specific Slack message from both threads and the channel.

Initialize the SlackRetrieveRepliesTool.

PARAMETER DESCRIPTION
bot_token

Bot User OAuth Token starting with "xoxb-".

TYPE: str

channel_id

Channel ID where the parent message exists.

TYPE: str

Source code in autogen/tools/experimental/messageplatform/slack/slack.py
def __init__(self, *, bot_token: str, channel_id: str) -> None:
    """
    Initialize the SlackRetrieveRepliesTool.

    Args:
        bot_token: Bot User OAuth Token starting with "xoxb-".
        channel_id: Channel ID where the parent message exists.
    """

    async def slack_retrieve_replies(
        message_ts: Annotated[str, "Timestamp (ts) of the parent message to retrieve replies for."],
        bot_token: Annotated[str, Depends(on(bot_token))],
        channel_id: Annotated[str, Depends(on(channel_id))],
        min_replies: Annotated[
            Optional[int],
            "Minimum number of replies to wait for before returning (thread + channel). If None, returns immediately.",
        ] = None,
        timeout_seconds: Annotated[
            int, "Maximum time in seconds to wait for the requested number of replies."
        ] = 60,
        poll_interval: Annotated[int, "Time in seconds between polling attempts when waiting for replies."] = 5,
        include_channel_messages: Annotated[
            bool, "Whether to include messages in the channel after the original message."
        ] = True,
    ) -> Any:
        """
        Retrieves replies to a specific Slack message, from both threads and the main channel.

        Args:
            message_ts: The timestamp (ts) identifier of the parent message.
            bot_token: The bot token to use for Slack. (uses dependency injection)
            channel_id: The ID of the channel. (uses dependency injection)
            min_replies: Minimum number of combined replies to wait for before returning. If None, returns immediately.
            timeout_seconds: Maximum time in seconds to wait for the requested number of replies.
            poll_interval: Time in seconds between polling attempts when waiting for replies.
            include_channel_messages: Whether to include messages posted in the channel after the original message.
        """
        try:
            web_client = WebClient(token=bot_token)

            # Function to get current thread replies
            async def get_thread_replies() -> tuple[Optional[list[dict[str, Any]]], Optional[str]]:
                try:
                    response = web_client.conversations_replies(
                        channel=channel_id,
                        ts=message_ts,
                    )

                    if not response["ok"]:
                        return None, f"Thread reply retrieval failed, Slack response error: {response['error']}"

                    # The first message is the parent message itself, so exclude it when counting replies
                    replies = response["messages"][1:] if len(response["messages"]) > 0 else []
                    return replies, None

                except SlackApiError as e:
                    return None, f"Thread reply retrieval failed, Slack API exception: {e.response['error']}"
                except Exception as e:
                    return None, f"Thread reply retrieval failed, exception: {e}"

            # Function to get messages in the channel after the original message
            async def get_channel_messages() -> Tuple[Optional[list[dict[str, Any]]], Optional[str]]:
                try:
                    response = web_client.conversations_history(
                        channel=channel_id,
                        oldest=message_ts,  # Start from the original message timestamp
                        inclusive=False,  # Don't include the original message
                    )

                    if not response["ok"]:
                        return None, f"Channel message retrieval failed, Slack response error: {response['error']}"

                    # Return all messages in the channel after the original message
                    # We need to filter out any that are part of the thread we're already getting
                    messages = []
                    for msg in response["messages"]:
                        # Skip if the message is part of the thread we're already retrieving
                        if "thread_ts" in msg and msg["thread_ts"] == message_ts:
                            continue
                        messages.append(msg)

                    return messages, None

                except SlackApiError as e:
                    return None, f"Channel message retrieval failed, Slack API exception: {e.response['error']}"
                except Exception as e:
                    return None, f"Channel message retrieval failed, exception: {e}"

            # Function to get all replies (both thread and channel)
            async def get_all_replies() -> Tuple[
                Optional[list[dict[str, Any]]], Optional[list[dict[str, Any]]], Optional[str]
            ]:
                thread_replies, thread_error = await get_thread_replies()
                if thread_error:
                    return None, None, thread_error

                channel_messages: list[dict[str, Any]] = []
                channel_error = None

                if include_channel_messages:
                    channel_results, channel_error = await get_channel_messages()
                    if channel_error:
                        return thread_replies, None, channel_error
                    channel_messages = channel_results if channel_results is not None else []

                return thread_replies, channel_messages, None

            # If no waiting is required, just get replies and return
            if min_replies is None:
                thread_replies, channel_messages, error = await get_all_replies()
                if error:
                    return error

                thread_replies_list: list[dict[str, Any]] = [] if thread_replies is None else thread_replies
                channel_messages_list: list[dict[str, Any]] = [] if channel_messages is None else channel_messages

                # Combine replies for counting but keep them separate in the result
                total_reply_count = len(thread_replies_list) + len(channel_messages_list)

                return {
                    "parent_message_ts": message_ts,
                    "total_reply_count": total_reply_count,
                    "thread_replies": thread_replies_list,
                    "thread_reply_count": len(thread_replies_list),
                    "channel_messages": channel_messages_list if include_channel_messages else None,
                    "channel_message_count": len(channel_messages_list) if include_channel_messages else None,
                }

            # Wait for the required number of replies with timeout
            start_time = datetime.now()
            end_time = start_time + timedelta(seconds=timeout_seconds)

            while datetime.now() < end_time:
                thread_replies, channel_messages, error = await get_all_replies()
                if error:
                    return error

                thread_replies_current: list[dict[str, Any]] = [] if thread_replies is None else thread_replies
                channel_messages_current: list[dict[str, Any]] = (
                    [] if channel_messages is None else channel_messages
                )

                # Combine replies for counting
                total_reply_count = len(thread_replies_current) + len(channel_messages_current)

                # If we have enough total replies, return them
                if total_reply_count >= min_replies:
                    return {
                        "parent_message_ts": message_ts,
                        "total_reply_count": total_reply_count,
                        "thread_replies": thread_replies_current,
                        "thread_reply_count": len(thread_replies_current),
                        "channel_messages": channel_messages_current if include_channel_messages else None,
                        "channel_message_count": len(channel_messages_current)
                        if include_channel_messages
                        else None,
                        "waited_seconds": (datetime.now() - start_time).total_seconds(),
                    }

                # Wait before checking again
                await asyncio.sleep(poll_interval)

            # If we reach here, we timed out waiting for replies
            thread_replies, channel_messages, error = await get_all_replies()
            if error:
                return error

            # Combine replies for counting
            total_reply_count = len(thread_replies or []) + len(channel_messages or [])

            return {
                "parent_message_ts": message_ts,
                "total_reply_count": total_reply_count,
                "thread_replies": thread_replies or [],
                "thread_reply_count": len(thread_replies or []),
                "channel_messages": channel_messages or [] if include_channel_messages else None,
                "channel_message_count": len(channel_messages or []) if include_channel_messages else None,
                "timed_out": True,
                "waited_seconds": timeout_seconds,
                "requested_replies": min_replies,
            }

        except SlackApiError as e:
            return f"Reply retrieval failed, Slack API exception: {e.response['error']} (See https://api.slack.com/automation/cli/errors#{e.response['error']})"
        except Exception as e:
            return f"Reply retrieval failed, exception: {e}"

    super().__init__(
        name="slack_retrieve_replies",
        description="Retrieves replies to a specific Slack message, checking both thread replies and messages in the channel after the original message.",
        func_or_tool=slack_retrieve_replies,
    )
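
Based on the return statements in the source above, an immediate call (min_replies=None) produces a dictionary shaped roughly like this; all values are illustrative, and each reply entry is the raw Slack message object with more fields than shown:

{
    "parent_message_ts": "1700000000.000100",
    "total_reply_count": 2,
    "thread_replies": [{"ts": "1700000001.000200", "text": "a reply in the thread"}],
    "thread_reply_count": 1,
    "channel_messages": [{"ts": "1700000002.000300", "text": "a follow-up in the channel"}],
    "channel_message_count": 1,
}

When min_replies is set, the result also carries waited_seconds, plus timed_out and requested_replies if the timeout expires before enough replies arrive; on a Slack API error a plain error string is returned instead of a dictionary.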

name property #

name

description property #

description

func property #

func

tool_schema property #

tool_schema

Get the schema for the tool.

This is the preferred way of handling function calls with OpenAI and compatible frameworks.
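
For example, the schema can be inspected directly before handing it to an OpenAI-compatible client (reusing the tool constructed earlier):

print(slack_replies_tool.tool_schema)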

function_schema property #

function_schema

Get the schema for the function.

This is the old way of handling function calls with OpenAI and compatible frameworks. It is provided for backward compatibility.

realtime_tool_schema property #

realtime_tool_schema

Get the schema for the tool.

This is the preferred way of handling function calls with OpenAI and compatible frameworks.

register_for_llm #

register_for_llm(agent)

Registers the tool for use with a ConversableAgent's language model (LLM).

This method registers the tool so that it can be invoked by the agent during interactions with the language model.

PARAMETER DESCRIPTION
agent

The agent to which the tool will be registered.

TYPE: ConversableAgent

Source code in autogen/tools/tool.py
def register_for_llm(self, agent: "ConversableAgent") -> None:
    """Registers the tool for use with a ConversableAgent's language model (LLM).

    This method registers the tool so that it can be invoked by the agent during
    interactions with the language model.

    Args:
        agent (ConversableAgent): The agent to which the tool will be registered.
    """
    if self._func_schema:
        agent.update_tool_signature(self._func_schema, is_remove=False)
    else:
        agent.register_for_llm()(self)

register_for_execution #

register_for_execution(agent)

Registers the tool for direct execution by a ConversableAgent.

This method registers the tool so that it can be executed by the agent, typically outside of the context of an LLM interaction.

PARAMETER DESCRIPTION
agent

The agent to which the tool will be registered.

TYPE: ConversableAgent

Source code in autogen/tools/tool.py
def register_for_execution(self, agent: "ConversableAgent") -> None:
    """Registers the tool for direct execution by a ConversableAgent.

    This method registers the tool so that it can be executed by the agent,
    typically outside of the context of an LLM interaction.

    Args:
        agent (ConversableAgent): The agent to which the tool will be registered.
    """
    agent.register_for_execution()(self)
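
A common split is to let one agent propose the tool call and another execute it; a sketch, assuming the tool from the constructor example and an llm_config defined elsewhere:

from autogen import ConversableAgent

# Hypothetical agents; llm_config is assumed to be defined elsewhere.
assistant = ConversableAgent(name="assistant", llm_config=llm_config)
executor = ConversableAgent(name="executor", llm_config=False, human_input_mode="NEVER")

slack_replies_tool.register_for_llm(assistant)       # the LLM may propose slack_retrieve_replies calls
slack_replies_tool.register_for_execution(executor)  # this agent actually runs the tool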

register_tool #

register_tool(agent)

Register a tool to be both proposed and executed by an agent.

Equivalent to calling both register_for_llm and register_for_execution with the same agent.

Note: This will not make the agent recommend and execute the call in one step. If the agent recommends the tool, it will need to be the next agent to speak in order to execute the tool.

PARAMETER DESCRIPTION
agent

The agent to which the tool will be registered.

TYPE: ConversableAgent

Source code in autogen/tools/tool.py
def register_tool(self, agent: "ConversableAgent") -> None:
    """Register a tool to be both proposed and executed by an agent.

    Equivalent to calling both `register_for_llm` and `register_for_execution` with the same agent.

    Note: This will not make the agent recommend and execute the call in the one step. If the agent
    recommends the tool, it will need to be the next agent to speak in order to execute the tool.

    Args:
        agent (ConversableAgent): The agent to which the tool will be registered.
    """
    self.register_for_llm(agent)
    self.register_for_execution(agent)
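
When the same agent should both propose and execute the call, register_tool is the single-call equivalent of the two registrations above (reusing the tool and agent from the earlier sketches):

# Equivalent to register_for_llm(assistant) followed by register_for_execution(assistant).
slack_replies_tool.register_tool(assistant)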