Skip to content

AuditLog

autogen.beta.network.hub.audit.AuditLog #

AuditLog(store)

Append-only writer over the hub's KnowledgeStore.

Stateless — every append is one JSON line. Reads are O(file size) and intended for tests / admin tooling, not hot paths.

Source code in autogen/beta/network/hub/audit.py
def __init__(self, store: KnowledgeStore) -> None:
    """Bind this audit log to ``store``.

    Only a reference to ``store`` is kept; nothing is read or written
    during construction.
    """
    self._store = store

append async #

append(record)

Serialise and append one record.

Source code in autogen/beta/network/hub/audit.py
async def append(self, record: dict) -> None:
    """Encode ``record`` as a single JSON line and append it to the audit log.

    Keys are emitted in sorted order, and any value the JSON encoder cannot
    handle natively is converted with ``str``.
    """
    encoded = json.dumps(record, sort_keys=True, default=str)
    # Newline terminator keeps one record per line in the log.
    await self._store.append(audit_path(), f"{encoded}\n")

read_all async #

read_all()

Read and parse the entire audit log. Returns [] if absent.

Source code in autogen/beta/network/hub/audit.py
async def read_all(self) -> list[dict]:
    """Load the whole audit log and decode every non-blank line as JSON.

    Returns:
        The decoded records in the order they appear in the log, or ``[]``
        when the store returns nothing for the audit path.
    """
    raw = await self._store.read(audit_path())
    if raw:
        # Blank / whitespace-only lines are skipped rather than parsed.
        return [
            json.loads(entry)
            for entry in raw.splitlines()
            if entry.strip()
        ]
    return []