Skip to content

WsLinkClient

autogen.beta.network.transport.ws.WsLinkClient #

WsLinkClient(url, *, ssl_context=None, ping_interval=20.0, ping_timeout=20.0, open_timeout=10.0)

Tenant-side WebSocket link to a hub.

Construct with the hub's URL, then await client.open() to perform the WebSocket handshake. Frames flow over a single bidirectional connection: outbound via :meth:send_frame, inbound via :meth:frames. endpoint_id is populated by the :class:WelcomeFrame the hub sends in response to a :class:HelloFrame; before that it is the empty string.

Closing the client closes the underlying WebSocket. Reconnect is not automatic — callers that want it construct a fresh WsLinkClient and open() again.

Source code in autogen/beta/network/transport/ws.py
def __init__(
    self,
    url: str,
    *,
    ssl_context: Any = None,
    ping_interval: float | None = 20.0,
    ping_timeout: float | None = 20.0,
    open_timeout: float | None = 10.0,
) -> None:
    # __init__ stores params; side effects deferred to open().
    self._url = url
    self._ssl = ssl_context
    self._ping_interval = ping_interval
    self._ping_timeout = ping_timeout
    self._open_timeout = open_timeout
    self._ws: ClientConnection | None = None
    self._closed = False
    self._endpoint_id = ""

endpoint_id property #

endpoint_id

open async #

open()

Connect to the hub over WebSocket.

Does not send a :class:HelloFrame — callers send the handshake explicitly via :meth:send_frame so they can carry their own auth claim and optional since_envelope_id. The hub responds with :class:WelcomeFrame, which this client observes in :meth:frames and uses to populate :attr:endpoint_id.

Source code in autogen/beta/network/transport/ws.py
async def open(self) -> None:
    """Connect to the hub over WebSocket.

    Does not send a :class:`HelloFrame` — callers send the
    handshake explicitly via :meth:`send_frame` so they can carry
    their own auth claim and optional ``since_envelope_id``. The
    hub responds with :class:`WelcomeFrame`, which this client
    observes in :meth:`frames` and uses to populate
    :attr:`endpoint_id`.
    """
    if self._ws is not None:
        return
    self._ws = await _ws_connect(
        self._url,
        ssl=self._ssl,
        ping_interval=self._ping_interval,
        ping_timeout=self._ping_timeout,
        open_timeout=self._open_timeout,
    )

send_frame async #

send_frame(frame)
Source code in autogen/beta/network/transport/ws.py
async def send_frame(self, frame: Frame) -> None:
    if self._closed or self._ws is None:
        return
    try:
        await self._ws.send(json.dumps(encode_frame(frame)))
    except ConnectionClosed:
        return

frames #

frames()
Source code in autogen/beta/network/transport/ws.py
def frames(self) -> AsyncIterator[Frame]:
    return self._frames_impl()

close async #

close()
Source code in autogen/beta/network/transport/ws.py
async def close(self) -> None:
    if self._closed:
        return
    self._closed = True
    if self._ws is not None:
        with contextlib.suppress(Exception):
            await self._ws.close()