Skip to content

DiskKnowledgeStore

autogen.beta.knowledge.disk.DiskKnowledgeStore #

DiskKnowledgeStore(root)

Persistent KnowledgeStore backed by the local filesystem.

Maps virtual paths directly to real files under a root directory. Missing parent directories are created on write. Supported on macOS and Linux; not supported on Windows, because filenames may contain characters that are illegal on NTFS, such as :, ?, *, <, >.

Example:

store = DiskKnowledgeStore("/tmp/my-agent")
await store.write("/artifacts/report.md", "# Report")
# Creates /tmp/my-agent/artifacts/report.md on disk
Source code in autogen/beta/knowledge/disk.py
def __init__(self, root: str | os.PathLike[str]) -> None:
    self._root = Path(root)

read async #

read(path)
Source code in autogen/beta/knowledge/disk.py
async def read(self, path: str) -> str | None:
    target = self._resolve(path)
    if not target.is_file():
        return None
    return target.read_text(encoding="utf-8")

write async #

write(path, content)
Source code in autogen/beta/knowledge/disk.py
async def write(self, path: str, content: str) -> None:
    """Store ``content`` at ``path`` as UTF-8 text.

    Any missing parent directories are created first; an existing file
    at ``path`` is replaced.

    Args:
        path: Virtual path of the file to write.
        content: Text to store.
    """
    destination = self._resolve(path)
    parent_dir = destination.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    destination.write_text(content, encoding="utf-8")

list async #

list(path='/')
Source code in autogen/beta/knowledge/disk.py
async def list(self, path: str = "/") -> list[str]:
    """List the immediate children of the directory at ``path``.

    Args:
        path: Virtual path of the directory to list. Defaults to the
            store root.

    Returns:
        Child names in sorted order, with a trailing ``/`` on
        directories. Empty when ``path`` is not a directory.
    """
    directory = self._resolve(path)
    if not directory.is_dir():
        return []
    # Directories are marked with a trailing slash so callers can tell
    # them apart from files without another lookup.
    return [
        child.name + "/" if child.is_dir() else child.name
        for child in sorted(directory.iterdir())
    ]

delete async #

delete(path)
Source code in autogen/beta/knowledge/disk.py
async def delete(self, path: str) -> None:
    """Remove the file or directory tree stored at ``path``.

    Regular files are unlinked and directories are removed recursively.
    A symbolic link is removed itself, and its target is left untouched.
    Nothing happens when ``path`` does not exist.

    Args:
        path: Virtual path of the entry to delete.
    """
    target = self._resolve(path)
    # Check for symlinks first: is_dir() follows links, and
    # shutil.rmtree() refuses to operate on a symlink to a directory.
    # NOTE(review): if _resolve() already resolves symlinks this branch
    # only ever sees regular files -- confirm against _resolve.
    if target.is_symlink() or target.is_file():
        # missing_ok: tolerate a concurrent removal between the check
        # and the unlink.
        target.unlink(missing_ok=True)
    elif target.is_dir():
        shutil.rmtree(target)

exists async #

exists(path)
Source code in autogen/beta/knowledge/disk.py
async def exists(self, path: str) -> bool:
    """Report whether a file or directory is present at ``path``.

    Args:
        path: Virtual path to check.

    Returns:
        ``True`` if the resolved path exists on disk, ``False`` otherwise.
    """
    resolved = self._resolve(path)
    return resolved.exists()

append async #

append(path, content)
Source code in autogen/beta/knowledge/disk.py
async def append(self, path: str, content: str) -> int:
    """Append ``content`` to the file at ``path`` as UTF-8.

    Missing parent directories and the file itself are created as
    needed.

    Args:
        path: Virtual path of the file to append to.
        content: Text to add at the end of the file.

    Returns:
        The byte offset at which the appended data starts, as reported
        by the file position right after opening in append mode.
    """
    destination = self._resolve(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    encoded = content.encode("utf-8")
    with destination.open("ab") as stream:
        # Capture the position before writing so the caller gets the
        # start of the new data, not its end.
        start_offset = stream.tell()
        stream.write(encoded)
    return start_offset

read_range async #

read_range(path, start, end=None)
Source code in autogen/beta/knowledge/disk.py
async def read_range(self, path: str, start: int, end: int | None = None) -> str:
    target = self._resolve(path)
    if not target.is_file():
        return ""
    with target.open("rb") as fh:
        fh.seek(start)
        if end is None:
            data = fh.read()
        else:
            span = max(0, end - start)
            data = fh.read(span)
    return data.decode("utf-8", errors="strict")

on_change async #

on_change(path, callback)

Subscribe to filesystem change notifications under path.

Uses the `watchdog` library to dispatch platform-native events (inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW on Windows). Falls back to `PollingObserver` if the native backend cannot be initialized, and to `NoopChangeSubscription` if `watchdog` is not installed at all.

Source code in autogen/beta/knowledge/disk.py
async def on_change(self, path: str, callback: ChangeCallback) -> ChangeSubscription:
    """Subscribe to filesystem change notifications under ``path``.

    The virtual ``path`` is normalized and resolved to its physical
    location, which is created as a directory (with parents) if it does
    not exist yet, and is then watched recursively.

    Uses the ``watchdog`` library to dispatch platform-native events
    (inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW on
    Windows). Falls back to :class:`PollingObserver` if the native
    backend cannot be set up or started.

    NOTE(review): a fallback to :class:`NoopChangeSubscription` when
    ``watchdog`` is not installed is not visible in this method; the
    only no-op path here is the "no running event loop" case below.
    Presumably the missing-dependency case is handled where ``Observer``
    is imported -- confirm.

    Args:
        path: Virtual path whose subtree should be watched.
        callback: Handed to the change handler together with the running
            event loop; presumably invoked on that loop for each change
            -- confirm against ``_DiskChangeHandler``.

    Returns:
        A ``_DiskChangeSubscription`` wrapping the started observer, or
        a ``NoopChangeSubscription`` when no event loop is running.
    """

    virtual_path = _normalize(path)
    physical_target = self._resolve(virtual_path)
    # The watched directory is created up front, so this side effect
    # happens even when the no-op subscription is returned below.
    physical_target.mkdir(parents=True, exist_ok=True)

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running event loop is available to hand to the handler.
        return NoopChangeSubscription()

    handler = _DiskChangeHandler(
        root=self._root.resolve(),
        virtual_prefix=virtual_path,
        loop=loop,
        callback=callback,
    )

    observer: Any
    try:
        observer = Observer()
        observer.schedule(handler, str(physical_target), recursive=True)
        observer.start()
    except Exception:
        # Any failure of the default observer (construction, scheduling
        # or start) switches to polling; a failure of the polling
        # observer itself propagates to the caller.
        observer = PollingObserver()
        observer.schedule(handler, str(physical_target), recursive=True)
        observer.start()

    return _DiskChangeSubscription(observer)