RedisKnowledgeStore(url_or_client, *, key_prefix='ag2:knowledge', poll_interval_s=0.5)
Redis-backed :class:`KnowledgeStore`.
Each virtual path becomes a Redis string under a configurable key prefix (default ``ag2:knowledge``; trailing colons on the prefix are stripped). A companion sorted set at ``{prefix}:__index`` keeps a (path, version) ranking so :meth:`list_versions_under` returns a snapshot in one round trip and the polling watcher can diff without per-key GETs.
:meth:`on_change` uses :class:`PollingChangeWatcher` (no Redis keyspace notifications — see §14 Phase 3b "Deferred"). The default poll interval is 500 ms and is configurable via ``poll_interval_s``.
Dependencies: ``redis.asyncio`` is an optional install. The import is lazy, so a project that never constructs a :class:`RedisKnowledgeStore` is not forced to install the dependency.
Source code in autogen/beta/knowledge/redis.py
def __init__(
    self,
    url_or_client: Any,
    *,
    key_prefix: str = "ag2:knowledge",
    poll_interval_s: float = 0.5,
) -> None:
    """Set up the store from a Redis URL or an already-built client.

    Args:
        url_or_client: A connection URL (``str``) or a client object. When a
            URL is given, a client is created via ``_aioredis.from_url`` with
            ``decode_responses=False`` and this store takes ownership of it.
            Any other value is used as the client as-is and is not owned.
        key_prefix: Namespace for every key this store touches; trailing
            ``":"`` characters are stripped.
        poll_interval_s: Interval, in seconds, handed to the polling watcher
            created by ``on_change``.

    Raises:
        ImportError: A URL was given but the optional ``redis`` package is
            not importable.
    """
    owns_client = isinstance(url_or_client, str)
    if owns_client:
        if _aioredis is None:  # pragma: no cover
            raise ImportError("RedisKnowledgeStore requires the 'redis' package. Install with: pip install redis")
        client = _aioredis.from_url(url_or_client, decode_responses=False)
    else:
        client = url_or_client

    self._client = client
    # Only a client we created ourselves gets closed by ``close()``.
    self._owns_client = owns_client

    prefix = key_prefix.rstrip(":")
    self._key_prefix = prefix
    self._index_key = f"{prefix}:__index"
    self._poll_interval_s = poll_interval_s
    # Serialises the multi-step mutations performed within this process.
    self._lock = asyncio.Lock()
read async
Source code in autogen/beta/knowledge/redis.py
| async def read(self, path: str) -> str | None:
value = await self._client.get(self._key(path))
if value is None:
return None
return value.decode("utf-8") if isinstance(value, bytes) else str(value)
|
write async
Source code in autogen/beta/knowledge/redis.py
async def write(self, path: str, content: str) -> None:
    """Store *content* at *path* and record a fresh version in the index.

    Under the instance lock this bumps the shared
    ``{prefix}:__version_counter``, writes the UTF-8 encoded content, then
    registers ``(normalized path, version)`` through ``_index_add``.
    """
    canonical = _normalize(path)
    counter_key = f"{self._key_prefix}:__version_counter"
    async with self._lock:
        next_version = int(await self._client.incr(counter_key))
        await self._client.set(self._key(path), content.encode("utf-8"))
        await self._index_add(canonical, next_version)
list async
Source code in autogen/beta/knowledge/redis.py
async def list(self, path: str = "/") -> list[str]:
    """Return the sorted immediate children of *path*.

    Children are derived from the indexed paths: an entry directly under
    *path* is reported by name, and anything nested deeper is collapsed into
    its first path segment with a trailing ``"/"``.
    """
    scope = _normalize(path).rstrip("/") + "/"
    indexed = await self._index_scan()
    names: set[str] = set()
    for entry in indexed:
        if entry.startswith(scope):
            # ``sep`` is "/" only when something follows the first segment,
            # which is exactly the "this child is a directory" case.
            head, sep, _ = entry[len(scope):].partition("/")
            names.add(head + sep)
    return sorted(names)
