
[mysql] notify DDL with GTIDs and skip all queries after DDL

SML0127 opened this issue 2 years ago • 0 comments

We implemented a DDL notification with GTIDs that skips the DDL and all queries after it.

Our team uses Flink CDC to perform MySQL CDC. Since we source other teams' databases and tables, we needed to be notified about DDL: without applying the DDL to our target table, the Kafka Connect sink we use throws errors (the table schema in the change event log conflicts with the schema Kafka Connect holds).


Here is our team's use case in more detail.

  1. We source tables from other teams.
  2. We send change event logs from Flink to Kafka and consume them with Kafka Connect (Debezium's JDBC Sink Connector).
  3. When a DDL is performed and the schema in the change event log changes, an error occurs in Kafka Connect: it loads the table schema on startup, so that schema differs from the schema in the change event log.
  4. To avoid that error, we get notified when a DDL has been performed on the source table and skip the DDL and all subsequent queries.
  5. When notified, we apply the same DDL to our target table, restart Kafka Connect (to load the changed target table's schema), and then rerun the Flink CDC app from the GTIDs returned in the notification.
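The skip-and-notify behavior in steps 3–5 can be sketched as a small state machine. This is a simplified model of the idea, not the actual connector code; the `BinlogEvent` record and the notification strings are hypothetical stand-ins:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the skip-after-DDL logic: forward DML normally, but on
// the first DDL start skipping, record a notification with the GTID set,
// and also notify on the first DML skipped after that DDL.
// BinlogEvent is a hypothetical stand-in, not a Flink CDC or Debezium type.
class DdlSkipGuard {
    record BinlogEvent(String gtids, boolean isDdl, String payload) {}

    private boolean skipping = false;
    private boolean firstDmlNotified = false;
    final List<String> notifications = new ArrayList<>();

    /** Returns true if the event should be forwarded downstream. */
    boolean process(BinlogEvent e) {
        if (!skipping && e.isDdl()) {
            skipping = true;
            notifications.add("[SKIPPED DDL] GTIDs: " + e.gtids()
                    + " Query: " + e.payload());
            return false;
        }
        if (skipping) {
            if (!firstDmlNotified && !e.isDdl()) {
                firstDmlNotified = true;
                notifications.add("[FIRST SKIPPED DML] GTIDs: " + e.gtids());
            }
            return false; // drop everything after the DDL until the job restarts
        }
        return true; // normal DML before any DDL flows through
    }
}
```

After the restart described in step 5, a fresh guard would start in the non-skipping state again.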

We can't share a real screenshot of the notification, because our company recommends using in-house tools rather than Slack (🥲🥲🥲) and has some security policies.

But it looks something like the format below!

  • Notification about performed DDL
[SKIPPED DDL]
Host: database.host.com
DB & Table: db_name.table_name
GTIDs: UUID:1-100
Query: alter table table_name ... (the performed DDL)
  • Notification about first performed DML after DDL.
[FIRST SKIPPED DML]
Host: database.host.com
DB & Table: db_name.table_name
GTIDs: UUID:1-101
Event: Event{header=EventHeaderV4{timestamp=1706603179000, eventType=EXT_UPDATE_ROWS, serverId=101761, headerLength=19, dataLength=527, nextPosition=21284895, flags=0}, data=UpdateRowsEventData{tableId=1797, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38}, includedColumns={0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38}, rows=[    {before=[...], after=[...]}]}}
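The GTID sets in the two notifications above differ by one transaction (`1-100` for the DDL, `1-101` for the first skipped DML). A minimal sketch of how such a range advances as transactions arrive, assuming for brevity a single-UUID, single-interval GTID set (real GTID sets can contain many UUIDs and gapped intervals):

```java
// Extend a single-UUID GTID set like "uuid:1-100" with the next
// transaction number, yielding "uuid:1-101".
// Simplified helper for illustration; not part of Flink CDC or Debezium.
class GtidRange {
    static String extend(String gtidSet, long txNo) {
        int colon = gtidSet.lastIndexOf(':');
        String uuid = gtidSet.substring(0, colon);
        String[] range = gtidSet.substring(colon + 1).split("-");
        long end = Long.parseLong(range[range.length - 1]);
        if (txNo != end + 1) {
            throw new IllegalArgumentException("non-contiguous transaction: " + txNo);
        }
        return uuid + ":" + range[0] + "-" + txNo;
    }
}
```

The GTID set reported in the notification is what the restarted Flink CDC job resumes from, so the skipped transactions are replayed after the target schema has been fixed.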

If you are interested in this feature, please feel free to check it out. Any comments or feedback would also be appreciated. Have a great day all Flink users 😄

SML0127 • Feb 01 '24 13:02