5

I am trying to implement a basic websocket client using asyncio and websockets with Python 3.5.2.

Basically, I want connect_to_dealer to be a blocking call, but wait for the websocket message on a different thread.

After reading some docs (I have very little exp with Python), I concluded that asyncio.ensure_future() passing a coroutine (listen_for_message) was the way to go.

Now, I get to run listen_for_message on a different thread, but from within the coroutine I can't seem to use await or any other mechanism to make the calls synchronous. If I do it, the execution waits forever (it hangs) even for a simple sleep.

I'd like to know what I'm doing wrong.

async def listen_for_message(self, future, websocket):
    while (True):
        try:
            await asyncio.sleep(1) # It hangs here
            print('Listening for a message...')
            message = await websocket.recv() # If I remove the sleep, hangs here
            print("< {}".format(message))
            future.set_result(message)
            future.done()
        except websockets.ConnectionClosed as cc:
            print('Connection closed')
        except Exception as e:
            print('Something happened')

def handle_connect_message(self, future):
    # We must first remove the websocket-specific payload because we're only interested in the connect protocol msg
    print(future.result)

async def connect_to_dealer(self):
    print('connect to dealer')
    websocket = await websockets.connect('wss://mywebsocket'))
    hello_message = await websocket.recv()
    print("< {}".format(hello_message))
    # We need to parse the connection ID out of the message
    connection_id = hello_message['connectionId']
    print('Got connection id {}'.format(connection_id))
    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(user_id='username', connection_id=connection_id), headers=headers)
    if sub_response.status_code == 200:
        print('Now we\'re observing traffic')
    else:
        print('Oops request failed with {code}'.format(code=sub_response.status_code))
    # Now we need to handle messages but continue with the regular execution
    try:
        future = asyncio.get_event_loop().create_future()
        future.add_done_callback(self.handle_connect_message)
        asyncio.ensure_future(self.listen_for_message(future, websocket))
    except Exception as e:
        print(e)

1 Answer 1

5

Is there a specific reason you need to work with explicit futures?

With asyncio you can use a combination of coroutines and Tasks to achieve most purposes. Tasks are essentially wrapped coroutines that go about cranking themselves over in the background, independently of other async code, so you don't have to explicitly manage their flow or juggle them with other bits of code.

I am not entirely sure of your end goal, but perhaps the approach elaborated below gives you something to work with:

import asyncio

async def listen_for_message(websocket):

    while True:

        await asyncio.sleep(0)

        try:
            print('Listening for a message...')
            message = await websocket.recv()

            print("< {}".format(message))

        except websockets.ConnectionClosed as cc:
            print('Connection closed')

        except Exception as e:
            print('Something happened')


async def connect_to_dealer():

    print('connect to dealer')
    websocket = await websockets.connect('wss://mywebsocket')

    hello_message = await websocket.recv()
    print("< {}".format(hello_message))

    # We need to parse the connection ID out of the message
    connection_id = hello_message['connectionId']
    print('Got connection id {}'.format(connection_id))

    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(
        user_id='username', connection_id=connection_id), headers=headers)

    if sub_response.status_code == 200:
        print('Now we\'re observing traffic')
    else:
        print('Oops request failed with {code}'.format(code=sub_response.status_code))


async def my_app():

    # this will block until connect_to_dealer() returns
    websocket = await connect_to_dealer()

    # start listen_for_message() in its own task wrapper, so doing it continues in the background
    asyncio.ensure_future(listen_for_message(websocket))

    # you can continue with other code here that can now coexist with listen_for_message()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(my_app())
    loop.run_forever() 
Sign up to request clarification or add additional context in comments.

4 Comments

Hi shongolo, I did not need to work with explicit futures, but I read it on the docs and just used it. I solved my problem thanks to your suggestions, I think the key was not to call asyncio.ensure_future(listen_for_message(websocket)) from within the connect_to_dealer() coroutine.
Glad it triggered a solution. It is OK to call ensure_future() from another coroutine but somewhere in your code you need to drive all the coroutines using loop.run_until_complete() and, if necessary (e.g. with tasks) loop.run_forever()
@songololo what is the reason for the await asyncio.sleep(0)?
@walksignnison It gives the event loop a foothold to juggle between coroutines. i.e. without the await statement the event loop will get stuck inside the while loop. All coroutines need await statements so that the event loop can switch between coroutines (or otherwise need to return and get out of the way).

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.