Repository: confluent-kafka-javascript

`batch.highWatermark` always returns -1001

Status: Open — opened by apeloquin-agilysys 10 months ago · 0 comments

Environment Information

  • OS: macOS Sonoma 14.6.1 (Apple M3)
  • Node Version: 22.12.0
  • NPM Version: 10.9.0
  • confluent-kafka-javascript version: 1.2.0

Steps to Reproduce

Read `highWatermark` from the batch in an `EachBatchPayload`. The value returned is always `-1001` (the numeric value of librdkafka's `RD_KAFKA_OFFSET_INVALID` sentinel). This prevents us from tracking partition lag.

Given that `highWatermark` is part of the type definition and the migration guide does not mention it, I would expect it to provide an accurate value. Partition lag is a very useful application metric.

import {KafkaJS as Confluent, RdKafka} from "@confluentinc/kafka-javascript";
import {Admin, Consumer, Kafka, Producer} from "kafkajs";

// One suffix per run. Each client gets its own topic and consumer group: if they shared
// them, one test would consume messages (and resume from committed offsets) left behind
// by the other.
const RUN_ID = Date.now();

const KAFKA_JS_TOPIC = `test-kafkajs-topic-${RUN_ID}`;
const KAFKA_JS_GROUP_ID = `test-kafkajs-group-${RUN_ID}`;

const CONFLUENT_TOPIC = `test-confluent-topic-${RUN_ID}`;
const CONFLUENT_GROUP_ID = `test-confluent-group-${RUN_ID}`;

/**
 * Compares the per-message partition lag reported through `eachBatch` by KafkaJS and by
 * the Confluent KafkaJS-compatible client, against a broker at localhost:9092.
 *
 * Each test produces 10 messages, starts a consumer from the beginning, waits until it is
 * ready, produces 10 more, and waits for all 20 to arrive. It then fails if any batch
 * carried a highWatermark that cannot be used to compute lag.
 */
describe("partitionLag", () => {
  let kafkaJSKafka: Kafka;
  let kafkaJSAdmin: Admin;
  let kafkaJSConsumer: Consumer | undefined;
  let kafkaJSProducer: Producer;

  let confluentKafka: Confluent.Kafka;
  let confluentAdmin: Confluent.Admin;
  let confluentConsumer: Confluent.Consumer | undefined;
  let confluentProducer: Confluent.Producer;

  /** Structural subset of a batch that both clients' `eachBatch` payloads provide. */
  interface LagBatch {
    readonly partition: number;
    readonly highWatermark: string;
    readonly messages: ReadonlyArray<{readonly offset: string}>;
  }

  before(async () => {
    kafkaJSKafka = new Kafka({brokers: ["localhost:9092"]});
    kafkaJSAdmin = kafkaJSKafka.admin();
    await kafkaJSAdmin.connect();
    // Each test owns a distinct topic, so the topics are created once for the suite
    // rather than re-created before every test.
    await kafkaJSAdmin.createTopics({topics: [{topic: KAFKA_JS_TOPIC}]});

    confluentKafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
    confluentAdmin = confluentKafka.admin();
    await confluentAdmin.connect();
    await confluentAdmin.createTopics({topics: [{topic: CONFLUENT_TOPIC}]});
  });

  beforeEach(async () => {
    kafkaJSProducer = kafkaJSKafka.producer();
    await kafkaJSProducer.connect();

    confluentProducer = confluentKafka.producer();
    await confluentProducer.connect();
  });

  afterEach(async () => {
    await kafkaJSProducer.disconnect();
    await kafkaJSConsumer?.disconnect();
    // Forget the consumer so a later test's afterEach does not disconnect it again.
    kafkaJSConsumer = undefined;

    await confluentProducer.disconnect();
    await confluentConsumer?.disconnect();
    confluentConsumer = undefined;
  });

  // Producers and consumers are already disconnected by afterEach; only the admin
  // clients live for the whole suite.
  after(async () => {
    await confluentAdmin.disconnect();
    await kafkaJSAdmin.disconnect();
  });

  it("reports lag with KafkaJS", async () => {
    for (let i = 0; i < 10; i++) {
      await kafkaJSProducer.send({topic: KAFKA_JS_TOPIC, messages: [{value: "one"}]});
    }

    let ready = false;
    let receivedMessages = 0;
    let invalidWatermarks = 0;
    const consumer = kafkaJSKafka.consumer({groupId: KAFKA_JS_GROUP_ID});
    kafkaJSConsumer = consumer;
    consumer.on(consumer.events.GROUP_JOIN, () => {
      ready = true;
    });
    await consumer.connect();
    await consumer.subscribe({topic: KAFKA_JS_TOPIC, fromBeginning: true});
    await consumer.run({
      eachBatch: async ({batch}) => {
        if (!logBatchLag(batch)) invalidWatermarks++;
        receivedMessages += batch.messages.length;
      }
    });

    await until(() => ready);

    for (let i = 0; i < 10; i++) {
      await kafkaJSProducer.send({topic: KAFKA_JS_TOPIC, messages: [{value: "one"}]});
    }

    await until(() => receivedMessages === 20);
    assertValidWatermarks(invalidWatermarks);
  });

  it("reports lag with Confluent", async () => {
    for (let i = 0; i < 10; i++) {
      await confluentProducer.send({topic: CONFLUENT_TOPIC, messages: [{value: "one"}]});
    }

    let ready = false;
    let receivedMessages = 0;
    let invalidWatermarks = 0;
    const consumer = confluentKafka.consumer({
      kafkaJS: {groupId: CONFLUENT_GROUP_ID, fromBeginning: true},
      // The first partition assignment marks the consumer as ready.
      rebalance_cb: (err: any) => {
        if (err.code === RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
          ready = true;
        }
      }
    });
    confluentConsumer = consumer;
    await consumer.connect();
    await consumer.subscribe({topic: CONFLUENT_TOPIC});
    await consumer.run({
      eachBatch: async ({batch}) => {
        if (!logBatchLag(batch)) invalidWatermarks++;
        receivedMessages += batch.messages.length;
      }
    });

    await until(() => ready);

    // Produce with the client under test, as in the first loop above.
    for (let i = 0; i < 10; i++) {
      await confluentProducer.send({topic: CONFLUENT_TOPIC, messages: [{value: "one"}]});
    }

    await until(() => receivedMessages === 20);
    assertValidWatermarks(invalidWatermarks);
  });

  /**
   * Logs one line per batch and one line per message with its partition lag.
   *
   * Lag is `highWatermark - offset`, i.e. the messages left in the partition counting the
   * current one. A highWatermark that is not a non-negative integer string (e.g. -1001,
   * librdkafka's RD_KAFKA_OFFSET_INVALID value) makes that subtraction meaningless, so
   * lag is then logged as "unknown".
   *
   * @param batch - The batch handed to `eachBatch`.
   * @returns Whether the batch's highWatermark was usable.
   */
  function logBatchLag(batch: LagBatch): boolean {
    console.log(`Received batch with ${batch.messages.length} messages with highWatermark ${batch.highWatermark} on partition ${batch.partition}`);
    const usable = /^\d+$/.test(batch.highWatermark);
    const highWatermark = Number(batch.highWatermark);
    for (const message of batch.messages) {
      const lag = usable ? String(highWatermark - Number(message.offset)) : "unknown";
      console.log(`  Processing offset ${message.offset} which has a lag of ${lag}`);
    }
    return usable;
  }

  /**
   * Fails the current test if any batch carried an unusable highWatermark.
   *
   * @param invalidBatches - Number of batches for which `logBatchLag` returned false.
   * @throws Error when `invalidBatches` is positive.
   */
  function assertValidWatermarks(invalidBatches: number): void {
    if (invalidBatches > 0) {
      throw new Error(`${invalidBatches} batch(es) reported an invalid highWatermark; partition lag cannot be computed`);
    }
  }

  /**
   * Polls `condition` until it returns true.
   *
   * The condition is checked once more after the deadline passes, so a condition that
   * became true during the last sleep is not reported as a timeout.
   *
   * @param condition - Predicate evaluated on each poll.
   * @param timeoutMs - How long to keep polling before giving up.
   * @param intervalMs - Delay between polls.
   * @throws Error if `condition` is still false once `timeoutMs` has elapsed.
   */
  async function until(condition: () => boolean, timeoutMs = 10000, intervalMs = 50): Promise<void> {
    const deadline = Date.now() + timeoutMs;
    for (;;) {
      if (condition()) return;
      if (Date.now() > deadline) throw new Error(`Condition not met within ${timeoutMs}ms`);
      await new Promise(resolve => setTimeout(resolve, intervalMs));
    }
  }
});

KafkaJS output is:

Received batch with 13 messages with highWatermark 13 on partition 0
  Processing offset 0 which has a lag of 13
  Processing offset 1 which has a lag of 12
  Processing offset 2 which has a lag of 11
  Processing offset 3 which has a lag of 10
  Processing offset 4 which has a lag of 9
  Processing offset 5 which has a lag of 8
  Processing offset 6 which has a lag of 7
  Processing offset 7 which has a lag of 6
  Processing offset 8 which has a lag of 5
  Processing offset 9 which has a lag of 4
  Processing offset 10 which has a lag of 3
  Processing offset 11 which has a lag of 2
  Processing offset 12 which has a lag of 1
Received batch with 4 messages with highWatermark 17 on partition 0
  Processing offset 13 which has a lag of 4
  Processing offset 14 which has a lag of 3
  Processing offset 15 which has a lag of 2
  Processing offset 16 which has a lag of 1
Received batch with 3 messages with highWatermark 20 on partition 0
  Processing offset 17 which has a lag of 3
  Processing offset 18 which has a lag of 2
  Processing offset 19 which has a lag of 1

Confluent output is:

Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 20 which has a lag of -1021
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 21 which has a lag of -1022
Received batch with 2 messages with highWatermark -1001 on partition 0
  Processing offset 22 which has a lag of -1023
  Processing offset 23 which has a lag of -1024
Received batch with 2 messages with highWatermark -1001 on partition 0
  Processing offset 24 which has a lag of -1025
  Processing offset 25 which has a lag of -1026
Received batch with 4 messages with highWatermark -1001 on partition 0
  Processing offset 26 which has a lag of -1027
  Processing offset 27 which has a lag of -1028
  Processing offset 28 which has a lag of -1029
  Processing offset 29 which has a lag of -1030
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 30 which has a lag of -1031
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 31 which has a lag of -1032
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 32 which has a lag of -1033
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 33 which has a lag of -1034
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 34 which has a lag of -1035
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 35 which has a lag of -1036
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 36 which has a lag of -1037
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 37 which has a lag of -1038
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 38 which has a lag of -1039
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 39 which has a lag of -1040

— apeloquin-agilysys, Apr 6 '25, 15:04