RedisStream(redis_url, *, prefix='ag2:stream', id=None, serializer=JSON)
Bases: MemoryStream
A full-featured stream with Redis-backed pub/sub and persistent event history.
All events flow through Redis Pub/Sub, ensuring subscribers across processes and machines receive every event. History is persisted to Redis.
Event flow
send() → persist to Redis + publish to Pub/Sub channel listener → receives from Pub/Sub → dispatches to local subscribers
| PARAMETER | DESCRIPTION |
redis_url | TYPE: str |
prefix | Key prefix for Redis storage and pub/sub channels. TYPE: str DEFAULT: 'ag2:stream' |
id | Stream ID. If None, a new UUID is generated. TYPE: StreamId | None DEFAULT: None |
serializer | Serialization format (Serializer.JSON or Serializer.PICKLE). TYPE: Serializer DEFAULT: JSON |
Source code in autogen/beta/streams/redis/stream.py
| def __init__(
self,
redis_url: str,
*,
prefix: str = "ag2:stream",
id: StreamId | None = None,
serializer: Serializer = Serializer.JSON,
) -> None:
storage = RedisStorage(redis_url, prefix=prefix, serializer=serializer)
super().__init__(storage, id=id)
# Unsubscribe the auto-registered save_event from MemoryStream.__init__
# We handle persistence explicitly in send() to avoid double-writes
storage_sub_id = next(iter(self._subscribers))
self.unsubscribe(storage_sub_id)
self._redis_storage = storage
self._redis_url = redis_url
self._prefix = prefix
self._serializer = serializer
self._channel = f"{prefix}:pubsub:{self.id}"
self._instance_id = str(uuid4())
self._listener_task: asyncio.Task | None = None
self._listener_ready = asyncio.Event()
self._pubsub_redis = aioredis.from_url(redis_url)
self._publish_redis = aioredis.from_url(redis_url)
|
history instance-attribute
history = History(id, storage)
send async
Persist the event and publish to Redis for all listeners (including self).
Source code in autogen/beta/streams/redis/stream.py
| async def send(self, event: BaseEvent, context: Context) -> None:
"""Persist the event and publish to Redis for all listeners (including self)."""
self._ensure_listener()
await self._listener_ready.wait()
# Persist once — only the sender writes to history
await self._redis_storage.save_event(event, context)
# Publish to Redis — all listeners dispatch to their local subscribers
await self._publish_redis.publish(self._channel, serialize(event, self._serializer))
|
close async
Source code in autogen/beta/streams/redis/stream.py
| async def close(self) -> None:
if self._listener_task and not self._listener_task.done():
self._listener_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._listener_task
await self._pubsub_redis.aclose()
await self._publish_redis.aclose()
await self._redis_storage.close()
|
where
Source code in autogen/beta/stream.py
| def where(
self,
condition: ClassInfo | Condition,
) -> "Stream":
if not isinstance(condition, Condition):
condition = TypeCondition(condition)
return SubStream(self, condition)
|
join
Source code in autogen/beta/stream.py
| @contextmanager
def join(self, *, max_events: int | None = None) -> Iterator[AsyncIterator[BaseEvent]]:
queue = asyncio.Queue[BaseEvent]()
async def write_events(event: BaseEvent) -> None:
await queue.put(event)
if max_events:
async def listen_events() -> AsyncIterator[BaseEvent]:
for _ in range(max_events):
yield await queue.get()
else:
async def listen_events() -> AsyncIterator[BaseEvent]:
while True:
yield await queue.get()
with self.sub_scope(write_events):
yield listen_events()
|
subscribe
subscribe(func=None, *, interrupt=False, sync_to_thread=True, condition=None)
Source code in autogen/beta/stream.py
| def subscribe(
self,
func: Callable[..., Any] | None = None,
*,
interrupt: bool = False,
sync_to_thread: bool = True,
condition: Condition | None = None,
) -> Callable[[Callable[..., Any]], SubId] | SubId:
def sub(s: Callable[..., Any]) -> SubId:
sub_id = uuid4()
model = build_model(s, sync_to_thread=sync_to_thread, serialize_result=False)
if interrupt:
self._interrupters[sub_id] = (condition, model)
else:
self._subscribers[sub_id] = (condition, model)
return sub_id
if func:
return sub(func)
return sub
|
unsubscribe
Source code in autogen/beta/stream.py
| def unsubscribe(self, sub_id: SubId) -> None:
self._subscribers.pop(sub_id, None)
self._interrupters.pop(sub_id, None)
|
sub_scope
sub_scope(func, *, interrupt=False, sync_to_thread=True, condition=None)
Source code in autogen/beta/stream.py
| @contextmanager
def sub_scope(
self,
func: Callable[..., Any],
*,
interrupt: bool = False,
sync_to_thread: bool = True,
condition: Condition | None = None,
) -> Iterator[None]:
sub_id = self.subscribe(
func,
interrupt=interrupt,
sync_to_thread=sync_to_thread,
condition=condition,
)
try:
yield
finally:
self.unsubscribe(sub_id)
|
get async
Source code in autogen/beta/stream.py
| @asynccontextmanager
async def get(
self,
condition: ClassInfo | Condition,
) -> AsyncIterator[asyncio.Future[BaseEvent]]:
result = asyncio.Future[BaseEvent]()
async def wait_result(event: BaseEvent) -> None:
result.set_result(event)
with self.where(condition).sub_scope(wait_result):
yield result
|