Redis-backed storage implementing the Storage protocol from autogen.beta.history.
Source code in autogen/beta/streams/redis/storage.py
| def __init__(
self,
redis_url: str,
prefix: str = "ag2:stream",
serializer: Serializer = Serializer.JSON,
) -> None:
self._redis = aioredis.from_url(redis_url)
self._prefix = prefix
self._serializer = serializer
|
save_event async
save_event(event, context)
Source code in autogen/beta/streams/redis/storage.py
| async def save_event(self, event: BaseEvent, context: Context) -> None:
stream_id = context.stream.id
await self._redis.rpush(self._key(stream_id), serialize(event, self._serializer))
|
get_history async
Source code in autogen/beta/streams/redis/storage.py
| async def get_history(self, stream_id: StreamId) -> Iterable[BaseEvent]:
raw = await self._redis.lrange(self._key(stream_id), 0, -1)
return [deserialize(item, self._serializer) for item in raw]
|
set_history async
set_history(stream_id, events)
Source code in autogen/beta/streams/redis/storage.py
| async def set_history(self, stream_id: StreamId, events: Iterable[BaseEvent]) -> None:
key = self._key(stream_id)
async with self._redis.pipeline(transaction=True) as pipe:
pipe.delete(key)
for event in events:
pipe.rpush(key, serialize(event, self._serializer))
await pipe.execute()
|
drop_history async
Source code in autogen/beta/streams/redis/storage.py
| async def drop_history(self, stream_id: StreamId) -> None:
await self._redis.delete(self._key(stream_id))
|
close async
Source code in autogen/beta/streams/redis/storage.py
| async def close(self) -> None:
await self._redis.aclose()
|