Source code for jina.serve.stream.helper

from typing import Iterator, AsyncIterator, Union

from jina.helper import get_or_reuse_loop


class AsyncRequestsIterator:
    """Iterator to allow async iteration of blocking/non-blocking iterator from the Client

    Wraps either a regular (blocking) ``Iterator`` or an ``AsyncIterator`` and
    exposes a uniform ``async for`` interface over it.

    .. note::
        For a blocking ``Iterator``, an item that is ``None`` is
        indistinguishable from exhaustion and therefore ends the iteration.
    """

    def __init__(self, iterator: Union[Iterator, AsyncIterator]) -> None:
        """Async request iterator

        TODO (Deepankar): accept `num_req`

        :param iterator: request iterator
        """
        self.iterator = iterator

    def iterator__next__(self):
        """
        Executed inside a `ThreadPoolExecutor` via `loop.run_in_executor` to avoid following exception.
        "StopIteration interacts badly with generators and cannot be raised into a Future"

        :return: next request or None
        """
        try:
            return next(self.iterator)
        except StopIteration:
            # `StopIteration` must not escape into the Future created by
            # `run_in_executor`, so exhaustion is signalled with `None`.
            return None

    def __aiter__(self):
        """Return the asynchronous iterator object, i.e. the instance itself.

        :return: self
        """
        return self

    async def __anext__(self):
        """Fetch the next request from the wrapped iterator.

        :return: the next request
        :raises StopAsyncIteration: when the wrapped iterator is exhausted
        :raises TypeError: when the wrapped object is neither an ``Iterator``
            nor an ``AsyncIterator``
        """
        if isinstance(self.iterator, Iterator):
            # An `Iterator` indicates "blocking" code, which might block all
            # tasks in the event loop. Hence we iterate in the default
            # executor provided by asyncio.
            request = await get_or_reuse_loop().run_in_executor(
                None, self.iterator__next__
            )
            # `iterator.__next__` could be executed directly, but that would
            # raise `StopIteration` in the executor, which raises the
            # following exception while chaining states in futures:
            # "StopIteration interacts badly with generators and cannot be
            # raised into a Future"
            # To avoid that, `iterator__next__` returns `None` on exhaustion.
            if request is None:
                raise StopAsyncIteration
            return request

        if isinstance(self.iterator, AsyncIterator):
            # we assume that `AsyncIterator` doesn't block the event loop
            return await self.iterator.__anext__()

        # Previously this fell through to `return request` with `request`
        # never assigned, surfacing as an opaque `UnboundLocalError`.
        raise TypeError(
            '`iterator` must be an `Iterator` or an `AsyncIterator`, '
            f'got {type(self.iterator).__name__!r}'
        )