
Appending To Merged Async Generators In Python

I'm trying to merge a bunch of asynchronous generators in Python 3.7 while still adding new async generators during iteration. I'm currently using aiostream to merge my generators.

Solution 1:

Here is an implementation that should work efficiently even with a large number of async iterators:

import asyncio

class merge:
    def __init__(self, *iterables):
        self._iterables = list(iterables)
        self._wakeup = asyncio.Event()

    def _add_iters(self, next_futs, on_done):
        for it in self._iterables:
            it = it.__aiter__()
            nfut = asyncio.ensure_future(it.__anext__())
            nfut.add_done_callback(on_done)
            next_futs[nfut] = it
        del self._iterables[:]
        return next_futs

    async def __aiter__(self):
        done = {}
        next_futs = {}

        def on_done(nfut):
            done[nfut] = next_futs.pop(nfut)
            self._wakeup.set()

        self._add_iters(next_futs, on_done)
        try:
            while next_futs:
                await self._wakeup.wait()
                self._wakeup.clear()
                for nfut, it in done.items():
                    try:
                        ret = nfut.result()
                    except StopAsyncIteration:
                        continue
                    self._iterables.append(it)
                    yield ret
                done.clear()
                if self._iterables:
                    self._add_iters(next_futs, on_done)
        finally:
            # If the generator exits with an exception, or if the caller stops
            # iterating, make sure our callbacks are removed.
            for nfut in next_futs:
                nfut.remove_done_callback(on_done)

    def append_iter(self, new_iter):
        self._iterables.append(new_iter)
        self._wakeup.set()

The only change required for your sample code is that the method is named append_iter, not merge.
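
The core trick in the class above is scheduling each iterator's __anext__ as a future and waking the merged generator from a done callback. Here is a stripped-down, self-contained sketch of just that mechanism, using a hypothetical ticker generator (not from the original question) and only two fixed iterators:

```python
import asyncio

async def ticker(name, n, delay):
    # Hypothetical async generator used only for this demonstration.
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"{name}-{i}"

async def demo():
    results = []
    wakeup = asyncio.Event()
    done = {}       # completed __anext__ futures -> their iterator
    next_futs = {}  # pending __anext__ futures -> their iterator

    def on_done(fut):
        # Invoked by the event loop when an __anext__ future completes.
        done[fut] = next_futs.pop(fut)
        wakeup.set()

    for it in (ticker("a", 2, 0.01).__aiter__(),
               ticker("b", 2, 0.02).__aiter__()):
        fut = asyncio.ensure_future(it.__anext__())
        fut.add_done_callback(on_done)
        next_futs[fut] = it

    while next_futs:
        await wakeup.wait()
        wakeup.clear()
        for fut, it in list(done.items()):
            try:
                results.append(fut.result())
            except StopAsyncIteration:
                continue  # this iterator is exhausted; don't reschedule it
            nfut = asyncio.ensure_future(it.__anext__())
            nfut.add_done_callback(on_done)
            next_futs[nfut] = it
        done.clear()
    return results

results = asyncio.run(demo())
```

The exact interleaving depends on the delays, but every yielded value from both tickers arrives exactly once.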

Solution 2:

This can be done using stream.flatten with an asyncio queue to store the new generators.

import asyncio
from aiostream import stream, pipe

# go() is the async generator defined in the question.

async def main():
    queue = asyncio.Queue()
    await queue.put(go())
    await queue.put(go())
    await queue.put(go())

    xs = stream.call(queue.get)
    ys = stream.cycle(xs)
    zs = stream.flatten(ys, task_limit=5)
    async with zs.stream() as streamer:
        async for item in streamer:
            if item == 50:
                await queue.put(go())
            print(item)

asyncio.run(main())

Note that you can tune how many tasks are allowed to run concurrently using the task_limit argument. Also note that zs can be defined more elegantly using the pipe syntax:

zs = stream.call(queue.get) | pipe.cycle() | pipe.flatten(task_limit=5)
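
For readers who prefer not to add a dependency, the same queue-driven pattern can be sketched with plain asyncio: generators are pulled from an input queue and flattened into a single output queue, with a semaphore standing in for task_limit. The gen generator and the None sentinels are assumptions of this sketch, not part of aiostream:

```python
import asyncio

async def gen(start):
    # Hypothetical async generator standing in for go().
    for i in range(start, start + 3):
        await asyncio.sleep(0)
        yield i

async def flatten_from_queue(queue, out, workers=5):
    # Drain generators from `queue`, run up to `workers` at a time,
    # and forward every item they yield to the `out` queue.
    sem = asyncio.Semaphore(workers)
    tasks = set()

    async def drive(agen):
        async with sem:
            async for item in agen:
                await out.put(item)

    while True:
        agen = await queue.get()
        if agen is None:  # sentinel: no more generators will arrive
            break
        tasks.add(asyncio.ensure_future(drive(agen)))
    await asyncio.gather(*tasks)
    await out.put(None)  # signal end of the flattened stream

async def main():
    queue, out = asyncio.Queue(), asyncio.Queue()
    for start in (0, 10):
        await queue.put(gen(start))
    await queue.put(None)

    results = []
    flat = asyncio.ensure_future(flatten_from_queue(queue, out))
    while True:
        item = await out.get()
        if item is None:
            break
        results.append(item)
    await flat
    return results

results = asyncio.run(main())
```

Unlike the aiostream version, new generators can be appended only until the sentinel is enqueued, so this is a simplification rather than a drop-in replacement.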

Disclaimer: I am the project maintainer.
