Source code for redditwarp.iterators.stubborn_caller_async_iterator


from __future__ import annotations
from typing import TypeVar, AsyncIterator, Callable, Optional, Awaitable, Iterable

T = TypeVar('T')

[docs]class StubbornCallerAsyncIterator(AsyncIterator[T]): """Call each callable in the given iterator and return its result. If a call raises an exception it will propagate normally. Doing `next(self)` will re-attempt the call until it returns a result. Has a `self.current` attribute to get the current callable. """ def __init__(self, iterable: Iterable[Callable[[], Awaitable[T]]]) -> None: self.__itr = iter(iterable) self.current: Optional[Callable[[], Awaitable[T]]] = None ("") def __aiter__(self) -> AsyncIterator[T]: return self async def __anext__(self) -> T: if self.current is None: try: self.current = next(self.__itr) except StopIteration: raise StopAsyncIteration ret = await self.current() self.current = None return ret