redditwarp.websocket.websocket_SYNC

class redditwarp.websocket.websocket_SYNC.WebSocket

Bases: object

SIDE: int = 0
default_waittime: float
state: ConnectionState
close_code: int
close_reason: str
subprotocol: str
send_frame(m: Frame) → None
send_text(data: str) → None
send_bytes(data: bytes) → None
send(data: str | bytes) → None
pulse(*, timeout: float = -2) → Iterator[object]
cycle(t: float = -1) → Iterator[object]

Yield events from self.pulse() over t seconds.
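As an illustration of the description above, the following is a minimal sketch of how a cycle-style method could repeatedly drain pulse() until a deadline passes. FakeSocket is a hypothetical stand-in, not the redditwarp implementation, and the reading of a negative t as "no time limit" is an assumption rather than documented behaviour.

```python
import time
from typing import Iterator, List, Optional


class FakeSocket:
    """Hypothetical stand-in used only to illustrate cycle(); not redditwarp code."""

    def __init__(self, batches: List[List[object]]) -> None:
        # Each inner list plays the role of the events produced by one pulse() call.
        self._batches = list(batches)

    def pulse(self, *, timeout: float = -2) -> Iterator[object]:
        # Stand-in behaviour: hand back one pre-recorded batch of events per call.
        if self._batches:
            yield from self._batches.pop(0)

    def cycle(self, t: float = -1) -> Iterator[object]:
        # Assumption: a negative t means there is no deadline.
        deadline: Optional[float] = None if t < 0 else time.monotonic() + t
        # The stand-in stops when its batches run out; a real socket would
        # instead stop when the connection closes or the deadline passes.
        while self._batches:
            if deadline is None:
                yield from self.pulse()
                continue
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            yield from self.pulse(timeout=remaining)


sock = FakeSocket([["event-1", "event-2"], ["event-3"]])
events = list(sock.cycle(5))
```

The caller sees a single flat stream of events, while the time budget is shared across however many pulse() calls fit inside it.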

receive(*, timeout: float = -2) → Message
recv_bytes(*, timeout: float = -2) → bytes

Receive the next message. If it is a BytesMessage, return its payload; otherwise raise a MessageTypeMismatchException.

recv(*, timeout: float = -2) → bytes
recv_text(*, timeout: float = -2) → str

Receive the next message. If it is a TextMessage, return its payload; otherwise raise a MessageTypeMismatchException.
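The type check described for recv_bytes() and recv_text() can be sketched as follows. The classes below are hypothetical stand-ins that only borrow the names used on this page; in particular, the payload attribute name `data` and the helper functions `payload_as_text` and `payload_as_bytes` are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class TextMessage:
    data: str  # assumed attribute name for the payload


@dataclass
class BytesMessage:
    data: bytes  # assumed attribute name for the payload


class MessageTypeMismatchException(Exception):
    """Stand-in for the exception named in the documentation."""


Message = Union[TextMessage, BytesMessage]


def payload_as_text(message: Message) -> str:
    # Mirrors the documented recv_text() rule: text payloads pass through,
    # anything else is rejected.
    if isinstance(message, TextMessage):
        return message.data
    raise MessageTypeMismatchException("expected a text message")


def payload_as_bytes(message: Message) -> bytes:
    # Mirrors the documented recv_bytes() rule for binary payloads.
    if isinstance(message, BytesMessage):
        return message.data
    raise MessageTypeMismatchException("expected a bytes message")


text = payload_as_text(TextMessage("hello"))
raw = payload_as_bytes(BytesMessage(b"\x01\x02"))
try:
    payload_as_bytes(TextMessage("hello"))
    mismatch_raised = False
except MessageTypeMismatchException:
    mismatch_raised = True
```

A caller that expects only one kind of message can therefore use the typed receive method and treat a mismatch as a protocol error, instead of inspecting the result of receive() by hand.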

close(code: int | None = 1000, reason: str = '', *, waitfor: float = -2) → None

class redditwarp.websocket.websocket_SYNC.PartiallyImplementedWebSocket

Bases: WebSocket

send_frame(m: Frame) → None
pulse(*, timeout: float = -2) → Iterator[object]

Process one frame’s worth of incoming data and possibly generate events for it.

close(code: int | None = 1000, reason: str = '', *, waitfor: float = -2) → None

class redditwarp.websocket.websocket_SYNC.PulsePartiallyImplementedWebSocketConnection

Bases: PartiallyImplementedWebSocket