confluent-kafka-javascript

How to use logging callback?

Open apeloquin-agilysys opened this issue 1 year ago • 2 comments

With KafkaJS, we used the logCreator config field when creating the Kafka instance. That field has been removed in this library.

I see the underlying rdkafka supports a log_cb logging callback, but I'm unable to determine how to make use of it and was unable to find any examples on the internet.

The type definition for the callback is any (not terribly helpful). I have tried various incarnations, but all of them have the same result.

const kafka = new Confluent.Kafka({
  kafkaJS: {
    brokers: ["localhost:9092"]
  },
  log_cb: () => console.log("log_cb")
});

Results in:

Error: Invalid callback type
    at Client.connect (/Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/client.js:253:16)
    at /Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/kafkajs/_consumer.js:802:28
    at new Promise (<anonymous>)
    at Consumer.connect (/Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/kafkajs/_consumer.js:800:12)
    at Context.<anonymous> (itest/confluentKafkaAccessor.test.ts:149:20)

Please help.

apeloquin-agilysys avatar May 09 '24 21:05 apeloquin-agilysys

Hi, this is possible using the 'logger' config. The type definitions for it were missing, so I added them, and I will add an example too.

The logger type is the same as KafkaJS's logger (which can be obtained with the <clientType>.logger() methods). I'll keep this issue open as a means of checking if we can add logCreator too.

For now, here's an example of a logger.

const { logLevel } = require("@confluentinc/kafka-javascript").KafkaJS;

class MyLogger {
  constructor() {
    this.logLevel = logLevel.INFO;
  }

  setLogLevel(logLevel) {
    this.logLevel = logLevel;
  }

  info(message, extra) {
    if (this.logLevel >= logLevel.INFO)
      console.info({ message, ...extra });
  }

  error(message, extra) {
    if (this.logLevel >= logLevel.ERROR)
      console.error({ message, ...extra });
  }

  warn(message, extra) {
    if (this.logLevel >= logLevel.WARN)
      console.warn({ message, ...extra });
  }

  debug(message, extra) {
    if (this.logLevel >= logLevel.DEBUG)
      console.debug({ message, ...extra });
  }

  namespace() {
    return this;
  }
}

and usage:

// Kafka and logLevel both come from require("@confluentinc/kafka-javascript").KafkaJS
const logger = new MyLogger();
const kafka = new Kafka({ kafkaJS: { brokers: ["localhost:9092"] } });
const producer = kafka.producer({ kafkaJS: { logger, logLevel: logLevel.DEBUG } });
/* later on in the code, okay, had enough of debug logs now */
if (hadEnoughDebug) logger.setLogLevel(logLevel.INFO);

There are some restrictions around this, which I will add to the documentation too. In particular, switching to a more verbose log level than the one given in the config will not work: if you start with INFO in the config and then call setLogLevel with DEBUG, it will not work.

milindl avatar May 13 '24 04:05 milindl
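
One way to picture the restriction described above is as two filters in series: the level given in the config bounds what is produced at all, and the level set on the logger can only narrow it further. The sketch below is a hypothetical model, not the library's implementation; `isEmitted` is an illustrative helper and the numeric values of the stand-in `logLevel` object are assumptions (only their ordering matters).

```javascript
// Stand-in for the library's logLevel enum. The numeric values are an
// assumption for illustration; only the ordering matters here.
const logLevel = Object.freeze({ NOTHING: 0, ERROR: 1, WARN: 2, INFO: 3, DEBUG: 4 });

// Hypothetical model: a message is written only if it passes both the level
// given in the config and the level currently set on the logger.
function isEmitted(messageLevel, configLevel, loggerLevel) {
  return messageLevel <= configLevel && messageLevel <= loggerLevel;
}

// Config at DEBUG, logger later lowered to INFO: debug messages are dropped.
console.log(isEmitted(logLevel.DEBUG, logLevel.DEBUG, logLevel.INFO)); // false

// Config at INFO, logger later raised to DEBUG: debug messages still do not
// appear, because they are already excluded by the config level.
console.log(isEmitted(logLevel.DEBUG, logLevel.INFO, logLevel.DEBUG)); // false

// Info messages pass in both situations.
console.log(isEmitted(logLevel.INFO, logLevel.INFO, logLevel.DEBUG)); // true
```

Under this model, starting with the most verbose level you might need in the config and lowering it later with setLogLevel is the direction that works.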

I use this class as an adapter between the previous logCreator and the new logger interface. Note that we ignore the configured logLevel, because we filter messages in the logCreator instead.

class LibrdKafkaLogger implements LibrdKafka.KafkaJS.Logger {
	constructor(
		private kafkaJSLogCreator: kafkaJSLogCreator,
		private readonly librdKafkaNamespace: string,
		private librdKafkaLogLevel: LibrdKafka.KafkaJS.logLevel,
	) {}

	private logToLegacyLogCreator(
		levelString: "info" | "error" | "warn" | "debug",
		message: string,
		extra?: object,
	) {
		let level: LogEntry["level"] = kafkaJSLogLevel.ERROR
		switch (levelString) {
			case "info":
				level = kafkaJSLogLevel.INFO
				break
			case "error":
				level = kafkaJSLogLevel.ERROR
				break
			case "warn":
				level = kafkaJSLogLevel.WARN
				break
			case "debug":
				level = kafkaJSLogLevel.DEBUG
				break
			default:
				unreachable(levelString)
		}

		const timestamp =
			extra && "timestamp" in extra && typeof extra.timestamp === "string"
				? extra.timestamp
				: new Date().toISOString()
		const kafkaLibrary: KafkaLibrary = "librdkafka"
		const syntheticLogEntry: LogEntry = {
			level,
			label: "librdKafkaLabel", // TODO: what should go here?
			namespace: this.librdKafkaNamespace,
			log: {
				message,
				timestamp,
				...(extra ?? {}),
				kafkaLibrary,
			},
		}

		this.kafkaJSLogCreator(level)(syntheticLogEntry)
	}

	info: (message: string, extra?: object) => void = (...args) =>
		this.logToLegacyLogCreator("info", ...args)
	error: (message: string, extra?: object) => void = (...args) =>
		this.logToLegacyLogCreator("error", ...args)
	warn: (message: string, extra?: object) => void = (...args) =>
		this.logToLegacyLogCreator("warn", ...args)
	debug: (message: string, extra?: object) => void = (...args) =>
		this.logToLegacyLogCreator("debug", ...args)

	namespace: (
		namespace: string,
		logLevel?: LibrdKafka.KafkaJS.logLevel,
	) => LibrdKafka.KafkaJS.Logger = (namespace, logLevel) => {
		const newLogLevel = logLevel ?? this.librdKafkaLogLevel
		const newNamespace = `${this.librdKafkaNamespace}.${namespace}`
		return new LibrdKafkaLogger(
			this.kafkaJSLogCreator,
			newNamespace,
			newLogLevel,
		)
	}

	setLogLevel: (logLevel: LibrdKafka.KafkaJS.logLevel) => void = logLevel => {
		this.librdKafkaLogLevel = logLevel
	}
}

justjake avatar Sep 17 '24 16:09 justjake
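
For readers who have not written a logCreator before, here is a minimal sketch of one that could sit behind an adapter like the one above. Note that the adapter calls the creator with the level of each message (`this.kafkaJSLogCreator(level)(syntheticLogEntry)`), so the creator's argument cannot serve as the filter threshold; this sketch keeps its own threshold in a closure instead. The names `makeFilteringLogCreator`, `threshold`, and `sink` are hypothetical, and `kafkaJSLogLevel` is a local stand-in for the KafkaJS enum rather than an import.

```javascript
// Local stand-in for KafkaJS's logLevel enum (treat the values as an assumption).
const kafkaJSLogLevel = Object.freeze({ NOTHING: 0, ERROR: 1, WARN: 2, INFO: 4, DEBUG: 5 });

// Hypothetical factory: returns a logCreator in the KafkaJS shape,
// (level) => (entry) => void, that drops entries more verbose than `threshold`
// and appends the rest to `sink` as formatted strings.
function makeFilteringLogCreator(threshold, sink) {
  return (_level) => (entry) => {
    if (entry.level > threshold) return; // too verbose, ignore
    sink.push(`[${entry.namespace}] ${entry.log.message}`);
  };
}

const lines = [];
const logCreator = makeFilteringLogCreator(kafkaJSLogLevel.WARN, lines);

// Called the same way the adapter calls it: creator(level)(entry).
logCreator(kafkaJSLogLevel.ERROR)({
  level: kafkaJSLogLevel.ERROR,
  label: "ERROR",
  namespace: "producer",
  log: { message: "broker unreachable", timestamp: new Date().toISOString() },
});
logCreator(kafkaJSLogLevel.DEBUG)({
  level: kafkaJSLogLevel.DEBUG,
  label: "DEBUG",
  namespace: "producer",
  log: { message: "poll", timestamp: new Date().toISOString() },
});

console.log(lines); // [ '[producer] broker unreachable' ]
```

Keeping the threshold outside the creator's argument is a design choice for this sketch; a real logCreator may filter on namespace or message content as well.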