def __init__(
    self,
    storage: Storage | None = None,
    *,
    id: StreamId | None = None,
    persist_all: bool = False,
) -> None:
    """Set up the stream's identity, handler registries, history and persistence.

    Args:
        storage: Backend passed to ``History`` and used to save events.
            When ``None``, a new ``MemoryStorage`` is created.
        id: Identifier for this stream. A falsy value (including ``None``)
            is replaced by a freshly generated ``uuid4()``.
        persist_all: When ``True``, ``storage.save_event`` is subscribed
            directly so every event is saved. Otherwise the storage is
            wrapped in ``_FilteredStorage`` before its ``save_event`` is
            subscribed.
    """
    self.id: StreamId = id or uuid4()
    self._subscribers: dict[SubId, tuple[Condition | None, CallModel]] = {}
    # ordered dict (plain dicts preserve insertion order)
    self._interrupters: dict[SubId, tuple[Condition | None, CallModel]] = {}

    # Compare against None explicitly: a caller-supplied storage that
    # happens to be falsy (e.g. an empty container-like backend) must not
    # be silently swapped for a fresh MemoryStorage.
    if storage is None:
        storage = MemoryStorage()
    self.history = History(self.id, storage)

    # Agent._execute populates this lazily on first turn — setting it
    # to None here so `getattr(..., None)` returns None instead of
    # hitting a slot-uninitialized AttributeError.
    self._ag2_turn_lock = None  # type: ignore[assignment]

    if persist_all:
        # Persist every event including transient ones (streaming chunks, lifecycle, etc.)
        self.subscribe(storage.save_event)
    else:
        # Default: skip events marked __transient__ (ModelMessageChunk, TaskProgress, etc.)
        # These are real-time streaming artifacts superseded by their final counterparts.
        self.subscribe(_FilteredStorage(storage).save_event)