
I have the following producer-consumer architecture:

  • A WebSocket server that accepts connections. Connected clients send data, and the incoming data is put into a queue
  • A coroutine that reads from the queue and processes the incoming data

The problem is that I'm "trapped inside the client handler", for lack of a better word. I can't find a way to pass parameters to the client handler, so I can't access a queue to forward data out of client_handler.

Here is the code I arrived at so far:

import asyncio
import websockets

# Websockets client Handler accepts data and puts it into queue
async def client_handler(websocket, path):
    print(f"Connected with path '{path}'")

    async for msg_rx in websocket:
        if not msg_rx:
            break

        print(f"RX: {msg_rx}")

        # TODO Add to Queue
        # HOW DO I ACCESS THE QUEUE?

    print(f"Disconnected from Path '{path}'")


async def task_ws_server(q):
    
    # TODO how do I pass q to the client handler???
    async with websockets.serve(client_handler, '127.0.0.1', 5001):
        await asyncio.Future()  # run forever

async def task_consumer(q):
    # get elements from Queue
    while True:
        data = await q.get()

        # Process them like storing to file or forward to other code
        print(data) # print as stand-in for more complex code

        q.task_done()


async def main():
    
    # Queue to allow moving data from client_handler to Task_consumer
    q = asyncio.Queue()

    # Start consumer task
    consumer = asyncio.create_task(task_consumer(q))

    # Start and run WS Server to handle incoming connections
    await asyncio.gather(*[
        asyncio.create_task(task_ws_server(q)),
    ])

    await q.join()
    consumer.cancel()


if __name__ == '__main__':

    asyncio.run(main())

I found one solution: move the queue declaration to the top of the module, so that the queue can be accessed inside the async functions. I don't like this solution, as it means I have to either declare client_handler locally or expose the queue in the global scope.
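For reference, the "declare client_handler locally" variant can be sketched as a factory function whose inner handler closes over the queue. This is only an illustration under assumptions: `make_handler` and `FakeSocket` are hypothetical names that do not appear in the code above, and the fake socket stands in for a real connection so the snippet runs without a server.

```python
import asyncio


def make_handler(queue):
    """Hypothetical factory: returns a handler that closes over `queue`."""
    async def client_handler(websocket, path):
        # `queue` is captured from the enclosing scope; no global needed
        async for msg in websocket:
            await queue.put(msg)
    return client_handler


class FakeSocket:
    """Hypothetical stand-in for a connection: async-iterates fixed messages."""
    def __init__(self, messages):
        self._messages = list(messages)

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for m in self._messages:
            yield m


async def demo():
    q = asyncio.Queue()
    handler = make_handler(q)
    # A server would call handler(websocket, path); simulate one connection
    await handler(FakeSocket(["a", "b"]), "/")
    return [q.get_nowait() for _ in range(q.qsize())]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The handler is still defined inside another function, which is exactly the drawback described above, but the queue stays out of the global scope.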

  • What version of Python are you using? Commented Oct 29, 2021 at 7:29
  • I use Python 3.8 Commented Oct 29, 2021 at 7:33

2 Answers


You can have your client_handler take a queue as an argument, and use functools.partial to create a function that you can pass to websockets.serve:

import functools

async def client_handler(websocket, path, queue):
    # Do something with queue
    pass

async def task_ws_server(q):
    queued_client_handler = functools.partial(client_handler, queue=q)

    async with websockets.serve(queued_client_handler, '127.0.0.1', 5001):
        await asyncio.Future()  # run forever
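To check the binding in isolation, here is a minimal sketch that exercises the partial without a running server. `fake_connection` and `demo` are hypothetical helpers, not part of websockets; the async generator only imitates the `async for` interface of a connection, and the two-argument call mirrors the `(websocket, path)` signature used in the question.

```python
import asyncio
import functools


async def client_handler(websocket, path, queue):
    # Forward every received message to the shared queue
    async for msg in websocket:
        await queue.put(msg)


async def fake_connection(messages):
    # Hypothetical stand-in for a websocket: yields messages one by one
    for m in messages:
        yield m


async def demo():
    q = asyncio.Queue()
    # Bind the queue as a keyword argument; the result takes (websocket, path)
    handler = functools.partial(client_handler, queue=q)
    await handler(fake_connection(["x", "y"]), "/demo")

    received = []
    while not q.empty():
        received.append(q.get_nowait())
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Binding `queue` by keyword leaves the two positional parameters free, so the callable handed to the server keeps the shape it expects.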


1 Comment

That's a nice trick, thanks. I don't think it's the intended way of doing this, but if it works, it's fine by me. If no better solution comes along, I'll flag this as the solution.

I have found an object-oriented solution. Through self, the client handler can access state that lives outside of it, such as the queue:

import asyncio
import websockets

class CustomWebSocketServer():

    def __init__(self, host='127.0.0.1', port=5001):

        self.host = host
        self.port = port    

        self.queue_rx = None

        self.ws_clients = set()

    async def run(self):

        # Queue to allow moving data from client_handler to Task_consumer
        self.queue_rx = asyncio.Queue()

        # Start consumer task
        consumer = asyncio.create_task(self.task_consumer())

        # Start and run WS Server to handle incoming connections
        await websockets.serve(self.client_handler, self.host, self.port)
        
        await asyncio.Future()  # run forever

        await self.queue_rx.join()
        consumer.cancel()

    # Websockets client Handler accepts data and puts it into queue
    async def client_handler(self, ws, path):
        print(f"Connected with path '{path}'")
        try:
            # Register ws client
            self.ws_clients.add(ws)

            async for msg_rx in ws:

                if not msg_rx:
                    break

                print(f"RX: {msg_rx}")

                # Add to Queue
                await self.queue_rx.put(msg_rx)

        finally:

            self.ws_clients.remove(ws)

            print(f"Disconnected from Path '{path}'")


    async def task_consumer(self):
        print("task consumer start")
        # get elements from Queue
        while True:
            data = await self.queue_rx.get()

            # Process them like storing to file or forward to other code
            print(data) # print as stand-in for more complex code

            self.queue_rx.task_done()


if __name__ == '__main__':
    Server = CustomWebSocketServer()
    asyncio.run(Server.run())
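One thing to note about run() above: the queue_rx.join() and consumer.cancel() lines come after await asyncio.Future(), which never completes, so they are never reached. The sketch below shows the same bound-method pattern with finite, simulated connections so that the drain-and-cancel sequence actually executes. `QueueingHandler`, `fake_connection`, and the `processed` list are hypothetical and exist only for this illustration; no real server is started.

```python
import asyncio


async def fake_connection(messages):
    # Hypothetical stand-in for a websocket: yields messages one by one
    for m in messages:
        yield m


class QueueingHandler:
    """Hypothetical, trimmed-down variant of the class above."""

    def __init__(self):
        self.queue_rx = None
        self.processed = []

    async def client_handler(self, ws, path):
        # Bound method: the caller passes (ws, path), `self` gives the queue
        async for msg in ws:
            await self.queue_rx.put(msg)

    async def task_consumer(self):
        while True:
            data = await self.queue_rx.get()
            self.processed.append(data)  # stand-in for real processing
            self.queue_rx.task_done()

    async def run(self, connections):
        # Create the queue inside the running loop, as in the answer
        self.queue_rx = asyncio.Queue()
        consumer = asyncio.create_task(self.task_consumer())

        # Simulate clients; each finishes when its messages are exhausted
        await asyncio.gather(
            *(self.client_handler(ws, "/") for ws in connections)
        )

        # Wait until every queued item has been processed, then stop
        await self.queue_rx.join()
        consumer.cancel()
        return self.processed


if __name__ == "__main__":
    conns = [fake_connection(["a", "b"]), fake_connection(["c"])]
    print(asyncio.run(QueueingHandler().run(conns)))
```

With a real server, a comparable shutdown path would need something that ends the "run forever" wait, for example an event set by a signal handler; that part is left out here.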
