SQLite-backed :class:KnowledgeStore.
Stores every virtual path as a row (path, content, version) in a single entries table. version is a monotonic integer bumped on every write or append and is the key the polling change watcher uses to detect mutations.
SQLite does not have native change notifications, so on_change uses :class:PollingChangeWatcher at a default 500ms interval. Callers that need lower latency pass poll_interval_s to the constructor.
All store operations run on the asyncio loop's default executor so the blocking sqlite3 API does not stall the event loop. A single connection is serialized with an asyncio.Lock — SQLite's own file locking handles concurrent Python processes if multiple hubs share a database.
This is the same protocol surface as Memory and Disk, so every test that parameterizes over stores picks up Sqlite automatically.
Source code in autogen/beta/knowledge/sqlite.py
| def __init__(
self,
path: str | os.PathLike[str],
*,
poll_interval_s: float = 0.5,
) -> None:
self._db_path: Path = Path(path)
self._poll_interval_s = poll_interval_s
self._conn: sqlite3.Connection | None = None
self._lock = asyncio.Lock()
self._version_counter = 0
|
read async
Source code in autogen/beta/knowledge/sqlite.py
| async def read(self, path: str) -> str | None:
normalized = _normalize(path)
return await self._run(functools.partial(self._sync_read, normalized))
|
write async
Source code in autogen/beta/knowledge/sqlite.py
| async def write(self, path: str, content: str) -> None:
normalized = _normalize(path)
payload = content.encode("utf-8")
async with self._lock:
version = self._next_version()
await self._run(functools.partial(self._sync_write, normalized, payload, version))
|
list async
Source code in autogen/beta/knowledge/sqlite.py
| async def list(self, path: str = "/") -> list[str]:
prefix = _normalize(path).rstrip("/") + "/"
return await self._run(functools.partial(self._sync_list, prefix))
|
delete async
Source code in autogen/beta/knowledge/sqlite.py
| async def delete(self, path: str) -> None:
normalized = _normalize(path)
prefix = normalized.rstrip("/") + "/"
async with self._lock:
await self._run(functools.partial(self._sync_delete, normalized, prefix))
|
exists async
Source code in autogen/beta/knowledge/sqlite.py
| async def exists(self, path: str) -> bool:
normalized = _normalize(path)
prefix = normalized.rstrip("/") + "/"
return await self._run(functools.partial(self._sync_exists, normalized, prefix))
|
append async
Source code in autogen/beta/knowledge/sqlite.py
| async def append(self, path: str, content: str) -> int:
normalized = _normalize(path)
payload = content.encode("utf-8")
async with self._lock:
version = self._next_version()
return await self._run(functools.partial(self._sync_append, normalized, payload, version))
|
read_range async
read_range(path, start, end=None)
Source code in autogen/beta/knowledge/sqlite.py
| async def read_range(self, path: str, start: int, end: int | None = None) -> str:
normalized = _normalize(path)
return await self._run(functools.partial(self._sync_read_range, normalized, start, end))
|
list_versions_under async
list_versions_under(prefix)
Return {path: version} for every key under prefix.
Used by :class:PollingChangeWatcher to diff snapshots. The raw version numbers are per-write monotonic ints so any change under prefix bumps exactly one key's version, making the diff loop O(keys-under-prefix).
Source code in autogen/beta/knowledge/sqlite.py
| async def list_versions_under(self, prefix: str) -> dict[str, int]:
"""Return ``{path: version}`` for every key under ``prefix``.
Used by :class:`PollingChangeWatcher` to diff snapshots. The
raw version numbers are per-write monotonic ints so any change
under ``prefix`` bumps exactly one key's version, making the
diff loop O(keys-under-prefix).
"""
normalized = _normalize(prefix).rstrip("/")
return await self._run(functools.partial(self._sync_list_versions, normalized))
|
on_change async
on_change(path, callback)
Source code in autogen/beta/knowledge/sqlite.py
| async def on_change(self, path: str, callback: ChangeCallback) -> ChangeSubscription:
watcher = PollingChangeWatcher(
backend=self,
prefix=path,
callback=callback,
interval_s=self._poll_interval_s,
)
await watcher.start()
return watcher
|
close
Close the SQLite connection. Idempotent.
Source code in autogen/beta/knowledge/sqlite.py
| def close(self) -> None:
"""Close the SQLite connection. Idempotent."""
if self._conn is not None:
with contextlib.suppress(Exception):
self._conn.close()
|