4

I got a problem understanding asyncio workflows it seems...

EDIT - changed code to incorporate asyncio.Queue:

#!/usr/bin/env python

import asyncio
import websockets
import threading


class WSServer:

    def serve_forever(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        start_server = websockets.serve(self.handler, '127.0.0.1', 5678)
        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()

    async def handler(self, websocket, path):
        loop = asyncio.get_event_loop()
        master = MyClass(websocket)
        while True:
            listener_task = asyncio.ensure_future(master.get_message())
            producer_task = asyncio.ensure_future(master.produce())
            done, pending = await asyncio.wait(
                [listener_task, producer_task],
                return_when=asyncio.FIRST_COMPLETED)

            if listener_task in done:
                await master.consume()
            else:
                listener_task.cancel()

            if producer_task in done:
                msg_to_send = producer_task.result()
                await master.send_message(msg_to_send)
            else:
                producer_task.cancel()


class MyClass:

    incoming = asyncio.Queue()
    outgoing = asyncio.Queue()

    def __init__(self, websocket):
        self.ws = websocket

    async def get_message(self):
        msg_in = await self.ws.recv()
        await self.incoming.put(msg_in)

    async def send_message(self, message):
        await self.ws.send(message)

    async def consume(self):
        msg_to_consume = await self.incoming.get()
        # do something 'consuming' :)
        consume_output = msg_to_consume
        await self.outgoing.put(consume_output)

    async def produce(self):
        msg_out = await self.outgoing.get()
        return msg_out


if __name__ == '__main__':
    s = WSServer()
    t = threading.Thread(target=s.serve_forever)
    t.daemon = True
    t.start()
    while True:
        asyncio.sleep(5)

When altering MyClass.consume() it's working (on one machine, on another one not lol), but with strange behavior:

async def consume(self):
    msg_to_consume = await self.incoming.get()
    # do something 'consuming' :)
    consume_output = msg_to_consume
    await self.outgoing.put('THIS WILL NOT GET INTO QUEUE???!!!')
    print('Outgoing empty 1: ' + str(self.outgoing.empty()))
    # And this will get into queue O.o
    await self.outgoing.put(consume_output)
    print('Outgoing empty 2: ' + str(self.outgoing.empty()))

I have two awaits because after calling self.outgoing.put() for the first time, self.outgoing queue is still empty! Only when I call it again it seems to receive the item... Any ideas?

Other machine just throws error:

Exception in connection handler
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/websockets/server.py", line 78, in handler
    yield from self.ws_handler(self, path)
  File "test2.py", line 33, in handler
    msg_to_send = producer_task.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "test2.py", line 66, in produce
    msg_out = await self.outgoing.get()
  File "/usr/lib/python3.5/asyncio/queues.py", line 168, in get
    yield from getter
  File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
RuntimeError: Task <Task pending coro=<MyClass.produce() running at test2.py:66> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.5/asyncio/tasks.py:414]> got Future <Future pending> attached to a different loop

ORIGINAL:

I have this code, which is obviously not working as I intended :)

#!/usr/bin/env python

import asyncio
import websockets
import threading


class WSServer:

    def serve_forever(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        start_server = websockets.serve(self.handler, '127.0.0.1', 5678)
        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()

    async def handler(self, websocket, path):
        loop = asyncio.get_event_loop()
        master = MyClass(websocket)
        while True:
            listener_task = asyncio.ensure_future(master.get_message())
            producer_task = asyncio.ensure_future(master.produce())
            done, pending = await asyncio.wait(
                [listener_task, producer_task],
                return_when=asyncio.FIRST_COMPLETED)

            if listener_task in done:
                await master.consume()
            else:
                listener_task.cancel()

            if producer_task in done:
                if producer_task.result():
                    await master.send_message()
            else:
                producer_task.cancel()


class MyClass:

    incoming = []
    outgoing = []

    def __init__(self, websocket):
        self.ws = websocket

    async def get_message(self):
        self.incoming.append(self.ws.recv())

    async def send_message(self):
        self.ws.send(self.outgoing.pop(0))

    async def consume(self):
        self.outgoing.append(self.incoming.pop(0))

    async def produce(self):
        if self.outgoing:
            return True


if __name__ == '__main__':
    s = WSServer()
    t = threading.Thread(target=s.serve_forever)
    t.daemon = True
    t.start()
    while True:
        asyncio.sleep(5)

What I am trying to achieve:

  1. Have WSServer instance running in separate thread from the main thread (working fine with WSServer.serve_forever)

  2. For each client connected, in WSServer.handler method create MyClass instance which contains two lists - one for incoming messages, another for outgoing.

  3. Incoming should be filled from MyClass.get_message() - basically websocket.recv()

  4. Outgoing can be filled from MyClass.consume() - as a response, but can also be filled from outside of this code scope.

  5. When there is something in MyClass.incoming, process it via MyClass.consume(), when there is something in Myclass.outgoing, process it via MyClass.send_message()

I am not sure about the MyClass.produce(), as I really don't need to produce anything, just send message in outgoing when there is some. I have seen also some codes using asycnio.Queue()

I found similar threads here, but their examples and problems are out of my comprehension to be honest:

Need help on producer and consumer thread in python

asyncio queue consumer coroutine

What should be the correct approach here?

6
  • You have to await the websocket coroutines: await self.ws.recv() / await self.ws.send([...]). See this example. Commented Nov 21, 2016 at 15:07
  • I have gone through the standard websockets examples, wasn't much of change. If I await a recv/send part or parent func which is calling it, isn't it the same? Oh, and you can't use await inside of async function. Commented Nov 21, 2016 at 15:18
  • Calling a coroutine does not run it, you have to use await for that. For this reason, the following part of your code while True: asyncio.sleep(5) does not actually sleep but instead creates a lot of sleep coroutines without using them. Commented Nov 21, 2016 at 15:30
  • Hi Vincent, but that while true is in the Main Thread, running on it's own. Websocket server with everything is running in Thread-1. That is why I have : loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) Commented Nov 21, 2016 at 16:00
  • Yes, I'm simply pointing out some mistakes in the way you handle coroutines. The asyncio user documentation might give you some insight (also, see the producer/consumer section). Commented Nov 21, 2016 at 16:19

1 Answer 1

1

Found the answer thanks to the help from python chat room.

class MyClass:

    def __init__(self, websocket):
        self.ws = websocket
        self.incoming = asyncio.Queue()
        self.outgoing = asyncio.Queue()

Queue should be defined for the instance of class, not class itself.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.