Kafka Connect: Add mechanisms for routing records by topic name
Adds a new routing mechanism that uses the Kafka topic name to route records, and updates the configuration options that control routing.
The changes move the routing logic into a separate class with multiple implementations that can be selected via configuration. This preserves backwards compatibility with the current behavior, adds a way to route records to tables based on the Kafka topic, and allows custom implementations of record routing.
Closes #11163
I wonder if it makes sense to do the routing in the Iceberg Kafka Connect sink. Isn't it the purpose of an integration framework (Camel, NiFi, ...) to do the routing and implement EIPs?
@bryanck what do you think ?
@mun1r0b0t just a note: your PR includes a lot of unnecessary changes (especially in the md file). Please avoid unnecessary reformatting changes, as they make the PR harder to review.
Sorry about that. It was my overzealous auto-formatter. I've reverted any changes that are not relevant to the PR.
I feel like we can take a much simpler approach for this. For more complex routing needs, an SMT makes more sense to me.
I am certainly open to simplifying this. However, as a user and developer of other connectors, I strongly feel that the connector should support consuming from multiple topics and writing to multiple destination tables without the use of other services.
The Kafka Connect API supports consuming from multiple topics (using a list of topic names or a regex), but this is just not possible with the current Iceberg connector. So it is essentially preventing me from using Kafka Connect to its full potential and forcing me to create multiple connectors where one would work. Other Kafka connectors that I've worked with all support topic-to-destination mapping out of the box and do not require any additional services. This is the first connector I've come across that does not work with multiple topics.
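For context, a standard sink connector can already subscribe broadly using built-in Kafka Connect properties (topic names here are made up; only one of the two keys may be set):

```
# Built-in Kafka Connect sink properties, not Iceberg-specific:
topics=orders,customers,payments
# or, mutually exclusive with "topics":
# topics.regex=db-inventory-.*
```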
The complexity is largely due to the existing setup of routing on a field from the data rather than on configuration. With the current routing approach, we have to add the destination table name as part of the data, and write data with entirely different schemas and use cases all to one topic. I did not want to break existing behavior and hence had to work it into the PR, but in my humble opinion, the existing approach is contradictory to how Kafka should be used.
I'd much rather keep only the topic-based routing and do away with the field-based routing, if it comes down to supporting only one of these options. It aligns much better with how Kafka and Kafka Connect are used and how other connectors behave.
How do you feel about making a breaking change like this to the configuration: keep iceberg.table.table_name.topics and iceberg.table.table_name.topic-regex, remove the current routing setup, and rely on other services for more complex routing patterns? I would also prefer to keep the option to plug in a custom routing solution for those who want to do their own routing without additional services, but I can drop that as well.
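Something like this, for illustration (table and topic names are made up; these keys do not exist in the current connector):

```
# Sketch of the proposed keys:
iceberg.table.db.clicks.topics=clicks,clicks-replay
iceberg.table.db.metrics.topic-regex=metrics-.*
```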
I think that "basic routing" like consuming from multiple topics and "map" topic -> table could make sense. We should avoid complex routing in the kafka-connect component as we will overlap with SMT.
I think that we can keep the iceberg.table.table_name.topics and iceberg.table.table_name.topic-regex properties and have a pluggable layer for the actual routing.
I made the changes as we discussed on Slack. Instead of adding new keys for topic-based configuration, I am overloading the existing route-regex key to match against the topic.
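For example, a config along these lines (table and topic names are made up):

```
# With no iceberg.tables.route-field set, each table's route-regex is
# matched against the record's source topic name instead of a field value:
iceberg.tables=db.orders,db.shipments
iceberg.table.db.orders.route-regex=orders-.*
iceberg.table.db.shipments.route-regex=shipments
```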
I don't feel we need a new RecordRouter abstraction to implement this feature; it introduces complexity. SinkWriter.extractRouteValue() could be enhanced to extract the source topic name, if configured, in addition to using a field value.
With the abstraction, it doesn't need to check the configuration for each record. I want to use the connector with ~30 tables, and not having to parse all of the configuration for each record will help performance. It also allows saving routing-specific state within the router instead of having it all in the SinkWriter, which I think is easier to manage than doing it all in one method.
I also like that it allows users to plug in their own routing solution easily. I can think of many other ways someone might determine the destination table, and it is good functionality to have in the connector. I intend to use the plugin feature myself as well.
So, overall, I think the abstraction is worth the performance and usability benefits, and it is not that much more code. Could you explain your concerns around complexity? Perhaps I can address them in other ways.
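To make it concrete, the shape I have in mind is roughly this (an illustrative sketch only; the names and signatures are not necessarily what's in the PR):

```java
// Illustrative sketch only; names and signatures are hypothetical,
// not necessarily the PR's exact API.
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

public interface RecordRouter {
  // Resolve the destination table names for a record. The implementation
  // (field-based, topic-based, or a custom plugin) is selected once from
  // the connector config, so per-record work stays minimal.
  List<String> route(SinkRecord record);
}
```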
I was thinking something like this
But that's very limiting in terms of what it can do. The table name has to be the topic name, which is extremely restrictive, and it uses a magic string to determine topic vs. field. It also does not allow reading from multiple topics into the same table, or one topic into multiple tables.
For my use case, I cannot use the topic name as the table name, since the topic names have "." in them, which does not work with AWS Glue. So I absolutely need a way to specify the source topic per table. I'm sure there are plenty of other cases where people would want to explicitly specify the mapping from topic to table and simply cannot use the topic name as the table name.
The topic name can be mapped to a table via static routing, isn't that what your TopicRecordRouter is doing?
Yeah, but the code in your link does not do that. To do that in your version, it would have to loop through all the table configurations for each record. The abstraction around the routing optimizes this by doing it once per topic. It also makes static routing faster using the same optimization.
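For example, a topic-based router can memoize the config scan per topic. A sketch building on the interface above (again illustrative, not the PR's actual code; assume routeRegexByTable was parsed once from the connector config):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.sink.SinkRecord;

public class TopicRecordRouter implements RecordRouter {
  // table name -> compiled route-regex, parsed once from the connector config
  private final Map<String, Pattern> routeRegexByTable;
  // memoized topic -> matching tables, so configs are scanned once per topic
  private final Map<String, List<String>> tablesByTopic = new ConcurrentHashMap<>();

  public TopicRecordRouter(Map<String, Pattern> routeRegexByTable) {
    this.routeRegexByTable = routeRegexByTable;
  }

  @Override
  public List<String> route(SinkRecord record) {
    return tablesByTopic.computeIfAbsent(
        record.topic(),
        topic ->
            routeRegexByTable.entrySet().stream()
                .filter(e -> e.getValue().matcher(topic).matches())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList()));
  }
}
```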
I still don't understand the issue with the RecordRouter abstraction. To me, it is a good thing both performance and usability wise.
I feel your solution is reasonable, though I'm trying to reconcile this with the need for a more flexible, pluggable way to route records. For example, one case we had was to support dynamic routing based on the topic name but with flexible table names (as you pointed out, that's limited), so for that we used an SMT to populate the route field. I'm interested in other opinions on this.
IMO making it a plugin gives the user the flexibility to choose. Users can add their own routing plugin to the runtime, use an SMT, or some other means. I think the connector shouldn't dictate how the user routes records. That is best left to the user.
But this is just one user's opinion. Definitely open to hearing from others.
I think that "basic routing" like consuming from multiple topics and "map" topic -> table could make sense.
That's something I've suggested here https://github.com/apache/iceberg/pull/10422
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
@bryanck Any further thoughts on this?
Still waiting on a response from the maintainers.
Hi @jbonofre @bryanck, any update on this PR? For our use case, we want to route all tables from a source MySQL database into S3 (we are using Debezium as the source connector, so we need to map each topic to a table in Iceberg).
Somehow neither of the suggestions gets merged or even receives feedback.
Yeah, not sure why. I'm leaving it open in the hope that they'll reconsider the feature.
Still here, still waiting.
BTW, we faced an issue with the current type of routing. We are using DataHub as a metastore, and there is no way to correctly map a table to a topic to produce lineage.
Keep open please.