This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

[KAFKA] Add support to override consumer position and reset to head/tail #29

Open
stanlemon opened this issue Oct 21, 2017 · 3 comments

Comments

@stanlemon
Contributor

No description provided.

@Crim
Contributor

Crim commented Oct 23, 2017

So I'm wondering about the best way to do this, since our startup logic is already fairly complicated.

Here are the 4 scenarios that exist today:

  1. Consumer started with no explicit start position, and has no previous consumer state
     • starts from HEAD
  2. Consumer started with no explicit start position, and has previous consumer state
     • starts from previous state
  3. Consumer started with explicit start position, and has no previous consumer state
     • starts from explicit start position
  4. Consumer started with explicit start position, and has previous consumer state
     • starts from previous state
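For illustration, here is a minimal Java sketch of the four scenarios as a single decision, assuming each position can be modeled as one optional offset (the real consumer tracks state per partition). `StartupPosition` and `Source` are hypothetical names, not classes from storm-dynamic-spout.

```java
import java.util.Optional;

// Hypothetical model of today's four startup scenarios. Each position is
// a single optional offset for illustration only; the real consumer
// tracks state per partition.
final class StartupPosition {

    /** Where the consumer begins consuming from. */
    enum Source { HEAD, PREVIOUS_STATE, EXPLICIT_START }

    private StartupPosition() {}

    static Source resolve(Optional<Long> previousState, Optional<Long> explicitStart) {
        // Scenarios 2 and 4: previous consumer state always wins.
        if (previousState.isPresent()) {
            return Source.PREVIOUS_STATE;
        }
        // Scenario 3: no previous state, but an explicit start position.
        if (explicitStart.isPresent()) {
            return Source.EXPLICIT_START;
        }
        // Scenario 1: nothing to go on, so start from HEAD.
        return Source.HEAD;
    }
}
```

Because previous state is checked first, scenarios 2 and 4 end up behaving identically.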

I believe conditions 3 and 4 only exist if a consumer is being started as a sideline.

So this is difficult because today the Kafka Consumer has no idea whether it's a "firehose" or a sideline instance. If we set a KafkaConsumerConfig option that said "reset to head", we'd need to somehow know to do it ONLY for the firehose, and not for sideline instances.

So it'd have to be something like:
If there is no starting state and flag = true, then reset the consumer position to TAIL/HEAD.
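A rough Java sketch of that rule, assuming the consumer only knows whether it was handed a starting state and whether the reset flag is set. `ResetFlagRule` and its parameters are hypothetical. It makes the coupling concrete: the absence of a starting state is the only signal that this is the firehose.

```java
import java.util.Optional;

// Hypothetical sketch of the flag-based rule. The consumer cannot tell a
// firehose from a sideline, so "has no starting state" is used as a proxy.
final class ResetFlagRule {

    enum Position { HEAD, TAIL }

    private ResetFlagRule() {}

    /**
     * Returns the position to reset to, or empty if the consumer should
     * fall through to the existing startup behavior.
     */
    static Optional<Position> resetTarget(boolean hasStartingState,
                                          boolean resetFlag,
                                          Position configured) {
        if (!hasStartingState && resetFlag) {
            return Optional.of(configured);
        }
        return Optional.empty();
    }
}
```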

But this intrinsically couples the Kafka Consumer to sidelining. I wonder if we can somehow wrap this logic into the Sideline SpoutHandler, or something similar. Of course, that would then tightly couple the Sideline SpoutHandler to the Kafka Consumer.

@Crim
Contributor

Crim commented Oct 23, 2017

Yeah, I guess the trick is to figure out how to instruct only the firehose. I think you'd need to use the virtual spout id.

What if we did the following:

The configuration keys would be something like:

consumer.reset.default: HEAD|TAIL
consumer.reset.<virtualSpoutId>: HEAD|TAIL
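A minimal sketch of how those two keys might be resolved from a flat config map, in Java. `ResetConfig`, `ResetPosition`, and the map-based config are assumptions for illustration; the real KafkaConsumerConfig may expose settings differently.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical resolution of consumer.reset.default and
// consumer.reset.<virtualSpoutId> from a flat key/value config.
final class ResetConfig {

    enum ResetPosition { HEAD, TAIL }

    static final String DEFAULT_KEY = "consumer.reset.default";
    static final String PER_SPOUT_PREFIX = "consumer.reset.";

    private ResetConfig() {}

    /** The per-virtual-spout override, if one is configured. */
    static Optional<ResetPosition> override(Map<String, String> config, String virtualSpoutId) {
        return parse(config.get(PER_SPOUT_PREFIX + virtualSpoutId));
    }

    /** The configured default, falling back to HEAD (today's behavior). */
    static ResetPosition defaultPosition(Map<String, String> config) {
        return parse(config.get(DEFAULT_KEY)).orElse(ResetPosition.HEAD);
    }

    // Case-insensitive; valueOf throws IllegalArgumentException for
    // anything other than HEAD or TAIL.
    private static Optional<ResetPosition> parse(String value) {
        if (value == null) {
            return Optional.empty();
        }
        return Optional.of(ResetPosition.valueOf(value.trim().toUpperCase(Locale.ROOT)));
    }
}
```

One thing this layout implies: a virtual spout whose id is literally `default` would collide with the default key.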

Today we default to HEAD for reset: if an offset is no good, we back up to HEAD. The default key makes this logic configurable.

The reset logic might look something like this:

if (has no persisted state) {
  if (consumer.reset.default == HEAD) {
    if (hasStartingState) => reset to starting state;
    if (has no startingState) => reset to HEAD of topic;
  } else if (consumer.reset.default == TAIL) {
    if (hasEndingState) => reset to ending state;
    if (has no endingState) => reset to TAIL of topic;
  }
} else if (has persisted state && consumer.reset.<myConsumerId> != null) {
  if (consumer.reset.<myConsumerId> == HEAD) {
    if (hasStartingState) => reset to starting state;
    if (has no startingState) => reset to HEAD of topic;
  } else if (consumer.reset.<myConsumerId> == TAIL) {
    if (hasEndingState) => reset to ending state;
    if (has no endingState) => reset to TAIL of topic;
  }
}
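The same logic as a Java sketch, assuming the persisted-state, starting-state, and ending-state conditions are already known as booleans and the two config values have already been parsed. All names here (`ResetLogic`, `Target`, `apply`) are hypothetical. Factoring out `apply` shows that the two top-level branches differ only in which setting drives them.

```java
import java.util.Optional;

// Hypothetical Java rendering of the reset pseudocode.
final class ResetLogic {

    enum ResetPosition { HEAD, TAIL }

    /** Where the consumer should seek to on startup. */
    enum Target { PERSISTED_STATE, STARTING_STATE, TOPIC_HEAD, ENDING_STATE, TOPIC_TAIL }

    private ResetLogic() {}

    static Target resolve(boolean hasPersistedState,
                          boolean hasStartingState,
                          boolean hasEndingState,
                          ResetPosition defaultReset,
                          Optional<ResetPosition> perSpoutReset) {
        // No persisted state: consumer.reset.default decides.
        if (!hasPersistedState) {
            return apply(defaultReset, hasStartingState, hasEndingState);
        }
        // Persisted state plus a per-consumer override: the override decides.
        if (perSpoutReset.isPresent()) {
            return apply(perSpoutReset.get(), hasStartingState, hasEndingState);
        }
        // Persisted state and no override: resume from it, as today.
        return Target.PERSISTED_STATE;
    }

    // Shared by both branches of the pseudocode.
    private static Target apply(ResetPosition position,
                                boolean hasStartingState,
                                boolean hasEndingState) {
        if (position == ResetPosition.HEAD) {
            return hasStartingState ? Target.STARTING_STATE : Target.TOPIC_HEAD;
        }
        return hasEndingState ? Target.ENDING_STATE : Target.TOPIC_TAIL;
    }
}
```

With `defaultReset = TAIL`, a fresh sideline (no persisted state, but an ending state) resolves to `ENDING_STATE`, i.e. it gets skipped.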

(Sweet Jesus) that's complicated. It also means that if you set default = TAIL, it'll skip every sideline.

@stanlemon
Contributor Author

@Crim Consider this...

  • The firehose in sidelining always starts with a null for starting. This is the easiest scenario to 'reset' on. If the reset option is set, the consumer just seeks to that point and moves on.
  • Sidelines always start with an explicit starting point. In this situation, reset to head probably means resetting to the starting point.
  • If a sideline is seeked past its ending state (reset to tail), which is not stored in the consumer, the VirtualSpout will see that the consumer's offset is now past the ending offset of the vspout (sideline) and complete the vspout. See https://github.com/salesforce/storm-dynamic-spout/blob/master/src/main/java/com/salesforce/storm/spout/dynamic/VirtualSpout.java#L645-L693
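A hypothetical Java sketch of the completion check the third bullet relies on, assuming offsets are tracked per partition as `Map<Integer, Long>` and that "past" means strictly greater than the ending offset. This is not the VirtualSpout code linked above, only an illustration of the idea; `SidelineCompletion` is an invented name.

```java
import java.util.Map;

// Hypothetical illustration: a sideline is complete once the consumer has
// moved past the ending offset on every partition the sideline covers.
final class SidelineCompletion {

    private SidelineCompletion() {}

    /**
     * @param currentOffsets consumer offsets keyed by partition id
     * @param endingOffsets  the sideline's ending offsets keyed by partition id
     */
    static boolean isComplete(Map<Integer, Long> currentOffsets,
                              Map<Integer, Long> endingOffsets) {
        for (Map.Entry<Integer, Long> end : endingOffsets.entrySet()) {
            Long current = currentOffsets.get(end.getKey());
            // Missing or not yet past the ending offset: keep consuming.
            if (current == null || current <= end.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

Under this model, seeking a sideline to the tail pushes every partition past its ending offset, so the very next check reports it complete.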

So I think this could actually be relatively straightforward and Kafka-specific. I really think we add this reset-position config with two options, check it in the Kafka consumer's open() call, do a seek, replace state accordingly, and then let the rest of the spout work as intended. The one caveat is here https://github.com/salesforce/storm-dynamic-spout/blob/master/src/main/java/com/salesforce/storm/spout/dynamic/kafka/Consumer.java#L234-L236, where we need to think about what the 'head' of the consumer is a little differently.
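A minimal Java sketch of that open()-time reset, assuming a hypothetical `Seeker` interface standing in for the consumer. In recent Kafka clients, `KafkaConsumer#seekToBeginning(Collection<TopicPartition>)`, `seekToEnd(Collection<TopicPartition>)`, and `position(TopicPartition)` play these roles; partitions are plain integers here to keep the sketch self-contained. `ResetOnOpen` and `resetAndCapture` are invented names.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: seek on open(), then capture the new positions so
// they can replace whatever state was loaded.
final class ResetOnOpen {

    enum ResetPosition { HEAD, TAIL }

    /** Hypothetical stand-in for the parts of the Kafka consumer this needs. */
    interface Seeker {
        void seekToBeginning(Collection<Integer> partitions);
        void seekToEnd(Collection<Integer> partitions);
        long position(int partition);
    }

    private ResetOnOpen() {}

    static Map<Integer, Long> resetAndCapture(Seeker seeker,
                                              Collection<Integer> partitions,
                                              ResetPosition reset) {
        if (reset == ResetPosition.HEAD) {
            seeker.seekToBeginning(partitions);
        } else {
            seeker.seekToEnd(partitions);
        }
        // Read back each partition's position; these become the new state.
        Map<Integer, Long> newState = new HashMap<>();
        for (int partition : partitions) {
            newState.put(partition, seeker.position(partition));
        }
        return newState;
    }
}
```

Reading `position()` right after seeking is deliberate: Kafka evaluates `seekToBeginning`/`seekToEnd` lazily, so asking for the position forces the seek and yields a concrete offset (the offset of the next record to be fetched). How that maps onto the spout's persisted state is not modeled here.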

If I get time today I may try to carve out what I'm talking about so you can look at it more concretely.
