Source code for redditwarp.pagination.paginators.moderation.legacy_pull_users_async1
from __future__ import annotations
from typing import TYPE_CHECKING, TypeVar, Optional, Sequence
if TYPE_CHECKING:
from ....client_ASYNC import Client
from operator import itemgetter
from ..listing.listing_async_paginator import ListingAsyncPaginator
from ....model_loaders.user_relationship import load_user_relationship, load_banned_subreddit_user_relation
from ....models.user_relationship import UserRelationship, BannedSubredditUserRelationship
# Generic element type; used below as the type parameter of ListingAsyncPaginator.
T = TypeVar('T')
class LegacyModerationUsersAsyncPaginator(ListingAsyncPaginator[T]):
    """Listing paginator whose cursor is taken from each entry's ``'rel_id'`` key.

    This class only customises construction: it forwards its arguments to
    ``ListingAsyncPaginator`` and fixes the cursor extractor. Subclasses in
    this module supply ``fetch``.
    """

    def __init__(self,
        client: Client,
        url: str,
        *,
        limit: Optional[int] = 100,
    ) -> None:
        """Initialise the paginator.

        Args:
            client: Client object handed to the base paginator.
            url: URL handed to the base paginator.
            limit: Passed through to the base paginator as ``limit``;
                defaults to 100 and may be ``None``.
        """
        # Entries are looked up by their 'rel_id' key to obtain the cursor.
        rel_id_getter = itemgetter('rel_id')
        super().__init__(
            client,
            url,
            limit=limit,
            cursor_extractor=rel_id_getter,
        )
class UserRelationshipListingAsyncPaginator(LegacyModerationUsersAsyncPaginator[UserRelationship]):
    """Paginator whose pages are sequences of ``UserRelationship`` objects."""

    async def fetch(self) -> Sequence[UserRelationship]:
        """Fetch one page and convert its entries with ``load_user_relationship``.

        Returns:
            A list with one loaded object per item under the ``'children'``
            key of the data returned by ``_fetch_data``.
        """
        listing = await self._fetch_data()
        raw_entries = listing['children']
        return list(map(load_user_relationship, raw_entries))
class BannedSubredditUserRelationshipListingAsyncPaginator(LegacyModerationUsersAsyncPaginator[BannedSubredditUserRelationship]):
    """Paginator whose pages are sequences of ``BannedSubredditUserRelationship`` objects."""

    async def fetch(self) -> Sequence[BannedSubredditUserRelationship]:
        """Fetch one page and convert its entries with ``load_banned_subreddit_user_relation``.

        Returns:
            A list with one loaded object per item under the ``'children'``
            key of the data returned by ``_fetch_data``.
        """
        listing = await self._fetch_data()
        relations = []
        for raw_entry in listing['children']:
            relations.append(load_banned_subreddit_user_relation(raw_entry))
        return relations