Skip to content

ShellExecutor

autogen.tools.experimental.shell.shell_tool.ShellExecutor #

ShellExecutor(default_timeout=60.0, workspace_dir=None, allowed_paths=None, allowed_commands=None, denied_commands=None, enable_command_filtering=True, dangerous_patterns=None)

Executor for shell commands with timeout and sandboxing support.

Provides multiple layers of security: 1. Command pattern filtering (blocks dangerous commands) 2. Working directory restriction (chroot-like behavior) 3. Allowed/denied command lists (whitelist/blacklist) 4. Path restrictions (limits file system access)

Initialize the shell executor with sandboxing options.

PARAMETER DESCRIPTION
default_timeout

Default timeout in seconds for command execution.

TYPE: float DEFAULT: 60.0

workspace_dir

Working directory for command execution. Commands will be executed within this directory and cannot access files outside of it if path restrictions are enabled. Defaults to current directory.

TYPE: str | Path | None DEFAULT: None

allowed_paths

List of allowed path patterns (glob patterns). Commands can only access paths matching these patterns. If None, all paths within workspace_dir are allowed. Use ["**"] to allow all paths.

TYPE: list[str] | None DEFAULT: None

allowed_commands

Whitelist of allowed commands (e.g., ["ls", "cat", "grep"]). If provided, only these commands can be executed. None = allow all.

TYPE: list[str] | None DEFAULT: None

denied_commands

Blacklist of denied commands (e.g., ["rm", "dd"]). These commands will be blocked. None = use default dangerous patterns.

TYPE: list[str] | None DEFAULT: None

enable_command_filtering

Whether to enable pattern-based command filtering. Defaults to True.

TYPE: bool DEFAULT: True

dangerous_patterns

Custom dangerous command patterns to check. Each pattern is a tuple of (regex_pattern, error_message). If None, uses DEFAULT_DANGEROUS_PATTERNS.

TYPE: list[tuple[str, str]] | None DEFAULT: None

Source code in autogen/tools/experimental/shell/shell_tool.py
def __init__(
    self,
    default_timeout: float = 60.0,
    workspace_dir: str | Path | None = None,
    allowed_paths: list[str] | None = None,
    allowed_commands: list[str] | None = None,
    denied_commands: list[str] | None = None,
    enable_command_filtering: bool = True,
    dangerous_patterns: list[tuple[str, str]] | None = None,
):
    """Initialize the shell executor with sandboxing options.

    Args:
        default_timeout: Default timeout in seconds for command execution.
        workspace_dir: Working directory for command execution. Commands will be executed
                 within this directory and cannot access files outside of it if path
                 restrictions are enabled. Defaults to current directory.
        allowed_paths: List of allowed path patterns (glob patterns). Commands can only
                      access paths matching these patterns. If None, all paths within
                      workspace_dir are allowed. Use ["**"] to allow all paths.
        allowed_commands: Whitelist of allowed commands (e.g., ["ls", "cat", "grep"]).
                        If provided, only these commands can be executed. None = allow all.
        denied_commands: Blacklist of denied commands (e.g., ["rm", "dd"]).
                       These commands will be blocked. None = use default dangerous patterns.
        enable_command_filtering: Whether to enable pattern-based command filtering.
                                Defaults to True.
        dangerous_patterns: Custom dangerous command patterns to check. Each pattern is
                          a tuple of (regex_pattern, error_message). If None, uses
                          DEFAULT_DANGEROUS_PATTERNS.
    """
    self.default_timeout = default_timeout

    # Working directory setup
    if workspace_dir is None:
        self.workspace_dir = Path.cwd()
    else:
        self.workspace_dir = Path(workspace_dir).resolve()
        if not self.workspace_dir.exists():
            self.workspace_dir.mkdir(parents=True, exist_ok=True)

    # Path restrictions
    if allowed_paths is None:
        allowed_paths = ["**"]  # Allow all paths within workspace_dir by default
    self.allowed_paths = allowed_paths

    # Command restrictions
    self.allowed_commands = allowed_commands
    self.denied_commands = denied_commands or []

    # Pattern filtering
    self.enable_command_filtering = enable_command_filtering
    self.dangerous_patterns = dangerous_patterns or self.DEFAULT_DANGEROUS_PATTERNS

DEFAULT_DANGEROUS_PATTERNS class-attribute instance-attribute #

