Source code for redditwarp.core.rate_limited_ASYNC


from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from ..http.handler_ASYNC import Handler
    from ..http.send_params import SendParams
    from ..http.exchange import Exchange

from ..util.imports import lazy_import
if TYPE_CHECKING:
    import asyncio
else:
    lazy_import % 'asyncio'

import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

from ..http.delegating_handler_ASYNC import DelegatingHandler
from ..util.token_bucket import TokenBucket

class RateLimited(DelegatingHandler):
    """Delegating handler that paces requests using rate-limit response headers.

    Before a request is passed on, the handler may sleep so that the requests
    believed to remain in the current window are spread over the seconds left
    until the window resets. After each response the ``x-ratelimit-*`` headers,
    when present, refresh that bookkeeping; otherwise it is estimated locally.
    """

    def __init__(self, handler: Handler) -> None:
        """Wrap *handler* and start with empty rate-limit bookkeeping.

        Args:
            handler: The handler given to ``DelegatingHandler``.
        """
        super().__init__(handler)
        # NOTE(review): the bare `("")` expressions below do nothing at runtime.
        # They look like empty attribute-docstring placeholders for the
        # documentation generator -- confirm before removing them.

        # Requests left in the current window (reported by the server, or estimated).
        self.remaining: int = 0
        ("")
        # Seconds until the current window resets (reported by the server, or estimated).
        self.reset: int = 0
        ("")
        # Requests used in the current window (reported by the server, or estimated).
        self.used: int = 0
        ("")
        # Monotonic instant up to which elapsed time has been folded into `reset`.
        self._timestamp: float = time.monotonic()
        self._tb = TokenBucket(capacity=10, rate=1)
        self._lock = asyncio.Lock()
        # `Date` of the newest response whose rate-limit data has been applied.
        self._datetime = datetime.min.replace(tzinfo=timezone.utc)

    async def _send(self, p: SendParams) -> Exchange:
        """Wait as the rate limit requires, send *p*, then update the bookkeeping.

        Args:
            p: Request parameters, passed unchanged to the parent's ``_send``.

        Returns:
            The exchange produced by the parent's ``_send``.

        Raises:
            ValueError: A rate-limit header is present but is not numeric.
        """
        # Only the waiting is serialised. The request itself is sent outside
        # the lock, so several requests can be in flight at once and their
        # responses may be processed out of order (handled further down).
        async with self._lock:
            tb = self._tb
            s = 0.
            if self.remaining <= 1:
                # (Almost) nothing left in this window: wait for the reset.
                s = self.reset
            elif self.reset > 0:
                # Return value unused; presumably this brings the bucket up to
                # date before its rate is changed -- TODO confirm in TokenBucket.
                tb.get_value()
                # Spread the remaining requests evenly over the remaining seconds.
                tb.rate = self.remaining / self.reset
                s = tb.get_cooldown(1)

            await asyncio.sleep(s)

            tb.consume(1)

        xchg = await super()._send(p)

        now = time.monotonic()
        headers = xchg.response.headers

        # The Date header is only used to order responses. A missing or
        # malformed value must not discard an exchange that already completed.
        try:
            dt = parsedate_to_datetime(headers['Date'])
        except (KeyError, TypeError, ValueError):
            dt = None
        else:
            if dt.tzinfo is None:
                # A "-0000" zone parses to a naive UTC datetime; make it aware
                # so it can be compared with `self._datetime`.
                dt = dt.replace(tzinfo=timezone.utc)

        # Ignore responses older than one already applied. Without a usable
        # date the ordering cannot be checked, so the update is applied.
        if dt is None or dt > self._datetime:
            if dt is not None:
                self._datetime = dt

            if 'x-ratelimit-reset' in headers:
                # The value arrives in a float-like form, hence int(float(...)).
                self.remaining = int(float(headers['x-ratelimit-remaining']))
                self.reset = int(headers['x-ratelimit-reset'])
                self.used = int(headers['x-ratelimit-used'])
                # Server-reported values are relative to this response.
                self._timestamp = now
            else:
                # No rate-limit headers: estimate locally. Advance the
                # timestamp only by the whole seconds subtracted from `reset`,
                # so fractional seconds carry over to the next estimate
                # instead of being truncated away on every response.
                elapsed = int(now - self._timestamp)
                self._timestamp += elapsed

                self.remaining -= 1
                self.reset -= elapsed
                self.used += 1

                # NOTE(review): nesting reconstructed from a flattened source;
                # this reseed is assumed to belong to the header-less branch.
                if self.reset <= 0:
                    # Estimated window ran out: assume a fresh one. 300 and 600
                    # are presumably the service's default quota and window
                    # length in seconds -- TODO confirm.
                    self.remaining = 300
                    self.reset = 600
                    self.used = 0

        return xchg