Skip to content

persistent_stream

autogen.beta.tools.subagents.persistent_stream.persistent_stream

persistent_stream()
Source code in autogen/beta/tools/subagents/persistent_stream.py
def persistent_stream() -> StreamFactory:
    """Build a stream factory that reuses one stream id per agent name.

    The returned callable takes an agent and a context and produces a
    ``MemoryStream`` that shares the storage of ``ctx.stream.history`` and
    whose id is remembered in ``ctx.dependencies``.

    Returns:
        A callable ``(agent, ctx) -> MemoryStream``.
    """

    def stream_factory(agent: "Agent", ctx: "Context") -> MemoryStream:
        """Return a ``MemoryStream`` bound to the id stored for *agent*."""
        # The id is kept in the context's dependencies under a per-agent key.
        dependency_key = f"ag:{agent.name}:stream"
        remembered_id = ctx.dependencies.get(dependency_key)

        if remembered_id:
            stream_id = remembered_id
        else:
            # Nothing usable stored yet: mint a new id and record it so that
            # later calls with the same dependencies mapping pick it up.
            stream_id = uuid4()
            ctx.dependencies[dependency_key] = stream_id

        return MemoryStream(storage=ctx.stream.history.storage, id=stream_id)

    return stream_factory