How to use logging callback?
With KafkaJS, we used the logCreator config field on when creating the Kafka instance. That has been removed in this library.
I see the underlying rdkafka supports a log_cb logging callback, but I'm unable to determine how to make use of it and was unable to find any examples on the internet.
The type definition for the callback is any (not terribly helpful). I have tried various incarnations, but all of them have the same result.
const kafka = new Confluent.Kafka({
kafkaJS: {
brokers: ["localhost:9092"]
},
log_cb: () => console.log("log_cb")
});
Results in:
Error: Invalid callback type
at Client.connect (/Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/client.js:253:16)
at /Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/kafkajs/_consumer.js:802:28
at new Promise (<anonymous>)
at Consumer.connect (/Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/kafkajs/_consumer.js:800:12)
at Context.<anonymous> (itest/confluentKafkaAccessor.test.ts:149:20)
Please help.
Hi, it's possible to do using the 'logger' config. The type definitions for that were missing, so I added them, and I will add an example too.
The logger type is the same as KafkaJS's logger (which can be obtained with <clientType>.logger() methods. I'll keep this issue open as a means of checking if we can add logCreator too.
For now, here's an example of a logger.
class MyLogger {
constructor() {
this.logLevel = logLevel.INFO;
}
setLogLevel(logLevel) {
this.logLevel = logLevel;
}
info(message, extra) {
if (this.logLevel >= logLevel.INFO)
console.info({ message, ...extra });
}
error(message, extra) {
if (this.logLevel >= logLevel.ERROR)
console.error({ message, ...extra });
}
warn(message, extra) {
if (this.logLevel >= logLevel.WARN)
console.warn({ message, ...extra });
}
debug(message, extra) {
if (this.logLevel >= logLevel.DEBUG)
console.debug({ message, ...extra });
}
namespace() {
return this;
}
}
and usage:
const logger = new MyLogger();
const kafka = new Kafka({ kafkaJS: { brokers: ["localhost:9092"] } });
const producer = kafka.producer({kafkaJS: {logger, logLevel: logLevel.DEBUG},});
/* later on in the code, okay, had enough of debug logs now */
if (hadEnoughDebug) logger.setLogLevel(logLevel.INFO);
There are some restrictions around it, let me add that to the documentation too. In particular, going to a more verbose log level than what's given in the config will not work (If you start with INFO in the config, and then setLogLevel to DEBUG, it would not work.)
I use this class as an adapter between the previous logCreator and the new logger interface. Note we just ignore the configured logLevel as we filter/ignore messages in the logCreator instead.
class LibrdKafkaLogger implements LibrdKafka.KafkaJS.Logger {
constructor(
private kafkaJSLogCreator: kafkaJSLogCreator,
private readonly librdKafkaNamespace: string,
private librdKafkaLogLevel: LibrdKafka.KafkaJS.logLevel,
) {}
private logToLegacyLogCreator(
levelString: "info" | "error" | "warn" | "debug",
message: string,
extra?: object,
) {
let level: LogEntry["level"] = kafkaJSLogLevel.ERROR
switch (levelString) {
case "info":
level = kafkaJSLogLevel.INFO
break
case "error":
level = kafkaJSLogLevel.ERROR
break
case "warn":
level = kafkaJSLogLevel.WARN
break
case "debug":
level = kafkaJSLogLevel.DEBUG
break
default:
unreachable(levelString)
}
const timestamp =
extra && "timestamp" in extra && typeof extra.timestamp === "string"
? extra.timestamp
: new Date().toISOString()
const kafkaLibrary: KafkaLibrary = "librdkafka"
const syntheticLogEntry: LogEntry = {
level,
label: "librdKafkaLabel", // TODO: what should go here?
namespace: this.librdKafkaNamespace,
log: {
message,
timestamp,
...(extra ?? {}),
kafkaLibrary,
},
}
this.kafkaJSLogCreator(level)(syntheticLogEntry)
}
info: (message: string, extra?: object) => void = (...args) =>
this.logToLegacyLogCreator("info", ...args)
error: (message: string, extra?: object) => void = (...args) =>
this.logToLegacyLogCreator("error", ...args)
warn: (message: string, extra?: object) => void = (...args) =>
this.logToLegacyLogCreator("warn", ...args)
debug: (message: string, extra?: object) => void = (...args) =>
this.logToLegacyLogCreator("debug", ...args)
namespace: (
namespace: string,
logLevel?: LibrdKafka.KafkaJS.logLevel,
) => LibrdKafka.KafkaJS.Logger = (namespace, logLevel) => {
const newLogLevel = logLevel ?? this.librdKafkaLogLevel
const newNamespace = `${this.librdKafkaNamespace}.${namespace}`
return new LibrdKafkaLogger(
this.kafkaJSLogCreator,
newNamespace,
newLogLevel,
)
}
setLogLevel: (logLevel: LibrdKafka.KafkaJS.logLevel) => void = logLevel => {
this.librdKafkaLogLevel = logLevel
}
}