
[SUPPORT] How to do Schema Evolution with Apache Flink DataStream API when doing CDC?

Open FranMorilloAWS opened this issue 2 years ago • 5 comments

I have an Apache Flink application that consumes directly from a database using the Flink CDC connectors. However, I cannot find any documentation on how to handle a table schema evolving while writing to Hudi. The only Apache Flink sink shown in the documentation is the HoodiePipeline builder, and it works on RowData. Is there an Avro sink?

How could one build this so the Flink application doesn't have to be stopped and can dynamically evolve the schema of the Hudi tables?
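For reference, the RowData-based sink in question looks roughly like this (a sketch following the pattern in the Hudi Flink quick start; the table name, columns, and path below are placeholders, and the CDC source is elided):

```java
// Sketch of the RowData-based HoodiePipeline sink. The key point for this
// issue: the schema is declared statically at build time, so an evolved
// upstream schema cannot flow through without rebuilding the pipeline.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class HudiCdcSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder: in practice this comes from a Flink CDC source,
    // converted to RowData.
    DataStream<RowData> cdcStream = null;

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "s3://my-bucket/hudi/my_table"); // placeholder path
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

    HoodiePipeline.Builder builder = HoodiePipeline.builder("my_table")
        .column("uuid VARCHAR(20)")  // schema is fixed here, at build time
        .column("name VARCHAR(10)")
        .column("ts TIMESTAMP(3)")
        .pk("uuid")
        .options(options);

    builder.sink(cdcStream, false); // bounded = false for a streaming write
    env.execute("hudi-cdc-sink");
  }
}
```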

FranMorilloAWS avatar Dec 18 '23 08:12 FranMorilloAWS

What kind of schema evolution use cases do you have? Common cases such as adding a new field should be supported automatically on the next write.

ad1happy2go avatar Dec 18 '23 08:12 ad1happy2go

Do you have an example? Would this only work for adding a new field, or would it also work when modifying a column type?

FranMorilloAWS avatar Dec 18 '23 09:12 FranMorilloAWS

Yes, it should ideally work if the data type change is backward compatible. @danny0405 Any thoughts?

ad1happy2go avatar Dec 18 '23 16:12 ad1happy2go

Currently, you should stop the streaming job, execute the ALTER TABLE command with Spark, then restart the job.
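For anyone following this workflow, the alter step could look roughly like the following in spark-sql (table and column names are placeholders; per the Hudi schema evolution docs, type changes via ALTER COLUMN additionally require schema-on-read evolution to be enabled):

```sql
-- Run while the Flink streaming job is stopped.
-- Adding a column:
ALTER TABLE my_hudi_table ADD COLUMNS (new_col STRING);

-- Widening a column type (requires schema-on-read evolution):
SET hoodie.schema.on.read.enable = true;
ALTER TABLE my_hudi_table ALTER COLUMN old_col TYPE BIGINT;
```

After the alter completes, restart the Flink job with the pipeline schema updated to match.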

danny0405 avatar Dec 19 '23 03:12 danny0405

And also modify the schema in RowData, or capture the schema from a schema registry and then use it in the sink? Wondering if there is any other writer that could be helpful here beyond the HoodiePipeline sink.

FranMorilloAWS avatar Dec 19 '23 08:12 FranMorilloAWS

@danny0405

ad1happy2go avatar Jan 31 '24 14:01 ad1happy2go

Any updates on this?

FranMorilloAWS avatar Feb 28 '24 16:02 FranMorilloAWS

There is no automatic schema evolution for the streaming writer right now; the limitation comes from the Flink engine. The Flink Table API assumes a constant schema for all the records in a pipeline, so the user needs to restart the pipeline manually when a schema change is detected.

danny0405 avatar Feb 29 '24 02:02 danny0405

Then how is Apache Paimon able to do it now?

FranMorilloAWS avatar Feb 29 '24 09:02 FranMorilloAWS

Paimon does not do it either; it just detects the schema the first time it is started.

danny0405 avatar Mar 01 '24 01:03 danny0405

With the MySQLSyncDatabaseAction they claim the following: currently supported schema changes include:

Adding columns.

Altering column types. More specifically,
    altering from a string type (char, varchar, text) to another string type with longer length,
    altering from a binary type (binary, varbinary, blob) to another binary type with longer length,
    altering from an integer type (tinyint, smallint, int, bigint) to another integer type with wider range,
    altering from a floating-point type (float, double) to another floating-point type with wider range,

are supported.

So I am wondering how we would be able to do the same with Flink and Hudi? Or at least, what are the steps to manage a schema change coming from the database?

Any documentation you can provide or example?

FranMorilloAWS avatar Mar 07 '24 15:03 FranMorilloAWS

Or if using a schema registry could help?

FranMorilloAWS avatar Mar 07 '24 15:03 FranMorilloAWS