[BUG] EventHub Consumer stops consuming messages until the consumer is restarted #40133
Comments
Thank you for your feedback. Tagging and routing to the team member best able to assist.
Unfortunately, there is not enough information to understand what the problem is without a repro. This version of the library is ~3 years old; since then we have made many reliability fixes.
Hi @wstest123. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario, please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.
2024-05-09-21_03_48.log
@conniey Thanks for your help! Are the attached logs helpful for checking this issue? Thanks!
@conniey May I know if there has been any luck with this issue? I have attached the full log.
Describe the bug
The EventHub consumer stops consuming messages until the consumer is restarted.
Exception or Stack Trace
2024-05-10 02:34:35.124 WARN 25378 --- [ parallel-2] c.a.m.e.PartitionBasedLoadBalancer : Load balancing for event processor failed - Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
2024-05-10 02:34:35.125 INFO 25378 --- [ parallel-2] c.e.l.IotOperationProcessService : Error occurred in partition processor for partition NONE, {}
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.24.jar!/:3.4.24]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
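The message in this trace is produced by Reactor's timeout operator (FluxTimeout in the frames above): the load-balancing pipeline waited 60000 ms without receiving an item or a terminal signal, so a TimeoutException was raised. The sketch below reproduces only that pattern with the JDK's CompletableFuture.orTimeout (Java 9+) as an analogy. It is not the SDK's or Reactor's code; the class name TimeoutSketch, the method name, and the 50 ms value are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {
    /** Waits on a source that never produces a value and reports how the wait ended. */
    static String awaitWithTimeout(long timeoutMillis) {
        // Hypothetical stand-in for an upstream call that never answers.
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        try {
            // orTimeout completes the future exceptionally once the deadline passes.
            return neverCompletes.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException.
            if (e.getCause() instanceof TimeoutException) {
                return "timed out after " + timeoutMillis + "ms";
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitWithTimeout(50));
    }
}
```

The timeout only says that nothing arrived in time; it does not identify which upstream call stalled, which is why the full log and a repro were requested.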
To Reproduce
Steps to reproduce the behavior:
Code Snippet
import com.alibaba.fastjson.JSON;
import com.azure.messaging.eventhubs.*;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.example.lgdcmaincontroller.biz.Biz;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@Component
@Slf4j
@Order(1)
public class IotOperationProcessService implements ApplicationRunner {
// Logger.getLogger("com.azure.messaging.eventhubs.PartitionBasedLoadBalancer").setLevel(Level.OFF); // Disable PartitionBasedLoadBalancer logging
// initEventBus();
initEventHub();
// if (eventData.getSequenceNumber() % 10 == 0) {
// eventContext.updateCheckpoint();
// }
eventContext.updateCheckpoint();
// senderClient.sendMessage(new ServiceBusMessage(msg));
// create a batch
EventData eventData = new EventData(msg);
EventDataBatch eventDataBatch = hubProducerClient.createBatch();
if (eventDataBatch.tryAdd(eventData)) {
hubProducerClient.send(eventDataBatch);
log.info("Message sent successfully");
} else {
log.error("Failed to create message");
}
} catch (Exception e) {
log.error("Failed to send message: {}", e.getMessage());
}
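The snippet calls eventContext.updateCheckpoint() for every event, while its commented-out lines checkpoint only when the sequence number is a multiple of 10. Whether checkpoint frequency is related to the stall reported here is not established in this thread. The sketch below only illustrates the every-Nth-event decision in plain Java, without the Event Hubs SDK; CheckpointPolicy and shouldCheckpoint are hypothetical names.

```java
class CheckpointPolicy {
    private final long interval;

    CheckpointPolicy(long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    /** True when the event with this sequence number should trigger a checkpoint. */
    boolean shouldCheckpoint(long sequenceNumber) {
        return sequenceNumber % interval == 0;
    }

    public static void main(String[] args) {
        CheckpointPolicy policy = new CheckpointPolicy(10);
        // Print the decision for a few consecutive sequence numbers.
        for (long seq = 18; seq <= 21; seq++) {
            System.out.println(seq + " -> " + policy.shouldCheckpoint(seq));
        }
    }
}
```

In the event handler, such a check would guard the existing call, in the same way as the commented-out condition on eventData.getSequenceNumber().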
Expected behavior
I would like to know why the consumer stops consuming messages and how to fix it.
Screenshots
If applicable, add screenshots to help explain your problem.
Setup (please complete the following information):
spring-boot:2.4.1
azure-core:1.47.0
azure-messaging-eventhubs:5.10.0
reactor-core:3.4.24
azure-messaging-eventhubs-checkpointstore-blob:1.16.1
If you suspect a dependency version mismatch (e.g. you see NoClassDefFoundError, NoSuchMethodError or similar), please check out the Troubleshoot dependency version conflict article first. If it doesn't provide a solution for the problem, please provide: mvn dependency:tree -Dverbose
Additional context
Add any other context about the problem here.
Information Checklist
Kindly make sure that you have added all of the information requested above and checked off the required fields; otherwise we will treat the issue as an incomplete report.