Append-only writer over the hub's KnowledgeStore.
Stateless — every append is one JSON line. Reads are O(file size) and intended for tests / admin tooling, not hot paths.
Source code in autogen/beta/network/hub/audit.py
def __init__(self, store: KnowledgeStore) -> None:
    """Keep a reference to ``store``; construction performs no reads or writes."""
    self._store = store
|
append async
Serialise and append one record.
Source code in autogen/beta/network/hub/audit.py
async def append(self, record: dict) -> None:
    """Write ``record`` to the audit log as a single JSON line.

    Keys are emitted in sorted order, and any value the ``json`` encoder
    cannot handle natively is converted with ``str``. The encoded text,
    terminated by a newline, is handed to the store's ``append``.
    """
    payload = json.dumps(record, sort_keys=True, default=str)
    await self._store.append(audit_path(), f"{payload}\n")
|
read_all async
Read and parse the entire audit log. Returns [] if absent.
Source code in autogen/beta/network/hub/audit.py
async def read_all(self) -> list[dict]:
    """Load the whole audit log and decode it line by line.

    Returns:
        One parsed JSON value per non-blank line, in file order, or ``[]``
        when the store returns a falsy value for the audit path.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    raw = await self._store.read(audit_path())
    if not raw:
        return []
    # Whitespace-only lines are skipped rather than treated as parse errors.
    return [json.loads(entry) for entry in raw.splitlines() if entry.strip()]
|