I'm writing a function that handles two WebSockets. Each response from either WebSocket updates a shared DataFrame df.

import json
import asyncio
import websockets

@asyncio.coroutine
def printResponse(df, dataSocket, quoteSocket, dataRequest, quoteRequest):

    yield from dataSocket.send(dataRequest)
    yield from quoteSocket.send(quoteRequest)

    response = yield from dataSocket.recv()     # skip first response
    response = yield from quoteSocket.recv()    # skip first response

    while True:

        response = yield from dataSocket.recv()
        print("<< {}".format(json.loads(response)))
        df = changeRecord(df, response)

        response = yield from quoteSocket.recv()
        print("<< {}".format(json.loads(response)))
        df = changeRecord(df, response)

I'm not sure, but the current code seems to process the two WebSockets in turns. I want to process responses in a "first in, first out" manner, regardless of which WebSocket they come from. What changes should I make to achieve this?

1 Answer

Because both yield from statements sit inside the same while loop, they are processed in order and then repeated ad infinitum.

The loop always waits for a response from dataSocket, then waits for a response from quoteSocket, and then starts over. A message that arrives on one socket is not handled until the other socket has delivered one too.

Tasks work well for what you are trying to do because they let the coroutines run independently of each other. If you start each coroutine in its own Task, each one waits for its own next response without blocking the other.

For example:

import json
import asyncio
import websockets

@asyncio.coroutine
def coroutine_1(df, dataSocket, dataRequest):
    # Subscribe, then handle dataSocket messages independently of quoteSocket.
    yield from dataSocket.send(dataRequest)
    response = yield from dataSocket.recv()     # skip first response
    while True:
        response = yield from dataSocket.recv()
        print("<< {}".format(json.loads(response)))
        df = changeRecord(df, response)

@asyncio.coroutine
def coroutine_2(df, quoteSocket, quoteRequest):
    # Subscribe, then handle quoteSocket messages independently of dataSocket.
    yield from quoteSocket.send(quoteRequest)
    response = yield from quoteSocket.recv()    # skip first response
    while True:
        response = yield from quoteSocket.recv()
        print("<< {}".format(json.loads(response)))
        df = changeRecord(df, response)

@asyncio.coroutine
def printResponse(df, dataSocket, quoteSocket, dataRequest, quoteRequest):

    # Wrap each coroutine in a Task so both are scheduled concurrently.
    websocket_task_1 = asyncio.ensure_future(coroutine_1(df, dataSocket, dataRequest))
    websocket_task_2 = asyncio.ensure_future(coroutine_2(df, quoteSocket, quoteRequest))

    yield from asyncio.wait([websocket_task_1, websocket_task_2])
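
If you want a strict "first in, first out" order across both sockets, one possible variation is to have each socket coroutine push its messages into a single shared asyncio.Queue and let one consumer apply the updates. The sketch below is only an illustration under assumptions: FakeSocket is a hypothetical stand-in for a real websocket connection (it replays canned messages after a delay), and the records list stands in for the shared DataFrame. It uses async/await syntax, since @asyncio.coroutine was removed in Python 3.11.

```python
import asyncio
import json


class FakeSocket:
    """Hypothetical stand-in for a websocket: replays canned messages."""

    def __init__(self, messages, delay):
        self._messages = list(messages)
        self._delay = delay

    async def recv(self):
        if not self._messages:
            raise EOFError("no more messages")
        await asyncio.sleep(self._delay)  # simulate network latency
        return self._messages.pop(0)


async def reader(source, socket, queue):
    # Forward every message from one socket into the shared queue.
    try:
        while True:
            message = await socket.recv()
            await queue.put((source, json.loads(message)))
    except EOFError:
        pass  # this fake socket has nothing left to send


async def consumer(queue, records):
    # Single consumer: items are handled in the order they were queued.
    while True:
        item = await queue.get()
        if item is None:  # sentinel: both readers are done
            break
        records.append(item)


async def main():
    queue = asyncio.Queue()
    records = []  # stands in for the shared DataFrame
    data_socket = FakeSocket(['{"id": 1}', '{"id": 3}'], delay=0.1)
    quote_socket = FakeSocket(['{"id": 2}'], delay=0.15)

    consumer_task = asyncio.ensure_future(consumer(queue, records))
    await asyncio.gather(
        reader("data", data_socket, queue),
        reader("quote", quote_socket, queue),
    )
    await queue.put(None)
    await consumer_task
    return records


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With these delays the records should come out in arrival order (data, quote, data) rather than in strict alternation. Because only the consumer touches the shared state, the two readers never update it at the same time.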

5 Comments

In this structure, do dataSocket and quoteSocket work on the same df or on two separate ones?
My goal is for responses from both websockets to update the same df. The df updated by dataSocket should be available when quoteSocket's response is processed.
I added a shared DataFrame tb, so now we have coroutine_1(tb, df, dataSocket) and coroutine_2(tb, df, quoteSocket). df still works well, but updates to tb made within coroutine_1 are not visible to coroutine_2. Can you think of any potential causes of this problem?
@kinreyli Is the same tb instance being passed to both coroutines, or are you possibly creating two of them separately?
Same tb. Passing the DataFrame alone doesn't work. I solved the problem by putting the DataFrame in a list. I don't know why this helps, but it does fix the bug.
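
A likely explanation for the list workaround, assuming changeRecord returns a new object instead of modifying its argument in place: an assignment such as tb = changeRecord(tb, response) only rebinds the local name inside that coroutine, so the other coroutine keeps pointing at the old object. Storing the DataFrame in a list works because holder[0] = ... mutates the list that both coroutines share. The sketch below shows the difference with plain Python lists; change_record is a hypothetical stand-in for changeRecord, not its actual implementation.

```python
def change_record(table, row):
    # Hypothetical stand-in for changeRecord: returns a NEW object and
    # leaves the original untouched (an assumption about the real function).
    return table + [row]


def update_by_rebinding(table, row):
    # Rebinds the local name only; the caller's object is not changed.
    table = change_record(table, row)


def update_via_holder(holder, row):
    # Mutates the shared list, so every holder of it sees the new value.
    holder[0] = change_record(holder[0], row)


shared = []
update_by_rebinding(shared, "a")   # shared is still empty afterwards

holder = [[]]
update_via_holder(holder, "a")     # holder[0] now contains the new row
```

The same reasoning applies to two coroutines that receive the same argument: each has its own local name, so only in-place changes (or changes to a shared container) are visible to both.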
