
Consumers only boot up if no previous consumers exist.


Environment Information

  • OS - Linux
  • Node Version - 20
  • NPM Version - don't have access to the prod version at the moment
  • C++ Toolchain - don't have access to the prod version at the moment
  • confluent-kafka-javascript version - 0.2.0

Steps to Reproduce

Spin up a pod and have it connect consumers to an MSK cluster on AWS after a two-minute delay. Include code that executes consumer.disconnect() on pod termination.

Now redeploy (or re-roll out) the pod: the consumers try to connect but never reach the ready state, and they don't show up in Conduktor.

Now that all consumers are down, redeploy once more and everything connects and reaches the ready state.
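For reference, the termination hook looks roughly like this (a minimal sketch; `consumer` is the instance returned by `runGenericConsumer` below):

```js
// Minimal sketch of the pod-termination wiring (assumed, not verbatim):
// Kubernetes sends SIGTERM on pod shutdown, and we disconnect the consumer.
process.on("SIGTERM", () => {
  consumer.disconnect((err) => {
    if (err) console.error("Error disconnecting consumer:", err);
    process.exit(err ? 1 : 0);
  });
});
```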

confluent-kafka-javascript Configuration Settings

Client Config

```js
{
  "client.id": KAFKA_PRODUCER_CLIENT_ID,
  "bootstrap.servers": kafkaBrokerUrls ? kafkaBrokerUrls : "localhost:9092",
  "security.protocol": "sasl_ssl",
  "sasl.mechanisms": "OAUTHBEARER",
  oauthbearer_token_refresh_cb: this.oauthBearerTokenProvider,
}
```
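
For context, this config is passed to the producer constructor along these lines (a sketch: `Producer` is assumed to be the node-rdkafka-style class exported by the package, and `clientConfig` stands in for the object above):

```js
import { Producer } from "@confluentinc/kafka-javascript";

// Sketch: construct and connect a producer with the client config above.
// `clientConfig` is a placeholder for that object; error handling elided.
const producer = new Producer(clientConfig);
producer.on("ready", () => console.log("Producer ready"));
producer.connect();
```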

Consumer Config
```js
{
  "client.id": KAFKA_PRODUCER_CLIENT_ID,
  "bootstrap.servers": kafkaBrokerUrls ? kafkaBrokerUrls : "localhost:9092",
  "security.protocol": "sasl_ssl",
  "sasl.mechanisms": "OAUTHBEARER",
  oauthbearer_token_refresh_cb: this.oauthBearerTokenProvider,
  "group.id": groupId,
  "enable.auto.offset.store": false,
  "heartbeat.interval.ms": 3000,
  "max.poll.interval.ms": 2000000,
  "session.timeout.ms": 120000,
  "auto.commit.enable": false,
}
```

Launch Consumer Code

```js
import { KafkaConstants } from "@openqlabs/utils";
import type { KafkaSingletonType } from "..";
import { KafkaConsumer, type Message } from "@confluentinc/kafka-javascript";
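
// Note: KAFKA_PRODUCER_CLIENT_ID, kafkaBrokerUrls, and
// this.oauthBearerTokenProvider are defined elsewhere in our
// codebase (elided here).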

const processMessage = async (
  message: Message,
  launchFromMessage: (message: Message) => Promise<string | undefined>,
  markQueuedFalse: (message: Message) => Promise<void>,
  consumer: KafkaConsumer
) => {
  await launchFromMessage(message)
    .then(async () => {
      consumer.commitMessageSync({
        offset: message.offset,
        topic: message.topic,
        partition: message.partition,
      });
      await markQueuedFalse(message);
    })
    .catch(console.error);
};
export async function runGenericConsumer(
  kafkaSingleton: KafkaSingletonType,
  launchFromMessage: (message: Message) => Promise<string | undefined>,
  markQueuedFalse: (message: Message) => Promise<void>,
  groupId: string,
  topic: string
) {
  try {
    const consumer = new KafkaConsumer(
      {
        "client.id": KAFKA_PRODUCER_CLIENT_ID,
        "bootstrap.servers": kafkaBrokerUrls ? kafkaBrokerUrls : "localhost:9092",
        "security.protocol": "sasl_ssl",
        "sasl.mechanisms": "OAUTHBEARER",
        oauthbearer_token_refresh_cb: this.oauthBearerTokenProvider,
        "group.id": groupId,
        "enable.auto.offset.store": false,
        "heartbeat.interval.ms": 3000,
        "max.poll.interval.ms": 2000000,
        "session.timeout.ms": 120000,
        "auto.commit.enable": false,
      },
      {} // per-topic configuration, left empty here
    );

    console.log(`Running evaluations consumer for ${groupId}`);
    // Flowing mode
    const queue: Message[] = [];
    let currentQueueSize = 0;
    const maxQueueSize = 10;
    let consumePaused = false;

    consumer.on("ready", () => {
      console.log("Consumer ready", groupId);
      consumer.subscribe([topic]);
      consumer.consume();
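      // Drain loop: process at most one queued message per second and
      // resume the consumer once the queue has been emptied.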
      setInterval(async () => {
        if (groupId === "target-batch-creations") {
          console.log("target-batch-creations", " interval");
        }
        if (currentQueueSize > 0) {
          const message = queue.shift();
          if (message) {
            try {
              await processMessage(
                message,
                launchFromMessage,
                markQueuedFalse,
                consumer
              );
              console.log("Processed message", message.offset);
            } catch (error) {
              console.error(`Error FAILED evaluations consumer: ${error}`);
            }
            currentQueueSize--;
          }
        }
        if (currentQueueSize === 0 && consumePaused) {
          consumePaused = false;
          consumer.resume(consumer.assignments());
        }
      }, 1000);
    });

    consumer.on("event.log", function (log) {
      console.log(log);
    });

    consumer.on("data", (data: Message) => {
      console.log(data.value?.toString(), "data");
      try {
        if (data.value) {
          queue.push(data);
          currentQueueSize++;
        }
        if (currentQueueSize > maxQueueSize && !consumePaused) {
          consumePaused = true;
          consumer.pause(consumer.assignments());
        }
      } catch (error) {
        console.error(`Error running evaluations consumer: ${error}`);
        console.log(data, data.value);
      }
    });

    consumer.on("event.error", (err) => {
      console.error(err);
    });
    consumer.connect();

    return consumer;
  } catch (error) {
    console.error(`Error running evaluations consumer: ${error}`);
  }
}
```

Workaround

Always deploy an even number of times.

NB

By the way, it would be nice to have an example consumer like the one above, where a flowing consumer is configured to create back pressure and run async jobs. This is a pretty barebones approach and could certainly be more elegant, but it's working for us for now. Note that if you try to use an async function directly as the "data" event listener, the consumer never even emits the "ready" event. A minimal sketch of the pattern follows.
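
The sketch below is distilled from the code above (`handleMessage` is a hypothetical async job): keep the "data" handler synchronous and only enqueue, then drain the queue on a timer, pausing and resuming assignments to bound the backlog.

```js
// Sketch of flowing-mode back pressure, distilled from the code above.
// The "data" handler stays synchronous; async work runs on the timer.
const queue: Message[] = [];
const MAX_QUEUE_SIZE = 10;
let paused = false;

consumer.on("data", (message: Message) => {
  queue.push(message); // enqueue only: no async work in this handler
  if (queue.length > MAX_QUEUE_SIZE && !paused) {
    paused = true;
    consumer.pause(consumer.assignments()); // stop the flow until drained
  }
});

setInterval(async () => {
  const message = queue.shift();
  if (message) {
    await handleMessage(message); // hypothetical async job
    consumer.commitMessageSync(message);
  }
  if (queue.length === 0 && paused) {
    paused = false;
    consumer.resume(consumer.assignments());
  }
}, 1000);
```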

Christopher-Stevers · Oct 15 '24