Stale consumers after commit #69

Open
cmccarthy1 opened this issue Nov 11, 2020 · 0 comments

Describe the bug
Committing an offset as a member of a consumer group while the group is rebalancing can leave the committing consumer stale: once the commit fails, it stops consuming messages.

To Reproduce

cat stale_con.q

\l kfk.q
OFFSET_LOG:() ; MSGS:()
\c 5000 5000
commit:{ .kfk.CommitOffsets[0i;`test1;;1b] exec partition!offset from MSGS where offset = (max;offset)fby partition ; `COMMITED set .z.p ;  }
.kfk.offsetcb: {[cid;err;offsets] if[not err like "Success" ; .ms.sys.message "offsetcb not success" ; OFFSET_LOG,:(cid;err;offsets) ; `commit set { } ]; }
.kfk.consumecb:{ x[`rcvtime]:.z.p ; MSGS,:: enlist x _ `data  ; `MSG set x  }
cfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:9092");
  (`bootstrap.servers;`$"localhost:9092");
  (`group.id;`$"test_consumer_group_1");
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false);
  (`auto.offset.reset;`latest);
  (`session.timeout.ms;`60000);
  );
.kfk.Consumer cfg
.kfk.Sub[0i;`test1;enlist[.kfk.PARTITION_UA]!enlist[.kfk.OFFSET.END] ]

cat other_cons.q

\l kfk.q
OFFSET_LOG:() ; MSGS:()
\c 5000 5000
system"sleep 2"
commit:{ .kfk.CommitOffsets[0i;`test1;;1b] exec partition!offset from MSGS where offset = (max;offset)fby partition ; `COMMITED set .z.p ;  }
.kfk.offsetcb: {[cid;err;offsets] if[not err like "Success" ; .ms.sys.message "offsetcb not success" ; OFFSET_LOG,:(cid;err;offsets) ]; }
.kfk.consumecb:{ x[`rcvtime]:.z.p ; MSGS,:: enlist x _ `data  ; `MSG set x ; }
cfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:9092");
  (`bootstrap.servers;`$"localhost:9092");
  (`group.id;`$"test_consumer_group_1");
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false);
  (`auto.offset.reset;`latest);
  (`session.timeout.ms;`60000);
  );
clients:{ .kfk.Consumer cfg } each til 10
{ .kfk.Sub[x;`test1;enlist[.kfk.PARTITION_UA]!enlist[.kfk.OFFSET.END] ] } each clients

Steps:

  1. Have a process producing to your topic (test1 in the scripts above).
  2. Start stale_con.q.
  3. Start other_cons.q once the stale_con.q process is up and running.
  4. Manually run commit[] on the stale_con.q process several times in quick succession.
  5. If "offsetcb not success" is not seen, restart the other_cons.q process and try again.
  6. After offsetcb reports "Offset commit failed - Specified group generation id is not valid", the stale_con.q consumer won't consume any more messages.
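
One way to confirm the stale state after step 6 is to look at when the last message arrived. This is a minimal sketch, not part of the original reproduction: it assumes MSGS has already become a table (at least one message was received before the failed commit), and `lastrcv` is a hypothetical name introduced here for illustration. Everything else comes from stale_con.q.

```q
/ Run on the stale_con.q process after the failed commit.
/ Receive time of the most recent message (rcvtime is set in .kfk.consumecb)
lastrcv:exec max rcvtime from MSGS
/ Time elapsed since that message; re-evaluate this line a few times
.z.p-lastrcv
```

If the elapsed time keeps growing well past the producer's publish interval while the producer is still running, the consumer has most likely stopped consuming.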

Expected behavior
Consumers in the group should continue to consume messages from an appropriate offset after every rebalancing event.
