1

I want to update Queue with several asyncio I receive data from each A,B,C( using websocket and "while true") and then i want to put in the queue and all the provider will be able to write in the same Queue ( I know that maybe i need to use multiThread or something else but i dont find the right way

**if __name__ == '__main__':
global_queue = queue.Queue()
asyncio.run(A_Orderbook.data_stream(global_queue))
asyncio.run(B_Orderbook.data_stream(global_queue))
asyncio.run(C_Orderbook.data_stream(global_queue))
print(global_queue.qsize())**

Thks

2
  • 1
    You probably don’t want to run each coroutine in a separate event loop. You may also want to consider using an asyncio Queue instead. Commented Aug 10, 2022 at 17:19
  • Any feedback please? Commented Aug 14, 2022 at 13:33

1 Answer 1

1

You can do it the following way:

import asyncio


async def worker(worker_name: str, q: asyncio.Queue):
    """Produces tasks for consumer."""
    for i in range(1, 6):
        await asyncio.sleep(1)
        await q.put(f"{worker_name}-{i}")


async def consumer(q: asyncio.Queue):
    """Consumes tasks from workers."""
    while True:
        item = await q.get()
        await asyncio.sleep(1)
        print(item)
        # we need it to ensure that all tasks were done
        q.task_done()   


async def main_wrapper():
    """Main function - entry point of our async app."""
    q = asyncio.Queue()
    # we do not need to await the asyncio task it is run in "parallel"
    asyncio.create_task(consumer(q))  
    await asyncio.gather(*[worker(f"w{i}", q) for i in range(1, 5)])  # create worker-tasks
    await q.join()  # we wait until asyncio.create_task(consumer(q)) consume all tasks
    print("All DONE !")

if __name__ == '__main__':
    asyncio.run(main_wrapper())


Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.