3

I'd like to encapsulate the functionality of the python websockets package into a class, representing a sensor coordinator. The aim of this is to allow me to create a coordinator object, and only have the server persist for as long as it is needed. Unfortunately, I have not been able to find any similar examples of this online and have so far struggled.

My code is as follows:

import asyncio
import json
import logging
import websockets

logging.basicConfig()


class Coordinator(object):

    def __init__(self, host='localhost', port=8080):
        self.host = host
        self.port = port

        self.running = False

        self.server = None
        self.sensors = set()

    def __enter__(self):
        self.server = websockets.serve((self.ws_handler, self.host, self.port))
        self.running = True

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Gracefully stop serving
        self.running = False
        pass

    def sensors_event(self):
        return json.dumps({'type': 'sensors', 'count': len(self.sensors)})

    async def notify_sensors(self):
        if self.sensors:
            message = self.sensors_event()
            await asyncio.wait([user.send(message) for user in self.sensors])

    async def register(self, websocket):
        self.sensors.add(websocket)
        await self.notify_sensors()

    async def unregister(self, websocket):
        self.sensors.remove(websocket)
        await self.notify_sensors()

    async def ws_handler(self, websocket):
        try:
            await self.register(websocket)
            pass

        finally:
            await self.unregister(websocket)


if __name__ == '__main__':
    with Coordinator() as coordinator:
        pass

At the moment it would appear that the websocket server does not start, as it is not visible on netstat.

Would it be possible to run the server in a separate (demonised) thread, held by the coordinator object?

Thanks

4
  • what OS are you on? (I suggest adding that OS as a tag to your question) Commented Jan 31, 2019 at 18:50
  • Mac OS Mojave, using Python 3.6.8 Commented Jan 31, 2019 at 18:51
  • "this does not work" should be unpacked as well. You want to show what is actually happening and compare that to the expected behavior. Commented Jan 31, 2019 at 19:03
  • Updated, thanks for the constructive comments. Please let me know if there's anything else I can add :) Commented Jan 31, 2019 at 19:09

2 Answers 2

3

From the high-level documentation:

The websockets.server module defines a simple WebSocket server API.

serve() returns an awaitable. Awaiting it yields an instance of WebSocketServer which provides close() and wait_closed() methods for terminating the server and cleaning up its resources.

On Python ≥ 3.5, serve() can also be used as an asynchronous context manager. In this case, the server is shut down when exiting the context.

As @user4815162342 already identified, the main issue is that you do not await the call to the serve() coroutine.

Since you're using Python v3.6.8 you can use the asynchronous context manager to simplify the implementation. The benefit of this is that you do not need to worry about handling shutdown, since it is handled automatically. Here's an object oriented implementation of a simple echo server.

import asyncio
import signal
import websockets

class Server(object):

    def __init__(self, host, port):
        self.host, self.port = host, port
        self.loop = asyncio.get_event_loop()

        self.stop = self.loop.create_future()
        self.loop.add_signal_handler(signal.SIGINT, self.stop.set_result, None)

        self.loop.run_until_complete(self.server())

    async def server(self):
        async with websockets.serve(self.ws_handler, self.host, self.port):
            await self.stop

    async def ws_handler(self, websocket, path):
        msg = await websocket.recv()
        print(f'Received: {msg}')

        await websocket.send(msg)
        print(f'Sending: {msg}')


if __name__ == '__main__':
    server = Server(host='localhost', port=6789)

At the moment, this will run until the user sends an interrupt, but you can adjust the stop future to suit.

Sign up to request clarification or add additional context in comments.

Comments

0

Your code has two issues.

  • You never start the asyncio main loop, so asyncio has no chance of ever running. In other words, you need to have loop.run_until_complete(x) somewhere in your code.

  • start_server is a coroutine, so you must await it.

A fixed version of the code (but untested) might look like this:

class Coordinator(object):
    def __init__(self, host='localhost', port=8080):
        self.host = host
        self.port = port

        self.running = False

        self.server = None
        self.sensors = set()

    async def __aenter__(self):
        self.server = await websockets.serve((self.ws_handler, self.host, self.port))
        self.running = True

    def __aexit__(self, exc_type, exc_val, exc_tb):
        # Gracefully stop serving
        self.running = False

    def sensors_event(self):
        return json.dumps({'type': 'sensors', 'count': len(self.sensors)})

    async def notify_sensors(self):
        if self.sensors:
            message = self.sensors_event()
            await asyncio.wait([user.send(message) for user in self.sensors])

    async def register(self, websocket):
        self.sensors.add(websocket)
        await self.notify_sensors()

    async def unregister(self, websocket):
        self.sensors.remove(websocket)
        await self.notify_sensors()

    async def ws_handler(self, websocket):
        try:
            await self.register(websocket)
        finally:
            await self.unregister(websocket)

async def main():
    async with Coordinator() as coordinator:
        pass


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

It will be much easier to use asyncio if you take the time to go through a tutorial that covers basic asyncio concepts, such as running the main loop.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.