WsLinkClient(url, *, ssl_context=None, ping_interval=20.0, ping_timeout=20.0, open_timeout=10.0)
Tenant-side WebSocket link to a hub.
Construct with the hub's URL, then await client.open() to establish the WebSocket connection. Frames flow over a single bidirectional connection: outbound via send_frame(), inbound via frames(). endpoint_id is the empty string until the hub's WelcomeFrame arrives in response to the caller's HelloFrame; the client then populates it from that frame.
Closing the client closes the underlying WebSocket. Reconnection is not automatic: callers that want it construct a fresh WsLinkClient and call open() again.
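The reconnect-by-reconstruction pattern described above can be sketched as follows. This is a minimal illustration, not part of the library: StubLinkClient is a hypothetical stand-in that mimics the open()/frames()/close() surface so the snippet runs without a hub, and run_with_reconnect, max_attempts, and base_delay are names invented here. Catching OSError is also an assumption; a real connection attempt may raise other exceptions.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List


class StubLinkClient:
    """Hypothetical stand-in with the same open/frames/close surface."""

    def __init__(self, canned: List[Any], fail_on_open: bool = False) -> None:
        self._canned = canned
        self._fail_on_open = fail_on_open
        self.closed = False

    async def open(self) -> None:
        if self._fail_on_open:
            raise OSError("hub unreachable")

    def frames(self) -> AsyncIterator[Any]:
        return self._frames_impl()

    async def _frames_impl(self) -> AsyncIterator[Any]:
        for frame in self._canned:
            yield frame

    async def close(self) -> None:
        self.closed = True


async def run_with_reconnect(
    make_client: Callable[[], Any],
    handle: Callable[[Any], Awaitable[None]],
    *,
    max_attempts: int = 3,
    base_delay: float = 0.01,
) -> int:
    """Run up to max_attempts sessions, each on a freshly built client.

    Returns the number of sessions whose open() succeeded.
    """
    opened = 0
    for attempt in range(max_attempts):
        client = make_client()  # a closed client is never reused
        try:
            await client.open()
            opened += 1
            # The loop ends when the inbound stream is exhausted.
            async for frame in client.frames():
                await handle(frame)
        except OSError:
            pass  # assumed connection-level failure; retry after backoff
        finally:
            await client.close()
        if attempt + 1 < max_attempts:
            await asyncio.sleep(base_delay * 2**attempt)  # exponential backoff
    return opened
```

With the real client, make_client would be something like lambda: WsLinkClient(url), and the caller would resend its HelloFrame after each open().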
Source code in autogen/beta/network/transport/ws.py
    def __init__(
        self,
        url: str,
        *,
        ssl_context: Any = None,
        ping_interval: float | None = 20.0,
        ping_timeout: float | None = 20.0,
        open_timeout: float | None = 10.0,
    ) -> None:
        # __init__ stores params; side effects deferred to open().
        self._url = url
        self._ssl = ssl_context
        self._ping_interval = ping_interval
        self._ping_timeout = ping_timeout
        self._open_timeout = open_timeout
        self._ws: ClientConnection | None = None
        self._closed = False
        self._endpoint_id = ""
open async
Connect to the hub over WebSocket.
Does not send a HelloFrame: callers send it explicitly via send_frame() so they can carry their own auth claim and optional since_envelope_id. The hub responds with a WelcomeFrame, which this client observes in frames() and uses to populate endpoint_id. Calling open() on a client that is already connected is a no-op.
Source code in autogen/beta/network/transport/ws.py
    async def open(self) -> None:
        """Connect to the hub over WebSocket.

        Does not send a :class:`HelloFrame`; callers send the
        handshake explicitly via :meth:`send_frame` so they can carry
        their own auth claim and optional ``since_envelope_id``. The
        hub responds with :class:`WelcomeFrame`, which this client
        observes in :meth:`frames` and uses to populate
        :attr:`endpoint_id`.
        """
        if self._ws is not None:
            return
        self._ws = await _ws_connect(
            self._url,
            ssl=self._ssl,
            ping_interval=self._ping_interval,
            ping_timeout=self._ping_timeout,
            open_timeout=self._open_timeout,
        )
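The explicit hello/welcome exchange that follows open() might look like the sketch below. Everything here is illustrative: HelloFrame and WelcomeFrame are redefined as hypothetical dataclasses with assumed fields (the real constructors are not shown in this page), StubLinkClient is a stand-in that answers a hello with a canned welcome, and connect_and_greet and its timeout parameter are invented names.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional, Tuple


@dataclass
class HelloFrame:  # hypothetical fields; the real class may differ
    auth_claim: str
    since_envelope_id: Optional[str] = None


@dataclass
class WelcomeFrame:  # hypothetical field
    endpoint_id: str


class StubLinkClient:
    """Hypothetical stand-in: replies to a HelloFrame with a WelcomeFrame."""

    def __init__(self) -> None:
        self.endpoint_id = ""
        self._inbox = None

    async def open(self) -> None:
        self._inbox = asyncio.Queue()  # created inside the running loop

    async def send_frame(self, frame: Any) -> None:
        if isinstance(frame, HelloFrame):
            await self._inbox.put(WelcomeFrame(endpoint_id="ep-1"))

    def frames(self) -> AsyncIterator[Any]:
        return self._frames_impl()

    async def _frames_impl(self) -> AsyncIterator[Any]:
        while True:
            frame = await self._inbox.get()
            if isinstance(frame, WelcomeFrame):
                self.endpoint_id = frame.endpoint_id
            yield frame


async def connect_and_greet(
    client: Any, auth_claim: str, timeout: float = 5.0
) -> Tuple[str, AsyncIterator[Any]]:
    """Open the socket, send the hello, and wait for the welcome.

    Returns the endpoint id and the same frame iterator, so the caller
    can keep reading from where this helper stopped.
    """
    await client.open()  # connects only; no hello is sent here
    await client.send_frame(HelloFrame(auth_claim=auth_claim))
    stream = client.frames()

    async def wait_for_welcome() -> None:
        async for frame in stream:
            if isinstance(frame, WelcomeFrame):
                return
        raise ConnectionError("link closed before WelcomeFrame arrived")

    await asyncio.wait_for(wait_for_welcome(), timeout)
    return client.endpoint_id, stream
```

Returning the iterator avoids calling frames() a second time; whether the real client tolerates several concurrent iterators is not stated in the source, so this sketch does not rely on it.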
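send_frame async
Encode frame as JSON and send it to the hub. Returns silently, without sending, if the client is closed, was never opened, or the connection closes during the send.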
Source code in autogen/beta/network/transport/ws.py
    async def send_frame(self, frame: Frame) -> None:
        if self._closed or self._ws is None:
            return
        try:
            await self._ws.send(json.dumps(encode_frame(frame)))
        except ConnectionClosed:
            return
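frames
Return an async iterator over inbound frames from the hub. This is a plain method, not a coroutine: call it without await and consume the result with async for.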
Source code in autogen/beta/network/transport/ws.py
    def frames(self) -> AsyncIterator[Frame]:
        return self._frames_impl()
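The split between a plain frames() method and an async generator behind it can be illustrated with a small self-contained sketch. FrameReader is hypothetical, and so is the wire shape (JSON objects with a "type" tag); the body of the library's own _frames_impl is not shown in this page and may differ.

```python
import asyncio
import json
from typing import AsyncIterator, Iterable, List


class FrameReader:
    """Hypothetical reader showing the frames()/_frames_impl() split."""

    def __init__(self, raw_messages: Iterable[str]) -> None:
        self._raw = list(raw_messages)  # stands in for the WebSocket
        self.endpoint_id = ""

    def frames(self) -> AsyncIterator[dict]:
        # Plain method: calling it does no I/O and needs no await.
        return self._frames_impl()

    async def _frames_impl(self) -> AsyncIterator[dict]:
        # Async generator: the body runs only once iteration starts.
        for raw in self._raw:
            frame = json.loads(raw)
            # Assumed wire shape: a "type" tag plus payload fields.
            if frame.get("type") == "welcome":
                self.endpoint_id = frame["endpoint_id"]
            yield frame


async def collect(reader: FrameReader) -> List[dict]:
    stream = reader.frames()  # no await here
    return [frame async for frame in stream]
```

Because the generator body is lazy, endpoint_id in this sketch stays empty until the consumer actually iterates past the welcome frame.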
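close async
Close the underlying WebSocket. Idempotent: repeated calls return immediately, and any exception raised while closing the socket is suppressed.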
Source code in autogen/beta/network/transport/ws.py
    async def close(self) -> None:
        if self._closed:
            return
        self._closed = True
        if self._ws is not None:
            with contextlib.suppress(Exception):
                await self._ws.close()
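Since close() is safe to call more than once and also safe when open() never succeeded, one way to guarantee cleanup is a small async context manager. The sketch below is an assumption about how a caller might wrap the client, not something the library provides: opened is an invented helper, and StubLinkClient is a hypothetical stand-in that counts calls.

```python
import asyncio
import contextlib
from typing import Any, AsyncIterator


class StubLinkClient:
    """Hypothetical stand-in that mirrors the idempotent close()."""

    def __init__(self) -> None:
        self.open_calls = 0
        self.effective_closes = 0
        self._closed = False

    async def open(self) -> None:
        self.open_calls += 1

    async def close(self) -> None:
        if self._closed:
            return  # repeated calls are no-ops
        self._closed = True
        self.effective_closes += 1


@contextlib.asynccontextmanager
async def opened(client: Any) -> AsyncIterator[Any]:
    """Open the client on entry and always close it on exit."""
    try:
        await client.open()
        yield client
    finally:
        await client.close()


async def demo() -> StubLinkClient:
    client = StubLinkClient()
    with contextlib.suppress(RuntimeError):
        async with opened(client):
            raise RuntimeError("handler failed")  # close() still runs
    await client.close()  # second call does nothing
    return client
```

Here the socket is released even when the body raises, and the extra close() after the block has no further effect.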