Source code for redditwarp.websocket.transport.reg_SYNC


from __future__ import annotations
from typing import TYPE_CHECKING, MutableMapping, Sequence, Protocol, MutableSequence, Optional, Mapping
if TYPE_CHECKING:
    from importlib.machinery import ModuleSpec

from dataclasses import dataclass

from ...util.imports import load_spec, import_module_from_spec
from ..websocket_SYNC import WebSocket


class ConnectFunctionProtocol(Protocol):
    """Structural type of a transport's `connect` callable.

    Any callable that accepts a URL positionally plus the keyword-only
    options below, and returns a `WebSocket`, satisfies this protocol.
    """

    def __call__(
        self,
        url: str,
        *,
        subprotocols: Sequence[str] = (),
        headers: Optional[Mapping[str, str]] = None,
        # NOTE(review): -2 looks like a sentinel value rather than a real
        # duration; its exact meaning is defined by the transports -- confirm.
        timeout: float = -2,
    ) -> WebSocket:
        ...
@dataclass
class TransportInfo:
    """Record describing one registered websocket transport."""

    # Key under which `register()` stores this record in the registry.
    adaptor_module_name: str
    # Name of the transport (presumably the underlying library -- confirm).
    name: str
    # Version string supplied at registration time.
    version: str
    # Callable used to open a connection through this transport.
    connect: ConnectFunctionProtocol
def load_transport() -> TransportInfo:
    """Return the websocket transport to use, importing an adaptor if needed.

    When `transport_info_registry` is empty, the module specs in
    `transport_module_spec_list` are tried in order; the first one that
    imports without raising `ImportError` ends the search. The first entry
    of the registry is then returned.

    NOTE(review): importing an adaptor module is presumably what populates
    the registry (via `register()`) -- confirm against the adaptor modules.

    Raises:
        ModuleNotFoundError: Every candidate adaptor module failed to import.
        RuntimeError: An adaptor module was imported but the registry is
            still empty afterwards.
    """
    if not transport_info_registry:
        for module_spec in transport_module_spec_list:
            try:
                import_module_from_spec(module_spec)
            except ImportError:
                # This adaptor's dependency is unavailable; try the next one.
                continue
            break
        else:
            raise ModuleNotFoundError('A websocket transport library needs to be installed.')

    # Return the first registered transport. An explicit loop is used instead
    # of `next(iter(...))` so that an empty registry cannot leak a bare
    # `StopIteration`, which could silently terminate an iteration in the
    # caller.
    for info in transport_info_registry.values():
        return info
    raise RuntimeError(
        'A websocket transport adaptor module was imported '
        'but no transport was registered.'
    )
def connect(
    url: str,
    *,
    subprotocols: Sequence[str] = (),
    headers: Optional[Mapping[str, str]] = None,
    timeout: float = -2,
) -> WebSocket:
    """Open a websocket connection using the transport from `load_transport()`.

    All arguments are forwarded unchanged to the selected transport's
    `connect` callable, and its result is returned as is.
    """
    transport = load_transport()
    return transport.connect(
        url,
        subprotocols=subprotocols,
        headers=headers,
        timeout=timeout,
    )
def register(
    adaptor_module_name: str,
    name: str,
    version: str,
    connect: ConnectFunctionProtocol,
) -> None:
    """Record a transport in `transport_info_registry`.

    The arguments are bundled into a `TransportInfo` and stored under
    `adaptor_module_name`; an existing entry with the same key is replaced.
    """
    transport_info_registry[adaptor_module_name] = TransportInfo(
        adaptor_module_name=adaptor_module_name,
        name=name,
        version=version,
        connect=connect,
    )
# Candidate adaptor modules, in the order `load_transport()` tries to
# import them.
transport_module_spec_list: MutableSequence[ModuleSpec] = [
    load_spec('.carriers.websocket', __package__),
]

# Transports recorded by `register()`, keyed by adaptor module name.
transport_info_registry: MutableMapping[str, TransportInfo] = {}