Writing and reading offsets only from kafka #1647

Open
naresh-kotha-ck opened this issue Oct 20, 2020 · 10 comments

Comments

@naresh-kotha-ck

Hi Team

I have noticed that offsets are still being written to ZooKeeper even when I set the config variables below:

kafka.offsets.storage=kafka

kafka.dual.commit.enabled=false

Is this expected, or am I doing something wrong?

@naresh-kotha-ck
Author

We are using Kafka 2.5.

@HenryCaiHaiying
Contributor

HenryCaiHaiying commented Oct 21, 2020 via email

@naresh-kotha-ck
Author

Yes, but for purpose 1 I still see offsets in ZooKeeper even though I set the properties below:

kafka.offsets.storage=kafka

kafka.dual.commit.enabled=false

Can you please let me know if I am missing something?

@HenryCaiHaiying
Contributor

HenryCaiHaiying commented Oct 21, 2020 via email

@naresh-kotha-ck
Author

Sure, I will create a PR for that. Thanks for the details.

@DomWos
Contributor

DomWos commented Dec 3, 2020

Hey @HenryCaiHaiying,
I also think there is a bug or inconsistency in SecorKafkaMessageIterator, or else I am misunderstanding something.
Currently, a variable controls whether reading offsets from ZooKeeper is skipped, but it is set to
offsetStorage.equals("kafka") && dualCommitEnabled.equals("true"). So, if I understand correctly, with offset.storage=kafka and dual.commit.enabled=false Secor will still try to read offsets from ZooKeeper, even though they will never be there because dual.commit.enabled=false. Shouldn't the check be for false instead of true?
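
To make the behaviour described above easier to see, here is a minimal sketch that evaluates the quoted expression for each combination of the two settings. The class and method names are hypothetical and are not taken from Secor; `suggestedCheck` is only the variant the question asks about, not a confirmed fix.

```java
// Sketch only: reproduces the boolean expression quoted in the comment above.
// Class and method names are hypothetical, not Secor's.
class SkipZookeeperCheckSketch {

    // The expression as quoted: skip ZooKeeper only when storage is "kafka"
    // AND dual commit is "true".
    static boolean quotedCheck(String offsetStorage, String dualCommitEnabled) {
        return offsetStorage.equals("kafka") && dualCommitEnabled.equals("true");
    }

    // The variant the comment asks about: test dual commit for "false" instead.
    static boolean suggestedCheck(String offsetStorage, String dualCommitEnabled) {
        return offsetStorage.equals("kafka") && dualCommitEnabled.equals("false");
    }

    public static void main(String[] args) {
        String[] storages = {"kafka", "zookeeper"};
        String[] dualCommits = {"true", "false"};
        // Print whether each check would skip the ZooKeeper read.
        for (String storage : storages) {
            for (String dual : dualCommits) {
                System.out.printf("storage=%s dualCommit=%s quoted=%b suggested=%b%n",
                        storage, dual,
                        quotedCheck(storage, dual),
                        suggestedCheck(storage, dual));
            }
        }
    }
}
```

Under the quoted expression, storage=kafka with dual commit set to false does not skip the ZooKeeper read, which matches the behaviour reported in this issue.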

@HenryCaiHaiying
Contributor

HenryCaiHaiying commented Dec 6, 2020 via email

@DomWos
Contributor

DomWos commented Dec 9, 2020

Yeah, that is correct for the old Kafka consumer used by the legacy iterator. However, SecorKafkaMessageIterator uses the new Kafka consumer, which doesn't even have the dual.commit.enabled property available, nor is it actually set by Secor. So, with the new consumer we will never save offsets to ZooKeeper, but for some reason Secor will always try to read them from there.

@dpavlov-smartling

Hello @HenryCaiHaiying and @naresh-kotha-ck,
Are there any updates on, or clarification of, this situation?
Since Kafka 2.0, none of the internal Kafka tools support interacting with offsets in ZooKeeper, so those offsets are very hard to work with if you need to reset an offset, etc.
Is it possible to confirm that, with the configuration

kafka.offsets.storage=kafka
kafka.dual.commit.enabled=true

Secor will read offsets only from Kafka storage? I understand that it will write to ZooKeeper as well, but we want to migrate from ZooKeeper offset storage to Kafka and want to be sure we understand how Secor will behave after a restart with the updated configuration.

@HenryCaiHaiying
Contributor

Looking at the code (SecorConsumerRebalanceListener.java), it appears that it will skip ZooKeeper when kafka.offsets.storage=kafka:

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        if (skipZookeeperOffsetSeek) {
            LOG.debug("offset storage set to kafka. Skipping reading offsets from zookeeper");
            return;
        }
        Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(collection);
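
The early-return pattern in the snippet above can be illustrated with a small, self-contained sketch. Everything here is a stand-in: partitions are plain strings, the ZooKeeper lookup is replaced by a list that records which partitions would have been looked up, and the class name is hypothetical. How the flag itself is computed is not shown in the snippet; an earlier comment in this thread quotes it as requiring both storage set to kafka and dual commit set to true.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Sketch only: a stand-in for the early-return pattern quoted above.
// Partitions are plain strings and the ZooKeeper lookup is a stub;
// neither reflects Secor's real types.
class RebalanceListenerSketch {
    private final boolean skipZookeeperOffsetSeek;

    // Partitions for which a ZooKeeper lookup would have been attempted.
    final List<String> zookeeperLookups = new ArrayList<>();

    RebalanceListenerSketch(boolean skipZookeeperOffsetSeek) {
        this.skipZookeeperOffsetSeek = skipZookeeperOffsetSeek;
    }

    void onPartitionsAssigned(Collection<String> partitions) {
        if (skipZookeeperOffsetSeek) {
            // Flag set: return before any ZooKeeper access.
            return;
        }
        // Flag not set: every assigned partition triggers a lookup.
        zookeeperLookups.addAll(partitions);
    }

    public static void main(String[] args) {
        List<String> assigned = Arrays.asList("topic-0", "topic-1");

        RebalanceListenerSketch skipping = new RebalanceListenerSketch(true);
        skipping.onPartitionsAssigned(assigned);
        System.out.println("skip=true  lookups=" + skipping.zookeeperLookups);

        RebalanceListenerSketch reading = new RebalanceListenerSketch(false);
        reading.onPartitionsAssigned(assigned);
        System.out.println("skip=false lookups=" + reading.zookeeperLookups);
    }
}
```

The point of the sketch is that the listener skips ZooKeeper only when the flag is true, so the outcome for a given configuration depends entirely on how that flag is derived.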
