I got a problem understanding asyncio workflows it seems...
EDIT - changed code to incorporate asyncio.Queue:
#!/usr/bin/env python
import asyncio
import websockets
import threading
class WSServer:
def serve_forever(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(self.handler, '127.0.0.1', 5678)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
async def handler(self, websocket, path):
loop = asyncio.get_event_loop()
master = MyClass(websocket)
while True:
listener_task = asyncio.ensure_future(master.get_message())
producer_task = asyncio.ensure_future(master.produce())
done, pending = await asyncio.wait(
[listener_task, producer_task],
return_when=asyncio.FIRST_COMPLETED)
if listener_task in done:
await master.consume()
else:
listener_task.cancel()
if producer_task in done:
msg_to_send = producer_task.result()
await master.send_message(msg_to_send)
else:
producer_task.cancel()
class MyClass:
incoming = asyncio.Queue()
outgoing = asyncio.Queue()
def __init__(self, websocket):
self.ws = websocket
async def get_message(self):
msg_in = await self.ws.recv()
await self.incoming.put(msg_in)
async def send_message(self, message):
await self.ws.send(message)
async def consume(self):
msg_to_consume = await self.incoming.get()
# do something 'consuming' :)
consume_output = msg_to_consume
await self.outgoing.put(consume_output)
async def produce(self):
msg_out = await self.outgoing.get()
return msg_out
if __name__ == '__main__':
s = WSServer()
t = threading.Thread(target=s.serve_forever)
t.daemon = True
t.start()
while True:
asyncio.sleep(5)
When altering MyClass.consume() it's working (on one machine, on another one not lol), but with strange behavior:
async def consume(self):
msg_to_consume = await self.incoming.get()
# do something 'consuming' :)
consume_output = msg_to_consume
await self.outgoing.put('THIS WILL NOT GET INTO QUEUE???!!!')
print('Outgoing empty 1: ' + str(self.outgoing.empty()))
# And this will get into queue O.o
await self.outgoing.put(consume_output)
print('Outgoing empty 2: ' + str(self.outgoing.empty()))
I have two awaits because after calling self.outgoing.put() for the first time, self.outgoing queue is still empty! Only when I call it again it seems to receive the item... Any ideas?
Other machine just throws error:
Exception in connection handler
Traceback (most recent call last):
File "/usr/lib/python3/dist-packages/websockets/server.py", line 78, in handler
yield from self.ws_handler(self, path)
File "test2.py", line 33, in handler
msg_to_send = producer_task.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
result = coro.throw(exc)
File "test2.py", line 66, in produce
msg_out = await self.outgoing.get()
File "/usr/lib/python3.5/asyncio/queues.py", line 168, in get
yield from getter
File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
yield self # This tells Task to wait for completion.
RuntimeError: Task <Task pending coro=<MyClass.produce() running at test2.py:66> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.5/asyncio/tasks.py:414]> got Future <Future pending> attached to a different loop
ORIGINAL:
I have this code, which is obviously not working as I intended :)
#!/usr/bin/env python
import asyncio
import websockets
import threading
class WSServer:
def serve_forever(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(self.handler, '127.0.0.1', 5678)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
async def handler(self, websocket, path):
loop = asyncio.get_event_loop()
master = MyClass(websocket)
while True:
listener_task = asyncio.ensure_future(master.get_message())
producer_task = asyncio.ensure_future(master.produce())
done, pending = await asyncio.wait(
[listener_task, producer_task],
return_when=asyncio.FIRST_COMPLETED)
if listener_task in done:
await master.consume()
else:
listener_task.cancel()
if producer_task in done:
if producer_task.result():
await master.send_message()
else:
producer_task.cancel()
class MyClass:
incoming = []
outgoing = []
def __init__(self, websocket):
self.ws = websocket
async def get_message(self):
self.incoming.append(self.ws.recv())
async def send_message(self):
self.ws.send(self.outgoing.pop(0))
async def consume(self):
self.outgoing.append(self.incoming.pop(0))
async def produce(self):
if self.outgoing:
return True
if __name__ == '__main__':
s = WSServer()
t = threading.Thread(target=s.serve_forever)
t.daemon = True
t.start()
while True:
asyncio.sleep(5)
What I am trying to achieve:
Have WSServer instance running in separate thread from the main thread (working fine with WSServer.serve_forever)
For each client connected, in WSServer.handler method create MyClass instance which contains two lists - one for incoming messages, another for outgoing.
Incoming should be filled from MyClass.get_message() - basically websocket.recv()
Outgoing can be filled from MyClass.consume() - as a response, but can also be filled from outside of this code scope.
When there is something in MyClass.incoming, process it via MyClass.consume(), when there is something in Myclass.outgoing, process it via MyClass.send_message()
I am not sure about the MyClass.produce(), as I really don't need to produce anything, just send message in outgoing when there is some. I have seen also some codes using asycnio.Queue()
I found similar threads here, but their examples and problems are out of my comprehension to be honest:
Need help on producer and consumer thread in python
asyncio queue consumer coroutine
What should be the correct approach here?
awaitthe websocket coroutines:await self.ws.recv()/await self.ws.send([...]). See this example.awaitfor that. For this reason, the following part of your codewhile True: asyncio.sleep(5)does not actually sleep but instead creates a lot ofsleepcoroutines without using them.