[FLINK-34440][formats][protobuf-confluent] Protobuf confluent dynamic format
What is the purpose of the change
- Add support for deserializing protobuf messages that use the Confluent wire format and whose schemas can be fetched from the Confluent Schema Registry (a short sketch of the wire framing appears below)
- Add support for serializing Flink records using the Confluent protobuf wire format
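For context, the Confluent wire format mentioned above frames every record with a magic byte and a 4-byte schema-registry id (plus, for protobuf, a list of message indexes) ahead of the actual payload. A minimal sketch of reading that framing, not code from this PR:

```java
import java.nio.ByteBuffer;

/** Minimal sketch of the Confluent wire framing; illustration only, not code from this PR. */
public class ConfluentWireFormatSketch {

    /** Reads the schema-registry id from a Confluent-framed record. */
    public static int readSchemaId(byte[] record) {
        ByteBuffer buffer = ByteBuffer.wrap(record);
        byte magic = buffer.get();
        if (magic != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buffer.getInt(); // 4-byte big-endian schema registry id
        // For protobuf records, a varint-encoded list of message indexes follows
        // here before the protobuf payload itself; it is omitted in this sketch.
        return schemaId;
    }
}
```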
Out of scope for this PR
- Support for using the format with user-supplied schemas and classes will be tackled in a separate PR
Brief change log
My intention was to:
- Maintain parity with the existing flink-protobuf format's semantics in terms of the Flink -> Protobuf / Protobuf -> Flink conversions
- Maximize code reuse between flink-protobuf-confluent and flink-protobuf formats
Deserializer
- Fetch the message's protobuf descriptor from the Confluent schema registry
- Generate a Java class from the descriptor at runtime
- Deserialize `byte[]`s to the generated `protobuf.Message` type using an `io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer`
- Delegate the work of converting between a `protobuf.Message` and a `RowData` object to the existing flink-protobuf format (a sketch of this flow follows this list)
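A rough sketch of this deserialization path, using only the Confluent classes named above; the registry URL, topic name, and wrapper class are placeholders, and the `Message` -> `RowData` step that flink-protobuf performs is not reproduced here:

```java
import com.google.protobuf.Message;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

import java.util.Map;

/** Illustrative sketch of the deserialization flow; not the PR's actual classes. */
public class ConfluentProtobufDeserializeSketch {

    public static Message deserialize(byte[] confluentFramedBytes) {
        // The registry client is used both to fetch the writer's descriptor and,
        // inside the Confluent deserializer, to resolve the schema id in the framing.
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://schema-registry:8081", 100);

        // KafkaProtobufDeserializer strips the Confluent framing and parses the payload
        // into a protobuf Message (with this plain configuration typically a
        // DynamicMessage; the PR instead wires in the runtime-generated class).
        KafkaProtobufDeserializer<Message> deserializer = new KafkaProtobufDeserializer<>(client);
        deserializer.configure(
                Map.of("schema.registry.url", "http://schema-registry:8081"), false);

        Message message = deserializer.deserialize("my-topic", confluentFramedBytes);

        // The Message -> RowData conversion is then delegated to the existing
        // flink-protobuf runtime converters (not shown here).
        return message;
    }
}
```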
Serializer
- Convert the user's `RowType` to a protobuf descriptor
- Generate a Java class from the descriptor at runtime
- Delegate the `RowData` -> `AbstractMessage` conversion to the existing flink-protobuf format
- Serialize the `AbstractMessage` object using an `io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer` (see the sketch after this list)
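A matching sketch for the serialization path. The `RowType` -> descriptor conversion and the runtime class generation are the new pieces introduced by this PR and are not shown; the snippet only illustrates handing an already-built `AbstractMessage` to the Confluent serializer, with placeholder URL and topic:

```java
import com.google.protobuf.AbstractMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

import java.util.Map;

/** Illustrative sketch of the serialization flow; not the PR's actual classes. */
public class ConfluentProtobufSerializeSketch {

    public static byte[] serialize(AbstractMessage message) {
        // In the PR, `message` is produced by the existing flink-protobuf
        // RowData -> AbstractMessage converter from a runtime-generated class.
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://schema-registry:8081", 100);

        // KafkaProtobufSerializer registers/looks up the schema and writes the Confluent
        // framing (magic byte + schema id + message indexes) ahead of the payload.
        KafkaProtobufSerializer<AbstractMessage> serializer = new KafkaProtobufSerializer<>(client);
        serializer.configure(
                Map.of("schema.registry.url", "http://schema-registry:8081"), false);

        return serializer.serialize("my-topic", message);
    }
}
```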
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change added tests and can be verified as follows:
- Added comprehensive test coverage
- Deployed to Shopify Flink clusters
Performance
We saw a 2-4x performance improvement using this implementation over our previous in-house serdes, which used `DynamicMessage` for the conversion rather than generated Java classes.
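For context on why generated classes tend to beat `DynamicMessage` (an illustration, not code from this PR): `DynamicMessage` reads go through by-name descriptor lookups and return boxed objects, whereas a generated class exposes typed getters. `MyEvent` and the `id` field below are hypothetical:

```java
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;

/** Contrast behind the performance note above; MyEvent and "id" are hypothetical. */
public class DynamicVsGeneratedSketch {

    // DynamicMessage path: a by-name descriptor lookup and a boxed Object per field read.
    static Object readIdDynamically(Descriptors.Descriptor descriptor, byte[] payload)
            throws InvalidProtocolBufferException {
        DynamicMessage message = DynamicMessage.parseFrom(descriptor, payload);
        return message.getField(descriptor.findFieldByName("id"));
    }

    // Generated-class path (MyEvent would be a protoc-generated class):
    //     long id = MyEvent.parseFrom(payload).getId(); // typed getter, no lookup or boxing
}
```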
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): yes (com.github.os72:protoc-jar)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs to follow
CI report:
- d8f85dec485d741d7d69f3a5031d0d22c98e532d Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
@rmetzger @anupamaggarwal do you have time to review? You were involved in the Protobuf Confluent format discussions earlier.
@libenchao @MartijnVisser : as reviewers of the original flink-protobuf format, I was wondering if you'd be open to reviewing this PR 🙏🏻
Hi @klam-shop apologies for missing your message earlier, I won't be able to look into this in the coming weeks :(
@rmetzger, do you think you could provide feedback?
@dmariassy Can you rebase? Without a passing CI, it will be impossible to merge it. It will also be good to follow the naming conventions around commits, see the guide at https://flink.apache.org/how-to-contribute/code-style-and-quality-pull-requests/ for that.
Hey @MartijnVisser , I rebased the branch and CI is now passing. The commit message titles have also been updated. I'd be really grateful for a review 🙂
Great, thanks a lot! I will try to find a reviewer.
Looks like I failed so far, sorry.
Thanks @rmetzger ! We'd love to get this reviewed and merged 🙂
Still no luck @rmetzger ?
@dmariassy Can you do another rebase? The CI run for this PR has failed (not related to this change), but that means that not all tests have run yet.
I would actually only rebase once we have a committer who's willing to review the PR. Sadly, I currently do not have capacity to work on this. I'm really deeply sorry.
Is there any progress on this, very keen to use this if it can make it to a release soon.
I'm sorry to say but I have basically given up on getting this in.
This is such a bummer :-( There is someone in the community doing all the heavy lifting, the changes even look reasonable in size, and yet the project is not able to verify and merge this :-(
This would be such a leap forward for people using protobuf with Kafka and schema registries.