DEFAULT_DANGEROUS_PATTERNS = [('\\brm\\s+-rf\\s+/\\s*$', 'Deletion of root filesystem (rm -rf /) is not allowed.'), ('\\brm\\s+-rf\\s+/\\s+', 'Deletion starting from root (rm -rf / ...) is not allowed.'), ('\\brm\\s+-rf\\s+~\\s*$', 'Deletion of entire home directory (rm -rf ~) is not allowed.'), ('\\brm\\s+-rf\\s+~\\s+', 'Deletion starting from home (rm -rf ~ ...) is not allowed.'), ('\\brm\\s+-rf\\s+/(?:etc|usr|bin|sbin|lib|lib64|boot|root|sys|proc|dev)\\b', 'Deletion of critical system directories is not allowed.'), ('>\\s*/dev/sd[a-z][0-9]*\\s*$', 'Direct disk block device overwrite is not allowed.'), ('>\\s*/dev/hd[a-z][0-9]*\\s*$', 'Direct disk block device overwrite is not allowed.'), ('>\\s*/dev/nvme\\d+n\\d+p\\d+\\s*$', 'Direct NVMe disk overwrite is not allowed.'), ('\\bdd\\b.*\\bof=/dev/(?:sd|hd|nvme)', 'Writing to disk devices with dd is not allowed.'), (':\\(\\)\\s*\\{\\s*:\\s*\\|\\s*:\\s*&\\s*\\}\\s*;', 'Fork bombs are not allowed.'), ('\\bmkfs\\.(?:ext[234]|xfs|btrfs|ntfs|vfat|fat)\\s+/dev/', 'Formatting filesystems is not allowed.'), ('\\bformat\\s+[A-Z]:\\s*$', 'Formatting Windows drives is not allowed.'), ('\\bformat\\s+[A-Z]:\\s+/', 'Formatting Windows drives is not allowed.'), ('\\bdel\\s+/[sS]\\s+C:\\\\Windows', 'Deletion of Windows system directory is not allowed.'), ('\\bdel\\s+/[sS]\\s+C:\\\\Program\\s+Files', 'Deletion of Windows Program Files is not allowed.'), ('\\brmdir\\s+/[sS]\\s+C:\\\\Windows', 'Deletion of Windows system directory is not allowed.'), ('\\brm\\s+-rf\\s+/\\*\\s*$', 'Mass deletion of root directory contents is not allowed.'), ('\\brm\\s+-rf\\s+~\\*\\s*$', 'Mass deletion of home directory contents is not allowed.'), ('>\\s*/etc/(?:passwd|shadow|hosts|fstab)', 'Overwriting critical system files is not allowed.'), ('>\\s*/boot/', 'Overwriting boot files is not allowed.')]

default_timeout instance-attribute #

default_timeout = default_timeout

workspace_dir instance-attribute #

workspace_dir = cwd()

allowed_paths instance-attribute #

allowed_paths = allowed_paths

allowed_commands instance-attribute #

allowed_commands = allowed_commands

denied_commands instance-attribute #

denied_commands = denied_commands or []

enable_command_filtering instance-attribute #

enable_command_filtering = enable_command_filtering

dangerous_patterns instance-attribute #

dangerous_patterns = dangerous_patterns or DEFAULT_DANGEROUS_PATTERNS

run #

run(cmd, timeout=None)

Execute a shell command and return the result.

PARAMETER DESCRIPTION
cmd

Shell command to execute.

TYPE: str

timeout

Timeout in seconds. If None, uses default_timeout.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
CmdResult

CmdResult with stdout, stderr, exit_code, and timed_out flag.

RAISES DESCRIPTION
ValueError

If command violates security restrictions

Source code in autogen/tools/experimental/shell/shell_tool.py
def run(self, cmd: str, timeout: float | None = None) -> CmdResult:
    """Execute a shell command and return the result.

    Args:
        cmd: Shell command to execute.
        timeout: Timeout in seconds. If None, uses default_timeout.

    Returns:
        CmdResult with stdout, stderr, exit_code, and timed_out flag.

    Raises:
        ValueError: If command violates security restrictions
    """
    # Validate command before execution
    self._validate_command(cmd)

    if timeout is not None and timeout <= 0:
        raise ValueError("Timeout must be positive")

    t = timeout or self.default_timeout

    # Execute in the restricted working directory
    p = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        cwd=str(self.workspace_dir),
    )
    try:
        out, err = p.communicate(timeout=t)
        return CmdResult(stdout=out or "", stderr=err or "", exit_code=p.returncode, timed_out=False)
    except subprocess.TimeoutExpired:
        p.kill()
        out, err = p.communicate()
        return CmdResult(stdout=out or "", stderr=err or "", exit_code=p.returncode, timed_out=True)

run_commands #

run_commands(commands, timeout_ms=None)

Execute multiple shell commands concurrently and return results.

PARAMETER DESCRIPTION
commands

List of shell commands to execute.

TYPE: list[str]

timeout_ms

Timeout in milliseconds for each command.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
list[ShellCommandOutput]

List of ShellCommandOutput objects.

Source code in autogen/tools/experimental/shell/shell_tool.py
def run_commands(self, commands: list[str], timeout_ms: int | None = None) -> list[ShellCommandOutput]:
    """Execute multiple shell commands concurrently and return results.

    Args:
        commands: List of shell commands to execute.
        timeout_ms: Timeout in milliseconds for each command.

    Returns:
        List of ShellCommandOutput objects.
    """
    timeout = (timeout_ms / 1000.0) if timeout_ms is not None else None
    results = []

    for cmd in commands:
        try:
            result = self.run(cmd, timeout=timeout)
            if result.timed_out:
                outcome = ShellCallOutcome(type="timeout")
            else:
                outcome = ShellCallOutcome(type="exit", exit_code=result.exit_code)

            results.append(
                ShellCommandOutput(
                    stdout=result.stdout,
                    stderr=result.stderr,
                    outcome=outcome,
                )
            )
        except ValueError as e:
            # Security violation - return error as output
            results.append(
                ShellCommandOutput(
                    stdout="",
                    stderr=str(e),
                    outcome=ShellCallOutcome(type="exit", exit_code=1),
                )
            )
    return results