from __future__ import annotations
from typing import TYPE_CHECKING, MutableMapping, Sequence, Protocol, MutableSequence, Optional, Mapping
if TYPE_CHECKING:
from importlib.machinery import ModuleSpec
from dataclasses import dataclass
from ...util.imports import load_spec, import_module_from_spec
from ..websocket_SYNC import WebSocket
class ConnectFunctionProtocol(Protocol):
    """Structural type of the ``connect`` callable a transport provides.

    A conforming callable takes the URL positionally, accepts
    ``subprotocols``, ``headers`` and ``timeout`` as keyword-only arguments,
    and returns a ``WebSocket``.
    """

    # NOTE(review): the ``-2`` timeout default looks like a sentinel whose
    # meaning is defined by the transport implementation -- confirm.
    def __call__(
        self,
        url: str,
        *,
        subprotocols: Sequence[str] = (),
        headers: Optional[Mapping[str, str]] = None,
        timeout: float = -2,
    ) -> WebSocket:
        ...
@dataclass
class TransportInfo:
    """Record describing one registered websocket transport adaptor."""

    # Key under which ``register`` stores this record in the registry.
    adaptor_module_name: str
    # Presumably the name of the underlying websocket library -- confirm.
    name: str
    # Presumably the version string of that library -- confirm.
    version: str
    # Callable that the module-level ``connect`` delegates to.
    connect: ConnectFunctionProtocol
def load_transport() -> TransportInfo:
    """Return the registered transport, importing an adaptor on first use.

    When ``transport_info_registry`` is empty, the specs in
    ``transport_module_spec_list`` are tried in order; the first one that
    imports without raising ``ImportError`` ends the search.  The first entry
    of the registry (in insertion order) is then returned.

    Returns:
        The first ``TransportInfo`` stored in ``transport_info_registry``.

    Raises:
        ModuleNotFoundError: None of the adaptor modules in
            ``transport_module_spec_list`` could be imported.
        RuntimeError: An adaptor module was imported but the registry is
            still empty afterwards.
    """
    if not transport_info_registry:
        # NOTE(review): adaptor modules are presumably expected to call
        # ``register`` as an import side effect -- confirm.
        for module_spec in transport_module_spec_list:
            try:
                import_module_from_spec(module_spec)
            except ImportError:
                # This adaptor's backing library is unavailable; try the next.
                continue
            break
        else:
            raise ModuleNotFoundError('A websocket transport library needs to be installed.')

    # Use a default instead of letting a bare StopIteration escape: inside a
    # generator it would be converted into an unrelated-looking RuntimeError.
    info = next(iter(transport_info_registry.values()), None)
    if info is None:
        raise RuntimeError(
            'A websocket transport adaptor was imported but did not register a transport.'
        )
    return info
def connect(
    url: str,
    *,
    subprotocols: Sequence[str] = (),
    headers: Optional[Mapping[str, str]] = None,
    timeout: float = -2,
) -> WebSocket:
    """Open a websocket connection through the loaded transport.

    The transport is obtained from ``load_transport`` and every argument is
    forwarded unchanged to that transport's ``connect`` callable.

    Args:
        url: Address to connect to.
        subprotocols: Forwarded as the ``subprotocols`` keyword.
        headers: Forwarded as the ``headers`` keyword.
        timeout: Forwarded as the ``timeout`` keyword.
            NOTE(review): the ``-2`` default looks like a sentinel
            interpreted by the transport -- confirm.

    Returns:
        Whatever the transport's ``connect`` callable returns.

    Raises:
        Any exception raised by ``load_transport`` or by the transport's
        ``connect`` callable propagates unchanged.
    """
    transport = load_transport()
    return transport.connect(
        url,
        subprotocols=subprotocols,
        headers=headers,
        timeout=timeout,
    )
def register(
    adaptor_module_name: str,
    name: str,
    version: str,
    connect: ConnectFunctionProtocol,
) -> None:
    """Record a transport in ``transport_info_registry``.

    The four arguments are bundled into a ``TransportInfo`` and stored under
    ``adaptor_module_name``; an existing entry with the same key is replaced.

    Args:
        adaptor_module_name: Registry key for this transport.
        name: Stored as ``TransportInfo.name``.
        version: Stored as ``TransportInfo.version``.
        connect: Stored as ``TransportInfo.connect``.
    """
    transport_info_registry[adaptor_module_name] = TransportInfo(
        adaptor_module_name=adaptor_module_name,
        name=name,
        version=version,
        connect=connect,
    )
# Adaptor module specs that ``load_transport`` tries to import, in order.
transport_module_spec_list: MutableSequence[ModuleSpec] = [load_spec('.carriers.websocket', __package__)]

# Transports recorded by ``register``, keyed by adaptor module name.
transport_info_registry: MutableMapping[str, TransportInfo] = {}