[FLINK-34440][formats][protobuf-confluent] Protobuf confluent dynamic format

Open dmariassy opened this issue 1 year ago • 16 comments

What is the purpose of the change

  • Add support for deserializing protobuf messages that use the Confluent wire format and whose schemas can be fetched from Confluent Schema Registry
  • Add support for serializing Flink records using the Confluent protobuf wire format
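
For readers unfamiliar with the Confluent protobuf wire format, the following is a minimal sketch of how its header can be parsed: a magic byte of 0, a 4-byte big-endian schema ID, then a zigzag-varint-encoded list of message indexes (with a single 0 byte as shorthand for the first message type in the schema). The class name `ConfluentProtobufHeader` and its fields are hypothetical and are not taken from this PR, which delegates this work to Confluent's own serializer classes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative parser for the Confluent protobuf header (hypothetical class, not part of this PR). */
final class ConfluentProtobufHeader {
    final int schemaId;
    final List<Integer> messageIndexes;
    final int payloadOffset; // index of the first byte of the protobuf payload

    private ConfluentProtobufHeader(int schemaId, List<Integer> messageIndexes, int payloadOffset) {
        this.schemaId = schemaId;
        this.messageIndexes = messageIndexes;
        this.payloadOffset = payloadOffset;
    }

    static ConfluentProtobufHeader parse(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record); // ByteBuffer is big-endian by default
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        int schemaId = buf.getInt();
        int count = readZigZagVarint(buf);
        List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            indexes.add(0); // shorthand: a lone 0 byte means "first message type"
        } else {
            for (int i = 0; i < count; i++) {
                indexes.add(readZigZagVarint(buf));
            }
        }
        return new ConfluentProtobufHeader(schemaId, indexes, buf.position());
    }

    /** Reads one variable-length, zigzag-encoded 32-bit integer. */
    private static int readZigZagVarint(ByteBuffer buf) {
        int raw = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint is too long");
            }
        }
        return (raw >>> 1) ^ -(raw & 1); // undo zigzag encoding
    }

    public static void main(String[] args) {
        byte[] record = {0, 0, 0, 0, 42, 0, 0x08, 0x01};
        ConfluentProtobufHeader header = parse(record);
        System.out.println("schema id: " + header.schemaId);
        System.out.println("payload starts at byte: " + header.payloadOffset);
    }
}
```

Everything from `payloadOffset` onward is the ordinary protobuf-encoded message, which is why a schema registry lookup is needed before the payload can be interpreted.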

Out of scope for this PR

  • Support for using the format with user-supplied schemas and classes will be tackled in a separate PR

Brief change log

My intention was to:

  • Maintain parity with the existing flink-protobuf format's semantics in terms of the Flink -> Protobuf / Protobuf -> Flink conversions
  • Maximize code reuse between flink-protobuf-confluent and flink-protobuf formats

Deserializer

  • Fetch the message's protobuf descriptor from the Confluent schema registry
  • Generate a Java class from the descriptor at runtime
  • Deserialize byte[]s to the generated protobuf.Message type using an io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
  • Delegate the work of converting between a protobuf.Message and a RowData object to the existing flink-protobuf format
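
Fetching a descriptor and generating a class at runtime are expensive steps, so one natural design is to perform them once per schema ID and reuse the result for later records. The sketch below illustrates that idea only. It is an assumption for illustration: the PR description does not say whether or how the implementation caches, and `PerSchemaCache` and its loader are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

/** Hypothetical per-schema-ID cache; not taken from the PR. */
final class PerSchemaCache<T> {
    private final Map<Integer, T> cache = new ConcurrentHashMap<>();
    private final IntFunction<T> loader;

    /** The loader stands in for "fetch descriptor, generate class, build converter". */
    PerSchemaCache(IntFunction<T> loader) {
        this.loader = loader;
    }

    /** Returns the cached value for the schema ID, invoking the loader only on first use. */
    T get(int schemaId) {
        return cache.computeIfAbsent(schemaId, id -> loader.apply(id));
    }

    public static void main(String[] args) {
        // The string value is a placeholder for a real, expensive-to-build converter.
        PerSchemaCache<String> converters =
                new PerSchemaCache<>(id -> "converter-for-schema-" + id);
        System.out.println(converters.get(7));
        System.out.println(converters.get(7)); // second call is served from the cache
    }
}
```

With a cache like this, the per-record path reduces to a map lookup plus the actual deserialization and conversion.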

Serializer

  • Convert the user's RowType to a protobuf descriptor
  • Generate a Java class from the descriptor at runtime
  • Delegate the RowData -> AbstractMessage conversion to the existing flink-protobuf format
  • Serialize the AbstractMessage object using an io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
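
To give a feel for the first serializer step, here is a rough sketch of turning a flat list of Flink column types into a proto3 message definition. It is a simplified stand-in rather than the PR's converter: `RowTypeToProtoSketch` is a hypothetical name, column types are passed as plain strings instead of Flink's RowType, field numbers are assigned by position, and only a handful of scalar types from the flink-protobuf type mapping are covered. The choice of proto3 syntax is also an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a row-type-to-proto conversion; not the PR's code. */
final class RowTypeToProtoSketch {

    /** Maps a Flink SQL scalar type name to the corresponding protobuf scalar type. */
    static String protoTypeFor(String flinkType) {
        switch (flinkType) {
            case "INT": return "int32";
            case "BIGINT": return "int64";
            case "FLOAT": return "float";
            case "DOUBLE": return "double";
            case "BOOLEAN": return "bool";
            case "STRING": return "string";
            case "BYTES": return "bytes";
            default:
                throw new IllegalArgumentException("Unsupported type in this sketch: " + flinkType);
        }
    }

    /** Renders a proto3 message, numbering fields in column order (an assumption). */
    static String toProto(String messageName, Map<String, String> columns) {
        StringBuilder sb = new StringBuilder("syntax = \"proto3\";\n\n");
        sb.append("message ").append(messageName).append(" {\n");
        int fieldNumber = 1;
        for (Map.Entry<String, String> column : columns.entrySet()) {
            sb.append("  ")
              .append(protoTypeFor(column.getValue()))
              .append(' ')
              .append(column.getKey())
              .append(" = ")
              .append(fieldNumber++)
              .append(";\n");
        }
        return sb.append("}\n").toString();
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>(); // preserves column order
        columns.put("id", "BIGINT");
        columns.put("name", "STRING");
        System.out.print(toProto("Row", columns));
    }
}
```

A real converter would also need to handle nested rows, arrays, maps, and nullability, which this sketch deliberately leaves out.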

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

This change added tests and can be verified as follows:

  • Added comprehensive test coverage
  • Deployed to Shopify Flink clusters

Performance

We saw a 2-4x performance boost when using this implementation over our previous in-house serdes, which used DynamicMessage types for the conversion rather than generated Java objects.

[Figure: Old serde]

[Figure: New serde]

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes (com.github.os72:protoc-jar)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs to follow

dmariassy avatar Jul 23 '24 19:07 dmariassy

CI report:

  • d8f85dec485d741d7d69f3a5031d0d22c98e532d Azure: FAILURE

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Jul 23 '24 19:07 flinkbot

@rmetzger @anupamaggarwal do you have time to review, since you were involved in the Protobuf Confluent Format discussions earlier?

klam-shop avatar Jul 29 '24 20:07 klam-shop

@libenchao @MartijnVisser : as reviewers of the original flink-protobuf format, I was wondering if you'd be open to reviewing this PR 🙏🏻

dmariassy avatar Jul 30 '24 19:07 dmariassy

> @rmetzger @anupamaggarwal do you have time to review, since you were involved in the Protobuf Confluent Format discussions earlier?

Hi @klam-shop, apologies for missing your message earlier. I won't be able to look into this in the coming weeks :(

anupamaggarwal avatar Aug 08 '24 12:08 anupamaggarwal

@rmetzger , do you think you could provide feedback?

dmariassy avatar Aug 23 '24 14:08 dmariassy

@dmariassy Can you rebase? Without a passing CI, it will be impossible to merge it. It will also be good to follow the naming conventions around commits, see the guide at https://flink.apache.org/how-to-contribute/code-style-and-quality-pull-requests/ for that.

MartijnVisser avatar Oct 02 '24 06:10 MartijnVisser

Hey @MartijnVisser , I rebased the branch and CI is now passing. The commit message titles have also been updated. I'd be really grateful for a review 🙂

dmariassy avatar Oct 02 '24 17:10 dmariassy

Great, thanks a lot! I will try to find a reviewer.

rmetzger avatar Oct 07 '24 18:10 rmetzger

Looks like I failed so far, sorry.

rmetzger avatar Oct 15 '24 14:10 rmetzger

> Looks like I failed so far, sorry.

Thanks @rmetzger ! We'd love to get this reviewed and merged 🙂

dmariassy avatar Oct 16 '24 12:10 dmariassy

Still no luck @rmetzger ?

dmariassy avatar Nov 04 '24 18:11 dmariassy

@dmariassy Can you do another rebase? The CI run for this PR has failed (not related to this change), but that means that not all tests have run yet.

MartijnVisser avatar Feb 26 '25 16:02 MartijnVisser

I would actually only rebase once we have a committer who's willing to review the PR. Sadly, I currently do not have capacity to work on this. I'm really deeply sorry.

rmetzger avatar Feb 26 '25 18:02 rmetzger

Is there any progress on this? Very keen to use this if it can make it into a release soon.

michaelandrepearce avatar Apr 16 '25 14:04 michaelandrepearce

> Is there any progress on this? Very keen to use this if it can make it into a release soon.

I'm sorry to say, but I have basically given up on getting this in.

dmariassy avatar Apr 16 '25 14:04 dmariassy

This is such a bummer :-( Someone in the community is doing all the heavy lifting, the changes even look reasonable in size, and the project is still not able to verify and merge this :-(

This would be such a leap forward for people using Protobuf with Kafka and schema registries.

pfeigl avatar May 14 '25 09:05 pfeigl