[KAFKA] Add support to override consumer position and reset to head/tail #29
So I'm wondering about the best way to do this, as our startup logic is already fairly complicated. Here are the 4 scenarios that exist today:
I believe conditions 3 and 4 only exist if a consumer is being started as a sideline. This is difficult because today the Kafka Consumer has no idea whether it's a "firehose" or a sideline instance. If we set a KafkaConsumerConfig option that said "reset to head", we would need to somehow know to apply it ONLY to the firehose and not to sideline instances. So it'd have to be something like:

But this intrinsically couples the Kafka Consumer to sidelining. I wonder if we can somehow wrap this logic into the Sideline SpoutHandler, or something. Of course, that would then tightly couple the Sideline SpoutHandler to the Kafka Consumer.
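To make the coupling concern concrete, here is a minimal Java sketch of a firehose-only reset check. The `ResetPosition` enum, the `isFirehose` flag, and the `startingOffset` helper are hypothetical names, not part of the project, and each instance is reduced to a single offset. What it shows is that `isFirehose` has to be handed to the consumer from outside, since the consumer cannot work it out on its own.

```java
import java.util.Optional;

public class FirehoseResetSketch {
    // Hypothetical reset option, modeled on the "reset to head" idea above.
    enum ResetPosition { HEAD, TAIL }

    /**
     * Picks the offset a consumer should start from. isFirehose must be supplied by the
     * caller: the Kafka consumer itself has no way to know which kind of instance it is.
     */
    static long startingOffset(boolean isFirehose,
                               Optional<ResetPosition> configuredReset,
                               long persistedOffset,
                               long headOffset,
                               long tailOffset) {
        if (isFirehose && configuredReset.isPresent()) {
            return configuredReset.get() == ResetPosition.HEAD ? headOffset : tailOffset;
        }
        // Sidelines, and a firehose with no override, resume from their persisted offset.
        return persistedOffset;
    }

    public static void main(String[] args) {
        // Same config, different instance types: only the firehose honors the reset.
        System.out.println(startingOffset(true, Optional.of(ResetPosition.TAIL), 42L, 0L, 100L));  // 100
        System.out.println(startingOffset(false, Optional.of(ResetPosition.TAIL), 42L, 0L, 100L)); // 42
    }
}
```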
Yeah, I guess the trick is to figure out how to instruct only the firehose. I think you'd need to use the virtual spout id. What if we did the following? The configuration keys would be something like:

```
consumer.reset.default: HEAD|TAIL
consumer.reset.<virtualSpoutId>: HEAD|TAIL
```

Today we default to HEAD for reset: if an offset is no good, we back up to head. The default key makes this logic configurable. The reset logic might look something like this:

```
if (has no persisted state) {
    if (consumer.reset.default == HEAD) {
        if (hasStartingState) => reset to starting state;
        if (has no startingState) => reset to HEAD of topic;
    } else if (consumer.reset.default == TAIL) {
        if (hasEndingState) => reset to ending state;
        if (has no endingState) => reset to TAIL of topic;
    }
} else if (has persisted state && consumer.reset.<virtualSpoutId> != null) {
    if (consumer.reset.<virtualSpoutId> == HEAD) {
        if (hasStartingState) => reset to starting state;
        if (has no startingState) => reset to HEAD of topic;
    } else if (consumer.reset.<virtualSpoutId> == TAIL) {
        if (hasEndingState) => reset to ending state;
        if (has no endingState) => reset to TAIL of topic;
    }
}
```

Sweet Jesus, that's complicated. It also means that if you set default = TAIL, it'll skip every sideline.
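The proposed reset logic can be sketched in Java as a single resolution function. This is a minimal sketch assuming the `consumer.reset.default` and `consumer.reset.<virtualSpoutId>` keys arrive as a plain string map and that starting/ending state each reduce to one offset; `ResetPolicySketch` and `resolveStartOffset` are hypothetical names. The mode is parsed case-insensitively because the pseudocode mixes `HEAD` and `tail`.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public class ResetPolicySketch {
    enum ResetPosition { HEAD, TAIL }

    // Key names taken from the proposal above (hypothetical, not an existing config).
    static final String DEFAULT_KEY = "consumer.reset.default";
    static final String PER_SPOUT_PREFIX = "consumer.reset.";

    /** Returns the offset to seek to, or empty to keep the persisted position. */
    static Optional<Long> resolveStartOffset(Map<String, String> config,
                                             String virtualSpoutId,
                                             boolean hasPersistedState,
                                             Optional<Long> startingState,
                                             Optional<Long> endingState,
                                             long headOfTopic,
                                             long tailOfTopic) {
        final String rawMode;
        if (!hasPersistedState) {
            // No persisted state: use the default mode (today's behavior is HEAD).
            rawMode = config.getOrDefault(DEFAULT_KEY, "HEAD");
        } else {
            // Persisted state exists: only override it if this virtual spout has its own key.
            rawMode = config.get(PER_SPOUT_PREFIX + virtualSpoutId);
            if (rawMode == null) {
                return Optional.empty();
            }
        }
        // Throws IllegalArgumentException for anything other than HEAD/TAIL.
        ResetPosition mode = ResetPosition.valueOf(rawMode.trim().toUpperCase(Locale.ROOT));
        if (mode == ResetPosition.HEAD) {
            return Optional.of(startingState.orElse(headOfTopic));
        }
        return Optional.of(endingState.orElse(tailOfTopic));
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(DEFAULT_KEY, "TAIL");
        // A brand-new sideline bounded by offsets 10..20, with no persisted state.
        System.out.println(resolveStartOffset(config, "sideline-1", false,
                Optional.of(10L), Optional.of(20L), 0L, 100L)); // Optional[20]
    }
}
```

The `main` method reproduces the concern above: with `consumer.reset.default = TAIL`, a fresh sideline is positioned at its ending state and replays nothing. One side effect of this key layout is that a virtual spout whose id is literally `default` would collide with the default key.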
@Crim Consider this...
So I think this could actually be relatively straightforward and Kafka-specific. I really think we should add this reset position config with two options, check it in the Kafka consumer's open() call, do a seek and replace the state accordingly, and then let the rest of the spout work as intended, with one caveat here, https://github.com/salesforce/storm-dynamic-spout/blob/master/src/main/java/com/salesforce/storm/spout/dynamic/kafka/Consumer.java#L234-L236, where we think about what the 'head' of the consumer is a little differently. If I get time today, I may try to carve out what I'm talking about so you can look at it more concretely.
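A rough Java sketch of the open()-time approach: if a reset is configured, seek, then overwrite the persisted state with the resulting positions so the rest of the spout proceeds unchanged. Apache Kafka's `KafkaConsumer` does expose `seekToBeginning(Collection<TopicPartition>)`, `seekToEnd(Collection<TopicPartition>)`, and `position(TopicPartition)`; to keep the example runnable without a broker, the `Seeker` interface here is a hypothetical stand-in using integer partitions and an in-memory fake. `applyReset` and the `NONE` value (meaning "no reset configured") are also assumptions, and the 'head' caveat linked above is not modeled.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OpenTimeResetSketch {
    // NONE = no reset configured (hypothetical); HEAD/TAIL are the two proposed options.
    enum ResetPosition { NONE, HEAD, TAIL }

    // Hypothetical stand-in mirroring the shape of KafkaConsumer's seek/position calls.
    interface Seeker {
        void seekToBeginning(Collection<Integer> partitions);
        void seekToEnd(Collection<Integer> partitions);
        long position(int partition);
    }

    // In-memory fake so the sketch runs without a broker.
    static class FakeSeeker implements Seeker {
        final Map<Integer, long[]> bounds; // partition -> {earliest, latest}
        final Map<Integer, Long> positions = new HashMap<>();

        FakeSeeker(Map<Integer, long[]> bounds) { this.bounds = bounds; }

        public void seekToBeginning(Collection<Integer> ps) { for (int p : ps) positions.put(p, bounds.get(p)[0]); }
        public void seekToEnd(Collection<Integer> ps) { for (int p : ps) positions.put(p, bounds.get(p)[1]); }
        public long position(int p) { return positions.get(p); }
    }

    /** Called once from a hypothetical open(): seek if configured, then replace persisted state. */
    static void applyReset(ResetPosition reset, Seeker seeker, List<Integer> partitions,
                           Map<Integer, Long> persistedState) {
        if (reset == ResetPosition.NONE) {
            return; // keep whatever state was persisted
        }
        if (reset == ResetPosition.HEAD) {
            seeker.seekToBeginning(partitions);
        } else {
            seeker.seekToEnd(partitions);
        }
        for (int p : partitions) {
            persistedState.put(p, seeker.position(p)); // new state reflects the seek
        }
    }

    public static void main(String[] args) {
        FakeSeeker seeker = new FakeSeeker(Map.of(0, new long[] {5L, 50L}, 1, new long[] {7L, 70L}));
        Map<Integer, Long> state = new HashMap<>(Map.of(0, 12L, 1, 30L));
        applyReset(ResetPosition.TAIL, seeker, List.of(0, 1), state);
        System.out.println(state);
    }
}
```

Keeping the override inside open() means the rest of the spout only ever sees ordinary persisted state, which is what keeps the change Kafka-specific.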