
I'm trying to create a simple network monitoring app in Python. It should essentially:

  • Run multiple scripts (in this case, bash commands like "ping" and "traceroute") infinitely and simultaneously
  • Yield each line from the output of each subprocess; each line should then be consumed elsewhere in the program and sent to a Kafka topic
  • Do some extra processing on the topic and send the data to InfluxDB (but that's less relevant - I do it with Faust).

What I did:

I tried using an async generator:

import asyncio


async def run(command: str):
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    while True:
        line = await proc.stdout.readline()
        if not line:  # EOF: the subprocess has exited
            break
        yield line

Then consume it elsewhere in the program:

...
async for output_line in run("some_command"):
    ...  # do something with output_line

This works fine for a single subprocess; however, I'm not sure what to do when I need multiple async generators to run in parallel and be consumed in parallel - something like asyncio.gather, maybe, but for async generators.

What do you think would be the best approach to go about doing this? Upon searching I found the aiostream module, which can merge multiple async generators, roughly as sketched below. I can then yield a tuple with the line and, say, the command I gave, instead of just the line, to identify which generator each output line came from.
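A sketch of what that could look like with aiostream's merge operator (the command strings are just placeholders):

from aiostream import stream

async def main():
    merged = stream.merge(run("some_command"), run("another_command"))
    async with merged.stream() as streamer:
        async for line in streamer:
            ...  # consume lines from whichever subprocess produced them first

To tell the sources apart, run could yield (command, line) tuples instead of bare lines.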

However, maybe there's a simpler solution, hopefully a native one?

Thanks!

2 Answers


Given some number of AsyncIterators, you want to be able to consume them from within a single for loop:

async for line in merge_iterators([
    run("some_command"),
    run("another_command"),
    ...
]):
    ...

preferably without relying on a third-party library.

There are some subtleties to consider:

  • What should happen if one of the iterators fails while the others are still going?
    • Should it abort the loop entirely?
    • Should it trigger some separate logic?
    • Could it just be ignored? *
  • What should happen if one of the iterators exhausts before the others?
    • Should everything stop?
    • Should the rest keep going? *
  • Are all of the iterators returning data of the same type?
    • If so, no further consideration needed. *
    • If not, how should this be managed, and should it really be one loop, or could it be better handled by multiple concurrent ones?

Your use case is well-defined, so I'm confident that you'd accept the choices I've marked with asterisks (*).

A "native" Python solution to this might look like:

from asyncio import FIRST_COMPLETED, Task, create_task, wait
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar


_T = TypeVar("_T")


async def merge_iterators(iterators: Collection[AsyncIterator[_T]]) -> AsyncIterable[_T]:
    """
    Enable consumption of multiple `AsyncIterator`s from within one `for` loop.

    - Ignore any exceptions.
    - Yield until all iterators have exhausted.

    https://stackoverflow.com/q/72445371/4877269
    """

    # Start by obtaining a task for each iterator's next result.
    # Unfortunately, `create_task` doesn't accept pure awaitables.
    # We need something to turn an awaitable into a coroutine...
    async def await_next(iterator: AsyncIterator[_T]) -> _T:
        """Turn an awaitable into a coroutine for `create_task`."""
        return await iterator.__anext__()

    # ...which can then be turned into a task.
    def as_task(iterator: AsyncIterator[_T]) -> Task[_T]:
        return create_task(await_next(iterator))

    # Create a task for each iterator, keyed on the iterator.
    next_tasks = {iterator: as_task(iterator) for iterator in iterators}

    # As iterators are exhausted, they'll be removed from that mapping.
    # Repeat for as long as any are NOT exhausted.
    while next_tasks:
        # Wait until one of the iterators yields (or errors out).
        # This also returns pending tasks, but we've got those in our mapping.
        done, _ = await wait(next_tasks.values(), return_when=FIRST_COMPLETED)

        for task in done:
            # Identify the iterator.
            iterator = next(it for it, t in next_tasks.items() if t == task)

            # Yield the value, or handle the error.
            try:
                yield task.result()
            except StopAsyncIteration:
                # This iterator has exhausted.
                del next_tasks[iterator]
            except Exception:
                # Something else went wrong.
                # For the sake of this example, ignore the error but queue up
                # the next inspection anyway: leaving the finished task in the
                # mapping would make `wait` return it again immediately, forever.
                # (An exhausted async generator raises StopAsyncIteration on
                # the next call and gets removed above.)
                # In real life, that's not good--at least log it or something!
                next_tasks[iterator] = as_task(iterator)
            else:
                # The iterator hasn't exhausted or errored out.
                # Queue the next inspection.
                next_tasks[iterator] = as_task(iterator)

    # At this point, all iterators are exhausted.

The same code without comments is perhaps a little less intimidating in size:

from asyncio import FIRST_COMPLETED, Task, create_task, wait
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar


_T = TypeVar("_T")


async def _await_next(iterator: AsyncIterator[_T]) -> _T:
    return await iterator.__anext__()


def _as_task(iterator: AsyncIterator[_T]) -> Task[_T]:
    return create_task(_await_next(iterator))


async def merge_iterators(iterators: Collection[AsyncIterator[_T]]) -> AsyncIterable[_T]:
    next_tasks = {iterator: _as_task(iterator) for iterator in iterators}
    while next_tasks:
        done, _ = await wait(next_tasks.values(), return_when=FIRST_COMPLETED)
        for task in done:
            iterator = next(it for it, t in next_tasks.items() if t == task)
            try:
                yield task.result()
            except StopAsyncIteration:
                del next_tasks[iterator]
            except Exception:
                next_tasks[iterator] = _as_task(iterator)
            else:
                next_tasks[iterator] = _as_task(iterator)
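Tying this back to the question, a minimal driver using the run generator from above might look like this (the commands are placeholders):

import asyncio

async def main() -> None:
    async for line in merge_iterators([
        run("ping -c 4 127.0.0.1"),
        run("traceroute 127.0.0.1"),
    ]):
        print(line.decode().rstrip())

asyncio.run(main())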

3 Comments

I realise you would prefer this to output a tuple including (say) the command that the output is related to. A more suitable merge_iterators function signature for that might look like async def merge_iterators(iterators: Mapping[str, AsyncIterator[_T]]) -> AsyncIterable[tuple[str, _T]]: (a rough sketch of that variant follows below).
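A sketch of that keyed variant (the name merge_keyed_iterators and the choice to drop a failed iterator are my own, not part of the answer above):

from asyncio import FIRST_COMPLETED, Task, create_task, wait
from typing import AsyncIterable, AsyncIterator, Mapping, TypeVar

_T = TypeVar("_T")


async def _await_next(iterator: AsyncIterator[_T]) -> _T:
    return await iterator.__anext__()


async def merge_keyed_iterators(
    iterators: Mapping[str, AsyncIterator[_T]]
) -> AsyncIterable[tuple[str, _T]]:
    # Pending tasks are keyed on the caller-supplied name (e.g. the command string).
    next_tasks: dict[str, Task[_T]] = {
        key: create_task(_await_next(it)) for key, it in iterators.items()
    }
    while next_tasks:
        done, _ = await wait(next_tasks.values(), return_when=FIRST_COMPLETED)
        for task in done:
            key = next(k for k, t in next_tasks.items() if t is task)
            try:
                result = task.result()
            except StopAsyncIteration:
                del next_tasks[key]
            except Exception:
                del next_tasks[key]  # treat a failed iterator as exhausted
            else:
                yield key, result
                next_tasks[key] = create_task(_await_next(iterators[key]))

It could then be called with something like merge_keyed_iterators({cmd: run(cmd) for cmd in commands}).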
This is a nice solution. I came up with an alternative solution using a queue, which is possibly useful if there's a need to limit memory use (by tuning the queue maxsize), but it has the drawback of needing to wait. This wait happens at the end, once all producing tasks have been consumed, and it also controls how often to check whether that is the case. Thought it was interesting enough to share: gist.github.com/antonagestam/8476ada7d74cce93af0339cf32c62ae2
Hmm, actually that's probably not true. An equal amount of memory would be consumed, because each queue-producing task would consume and hold an item until there's room in the queue. So, probably not a very useful solution after all, but still interesting :)
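For reference, that queue-based idea might be sketched roughly like this (my reconstruction from the comment, not the linked gist; the merge_via_queue name and the 0.1-second timeout are arbitrary choices):

import asyncio
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar

_T = TypeVar("_T")


async def merge_via_queue(
    iterators: Collection[AsyncIterator[_T]], maxsize: int = 100
) -> AsyncIterable[_T]:
    queue: asyncio.Queue[_T] = asyncio.Queue(maxsize=maxsize)

    async def produce(iterator: AsyncIterator[_T]) -> None:
        async for item in iterator:
            await queue.put(item)  # blocks while the queue is full

    # Error handling for failed producers is omitted for brevity.
    producers = [asyncio.create_task(produce(it)) for it in iterators]

    # Drain the queue until every producer has finished and nothing is left.
    while not (all(p.done() for p in producers) and queue.empty()):
        try:
            # This timeout is the "wait" mentioned above: it bounds how long
            # we block before re-checking whether all producers are done.
            item = await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.TimeoutError:
            continue
        yield item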

What you are looking for is asyncio.gather, which runs multiple awaitable objects simultaneously.

To use it, I think your first task is to wrap your parsing code into a single function, like:

async def parse(cmd):
    async for output_line in run(cmd):
        ...  # do something with output_line

Then in another function/context, wrap the parse with gather:

result = await asyncio.gather(
    parse("cmd1"),
    parse("cmd2"),
    parse("cmd3"),
)

1 Comment

gather is great for awaitables that produce a single result, but it probably won't work as intended for async iterables like the ones asked about here.
