Unable to ingest data from Kafka in protobuf format
I'm running apache/druid:31.0.0 locally in Docker. This is my proto schema:
syntax = "proto3";

package com.example.kafka.event.v1;

option java_multiple_files = true;
option java_package = "com.example.kafka.event.v1";

message Event {
  int64 timestamp = 1;
  optional string identity_id = 2;
  optional string clientId = 3;
  optional string firstName = 4;
  optional string lastName = 5;
  optional string email = 6;
  optional string phone = 7;
  optional string productId = 8;
  optional int32 allItems = 9;
  optional int32 usedItems = 10;
}
I used this command to generate the .desc file:
protoc -o event.desc --proto_path=proto proto/com/example/kafka/event/v1/event.proto
and mounted it into /opt/druid/proto-descriptors/.
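Side note: this schema has no imports, so a plain -o works. If it did import other files, the descriptor set would need to be self-contained, which as far as I know is what protoc's --include_imports flag is for:

protoc --include_imports -o event.desc --proto_path=proto proto/com/example/kafka/event/v1/event.proto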
In my KafkaProducer I use the generated Event class and these properties:
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaProtobufSerializer.class);
properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8084");
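Roughly, the full producer looks like this (a trimmed-down sketch; topic name and field values are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
import com.example.kafka.event.v1.Event;

public class EventProducerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
        properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8084");

        // The serializer auto-registers the schema under "<topic>-value" and
        // prepends the Confluent wire-format preamble to every message.
        try (KafkaProducer<String, Event> producer = new KafkaProducer<>(properties)) {
            Event event = Event.newBuilder()
                    .setTimestamp(System.currentTimeMillis() / 1000) // posix seconds
                    .setClientId("client-1")
                    .setAllItems(10)
                    .setUsedItems(3)
                    .build();
            producer.send(new ProducerRecord<>("events", "key-1", event));
        }
    }
}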
I can access the event.desc when I exec into the broker/historical/middleManager containers. This is the decoder config for Druid:
"protoBytesDecoder": {
"type": "file",
"descriptor": "file:///opt/druid/proto-descriptors/event.desc",
"protoMessageType": "com.example.kafka.event.v1.Event"
}
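For completeness, in the supervisor spec this decoder sits inside a protobuf inputFormat:

"inputFormat": {
  "type": "protobuf",
  "protoBytesDecoder": {
    "type": "file",
    "descriptor": "file:///opt/druid/proto-descriptors/event.desc",
    "protoMessageType": "com.example.kafka.event.v1.Event"
  }
}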
But I get this error, both when submitting a supervisor and when using the Load data tab:
Failed to submit supervisor: Cannot construct instance of `org.apache.druid.data.input.protobuf.FileBasedProtobufBytesDecoder`, problem: Cannot read descriptor file: file:/opt/druid/proto-descriptors/event.desc at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 341] (through reference chain: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec["spec"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIngestionSpec["ioConfig"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig["inputFormat"]->org.apache.druid.data.input.protobuf.ProtobufInputFormat["protoBytesDecoder"])
I tried Schema Registry as well. I did a POST to localhost:8084/subjects/test/versions with this payload:
{
"schemaType": "PROTOBUF",
"schema": "syntax = "proto3";\n\npackage com.example.kafka.event.v1;\n\n option java_multiple_files=true;\n option java_package = "com.example.kafka.event.v1";\n\n message Event {\n int64 timestamp = 1;\n optional string identity_id = 2;\n optional string clientId = 3;\n optional string firstName = 4;\n optional string lastName = 5;\n optional string email = 6;\n optional string phone = 7;\n optional string productId = 8;\n optional int32 allItems = 9;\n optional int32 usedItems = 10;\n}"
}
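In curl terms, the request was roughly this (event-schema.json here is just a local file holding the payload above):

curl -X POST http://localhost:8084/subjects/test/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data @event-schema.json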
I downloaded all the suggested libraries and put them in both /opt/druid/extensions/protobuf-extensions and /opt/druid/extensions/druid-protobuf-extensions. I also added DRUID_CLASSPATH=/opt/druid/extensions/druid-protobuf-extensions/*:/opt/druid/proto-descriptors/* to the environment file (though echo $DRUID_CLASSPATH showed only -CLASSPATH, so that's not really working):
- common-config-6.0.1.jar
- common-utils-6.0.1.jar
- kafka-protobuf-provider-6.0.1.jar
- kafka-protobuf-serializer-6.0.1.jar
- kafka-schema-registry-client-6.0.1.jar
- kotlin-stdlib-1.4.0.jar
- wire-schema-3.2.2.jar
I also tried using just the ones suggested in the docs (still not working):
- https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.0.1/kafka-protobuf-provider-6.0.1.jar
- https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.4.0/kotlin-stdlib-1.4.0.jar
- https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/3.2.2/wire-schema-3.2.2.jar
When I try to submit this supervisor:
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "topic": "events",
      "inputFormat": {
        "type": "protobuf",
        "protoBytesDecoder": {
          "type": "schema_registry",
          "url": "http://localhost:8084"
        }
      },
      "useEarliestOffset": true
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "dataSchema": {
      "dataSource": "events-proto",
      "timestampSpec": {
        "column": "time",
        "format": "posix"
      },
      "dimensionsSpec": {
        "dimensions": [
          "identity_id",
          "clientId",
          "firstName",
          "lastName",
          "email",
          "phone",
          "productId",
          {
            "type": "long",
            "name": "allItems"
          },
          {
            "type": "long",
            "name": "usedItems"
          }
        ]
      },
      "granularitySpec": {
        "queryGranularity": "none",
        "rollup": false,
        "segmentGranularity": "day"
      }
    }
  }
}
I get this:
Failed to submit supervisor: Cannot construct instance of `org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder`, problem: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 237] (through reference chain: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec["spec"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIngestionSpec["ioConfig"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig["inputFormat"]->org.apache.druid.data.input.protobuf.ProtobufInputFormat["protoBytesDecoder"])
I tried all the steps provided, and more, but I'm not getting anywhere. The data is being read from the topic, but I cannot parse it. Docs I followed: https://github.com/Godin/apache-druid/blob/master/docs/development/extensions-core/protobuf.md
When I tried the /extensions/protobuf-extensions folder from this example, the broker, historical, and middleManager all crashed:
https://blog.hellmar-becker.de/2022/05/26/ingesting-protobuf-messages-into-apache-druid/
Feels like something is missing from the explanations in the documentation.
@obozhinov It looks to me like your supervisor spec still references the schema registry, while further up you wrote that you mounted the .desc as a file. Could that be why it fails?
Also note that with a file-type protobuf decoder, the message payload is expected to be plain protobuf data, whereas with the schema registry decoder the payload must start with the 5-byte preamble of the schema registry wire format.
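A quick way to tell which case you are in is to look at the first bytes of a record value. A rough sketch (class and method names are mine):

import java.nio.ByteBuffer;

public class PayloadCheck {
    // Confluent-framed payloads start with magic byte 0x0 followed by a
    // 4-byte big-endian schema ID (for protobuf, message indexes follow it).
    static String inspect(byte[] value) {
        if (value != null && value.length > 5 && value[0] == 0) {
            int schemaId = ByteBuffer.wrap(value, 1, 4).getInt();
            return "Schema Registry wire format, schema id " + schemaId;
        }
        return "no preamble, likely plain protobuf bytes";
    }
}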
@obozhinov Have you been able to solve this? It looks like we have exactly the same problem.
You also need to install the missing dependencies. For version 32.0.1 I used the ones listed below (from mvn dependency:tree):
+- io.confluent:kafka-schema-registry-client:jar:6.2.12:compile
+- io.confluent:kafka-protobuf-provider:jar:6.2.12:provided
| +- com.squareup.wire:wire-schema:jar:3.6.0:provided
| | +- com.squareup.wire:wire-runtime:jar:3.6.0:provided
| | +- org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.4.10:provided
| +- com.google.api.grpc:proto-google-common-protos:jar:2.22.1:provided
| \- io.confluent:kafka-protobuf-types:jar:6.2.12:provided
And as already pointed out, .spec.ioConfig.inputFormat.valueFormat.type needs to be protobuf.
Minimal spec:
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "topic": "dummy.topic",
      "inputFormat": {
        "type": "kafka",
        "valueFormat": {
          "type": "protobuf",
          "protoBytesDecoder": {
            "urls": [
              "http://confluent-schema-registry:8081"
            ],
            "type": "schema_registry",
            "capacity": 100
[...]
wget -O /opt/druid/extensions/druid-protobuf-extensions/proto-google-common-protos-2.22.1.jar https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-common-protos/2.22.1/proto-google-common-protos-2.22.1.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-protobuf-provider-6.2.12.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.2.12/kafka-protobuf-provider-6.2.12.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-protobuf-types-6.2.12.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-types/6.2.12/kafka-protobuf-types-6.2.12.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kotlin-stdlib-1.4.10.jar https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.4.10/kotlin-stdlib-1.4.10.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/wire-schema-3.6.0.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/3.6.0/wire-schema-3.6.0.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-schema-registry-client-6.2.12.jar https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/6.2.12/kafka-schema-registry-client-6.2.12.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/wire-runtime-3.6.0.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-runtime/3.6.0/wire-runtime-3.6.0.jar
@obozhinov @danieldenktmit Any updates? I am having this same issue as well.
The solution is actually in my previous comment: the issue was missing dependencies.
I listed all the required jars along with their download links. Once you place them in your druid-protobuf-extensions directory and configure .spec.ioConfig.inputFormat.valueFormat.type as protobuf, it should work as expected.
I did as described in your comment and it still doesn't work for me. I think there is something missing.
I am currently getting this error: javax.servlet.ServletException: java.lang.NoClassDefFoundError: Could not initialize class io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema
My /opt/druid/extensions/druid-protobuf-extensions/ folder looks like the following:
- druid-protobuf-extensions-32.0.1.jar
- kafka-protobuf-provider-6.2.12.jar
- kafka-protobuf-types-6.2.12.jar
- kafka-schema-registry-client-6.2.12.jar
- kotlin-stdlib-common-1.4.10.jar
- proto-google-common-protos-2.22.1.jar
- wire-runtime-3.6.0.jar
- wire-schema-3.6.0.jar
My spec looks like the following:
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "kafka:29092"
      },
      "topic": "metrics-events",
      "inputFormat": {
        "type": "kafka",
        "keyFormat": {
          "type": "regex",
          "pattern": "([\\s\\S]*)",
          "columns": [
            "line"
          ]
        },
        "valueFormat": {
          "type": "protobuf",
          "protoBytesDecoder": {
            "type": "schema_registry",
            "urls": [
              "http://schema-registry:8081"
            ],
            "capacity": 100
          }
        }
      }
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "dataSchema": {
      "dataSource": "metrics-events"
    }
  }
}
@JSchlarb any suggestions?
I have discovered that the problem was that Druid 32.0.1 doesn't support the latest versions of the kafka-protobuf dependencies. I downgraded my producer to kafka-protobuf-serializer 7.1.16 and kafka-schema-registry-maven-plugin 7.1.16, and updated my Druid image to add the following JAR files to /opt/druid/extensions/druid-protobuf-extensions/:
- kafka-protobuf-provider-7.1.16.jar
- kafka-protobuf-types-7.1.16.jar
- kafka-schema-registry-client-7.1.16.jar
- kotlin-stdlib-1.9.10.jar
- okio-jvm-3.4.0.jar
- proto-google-common-protos-2.22.1.jar
- protobuf-java-3.25.5.jar
- protobuf-java-util-3.25.5.jar
- wire-runtime-jvm-4.9.7.jar
- wire-schema-jvm-4.9.7.jar
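If it helps anyone, these should be fetchable the same way as the 6.2.12 jars above. The URLs below just follow the standard Confluent/Maven Central repo layout for the versions listed, so double-check them before relying on this:

wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-protobuf-provider-7.1.16.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/7.1.16/kafka-protobuf-provider-7.1.16.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-protobuf-types-7.1.16.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-types/7.1.16/kafka-protobuf-types-7.1.16.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-schema-registry-client-7.1.16.jar https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.1.16/kafka-schema-registry-client-7.1.16.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kotlin-stdlib-1.9.10.jar https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.9.10/kotlin-stdlib-1.9.10.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/okio-jvm-3.4.0.jar https://repo1.maven.org/maven2/com/squareup/okio/okio-jvm/3.4.0/okio-jvm-3.4.0.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/proto-google-common-protos-2.22.1.jar https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-common-protos/2.22.1/proto-google-common-protos-2.22.1.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/protobuf-java-3.25.5.jar https://repo1.maven.org/maven2/com/google/protobuf/protobuf-java/3.25.5/protobuf-java-3.25.5.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/protobuf-java-util-3.25.5.jar https://repo1.maven.org/maven2/com/google/protobuf/protobuf-java-util/3.25.5/protobuf-java-util-3.25.5.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/wire-runtime-jvm-4.9.7.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-runtime-jvm/4.9.7/wire-runtime-jvm-4.9.7.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/wire-schema-jvm-4.9.7.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-schema-jvm/4.9.7/wire-schema-jvm-4.9.7.jar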
This solution involved some trial and error that could have been avoided if the supported dependency versions were listed in the documentation here.
Check the version of the kafka-schema-registry-client jar under extensions/druid-avro-extensions/ and add a kafka-protobuf-provider jar with the exact same version to avoid compatibility issues (6.2.15 for Druid 34.0.0).
You can then use Maven to resolve the other dependencies (types, wire, okio...).
Example for v34.0.0:
FROM maven:3-eclipse-temurin-17 AS resolver
WORKDIR /tmp/app
# If not v34.0.0 replace below with your version of kafka-schema-registry-client.jar
RUN cat > pom.xml <<'EOF'
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>dummy</groupId><artifactId>dummy</artifactId><version>1.0.0</version>
<dependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<version>6.2.15</version>
</dependency>
</dependencies>
<repositories>
<repository><id>confluent</id><url>https://packages.confluent.io/maven/</url></repository>
<repository><id>central</id><url>https://repo1.maven.org/maven2/</url></repository>
</repositories>
</project>
EOF
RUN mvn -q -Dmaven.repo.local=/tmp/.m2 dependency:copy-dependencies -DoutputDirectory=/deps -DincludeScope=runtime
FROM apache/druid:34.0.0
RUN mkdir -p /opt/druid/extensions/druid-protobuf-extensions
COPY --from=resolver /deps/*.jar /opt/druid/extensions/druid-protobuf-extensions/
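One more thing worth checking, in case it is not already set in your image: the extension directory is only picked up if druid-protobuf-extensions is in the extension load list, e.g. in common.runtime.properties (the other entries here are just an example, keep whatever your setup already loads):

druid.extensions.loadList=["druid-kafka-indexing-service", "druid-protobuf-extensions"]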