Issue with using RobustQueue.consume() in latest version vs 9.0.7 #626

Open
MDHD99 opened this issue Apr 3, 2024 · 2 comments

Comments

MDHD99 commented Apr 3, 2024

There seems to be a change in how RobustQueue.consume() behaves in connection pooling scenarios in version 9.4.1 (the latest at the time of writing). After a broker restart, the RobustChannel and the declared RobustQueue seem to be restored, but the consumers registered on that queue are not.

Here is an example to reproduce:

publisher.py

import asyncio
import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool

async def get_connection() -> AbstractRobustConnection:
    return await aio_pika.connect_robust("<amqp_url>")

async def main() -> None:
    connection_pool: Pool = Pool(get_connection, max_size=2)

    async def get_channel() -> aio_pika.Channel:
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    channel_pool: Pool = Pool(get_channel, max_size=10)
    queue_name = "pool_queue2"

    async def publish(message: str) -> None:
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.default_exchange.publish(
                aio_pika.Message(message.encode()),
                queue_name,
            )

    async with connection_pool, channel_pool:
        while True:
            user_input = input("Enter 'y' to send a message or any other key to exit: ")
            if user_input.strip().lower() == 'y':
                await publish("Hello, World!")
            else:
                break

if __name__ == "__main__":
    asyncio.run(main())

consumer.py

import asyncio
import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool

async def get_connection() -> AbstractRobustConnection:
    return await aio_pika.connect_robust("<amqp_url>")

async def process_message(
    message: aio_pika.abc.AbstractIncomingMessage,
) -> None:
    async with message.process():
        print(message.body)
        await asyncio.sleep(1)

async def main() -> None:
    connection_pool: Pool = Pool(get_connection, max_size=2)

    async def get_channel() -> aio_pika.Channel:
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    channel_pool: Pool = Pool(get_channel, max_size=10)
    queue_name = "pool_queue2"

    async def consume() -> None:
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(10)

            queue = await channel.declare_queue(
                queue_name, durable=True, auto_delete=False,
            )

            await queue.consume(process_message)

    async with connection_pool, channel_pool:
        await consume()
        # Add an infinite loop to keep the program running until manually stopped
        while True:
            await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(main())

In the above example, consumption is not restored after a broker restart, but with an older version, e.g. 9.0.7, it seems to work fine.

Additional notes:

1. Using the RobustQueueIterator seems to solve the issue in both versions (9.0.7 and latest). Replace await queue.consume(process_message) with:

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:  # type: aio_pika.abc.AbstractIncomingMessage
            await process_message(message)

   However, I am not sure whether to switch to this approach, since the docs do not explain the difference between using RobustQueueIterator and simply using RobustQueue.consume() to handle incoming messages.
2. I am using this version of RabbitMQ: rabbitmq:3.8-management-alpine

Contributor

Darsstar commented Apr 16, 2024

You don't hold a strong reference to queue, so it gets garbage collected, and when it is time to .restore() the queues, it is no longer in the relevant weak set.

QueueIterator keeps a strong reference to its queue.
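
To illustrate the mechanism described above, here is a minimal sketch using only the standard library. FakeChannel and FakeQueue are hypothetical stand-ins, not aio_pika classes; the only assumption carried over from the comment is that the channel tracks its queues in a weak set and walks that set when restoring.

```python
import gc
import weakref


class FakeQueue:
    """Hypothetical stand-in for a queue object (not an aio_pika class)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.restored = False

    def restore(self) -> None:
        self.restored = True


class FakeChannel:
    """Hypothetical stand-in for a channel that tracks queues weakly."""

    def __init__(self) -> None:
        # A WeakSet does not keep its members alive on its own.
        self._queues: "weakref.WeakSet[FakeQueue]" = weakref.WeakSet()

    def declare_queue(self, name: str) -> FakeQueue:
        queue = FakeQueue(name)
        self._queues.add(queue)
        return queue

    def restore(self) -> list:
        # Only queues that are still alive can be restored.
        restored = []
        for queue in list(self._queues):
            queue.restore()
            restored.append(queue.name)
        return sorted(restored)


def demo() -> list:
    channel = FakeChannel()
    kept = channel.declare_queue("kept")   # strong reference held in a local
    channel.declare_queue("dropped")       # return value discarded
    gc.collect()                           # make collection explicit
    return channel.restore()               # only "kept" is still in the weak set


if __name__ == "__main__":
    print(demo())
```

In the consumer.py example, queue is a local variable inside consume(), so it plays the role of the "dropped" queue here once consume() returns.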

Edit: 663807c is the first commit after the 9.0.7 release.

raqbit commented May 14, 2024

I'm also hitting this, but even in 9.0.7, no more messages are getting through after a broker restart.

As a workaround I'm now using .consume(). But keep in mind that you need to hold references to the Channel and Queue objects to avoid hitting #594
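
One way to hold those references is to return them from the function that starts the consumer and keep the result for the lifetime of the program. The following is a sketch, not a confirmed fix: ConsumerHandle and start_consumer are hypothetical names, and the channel argument is assumed to behave like an aio_pika channel (set_qos, declare_queue, and a queue whose consume() returns a consumer tag).

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class ConsumerHandle:
    """Hypothetical holder: keeps channel and queue strongly referenced."""

    channel: Any
    queue: Any
    consumer_tag: str


async def start_consumer(
    channel: Any,
    queue_name: str,
    callback: Callable[[Any], Awaitable[None]],
    prefetch_count: int = 10,
) -> ConsumerHandle:
    # Same calls as in consumer.py, but the queue is returned, not dropped.
    await channel.set_qos(prefetch_count=prefetch_count)
    queue = await channel.declare_queue(
        queue_name, durable=True, auto_delete=False,
    )
    consumer_tag = await queue.consume(callback)
    return ConsumerHandle(channel=channel, queue=queue, consumer_tag=consumer_tag)


async def run_forever(channel: Any, callback: Callable[[Any], Awaitable[None]]) -> None:
    # The handle stays in scope while the loop runs, so neither the
    # channel nor the queue becomes eligible for garbage collection.
    handle = await start_consumer(channel, "pool_queue2", callback)
    try:
        await asyncio.Future()  # wait until cancelled
    finally:
        del handle
```

In consumer.py this would mean calling start_consumer inside the channel_pool.acquire() block and keeping the returned handle until shutdown; whether a pooled channel is the right owner for a long-lived consumer is a separate design question that this sketch does not settle.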
