
batch size limit is severely limiting compared to KafkaJS

Open · apeloquin-agilysys opened this issue 9 months ago

Environment Information

  • OS: Mac M3 Sonoma 14.6.1
  • Node Version: 22.12.0
  • NPM Version: 10.9.0
  • confluent-kafka-javascript version: 1.2.0

Context

The migration guide states:

An API compatible version of eachBatch is available, but the batch size calculation is not as per configured parameters, rather, a constant maximum size is configured internally. This is subject to change.

What is not stated:

  • The maximum batch size is 32.
  • The batch size of 32 is only reached after a ramp-up through 10 preceding batches of sizes 1, 1, 2, 2, 4, 4, 8, 8, 16, and 16 (modeled in the sketch below).
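
The sequence can be modeled with a small function: each size is emitted twice and doubles until it hits the 32-message cap, and the final batch absorbs whatever remains. This is a model of the observed behavior only, not the library's actual code:

function observedBatchSizes(totalMessages: number): number[] {
  const sizes: number[] = [];
  let size = 1;
  let remaining = totalMessages;
  let emitted = 0;
  while (remaining > 0) {
    const next = Math.min(size, remaining, 32);  // capped at 32
    sizes.push(next);
    remaining -= next;
    emitted++;
    if (emitted % 2 === 0 && size < 32) size *= 2;  // double every two batches
  }
  return sizes;
}

// observedBatchSizes(50)  => [1, 1, 2, 2, 4, 4, 8, 8, 16, 4]
// observedBatchSizes(100) => [1, 1, 2, 2, 4, 4, 8, 8, 16, 16, 32, 6]

These match the "With Confluent" output listed further down.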

An inquiry to Confluent received the following response:

Regarding the actual batch size of 32: This value is arbitrary for now, and we plan to make it configurable in the future. However, it is currently capped at 32, with no set timeline for when this will change.

Regarding the implementation: The batch size of messages retrieved by eachBatch does not directly correspond to the actual batch size received from Kafka. While we maintain API compatibility with KafkaJS, the behavior differs. Internally, we prefetch larger batches of messages and then split them into smaller batches before delivering them to the user. This approach ensures that concerns about downstream messaging or additional network overhead for fetching messages do not cause issues. We implemented it this way because, for now, the Kafka client library abstracts away per-partition details required by eachBatch, so we handle the splitting ourselves. The actual size of messages fetched from Kafka over the network is configurable through the Kafka client library’s configuration properties.
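
The configuration properties referred to in that last sentence are librdkafka's fetch settings, which this library accepts alongside the kafkaJS block (the repro below passes rebalance_cb the same way). A sketch with illustrative values; per the reply above, these shape the network prefetch but not the 32-message cap handed to eachBatch:

const consumer = kafka.consumer({
  kafkaJS: {groupId: "my-group", fromBeginning: true},
  // librdkafka fetch tuning: affects what is prefetched over the network,
  // not the internal per-eachBatch cap of 32
  "fetch.min.bytes": 100000,
  "fetch.wait.max.ms": 500,
  "max.partition.fetch.bytes": 1048576,
});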

I'd like to raise two points:

  • We have several apps that are designed to process large batches of small messages; their efficiency depends directly on aggregating many messages at once to reduce the number of downstream network interactions with databases and other services. With KafkaJS our batch sizes are typically in the hundreds and thousands. The Confluent cap of 32 is severely hampering, resulting in a 5-200x (or more) increase in the number of downstream network interactions required to process the same volume of data.
  • The ramp-up through single-message batches makes it annoyingly difficult to write integration tests that exercise batch functionality. Tests written for KafkaJS batches become invalid after converting to Confluent; you can no longer assume that seeding 5 messages in a topic and starting a consumer yields 1 batch of 5 messages... now it's 4 batches of 1, 1, 2, and 1 (see the sketch after this list).
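
To make the second point concrete, this is the kind of assertion that held under KafkaJS and breaks after migrating (batches collected as in the repro below):

// Seed 5 messages into a fresh topic, run the consumer, collect batch sizes.
// KafkaJS:   expect(batches).to.deep.equal([5]);
// Confluent: expect(batches).to.deep.equal([1, 1, 2, 1]);  // ramp-up, as observed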

Steps to Reproduce

The following code illustrates the batch size sequence/limit and provides a comparison with KafkaJS:

import {KafkaJS as Confluent, RdKafka} from "@confluentinc/kafka-javascript";
import {Admin, Consumer, Kafka, logLevel, Producer, EachBatchPayload} from "kafkajs";

const topicFn = () => `test-batch-sizes-${Date.now()}`;
const groupIdFn = () => `test-group-${Date.now()}`;

describe("Batch sizes", async () => {
  let kafka: Kafka | Confluent.Kafka;
  let admin: Admin | Confluent.Admin | undefined;
  let producer: Producer | Confluent.Producer | undefined;
  let consumer: Consumer | Confluent.Consumer | undefined;
  let ready: boolean;
  let topic: string;
  let received: number;
  let batches: number[];

  before(() => {
    ready = false;
  });

  beforeEach(async () => {
    consumer!.pause([{topic}]);
    received = 0;
    batches = [];
  });

  describe("with KafkaJS", () => {
    before(async () => {
      ready = false;  // reset so this suite waits for its own group join
      kafka = new Kafka({brokers: ["localhost:9092"], logLevel: logLevel.NOTHING});
      admin = kafka.admin();
      await admin.connect();

      topic = topicFn();
      await admin.createTopics({topics: [{topic}]});
      producer = kafka.producer();
      await producer.connect();

      consumer = kafka.consumer({groupId: groupIdFn()});
      consumer.on(consumer.events.GROUP_JOIN, (event: any) => {
        ready = true;
      });
      await consumer.connect();
      await consumer.subscribe({topic, fromBeginning: true});
      await consumer.run({eachBatch: doConsumer});
      await until(() => ready);
    });

    it("10 messages", async () => doTest(10));
    it("50 messages", async () => doTest(50));
    it("100 messages", async () => doTest(100));
    it("500 messages", async () => doTest(500));
    it("1000 messages", async () => doTest(1000));
    it("5000 messages", async () => doTest(5000));

    after(async () => {
      await consumer?.disconnect();
      await producer?.disconnect();
      await admin?.disconnect();
    });
  });

  describe("with Confluent", () => {
    before(async () => {
      ready = false;  // reset so this suite waits for its own group join
      kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"], logLevel: Confluent.logLevel.NOTHING}});
      admin = kafka.admin();
      await admin.connect();

      topic = topicFn();
      await admin.createTopics({topics: [{topic}]});
      producer = kafka.producer();
      await producer.connect();

      consumer = kafka.consumer({
        kafkaJS: {groupId: groupIdFn(), fromBeginning: true},
        rebalance_cb: (err: any, assignment: any, consumer: any) => {
          if (err.code !== RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) return;
          if (!ready) ready = true;
        }
      });
      await consumer.connect();
      await consumer.subscribe({topic});
      await consumer.run({eachBatch: doConsumer});
      await until(() => ready);
    });

    it("10 messages", async () => doTest(10));
    it("50 messages", async () => doTest(50));
    it("100 messages", async () => doTest(100));
    it("500 messages", async () => doTest(500));
    it("1000 messages", async () => doTest(1000));
    it("5000 messages", async () => doTest(5000));

    after(async () => {
      await consumer?.disconnect();
      await producer?.disconnect();
      await admin?.disconnect();
    });
  });

  async function doConsumer(payload: EachBatchPayload | Confluent.EachBatchPayload) {
    const size = payload.batch.messages.length;
    received += size;
    batches.push(size);
  }

  async function doTest(count: number, sizeInKb: number = 1) {
    await sendMessages(count, sizeInKb);

    consumer!.resume([{topic}]);

    await until(() => received === count);
    console.log(`Processed ${received} ${sizeInKb}KB messages in ${batches.length} batches: ${batches}`);
  }

  async function sendMessages(count: number, sizeInKb: number) {
    const messages = [];
    for (let i = 0; i < count; i++) {
      messages.push({value: payload(sizeInKb)});
    }
    await producer!.send({topic: topic, messages});
  }

  function payload(sizeInKb: number): string {
    // 1024 characters per repeat so the logged message sizes are accurate
    return `${"0".repeat(1023)}1`.repeat(sizeInKb);
  }

  async function until(condition: () => boolean) {
    const timeout = 60000;
    const finish = Date.now() + timeout;
    while (Date.now() <= finish) {
      const result = condition();
      if (result) return;
      await new Promise(resolve => setTimeout(resolve, 50));
    }
    throw new Error(`Failed within ${timeout}ms`);
  }
});

With KafkaJS:

Processed 10 1KB messages in 1 batches: 10
Processed 50 1KB messages in 1 batches: 50
Processed 100 1KB messages in 1 batches: 100
Processed 500 1KB messages in 1 batches: 500
Processed 1000 1KB messages in 1 batches: 1000
Processed 5000 1KB messages in 1 batches: 5000

With Confluent:

Processed 10 1KB messages in 5 batches: 1,1,2,2,4
Processed 50 1KB messages in 10 batches: 1,1,2,2,4,4,8,8,16,4
Processed 100 1KB messages in 12 batches: 1,1,2,2,4,4,8,8,16,16,32,6
Processed 500 1KB messages in 24 batches: 1,1,2,2,4,4,8,8,16,16,32,32,32,32,32,32,32,32,32,32,32,32,32,22
Processed 1000 1KB messages in 40 batches: 1,1,2,2,4,4,8,8,16,16,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,10
Processed 5000 1KB messages in 165 batches: 1,1,2,2,4,4,8,8,16,16,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,10

apeloquin-agilysys · Apr 09 '25

Out of interest, do you get the same behavior with a hardcoded key or partition for the messages? I'd guess so, looking at the KafkaJS results. https://kafka.js.org/docs/producing#message-key
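
E.g. pinning everything to one key and partition with the standard KafkaJS message fields, something like this (payload is the helper from the repro above):

await producer.send({
  topic,
  messages: [{key: "fixed-key", value: payload(1), partition: 0}],
});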

Otherwise this sounds like a complete deal-breaker for us, when the whole point of eachBatch is to process as many messages as possible from the topic-partition batch and now you get 1-32 messages. We too set the fetch limit on the consumer to ~0.1MB, roughly 1000 messages, and expect a batch to contain something close to that per partition when there are enough messages. Granted, KafkaJS behaved more like that limit divided by the number of partitions, but anyway. Buffering the batches until the partition changes could do something (assuming they're not random partitions), but that too is a bit far-fetched; a sketch of the idea follows.
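
Something like this is what I mean by buffering (a sketch only; flushDownstream is hypothetical, and offset commits would need care since batches are acknowledged before the flush):

let pending: {value: Buffer | null}[] = [];
const THRESHOLD = 1000;

async function eachBatch({batch}: Confluent.EachBatchPayload) {
  // accumulate the small 1-32 message batches until a useful size is reached
  pending.push(...batch.messages.map(m => ({value: m.value})));
  if (pending.length >= THRESHOLD) {
    const toFlush = pending;
    pending = [];
    await flushDownstream(toFlush);  // hypothetical downstream call
  }
}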

So it does sound like eachBatch isn't ready for use yet, and might as well just call eachMessage internally?

leppaott · Apr 28 '25

This is a huge performance problem for us, and we've had to hack in patches to raise the cap to a more reasonable limit.

This approach ensures that concerns about downstream messaging or additional network overhead for fetching messages do not cause issues.

This is a pretty nonsense reply, frankly.

There is little point in designing systems around batch throughput when you're so heavily restricting batch sizes at an arbitrary application bottleneck.

It also makes manual offset committing essentially mandatory, given that the eachBatch interface in the KafkaJS shim is effectively useless.
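
For reference, manual handling through the KafkaJS-compatible surface looks roughly like this (assuming the shim honors these payload callbacks the way KafkaJS does; handle is a hypothetical handler):

await consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({batch, resolveOffset, heartbeat, commitOffsetsIfNecessary}) => {
    for (const message of batch.messages) {
      await handle(message);           // hypothetical per-message work
      resolveOffset(message.offset);   // mark progress within the batch
      await heartbeat();
    }
    await commitOffsetsIfNecessary();
  },
});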

Frankly, having spent several weeks dealing with the migration pain imposed by this library, I have to say I'm not impressed with the clear attempt to steer the community toward what is purported to be a simple alternative with a compatible API.

EricMCornelius · May 28 '25

@EricMCornelius I agree with you. I am also facing many issues with this library, with no resolution in sight. It seems very strange for a company like Confluent to make so many bad decisions when designing a library like this. We are also affected by the limited batch size, and we had to implement the Confluent magic-byte logic ourselves because the schema registry package is just shitty.

Dhruv-Garg79 · Jun 03 '25

I wanted to toss in an additional perspective. Our company has one consumer of a few topics that aggregates payloads and archives them as CSVs to cloud storage. To make this as cost-effective as is reasonable, we intentionally use long fetch max-wait times and min-byte configurations so this consumer takes very large batches (thousands of payloads, at our sizing). This arbitrary batch size limit would absolutely be an issue for us if we used this library for that service.
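
For context, that tuning is plain KafkaJS consumer configuration (values illustrative, not our production numbers):

const consumer = kafka.consumer({
  groupId: "csv-archiver",
  minBytes: 1048576,       // wait for ~1 MB of data...
  maxWaitTimeInMs: 30000,  // ...or up to 30 s, whichever comes first
  maxBytes: 10485760,
});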

Totally get that this is an odd use case, and likely far from an optimal one, but it is supported by Kafka and works well for us with KafkaJS as our current client.

kylevogt · Jun 24 '25