Skip to content

AuditLog

autogen.beta.network.hub.audit.AuditLog #

AuditLog(store, *, clock=None)

Bases: BaseHubListener

Append-only writer over the hub's KnowledgeStore.

Implements :class:HubListener so the hub fans out state-transition events through the same Protocol every other observer uses. Each listener method translates its event into one structured audit record via :meth:append.

Subscribers attached via :meth:subscribe receive every appended record live (in addition to the on-disk append). Subscriber exceptions are logged and swallowed — a buggy live tail cannot break the persistent log.

Source code in autogen/beta/network/hub/audit.py
def __init__(self, store: KnowledgeStore, *, clock: ClockFn | None = None) -> None:
    # __init__ stores params; no side effects.
    self._store = store
    self._clock = clock if clock is not None else _default_clock
    self._subscribers: list[AuditSubscriber] = []
    # Running byte counter — process-local, reset on hydrate. Cheap
    # to read for ``Hub.health()`` without touching the store.
    self._bytes_written = 0

bytes_written property #

bytes_written

Process-local byte counter for the audit log.

Resets on hub restart. Cheap to read — :meth:Hub.health surfaces this as audit_log_bytes so operators can graph audit volume without touching the store.

append async #

append(record)

Serialise and append one record. Notifies subscribers afterwards.

Public so tenants and hub subclasses can append records with custom kind values that the built-in listener methods don't cover.

Source code in autogen/beta/network/hub/audit.py
async def append(self, record: dict) -> None:
    """Serialise and append one record. Notifies subscribers afterwards.

    Public so tenants and hub subclasses can append records with
    custom ``kind`` values that the built-in listener methods
    don't cover.
    """
    line = json.dumps(record, default=str, sort_keys=True) + "\n"
    await self._store.append(audit_path(), line)
    self._bytes_written += len(line.encode("utf-8"))
    for subscriber in self._subscribers:
        try:
            await subscriber(record)
        except Exception:
            logger.exception("audit subscriber raised: kind=%s", record.get("kind"))

read_all async #

read_all()

Read and parse the entire audit log. Returns [] if absent.

Source code in autogen/beta/network/hub/audit.py
async def read_all(self) -> list[dict]:
    """Read and parse the entire audit log. Returns ``[]`` if absent."""
    data = await self._store.read(audit_path())
    if not data:
        return []
    records: list[dict] = []
    for line in data.splitlines():
        if not line.strip():
            continue
        records.append(json.loads(line))
    return records

subscribe #

subscribe(callback)

Attach a live callback fired per appended record.

Useful for tailing the audit stream without polling the file — e.g. for an operational dashboard or live alert pipeline. Callbacks run sequentially in registration order. An exception in one callback is logged and does not abort subsequent ones.

Source code in autogen/beta/network/hub/audit.py
def subscribe(self, callback: AuditSubscriber) -> None:
    """Attach a live callback fired per appended record.

    Useful for tailing the audit stream without polling the file —
    e.g. for an operational dashboard or live alert pipeline.
    Callbacks run sequentially in registration order. An exception
    in one callback is logged and does not abort subsequent ones.
    """
    self._subscribers.append(callback)

unsubscribe #

unsubscribe(callback)

Detach a previously-registered subscriber. No-op if absent.

Source code in autogen/beta/network/hub/audit.py
def unsubscribe(self, callback: AuditSubscriber) -> None:
    """Detach a previously-registered subscriber. No-op if absent."""
    with contextlib.suppress(ValueError):
        self._subscribers.remove(callback)

on_agent_event async #

on_agent_event(agent_id, kind, payload)

Translate identity-lifecycle events into audit records.

Recognises "registered", "unregistered", "resume_set", "skill_set", "rule_set", and "observation_recorded". Unknown kinds are ignored — subclasses fan out their own kinds via :meth:append directly.

Source code in autogen/beta/network/hub/audit.py
async def on_agent_event(self, agent_id: str, kind: str, payload: dict) -> None:
    """Translate identity-lifecycle events into audit records.

    Recognises ``"registered"``, ``"unregistered"``, ``"resume_set"``,
    ``"skill_set"``, ``"rule_set"``, and ``"observation_recorded"``.
    Unknown kinds are ignored — subclasses fan out their own kinds
    via :meth:`append` directly.
    """
    at = payload.get("at") or self._clock()
    if kind == "registered":
        passport = payload.get("passport")
        await self.append({
            "at": at,
            "kind": AUDIT_KIND_AGENT_REGISTERED,
            "agent_id": agent_id,
            "name": getattr(passport, "name", None),
        })
    elif kind == "unregistered":
        await self.append({
            "at": at,
            "kind": AUDIT_KIND_AGENT_UNREGISTERED,
            "agent_id": agent_id,
            "name": payload.get("name"),
        })
    elif kind == "resume_set":
        await self.append({
            "at": at,
            "kind": AUDIT_KIND_RESUME_SET,
            "source": RESUME_SOURCE_TENANT,
            "agent_id": agent_id,
            "version": payload.get("version"),
        })
    elif kind == "skill_set":
        await self.append({
            "at": at,
            "kind": AUDIT_KIND_SKILL_SET,
            "agent_id": agent_id,
            "removed": payload.get("removed", False),
        })
    elif kind == "rule_set":
        await self.append({
            "at": at,
            "kind": AUDIT_KIND_RULE_SET,
            "agent_id": agent_id,
            "version": payload.get("version"),
        })
    elif kind == "observation_recorded":
        await self.append({
            "at": at,
            "kind": AUDIT_KIND_RESUME_SET,
            "source": RESUME_SOURCE_OBSERVED,
            "agent_id": agent_id,
            "version": payload.get("version"),
            "capability": payload.get("capability"),
            "outcome": payload.get("outcome"),
        })

