Skip to content

MemoryStream

autogen.beta.stream.MemoryStream #

MemoryStream(storage=None, *, id=None)

Bases: ABCStream

Source code in autogen/beta/stream.py
def __init__(
    self,
    storage: Storage | None = None,
    *,
    id: StreamId | None = None,
) -> None:
    """Create a stream with empty subscriber registries and a history.

    Args:
        storage: Backend handed to ``History``; its ``save_event`` callback
            is also registered as a regular subscriber. A new
            ``MemoryStorage`` is created when this is ``None``.
        id: Identifier of the stream. A fresh ``uuid4()`` is generated when
            this is ``None``.
    """
    # Compare against None explicitly: a caller-supplied value that merely
    # evaluates as falsy (for example a storage object that defines
    # ``__len__`` or ``__bool__``) must not be swapped for the default.
    self.id: StreamId = uuid4() if id is None else id

    self._subscribers: dict[SubId, tuple[Condition | None, CallModel]] = {}
    # Plain dicts preserve insertion order; ``send`` iterates this mapping
    # so that interrupters run in registration order.
    self._interrupters: dict[SubId, tuple[Condition | None, CallModel]] = {}

    if storage is None:
        storage = MemoryStorage()
    self.history = History(self.id, storage)
    # The storage is wired in as an ordinary (non-interrupting) subscriber.
    self.subscribe(storage.save_event)

id instance-attribute #

id = id or uuid4()

history instance-attribute #

history = History(id, storage)

subscribe #

subscribe(func: Callable[..., Any], *, interrupt: bool = False, sync_to_thread: bool = True, condition: Condition | None = None) -> SubId
subscribe(func: None = None, *, interrupt: bool = False, sync_to_thread: bool = True, condition: Condition | None = None) -> Callable[[Callable[..., Any]], SubId]
subscribe(func=None, *, interrupt=False, sync_to_thread=True, condition=None)
Source code in autogen/beta/stream.py
def subscribe(
    self,
    func: Callable[..., Any] | None = None,
    *,
    interrupt: bool = False,
    sync_to_thread: bool = True,
    condition: Condition | None = None,
) -> Callable[[Callable[..., Any]], SubId] | SubId:
    """Register a callback on the stream, directly or as a decorator.

    Args:
        func: Callback to register. When ``None``, a decorator is returned
            that performs the registration once it is applied.
        interrupt: Store the callback in the interrupter registry instead
            of the regular subscriber registry.
        sync_to_thread: Forwarded unchanged to ``build_model``.
        condition: Optional predicate stored next to the callback; ``send``
            skips the callback for events the predicate rejects.

    Returns:
        The new subscription id when ``func`` is given; otherwise a
        decorator that takes the callback and returns the subscription id
        (note: the id, not the decorated function).
    """

    def sub(s: Callable[..., Any]) -> SubId:
        sub_id = uuid4()
        model = build_model(s, sync_to_thread=sync_to_thread, serialize_result=False)
        if interrupt:
            self._interrupters[sub_id] = (condition, model)
        else:
            self._subscribers[sub_id] = (condition, model)
        return sub_id

    # Identity check rather than truthiness: a callable object may define
    # ``__bool__``/``__len__`` and evaluate as falsy, yet must still be
    # registered instead of being mistaken for "no callback given".
    if func is not None:
        return sub(func)
    return sub

unsubscribe #

unsubscribe(sub_id)
Source code in autogen/beta/stream.py
def unsubscribe(self, sub_id: SubId) -> None:
    """Remove ``sub_id`` from both registries; unknown ids are ignored."""
    # A subscription lives in exactly one registry, but the caller does not
    # have to know which one, so both are tried.
    for registry in (self._subscribers, self._interrupters):
        registry.pop(sub_id, None)

send async #

