Skip to content

EventLogWriter

autogen.beta.knowledge.log.EventLogWriter #

EventLogWriter(store)

Persists stream events to the knowledge store as WAL entries.

Each event is serialized as a JSON line with a type tag for deserialization. Writes are append-segmented: events dropped by compaction go to numbered segment files, while final events go to the main log file.

Source code in autogen/beta/knowledge/log.py
def __init__(self, store: KnowledgeStore) -> None:
    """Create a writer backed by ``store``.

    Args:
        store: Knowledge store kept on the instance as ``_store``; the other
            methods of this class list, read and write log files through it.
    """
    self._store = store

persist async #

persist(stream_id, events)

Write final events to /log/{stream_id}.jsonl.

Source code in autogen/beta/knowledge/log.py
async def persist(self, stream_id: StreamId, events: Iterable[BaseEvent]) -> None:
    """Store the final events of a stream in its main log file.

    The events are converted to lines by ``_serialize_events``, joined with
    newlines, and handed to the store in a single write to
    ``/log/{stream_id}.jsonl``.

    Args:
        stream_id: Identifier used to build the log file path.
        events: Final events to serialize and persist.
    """
    target = f"/log/{stream_id}.jsonl"
    payload = "\n".join(self._serialize_events(events))
    await self._store.write(target, payload)

persist_dropped async #

persist_dropped(stream_id, events)

Write compaction-dropped events to /log/{stream_id}.dropped-{n}.jsonl.

Discovers existing segments in the store to avoid overwriting.

Source code in autogen/beta/knowledge/log.py
async def persist_dropped(self, stream_id: StreamId, events: Iterable[BaseEvent]) -> None:
    """Write compaction-dropped events to /log/{stream_id}.dropped-{n}.jsonl.

    Lists ``/log/`` and chooses ``n`` as one more than the highest segment
    number already present for this stream (``1`` when there is none). Using
    the highest number rather than the segment count means an existing
    segment is never overwritten, even when the numbering has gaps.

    Args:
        stream_id: Identifier used to build the segment file name.
        events: Events dropped by compaction, serialized via
            ``_serialize_events`` and joined with newlines.
    """
    prefix = f"{stream_id}.dropped-"
    suffix = ".jsonl"
    # NOTE(review): entries are matched as bare file names (no "/log/" part),
    # the same way load() treats them — confirm against the store's list().
    entries = await self._store.list("/log/")

    highest = 0
    for entry in entries:
        if not (entry.startswith(prefix) and entry.endswith(suffix)):
            continue
        try:
            index = int(entry[len(prefix) : -len(suffix)])
        except ValueError:
            # Not a numbered segment; its name cannot collide with the
            # numeric file name generated below, so it is ignored.
            continue
        highest = max(highest, index)

    path = f"/log/{stream_id}.dropped-{highest + 1}.jsonl"
    lines = self._serialize_events(events)
    await self._store.write(path, "\n".join(lines))

load async #

load(stream_id)

Load events from the WAL files: all dropped segments in numeric order, then the main log file.

Returns typed BaseEvent instances. Unknown types become UnknownEvent.

Source code in autogen/beta/knowledge/log.py
async def load(self, stream_id: StreamId) -> list[BaseEvent]:
    """Load events from WAL files: all dropped segments in order, then final.

    Dropped segments are the entries of ``/log/`` named
    ``{stream_id}.dropped-{n}.jsonl``; they are read in ascending numeric
    order of ``n``, followed by ``/log/{stream_id}.jsonl``. Entries that
    match the prefix and extension but whose ``n`` is not an integer are
    skipped instead of aborting the whole load.

    Reading and deserialization are delegated to ``_load_file``.

    Args:
        stream_id: Identifier of the stream whose log should be loaded.

    Returns:
        Typed BaseEvent instances. Unknown types become UnknownEvent.
    """
    prefix = f"{stream_id}.dropped-"
    suffix = ".jsonl"
    entries = await self._store.list("/log/")

    numbered: list[tuple[int, str]] = []
    for entry in entries:
        if not (entry.startswith(prefix) and entry.endswith(suffix)):
            continue
        try:
            index = int(entry[len(prefix) : -len(suffix)])
        except ValueError:
            # A stray file that merely looks like a segment must not make
            # the rest of the log unreadable.
            continue
        numbered.append((index, entry))
    # Sort on the index only, so entries with equal indexes keep list order.
    numbered.sort(key=lambda item: item[0])

    all_events: list[BaseEvent] = []
    for _, segment in numbered:
        all_events.extend(await self._load_file(f"/log/{segment}"))

    # The main log holds the final events and therefore comes last.
    all_events.extend(await self._load_file(f"/log/{stream_id}.jsonl"))
    return all_events