
Unable to ingest data from Kafka in protobuf format

Open obozhinov opened this issue 1 year ago • 8 comments

I'm running apache/druid:31.0.0 in Docker locally. This is my proto file:

syntax = "proto3";

package com.example.kafka.event.v1;

option java_multiple_files=true;
option java_package = "com.example.kafka.event.v1";

message Event {
  int64 timestamp = 1; 
  optional string identity_id = 2;
  optional string clientId = 3;
  optional string firstName = 4;
  optional string lastName = 5;
  optional string email = 6;
  optional string phone = 7;
  optional string productId = 8;
  optional int32 allItems = 9;
  optional int32 usedItems = 10;
}

I used this command to generate the .desc file:

protoc -o event.desc --proto_path=proto proto/com/example/kafka/event/v1/event.proto

and mounted it at /opt/druid/proto-descriptors/. In my KafkaProducer I use the Event class and these properties:

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                KafkaProtobufSerializer.class);
        properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8084");
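
For context, a minimal producer sketch around those properties; the bootstrap servers, topic name, and field values here are placeholder assumptions, not taken from the original report:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;

import com.example.kafka.event.v1.Event;

public class EventProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
        properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8084");

        // The Confluent serializer registers the schema and frames each value
        // with the schema-registry preamble before the protobuf bytes.
        try (KafkaProducer<String, Event> producer = new KafkaProducer<>(properties)) {
            Event event = Event.newBuilder()
                    .setTimestamp(System.currentTimeMillis() / 1000) // posix seconds
                    .setIdentityId("user-1")                         // placeholder value
                    .build();
            producer.send(new ProducerRecord<>("events", "key-1", event));
            producer.flush();
        }
    }
}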

I can access event.desc when I exec into the broker/historical/MiddleManager containers. This is the config for Druid:

"protoBytesDecoder": {
          "type": "file",
          "descriptor": "file:///opt/druid/proto-descriptors/event.desc",
          "protoMessageType": "com.example.kafka.event.v1.Event"
        }
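
For reference, that decoder block nests inside the protobuf input format, so the full inputFormat section of the ioConfig would look roughly like this (same values as above):

"inputFormat": {
  "type": "protobuf",
  "protoBytesDecoder": {
    "type": "file",
    "descriptor": "file:///opt/druid/proto-descriptors/event.desc",
    "protoMessageType": "com.example.kafka.event.v1.Event"
  }
}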

But I get this error, whether I submit a supervisor spec or use the Load data tab:

Failed to submit supervisor: Cannot construct instance of `org.apache.druid.data.input.protobuf.FileBasedProtobufBytesDecoder`, problem: Cannot read descriptor file: file:/opt/druid/proto-descriptors/event.desc at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 341] (through reference chain: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec["spec"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIngestionSpec["ioConfig"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig["inputFormat"]->org.apache.druid.data.input.protobuf.ProtobufInputFormat["protoBytesDecoder"])

I tried Schema Registry as well. I did a POST to localhost:8084/subjects/test/versions with this payload: { "schemaType": "PROTOBUF", "schema": "syntax = \"proto3\";\n\npackage com.example.kafka.event.v1;\n\noption java_multiple_files=true;\noption java_package = \"com.example.kafka.event.v1\";\n\nmessage Event {\n int64 timestamp = 1;\n optional string identity_id = 2;\n optional string clientId = 3;\n optional string firstName = 4;\n optional string lastName = 5;\n optional string email = 6;\n optional string phone = 7;\n optional string productId = 8;\n optional int32 allItems = 9;\n optional int32 usedItems = 10;\n}" }
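
For illustration, a correctly escaped registration request can be sent like this (subject name and the truncated schema are placeholders):

curl -X POST http://localhost:8084/subjects/test/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schemaType": "PROTOBUF", "schema": "syntax = \"proto3\"; package com.example.kafka.event.v1; message Event { int64 timestamp = 1; }"}'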

I downloaded all the suggested libraries and put them in both /opt/druid/extensions/protobuf-extensions and /opt/druid/extensions/druid-protobuf-extensions, and also added DRUID_CLASSPATH=/opt/druid/extensions/druid-protobuf-extensions/*:/opt/druid/proto-descriptors/* to the environment file (echo $DRUID_CLASSPATH printed only -CLASSPATH, so that setting isn't actually taking effect; see the loadList sketch after this list):

  • common-config-6.0.1.jar
  • common-utils-6.0.1.jar
  • kafka-protobuf-provider-6.0.1.jar
  • kafka-protobuf-serializer-6.0.1.jar
  • kafka-schema-registry-client-6.0.1.jar
  • kotlin-stdlib-1.4.0.jar
  • wire-schema-3.2.2.jar
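
For reference, the documented way to have Druid pick up an extension directory is druid.extensions.loadList in common.runtime.properties rather than DRUID_CLASSPATH; a minimal sketch, with the directory layout assumed from the Docker image:

druid.extensions.directory=/opt/druid/extensions
druid.extensions.loadList=["druid-kafka-indexing-service", "druid-protobuf-extensions"]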

I also tried just the ones suggested in the repo (still not working):

  • https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.0.1/kafka-protobuf-provider-6.0.1.jar
  • https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.4.0/kotlin-stdlib-1.4.0.jar
  • https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/3.2.2/wire-schema-3.2.2.jar

When I try to submit this supervisor spec:

{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "topic": "events",
      "inputFormat": {
        "type": "protobuf",
        "protoBytesDecoder": {
          "type": "schema_registry",
          "url": "http://localhost:8084"
        }
      },
      "useEarliestOffset": true
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "dataSchema": {
      "dataSource": "events-proto",
      "timestampSpec": {
        "column": "time",
        "format": "posix"
      },
      "dimensionsSpec": {
        "dimensions": [
          "identity_id",
          "clientId",
          "firstName",
          "lastName",
          "email",
          "phone",
          "productId",
          {
            "type": "long",
            "name": "allItems"
          },
          {
            "type": "long",
            "name": "usedItems"
          }
        ]
      },
      "granularitySpec": {
        "queryGranularity": "none",
        "rollup": false,
        "segmentGranularity": "day"
      }
    }
  }
}

I get this:

Failed to submit supervisor: Cannot construct instance of `org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder`, problem: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 237] (through reference chain: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec["spec"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIngestionSpec["ioConfig"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig["inputFormat"]->org.apache.druid.data.input.protobuf.ProtobufInputFormat["protoBytesDecoder"])

I tried all the steps provided and even more, but I'm not getting anywhere. The data is being read from the topic, but I cannot parse it. I followed https://github.com/Godin/apache-druid/blob/master/docs/development/extensions-core/protobuf.md

When I tried the /extensions/protobuf-extensions folder from this example, the broker/historical and MiddleManager all just crashed: https://blog.hellmar-becker.de/2022/05/26/ingesting-protobuf-messages-into-apache-druid/

It feels like something is missing from the explanations in the documentation.

obozhinov avatar Jan 13 '25 16:01 obozhinov

@obozhinov It looks to me like the supervisor spec still references the schema registry, but further up you wrote that you mounted the .desc as a file. Could that be the reason it fails?

Also note: if you use a file-type protobuf decoder, the message payload is expected to be plain protobuf data; but if you use the schema registry, there is a 5-byte preamble from the schema registry wire format.
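
A quick way to check which framing you are actually producing is to peek at the first bytes of a record value; a minimal Java sketch (class and method names are hypothetical):

import java.nio.ByteBuffer;

public class WireFormatPeek {
    // Confluent framing: magic byte 0x0, then a 4-byte big-endian schema ID;
    // protobuf payloads additionally carry message-index varints after the ID.
    static void peek(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte magic = buf.get();
        int schemaId = buf.getInt();
        System.out.printf("magic=%d, schemaId=%d%n", magic, schemaId);
    }

    public static void main(String[] args) {
        // Example frame: magic 0, schema ID 1, message index 0, then proto bytes.
        peek(new byte[] {0, 0, 0, 0, 1, 0, 0x08, 0x01});
        // Plain protobuf data (the file-decoder case) has no such preamble.
    }
}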

hellmarbecker avatar Mar 11 '25 08:03 hellmarbecker

@obozhinov Have you been able to solve this? It looks like we have exactly the same problem.

danieldenktmit avatar Mar 17 '25 12:03 danieldenktmit

You also need to install the missing dependencies. For version 32.0.1 I used the ones listed below (from mvn dependency:tree):

+- io.confluent:kafka-schema-registry-client:jar:6.2.12:compile
+- io.confluent:kafka-protobuf-provider:jar:6.2.12:provided
|  +- com.squareup.wire:wire-schema:jar:3.6.0:provided
|  |  +- com.squareup.wire:wire-runtime:jar:3.6.0:provided
|  |  +- org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.4.10:provided
|  +- com.google.api.grpc:proto-google-common-protos:jar:2.22.1:provided
|  \- io.confluent:kafka-protobuf-types:jar:6.2.12:provided

And, as already pointed out, .spec.ioConfig.inputFormat.valueFormat.type needs to be protobuf.

Minimal spec:
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "topic": "dummy.topic",
      "inputFormat": {
        "type": "kafka",
        "valueFormat": {
          "type": "protobuf",
          "protoBytesDecoder": {
            "urls": [
              "http://confluent-schema-registry:8081"
            ],
            "type": "schema_registry",
            "capacity": 100
[...]

The download commands I used:
wget -O /opt/druid/extensions/druid-protobuf-extensions/proto-google-common-protos-2.22.1.jar https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-common-protos/2.22.1/proto-google-common-protos-2.22.1.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-protobuf-provider-6.2.12.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.2.12/kafka-protobuf-provider-6.2.12.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-protobuf-types-6.2.12.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-types/6.2.12/kafka-protobuf-types-6.2.12.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kotlin-stdlib-1.4.10.jar https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.4.10/kotlin-stdlib-1.4.10.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/wire-schema-3.6.0.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/3.6.0/wire-schema-3.6.0.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-schema-registry-client-6.2.12.jar https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/6.2.12/kafka-schema-registry-client-6.2.12.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/wire-runtime-3.6.0.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-runtime/3.6.0/wire-runtime-3.6.0.jar

JSchlarb avatar Mar 18 '25 09:03 JSchlarb

@obozhinov @danieldenktmit Any updates? I am having this same issue as well.

paulo-alves-flttr avatar Apr 30 '25 13:04 paulo-alves-flttr

The solution is actually in my previous comment: the issue was missing dependencies.

I listed all the required jars along with their download links. Once you place them in your druid-protobuf-extensions directory and configure .spec.ioConfig.inputFormat.valueFormat.type as protobuf, it should work as expected.

JSchlarb avatar Apr 30 '25 14:04 JSchlarb

I did as described in your comment and it still doesn't work for me. I think something is missing.

I am currently getting this error: javax.servlet.ServletException: java.lang.NoClassDefFoundError: Could not initialize class io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema

My /opt/druid/extensions/druid-protobuf-extensions/ folder looks like the following:

druid-protobuf-extensions-32.0.1.jar     kafka-protobuf-types-6.2.12.jar          kotlin-stdlib-common-1.4.10.jar          wire-runtime-3.6.0.jar
kafka-protobuf-provider-6.2.12.jar       kafka-schema-registry-client-6.2.12.jar  proto-google-common-protos-2.22.1.jar    wire-schema-3.6.0.jar

My spec looks like the following:

{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "kafka:29092"
      },
      "topic": "metrics-events",
      "inputFormat": {
        "type": "kafka",
        "keyFormat": {
          "type": "regex",
          "pattern": "([\\s\\S]*)",
          "columns": [
            "line"
          ]
        },
        "valueFormat": {
          "type": "protobuf",
          "protoBytesDecoder": {
            "type": "schema_registry",
            "urls": [
              "http://schema-registry:8081"
            ],
            "capacity": 100
          }
        }
      }
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "dataSchema": {
      "dataSource": "metrics-events"
    }
  }
}

paulo-alves-flttr avatar Apr 30 '25 14:04 paulo-alves-flttr

@JSchlarb any suggestions?

paulo-alves-flttr avatar Apr 30 '25 15:04 paulo-alves-flttr

I have discovered that the problem was that Druid 32.0.1 doesn't support the latest versions of the kafka-protobuf dependencies. I downgraded my producer to use kafka-protobuf-serializer 7.1.16 and kafka-schema-registry-maven-plugin 7.1.16 as well, and updated my Druid image to add the following JAR files to /opt/druid/extensions/druid-protobuf-extensions/:

kafka-protobuf-provider-7.1.16.jar
kafka-protobuf-types-7.1.16.jar
kafka-schema-registry-client-7.1.16.jar
kotlin-stdlib-1.9.10.jar
okio-jvm-3.4.0.jar
proto-google-common-protos-2.22.1.jar
protobuf-java-3.25.5.jar
protobuf-java-util-3.25.5.jar
wire-runtime-jvm-4.9.7.jar
wire-schema-jvm-4.9.7.jar

This solution involved some trial and error that could have been avoided if the supported dependency versions were listed in the documentation here.
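
For convenience, following the same wget pattern as the earlier comment, these should be downloadable as follows (URLs assumed from the standard Maven and Confluent repository layouts):

wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-protobuf-provider-7.1.16.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/7.1.16/kafka-protobuf-provider-7.1.16.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-protobuf-types-7.1.16.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-types/7.1.16/kafka-protobuf-types-7.1.16.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kafka-schema-registry-client-7.1.16.jar https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.1.16/kafka-schema-registry-client-7.1.16.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/kotlin-stdlib-1.9.10.jar https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.9.10/kotlin-stdlib-1.9.10.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/okio-jvm-3.4.0.jar https://repo1.maven.org/maven2/com/squareup/okio/okio-jvm/3.4.0/okio-jvm-3.4.0.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/proto-google-common-protos-2.22.1.jar https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-common-protos/2.22.1/proto-google-common-protos-2.22.1.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/protobuf-java-3.25.5.jar https://repo1.maven.org/maven2/com/google/protobuf/protobuf-java/3.25.5/protobuf-java-3.25.5.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/protobuf-java-util-3.25.5.jar https://repo1.maven.org/maven2/com/google/protobuf/protobuf-java-util/3.25.5/protobuf-java-util-3.25.5.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/wire-runtime-jvm-4.9.7.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-runtime-jvm/4.9.7/wire-runtime-jvm-4.9.7.jar
wget -O /opt/druid/extensions/druid-protobuf-extensions/wire-schema-jvm-4.9.7.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-schema-jvm/4.9.7/wire-schema-jvm-4.9.7.jar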

paulo-alves-flttr avatar May 02 '25 10:05 paulo-alves-flttr

Check the version of your extensions/druid-avro-extensions/kafka-schema-registry-client.jar and add a kafka-protobuf-provider.jar with the exact same version to avoid compatibility issues (6.2.15 for Druid 34.0.0).

You can then use Maven to resolve the other dependencies (types, wire, okio...).

Example for v34.0.0:

FROM maven:3-eclipse-temurin-17 AS resolver
WORKDIR /tmp/app

# If not v34.0.0 replace below with your version of kafka-schema-registry-client.jar
RUN cat > pom.xml <<'EOF'
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>dummy</groupId><artifactId>dummy</artifactId><version>1.0.0</version>
  <dependencies>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-protobuf-provider</artifactId>
      <version>6.2.15</version>
    </dependency>
  </dependencies>
  <repositories>
    <repository><id>confluent</id><url>https://packages.confluent.io/maven/</url></repository>
    <repository><id>central</id><url>https://repo1.maven.org/maven2/</url></repository>
  </repositories>
</project>
EOF

RUN mvn -q -Dmaven.repo.local=/tmp/.m2 dependency:copy-dependencies -DoutputDirectory=/deps -DincludeScope=runtime

FROM apache/druid:34.0.0

RUN mkdir -p /opt/druid/extensions/druid-protobuf-extensions

COPY --from=resolver /deps/*.jar /opt/druid/extensions/druid-protobuf-extensions/
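
Then build the image and run it in place of the stock one (the tag below is arbitrary):

docker build -t druid-protobuf:34.0.0 .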

charlescol avatar Sep 11 '25 19:09 charlescol