SkillSearchToolkit(runtime=None, *, client=None, middleware=())
Bases: SkillsToolkit
Toolkit for dynamically searching and installing skills from the skills.sh <https://skills.sh>_ ecosystem.
Does not require Node.js. Uses HTTP + GitHub Tarball API directly. A GITHUB_TOKEN environment variable is read automatically to raise the GitHub rate limit from 60 to 5,000 requests per hour.
Example::
import asyncio
from autogen.beta import Agent
from autogen.beta.config import AnthropicConfig
from autogen.beta.tools import SkillSearchToolkit
config = AnthropicConfig(model="claude-sonnet-4-5")
skills = SkillSearchToolkit()
agent = Agent(
"coder",
"You are a helpful coding assistant. Use skills to extend your capabilities.",
config=config,
tools=[skills],
)
async def main():
reply = await agent.ask("Find and install a skill for React best practices, then tell me the top 3 rules.")
print(await reply.content())
asyncio.run(main())
Custom configuration::
from autogen.beta.tools import SkillSearchToolkit, SkillsClientConfig, LocalRuntime
skills = SkillSearchToolkit(
runtime=LocalRuntime(
dir="./my-skills",
extra_paths=["./extra-skills"],
cleanup=True,
timeout=30,
blocked=["rm -rf"],
),
client=SkillsClientConfig(github_token="ghp_...", proxy="http://proxy:8080"),
)
Individual tools are available as methods::
agent = Agent("a", config=config, tools=[skills.search_skills(), skills.install_skill()])
Source code in autogen/beta/tools/skills/skill_search/toolkit.py
| def __init__(
self,
runtime: SkillRuntime | str | os.PathLike[str] | None = None,
*,
client: SkillsClientConfig | None = None,
middleware: Iterable[ToolMiddleware] = (),
) -> None:
if runtime is not None:
self._runtime: SkillRuntime = LocalRuntime.ensure_runtime(runtime)
else:
self._runtime = LocalRuntime()
self._client = SkillsClient(client)
self._lock = SkillsLock(self._runtime.lock_dir / "skills-lock.json")
Toolkit.__init__(
self,
self.list_skills(),
self.load_skill(),
self.run_skill_script(),
self.search_skills(),
self.install_skill(),
self.remove_skill(),
name="local_skills_toolkit",
middleware=middleware,
)
|
search_skills(*, name='search_skills', description='Search for skills on skills.sh. Returns a formatted list of matching skills with ready-to-use install commands.', middleware=())
Source code in autogen/beta/tools/skills/skill_search/toolkit.py
| def search_skills(
self,
*,
name: str = "search_skills",
description: str = "Search for skills on skills.sh. Returns a formatted list of matching skills with ready-to-use install commands.",
middleware: Iterable[ToolMiddleware] = (),
) -> FunctionTool:
client = self._client
@tool(name=name, description=description, middleware=middleware)
async def _search_skills(
query: Annotated[
str,
Field(description='Search query (e.g. "react performance").'),
],
limit: int = Field(
default=10,
description="Maximum number of results to return.",
),
) -> str:
try:
skills = await client.search(query, limit)
except Exception as e:
return f"Error searching skills.sh: {e}"
if not skills:
return f'No skills found for "{query}".'
lines: list[str] = [f'Found {len(skills)} skill(s) for "{query}":\n']
for i, s in enumerate(skills, 1):
skill_name = s.get("name") or s.get("skillId") or "unknown"
installs: int = s.get("installs", 0)
skill_id_val: str = s.get("skillId") or ""
source: str = s.get("source") or ""
install_id = f"{source}/{skill_id_val}" if skill_id_val and source else source or skill_id_val
lines.append(f"{i}. {skill_name} ({installs:,} installs)")
lines.append(f' \u2192 install_skill("{install_id}")')
lines.append("")
return "\n".join(lines)
return _search_skills
|
install_skill(*, name='install_skill', description='Download and install a skill from skills.sh.', middleware=())
Source code in autogen/beta/tools/skills/skill_search/toolkit.py
| def install_skill(
self,
*,
name: str = "install_skill",
description: str = "Download and install a skill from skills.sh.",
middleware: Iterable[ToolMiddleware] = (),
) -> FunctionTool:
client = self._client
lock = self._lock
runtime = self._runtime
@tool(name=name, description=description, middleware=middleware)
async def _install_skill(
skill_id: Annotated[
str,
Field(
description=(
"The skill identifier from search results, e.g.: "
'"vercel-labs/agent-skills/react-best-practices" (monorepo), '
'"mvanhorn/last30days-skill" (standalone repo).'
)
),
],
) -> str:
parts = skill_id.split("/")
if len(parts) >= 3:
source, sid = f"{parts[0]}/{parts[1]}", "/".join(parts[2:])
elif len(parts) == 2:
source, sid = skill_id, ""
else:
return f"Invalid skill_id format: {skill_id!r}. Expected 'owner/repo/skill-name' or 'owner/repo'."
try:
runtime.ensure_storage()
meta, computed_hash = await client.download_skill(source, sid, runtime)
lock.record(meta.name, source, computed_hash)
runtime.invalidate()
install_dir = runtime.lock_dir
return format_install_result(meta, install_dir)
except (SkillDownloadError, SkillInstallError) as e:
return str(e)
except Exception as e:
return f"Error installing skill: {e}"
return _install_skill
|
remove_skill(*, name='remove_skill', description='Remove an installed skill by name.', middleware=())
Source code in autogen/beta/tools/skills/skill_search/toolkit.py
| def remove_skill(
self,
*,
name: str = "remove_skill",
description: str = "Remove an installed skill by name.",
middleware: Iterable[ToolMiddleware] = (),
) -> FunctionTool:
lock = self._lock
runtime = self._runtime
@tool(name=name, description=description, middleware=middleware)
def _remove_skill(
name: Annotated[
str,
Field(description="Skill name as returned by list_skills()."),
],
) -> str:
try:
runtime.remove(name)
except (ValueError, FileNotFoundError) as e:
return str(e)
lock.remove(name)
runtime.invalidate()
return f"Removed: {name}"
return _remove_skill
|
Source code in autogen/beta/tools/final/toolkit.py
| def set_provider(self, provider: Provider) -> None:
for t in self.tools:
t.set_provider(provider)
|
Source code in autogen/beta/tools/final/toolkit.py
| async def schemas(self, context: "Context") -> Iterable[ToolSchema]:
schemas: list[ToolSchema] = []
for t in self.tools:
schemas.extend(await t.schemas(context))
return schemas
|
register(stack, context, *, middleware=())
Source code in autogen/beta/tools/final/toolkit.py
| def register(
self,
stack: "ExitStack",
context: "Context",
*,
middleware: Iterable["BaseMiddleware"] = (),
) -> None:
for t in self.tools:
t.register(stack, context, middleware=middleware)
|
tool(function: Callable[..., Any], *, name: str | None = None, description: str | None = None, schema: FunctionParameters | None = None, sync_to_thread: bool = True, middleware: Iterable[ToolMiddleware] = ()) -> FunctionTool
tool(function: None = None, *, name: str | None = None, description: str | None = None, schema: FunctionParameters | None = None, sync_to_thread: bool = True, middleware: Iterable[ToolMiddleware] = ()) -> Callable[[Callable[..., Any]], FunctionTool]
tool(function=None, *, name=None, description=None, schema=None, sync_to_thread=True, middleware=())
Source code in autogen/beta/tools/final/toolkit.py
| def tool(
self,
function: Callable[..., Any] | None = None,
*,
name: str | None = None,
description: str | None = None,
schema: FunctionParameters | None = None,
sync_to_thread: bool = True,
middleware: Iterable[ToolMiddleware] = (),
) -> FunctionTool | Callable[[Callable[..., Any]], FunctionTool]:
def make_tool(f: Callable[..., Any]) -> FunctionTool:
t = tool(
f,
name=name,
description=description,
schema=schema,
sync_to_thread=sync_to_thread,
middleware=middleware,
)
self._add_tool(t)
return t
if function:
return make_tool(function)
return make_tool
|
list_skills(*, name='list_skills', description='List available local skills with name and short description.', middleware=())
Source code in autogen/beta/tools/skills/local_skills/toolkit.py
| def list_skills(
self,
*,
name: str = "list_skills",
description: str = "List available local skills with name and short description.",
middleware: Iterable[ToolMiddleware] = (),
) -> FunctionTool:
runtime = self._runtime
@tool(name=name, description=description, middleware=middleware)
def _list_skills() -> list[dict[str, str]]:
return [{"name": m.name, "description": m.description} for m in runtime.discover()]
return _list_skills
|
load_skill(*, name='load_skill', description='Load the full SKILL.md content for a specific skill.', middleware=())
Source code in autogen/beta/tools/skills/local_skills/toolkit.py
| def load_skill(
self,
*,
name: str = "load_skill",
description: str = "Load the full SKILL.md content for a specific skill.",
middleware: Iterable[ToolMiddleware] = (),
) -> FunctionTool:
runtime = self._runtime
@tool(name=name, description=description, middleware=middleware)
def _load_skill(
name: Annotated[str, Field(description="Skill name returned by list_skills.")],
) -> str:
return runtime.load(name)
return _load_skill
|
run_skill_script(*, name='run_skill_script', description="Run a script from a skill's scripts directory. Only .py and .sh scripts are supported.", middleware=())
Source code in autogen/beta/tools/skills/local_skills/toolkit.py
| def run_skill_script(
self,
*,
name: str = "run_skill_script",
description: str = "Run a script from a skill's scripts directory. Only .py and .sh scripts are supported.",
middleware: Iterable[ToolMiddleware] = (),
) -> FunctionTool:
runtime = self._runtime
@tool(name=name, description=description, middleware=middleware)
def _run_skill_script(
name: Annotated[
str,
Field(description="Skill name returned by list_skills."),
],
script: Annotated[
str,
Field(description="Script filename inside scripts/, for example scaffold.py or build.sh."),
],
args: Annotated[
list[str] | None,
Field(description="Optional script arguments passed as positional parameters."),
] = None,
) -> str:
skill_dir = runtime.get_path(name)
scripts_dir = skill_dir / "scripts"
script_path = Path(script)
if script_path.name != script:
raise ValueError("script must be a filename inside the skill scripts directory")
resolved_script = (scripts_dir / script_path.name).resolve()
if not resolved_script.is_file() or not resolved_script.is_relative_to(scripts_dir.resolve()):
raise FileNotFoundError(f"script {script!r} not found in {scripts_dir}")
first_line = resolved_script.read_text(encoding="utf-8", errors="replace").split("\n", 1)[0]
has_shebang = first_line.startswith("#!")
if has_shebang:
resolved_script.chmod(resolved_script.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
command = [f"./{resolved_script.name}"]
elif resolved_script.suffix.lower() == ".py":
command = ["python3", f"./{resolved_script.name}"]
elif resolved_script.suffix.lower() == ".sh":
command = ["sh", f"./{resolved_script.name}"]
else:
resolved_script.chmod(resolved_script.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
command = [f"./{resolved_script.name}"]
if args:
command.extend(args)
env = runtime.shell(scripts_dir)
return env.run(shlex.join(command))
return _run_skill_script
|