Persistent KnowledgeStore backed by the local filesystem.
Maps virtual paths directly to real files under a root directory. Directories are created on write. Supports macOS and Linux. Not supported on Windows (filenames may contain characters that are illegal on NTFS such as :, ?, *, <, >).
Example::
store = DiskKnowledgeStore("/tmp/my-agent")
await store.write("/artifacts/report.md", "# Report")
# Creates /tmp/my-agent/artifacts/report.md on disk
Source code in autogen/beta/knowledge/disk.py
| def __init__(self, root: str | os.PathLike[str]) -> None:
self._root = Path(root)
|
read async
Source code in autogen/beta/knowledge/disk.py
| async def read(self, path: str) -> str | None:
target = self._resolve(path)
if not target.is_file():
return None
return target.read_text(encoding="utf-8")
|
write async
Source code in autogen/beta/knowledge/disk.py
async def write(self, path: str, content: str) -> None:
    """Store ``content`` as UTF-8 text at ``path``.

    Any missing parent directories are created before the file is
    written; an existing file at the same location is overwritten.
    """
    destination = self._resolve(path)
    folder = destination.parent
    folder.mkdir(parents=True, exist_ok=True)
    destination.write_text(content, encoding="utf-8")
|
list async
Source code in autogen/beta/knowledge/disk.py
async def list(self, path: str = "/") -> list[str]:
    """Return the names of the direct children of the directory at ``path``.

    Entries are visited in sorted path order. Sub-directories are
    reported with a trailing ``/``; everything else is reported by bare
    name. An empty list is returned when ``path`` is not a directory.
    """
    folder = self._resolve(path)
    if not folder.is_dir():
        return []
    return [
        f"{child.name}/" if child.is_dir() else child.name
        for child in sorted(folder.iterdir())
    ]
|
delete async
Source code in autogen/beta/knowledge/disk.py
async def delete(self, path: str) -> None:
    """Remove whatever is stored at ``path``.

    Regular files are unlinked and directories are removed recursively.
    A symbolic link is removed itself and never followed, so the link's
    target is left untouched. A path at which nothing exists is ignored.

    NOTE(review): whether ``_resolve`` already follows symlinks is not
    visible here; if it does, the symlink branch is simply never taken.
    """
    target = self._resolve(path)
    # Check for a symlink first: ``is_dir()`` follows links, and
    # ``shutil.rmtree`` refuses to operate on a symlink to a directory
    # (it raises ``OSError``). A dangling link is neither a file nor a
    # directory and would otherwise be left behind.
    if target.is_symlink() or target.is_file():
        target.unlink()
    elif target.is_dir():
        shutil.rmtree(target)
|
exists async
Source code in autogen/beta/knowledge/disk.py
async def exists(self, path: str) -> bool:
    """Report whether anything (file or directory) exists at ``path``."""
    candidate = self._resolve(path)
    return candidate.exists()
|
append async
Source code in autogen/beta/knowledge/disk.py
async def append(self, path: str, content: str) -> int:
    """Append ``content`` (encoded as UTF-8) to the file at ``path``.

    Missing parent directories, and the file itself, are created as
    needed. The return value is the stream position reported right
    after opening the file in append mode and before writing, i.e. the
    byte offset at which the new content is expected to begin.
    """
    destination = self._resolve(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    encoded = content.encode("utf-8")
    with destination.open("ab") as stream:
        start_offset = stream.tell()
        stream.write(encoded)
    return start_offset
|
read_range async
read_range(path, start, end=None)
Source code in autogen/beta/knowledge/disk.py
| async def read_range(self, path: str, start: int, end: int | None = None) -> str:
target = self._resolve(path)
if not target.is_file():
return ""
with target.open("rb") as fh:
fh.seek(start)
if end is None:
data = fh.read()
else:
span = max(0, end - start)
data = fh.read(span)
return data.decode("utf-8", errors="strict")
|
on_change async
on_change(path, callback)
Subscribe to filesystem change notifications under path.
Uses the `watchdog` library to dispatch platform-native events (inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW on Windows). Falls back to `PollingObserver` if the native backend cannot be initialized, and to `NoopChangeSubscription` if `watchdog` is not installed at all.
Source code in autogen/beta/knowledge/disk.py
async def on_change(self, path: str, callback: ChangeCallback) -> ChangeSubscription:
    """Subscribe ``callback`` to filesystem changes under ``path``.

    The virtual path is normalized and the matching directory on disk is
    created if it does not exist yet. The directory is then watched
    recursively: an ``Observer`` (the ``watchdog`` platform-native
    backend) is tried first, and if scheduling or starting it raises, a
    ``PollingObserver`` is used instead.

    Returns a ``_DiskChangeSubscription`` wrapping the started observer,
    or a ``NoopChangeSubscription`` when no event loop is running.

    NOTE(review): nothing in this method handles ``watchdog`` being
    absent; presumably that case is dealt with where ``Observer`` is
    imported -- confirm.
    """
    watch_prefix = _normalize(path)
    watch_dir = self._resolve(watch_prefix)
    watch_dir.mkdir(parents=True, exist_ok=True)

    # Without a running loop there is nowhere to deliver events.
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        return NoopChangeSubscription()

    event_handler = _DiskChangeHandler(
        root=self._root.resolve(),
        virtual_prefix=watch_prefix,
        loop=running_loop,
        callback=callback,
    )
    watch_location = str(watch_dir)

    watcher: Any
    try:
        watcher = Observer()
        watcher.schedule(event_handler, watch_location, recursive=True)
        watcher.start()
    except Exception:
        # Native backend unavailable: fall back to polling. A failure
        # here is not caught and propagates to the caller.
        watcher = PollingObserver()
        watcher.schedule(event_handler, watch_location, recursive=True)
        watcher.start()
    return _DiskChangeSubscription(watcher)
|