Durable Consumer Does not Consume From Last Message Per Subject #5422

Closed
roguelxx opened this issue May 15, 2024 · 3 comments

roguelxx commented May 15, 2024

Description

I have configured a NATS JetStream Durable Consumer to start consuming from the last ACKed message per subject after a crash and recovery. This is crucial for my application, as I need the last ACKed message to rebuild the consumer's old state.

To achieve this, I set the DeliverPolicy to DeliverLastPerSubjectPolicy and the FilterSubject to jetstream.AllKeys (the ">" wildcard, which matches every subject). However, after restarting the consumer, it does not consume any messages from the stream.

Code Snippet

Here is the configuration code for my consumer:

c, err := s.CreateConsumer(context.Background(), jetstream.ConsumerConfig{
  Durable:       "nats_playground_consumer",
  DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy,
  AckPolicy:     jetstream.AckExplicitPolicy,
  FilterSubject: jetstream.AllKeys,
  MaxAckPending: 10000,
  AckWait:       12 * time.Hour,
})

Steps to Reproduce

  1. Configure the consumer as shown in the code snippet.
  2. Start the consumer and process some messages, ensuring they are ACKed.
  3. Simulate a crash or restart the consumer.
  4. Observe that the consumer does not consume any messages upon restart.

Expected Behavior

After restarting, the consumer should start consuming from the last ACKed message per subject, as specified by the DeliverLastPerSubjectPolicy.

Actual Behavior

The consumer does not consume any messages from the stream after a restart.

Additional Information

  • NATS JetStream version: v2.10.14
  • Client library version: v1.34.1
  • Go version: v1.22.2

Thank you for your time and assistance!

@ripienaar ripienaar transferred this issue from nats-io/nats-streaming-server May 15, 2024
@ripienaar
Contributor

After step 3 (the crash or restart), do you send more messages in, and do those not get consumed?

The consumer will not deliver an already acked message. You cannot ask it to deliver acked messages again, so that part is as designed. After the restart, though, the consumer should continue to deliver new messages.

@roguelxx
Author

Thank you for your explanation. Let me provide a concrete example to clarify my understanding. Suppose I have two subjects in my stream, S1 and S2, and a durable consumer. After the consumer has acknowledged some messages but before it crashed, the consumer state is as follows (no message has exceeded the AckWait threshold, so nothing will be redelivered):

S1: a1, a2, [a3], a4, a5, a6 ...    (acked a3 message)
S2: b1, [b2], b3, b4 ...   (acked b2 message)

During the time the consumer is crashed, some new messages arrive, and the stream state becomes:

S1: a1, a2, [a3], a4, a5, a6, | a7, a8 ...    (new messages)
S2: b1, [b2], b3, b4, | b5, b6 ...   (new messages)

What I expect when the same durable consumer restarts is that it will receive the messages:

from S1: a3, a4, a5, a6, a7, a8, ....
from S2: b2, b3, b4, b5, b6 ...

So, my questions are:

  1. Can the durable consumer get the last acknowledged message per subject under the DeliverLastPerSubjectPolicy delivery policy?
  2. If the above condition is met, can the durable consumer receive the messages in order? (Specifically, must it be [a3, a4, a5, a6, a7] rather than [a7, a8, a3, a4, a5] or another order?)

@ripienaar
Contributor

ripienaar commented May 20, 2024

The key insight is that the consumer does not restart. It was always there.

The client that was connected to the consumer crashed and went away, but the consumer remained on the server.

When the crashed client, or another client, asks for messages, the consumer continues where it was before, which means it does not send you already acknowledged messages. It does not even know you crashed.

All the start config options - start sequence, time etc - only influence the consumer once and that’s when it is created. After that the behaviour is constant. It will only ever give you unacked messages.

So what you probably want is to create a new consumer each time, or to use an “ordered consumer”.
