[SUPPORT] How to do Schema Evolution with Apache Flink DataStream API when doing CDC?
I have an Apache Flink application that consumes directly from a database using Flink CDC Connectors, but I cannot find any documentation on how to handle the case where a table schema evolves while writing to Hudi. The only Apache Flink sink shown in the documentation is the HoodiePipeline builder, and it works on RowData. Is there an Avro sink?
How could one build this so that the Flink application doesn't have to be stopped and can dynamically evolve the schema of the Hudi tables?
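For context, here is roughly what the RowData-based HoodiePipeline sink looks like, adapted from the Hudi Flink quick start. This is a minimal sketch: the table name, columns, path, and the hard-coded input row are placeholders, and in a real job the stream would come from a Flink CDC source.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class HudiPipelineSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stand-in for a real Flink CDC source converted to RowData.
    RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.BIGINT()),
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3))).getLogicalType();
    DataStream<RowData> cdcStream = env
        .fromElements((RowData) GenericRowData.of(
            1L, StringData.fromString("alice"), TimestampData.fromEpochMillis(0L)))
        .returns(InternalTypeInfo.of(rowType));

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "s3://bucket/hudi/users"); // placeholder path
    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    options.put(FlinkOptions.OPERATION.key(), "upsert");

    // Note: the column list is fixed at build time. Changing it means
    // rebuilding this definition and restarting the job, which is the
    // crux of the schema evolution question in this thread.
    HoodiePipeline.Builder builder = HoodiePipeline.builder("users")
        .column("id BIGINT")
        .column("name STRING")
        .column("ts TIMESTAMP(3)")
        .pk("id")
        .options(options);

    builder.sink(cdcStream, false); // false = unbounded (streaming) input
    env.execute("hudi-cdc-sink-sketch");
  }
}
```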
What kind of schema evolution use cases do you have? Common use cases, like adding a new field, should be automatically supported with a new write.
Do you have an example? Would this only work for adding a new field, or could it also work for modifying a column type?
Yes, it should ideally work if the data type change is backward compatible (e.g., widening int to bigint). @danny0405 Any thoughts?
Currently, you should stop the streaming job, execute the ALTER TABLE command with Spark, and then restart the job.
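For completeness, a minimal sketch of that step, driving the ALTER TABLE through a Spark session from Java. The database, table, and column names are placeholders, and this assumes the Hudi Spark bundle and its SQL extensions are on the classpath.

```java
import org.apache.spark.sql.SparkSession;

public class AlterHudiTable {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-alter-table")
        // Hudi's Spark SQL support requires these settings (see the Hudi Spark SQL docs).
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .getOrCreate();

    // Add the new column that appeared upstream, then restart the Flink job
    // with the updated schema in its HoodiePipeline definition.
    spark.sql("ALTER TABLE hudi_db.users ADD COLUMNS (email STRING)");

    spark.stop();
  }
}
```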
And also modify the schema in RowData, or capture the schema from a schema registry and then use it in the sink? I am wondering whether there is any other writer that could help here beyond the HoodiePipeline sink.
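To make the schema-registry idea concrete: a job could fetch the latest schema at startup and derive the sink columns from it, so that a plain restart picks up a new schema without a code change. A rough sketch, assuming a Confluent-compatible registry, a hypothetical URL and subject name, and Flink's Avro schema converter:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.util.HoodiePipeline;

public class RegistryDrivenColumns {
  public static void main(String[] args) throws Exception {
    // Hypothetical registry URL and subject; adjust to your setup.
    CachedSchemaRegistryClient registry =
        new CachedSchemaRegistryClient("http://schema-registry:8081", 100);
    SchemaMetadata latest = registry.getLatestSchemaMetadata("users-value");
    Schema avroSchema = new Schema.Parser().parse(latest.getSchema());

    // Convert the Avro schema to a Flink row type...
    DataType rowDataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
    RowType rowType = (RowType) rowDataType.getLogicalType();

    // ...and turn each field into a column definition for the HoodiePipeline builder.
    HoodiePipeline.Builder builder = HoodiePipeline.builder("users");
    for (RowType.RowField field : rowType.getFields()) {
      builder.column("`" + field.getName() + "` " + field.getType().asSerializableString());
    }
    // builder.pk(...), builder.options(...), builder.sink(...) as in a normal job.
  }
}
```

Note this only helps at (re)start time; it does not change the schema of an already-running pipeline.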
@danny0405
any updates on this?
There is no automatic schema evolution for the streaming writer right now. The limitation comes from the Flink engine: the Flink Table API assumes a constant schema for all records, so for the pipeline itself the user needs to restart it manually when a schema change is detected.
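One way to make that manual restart deterministic is to fail the job as soon as a record stops matching the expected schema, for example by inspecting the Debezium-style JSON between the CDC source and the sink. A sketch, with an illustrative field list and class name:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/** Fails the job when a CDC record carries a field the sink schema does not know about. */
public class SchemaDriftGuard extends RichMapFunction<String, String> {

  // The columns the Hudi sink was built with; illustrative values.
  private static final Set<String> KNOWN_FIELDS =
      new HashSet<>(Arrays.asList("id", "name", "ts"));

  private transient ObjectMapper mapper;

  @Override
  public void open(Configuration parameters) {
    mapper = new ObjectMapper();
  }

  @Override
  public String map(String debeziumJson) throws Exception {
    // Debezium's JSON envelope keeps the post-image of the row under "after".
    JsonNode after = mapper.readTree(debeziumJson).path("after");
    for (Iterator<String> it = after.fieldNames(); it.hasNext(); ) {
      String field = it.next();
      if (!KNOWN_FIELDS.contains(field)) {
        // Failing hard turns silent schema drift into a visible job failure,
        // which triggers the ALTER TABLE + restart workflow described above.
        throw new IllegalStateException("Schema drift detected: unknown field '" + field + "'");
      }
    }
    return debeziumJson;
  }
}
```

Applied via `cdcStream.map(new SchemaDriftGuard())` on the raw JSON stream, this turns silent schema drift into a job failure that an operator or a restart script can react to.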
Then how is Apache Paimon able to do it now?
Paimon does not do it either; it just detects the schema the first time it is started.
With the MySQLSyncDatabaseAction they claim the following. Currently supported schema changes include:
Adding columns.
Altering column types. More specifically,
altering from a string type (char, varchar, text) to another string type with longer length,
altering from a binary type (binary, varbinary, blob) to another binary type with longer length,
altering from an integer type (tinyint, smallint, int, bigint) to another integer type with wider range,
altering from a floating-point type (float, double) to another floating-point type with wider range,
are supported.
So I am wondering how we could do the same with Flink and Hudi, or at least what the steps are to handle a schema change coming from the database.
Is there any documentation or example you can provide?
Or would using a schema registry help?