
Bug: Pausing Confluent consumer does not seem to work #2143

Open
palmaadam opened this issue Mar 24, 2025 · 0 comments
Labels
bug: Something isn't working
Confluent: Issues related to `faststream.confluent` module

Comments


palmaadam commented Mar 24, 2025

Describe the bug
My use case involves long-running tasks with no theoretical cap on processing time. When a new message arrives, I need to pause the consumer, run my processing task asynchronously, and then, from within that task, manually acknowledge the message and resume the consumer.
However, when the processing truly does take longer than the configured max poll interval, my consumer still gets kicked out of the group once I try to acknowledge.

How to reproduce
I am using the pause/resume functionality of the inner Confluent consumer (which, as I gathered from this thread, should be fine for me to access), since I have not found that option at any other level. I then acknowledge on the KafkaMessage object. This is how I set up my subscriber:

def register_consumer(self):
    group_id = properties.group_id

    @self.broker.subscriber(
        properties.topic,
        group_id=group_id,
        auto_commit=False,
        session_timeout_ms=properties.kafka_session_timeout,
        max_poll_interval_ms=properties.kafka_poll_interval,
        heartbeat_interval_ms=properties.kafka_heartbeat_interval,
        max_records=properties.kafka_max_poll_records,
        partition_assignment_strategy='cooperative-sticky',
    )
    async def listen(message: KafkaMessage):
        self.__pause_consumer(message)
        _ = asyncio.create_task(self.handler.process_message(message))

@staticmethod
def __pause_consumer(message: KafkaMessage):
    partitions = message.consumer.consumer.assignment()
    message.consumer.consumer.pause(partitions)

And here is the relevant processing task logic:

async def process_message(self, message: KafkaMessage):
    try:
        self.logger.debug('Received a message: \'{}\'.'.format(message.decoded_body))
        self.__process_message(message)
        self.__log_queue_status(message)
        message.consumer.consumer.commit(message.raw_message)
    except KeyError:
        self.logger.error('Message is missing some properties.', exc_info=True)
        message.consumer.consumer.commit(message.raw_message)
    except Exception:
        self.logger.error('Processing of message failed.', exc_info=True)
    finally:
        self.__resume_consumer(message)

@staticmethod
def __resume_consumer(message: KafkaMessage):
    partitions = message.consumer.consumer.assignment()
    message.consumer.consumer.resume(partitions)

Is there some other, correct way of pausing/resuming the consumer that I have missed? My previous solution used Confluent Kafka directly, and pausing the consumer worked as expected there. I thought the problem might be that I am pausing and resuming at a different level than I am acknowledging, but when I tried manually committing on the inner consumer instead of acknowledging on the KafkaMessage, the result was the same.
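To make the intended flow concrete, here is a minimal, self-contained sketch of the pause / process-in-background / resume pattern. FakeConsumer and handle_long_task are illustrative stand-ins, not faststream or confluent-kafka APIs; the key assumption, mirroring librdkafka's documented behavior, is that poll() must keep being called even while the consumer is paused, otherwise max.poll.interval.ms is still exceeded.

```python
import asyncio

# Hypothetical stand-in for the inner confluent-kafka Consumer (not the real
# class). As in librdkafka, pause() only stops message delivery; poll() must
# still be called within max.poll.interval.ms for the consumer to stay in
# the group.
class FakeConsumer:
    def __init__(self):
        self.paused = False
        self.poll_count = 0

    def assignment(self):
        return ["topic [0]"]

    def pause(self, partitions):
        self.paused = True

    def resume(self, partitions):
        self.paused = False

    def poll(self, timeout=0.0):
        self.poll_count += 1
        return None if self.paused else "message"

async def handle_long_task(consumer, work):
    """Pause, run the long task in the background, keep polling, resume."""
    partitions = consumer.assignment()
    consumer.pause(partitions)      # stop fetching new messages
    task = asyncio.ensure_future(work())
    while not task.done():
        consumer.poll(0)            # keep the consumer "alive" in the group
        await asyncio.sleep(0.01)
    consumer.resume(partitions)
    return task.result()

consumer = FakeConsumer()
result = asyncio.run(
    handle_long_task(consumer, lambda: asyncio.sleep(0.05, result="done"))
)
print(result, consumer.poll_count > 0, consumer.paused)
```

The point of the sketch is only the shape of the loop: whatever level the pause happens at, something still has to drive poll() while the background task runs.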

Expected behavior
I expect the consumer to keep sending its heartbeat while it is paused during a long-running task, so that once the task finishes, acknowledges the message, and resumes the consumer, the consumer is not kicked out of the group.

Observed behavior
Despite pausing the consumer before asynchronously launching the message processing logic, the consumer still gets kicked out of the group once the task finishes and attempts to acknowledge the message. The error is as follows:
MAXPOLL [faststream-0.5.35#consumer-2] [thrd:main]: Application maximum poll interval (15000ms) exceeded by 40ms (adjust max.poll.interval.ms for long-running message processing): leaving group

@palmaadam palmaadam added the bug Something isn't working label Mar 24, 2025
@Lancetnik Lancetnik added the Confluent Issues related to `faststream.confluent` module label Mar 24, 2025