
I have read "How to add a coroutine to a running asyncio loop?", but it is not what I want.

Basically, I need a daemon thread that subscribes to a Redis channel, and I want to be able to add callback methods dynamically. My solution is to subclass Thread, create an event loop, and run it forever, but once the loop is running I cannot call any method of the object.

redis.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import os
import asyncio
import aioredis
from threading import Thread
from collections import defaultdict

assert os.getenv('REDIS_HOST') is not None
assert os.getenv('REDIS_PORT') is not None

class RedisClient(Thread):
    def __init__(self, loop):
        super(RedisClient, self).__init__()
        self.callbacks = defaultdict(list)
        self.channels = {}
        self.loop = loop

    async def pubsub(self):
        address = 'redis://{}:{}'.format(os.getenv('REDIS_HOST'), os.getenv('REDIS_PORT'))
        self.sub = await aioredis.create_redis(address)

    def sync_add_callback(self, channel, callback):
        self.loop.create_task(self.add_callback(channel, callback))

    async def add_callback(self, channel, callback):
        self.callbacks[channel].append(callback)

        if channel not in self.channels or self.channels[channel] is None:
            channels = await self.sub.subscribe(channel)
            ch1 = channels[0]
            assert isinstance(ch1, aioredis.Channel)
            self.channels[channel] = ch1

            async def async_reader(channel):
                while await channel.wait_message():
                    msg = await channel.get(encoding='utf-8')
                    # ... process message ...
                    print(msg)
                    print(channel.name)
                    for c in self.callbacks[channel.name.decode('utf-8')]:
                        c(channel.name, msg)

            tsk1 = asyncio.ensure_future(async_reader(ch1))

    def remove_callback(self, channel, callback):
        self.callbacks[channel].remove(callback)

    def run(self):
        asyncio.set_event_loop(self.loop)
        loop.run_until_complete(self.pubsub())


# Create the new loop and worker thread
loop = asyncio.new_event_loop()
redis_client = RedisClient(loop)
redis_client.start()

Usage:

def test(channel, msg):
    print('{}{}'.format(channel, msg))

from redis import redis_client
redis_client.sync_add_callback('test', test)

Maybe my solution is not good Python practice?

Update 1:

I have tried a solution that works well. Originally I wanted to reuse the sub instance so that this class could work as a module subscribing to different channels, but with this approach every subscription needs its own sub; that is, every subscription has to create its own Redis connection.

The solution:

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import os
import asyncio
import aioredis
from threading import Thread

assert os.getenv('REDIS_HOST') is not None
assert os.getenv('REDIS_PORT') is not None

class RedisClient(Thread):
    def __init__(self, channel, callback, *args, **kwargs):
        super(RedisClient, self).__init__(*args, **kwargs)
        self.daemon = True
        self.channel = channel
        self.callback = callback

    async def pubsub(self):
        address = 'redis://{}:{}'.format(os.getenv('REDIS_HOST'), os.getenv('REDIS_PORT'))
        sub = await aioredis.create_redis(address)

        channels = await sub.subscribe(self.channel)
        ch1 = channels[0]
        assert isinstance(ch1, aioredis.Channel)

        async def async_reader(channel):
            while await channel.wait_message():
                msg = await channel.get(encoding='utf-8')
                self.callback(channel.name.decode('utf-8'), msg)

        await async_reader(ch1)

    def run(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.pubsub())

Update 2:

Finally, it works well:

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import os
import asyncio
import aioredis
from threading import Thread
from collections import defaultdict

assert os.getenv('REDIS_HOST') is not None
assert os.getenv('REDIS_PORT') is not None

class RedisClient(Thread):
    def __init__(self, loop):
        super(RedisClient, self).__init__()
        self.callbacks = defaultdict(list)
        self.channels = {}
        self.loop = loop
        self.sub = None

    async def pubsub(self):
        print('test3')
        address = 'redis://{}:{}'.format(os.getenv('REDIS_HOST'), os.getenv('REDIS_PORT'))
        self.sub = await aioredis.create_redis(address)

    def sync_add_callback(self, channel, callback):
        print('ahhhhhhhhh')
        asyncio.run_coroutine_threadsafe(self.add_callback(channel, callback), self.loop)

    async def add_callback(self, channel, callback):
        print('test2')
        if not self.sub:
            await self.pubsub()
        self.callbacks[channel].append(callback)

        if channel not in self.channels or self.channels[channel] is None:
            channels = await self.sub.subscribe(channel)
            ch1 = channels[0]
            assert isinstance(ch1, aioredis.Channel)
            self.channels[channel] = ch1

            async def async_reader(channel):
                while await channel.wait_message():
                    msg = await channel.get(encoding='utf-8')
                    # ... process message ...
                    print(msg)
                    print(channel.name)
                    print(self.callbacks[channel.name])
                    for c in self.callbacks[channel.name.decode('utf-8')]:
                        c(channel.name, msg)

            tsk1 = asyncio.ensure_future(async_reader(ch1))

    def remove_callback(self, channel, callback):
        self.callbacks[channel].remove(callback)

    def run(self):
        asyncio.set_event_loop(self.loop)
        # Use the loop passed to the constructor rather than the module-level name.
        self.loop.run_forever()


# Create the new loop and worker thread
loop = asyncio.new_event_loop()
redis_client = RedisClient(loop)
redis_client.start()
  • What method are you unable to call, and on which object? Commented Nov 11, 2018 at 19:36
  • Sorry, I have added a usage example. Commented Nov 12, 2018 at 1:54
  • How would you handle SIGINT, SIGTERM, Ctrl + C, and keyboard interrupts in this? Commented Feb 21, 2019 at 8:13

2 Answers


Let me show you a similar case using aiohttp.

import asyncio

import aioredis
from aiohttp import web


async def listen_to_redis(app):
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets:
            for ws in app['websockets']:
                # send_str is a coroutine in aiohttp 3.x, so it must be awaited
                await ws.send_str('{}: {}'.format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        await sub.unsubscribe(ch.name)
        await sub.quit()


async def start_background_tasks(app):
    # Start the listener once the application's loop is running
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    # Cancel the listener on shutdown and wait for it to finish
    app['redis_listener'].cancel()
    await app['redis_listener']


app = web.Application()
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
web.run_app(app)

2 Comments

Can you add a listener after run_app? In my case I use Flask for the web app; it is synchronous and has no event loop, so I create a loop myself. I want to reuse the sub and add listeners dynamically, that is, after the loop has started running. For now, I must add every listener before running the loop.
@KevinGuo I do think you should give aiohttp a try, because it is a perfect fit for your case and it has better performance in production. It is very easy to transform Flask code to aiohttp: medium.com/@SkyscannerEng/from-flask-to-aiohttp-22f1ddc5dd5e

If the idea is for sync_add_callback to be invoked from other threads, then its implementation should look like this:

def sync_add_callback(self, channel, callback):
    asyncio.run_coroutine_threadsafe(self.add_callback(channel, callback), self.loop)

Please note that the callbacks will be invoked in the event loop thread, so they should not use blocking calls themselves.
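
To illustrate the point without a Redis server, here is a minimal sketch. The LoopThread class, its stop method, and the fake add_callback body are hypothetical stand-ins for the question's RedisClient; only asyncio.run_coroutine_threadsafe and loop.call_soon_threadsafe are real asyncio APIs. It shows a coroutine being submitted from the main thread to a loop that is already running in another thread, and it records that the callback fires in the loop thread.

```python
import asyncio
import threading


class LoopThread(threading.Thread):
    """Hypothetical helper: owns an event loop and runs it in a daemon thread."""

    def __init__(self):
        super().__init__(daemon=True)
        self.loop = asyncio.new_event_loop()

    def run(self):
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_forever()
        finally:
            self.loop.close()

    async def add_callback(self, channel, callback):
        # Runs inside the loop thread; stands in for the real subscribe logic.
        await asyncio.sleep(0)
        callback(channel, "subscribed")

    def sync_add_callback(self, channel, callback):
        # Safe to call from any thread; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(
            self.add_callback(channel, callback), self.loop
        )

    def stop(self):
        # loop.stop() itself is not thread-safe, so schedule it on the loop.
        self.loop.call_soon_threadsafe(self.loop.stop)


worker = LoopThread()
worker.start()

seen = []


def on_message(channel, msg):
    # Record which thread the callback actually runs in.
    seen.append((channel, msg, threading.current_thread() is worker))


future = worker.sync_add_callback("test", on_message)
future.result(timeout=5)  # wait until the coroutine has finished
worker.stop()
worker.join(timeout=5)
```

Because on_message runs in the loop thread, a slow or blocking callback would stall every other subscription handled by that loop.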

9 Comments

It seems sync_add_callback cannot be called from the main thread :(
@KevinGuo What happens when you call it from the main thread? If you get a traceback, please post it to pastebin.com or equivalent.
My fault: yesterday I ran the loop on the main thread, so when I called sync_add_callback from the main thread, nothing happened.
Now I run the loop in the thread object's run method. sync_add_callback can be called, but the await in add_callback never executes. Code here: gist.github.com/cielpy/3164074a34b8d017becd0ef60df29c91. It seems the event loop's run* methods block the thread?
@KevinGuo The exception occurs, but in the other thread. You need to collect it by requesting the result() from the future object returned by asyncio.run_coroutine_threadsafe(...).
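
A minimal sketch of what that last comment describes, assuming a loop running in a background thread: failing_subscribe is a hypothetical coroutine that stands in for add_callback raising inside the loop thread. The exception stays invisible until the caller asks the returned future for its result.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()


async def failing_subscribe(channel):
    # Hypothetical stand-in for add_callback failing inside the loop thread.
    raise RuntimeError("cannot subscribe to {}".format(channel))


future = asyncio.run_coroutine_threadsafe(failing_subscribe("test"), loop)

error = None
try:
    # result() re-raises the coroutine's exception in the calling thread.
    future.result(timeout=5)
except RuntimeError as exc:
    error = str(exc)

# Stop the loop from the main thread and wait for the worker to exit.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
```

Without the call to result(), the RuntimeError is stored on the future and nothing is printed, which matches the "nothing happened" symptom described above.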