Skip to content

SqliteKnowledgeStore

autogen.beta.knowledge.sqlite.SqliteKnowledgeStore #

SqliteKnowledgeStore(path, *, poll_interval_s=0.5)

SQLite-backed :class:KnowledgeStore.

Stores every virtual path as a row (path, content, version) in a single entries table. version is a monotonic integer bumped on every write or append and is the key the polling change watcher uses to detect mutations.

SQLite does not have native change notifications, so on_change uses :class:PollingChangeWatcher at a default 500ms interval. Callers that need lower latency pass poll_interval_s to the constructor.

All store operations run on the asyncio loop's default executor so the blocking sqlite3 API does not stall the event loop. A single connection is serialized with an asyncio.Lock — SQLite's own file locking handles concurrent Python processes if multiple hubs share a database.

This is the same protocol surface as Memory and Disk, so every test that parameterizes over stores picks up Sqlite automatically.

Source code in autogen/beta/knowledge/sqlite.py
def __init__(
    self,
    path: str | os.PathLike[str],
    *,
    poll_interval_s: float = 0.5,
) -> None:
    self._db_path: Path = Path(path)
    self._poll_interval_s = poll_interval_s
    self._conn: sqlite3.Connection | None = None
    self._lock = asyncio.Lock()
    self._version_counter = 0

read async #

read(path)
Source code in autogen/beta/knowledge/sqlite.py
async def read(self, path: str) -> str | None:
    normalized = _normalize(path)
    return await self._run(functools.partial(self._sync_read, normalized))

write async #

write(path, content)
Source code in autogen/beta/knowledge/sqlite.py
async def write(self, path: str, content: str) -> None:
    normalized = _normalize(path)
    payload = content.encode("utf-8")
    async with self._lock:
        version = self._next_version()
        await self._run(functools.partial(self._sync_write, normalized, payload, version))

list async #

list(path='/')
Source code in autogen/beta/knowledge/sqlite.py
async def list(self, path: str = "/") -> list[str]:
    prefix = _normalize(path).rstrip("/") + "/"
    return await self._run(functools.partial(self._sync_list, prefix))

delete async #

delete(path)
Source code in autogen/beta/knowledge/sqlite.py
async def delete(self, path: str) -> None:
    normalized = _normalize(path)
    prefix = normalized.rstrip("/") + "/"
    async with self._lock:
        await self._run(functools.partial(self._sync_delete, normalized, prefix))

exists async #

exists(path)
Source code in autogen/beta/knowledge/sqlite.py
async def exists(self, path: str) -> bool:
    normalized = _normalize(path)
    prefix = normalized.rstrip("/") + "/"
    return await self._run(functools.partial(self._sync_exists, normalized, prefix))

append async #

append(path, content)
Source code in autogen/beta/knowledge/sqlite.py
async def append(self, path: str, content: str) -> int:
    normalized = _normalize(path)
    payload = content.encode("utf-8")
    async with self._lock:
        version = self._next_version()
        return await self._run(functools.partial(self._sync_append, normalized, payload, version))

read_range async #

read_range(path, start, end=None)
Source code in autogen/beta/knowledge/sqlite.py
async def read_range(self, path: str, start: int, end: int | None = None) -> str:
    normalized = _normalize(path)
    return await self._run(functools.partial(self._sync_read_range, normalized, start, end))

list_versions_under async #

list_versions_under(prefix)

Return {path: version} for every key under prefix.

Used by :class:PollingChangeWatcher to diff snapshots. The raw version numbers are per-write monotonic ints so any change under prefix bumps exactly one key's version, making the diff loop O(keys-under-prefix).

Source code in autogen/beta/knowledge/sqlite.py
async def list_versions_under(self, prefix: str) -> dict[str, int]:
    """Return ``{path: version}`` for every key under ``prefix``.

    Used by :class:`PollingChangeWatcher` to diff snapshots. The
    raw version numbers are per-write monotonic ints so any change
    under ``prefix`` bumps exactly one key's version, making the
    diff loop O(keys-under-prefix).
    """
    normalized = _normalize(prefix).rstrip("/")
    return await self._run(functools.partial(self._sync_list_versions, normalized))

on_change async #

on_change(path, callback)
Source code in autogen/beta/knowledge/sqlite.py
async def on_change(self, path: str, callback: ChangeCallback) -> ChangeSubscription:
    watcher = PollingChangeWatcher(
        backend=self,
        prefix=path,
        callback=callback,
        interval_s=self._poll_interval_s,
    )
    await watcher.start()
    return watcher

close #

close()

Close the SQLite connection. Idempotent.

Source code in autogen/beta/knowledge/sqlite.py
def close(self) -> None:
    """Close the SQLite connection. Idempotent."""
    if self._conn is not None:
        with contextlib.suppress(Exception):
            self._conn.close()