Skip to content

LocalLinkEndpoint

autogen.beta.network.transport.local.LocalLinkEndpoint #

LocalLinkEndpoint(endpoint_id, client_to_hub, hub_to_client)

Hub-side half of an in-memory duplex.

Mirror of LocalLinkClient: send_frame writes hub→client, frames() reads client→hub. agent_id is set by the hub once a HelloFrame (or direct in-process binding) associates this endpoint with an identity.

Source code in autogen/beta/network/transport/local.py
def __init__(
    self,
    endpoint_id: str,
    client_to_hub: "asyncio.Queue[Frame | None]",
    hub_to_client: "asyncio.Queue[Frame | None]",
) -> None:
    """Record the endpoint identity and the two queues of the duplex.

    Pure bookkeeping: nothing is put on or taken from either queue here.

    Args:
        endpoint_id: Identifier stored as ``self.endpoint_id``.
        client_to_hub: Queue kept as ``self._c2h`` (client→hub direction).
        hub_to_client: Queue kept as ``self._h2c`` (hub→client direction).
    """
    self.endpoint_id = endpoint_id
    # Starts unset; this constructor never assigns an identity itself.
    self.agent_id: str | None = None
    self._c2h, self._h2c = client_to_hub, hub_to_client
    # Flag consulted by send_frame() and flipped by close().
    self._closed = False

endpoint_id instance-attribute #

endpoint_id = endpoint_id

agent_id instance-attribute #

agent_id = None

send_frame async #

send_frame(frame)
Source code in autogen/beta/network/transport/local.py
async def send_frame(self, frame: Frame) -> None:
    """Enqueue ``frame`` on the hub→client queue while the endpoint is open.

    Once the closed flag is set the frame is dropped silently: nothing is
    queued and no exception is raised.

    Args:
        frame: The frame to hand to the client side.
    """
    if not self._closed:
        await self._h2c.put(frame)

frames #

frames()
Source code in autogen/beta/network/transport/local.py
def frames(self) -> AsyncIterator[Frame]:
    """Return the frame iterator produced by ``self._frames_impl()``.

    This is a plain (non-async) method that only delegates; the iteration
    logic lives in ``_frames_impl``, which is not shown here. Per the class
    description it reads the client→hub direction.
    """
    frame_stream = self._frames_impl()
    return frame_stream

close async #

close()
Source code in autogen/beta/network/transport/local.py
async def close(self) -> None:
    """Mark the endpoint closed and push ``None`` onto both queues.

    Idempotent: only the first call has any effect; later calls return
    immediately without queuing anything.
    """
    if not self._closed:
        self._closed = True
        # Same order as before: client→hub first, then hub→client.
        # NOTE(review): None is presumably the end-of-stream sentinel that
        # readers of each queue watch for — confirm against _frames_impl.
        for queue in (self._c2h, self._h2c):
            await queue.put(None)