# Source code for redditwarp.http.transport.connectors.urllib3


from __future__ import annotations
from typing import Optional, Union, Any, cast

import json

import urllib3  # type: ignore[import]
import urllib3.exceptions as urllib3_exceptions  # type: ignore[import]

from ... import exceptions
from ... import payload
from ...send_params import SendParams
from ...exchange import Exchange
from ...request import Request
from ...response import UResponse
from ..reg_SYNC import register
from ..connector_SYNC import Connector
from ...util.merge_query_params import merge_query_params


def _get_effective_timeout(v: float) -> float:
    if 0 > v != -1:
        raise ValueError(f"invalid `timeout` value: {v}")
    return v

def _get_effective_follow_redirects(v: Optional[bool]) -> bool:
    if v is None:
        raise ValueError(f"invalid `follow_redirects` value: {v}")
    return v


class Urllib3Connector(Connector):
    """Connector that sends requests through a urllib3 ``PoolManager``."""

    def __init__(
        self,
        http: urllib3.poolmanager.PoolManager,
    ) -> None:
        """Keep a reference to the pool manager used for all requests.

        Args:
            http: The urllib3 pool manager that performs the requests.
        """
        super().__init__()
        self.http: urllib3.poolmanager.PoolManager = http
        # Empty string expression kept from the original source; it has no
        # runtime effect.
        ("")

    def _send(self, p: SendParams) -> Exchange:
        """Send the requisition in ``p`` via urllib3 and wrap the outcome.

        The payload type decides how the body is transmitted: no body,
        raw bytes (bytes / text / JSON payloads), URL-encoded form fields,
        or multipart form fields.

        Args:
            p: Send parameters holding the requisition, the timeout and the
                redirect setting.

        Returns:
            An ``Exchange`` holding the requisition, a ``Request`` built from
            the verb and the merged URL (its headers and data are left
            empty), and a ``UResponse`` built from the urllib3 response.

        Raises:
            ValueError: If the timeout or redirect setting is invalid (these
                are checked before the request is attempted).
            exceptions.TimeoutException: If urllib3 raises
                ``ReadTimeoutError``.
            exceptions.TransportError: If anything else fails while the
                request is being prepared or sent, including an unsupported
                payload or multipart field.
        """
        requisition = p.requisition

        checked_timeout = _get_effective_timeout(p.timeout)
        # -1 is the "no timeout" marker; urllib3 is given None in that case.
        timeout = checked_timeout if checked_timeout != -1 else None
        redirect = _get_effective_follow_redirects(p.follow_redirects)

        url = merge_query_params(requisition.url, requisition.params)
        headers = dict(requisition.headers)
        pld = requisition.payload

        response: urllib3.response.BaseHTTPResponse
        try:
            if pld is None:
                response = self.http.urlopen(
                    requisition.verb,
                    url,
                    headers=headers,
                    timeout=timeout,
                    redirect=redirect,
                )

            elif isinstance(pld, (payload.Bytes, payload.Text, payload.JSON)):
                # These three payload kinds are all sent as a raw byte body
                # with an explicit Content-Type header.
                headers['Content-Type'] = pld.get_media_type()
                raw_body: bytes
                if isinstance(pld, payload.Bytes):
                    raw_body = pld.data
                elif isinstance(pld, payload.Text):
                    raw_body = pld.text.encode()
                else:
                    raw_body = json.dumps(pld.json).encode()
                response = self.http.urlopen(
                    requisition.verb,
                    url,
                    headers=headers,
                    body=raw_body,
                    timeout=timeout,
                    redirect=redirect,
                )

            elif isinstance(pld, payload.URLEncodedFormData):
                form_fields: dict[str, str] = dict(pld.data)
                response = self.http.request_encode_body(
                    requisition.verb,
                    url,
                    headers=headers,
                    fields=form_fields,
                    timeout=cast(Any, timeout),
                    redirect=cast(Any, redirect),
                    encode_multipart=False,
                )

            elif isinstance(pld, payload.MultipartFormData):
                multipart_fields: dict[str, Union[str, tuple[Optional[str], Union[str, bytes], Optional[str]]]] = {}
                for part in pld.parts:
                    if isinstance(part, payload.MultipartFormData.TextField):
                        multipart_fields[part.name] = part.text
                        continue
                    if not isinstance(part, payload.MultipartFormData.FileField):
                        raise ValueError('unexpected multipart field type: ' + repr(part))
                    if part.content_type == '':
                        raise ValueError('empty string multipart content type not supported')
                    multipart_fields[part.name] = (
                        part.filename,
                        part.file.read(),
                        part.content_type,
                    )
                response = self.http.request_encode_body(
                    requisition.verb,
                    url,
                    headers=headers,
                    fields=cast(Any, multipart_fields),
                    timeout=cast(Any, timeout),
                    redirect=cast(Any, redirect),
                )

            else:
                raise Exception(f"unsupported payload type: {pld.__class__.__name__!r}")

        except urllib3_exceptions.ReadTimeoutError as cause:
            raise exceptions.TimeoutException from cause
        except Exception as cause:
            # Everything else raised above, including the validation errors,
            # is reported as a transport error.
            raise exceptions.TransportError from cause

        return Exchange(
            requisition=requisition,
            request=Request(
                verb=requisition.verb,
                url=url,
                headers={},
                data=b'',
            ),
            response=UResponse(
                status=response.status,
                headers=response.headers,
                data=response.data,
                underlying_object=response,
            ),
            history=(),
        )

    def _close(self) -> None:
        """Call ``clear()`` on the underlying pool manager."""
        self.http.clear()
def new_connector() -> Urllib3Connector:
    """Create a ``Urllib3Connector`` backed by a fresh ``urllib3.PoolManager``."""
    pool_manager = urllib3.PoolManager()
    return Urllib3Connector(pool_manager)
# Name and version of the underlying HTTP library, read from the imported
# urllib3 module itself.
name: str = urllib3.__name__
version: str = getattr(urllib3, '__version__')

# Runs at import time: hands this module's name, the library name/version and
# the connector factory to `register` (imported from `..reg_SYNC`).
# NOTE(review): presumably this is how the transport layer discovers the
# available connectors -- confirm against reg_SYNC.
register(
    adaptor_module_name=__name__,
    name=name,
    version=version,
    new_connector=new_connector,
)