delete async
Source code in autogen/beta/knowledge/redis.py
async def delete(self, path: str) -> None:
    """Remove *path* and every indexed entry beneath it.

    Does nothing when the index holds no matching entry. Otherwise the
    matching keys are deleted in a single ``delete`` call and then dropped
    from the index, all under the instance lock.
    """
    target = _normalize(path)
    subtree = target.rstrip("/") + "/"
    async with self._lock:
        indexed = await self._index_scan()
        doomed = [entry for entry in indexed if entry == target or entry.startswith(subtree)]
        if doomed:
            await self._client.delete(*[self._key(entry) for entry in doomed])
            await self._index_remove(*doomed)
exists async
Source code in autogen/beta/knowledge/redis.py
async def exists(self, path: str) -> bool:
    """Report whether *path* is a stored key or a parent of indexed entries.

    The key itself is checked first; the index is scanned only when that
    check comes back falsy.
    """
    target = _normalize(path)
    has_key = await self._client.exists(self._key(target))
    if not has_key:
        subtree = target.rstrip("/") + "/"
        for entry in await self._index_scan():
            if entry.startswith(subtree):
                return True
        return False
    return True
append async
Source code in autogen/beta/knowledge/redis.py
async def append(self, path: str, content: str) -> int:
    """Append *content* to the value at *path* and return its byte offset.

    The data is added with the Redis ``APPEND`` command instead of a
    GET + concatenate + SET round trip. ``APPEND`` is executed atomically by
    the server, so appenders in other processes (which do not share this
    instance's ``asyncio.Lock``) can no longer overwrite each other's data
    or be handed the same offset, and the existing value is no longer
    transferred over the wire on every call.

    Args:
        path: Virtual path of the entry; a missing key is created.
        content: Text to append; it is encoded as UTF-8.

    Returns:
        The byte offset at which *content* starts, i.e. the byte length of
        the value before this append.
    """
    normalized = _normalize(path)
    payload = content.encode("utf-8")
    async with self._lock:
        version = int(await self._client.incr(f"{self._key_prefix}:__version_counter"))
        # APPEND replies with the length of the value *after* the append.
        new_length = int(await self._client.append(self._key(path), payload))
        await self._index_add(normalized, version)
    return new_length - len(payload)
read_range async
read_range(path, start, end=None)
Source code in autogen/beta/knowledge/redis.py
| async def read_range(self, path: str, start: int, end: int | None = None) -> str:
existing = await self._client.get(self._key(path))
if existing is None:
return ""
data = existing if isinstance(existing, bytes) else str(existing).encode("utf-8")
stop = len(data) if end is None else min(end, len(data))
if start >= stop:
return ""
return data[start:stop].decode("utf-8", errors="strict")
|
list_versions_under async
list_versions_under(prefix)
Source code in autogen/beta/knowledge/redis.py
async def list_versions_under(self, prefix: str) -> dict[str, int]:
    """Return ``{path: version}`` for indexed entries at or below *prefix*.

    A prefix that normalizes to the root yields the whole index snapshot.
    """
    versions = await self._index_scan()
    root = _normalize(prefix).rstrip("/")
    if not root or root == "/":
        return versions
    subtree = root + "/"
    return {
        indexed_path: version
        for indexed_path, version in versions.items()
        if indexed_path == root or indexed_path.startswith(subtree)
    }
on_change async
on_change(path, callback)
Source code in autogen/beta/knowledge/redis.py
async def on_change(self, path: str, callback: ChangeCallback) -> ChangeSubscription:
    """Start watching *path* and return the running watcher.

    A ``PollingChangeWatcher`` is built over this store with the configured
    poll interval, started, and handed back to the caller.
    """
    subscription = PollingChangeWatcher(backend=self, prefix=path, callback=callback, interval_s=self._poll_interval_s)
    await subscription.start()
    return subscription
close async
Close the underlying Redis client (if we own it).
Source code in autogen/beta/knowledge/redis.py
async def close(self) -> None:
    """Close the underlying Redis client, but only if this store owns it.

    Closing is best-effort: any ``Exception`` raised by ``aclose()`` is
    suppressed.
    """
    if not self._owns_client:
        return
    with contextlib.suppress(Exception):
        await self._client.aclose()