1

Hi I created async websocket client which I can receive and send messages asynchronously.

Here is the clint class that I use.

import websockets
import asyncio
from models import Heartbeat
from model_helper import ModelHelper
from json_helper import JSONHelper
from os.path import dirname, abspath
import logging
import time
waiting_time = 0.5

class WebSocketClient():

    def __init__(self, websocket_queue, configs):
        self.Heartbeat = Heartbeat().message
        self.websocket_queue = websocket_queue
        self.configs = configs

    def get_connection_string(self):
        server_url = self.configs.URL + ":" + str(self.configs.Port) + self.configs.Root
        return server_url


    async def connect(self):

        try:
            server_url = self.get_connection_string()
            self.connection = await websockets.client.connect(server_url)

            if self.connection.open:
                print("Connection stablished. Client correcly connected")
                # Send greeting
                await self.connection.send(self.Heartbeat)
                return self.connection

            else:
                print("Can not connect")

        except ConnectionRefusedError as err:
            print("Connection Error: {}".format(err))


    async def send_message_to_socket(self, connection):
        while True:
            message = self.websocket_queue.get_send_queue()
            try:
                if message is not None:
                    message_ = ModelHelper.to_outgoing_request_model(message)
                    await connection.send(message_)
                else:
                    await asyncio.sleep(waiting_time)
            except websockets.exceptions.ConnectionClosed:
                print('Connection with server closed')
                self.connect()


    async def receive_message_from_socket(self, connection):
        while True:
            try:
                message = await connection.recv()
                obj = JSONHelper.toObject(message)
                print("Received object from websocket: {}".format(obj))


                #If a websocket entry  has SendMessage in its Action property
                #Consider it as an sms content to be sent.
                if(obj.Action == "SendMessage"):
                    self.websocket_queue.add_received_queue(obj)

            except websockets.exceptions.ConnectionClosed:
                print('Connection with server closed1')


    async def send_heartbeat_to_socket(self, connection):
        while True:            
            #print("Heartbeat loop\n")
            try:
                await connection.send(self.Heartbeat)
                await asyncio.sleep(3)
            except websockets.exceptions.ConnectionClosed:
                print('Connection with server closed2')

And this is the code where I listen for websocket messages:

def listen_websocket_routine(sms_queue, websocket_queue):
    while True:
        time.sleep(waiting_time)
        #Check Websocket queue for new messages
        message = websocket_queue.get_received_queue()

        # If Incoming websocket JSON has SendMessage string in its Action attribute transform it to request_model
        if message is not None and send_sms.Action == "SendMessage":
            # Transform it to  outgoing_sms_model
            reset = ModelHelper.to_outgoing_sms_model(message.Data)
            # Add To send_queue of sms
            sms_queue.add_send_queue(reset)

finally How I initiate them with asyncio and threading.

client = WebSocketClient(websocket_queue, configs)

loop = asyncio.get_event_loop()
connection = loop.run_until_complete(client.connect())

task1 = asyncio.ensure_future(client.receive_message_from_socket(connection))
task2 = asyncio.ensure_future(client.send_heartbeat_to_socket(connection))
task3 = asyncio.ensure_future(client.send_message_to_socket(connection))


listen_websocket_thread = threading.Thread(target=listen_websocket_routine, args=(sms_queue, websocket_queue))

listen_websocket_thread.start()

loop.run_forever()

So my question is, whenever connetion breaks, I need to re establish the connection. But I am not sure where should I do that. Should it be before everytime I try to send message or receive or should I do that in more general approach?

1 Answer 1

6

Since update 10.0 you could do it with an async iterator, like this(example from the official doc):

async for websocket in websockets.connect(...):
    try:
        ...
    except websockets.ConnectionClosed:
        continue
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.