Source code for redditwarp.iterators.call_chunk_chaining_async_iterator


from __future__ import annotations
from typing import TYPE_CHECKING, TypeVar, Generic, Optional
if TYPE_CHECKING:
    from typing import Iterable, Iterator, Callable, Sequence, Awaitable, AsyncIterator

from .stubborn_caller_async_iterator import StubbornCallerAsyncIterator
from .unfaltering_chaining_async_iterator import UnfalteringChainingAsyncIterator

T = TypeVar('T')

[docs]class CallChunkChainingAsyncIterator(Generic[T]): """Evaluate call chunks and chain them together.""" @property def current_callable(self) -> Optional[Callable[[], Awaitable[Sequence[T]]]]: return self.__calling_itr.current @current_callable.setter def current_callable(self, value: Optional[Callable[[], Awaitable[Sequence[T]]]]) -> None: self.__calling_itr.current = value @property def current_iterator(self) -> Iterator[T]: return self.__chaining_itr.current_iterator @current_iterator.setter def current_iterator(self, value: Iterator[T]) -> None: self.__chaining_itr.current_iterator = value def __init__(self, chunks: Iterable[Callable[[], Awaitable[Sequence[T]]]]) -> None: self.__chunking_itr: Iterator[Callable[[], Awaitable[Sequence[T]]]] = iter(chunks) self.__calling_itr: StubbornCallerAsyncIterator[Sequence[T]] = StubbornCallerAsyncIterator(self.__chunking_itr) self.__chaining_itr: UnfalteringChainingAsyncIterator[T] = UnfalteringChainingAsyncIterator(self.__calling_itr) def __aiter__(self) -> AsyncIterator[T]: return self async def __anext__(self) -> T: return await self.__chaining_itr.__anext__()
[docs] def get_chunking_iterator(self) -> Iterator[Callable[[], Awaitable[Sequence[T]]]]: return self.__chunking_itr