# Source code for redditwarp.iterators.unfaltering_chaining_async_iterator


from __future__ import annotations
from typing import TypeVar, AsyncIterable, Iterable, AsyncIterator, Iterator

E = TypeVar('E')


class UnfalteringChainingAsyncIterator(AsyncIterator[E]):
    """Like `itertools.chain.from_iterable()` but is able to continue when
    an exception occurs during iteration.

    Also has a `self.current_iterator` attribute to get the current iterator.

    The source is an async iterable that yields ordinary (synchronous)
    iterables; their elements are produced one after another.

    Continuing after an exception works because no state is lost when one
    propagates out of `__anext__()`:

    * If the current inner iterator raises, `self.current_iterator` is left
      untouched, so the next call resumes that same iterator.
    * If the source's `__anext__()` raises, `self.current_iterator` is still
      the exhausted previous iterator, so the next call asks the source again.
    """

    def __init__(self, source: AsyncIterable[Iterable[E]]) -> None:
        """Wrap `source`, an async iterable of synchronous iterables.

        `source.__aiter__()` is called immediately; nothing is awaited here.
        """
        # Name-mangled on purpose: the outer async iterator is private.
        self.__iterator = source.__aiter__()
        # Start with an empty iterator so that the first `__anext__()` call
        # falls straight through to fetching from the source.
        self.current_iterator: Iterator[E] = iter(())
        # NOTE(review): no-op expression statement. Presumably an (empty)
        # attribute docstring kept for the documentation tooling -- confirm
        # before removing.
        ("")

    def __aiter__(self) -> AsyncIterator[E]:
        """Return `self`; this object is its own async iterator."""
        return self

    async def __anext__(self) -> E:
        """Return the next element of the chained iterables.

        Raises:
            StopAsyncIteration: When the source async iterator is exhausted
                (propagated from its `__anext__()`), or when calling `iter()`
                on an iterable obtained from the source raises
                `StopIteration`.

        Any other exception raised by the source or by the current inner
        iterator propagates unchanged, and iteration can be resumed by
        calling `__anext__()` again.
        """
        while True:
            # Hand out the next element of the current inner iterator, if it
            # has one. The loop body runs at most once per call.
            for elem in self.current_iterator:
                return elem

            # Current inner iterator is exhausted: fetch the next iterable.
            it = await self.__iterator.__anext__()
            try:
                self.current_iterator = iter(it)
            except StopIteration:
                # A `StopIteration` escaping a coroutine would be converted
                # to `RuntimeError` by Python; signal the end of the async
                # iteration instead.
                raise StopAsyncIteration