Open
Description
Environment Information
- OS: macOS Sonoma 14.6.1 (Apple M3)
- Node version: 22.12.0
- npm version: 10.9.0
- confluent-kafka-javascript version: 1.2.0
Steps to Reproduce
Attempt to use `highWatermark` on the `batch` of an `EachBatchPayload`. The value returned is always `-1001`, which prevents us from tracking partition lag.
Given that `highWatermark` is part of the type definition and the migration guide does not mention it, I would expect it to return an accurate value. Partition lag is a very useful application metric.
import {KafkaJS as Confluent, RdKafka} from "@confluentinc/kafka-javascript";
import {Admin, Consumer, Kafka, Producer} from "kafkajs";
// Each client gets its own topic and consumer group so the two tests cannot observe
// each other's messages or committed offsets. A single timestamp is shared so every
// name from one run carries the same suffix.
const RUN_ID = Date.now();
const KAFKA_JS_TOPIC = `test-kafkajs-topic-${RUN_ID}`;
const KAFKA_JS_GROUP_ID = `test-kafkajs-group-${RUN_ID}`;
const CONFLUENT_TOPIC = `test-confluent-topic-${RUN_ID}`;
const CONFLUENT_GROUP_ID = `test-confluent-group-${RUN_ID}`;

/**
 * The subset of an `eachBatch` payload's `batch` that {@link logBatchLag} reads.
 * Declared structurally so the same helper accepts both the KafkaJS and the
 * Confluent batch objects.
 */
interface LagBatch {
    messages: ReadonlyArray<{offset: string}>;
    highWatermark: string;
    partition: number;
}

/**
 * Logs a batch header and, for every message, its lag, computed as
 * `highWatermark - offset`.
 *
 * A negative high watermark is not a real offset, so lag is logged as "unknown"
 * rather than as a meaningless negative number. The Confluent run in this report
 * yields -1001. The raw `highWatermark` is still printed in the header line.
 *
 * @param batch the `batch` field of an `eachBatch` payload
 * @returns the number of messages in the batch, for the caller's running total
 */
function logBatchLag(batch: LagBatch): number {
    console.log(`Received batch with ${batch.messages.length} messages with highWatermark ${batch.highWatermark} on partition ${batch.partition}`);
    const highWatermark = Number.parseInt(batch.highWatermark, 10);
    for (const message of batch.messages) {
        const offset = Number.parseInt(message.offset, 10);
        const lag = highWatermark >= 0 ? String(highWatermark - offset) : "unknown";
        console.log(` Processing offset ${message.offset} which has a lag of ${lag}`);
    }
    return batch.messages.length;
}

describe("partitionLag", () => {
    let kafkaJSKafka: Kafka;
    let kafkaJSAdmin: Admin;
    let kafkaJSProducer: Producer;
    // Set by the test that creates it; cleared in afterEach so it is disconnected exactly once.
    let kafkaJSConsumer: Consumer | undefined;
    let confluentKafka: Confluent.Kafka;
    let confluentAdmin: Confluent.Admin;
    let confluentProducer: Confluent.Producer;
    // Set by the test that creates it; cleared in afterEach so it is disconnected exactly once.
    let confluentConsumer: Confluent.Consumer | undefined;

    before(async () => {
        kafkaJSKafka = new Kafka({brokers: ["localhost:9092"]});
        kafkaJSAdmin = kafkaJSKafka.admin();
        await kafkaJSAdmin.connect();
        confluentKafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
        confluentAdmin = confluentKafka.admin();
        await confluentAdmin.connect();
        // Topics are created once per suite rather than re-requested before every test.
        await kafkaJSAdmin.createTopics({topics: [{topic: KAFKA_JS_TOPIC}]});
        await confluentAdmin.createTopics({topics: [{topic: CONFLUENT_TOPIC}]});
    });

    beforeEach(async () => {
        kafkaJSProducer = kafkaJSKafka.producer();
        await kafkaJSProducer.connect();
        confluentProducer = confluentKafka.producer();
        await confluentProducer.connect();
    });

    afterEach(async () => {
        await kafkaJSProducer.disconnect();
        await kafkaJSConsumer?.disconnect();
        kafkaJSConsumer = undefined;
        await confluentProducer.disconnect();
        await confluentConsumer?.disconnect();
        confluentConsumer = undefined;
    });

    after(async () => {
        // Producers and consumers were already disconnected by afterEach; only the admins remain.
        await confluentAdmin.disconnect();
        await kafkaJSAdmin.disconnect();
    });

    it("reports lag with KafkaJS", async () => {
        for (let i = 0; i < 10; i++) {
            await kafkaJSProducer.send({topic: KAFKA_JS_TOPIC, messages: [{value: "one"}]});
        }
        let ready = false;
        let receivedMessages = 0;
        const consumer = kafkaJSKafka.consumer({groupId: KAFKA_JS_GROUP_ID});
        // Registered before connecting so afterEach cleans up even if connect/subscribe fails.
        kafkaJSConsumer = consumer;
        consumer.on(consumer.events.GROUP_JOIN, () => {
            ready = true;
        });
        await consumer.connect();
        await consumer.subscribe({topic: KAFKA_JS_TOPIC, fromBeginning: true});
        await consumer.run({
            eachBatch: async ({batch}) => {
                receivedMessages += logBatchLag(batch);
            }
        });
        await until(() => ready);
        for (let i = 0; i < 10; i++) {
            await kafkaJSProducer.send({topic: KAFKA_JS_TOPIC, messages: [{value: "one"}]});
        }
        await until(() => receivedMessages === 20);
    });

    it("reports lag with Confluent", async () => {
        for (let i = 0; i < 10; i++) {
            await confluentProducer.send({topic: CONFLUENT_TOPIC, messages: [{value: "one"}]});
        }
        let ready = false;
        let receivedMessages = 0;
        const consumer = confluentKafka.consumer({
            kafkaJS: {groupId: CONFLUENT_GROUP_ID, fromBeginning: true},
            // Only used to detect the first partition assignment.
            rebalance_cb: (err: any) => {
                if (err.code === RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
                    ready = true;
                }
            }
        });
        // Registered before connecting so afterEach cleans up even if connect/subscribe fails.
        confluentConsumer = consumer;
        await consumer.connect();
        await consumer.subscribe({topic: CONFLUENT_TOPIC});
        await consumer.run({
            eachBatch: async ({batch}) => {
                receivedMessages += logBatchLag(batch);
            }
        });
        await until(() => ready);
        for (let i = 0; i < 10; i++) {
            await confluentProducer.send({topic: CONFLUENT_TOPIC, messages: [{value: "one"}]});
        }
        await until(() => receivedMessages === 20);
    });

    /**
     * Polls `condition` every 50 ms until it returns true.
     *
     * @param condition predicate re-evaluated on each poll
     * @param timeoutMs how long to keep polling before giving up (default 10000)
     * @throws Error if the condition is still false once the deadline has passed
     */
    async function until(condition: () => boolean, timeoutMs = 10000): Promise<void> {
        const deadline = Date.now() + timeoutMs;
        while (Date.now() <= deadline) {
            if (condition()) return;
            await new Promise(resolve => setTimeout(resolve, 50));
        }
        throw new Error(`Failed within ${timeoutMs}ms`);
    }
});
KafkaJS output is:
Received batch with 13 messages with highWatermark 13 on partition 0
Processing offset 0 which has a lag of 13
Processing offset 1 which has a lag of 12
Processing offset 2 which has a lag of 11
Processing offset 3 which has a lag of 10
Processing offset 4 which has a lag of 9
Processing offset 5 which has a lag of 8
Processing offset 6 which has a lag of 7
Processing offset 7 which has a lag of 6
Processing offset 8 which has a lag of 5
Processing offset 9 which has a lag of 4
Processing offset 10 which has a lag of 3
Processing offset 11 which has a lag of 2
Processing offset 12 which has a lag of 1
Received batch with 4 messages with highWatermark 17 on partition 0
Processing offset 13 which has a lag of 4
Processing offset 14 which has a lag of 3
Processing offset 15 which has a lag of 2
Processing offset 16 which has a lag of 1
Received batch with 3 messages with highWatermark 20 on partition 0
Processing offset 17 which has a lag of 3
Processing offset 18 which has a lag of 2
Processing offset 19 which has a lag of 1
Confluent output is:
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 20 which has a lag of -1021
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 21 which has a lag of -1022
Received batch with 2 messages with highWatermark -1001 on partition 0
Processing offset 22 which has a lag of -1023
Processing offset 23 which has a lag of -1024
Received batch with 2 messages with highWatermark -1001 on partition 0
Processing offset 24 which has a lag of -1025
Processing offset 25 which has a lag of -1026
Received batch with 4 messages with highWatermark -1001 on partition 0
Processing offset 26 which has a lag of -1027
Processing offset 27 which has a lag of -1028
Processing offset 28 which has a lag of -1029
Processing offset 29 which has a lag of -1030
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 30 which has a lag of -1031
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 31 which has a lag of -1032
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 32 which has a lag of -1033
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 33 which has a lag of -1034
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 34 which has a lag of -1035
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 35 which has a lag of -1036
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 36 which has a lag of -1037
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 37 which has a lag of -1038
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 38 which has a lag of -1039
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 39 which has a lag of -1040