Skip to content

RedisKnowledgeStore

autogen.beta.knowledge.redis.RedisKnowledgeStore #

RedisKnowledgeStore(url_or_client, *, key_prefix='ag2:knowledge', poll_interval_s=0.5)

Redis-backed :class:KnowledgeStore.

Each virtual path becomes a Redis string under a configurable key prefix (default ag2:knowledge; trailing colons are stripped). A companion sorted set at {prefix}:__index keeps a (path, version) ranking so :meth:list_versions_under returns a snapshot in one round trip and the polling watcher can diff without per-key GETs.

on_change uses :class:PollingChangeWatcher (no Redis keyspace notifications — see §14 Phase 3b "Deferred"). The default poll interval is 500 ms and is configurable via poll_interval_s.

Dependencies: the redis package (which provides redis.asyncio) is an optional install. The import is lazy so a project that never constructs a :class:RedisKnowledgeStore does not force the dependency on every install.

Source code in autogen/beta/knowledge/redis.py
def __init__(
    self,
    url_or_client: Any,
    *,
    key_prefix: str = "ag2:knowledge",
    poll_interval_s: float = 0.5,
) -> None:
    """Bind the store to a Redis client and derive its key namespace.

    Args:
        url_or_client: Either a connection URL (``str``) or an existing
            client object. A URL is turned into a client with
            ``_aioredis.from_url`` and the store owns that client; any
            other object is used as-is and is not owned.
        key_prefix: Namespace for every key; trailing ``:`` characters
            are stripped before it is stored.
        poll_interval_s: Interval passed to the watcher that ``on_change``
            creates.

    Raises:
        ImportError: If a URL is given while ``_aioredis`` is ``None``.
    """
    created_here = isinstance(url_or_client, str)
    if created_here:
        if _aioredis is None:  # pragma: no cover
            raise ImportError("RedisKnowledgeStore requires the 'redis' package. Install with: pip install redis")
        # Raw bytes replies are requested; the read paths decode them.
        client = _aioredis.from_url(url_or_client, decode_responses=False)
    else:
        client = url_or_client

    self._client = client
    # Ownership decides whether close() shuts the client down.
    self._owns_client = created_here

    namespace = key_prefix.rstrip(":")
    self._key_prefix = namespace
    self._index_key = f"{namespace}:__index"
    self._poll_interval_s = poll_interval_s
    # Held by write/append/delete around their multi-step updates.
    self._lock = asyncio.Lock()

read async #

read(path)
Source code in autogen/beta/knowledge/redis.py
async def read(self, path: str) -> str | None:
    value = await self._client.get(self._key(path))
    if value is None:
        return None
    return value.decode("utf-8") if isinstance(value, bytes) else str(value)

write async #

write(path, content)
Source code in autogen/beta/knowledge/redis.py
async def write(self, path: str, content: str) -> None:
    """Replace the content at *path* and record a new version for it.

    While holding the instance lock this increments the
    ``{prefix}:__version_counter`` key, stores *content* as UTF-8 bytes,
    and registers the normalized path with the new version in the index.
    """
    index_path = _normalize(path)
    counter_key = f"{self._key_prefix}:__version_counter"
    async with self._lock:
        new_version = int(await self._client.incr(counter_key))
        await self._client.set(self._key(path), content.encode("utf-8"))
        await self._index_add(index_path, new_version)

list async #

list(path='/')
Source code in autogen/beta/knowledge/redis.py
async def list(self, path: str = "/") -> list[str]:
    """Return the sorted immediate children of *path*.

    Entries come from the index snapshot. A child that has further
    segments below it is reported once, with a trailing ``/``; a leaf is
    reported by its bare name.
    """
    base = _normalize(path).rstrip("/") + "/"
    indexed = await self._index_scan()
    names = set()
    for full_path in indexed:
        if full_path.startswith(base):
            # partition() yields sep == "/" only when deeper segments exist,
            # so head + sep is "dir/" for subtrees and "name" for leaves.
            head, sep, _rest = full_path[len(base) :].partition("/")
            names.add(head + sep)
    return sorted(names)

delete async #

delete(path)
Source code in autogen/beta/knowledge/redis.py
async def delete(self, path: str) -> None:
    """Delete *path* and every indexed path beneath it.

    Matching is done against the index snapshot: an entry is removed when
    it equals the normalized path or starts with that path plus ``/``.
    Does nothing when no entry matches.
    """
    target = _normalize(path)
    subtree = target.rstrip("/") + "/"

    async with self._lock:
        indexed = await self._index_scan()
        doomed = []
        for candidate in indexed:
            if candidate == target or candidate.startswith(subtree):
                doomed.append(candidate)
        if doomed:
            # Remove the value keys first, then their index entries.
            await self._client.delete(*[self._key(p) for p in doomed])
            await self._index_remove(*doomed)