send(event, context)
Source code in autogen/beta/stream.py
async def send(
    self,
    event: BaseEvent,
    context: "Context",
) -> None:
    """Publish ``event``: run interrupters first, then regular subscribers.

    Each interrupter whose condition accepts the current event is awaited
    in turn. A falsy result stops delivery entirely; otherwise the result
    replaces the event for everything that follows. Subscribers then
    receive the final event, each filtered by its own condition.

    Args:
        event: The event to publish.
        context: Passed to every callback under ``CONTEXT_OPTION_NAME``;
            its ``dependency_provider`` is forwarded as well.
    """
    # Snapshot the registry so callbacks may (un)subscribe while we iterate;
    # dict order gives registration order.
    for guard, interrupter in tuple(self._interrupters.values()):
        if guard and not guard(event):
            continue

        async with AsyncExitStack() as stack:
            replacement = await interrupter.asolve(
                event,
                cache_dependencies={},
                stack=stack,
                dependency_provider=context.dependency_provider,
                **{CONTEXT_OPTION_NAME: context},
            )
            if not replacement:
                # The interrupter swallowed the event: nobody else sees it.
                return

        # Later interrupters and all subscribers see the replaced event.
        event = replacement

    # TODO: publish under an RWLock to prevent mutation of the subscribers
    # dictionary. For now the tuple snapshot protects the iteration.
    for guard, subscriber in tuple(self._subscribers.values()):
        if guard and not guard(event):
            continue

        async with AsyncExitStack() as stack:
            await subscriber.asolve(
                event,
                cache_dependencies={},
                stack=stack,
                dependency_provider=context.dependency_provider,
                **{CONTEXT_OPTION_NAME: context},
            )

where #

where(condition)
Source code in autogen/beta/stream.py
def where(
    self,
    condition: ClassInfo | Condition,
) -> "Stream":
    """Return a ``SubStream`` of this stream restricted by ``condition``.

    Anything that is not already a ``Condition`` instance is wrapped in a
    ``TypeCondition`` first.
    """
    matcher = condition if isinstance(condition, Condition) else TypeCondition(condition)
    return SubStream(self, matcher)

join #

join(*, max_events=None)
Source code in autogen/beta/stream.py
@contextmanager
def join(self, *, max_events: int | None = None) -> Iterator[AsyncIterator[BaseEvent]]:
    """Subscribe for the duration of the ``with`` block and iterate events.

    A temporary subscriber pushes every event it receives into an internal
    queue; the yielded async iterator pops events from that queue. The
    subscription is removed when the ``with`` block exits.

    Args:
        max_events: Maximum number of events the iterator yields before it
            finishes. ``None`` (the default) means unbounded; ``0`` or a
            negative value yields nothing.

    Yields:
        An async iterator over the events received while subscribed.
    """
    # Annotate the local instead of subscripting the class at runtime.
    queue: asyncio.Queue[BaseEvent] = asyncio.Queue()

    async def write_events(event: BaseEvent) -> None:
        await queue.put(event)

    async def listen_events() -> AsyncIterator[BaseEvent]:
        delivered = 0
        # Only ``None`` means "no limit". A truthiness test here would turn
        # ``max_events=0`` into an endless iterator.
        while max_events is None or delivered < max_events:
            yield await queue.get()
            delivered += 1

    with self.sub_scope(write_events):
        yield listen_events()

sub_scope #

sub_scope(func, *, interrupt=False, sync_to_thread=True, condition=None)
Source code in autogen/beta/stream.py
@contextmanager
def sub_scope(
    self,
    func: Callable[..., Any],
    *,
    interrupt: bool = False,
    sync_to_thread: bool = True,
    condition: Condition | None = None,
) -> Iterator[None]:
    """Keep ``func`` subscribed only while the ``with`` block is running.

    All keyword options are handed to ``subscribe`` unchanged. The
    subscription is removed on exit, whether the block finished normally
    or raised.
    """
    options = {
        "interrupt": interrupt,
        "sync_to_thread": sync_to_thread,
        "condition": condition,
    }
    token = self.subscribe(func, **options)

    try:
        yield
    finally:
        # Always release the subscription, even when the body raised.
        self.unsubscribe(token)

get async #

get(condition)
Source code in autogen/beta/stream.py
@asynccontextmanager
async def get(
    self,
    condition: ClassInfo | Condition,
) -> AsyncIterator[asyncio.Future[BaseEvent]]:
    """Yield a future resolved by the first event matching ``condition``.

    A temporary subscriber is attached to ``self.where(condition)`` for the
    duration of the ``async with`` block and removed on exit.

    Args:
        condition: Filter handed to ``where``.

    Yields:
        A future whose result is the first matching event received while
        the block is active.
    """
    # Create the future through the running loop, as asyncio recommends,
    # and annotate it rather than subscripting ``Future`` at runtime.
    result: asyncio.Future[BaseEvent] = asyncio.get_running_loop().create_future()

    async def wait_result(event: BaseEvent) -> None:
        # The callback stays subscribed until the block exits, so further
        # matching events can still arrive. Setting a result on a future
        # that is already done (or cancelled) raises InvalidStateError,
        # which would propagate out of ``send``; keep the first event only.
        if not result.done():
            result.set_result(event)

    with self.where(condition).sub_scope(wait_result):
        yield result