Wraps a KnowledgeStore with a Lock for concurrent access safety.
Reads are not locked (safe for concurrent access on all backends). Writes and deletes acquire the lock.
Source code in autogen/beta/knowledge/locked.py
| def __init__(self, store: KnowledgeStore, lock: Any) -> None:
self._store = store
self._lock = lock
|
read async
Source code in autogen/beta/knowledge/locked.py
| async def read(self, path: str) -> str | None:
return await self._store.read(path)
|
write async
Source code in autogen/beta/knowledge/locked.py
| async def write(self, path: str, content: str) -> None:
acquired = await self._lock.acquire(f"store:write:{path}", ttl=30.0)
if not acquired:
raise RuntimeError(f"Failed to acquire write lock for {path}")
try:
await self._store.write(path, content)
finally:
await self._lock.release(f"store:write:{path}")
|
list async
Source code in autogen/beta/knowledge/locked.py
| async def list(self, path: str = "/") -> list[str]:
return await self._store.list(path)
|
delete async
Source code in autogen/beta/knowledge/locked.py
| async def delete(self, path: str) -> None:
acquired = await self._lock.acquire(f"store:write:{path}", ttl=30.0)
if not acquired:
raise RuntimeError(f"Failed to acquire delete lock for {path}")
try:
await self._store.delete(path)
finally:
await self._lock.release(f"store:write:{path}")
|
exists async
Source code in autogen/beta/knowledge/locked.py
| async def exists(self, path: str) -> bool:
return await self._store.exists(path)
|
append async
Source code in autogen/beta/knowledge/locked.py
| async def append(self, path: str, content: str) -> int:
acquired = await self._lock.acquire(f"store:write:{path}", ttl=30.0)
if not acquired:
raise RuntimeError(f"Failed to acquire append lock for {path}")
try:
return await self._store.append(path, content)
finally:
await self._lock.release(f"store:write:{path}")
|
read_range async
read_range(path, start, end=None)
Source code in autogen/beta/knowledge/locked.py
| async def read_range(self, path: str, start: int, end: int | None = None) -> str:
return await self._store.read_range(path, start, end)
|
on_change async
on_change(path, callback)
Source code in autogen/beta/knowledge/locked.py
| async def on_change(self, path: str, callback: ChangeCallback) -> ChangeSubscription:
return await self._store.on_change(path, callback)
|