exists async #

exists(path)
Source code in autogen/beta/knowledge/redis.py
async def exists(self, path: str) -> bool:
    """Return whether *path* is a stored key or has indexed paths under it.

    The key itself is checked first; only when that misses is the index
    snapshot scanned for any entry starting with the path plus ``/``.
    """
    target = _normalize(path)
    direct_hit = await self._client.exists(self._key(target))
    if direct_hit:
        return True

    subtree = target.rstrip("/") + "/"
    for indexed_path in await self._index_scan():
        if indexed_path.startswith(subtree):
            return True
    return False

append async #

append(path, content)
Source code in autogen/beta/knowledge/redis.py
async def append(self, path: str, content: str) -> int:
    """Append *content* to the value at *path* and return its byte offset.

    The current value is fetched, the UTF-8 encoded *content* is
    concatenated to it, and the result is written back under the instance
    lock together with a version bump and an index update.

    Returns:
        The length in bytes of the value before the append, i.e. the
        offset at which the new content starts.
    """
    index_path = _normalize(path)
    addition = content.encode("utf-8")
    async with self._lock:
        current = await self._client.get(self._key(path))
        if isinstance(current, bytes):
            head = current
        elif current is None:
            # Missing key: the append starts a new value.
            head = b""
        else:
            head = str(current).encode("utf-8")
        start_offset = len(head)
        merged = head + addition
        new_version = int(await self._client.incr(f"{self._key_prefix}:__version_counter"))
        await self._client.set(self._key(path), merged)
        await self._index_add(index_path, new_version)
    return start_offset

read_range async #

read_range(path, start, end=None)
Source code in autogen/beta/knowledge/redis.py
async def read_range(self, path: str, start: int, end: int | None = None) -> str:
    existing = await self._client.get(self._key(path))
    if existing is None:
        return ""
    data = existing if isinstance(existing, bytes) else str(existing).encode("utf-8")
    stop = len(data) if end is None else min(end, len(data))
    if start >= stop:
        return ""
    return data[start:stop].decode("utf-8", errors="strict")

list_versions_under async #

list_versions_under(prefix)
Source code in autogen/beta/knowledge/redis.py
async def list_versions_under(self, prefix: str) -> dict[str, int]:
    """Return ``{path: version}`` for indexed paths at or below *prefix*.

    A prefix that normalizes to the root returns the whole index snapshot.
    Otherwise an entry is kept when it equals the normalized prefix or
    starts with that prefix plus ``/``.
    """
    everything = await self._index_scan()
    root = _normalize(prefix).rstrip("/")
    if root == "" or root == "/":
        return everything

    subtree = root + "/"
    selected = {}
    for path, version in everything.items():
        if path == root or path.startswith(subtree):
            selected[path] = version
    return selected

on_change async #

on_change(path, callback)
Source code in autogen/beta/knowledge/redis.py
async def on_change(self, path: str, callback: ChangeCallback) -> ChangeSubscription:
    """Start a polling watcher for *path* and return it.

    A ``PollingChangeWatcher`` is created with this store as its backend,
    *path* as its prefix, *callback* as its callback and the store's poll
    interval. It is started before being returned.
    """
    subscription = PollingChangeWatcher(
        backend=self, prefix=path, callback=callback, interval_s=self._poll_interval_s
    )
    await subscription.start()
    return subscription

close async #

close()

Close the underlying Redis client (if we own it).

Source code in autogen/beta/knowledge/redis.py
async def close(self) -> None:
    """Close the underlying Redis client (if we own it).

    A client that was passed in by the caller is left untouched. For an
    owned client, ``aclose()`` is used when the client has it; otherwise
    ``close()`` is awaited instead, so that clients without ``aclose``
    are still shut down rather than leaked. Errors raised while closing
    are suppressed: shutdown is best-effort.
    """
    if not self._owns_client:
        return

    client = self._client
    closer = getattr(client, "aclose", None)
    if closer is None:
        # Without this fallback the missing attribute would raise inside
        # the suppress block below and the connection would stay open.
        closer = getattr(client, "close", None)
    if closer is None:
        return

    with contextlib.suppress(Exception):
        await closer()