Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to consume msg by sub coroutine? #624

Open
Cherrymelon opened this issue Mar 31, 2024 · 1 comment
Open

How to consume msg by sub coroutine? #624

Cherrymelon opened this issue Mar 31, 2024 · 1 comment

Comments

@Cherrymelon
Copy link

Cherrymelon commented Mar 31, 2024

I use queue.consume but it seems work in sequence,I want that main thread create a new sub coroutine for handle msg when new msg coming
here is my code
consumer.py

import asyncio
import os

import aio_pika
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'HelloWorld.HelloWorld.settings')
django.setup()
from django.conf import settings as s
from HelloWorld.HelloWorld.log import logger


async def on_message(message):
    # print(f" [x] Received message {message!r}")
    print(f"     Message body is: {message.body!r}")
    if message.body == b'2':
        print('finish')
        await asyncio.sleep(1)
    await message.ack()


async def main():
    conn = await aio_pika.connect(s.RABBIT_MQ)

    channel = await conn.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(name=s.MQ_QUEUE, durable=True)
    exchange = await channel.declare_exchange('direct')
    await queue.bind(exchange, routing_key=s.MQ_ROUTE_KEY)

    await queue.consume(on_message)


if __name__ == '__main__':
    logger.info('start consumer')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_forever()

producer.py

import asyncio
import os

import aio_pika
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'HelloWorld.HelloWorld.settings')
django.setup()
from django.conf import settings as s
from HelloWorld.HelloWorld.log import logger


async def pub(mq_queue, mq_route_key, loop):
    conn = await aio_pika.connect_robust(s.RABBIT_MQ, loop=loop)
    channel = await conn.channel()
    # Declaring exchange
    exchange = await channel.declare_exchange("direct")

    queue = await channel.declare_queue(mq_queue, durable=True)
    await queue.bind(exchange, mq_route_key)
    for i in range(10):
        payload = str(i).encode()
        msg = aio_pika.Message(payload, delivery_mode=2)
        await exchange.publish(msg, routing_key=mq_route_key)
        logger.info('send success {}'.format(payload))
    logger.info('loop over')
    loop.stop()


if __name__ == '__main__':
    # django.setup()
    logger.info('asdfasdf')
    loop = asyncio.get_event_loop()
    loop.create_task(pub(mq_queue=s.MQ_QUEUE, mq_route_key=s.MQ_ROUTE_KEY, loop=loop))
    loop.run_forever()

actual output:

     Message body is: b'0'
     Message body is: b'1'
     Message body is: b'2'
finish
     Message body is: b'3'
     Message body is: b'4'
     Message body is: b'5'
     Message body is: b'6'
     Message body is: b'7'
     Message body is: b'8'
     Message body is: b'9'

expect output:

     Message body is: b'0'
     Message body is: b'1'

     Message body is: b'3'
     Message body is: b'4'
     Message body is: b'5'
     Message body is: b'6'
     Message body is: b'7'
     Message body is: b'8'
     Message body is: b'9'
     Message body is: b'2'
finish

whatever,2 should be lastest

@Darsstar
Copy link
Contributor

You set prefetch_count to 1. I suggest to read https://www.rabbitmq.com/docs/confirms#channel-qos-prefetch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants