Skip to content

batch highWatermark always returns -1001 #282

Open
@apeloquin-agilysys

Description

@apeloquin-agilysys

Environment Information

  • OS: macOS Sonoma 14.6.1 (Apple M3)
  • Node Version: 22.12.0
  • NPM Version: 10.9.0
  • confluent-kafka-javascript version: 1.2.0

Steps to Reproduce

Read `batch.highWatermark` from the `EachBatchPayload` passed to an `eachBatch` handler. The value is always "-1001" (librdkafka's `RD_KAFKA_OFFSET_INVALID` sentinel), which prevents us from tracking partition lag.

Because `highWatermark` is part of the type definition and the migration guide does not mention it as unsupported, I would expect it to return an accurate value. Partition lag is a very useful application metric.

import {KafkaJS as Confluent, RdKafka} from "@confluentinc/kafka-javascript";
import {Admin, Consumer, Kafka, Producer} from "kafkajs";

// One timestamp per run so every resource name from this run shares a suffix.
const RUN_ID = Date.now();

// Each client library gets its own topic and consumer group. With a shared
// prefix (and two separate Date.now() calls that usually land in the same
// millisecond) both tests would read and write the same topic and group,
// mixing their messages and committed offsets.
const KAFKA_JS_TOPIC = `test-kafkajs-topic-${RUN_ID}`;
const KAFKA_JS_GROUP_ID = `test-kafkajs-group-${RUN_ID}`;

const CONFLUENT_TOPIC = `test-confluent-topic-${RUN_ID}`;
const CONFLUENT_GROUP_ID = `test-confluent-group-${RUN_ID}`;

/**
 * Reproduction for "batch highWatermark always returns -1001".
 *
 * Runs the same scenario through KafkaJS and through the Confluent
 * KafkaJS-compatible API:
 *   1. Produce 10 messages.
 *   2. Start a consumer and wait until it has joined or been assigned.
 *   3. Produce 10 more messages.
 *   4. Wait until all 20 have been consumed.
 *
 * Every batch logs `batch.highWatermark` and a per-message
 * `highWatermark - offset` lag, so the two outputs can be compared.
 *
 * Assumes a broker reachable at localhost:9092.
 * NOTE(review): `until` may wait up to 10 s per call. If this runs under
 * mocha, confirm the suite is run with a timeout above mocha's 2 s default.
 */
describe("partitionLag", () => {
  let kafkaJSKafka: Kafka;
  let kafkaJSAdmin: Admin;
  let kafkaJSConsumer: Consumer | undefined;
  let kafkaJSProducer: Producer;

  let confluentKafka: Confluent.Kafka;
  let confluentAdmin: Confluent.Admin;
  let confluentConsumer: Confluent.Consumer | undefined;
  let confluentProducer: Confluent.Producer;

  before(async () => {
    kafkaJSKafka = new Kafka({brokers: ["localhost:9092"]});
    kafkaJSAdmin = kafkaJSKafka.admin();
    await kafkaJSAdmin.connect();
    // Topics are created once per suite. Re-creating them before every test
    // would ask the broker for topics that already exist.
    await kafkaJSAdmin.createTopics({topics: [{topic: KAFKA_JS_TOPIC}]});

    confluentKafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
    confluentAdmin = confluentKafka.admin();
    await confluentAdmin.connect();
    await confluentAdmin.createTopics({topics: [{topic: CONFLUENT_TOPIC}]});
  });

  beforeEach(async () => {
    kafkaJSProducer = kafkaJSKafka.producer();
    await kafkaJSProducer.connect();

    confluentProducer = confluentKafka.producer();
    await confluentProducer.connect();
  });

  afterEach(async () => {
    // `?.` tolerates a client that was never created (e.g. beforeEach failed).
    await kafkaJSProducer?.disconnect();
    await kafkaJSConsumer?.disconnect();
    // Forget the consumer so a later afterEach does not disconnect it again.
    kafkaJSConsumer = undefined;

    await confluentProducer?.disconnect();
    await confluentConsumer?.disconnect();
    confluentConsumer = undefined;
  });

  after(async () => {
    // Producers and consumers were already disconnected in afterEach; only
    // the suite-wide admin clients remain open here.
    await confluentAdmin?.disconnect();
    await kafkaJSAdmin?.disconnect();
  });

  it("reports lag with KafkaJS", async () => {
    for (let i = 0; i < 10; i++) {
      await kafkaJSProducer.send({
        topic: KAFKA_JS_TOPIC,
        messages: [{value: "one"}]
      });
    }

    let ready = false;
    let receivedMessages = 0;
    const consumer = kafkaJSKafka.consumer({groupId: KAFKA_JS_GROUP_ID});
    // Publish to the suite-level variable so afterEach can clean it up.
    kafkaJSConsumer = consumer;
    consumer.on(consumer.events.GROUP_JOIN, () => {
      ready = true;
    });
    await consumer.connect();
    await consumer.subscribe({topic: KAFKA_JS_TOPIC, fromBeginning: true});
    await consumer.run({
      eachBatch: async ({batch}) => {
        receivedMessages += logBatch(batch);
      }
    });

    await until(() => ready);

    for (let i = 0; i < 10; i++) {
      await kafkaJSProducer.send({
        topic: KAFKA_JS_TOPIC,
        messages: [{value: "one"}]
      });
    }

    await until(() => receivedMessages === 20);
  });

  it("reports lag with Confluent", async () => {
    for (let i = 0; i < 10; i++) {
      await confluentProducer.send({
        topic: CONFLUENT_TOPIC,
        messages: [{value: "one"}]
      });
    }

    let ready = false;
    let receivedMessages = 0;
    const consumer = confluentKafka.consumer({
      kafkaJS: {groupId: CONFLUENT_GROUP_ID, fromBeginning: true},
      // Only the partition-assignment event marks the consumer as ready.
      rebalance_cb: (err: any) => {
        if (err.code === RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
          ready = true;
        }
      }
    });
    // Publish to the suite-level variable so afterEach can clean it up.
    confluentConsumer = consumer;
    await consumer.connect();
    await consumer.subscribe({topic: CONFLUENT_TOPIC});
    await consumer.run({
      eachBatch: async ({batch}) => {
        receivedMessages += logBatch(batch);
      }
    });

    await until(() => ready);

    for (let i = 0; i < 10; i++) {
      await confluentProducer.send({
        topic: CONFLUENT_TOPIC,
        messages: [{value: "one"}]
      });
    }

    await until(() => receivedMessages === 20);
  });

  /** Minimal batch shape read by logBatch; both clients' eachBatch batches provide it. */
  type LagBatch = {
    highWatermark: string;
    partition: number;
    messages: ReadonlyArray<{offset: string}>;
  };

  /**
   * Logs a batch's high watermark and, for each message, `highWatermark - offset`.
   * @param batch the `batch` field of an eachBatch payload
   * @returns the number of messages in the batch
   */
  function logBatch(batch: LagBatch): number {
    console.log(`Received batch with ${batch.messages.length} messages with highWatermark ${batch.highWatermark} on partition ${batch.partition}`);
    const highWatermark = parseInt(batch.highWatermark, 10);
    for (const message of batch.messages) {
      const offset = parseInt(message.offset, 10);
      console.log(`  Processing offset ${message.offset} which has a lag of ${highWatermark - offset}`);
    }
    return batch.messages.length;
  }

  /**
   * Polls `condition` every 50 ms until it returns true.
   * @param condition predicate re-evaluated on every poll
   * @param timeoutMs how long to keep polling before giving up (default 10 s)
   * @throws Error if the condition is still false once the timeout has elapsed
   */
  async function until(condition: () => boolean, timeoutMs = 10000): Promise<void> {
    const deadline = Date.now() + timeoutMs;
    while (Date.now() <= deadline) {
      if (condition()) return;
      await new Promise(resolve => setTimeout(resolve, 50));
    }
    throw new Error(`Failed within ${timeoutMs}ms`);
  }
});

KafkaJS output is:

Received batch with 13 messages with highWatermark 13 on partition 0
  Processing offset 0 which has a lag of 13
  Processing offset 1 which has a lag of 12
  Processing offset 2 which has a lag of 11
  Processing offset 3 which has a lag of 10
  Processing offset 4 which has a lag of 9
  Processing offset 5 which has a lag of 8
  Processing offset 6 which has a lag of 7
  Processing offset 7 which has a lag of 6
  Processing offset 8 which has a lag of 5
  Processing offset 9 which has a lag of 4
  Processing offset 10 which has a lag of 3
  Processing offset 11 which has a lag of 2
  Processing offset 12 which has a lag of 1
Received batch with 4 messages with highWatermark 17 on partition 0
  Processing offset 13 which has a lag of 4
  Processing offset 14 which has a lag of 3
  Processing offset 15 which has a lag of 2
  Processing offset 16 which has a lag of 1
Received batch with 3 messages with highWatermark 20 on partition 0
  Processing offset 17 which has a lag of 3
  Processing offset 18 which has a lag of 2
  Processing offset 19 which has a lag of 1

Confluent output is:

Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 20 which has a lag of -1021
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 21 which has a lag of -1022
Received batch with 2 messages with highWatermark -1001 on partition 0
  Processing offset 22 which has a lag of -1023
  Processing offset 23 which has a lag of -1024
Received batch with 2 messages with highWatermark -1001 on partition 0
  Processing offset 24 which has a lag of -1025
  Processing offset 25 which has a lag of -1026
Received batch with 4 messages with highWatermark -1001 on partition 0
  Processing offset 26 which has a lag of -1027
  Processing offset 27 which has a lag of -1028
  Processing offset 28 which has a lag of -1029
  Processing offset 29 which has a lag of -1030
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 30 which has a lag of -1031
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 31 which has a lag of -1032
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 32 which has a lag of -1033
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 33 which has a lag of -1034
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 34 which has a lag of -1035
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 35 which has a lag of -1036
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 36 which has a lag of -1037
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 37 which has a lag of -1038
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 38 which has a lag of -1039
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 39 which has a lag of -1040

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancement (New feature or request)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions