Source code for redditwarp.core.exceptions


from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
    from ..http.exchange import Exchange

from .. import auth
from .const import TOKEN_OBTAINMENT_URL
from .. import http
from ..exceptions import ArgExcMixin
from ..auth.exceptions import raise_for_token_server_response_error

[docs]class ArgExc(ArgExcMixin): pass
[docs]def raise_for_reddit_token_server_response_error(json_dict: Any) -> None: error = json_dict.get('error') if not isinstance(error, str): return raise_for_token_server_response_error(json_dict)
[docs]class Throwaway(ArgExc): """These exceptions are not intended to be caught."""
[docs]class AuthError(Throwaway): pass
[docs]class CredentialsError(Throwaway): pass
[docs]class ClientCredentialsError(CredentialsError): pass
[docs]class GrantCredentialsError(CredentialsError): pass
[docs]class BlacklistedUserAgent(Throwaway): pass
[docs]class FaultyUserAgent(Throwaway): pass
[docs]def raise_for_reddit_auth_response_exception(e: Exception, xchg: Exchange) -> None: requ_headers = xchg.request.headers if isinstance(e, auth.exceptions.TokenServerResponseError): if e.error_name == 'invalid_grant': msg = 'Check your grant credentials' pld = xchg.requisition.payload if isinstance(pld, http.payload.URLEncodedFormData): grant = pld.data if grant.get('grant_type') == 'password': msg = "Check your grant credentials.\n\nTip: If you have 2FA enabled the password grant won't work. You must use a refresh token instead." raise GrantCredentialsError(msg) elif e.error_name == 'unsupported_grant_type': content_type = requ_headers.get('Content-Type', '') expected_content_type = 'application/x-www-form-urlencoded' if content_type != expected_content_type: raise AuthError(f'bad Content-Type header: got {content_type!r}, need {expected_content_type!r}') elif isinstance(e, http.exceptions.StatusCodeException): status = e.status_code if status == 400: pld = xchg.requisition.payload if isinstance(pld, http.payload.URLEncodedFormData): grant = pld.data if grant.get('grant_type') == 'refresh_token': e.arg = "Your refresh token might be invalid." raise e elif status == 401: if not (url := xchg.requisition.url).startswith(("https://www.reddit.com", "https://ssl.reddit.com")): raise AuthError(f'bad access token URL: got {url!r}, need {TOKEN_OBTAINMENT_URL!r}') if 'Authorization' not in requ_headers: raise AuthError('Authorization header missing from request') if not requ_headers['Authorization'][:5].lower().startswith('basic'): raise AuthError('Authorization header value must start with "Basic"') raise ClientCredentialsError('Check your client credentials') elif status == 403: ua = requ_headers['User-Agent'] if ua.startswith('Bot'): raise BlacklistedUserAgent("User-Agent strings must not start with 'Bot'.") for s in ['scraping', 'searchme']: if s in ua: raise BlacklistedUserAgent( f"{s!r} is a known blacklisted User-Agent pattern." " Remove it from your User-Agent string." ) elif status == 404: pld = xchg.requisition.payload if isinstance(pld, http.payload.URLEncodedFormData): grant = pld.data if grant.get('grant_type') == 'authorization_code': e.arg = "The authorization code might be expired." raise e elif status == 429: ua = requ_headers['User-Agent'] if 'curl' in ua: msg = "The pattern 'curl' in your User-Agent string is known to interfere with rate limits. Remove it from your User-Agent string." raise FaultyUserAgent(msg)