[BUG] EventHub Consumer stops consuming messages until the consumer is restarted #40133

Open
3 tasks
wstest123 opened this issue May 13, 2024 · 7 comments
Assignees
Labels
  • Client: This issue points to a problem in the data-plane of the library.
  • customer-reported: Issues that are reported by GitHub users external to the Azure organization.
  • Event Hubs
  • needs-team-attention: This issue needs attention from Azure service team or SDK team
  • question: The issue doesn't require a change to the product in order to be resolved. Most issues start as that

Comments

wstest123 commented May 13, 2024

Describe the bug
The EventHub consumer stops consuming messages until the consumer is restarted.

Exception or Stack Trace
2024-05-10 02:34:35.124 WARN 25378 --- [ parallel-2] c.a.m.e.PartitionBasedLoadBalancer : Load balancing for event processor failed - Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
2024-05-10 02:34:35.125 INFO 25378 --- [ parallel-2] c.e.l.IotOperationProcessService : Error occurred in partition processor for partition NONE, {}

java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.24.jar!/:3.4.24]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.24.jar!/:3.4.24]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

To Reproduce
Steps to reproduce the behavior:

Code Snippet

import com.alibaba.fastjson.JSON;
import com.azure.messaging.eventhubs.*;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.example.lgdcmaincontroller.biz.Biz;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

@Component
@Slf4j
@Order(1)
public class IotOperationProcessService implements ApplicationRunner {

@Value("${azure.serviceBus.connectionString}")
private String connectionString;

@Value("${azure.serviceBus.d2c-topicName}")
private String d2cTopicName;

@Value("${azure.serviceBus.c2d-topicName}")
private String c2dTopicName;

@Value("${azure.serviceBus.subscriptionName}")
private String subscriptionName;

@Value("${azure.eventHub.producerConnectionString}")
private String producerConnectionString;

@Value("${azure.eventHub.processorConnectionString}")
private String processorConnectionString;


@Value("${azure.eventHub.producerEventHubName}")
private String producerEventHubName;

@Value("${azure.eventHub.processorEventHubName}")
private String processorEventHubName;

@Value("${azure.eventHub.storageConnectionString}")
private String storageConnectionString;

@Value("${azure.eventHub.storageContainerName}")
private String storageContainerName;



private EventHubProducerClient hubProducerClient;
private EventProcessorClient hubProcessorClient;

@Override
public void run(ApplicationArguments args) throws Exception {
    IotChatBean.topic = c2dTopicName;

    // Logger.getLogger("com.azure.messaging.eventhubs.PartitionBasedLoadBalancer").setLevel(Level.OFF); // disable PartitionBasedLoadBalancer logging
    // initEventBus();
    initEventHub();

}

private void initEventHub() {

    log.info("\n\n>>>>>>>>>>> initEventHub <<<<<<<<<<<<<<\n\n");

    hubProducerClient = new EventHubClientBuilder()
            .connectionString(producerConnectionString, producerEventHubName)
            .buildProducerClient();

    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(storageConnectionString)
            .containerName(storageContainerName)
            .buildAsyncClient();

    hubProcessorClient = new EventProcessorClientBuilder()
            .connectionString(processorConnectionString, processorEventHubName)
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
            .processEvent(this::hubProcessEvent)
            .processError(this::hubProcessError)
            .buildEventProcessorClient();
    hubProcessorClient.start();

}

public void hubProcessEvent(EventContext eventContext) {
    try {

        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();

        log.info("Processing event from partition {} with sequence number {} with body: {}",
                partitionContext.getPartitionId(), eventData.getSequenceNumber(),
                eventData.getBodyAsString());

        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.

        // if (eventData.getSequenceNumber() % 10 == 0) {
        //     eventContext.updateCheckpoint();
        // }
        eventContext.updateCheckpoint();

        IotChatBean request = JSON.parseObject(eventData.getBodyAsString(), IotChatBean.class);
        String requestType = request.getRequestType();
        if (requestType == null) {
            log.error("null requestType");
            return;
        }

        log.info("request:{}", request.toString());
        HashMap<String, String> data = new HashMap<>();
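        // The literal "解序列化" ("deserialization") is matched against the response message.
        if (request.getResFlag() || request.getResMsg().contains("解序列化")) {
            Biz.BookingId = request.getBookingId();
            Biz.BusinessCode = request.getBusinessCode();
            // Verification succeeded
            log.info("Verification succeeded");
            data.put("EVENT", "PROCEDURE");
            data.put("type", "cmd");
            data.put("msg", "TakePicture");
        } else {
            // Verification failed
            log.info("Verification failed");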
            data.put("EVENT", "ERROR");
            data.put("reason", request.getResMsg());
        }
        NoticeWebSocket.sendMessage(JSON.toJSONString(data));

    } catch (Exception e) {
        log.error("hubProcessEvent fail: {}", e.getMessage());
    }

}

public void hubProcessError(ErrorContext errorContext) {
    try {

        log.info("Error occurred in partition processor for partition {}, {}",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
    } catch (Exception e) {
        log.error("hubProcessError fail: {}", e.getMessage());

    }
}

public void sendMessage(String msg) {

    if (hubProducerClient != null) {
        try {

            // senderClient.sendMessage(new ServiceBusMessage(msg));
            // create a batch
            EventData eventData = new EventData(msg);
            EventDataBatch eventDataBatch = hubProducerClient.createBatch();
            if (eventDataBatch.tryAdd(eventData)) {
                hubProducerClient.send(eventDataBatch);
                log.info("Message sent successfully");
            } else {
                log.error("Message creation failed");
            }
        } catch (Exception e) {
            log.error("Failed to send message: {}", e.getMessage());
        }
    } else {
        log.error("hubProducerClient == null");
    }
}

}
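
Side note on the snippet above: the comment in hubProcessEvent describes checkpointing every 10 events, while the active code calls updateCheckpoint() on every event. A minimal sketch of a per-partition throttle follows, assuming a hypothetical helper class `CheckpointThrottle` (not part of azure-messaging-eventhubs). It counts events rather than relying on the sequence number, and whether checkpoint frequency has anything to do with the stall reported here is not established.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not part of the Azure SDK): decides when to checkpoint. */
final class CheckpointThrottle {
    private final long interval;
    // One counter per partition, since each partition is checkpointed independently.
    private final ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

    CheckpointThrottle(long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    /** Returns true on every interval-th event seen for the given partition. */
    boolean shouldCheckpoint(String partitionId) {
        long seen = counters
                .computeIfAbsent(partitionId, id -> new AtomicLong())
                .incrementAndGet();
        return seen % interval == 0;
    }
}
```

In hubProcessEvent this would be used as `if (throttle.shouldCheckpoint(partitionContext.getPartitionId())) { eventContext.updateCheckpoint(); }`, where `throttle` is an assumed field holding `new CheckpointThrottle(10)`.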

Expected behavior
We would like to know why the consumer stops consuming messages and how to fix it.

Screenshots
If applicable, add screenshots to help explain your problem.

Setup (please complete the following information):
spring-boot:2.4.1
azure-core:1.47.0
azure-messaging-eventhubs:5.10.0
reactor-core:3.4.24
azure-messaging-eventhubs-checkpointstore-blob:1.16.1

If you suspect a dependency version mismatch (e.g. you see NoClassDefFoundError, NoSuchMethodError or similar), please check out the Troubleshoot dependency version conflict article first. If it doesn't provide a solution for the problem, please provide:

  • verbose dependency tree (mvn dependency:tree -Dverbose)
  • exception message, full stack trace, and any available logs

Additional context
Add any other context about the problem here.

Information Checklist
Kindly make sure that you have added all the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
github-actions bot added the Client, customer-reported, Event Hubs, needs-team-attention, and question labels May 13, 2024

@anuchandy @conniey @lmolkova

Thank you for your feedback. Tagging and routing to the team member best able to assist.

conniey (Member) commented May 13, 2024

Unfortunately, there is not enough information to understand what the problem is without a repro. This version of the library is ~3 years old; since then, we have made many reliability fixes.

  1. Does this reproduce when you upgrade to the latest version of azure-messaging-eventhubs?
  2. Can you provide logs (ideally at DEBUG level) covering +/- 5 minutes around the time this issue occurs? The error message alone is not enough because the internals are a state machine, so we need to understand how it got to that state.
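
To pin down the time window for those logs, one option is to record when the last event was processed and flag when none has arrived for a while. A minimal sketch, assuming a hypothetical helper class `StallDetector` (not part of azure-messaging-eventhubs); `recordEvent` would be called from the processEvent handler and `isStalled` from a periodic check. An idle Event Hub with no incoming traffic also looks like a stall to this check, so the threshold has to be chosen with the expected traffic in mind.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper (not part of the Azure SDK): tracks when the last event was seen. */
final class StallDetector {
    private final Duration maxIdle;
    private final AtomicReference<Instant> lastEvent;

    StallDetector(Duration maxIdle, Instant startedAt) {
        this.maxIdle = maxIdle;
        this.lastEvent = new AtomicReference<>(startedAt);
    }

    /** Call from the event handler each time an event is processed. */
    void recordEvent(Instant when) {
        lastEvent.set(when);
    }

    /** Time of the most recent recorded event; useful for locating the log window. */
    Instant lastEventTime() {
        return lastEvent.get();
    }

    /** True if no event has been recorded for longer than maxIdle as of 'now'. */
    boolean isStalled(Instant now) {
        return Duration.between(lastEvent.get(), now).compareTo(maxIdle) > 0;
    }
}
```

When `isStalled` first returns true, `lastEventTime()` gives the point around which to collect the logs.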

conniey added the needs-author-feedback (More information is needed from author to address the issue.) label May 13, 2024

Hi @wstest123. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

github-actions bot removed the needs-team-attention label May 13, 2024

wstest123 (Author) commented May 15, 2024

2024-05-09-21_03_48.log
@conniey I have attached the full logs for this issue; please take a look.
We will also check whether this reproduces after updating to the latest version.

github-actions bot added the needs-team-attention label and removed the needs-author-feedback label May 15, 2024

wstest123 (Author) commented

@conniey Thanks for your help! Are the attached logs helpful for investigating this issue? Thanks!

wstest123 (Author) commented

@conniey May I know if there has been any luck with this issue? I have attached the full log.

2 participants