Source code for redditwarp.pagination.paginators.modmail_async1


from __future__ import annotations
from typing import TYPE_CHECKING, Any, Optional, Sequence, Iterable, Tuple
if TYPE_CHECKING:
    from ...client_ASYNC import Client

from ..async_paginator import CursorAsyncPaginator, Resettable
from ...model_loaders.modmail_ASYNC import load_conversation_info, load_message
from ...models.modmail_ASYNC import ConversationInfo, Message

[docs]class ModmailConversationMessageAsyncPaginator(Resettable, CursorAsyncPaginator[Tuple[ConversationInfo, Message]]): def __init__(self, client: Client, url: str, mailbox: str, subreddit_names: Sequence[str], sort: str, *, limit: Optional[int] = 100, ) -> None: super().__init__() self.limit: Optional[int] = limit ("") self.client: Client = client ("") self.url: str = url ("") self.mailbox: str = mailbox ("") self.subreddit_names: Sequence[str] = subreddit_names ("") self.sort: str = sort ("") self.__reset() def __reset(self) -> None: self._after: str = '' self._has_after: bool = True
[docs] def reset(self) -> None: self.__reset()
[docs] def get_cursor(self) -> str: return self._after
[docs] def set_cursor(self, value: str) -> None: self._after = value
[docs] def has_more(self) -> bool: return self._has_after
[docs] def set_has_more(self, value: bool) -> None: self._has_after = value
def _generate_params(self) -> Iterable[tuple[str, str]]: if self.limit is not None: yield ('limit', str(self.limit)) if self._after: yield ('after', self._after) if self.mailbox: yield ('state', self.mailbox) if self.subreddit_names: yield ('entity', ','.join(self.subreddit_names)) if self.sort: yield ('sort', self.sort) async def _fetch_data(self) -> Any: params = dict(self._generate_params()) data = await self.client.request('GET', self.url, params=params) entries = data['conversationIds'] if entries: self._after = entries[-1] self._has_after = bool(entries) return data
[docs] async def fetch(self) -> Sequence[Tuple[ConversationInfo, Message]]: data = await self._fetch_data() conversations_mapping = data['conversations'] messages_mapping = data['messages'] results = [] for convo_id36 in data['conversationIds']: conversation_data = conversations_mapping[convo_id36] message_id36 = conversation_data['objIds'][0]['id'] message_data = messages_mapping[message_id36] results.append(( load_conversation_info(conversation_data, self.client), load_message(message_data, self.client), )) return results