Persists stream events to the knowledge store as WAL entries.
Each event is serialized as a JSON line with a type tag for deserialization. Uses append-segmented writes: dropped events from compaction go to numbered segment files, final events go to the main log file.
Source code in autogen/beta/knowledge/log.py
def __init__(self, store: KnowledgeStore) -> None:
    """Keep a reference to the knowledge store backing all WAL reads/writes."""
    self._store = store
persist async
persist(stream_id, events)
Write final events to /log/{stream_id}.jsonl.
Source code in autogen/beta/knowledge/log.py
async def persist(self, stream_id: StreamId, events: Iterable[BaseEvent]) -> None:
    """Write final events to /log/{stream_id}.jsonl."""
    # Serialize first, then hand the store one newline-joined payload.
    serialized = self._serialize_events(events)
    await self._store.write(f"/log/{stream_id}.jsonl", "\n".join(serialized))
persist_dropped async
persist_dropped(stream_id, events)
Write compaction-dropped events to /log/{stream_id}.dropped-{n}.jsonl.
Discovers existing segments in the store to avoid overwriting.
Source code in autogen/beta/knowledge/log.py
async def persist_dropped(self, stream_id: StreamId, events: Iterable[BaseEvent]) -> None:
    """Write compaction-dropped events to /log/{stream_id}.dropped-{n}.jsonl.

    Discovers existing segments in the store to avoid overwriting. The next
    segment number is max(existing numbers) + 1 rather than a count of
    segment files: counting collides when the numbering has gaps (e.g.
    segments {1, 3} would yield n=3 and silently overwrite segment 3).
    Files matching the prefix but lacking a numeric suffix are ignored.
    """
    prefix = f"{stream_id}.dropped-"
    suffix = ".jsonl"
    entries = await self._store.list("/log/")
    numbers = [
        int(e[len(prefix):-len(suffix)])
        for e in entries
        if e.startswith(prefix)
        and e.endswith(suffix)
        and e[len(prefix):-len(suffix)].isdigit()
    ]
    # default=0 handles the first-ever dropped segment (no entries yet).
    n = max(numbers, default=0) + 1
    path = f"/log/{stream_id}.dropped-{n}.jsonl"
    lines = self._serialize_events(events)
    await self._store.write(path, "\n".join(lines))
load async
load(stream_id)
Load events from WAL files: all dropped segments in order, then final.
Returns typed BaseEvent instances. Unknown types become UnknownEvent.
Source code in autogen/beta/knowledge/log.py
async def load(self, stream_id: StreamId) -> list[BaseEvent]:
    """Load events from WAL files: all dropped segments in order, then final.

    Returns typed BaseEvent instances. Unknown types become UnknownEvent.
    Segment files whose suffix between ``.dropped-`` and ``.jsonl`` is not
    purely numeric are skipped instead of raising ValueError from int(),
    so one stray file cannot abort the whole load.
    """
    prefix = f"{stream_id}.dropped-"
    suffix = ".jsonl"
    entries = await self._store.list("/log/")
    # Replay order: dropped segments by ascending segment number, then the
    # final log, so compacted-away events precede the surviving ones.
    segments = sorted(
        (
            e
            for e in entries
            if e.startswith(prefix)
            and e.endswith(suffix)
            and e[len(prefix):-len(suffix)].isdigit()
        ),
        key=lambda e: int(e[len(prefix):-len(suffix)]),
    )
    all_events: list[BaseEvent] = []
    for segment in segments:
        all_events.extend(await self._load_file(f"/log/{segment}"))
    all_events.extend(await self._load_file(f"/log/{stream_id}.jsonl"))
    return all_events