I have the following producer-consumer architecture:
- A Websockets server that accepts connections. connected clients send data. The incoming data is put into a queue
- A coroutine that reads from the queue and processes the incoming data
The Problem is, I'm "trapped inside the client handler" for a lack of a better word. I can't find a way to pass parameters to the client handler and thus am not able to access a Queue to forward data outward of the client_handler
here the code a arrived at so far
import asyncio
import websockets
# Websockets client Handler accepts data and puts it into queue
async def client_handler(websocket, path):
print(f"Connected with path '{path}'")
async for msg_rx in websocket:
if not msg_rx:
break
print(f"RX: {msg_rx }")
# TODO Add to Queue
# HOW DO I ACCESS THE QUEUE?
print(f"Disconnected from Path '{path}'")
async def task_ws_server(q):
# TODO how do I pass q to the client handler???
async with websockets.serve(client_handler, '127.0.0.1', 5001):
await asyncio.Future() # run forever
async def task_consumer(q):
# get elements from Queue
while True:
data = await q.get()
# Process them like storing to file or forward to other code
print(data) # print as stand-in for more complex code
q.task_done()
async def main():
# Queue to allow moving data from client_handler to Task_consumer
q = asyncio.Queue()
# Start consumer task
consumer = asyncio.create_task(task_consumer(q))
# Start and run WS Server to handle incoming connections
await asyncio.gather(*[
asyncio.create_task(task_ws_server(q)),
])
await q.join()
consumer.cancel()
if __name__ == '__main__':
asyncio.run(main())
I found one solution: move the queue declaration up to the top meaning the queue can be accessed inside of the async functions. I don't like this solution as it means i have to declare the client_handler locally or expose the queue in the global scope