Kafka Connect: Add mechanisms for routing records by topic name

Open mun1r0b0t opened this issue 1 year ago • 33 comments

Add a new routing mechanism for records that uses the Kafka topic name for routing, and update the configuration for how records are routed.

The changes move the routing logic to a separate class with different implementations that can be selected via the configuration. It preserves backwards compatibility of current behavior, while adding a way to route records to tables based on the Kafka topic and also allows custom implementations of how records are routed.

Closes #11163

mun1r0b0t, Nov 22 '24
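The pluggable design described in the PR can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's code: `RecordRouter` and `TopicRecordRouter` are names used in this discussion, but their methods here are guesses, while `FieldRecordRouter`, `RecordRouters`, `RoutableRecord` (a stand-in for Kafka's `SinkRecord`) and the config keys `router`, `route-field` and `table.<name>.topics` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka's SinkRecord, keeping only what routing needs.
final class RoutableRecord {
  final String topic;
  final Map<String, Object> value;

  RoutableRecord(String topic, Map<String, Object> value) {
    this.topic = topic;
    this.value = value;
  }
}

// Hypothetical router contract: destination table names for a record (empty = not routed).
interface RecordRouter {
  List<String> route(RoutableRecord record);
}

// Keeps the existing behavior: the destination table is read from a record field.
final class FieldRecordRouter implements RecordRouter {
  private final String routeField;

  FieldRecordRouter(String routeField) {
    this.routeField = routeField;
  }

  @Override
  public List<String> route(RoutableRecord record) {
    Object table = record.value.get(routeField);
    return table == null ? List.of() : List.of(table.toString());
  }
}

// Routes by Kafka topic; the mapping is parsed once from hypothetical keys
// of the form "table.<name>.topics" = "topicA,topicB".
final class TopicRecordRouter implements RecordRouter {
  private final Map<String, List<String>> tablesByTopic = new HashMap<>();

  TopicRecordRouter(Map<String, String> config) {
    String prefix = "table.";
    String suffix = ".topics";
    for (Map.Entry<String, String> entry : config.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(prefix) && key.endsWith(suffix)
          && key.length() > prefix.length() + suffix.length()) {
        String table = key.substring(prefix.length(), key.length() - suffix.length());
        for (String topic : entry.getValue().split(",")) {
          tablesByTopic.computeIfAbsent(topic.trim(), t -> new ArrayList<>()).add(table);
        }
      }
    }
  }

  @Override
  public List<String> route(RoutableRecord record) {
    return tablesByTopic.getOrDefault(record.topic, List.of());
  }
}

// Selects a router from configuration; any other value is treated as a custom class name.
final class RecordRouters {
  static RecordRouter fromConfig(Map<String, String> config) {
    String type = config.getOrDefault("router", "field");
    switch (type) {
      case "field":
        return new FieldRecordRouter(config.get("route-field"));
      case "topic":
        return new TopicRecordRouter(config);
      default:
        try {
          // A custom router is assumed to have a no-arg constructor.
          return (RecordRouter) Class.forName(type).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
          throw new IllegalArgumentException("Cannot load router class " + type, e);
        }
    }
  }
}
```

Because the topic router builds its topic-to-tables map once at construction, routing a record is a single map lookup, and one topic can map to several tables.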

I wonder if it makes sense to do the routing in the Iceberg Kafka Connect sink. Isn't it the purpose of an integration framework (Camel, NiFi, ...) to do the routing and implement EIPs?

@bryanck what do you think?

@mun1r0b0t just a note: your PR includes a lot of unnecessary changes (especially in the md file). Please avoid unrelated reformatting, as it makes the PR harder to review.

jbonofre, Nov 22 '24

Sorry about that. It was my overzealous auto-formatter. I reverted all changes that are not relevant to the PR.

mun1r0b0t, Nov 22 '24

I feel like we can take a much simpler approach for this. For more complex routing needs, an SMT makes more sense to me.

bryanck, Nov 22 '24

I am certainly open to simplifying this. However, as a user and developer of other connectors, I strongly feel that the connector should support consuming from multiple topics and writing to multiple destination tables without the use of other services.

Kafka Connect API supports consuming from multiple topics (using a list of topic names or a regex), but this is just not possible to do with the current Iceberg connector. So, it is essentially preventing me from using Kafka Connect to its full potential and forcing me to create multiple connectors where one connector would work. Other Kafka connectors that I've worked with all support topic to destination mapping out of the box and do not require any additional services. This is the first connector I've come across that does not work with multiple topics.

The complexity is largely due to the existing setup, which routes on a field from the data rather than on configuration. With the current routing approach, we have to add the destination table name to the data and write data with entirely different schemas and use cases to a single topic. I did not want to break existing behavior, so I had to work it into the PR, but in my humble opinion, the existing approach is at odds with how Kafka should be used.

I'd much rather keep only topic-based routing and do away with field-based routing, if it comes down to supporting only one of these options. It aligns much better with how Kafka and Kafka Connect are used and how other connectors behave.

How do you feel about making a breaking change like this to the configuration: keep iceberg.table.table_name.topics and iceberg.table.table_name.topic-regex, remove the current routing setup, and rely on other services for more complex routing patterns? I would also prefer to keep the option to plug in a custom routing solution for those who want to do their own routing without additional services, but I can drop that as well.

mun1r0b0t, Nov 25 '24

I think that "basic routing" like consuming from multiple topics and "map" topic -> table could make sense. We should avoid complex routing in the kafka-connect component, as it would overlap with SMTs. I think that we can keep the iceberg.table.table_name.topics and iceberg.table.table_name.topic_regex properties and have a pluggable layer for actual routing.

jbonofre, Nov 26 '24

I made the changes as we discussed on Slack. Instead of using new keys for the topic based configuration, I am overloading the existing route-regex key to match against the topic.

mun1r0b0t, Dec 03 '24
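Matching each table's route-regex against the topic name, as described in the comment above, could look roughly like this. In the connector the per-table regexes come from `iceberg.table.<table name>.route-regex`; `TopicRegexRouter` is a hypothetical class, and the thread does not say how the PR switches between topic and field matching, so this sketch matches topics only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical static router: each table's route-regex is matched against the
// Kafka topic name instead of a field value in the record.
final class TopicRegexRouter {
  private final Map<String, Pattern> patternByTable = new LinkedHashMap<>();

  // routeRegexByTable: table name -> route-regex; iteration order is preserved.
  TopicRegexRouter(Map<String, String> routeRegexByTable) {
    routeRegexByTable.forEach((table, regex) -> patternByTable.put(table, Pattern.compile(regex)));
  }

  // Returns every table whose regex fully matches the topic, so one topic can feed
  // several tables and several topics can feed the same table.
  List<String> route(String topic) {
    List<String> tables = new ArrayList<>();
    for (Map.Entry<String, Pattern> entry : patternByTable.entrySet()) {
      if (entry.getValue().matcher(topic).matches()) {
        tables.add(entry.getKey());
      }
    }
    return tables;
  }
}
```

Compiling each regex once in the constructor avoids recompiling patterns per record; only the matching itself happens on the hot path.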

I don't feel we need a new RecordRouter abstraction to implement this feature, which introduces complexity. SinkWriter.extractRouteValue() could be enhanced to extract the source topic name, if configured, in addition to using a field value.

bryanck, Dec 03 '24
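The alternative suggested here keeps a single extraction step rather than adding a new abstraction. A minimal sketch, assuming a hypothetical `RouteValueExtractor` with a boolean `routeByTopic` setting; it does not reproduce the actual signature or behavior of `SinkWriter.extractRouteValue()`, and it reads only a top-level field.

```java
import java.util.Map;

// Hypothetical route-value extractor: when routing by topic is configured, the
// source topic name becomes the route value; otherwise a top-level field is used.
final class RouteValueExtractor {
  private final boolean routeByTopic; // hypothetical setting
  private final String routeField;

  RouteValueExtractor(boolean routeByTopic, String routeField) {
    this.routeByTopic = routeByTopic;
    this.routeField = routeField;
  }

  // Returns null when there is no usable route value, so the record is not routed.
  String extract(String topic, Map<String, Object> value) {
    if (routeByTopic) {
      return topic;
    }
    if (value == null || routeField == null) {
      return null;
    }
    Object fieldValue = value.get(routeField);
    return fieldValue == null ? null : fieldValue.toString();
  }
}
```

With this shape, the extracted value would still have to be matched against each table's configuration for every record, which is the cost the abstraction in this PR tries to avoid.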

With the abstraction, it doesn't need to check the configuration for each record. I want to use the connector with ~30 tables, and not having to parse all the configuration for each record will help performance. It also allows routing-specific state to live within the router instead of in the SinkWriter, which I think is easier to manage than doing it all in one method.

I also like that it allows users to plug in their own routing solution easily. I can think of many other ways someone might determine the destination table, and it is good functionality to have in the connector. I intend to use the plugin feature myself as well.

So, overall, I think the abstraction is worth the performance and usability benefits, and it is not that much more code. Could you explain your concerns around complexity? Perhaps I can address them in other ways.

mun1r0b0t, Dec 03 '24

I was thinking something like this

bryanck, Dec 03 '24

But that's very limiting in terms of what it can do. The table name has to be the topic name, which is extremely restrictive, and it uses a magic string to determine topic vs. field. It also does not allow reading from multiple topics into the same table or from one topic into multiple tables.

For my use case, I cannot use the topic name as the table name because our topic names contain '.', which does not work with AWS Glue. So I absolutely need a way to specify the source topic per table. I'm sure there are plenty of other cases where people would want to explicitly specify the topic-to-table mapping and simply cannot use the topic name as the table name.

mun1r0b0t, Dec 03 '24

The topic name can be mapped to a table via static routing, isn't that what your TopicRecordRouter is doing?

bryanck, Dec 03 '24

Yeah, but the code in your link does not do that. To do that in your version, it'll have to loop through all the table configurations for each record. The abstraction around the routing optimizes this by doing it once per topic. It also makes static routing faster using the same optimization.

I still don't understand the issue with the RecordRouter abstraction. To me, it is a good thing both performance and usability wise.

mun1r0b0t, Dec 03 '24
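The once-per-topic optimization argued for in the comment above amounts to memoizing the table lookup by topic. A minimal sketch, assuming a hypothetical `PerTopicCache` wrapper around any topic-to-tables resolver; `scanAllTables` is a hypothetical stand-in for the naive loop over every table's configuration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical wrapper that resolves each topic once and caches the resulting tables.
final class PerTopicCache {
  private final Function<String, List<String>> resolver;
  // Plain HashMap, assuming route() is called from a single task thread.
  private final Map<String, List<String>> tablesByTopic = new HashMap<>();

  PerTopicCache(Function<String, List<String>> resolver) {
    this.resolver = resolver;
  }

  // The first record from a topic triggers a full resolution; later ones hit the cache.
  List<String> route(String topic) {
    return tablesByTopic.computeIfAbsent(topic, resolver);
  }

  // Naive per-record approach: scan every table's configured topics.
  static List<String> scanAllTables(Map<String, List<String>> topicsByTable, String topic) {
    List<String> tables = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : topicsByTable.entrySet()) {
      if (entry.getValue().contains(topic)) {
        tables.add(entry.getKey());
      }
    }
    return tables;
  }
}
```

The cache holds one entry per distinct topic, so it stays small; with a topic-regex subscription, a topic created later is simply resolved on its first record.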

I feel your solution is reasonable, though I'm trying to reconcile this with the need for a more flexible, pluggable way to route records. For example, one case we had was to support dynamic routing based on the topic name but with flexible table names (as you pointed out, that's limited), so for that we used an SMT to populate the route field. I'm interested in other opinions on this.

bryanck, Dec 03 '24
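The SMT approach mentioned in the comment above populates the route field from the topic name before the sink sees the record. A real SMT implements Kafka Connect's `Transformation` interface over `ConnectRecord`s; this sketch keeps only the naming logic so it runs without Kafka dependencies. The class `TopicToRouteField`, the namespace, the `_route` field name and the sanitizing rule are all hypothetical.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical core of a route-populating SMT: derive a table name from the topic and
// store it in the field that the sink's route field setting points at.
final class TopicToRouteField {
  private final String namespace;  // target namespace in the catalog (assumption)
  private final String routeField; // must match the field the sink routes on

  TopicToRouteField(String namespace, String routeField) {
    this.namespace = namespace;
    this.routeField = routeField;
  }

  // Lower-cases the topic and replaces anything other than [a-z0-9_] with '_'.
  String tableNameFor(String topic) {
    return namespace + "." + topic.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9_]", "_");
  }

  // Returns a copy of the record value with the route field added; the input is not modified.
  Map<String, Object> apply(String topic, Map<String, Object> value) {
    Map<String, Object> routed = new HashMap<>(value);
    routed.put(routeField, tableNameFor(topic));
    return routed;
  }
}
```

This keeps the sink's routing unchanged, at the cost of deploying and configuring a separate transform.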

IMO, making it a plugin gives users the flexibility to choose. Users can add their own routing plugin to the runtime, use an SMT, or use some other means. I think the connector shouldn't constrain how the user routes records; that is best left to the user.

But this is just one user's opinion. Definitely open to hearing from others.

mun1r0b0t, Dec 04 '24

I think that "basic routing" like consuming from multiple topics and "map" topic -> table could make sense.

That's something I've suggested here https://github.com/apache/iceberg/pull/10422

igorvoltaic, Dec 09 '24

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions[bot], Jan 09 '25

@bryanck Any further thoughts on this?

mun1r0b0t, Jan 09 '25

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions[bot], Feb 09 '25

Still waiting on a response from a maintainer.

mun1r0b0t, Feb 09 '25

Hi @jbonofre @bryanck, any update on this PR? In our use case, we want to route all tables from a source MySQL database into S3 (we use Debezium as the source connector, so we need to map each topic to a table in Iceberg).

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions[bot], Apr 29 '25

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions[bot], May 31 '25

Somehow neither of the suggestions gets merged or receives feedback.

igorvoltaic, May 31 '25

Yeah, not sure why. I'm leaving it open in hope that they'll reconsider the feature.

mun1r0b0t, Jun 02 '25

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions[bot], Jul 03 '25

Still here, still waiting.

mun1r0b0t, Jul 03 '25

BTW, we faced an issue with the current type of routing. We use DataHub as a metastore, and there is no way to correctly map a table to its topic to produce lineage.

igorvoltaic, Jul 03 '25

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions[bot], Aug 03 '25

Keep open please.

mun1r0b0t, Aug 05 '25

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions[bot], Sep 06 '25