1

Good day!

I am trying to code a WebSocket connector and using asyncio. I am not that much familiar with asynchronous approaches therefore an incorrect behaviour occurs. Below is the simplified version of the code.

import pandas as pd
import json
import websockets
import asyncio
import time

class BinanceQuotesWS:    
    def __init__(self,client,pair):
        self.quotes = pd.DataFrame(columns=['Timestamp','Price'])
        self.pair = pair
        self.socket='wss://fstream.binance.com/ws'
        self.websocket = None
        self.loop = None
        self.result = None
        
    def get_quotes(self):
        return self.quotes
        
    def start(self):
        self.loop = asyncio.get_event_loop()        
        self.result = self.loop.create_task(self.connect())
        
    async def connect(self):
        self.websocket = await websockets.connect(self.socket)
        await self.subscribe_quotes()
        
    async def subscribe_quotes(self):
        subscribe_message = {
        "method": "SUBSCRIBE",
        "params":
        [
         self.pair.lower()+"@trade"
         ],
         "id": 1
         }
        subscribe_message = json.dumps(subscribe_message)
        await self.websocket.send(subscribe_message)
        async for msg in self.websocket:
            msg = json.loads(msg)    
            if('p' in msg):
                self.quotes.loc[0] = [msg['E'],float(msg['p'])]

temp_ws = BinanceQuotesWS(client,'BTCUSDT')
temp_ws.start()

When I am testing it in Jupyter and execute a cell with temp_ws.get_quotes() manually then every single time the correct dataframe with fresh quotes is returned.

Though in my program I need to have some infinite loop and there comes up an error.

while(True):
    quotes = temp_ws.get_quotes()
    print(quotes)
    time.sleep(3)

The quotes DF is always empty but I can't sort out why (probably because the while cycle is blocking). I will be glad if someone could help to sort out the issue (and give some hints if anything else could be improved in the code in terms of async requests). Thank you.

1
  • if you use async then you shuld run it in async loop - not in own normal loop. And you should use async.sleep for this. OR you would have to run one of loop in separated thread. Commented Apr 23, 2021 at 1:08

1 Answer 1

1

You could use asyncio.sleep to create async function

async def display(self):
    while True:
        await asyncio.sleep(3)
        quotes = self.get_quotes()
        print('time:', quotes['Timestamp'][0], 'price:', quotes['Price'][0])

and add it to loop

self.result2 = self.loop.create_task(self.display())

and then you can run all in the same loop

temp_ws.loop.run_forever()

If you not use run_forever() then it not run connect() - and you don't get values in your standard loop. But this loop has to runs all time and it can't runs at the same time with normal loop (which also has to run all the time). One of the loop would have to run in separated thread.

But await (whit asyncio.sleep) resolves problem. When it sleeps in while True then it goes to other functions and it can run other code - and later some other code uses await and then it can go back to while True.


Maybe in Jupyter it could work with run_forever() because they add many extra functions to make life easier (and elements used in Jupyter may need this loop to work correctly) but in normal program you have to use run_forever() manually.


Minimal working code:

import pandas as pd
import json
import websockets
import asyncio
import time

class BinanceQuotesWS:

    def __init__(self,client,pair):
        self.quotes = pd.DataFrame(columns=['Timestamp','Price'])
        self.pair = pair
        self.socket='wss://fstream.binance.com/ws'
        self.websocket = None
        self.loop = None
        self.result = None

    def get_quotes(self):
        return self.quotes

    def start(self):
        self.loop = asyncio.get_event_loop()
        self.result = self.loop.create_task(self.connect())
        self.result2 = self.loop.create_task(self.display())

    async def connect(self):
        self.websocket = await websockets.connect(self.socket)
        await self.subscribe_quotes()

    async def subscribe_quotes(self):
        subscribe_message = {
            "method": "SUBSCRIBE",
            "params": [
                 self.pair.lower()+"@trade"
             ],
            "id": 1
        }
         
        subscribe_message = json.dumps(subscribe_message)
        await self.websocket.send(subscribe_message)
        async for msg in self.websocket:
            msg = json.loads(msg)
            if('p' in msg):
                self.quotes.loc[0] = [msg['E'],float(msg['p'])]
                #print(self.quotes)

    async def display(self):
        while True:
            await asyncio.sleep(3)
            quotes = self.get_quotes()
            print('time:', quotes['Timestamp'][0], 'price:', quotes['Price'][0])

client = ''
temp_ws = BinanceQuotesWS(client,'BTCUSDT')
temp_ws.start()
temp_ws.loop.run_forever()
Sign up to request clarification or add additional context in comments.

1 Comment

Thank you. For some reason there appeared an error RuntimeError: This event loop is already running in python but I solved it with import nest_asyncio nest_asyncio.apply()

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.