1

so this is my code:

import asyncio
import logging
from asyncio import AbstractEventLoop

from aio_pika import connect, IncomingMessage


def test_one(a, b):
    print("test_one", a, b)


class Consumer:
    def __init__(self, url):
        self.url = url

    async def run(self, loop: AbstractEventLoop):
        while True:
            try:
                connection = await connect(self.url, loop=loop)
                connection.add_close_callback(test_one)
                connection.add_close_callback(self.test_two)

                # Creating a channel
                channel = await connection.channel()

                # Declaring queue
                queue = await channel.declare_queue("snapshots")

                logging.info("Started listening")

                # Start listening the queue with name 'hello'
                await queue.consume(self.on_message, no_ack=True)
                break
            except:
                logging.error("Could not connect")
            finally:
                await asyncio.sleep(1)

    def on_message(self, message: IncomingMessage):
        print(message.body)

    def test_two(self, a, b):
        print("closed", a, b)

My problem is when I disconnect it only calls test_one function, but it doesn't call test_two function inside the class. I don't understand. I tried only adding the test_two function but that didn't work either. Tried removing the parameters. Same issue. I'm out of ideas. Do you know what I do wrong?

btw the self.on_message does work.

4
  • I'm struggling with a similar problem. If I call a class function in Main, it works. If I call a static function in a module, it works. But if I call a class function in a module, it doesn't. Did you ever find a solution to your problem? Commented Jan 11, 2021 at 13:09
  • @VoteCoffee hi there, no I didn't find a solution and gave up. Later dicided to try again with another language. Commented Jan 13, 2021 at 6:33
  • Maybe try lambda function? Commented Jan 13, 2021 at 6:34
  • It turns out that the API I was using was creating a weak reference to the callback function, and that the underlying callback function handle was being destroyed before the API tried to use it. I worked around it by storing the callback function as a self._callbackfunction variable in the class before passing it to the API. Took a lot of work to figure it out! Commented Jan 13, 2021 at 12:48

1 Answer 1

1

It's possible the API is creating a weak reference to the callback function that it is being passed. Try creating a strong reference to the callback function before passing it.

self._cb_func = self.test_two
connection.add_close_callback(self._cb_func)

The full code:

import asyncio
import logging
from asyncio import AbstractEventLoop

from aio_pika import connect, IncomingMessage


def test_one(a, b):
    print("test_one", a, b)


class Consumer:
    def __init__(self, url):
        self.url = url

    async def run(self, loop: AbstractEventLoop):
        while True:
            try:
                connection = await connect(self.url, loop=loop)
                connection.add_close_callback(test_one)
                self._cb_func = self.test_two
                connection.add_close_callback(self._cb_func)

                # Creating a channel
                channel = await connection.channel()

                # Declaring queue
                queue = await channel.declare_queue("snapshots")

                logging.info("Started listening")

                # Start listening the queue with name 'hello'
                await queue.consume(self.on_message, no_ack=True)
                break
            except:
                logging.error("Could not connect")
            finally:
                await asyncio.sleep(1)

    def on_message(self, message: IncomingMessage):
        print(message.body)

    def test_two(self, a, b):
        print("closed", a, b)

If you have a lot of callback functions, then there is an answer in this question for storing them as an array: using python WeakSet to enable a callback functionality

Sign up to request clarification or add additional context in comments.

2 Comments

Didn't test it, since abondend the project months ago but I trust you it's the right solution :).
@DazDylz No worries, it was the solution for my issue which brought me to your similar question. Hopefully it helps someone down the line!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.