1

I am trying to create two processes that run forever, each running an asyncio loop inside of them.

I want to understand if the logic is correct. Is there a better way to do the same thing?

import asyncio
import multiprocessing


async def my_async_func(topic):
    while True:
        await asyncio.sleep(5)
        print(topic)


def create_aio_loop(topic):
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(my_async_func(topic), loop=loop)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.stop()


def main():
    topic_a = 'New to Asyncio'
    topic_b = 'New to Multiprocessing'
    process_a = multiprocessing.Process(target=create_aio_loop, args=(topic_a, ))
    process_b = multiprocessing.Process(target=create_aio_loop, args=(topic_b, ))
    processes = [process_a, process_b]

    try:
        for process in processes:
            process.start()
    except KeyboardInterrupt:
        for process in processes:
            process.terminate()
            process.join()


if __name__ == '__main__':
    main()
1

1 Answer 1

2

I am trying to create two processes that run forever, each running an asyncio loop inside of them.

I assume that you know why you wish to dispatch some of your code on several processing (virtual) cores (multiprocessing) and paralellize the rest on the same core (asyncio).

Then I think that you did right: you spawn two processes, and each of them has its own asyncio loop. The only improvement that I could find was to use loop.run_until_complete, which removes one line of code :) :

import os
import asyncio
import multiprocessing

async def my_async_func(topic):
    while True:
        await asyncio.sleep(5)
        print(topic)


def create_aio_loop(topic):
    process_name = "[Process %s, topic %s]" % (os.getpid(), topic)
    print("%s Started " % process_name)

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(my_async_func(topic))
    except KeyboardInterrupt:
        print("%s Loop interrupted" % process_name)
        loop.stop()

    print("%s terminating" % process_name)


if __name__ == '__main__':
    topic_a = 'New to Asyncio'
    topic_b = 'New to Multiprocessing'
    process_a = multiprocessing.Process(target=create_aio_loop, args=(topic_a, ))
    process_b = multiprocessing.Process(target=create_aio_loop, args=(topic_b, ))
    processes = [process_a, process_b]

    try:
        for process in processes:
            process.start()
    except KeyboardInterrupt:
        for process in processes:
            process.terminate()
            process.join()

Oh and also I suggest that you display all messages using a prefix containing the process id, that is much easier for multiprocessing debugging. I introduced an example with the start/terminate print messages.

Running this yields:

>python tmp_asyncio.py
[Process 11456, topic New to Multiprocessing] Started
[Process 18396, topic New to Asyncio] Started
New to Asyncio
New to Multiprocessing

(here I pressed Ctrl+C)

[Process 11456, topic New to Multiprocessing] Loop interrupted
[Process 11456, topic New to Multiprocessing] terminating
[Process 18396, topic New to Asyncio] Loop interrupted
[Process 18396, topic New to Asyncio] terminating
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\util.py", line 310, in _exit_function
    p.join()
  File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\process.py", line 121, in join
    res = self._popen.wait(timeout)
  File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\popen_spawn_win32.py", line 81, in wait
    res = _winapi.WaitForSingleObject(int(self._handle), msecs)
KeyboardInterrupt
Sign up to request clarification or add additional context in comments.

1 Comment

Yeah, I actually have been experiencing the atexit._run_exitfuncs error. It's a good idea to prefix the print with PID so I can try debugging. Thanks for suggestion; didn't occur to me. Will post if I could find a solution. cheers!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.