Skip to content

SessionStore

autogen.beta.mcp.sessions.SessionStore #

SessionStore(*, max_sessions=1024, ttl=None, storage=None, clock=monotonic)

Bounded LRU registry mapping an mcp-session-id to a persistent stream.

Each session is bound to a stable :class:~uuid.UUID stream id over a shared :class:Storage; :meth:acquire returns a fresh :class:MemoryStream object on every call (so per-call progress subscribers never accumulate) that reads prior turns back from storage — mirroring the subagents' persistent_stream pattern. Eviction (LRU overflow + idle TTL) drops the evicted session's stored history so memory stays bounded.

Source code in autogen/beta/mcp/sessions.py
def __init__(
    self,
    *,
    max_sessions: int = 1024,
    ttl: float | None = None,
    storage: Storage | None = None,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    if max_sessions < 1:
        raise ValueError(f"max_sessions must be >= 1, got {max_sessions}.")
    if ttl is not None and ttl <= 0:
        raise ValueError(f"ttl must be > 0 when set, got {ttl}.")
    self._storage = storage or MemoryStorage()
    self._max = max_sessions
    self._ttl = ttl
    self._entries: OrderedDict[str, _Entry] = OrderedDict()
    self._lock = asyncio.Lock()
    self._clock = clock

session async #

session(session_id)

Yield session_id's stream while holding its per-session turn lock.

Holding the lock for the duration of the turn serializes concurrent calls on the same session, so their accumulated history can't interleave.

Source code in autogen/beta/mcp/sessions.py
@asynccontextmanager
async def session(self, session_id: str) -> AsyncIterator[MemoryStream]:
    """Yield ``session_id``'s stream while holding its per-session turn lock.

    Holding the lock for the duration of the turn serializes concurrent calls
    on the same session, so their accumulated history can't interleave.
    """
    entry = await self._entry(session_id)
    async with entry.turn_lock:
        yield MemoryStream(storage=self._storage, id=entry.stream_id)

acquire async #

acquire(session_id)

Return a stream carrying session_id's accumulated history.

Does not hold the turn lock — prefer :meth:session on the serving path.

Source code in autogen/beta/mcp/sessions.py
async def acquire(self, session_id: str) -> MemoryStream:
    """Return a stream carrying ``session_id``'s accumulated history.

    Does not hold the turn lock — prefer :meth:`session` on the serving path.
    """
    entry = await self._entry(session_id)
    return MemoryStream(storage=self._storage, id=entry.stream_id)