AG2 Multi-MCP Session Management: Dynamic Server Connections with MCPClientSessionManager
AG2's MCPClientSessionManager revolutionizes how you connect to multiple MCP (Model Context Protocol) servers by enabling on-demand session creation within your agent workflows. Instead of maintaining persistent connections, you can now dynamically open sessions to different servers—whether they use stdio or SSE transports—right inside your tool functions.
This article explores how to leverage MCPClientSessionManager for flexible, resource-efficient multi-server agent architectures, with practical examples for building research assistants, data pipelines, and intelligent routing systems.
{/ more /}
Traditional MCP integration patterns require opening sessions at application startup and keeping them alive for the entire workflow duration. While this works for single-server scenarios, it becomes cumbersome when you need to:
- Connect to multiple servers dynamically
- Switch between servers based on user queries
- Manage resources efficiently
- Handle both local (stdio) and remote (SSE) servers
MCPClientSessionManager solves these challenges by providing a clean, context-manager-based API for opening sessions on-demand, exactly when and where you need them.
Key Features:
-
On-Demand Session Creation: Open MCP sessions only when needed, inside tool functions or agent workflows
-
Multi-Transport Support: Seamlessly handle both
stdio(process-based) andSSE(HTTP-based) protocols -
Dynamic Server Selection: Let agents choose which server to connect to based on runtime conditions
-
Automatic Resource Management: Context managers ensure proper cleanup, preventing resource leaks
-
Session Isolation: Each query gets a fresh session, preventing state pollution between requests
-
Tool-Based Integration: Wrap session management in tools for LLM-driven server selection
Why This Matters:
Building multi-server agent systems traditionally requires complex connection pooling, manual resource management, and rigid server selection logic. MCPClientSessionManager abstracts away these complexities, allowing you to focus on building intelligent agent workflows that can dynamically adapt to different data sources and services.
When to Use MCPClientSessionManager:
Use MCPClientSessionManager when you need:
-
Multi-Server Workflows: Connect to multiple MCP servers (arXiv, Wikipedia, databases, APIs) in a single workflow
-
Dynamic Server Selection: Let agents decide which server to use based on the query context
-
Resource Efficiency: Avoid keeping connections open when not in use
-
Mixed Transport Types: Work with both local stdio servers and remote SSE endpoints
-
Tool-Based Architecture: Integrate MCP servers as tools that agents can invoke on-demand
Don't use MCPClientSessionManager for single-server, long-running workflows where persistent connections are more efficient.
Understanding MCPClientSessionManager#
MCPClientSessionManager is a utility class that simplifies managing MCP client sessions. Unlike traditional approaches where you open a session at startup and keep it alive, MCPClientSessionManager enables:
- On-demand session creation: Create sessions only when needed within your workflow
- Dynamic server switching: Select which MCP server to connect to at runtime
- Multi-transport management: Handle both
stdio(process-based) andSSE(HTTP-based) protocols - Automatic cleanup: Context managers ensure proper resource management
Key Components#
1. StdioConfig: Configuration for stdio-based MCP servers (local processes) - Starts a Python process that communicates via stdin/stdout - Ideal for local tools like arXiv paper search, file system operations, or database queries - Example: Local arXiv paper search server
2. SseConfig: Configuration for SSE-based MCP servers (HTTP endpoints) - Connects to a remote server via Server-Sent Events - Perfect for remote APIs, cloud services, or distributed MCP servers - Example: Remote Wikipedia API server
3. MCPConfig: Container for multiple server configurations - Holds all available servers in one configuration object - Enables dynamic server selection at runtime - Supports mixing stdio and SSE servers
4. MCPClientSessionManager: The session manager class - Provides open_session() method that returns an async context manager - Automatically initializes sessions when opened - Tracks active sessions internally - Ensures proper cleanup on exit
Basic Setup#
The simplest way to use MCPClientSessionManager is to open a session within an async context manager:
from autogen.mcp.mcp_client import MCPClientSessionManager, StdioConfig
# Configure a stdio-based server
arxiv_server = StdioConfig(
command="python3",
args=["mcp/mcp_arxiv.py", "stdio", "--storage-path", "/tmp/arxiv_papers"],
transport="stdio",
server_name="ArxivServer",
)
# Open a session on-demand
async with MCPClientSessionManager().open_session(arxiv_server) as session:
# Session is automatically initialized
tools = await session.list_tools()
# Use the session...
# Session automatically closes when exiting the context
This pattern ensures: - Session is initialized automatically - Resources are cleaned up properly - No manual connection management needed
Configuring Multiple Servers#
For multi-server workflows, use MCPConfig to hold all server configurations:
from autogen.mcp.mcp_client import MCPConfig, StdioConfig, SseConfig
# Configure a stdio-based MCP server (local process)
ArxivServer = StdioConfig(
command="python3",
args=["mcp/mcp_arxiv.py", "stdio", "--storage-path", "/tmp/arxiv_papers"],
transport="stdio",
server_name="ArxivServer",
)
# Configure an SSE-based MCP server (HTTP endpoint)
WikipediaServer = SseConfig(
url="http://127.0.0.1:8000/sse",
timeout=10,
sse_read_timeout=60,
server_name="WikipediaServer",
)
# Create an MCPConfig with both servers
mcp_config = MCPConfig(servers=[ArxivServer, WikipediaServer])
print(f"Configured {len(mcp_config.servers)} MCP servers:")
for server in mcp_config.servers:
print(f" - {server.server_name}")
This configuration allows you to dynamically select which server to use at runtime.
Practical Examples#
Example 1: Tool-Based Dynamic Server Selection#
One of the most powerful patterns is wrapping MCPClientSessionManager in a tool function, allowing agents to dynamically choose which server to use:
from autogen import ConversableAgent, LLMConfig
from autogen.agentchat.group import AgentTarget
from autogen.agentchat.group.reply_result import ReplyResult
from autogen.mcp.mcp_client import (
MCPClientSessionManager,
MCPConfig,
SseConfig,
StdioConfig,
create_toolkit,
)
from autogen.tools import tool
# Configure servers
ArxivServer = StdioConfig(
command="python3",
args=["mcp/mcp_arxiv.py", "stdio", "--storage-path", "/tmp/arxiv_papers"],
transport="stdio",
server_name="ArxivServer",
)
WikipediaServer = SseConfig(
url="http://127.0.0.1:8000/sse",
timeout=10,
sse_read_timeout=60,
server_name="WikipediaServer",
)
mcp_config = MCPConfig(servers=[ArxivServer, WikipediaServer])
def get_server_config(mcp_config: MCPConfig, server_name: str) -> StdioConfig | SseConfig:
"""Return the server config matching the given server_name."""
for server in mcp_config.servers:
if getattr(server, "server_name", None) == server_name:
return server
raise KeyError(f"Server '{server_name}' not found in MCPConfig")
@tool(description="Execute a query on the specified MCP server (ArxivServer or WikipediaServer)")
async def run_mcp_query(query: str, server_name: str) -> ReplyResult:
"""
Execute a query on the specified MCP server.
This tool:
1. Opens a session to the specified MCP server
2. Creates a toolkit from available MCP tools
3. Creates a temporary agent with those tools
4. Executes the query
5. Returns the result
"""
# Get the server configuration by name
server = get_server_config(mcp_config, server_name)
# Create a session manager and open a session
async with MCPClientSessionManager().open_session(server) as session:
# Session is automatically initialized
# Get available tools from the server
agent_tool_prompt = await session.list_tools()
# Create a toolkit from the session
toolkit = await create_toolkit(session=session)
# Create a temporary agent for this server
agent = ConversableAgent(
name="mcp_agent",
system_message=f"You are an agent with access to {server_name} tools. Use them to answer queries.",
llm_config=llm_config,
human_input_mode="NEVER",
)
# Register MCP tools with the agent
toolkit.register_for_llm(agent)
toolkit.register_for_execution(agent)
# Execute the query using the MCP tools
result = await agent.a_run(
message=query + " Use the following tools to answer the question: " + str(agent_tool_prompt),
tools=toolkit.tools,
max_turns=5,
)
# Process results
res = await result.process()
last_message = await res.last_message()
# Return result with handoff back to main agent
return ReplyResult(
message=str(last_message["content"][-1]),
target_agent=AgentTarget(research_assistant)
)
# Create a research assistant that can use the tool
research_assistant = ConversableAgent(
name="research_assistant",
system_message="You are a research assistant. Use run_mcp_query to access arXiv papers or Wikipedia articles.",
llm_config=llm_config,
human_input_mode="NEVER",
)
# Use the research assistant
result = research_assistant.run(
message="Search for recent papers about large language models on ArxivServer",
tools=[run_mcp_query],
max_turns=2,
).process()
This pattern enables: - LLM-driven server selection - Automatic session management - Clean separation of concerns - Easy extension to more servers
Example 2: Direct Session Usage#
For simpler workflows, you can use sessions directly without wrapping them in tools:
from autogen.mcp.mcp_client import MCPClientSessionManager, StdioConfig, create_toolkit
from autogen import ConversableAgent
# Configure server
arxiv_server = StdioConfig(
command="python3",
args=["mcp/mcp_arxiv.py", "stdio", "--storage-path", "/tmp/arxiv_papers"],
transport="stdio",
server_name="ArxivServer",
)
# Open session and use tools directly
async with MCPClientSessionManager().open_session(arxiv_server) as session:
# Create toolkit
toolkit = await create_toolkit(session=session)
# Create agent with MCP tools
agent = ConversableAgent(
name="arxiv_agent",
system_message="You are an agent that searches arXiv papers.",
llm_config=llm_config,
human_input_mode="NEVER",
)
# Register tools
toolkit.register_for_llm(agent)
toolkit.register_for_execution(agent)
# Use the agent
result = await agent.a_run(
message="Find papers about transformers",
tools=toolkit.tools,
max_turns=3,
)
Example 3: Multi-Server Workflow with Conditional Logic#
You can implement conditional logic to select servers based on query content:
async def route_query(query: str, mcp_config: MCPConfig) -> str:
"""Route query to appropriate server based on content."""
query_lower = query.lower()
# Determine which server to use
if "arxiv" in query_lower or "paper" in query_lower or "research" in query_lower:
server_name = "ArxivServer"
elif "wikipedia" in query_lower or "wiki" in query_lower or "article" in query_lower:
server_name = "WikipediaServer"
else:
# Default to first server
server_name = mcp_config.servers[0].server_name
# Get server config
server = get_server_config(mcp_config, server_name)
# Open session and execute
async with MCPClientSessionManager().open_session(server) as session:
toolkit = await create_toolkit(session=session)
# ... execute query ...
return result
Advanced Patterns#
Pattern 1: Session Pooling for Performance#
For high-throughput scenarios, you might want to reuse sessions:
class SessionPool:
"""Simple session pool for reusing MCP sessions."""
def __init__(self):
self.manager = MCPClientSessionManager()
self.active_sessions = {}
async def get_session(self, config: StdioConfig | SseConfig):
"""Get or create a session for the given config."""
server_name = config.server_name
if server_name not in self.active_sessions:
# Open new session
session = await self.manager.open_session(config).__aenter__()
self.active_sessions[server_name] = session
return self.active_sessions[server_name]
async def close_all(self):
"""Close all active sessions."""
for session in self.active_sessions.values():
await session.__aexit__(None, None, None)
self.active_sessions.clear()
Pattern 2: Error Handling and Retries#
Add robust error handling for production use:
import asyncio
from typing import Optional
async def execute_with_retry(
config: StdioConfig | SseConfig,
query: str,
max_retries: int = 3,
retry_delay: float = 1.0,
) -> Optional[str]:
"""Execute query with retry logic."""
for attempt in range(max_retries):
try:
async with MCPClientSessionManager().open_session(config) as session:
toolkit = await create_toolkit(session=session)
# ... execute query ...
return result
except Exception as e:
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay * (attempt + 1))
continue
else:
raise e
return None
Pattern 3: Health Checks#
Implement health checks for your servers:
async def check_server_health(config: StdioConfig | SseConfig) -> bool:
"""Check if an MCP server is healthy."""
try:
async with MCPClientSessionManager().open_session(config) as session:
# Try to list tools as a health check
await session.list_tools()
return True
except Exception:
return False
# Check all servers
async def check_all_servers(mcp_config: MCPConfig) -> dict[str, bool]:
"""Check health of all configured servers."""
health_status = {}
for server in mcp_config.servers:
health_status[server.server_name] = await check_server_health(server)
return health_status
Best Practices#
1. Always Use Context Managers#
Always use async with to ensure proper cleanup:
# ✅ Good: Context manager ensures cleanup
async with MCPClientSessionManager().open_session(config) as session:
# Use session
pass
# ❌ Bad: Manual management can lead to leaks
session = await MCPClientSessionManager().open_session(config).__aenter__()
# ... use session ...
# Easy to forget cleanup!
2. Initialize Sessions Properly#
Sessions are automatically initialized when opened, but you can verify:
async with MCPClientSessionManager().open_session(config) as session:
# Session is already initialized
tools = await session.list_tools() # Verify it works
3. Use Unique Server Names#
Ensure each server has a distinct name in your configuration:
# ✅ Good: Unique names
ArxivServer = StdioConfig(server_name="ArxivServer", ...)
WikipediaServer = SseConfig(server_name="WikipediaServer", ...)
# ❌ Bad: Duplicate names cause conflicts
Server1 = StdioConfig(server_name="Server", ...)
Server2 = SseConfig(server_name="Server", ...) # Conflict!
4. Configure Appropriate Timeouts#
Set timeouts for SSE servers to prevent hanging:
# ✅ Good: Appropriate timeouts
WikipediaServer = SseConfig(
url="http://127.0.0.1:8000/sse",
timeout=10, # HTTP request timeout
sse_read_timeout=60, # SSE event read timeout
server_name="WikipediaServer",
)
# ❌ Bad: No timeout configuration
WikipediaServer = SseConfig(
url="http://127.0.0.1:8000/sse",
server_name="WikipediaServer",
) # May hang indefinitely
5. Handle Errors Gracefully#
Add error handling for production deployments:
async def safe_mcp_query(config: StdioConfig | SseConfig, query: str) -> str:
"""Execute query with error handling."""
try:
async with MCPClientSessionManager().open_session(config) as session:
# ... execute query ...
return result
except ConnectionError as e:
return f"Connection error: {e}"
except TimeoutError as e:
return f"Timeout error: {e}"
except Exception as e:
return f"Unexpected error: {e}"
6. Use MCPConfig for Multi-Server Setups#
When working with multiple servers, use MCPConfig:
# ✅ Good: Centralized configuration
mcp_config = MCPConfig(servers=[ArxivServer, WikipediaServer])
# ❌ Bad: Scattered configurations
# Hard to manage and track
Troubleshooting#
Common Issues#
1. Session Not Initializing
Ensure you're using the context manager correctly:
# ✅ Correct
async with MCPClientSessionManager().open_session(config) as session:
tools = await session.list_tools()
# ❌ Incorrect - session not initialized
session = MCPClientSessionManager().open_session(config)
tools = await session.list_tools() # Error!
2. Server Name Not Found
Verify server names match exactly:
# Check available servers
for server in mcp_config.servers:
print(f"Available: {server.server_name}")
# Ensure exact match (case-sensitive)
server = get_server_config(mcp_config, "ArxivServer") # Must match exactly
3. SSE Server Connection Issues
Verify SSE server is running and accessible:
# Check if server is reachable
import httpx
try:
response = httpx.get("http://127.0.0.1:8000/sse", timeout=5)
print(f"Server status: {response.status_code}")
except Exception as e:
print(f"Server not accessible: {e}")
4. Stdio Process Fails to Start
Check command and arguments:
# Verify command exists and is executable
import shutil
command = "python3"
if not shutil.which(command):
raise ValueError(f"Command '{command}' not found in PATH")
# Test command manually
# python3 mcp/mcp_arxiv.py stdio --storage-path /tmp/arxiv_papers
5. Resource Leaks
Always use context managers:
# ✅ Good: Automatic cleanup
async with MCPClientSessionManager().open_session(config) as session:
# Use session
pass
# Session automatically closed
# ❌ Bad: Manual cleanup required
session = await manager.open_session(config).__aenter__()
# Must remember to call __aexit__!
Benefits Summary#
-
Resource Efficiency: Sessions are only opened when needed and automatically closed after use
-
Dynamic Server Selection: Agents can choose which server to use based on query context
-
Session Isolation: Each query gets a fresh session, preventing state pollution
-
Multi-Transport Support: Seamlessly handle both stdio and SSE protocols
-
Tool-Based Integration: Wrap session management in tools for LLM-driven workflows
-
Easy Extension: Add new servers without changing core workflow logic
-
Production Ready: Proper error handling, timeouts, and resource management
Getting Started#
- Import the necessary classes:
from autogen.mcp.mcp_client import (
MCPClientSessionManager,
MCPConfig,
SseConfig,
StdioConfig,
create_toolkit,
)
- Configure your servers:
# Stdio server
arxiv_server = StdioConfig(
command="python3",
args=["mcp/mcp_arxiv.py", "stdio"],
transport="stdio",
server_name="ArxivServer",
)
# SSE server
wiki_server = SseConfig(
url="http://127.0.0.1:8000/sse",
timeout=10,
sse_read_timeout=60,
server_name="WikipediaServer",
)
mcp_config = MCPConfig(servers=[arxiv_server, wiki_server])
- Open a session:
async with MCPClientSessionManager().open_session(arxiv_server) as session:
toolkit = await create_toolkit(session=session)
# Use toolkit with your agents
- Wrap in tools for dynamic selection:
@tool(description="Query MCP servers")
async def query_mcp_server(query: str, server_name: str):
server = get_server_config(mcp_config, server_name)
async with MCPClientSessionManager().open_session(server) as session:
# Execute query
return result
- Review the documentation: MCP Integration Guide
Additional Resources#
- AG2 MCP Documentation
- MCP Session Manager Example Notebook
- Model Context Protocol Specification
- AG2 Agent Chat Documentation
MCPClientSessionManager transforms how you build multi-server agent workflows in AG2. By enabling on-demand session creation with automatic resource management, it unlocks new possibilities for dynamic, intelligent agent systems that can adapt to different data sources and services. Start building your multi-MCP agent workflows today and experience the power of flexible session management.