Skip to content

RedisStream

autogen.beta.streams.redis.stream.RedisStream #

RedisStream(redis_url, *, prefix='ag2:stream', id=None, serializer=JSON)

Bases: MemoryStream

A full-featured stream with Redis-backed pub/sub and persistent event history.

All events flow through Redis Pub/Sub, ensuring subscribers across processes and machines receive every event. History is persisted to Redis.

Event flow

send() → persist to Redis + publish to Pub/Sub channel
listener → receives from Pub/Sub → dispatches to local subscribers

PARAMETER DESCRIPTION
redis_url

Redis connection URL.

TYPE: str

prefix

Key prefix for Redis storage and pub/sub channels.

TYPE: str DEFAULT: 'ag2:stream'

id

Stream ID. If None, a new UUID is generated.

TYPE: StreamId | None DEFAULT: None

serializer

Serialization format (Serializer.JSON or Serializer.PICKLE).

TYPE: Serializer DEFAULT: JSON

Source code in autogen/beta/streams/redis/stream.py
def __init__(
    self,
    redis_url: str,
    *,
    prefix: str = "ag2:stream",
    id: StreamId | None = None,
    serializer: Serializer = Serializer.JSON,
) -> None:
    """Create a Redis-backed stream.

    Args:
        redis_url: Redis connection URL.
        prefix: Key prefix for Redis storage and pub/sub channels.
        id: Stream ID; a new UUID is generated when None.
        serializer: Wire format (Serializer.JSON or Serializer.PICKLE).
    """
    storage = RedisStorage(redis_url, prefix=prefix, serializer=serializer)
    super().__init__(storage, id=id)

    # MemoryStream.__init__ auto-subscribes storage.save_event; we persist
    # explicitly in send() to avoid double-writes, so drop that subscriber.
    # Use a default sentinel instead of assuming the registry is non-empty
    # (bare next(iter(...)) would raise StopIteration if it ever is).
    storage_sub_id = next(iter(self._subscribers), None)
    if storage_sub_id is not None:
        self.unsubscribe(storage_sub_id)

    self._redis_storage = storage
    self._redis_url = redis_url
    self._prefix = prefix
    self._serializer = serializer
    self._channel = f"{prefix}:pubsub:{self.id}"
    self._instance_id = str(uuid4())
    self._listener_task: asyncio.Task | None = None
    self._listener_ready = asyncio.Event()
    # Two connections: one blocks on Pub/Sub reads, the other publishes.
    self._pubsub_redis = aioredis.from_url(redis_url)
    self._publish_redis = aioredis.from_url(redis_url)

id instance-attribute #

id = id or uuid4()

history instance-attribute #

history = History(id, storage)

send async #

send(event, context)

Persist the event and publish to Redis for all listeners (including self).

Source code in autogen/beta/streams/redis/stream.py
async def send(self, event: BaseEvent, context: Context) -> None:
    """Persist the event and publish to Redis for all listeners (including self)."""
    # Start our own Pub/Sub listener (if not running) and wait until it is
    # subscribed, otherwise this process could miss its own event.
    self._ensure_listener()
    await self._listener_ready.wait()

    # Only the sender writes to history, so each event is persisted exactly once.
    await self._redis_storage.save_event(event, context)

    # Fan out via Pub/Sub; every listener dispatches to its local subscribers.
    payload = serialize(event, self._serializer)
    await self._publish_redis.publish(self._channel, payload)

close async #

close()

Cancel the background Pub/Sub listener task and close the Redis connections and storage.
Source code in autogen/beta/streams/redis/stream.py
async def close(self) -> None:
    """Stop the Pub/Sub listener and release every Redis connection.

    Connections are closed in nested finally blocks so that a failure
    closing one does not leak the others (previously an exception from
    the first aclose() skipped the remaining cleanup).
    """
    if self._listener_task and not self._listener_task.done():
        self._listener_task.cancel()
        # Cancellation is the expected outcome here, not an error.
        with contextlib.suppress(asyncio.CancelledError):
            await self._listener_task
    try:
        await self._pubsub_redis.aclose()
    finally:
        try:
            await self._publish_redis.aclose()
        finally:
            await self._redis_storage.close()

where #

where(condition)
Source code in autogen/beta/stream.py
def where(
    self,
    condition: ClassInfo | Condition,
) -> "Stream":
    """Return a SubStream of this stream filtered by *condition*.

    A non-Condition argument is wrapped in a TypeCondition first.
    """
    normalized = condition if isinstance(condition, Condition) else TypeCondition(condition)
    return SubStream(self, normalized)

