Skip to content

RedisStorage

autogen.beta.streams.redis.storage.RedisStorage #

RedisStorage(redis_url, prefix='ag2:stream', serializer=JSON)

Redis-backed storage implementing the Storage protocol from autogen.beta.history.

Source code in autogen/beta/streams/redis/storage.py
def __init__(
    self,
    redis_url: str,
    prefix: str = "ag2:stream",
    serializer: Serializer = Serializer.JSON,
) -> None:
    """Set up a storage instance talking to the Redis server at ``redis_url``.

    Args:
        redis_url: Connection URL passed to ``aioredis.from_url``.
        prefix: String kept on the instance as ``_prefix``; presumably
            used by ``_key`` to namespace Redis keys (helper not shown
            here - confirm).
        serializer: Format selector handed to ``serialize`` /
            ``deserialize`` whenever events are written or read.
    """
    # Build the client first so a bad URL fails before any state is set.
    client = aioredis.from_url(redis_url)
    self._redis = client
    self._serializer = serializer
    self._prefix = prefix

save_event async #

save_event(event, context)
Source code in autogen/beta/streams/redis/storage.py
async def save_event(self, event: BaseEvent, context: Context) -> None:
    """Append one event to the history of the stream named by ``context``.

    Args:
        event: Event to persist; encoded with the instance's serializer.
        context: Supplies the target stream via ``context.stream.id``.
    """
    key = self._key(context.stream.id)
    payload = serialize(event, self._serializer)
    # RPUSH adds at the tail, so events stay in the order they were saved.
    await self._redis.rpush(key, payload)

get_history async #

get_history(stream_id)
Source code in autogen/beta/streams/redis/storage.py
async def get_history(self, stream_id: StreamId) -> Iterable[BaseEvent]:
    """Load every stored event for ``stream_id``.

    Args:
        stream_id: Identifier of the stream whose history is read.

    Returns:
        A list of deserialized events, in the order Redis returns them
        (empty when ``LRANGE`` yields nothing).
    """
    key = self._key(stream_id)
    # 0..-1 asks LRANGE for the full list, head to tail.
    stored = await self._redis.lrange(key, 0, -1)
    events = []
    for blob in stored:
        events.append(deserialize(blob, self._serializer))
    return events

set_history async #

set_history(stream_id, events)
Source code in autogen/beta/streams/redis/storage.py
async def set_history(self, stream_id: StreamId, events: Iterable[BaseEvent]) -> None:
    """Replace the stored history of ``stream_id`` with ``events``.

    The delete and the subsequent pushes are queued on one pipeline opened
    with ``transaction=True`` and sent together by ``execute()``.

    Args:
        stream_id: Identifier of the stream whose history is overwritten.
        events: New history, written in iteration order.
    """
    target = self._key(stream_id)
    fmt = self._serializer
    async with self._redis.pipeline(transaction=True) as txn:
        # Queue removal of the old list before queuing the new entries.
        txn.delete(target)
        for item in events:
            payload = serialize(item, fmt)
            txn.rpush(target, payload)
        await txn.execute()

drop_history async #

drop_history(stream_id)
Source code in autogen/beta/streams/redis/storage.py
async def drop_history(self, stream_id: StreamId) -> None:
    """Delete the Redis key holding the history of ``stream_id``.

    Args:
        stream_id: Identifier of the stream whose history is removed.
    """
    key = self._key(stream_id)
    await self._redis.delete(key)

close async #

close()
Source code in autogen/beta/streams/redis/storage.py
async def close(self) -> None:
    """Close the underlying Redis client by awaiting its ``aclose()``."""
    client = self._redis
    await client.aclose()