on_channel_event async #

on_channel_event(channel_id, kind, payload)

Translate channel-lifecycle events into audit records.

Records "created", "closed", and "expired". Other kinds ("opened", "participant_removed", "participant_hidden") are observed but not separately audited — they show up on the channel WAL and / or in per-violation records.

Source code in autogen/beta/network/hub/audit.py
async def on_channel_event(self, channel_id: str, kind: str, payload: dict) -> None:
    """Translate channel-lifecycle events into audit records.

    Records ``"created"``, ``"closed"``, and ``"expired"``. Other
    kinds (``"opened"``, ``"participant_removed"``,
    ``"participant_hidden"``) are observed but not separately
    audited — they show up on the channel WAL and / or in
    per-violation records.
    """
    at = payload.get("at") or self._clock()
    if kind == "created":
        metadata = payload.get("metadata")
        participants = payload.get("participants")
        if participants is None and metadata is not None:
            participants = [p.agent_id for p in metadata.participants]
        record: dict = {
            "at": at,
            "kind": AUDIT_KIND_CHANNEL_CREATED,
            "channel_id": channel_id,
        }
        if metadata is not None:
            record["manifest_type"] = metadata.manifest.type
            record["manifest_version"] = metadata.manifest.version
            record["creator_id"] = metadata.creator_id
        if participants is not None:
            record["participants"] = list(participants)
        await self.append(record)
    elif kind == "closed":
        await self.append({
            "at": at,
            "kind": AUDIT_KIND_CHANNEL_CLOSED,
            "channel_id": channel_id,
            "reason": payload.get("reason"),
        })
    elif kind == "expired":
        await self.append({
            "at": at,
            "kind": AUDIT_KIND_CHANNEL_EXPIRED,
            "channel_id": channel_id,
            "reason": payload.get("reason"),
        })

on_expectation_fired async #

on_expectation_fired(channel_id, expectation, violation)

Record one violation per (channel, expectation, violator) fire.

Source code in autogen/beta/network/hub/audit.py
async def on_expectation_fired(self, channel_id: str, expectation, violation) -> None:
    """Record one violation per ``(channel, expectation, violator)`` fire."""
    at = self._clock()
    await self.append({
        "at": at,
        "kind": AUDIT_KIND_EXPECTATION_VIOLATED,
        "channel_id": channel_id,
        "expectation": violation.expectation.name,
        "on_violation": violation.expectation.on_violation,
        "params": dict(violation.expectation.params),
        "violators": list(violation.violator_ids),
        "detail": dict(violation.detail),
    })

on_task_event async #

on_task_event(task_id, kind, payload)

Record terminal task transitions.

"started" / "progress" are observed-only; only the terminal kinds ("completed" / "failed" / "expired" / "cancelled") become audit records. "mirror_failed" signals that a hub-side mirror could not record the agent's terminal event — the audit reflects the failure separately.

Source code in autogen/beta/network/hub/audit.py
async def on_task_event(self, task_id: str, kind: str, payload: dict) -> None:
    """Record terminal task transitions.

    ``"started"`` / ``"progress"`` are observed-only; only the
    terminal kinds (``"completed"`` / ``"failed"`` / ``"expired"`` /
    ``"cancelled"``) become audit records. ``"mirror_failed"``
    signals that a hub-side mirror could not record the agent's
    terminal event — the audit reflects the failure separately.
    """
    if kind not in ("completed", "failed", "expired", "cancelled"):
        return
    at = payload.get("at") or self._clock()
    await self.append({
        "at": at,
        "kind": AUDIT_KIND_TASK_TERMINATED,
        "task_id": task_id,
        "owner_id": payload.get("owner_id"),
        "channel_id": payload.get("channel_id"),
        "outcome": payload.get("outcome", kind),
        "capability": payload.get("capability"),
        "reason": payload.get("reason"),
    })

on_turn_failed async #

on_turn_failed(channel_id, agent_id, envelope_id, exc)

Record a notify-handler crash so operators can correlate with the WAL.

Source code in autogen/beta/network/hub/audit.py
async def on_turn_failed(
    self,
    channel_id: str,
    agent_id: str,
    envelope_id: str,
    exc: BaseException,
) -> None:
    """Record a notify-handler crash so operators can correlate with the WAL."""
    await self.append({
        "at": self._clock(),
        "kind": AUDIT_KIND_TURN_FAILED,
        "channel_id": channel_id,
        "agent_id": agent_id,
        "envelope_id": envelope_id,
        "exc_type": type(exc).__name__,
        "exc_message": str(exc),
    })

on_envelope_posted async #

on_envelope_posted(envelope, metadata)
Source code in autogen/beta/network/hub/listener.py
async def on_envelope_posted(self, envelope, metadata) -> None:  # noqa: ARG002
    return None

on_envelope_rejected async #

on_envelope_rejected(envelope, reason)
Source code in autogen/beta/network/hub/listener.py
async def on_envelope_rejected(self, envelope, reason) -> None:  # noqa: ARG002
    return None

on_dispatch_failed async #

on_dispatch_failed(envelope, recipient_id, reason)
Source code in autogen/beta/network/hub/listener.py
async def on_dispatch_failed(self, envelope, recipient_id, reason) -> None:  # noqa: ARG002
    return None

on_inbox_pressure async #

on_inbox_pressure(agent_id, pending, cap)
Source code in autogen/beta/network/hub/listener.py
async def on_inbox_pressure(self, agent_id, pending, cap) -> None:  # noqa: ARG002
    return None