join #

join(*, max_events=None)
Source code in autogen/beta/stream.py
@contextmanager
def join(self, *, max_events: int | None = None) -> Iterator[AsyncIterator[BaseEvent]]:
    """Subscribe for the duration of the context and yield an async event iterator.

    Args:
        max_events: Stop after this many events; None means iterate forever.
    """
    queue = asyncio.Queue[BaseEvent]()

    async def write_events(event: BaseEvent) -> None:
        await queue.put(event)

    # BUG FIX: the previous `if max_events:` treated max_events=0 like None
    # (an unbounded iterator); compare against None so 0 yields no events.
    if max_events is not None:

        async def listen_events() -> AsyncIterator[BaseEvent]:
            for _ in range(max_events):
                yield await queue.get()

    else:

        async def listen_events() -> AsyncIterator[BaseEvent]:
            while True:
                yield await queue.get()

    with self.sub_scope(write_events):
        yield listen_events()

subscribe #

subscribe(func: Callable[..., Any], *, interrupt: bool = False, sync_to_thread: bool = True, condition: Condition | None = None) -> SubId
subscribe(func: None = None, *, interrupt: bool = False, sync_to_thread: bool = True, condition: Condition | None = None) -> Callable[[Callable[..., Any]], SubId]
subscribe(func=None, *, interrupt=False, sync_to_thread=True, condition=None)
Source code in autogen/beta/stream.py
def subscribe(
    self,
    func: Callable[..., Any] | None = None,
    *,
    interrupt: bool = False,
    sync_to_thread: bool = True,
    condition: Condition | None = None,
) -> Callable[[Callable[..., Any]], SubId] | SubId:
    """Register *func* as a subscriber, or act as a decorator when func is None.

    Args:
        func: Callable to register; when None, a decorator is returned.
        interrupt: Register in the interrupters registry instead of subscribers.
        sync_to_thread: Forwarded to build_model.
        condition: Optional filter stored alongside the handler.

    Returns:
        The new SubId, or a decorator producing one when func is None.
    """

    def sub(s: Callable[..., Any]) -> SubId:
        sub_id = uuid4()
        model = build_model(s, sync_to_thread=sync_to_thread, serialize_result=False)
        registry = self._interrupters if interrupt else self._subscribers
        registry[sub_id] = (condition, model)
        return sub_id

    # BUG FIX: `if func:` would silently treat a falsy callable (an object
    # defining __bool__) as decorator mode; test identity against None.
    if func is not None:
        return sub(func)
    return sub

unsubscribe #

unsubscribe(sub_id)
Source code in autogen/beta/stream.py
def unsubscribe(self, sub_id: SubId) -> None:
    """Remove the subscription with *sub_id*; unknown ids are ignored."""
    # The id may live in either registry; pop from both with a default
    # so a missing key never raises.
    for registry in (self._subscribers, self._interrupters):
        registry.pop(sub_id, None)

sub_scope #

sub_scope(func, *, interrupt=False, sync_to_thread=True, condition=None)
Source code in autogen/beta/stream.py
@contextmanager
def sub_scope(
    self,
    func: Callable[..., Any],
    *,
    interrupt: bool = False,
    sync_to_thread: bool = True,
    condition: Condition | None = None,
) -> Iterator[None]:
    """Subscribe *func* for the lifetime of the with-block, then unsubscribe.

    All keyword arguments are forwarded unchanged to subscribe().
    """
    sub_id = self.subscribe(func, interrupt=interrupt, sync_to_thread=sync_to_thread, condition=condition)
    try:
        yield
    finally:
        # Always detach, even if the with-body raised.
        self.unsubscribe(sub_id)

get async #

get(condition)
Source code in autogen/beta/stream.py
@asynccontextmanager
async def get(
    self,
    condition: ClassInfo | Condition,
) -> AsyncIterator[asyncio.Future[BaseEvent]]:
    """Yield a future resolved with the first event matching *condition*.

    The temporary subscription is removed when the context exits.
    """
    result = asyncio.Future[BaseEvent]()

    async def wait_result(event: BaseEvent) -> None:
        # BUG FIX: a second matching event dispatched before the scope
        # closes (or a cancelled future) would make set_result raise
        # asyncio.InvalidStateError; resolve the future at most once.
        if not result.done():
            result.set_result(event)

    with self.where(condition).sub_scope(wait_result):
        yield result