Source code for redditwarp.iterators.call_chunk_calling_async_iterator
from __future__ import annotations
from typing import TYPE_CHECKING, TypeVar, Generic, Optional
if TYPE_CHECKING:
from typing import Iterable, Iterator, Callable, Awaitable, AsyncIterator
from .stubborn_caller_async_iterator import StubbornCallerAsyncIterator
T = TypeVar('T')
[docs]class CallChunkCallingAsyncIterator(Generic[T]):
"""Evaluate call chunks and return their results."""
@property
def current_callable(self) -> Optional[Callable[[], Awaitable[T]]]:
return self.__calling_itr.current
@current_callable.setter
def current_callable(self, value: Optional[Callable[[], Awaitable[T]]]) -> None:
self.__calling_itr.current = value
def __init__(self, chunks: Iterable[Callable[[], Awaitable[T]]]) -> None:
self.__chunking_itr: Iterator[Callable[[], Awaitable[T]]] = iter(chunks)
self.__calling_itr: StubbornCallerAsyncIterator[T] = StubbornCallerAsyncIterator(self.__chunking_itr)
def __aiter__(self) -> AsyncIterator[T]:
return self
async def __anext__(self) -> T:
return await self.__calling_itr.__anext__()
[docs] def get_chunking_iterator(self) -> Iterator[Callable[[], Awaitable[T]]]:
return self.__chunking_itr