With autogenerated keys, HoodieStreamer fails with error: ts(Part -ts) field not found in record
I am using HoodieStreamer with Hudi 0.14 and trying to leverage autogenerated keys, so I am not passing hoodie.datasource.write.recordkey.field or hoodie.datasource.write.precombine.field. I do not want a pre-combine key; I want to write all incoming data as is.
Additionally, I am passing hoodie.spark.sql.insert.into.operation=insert (instead of --op insert), since the documentation states that no pre-combine key is required with the bulk_insert and insert modes (ref).
With the above, the .hoodie directory gets created, but the data write to GCS fails with the following error:
org.apache.hudi.exception.HoodieException: ts(Part -ts) field not found in record. Acceptable fields were :[c1, c2, c3, c4, c5]
at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:601)
I also see in the hoodie.properties file that the pre-combine key is getting set to ts (hoodie.table.precombine.field=ts). It seems this comes from the default value of --source-ordering-field. How can we skip the pre-combine field in this case?
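For reference, a minimal sketch of how this value can be checked directly in the table metadata on GCS (the path simply mirrors the target base path used further below):

```
gsutil cat gs://<fullPath>/<tableName>/.hoodie/hoodie.properties | grep precombine
```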
This is happening for both CoW & MoR tables.
This runs fine via Spark SQL, but I hit this issue when using HoodieStreamer.
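To illustrate the Spark SQL path that works, here is a minimal sketch assuming a spark-shell with the Hudi 0.14 bundle; the table and column names are placeholders, not the actual schema. No primaryKey or preCombineField is declared, so record keys are auto-generated:

```scala
// Sketch only: placeholder table/columns; relies on Hudi 0.14 auto key generation.
spark.sql("SET hoodie.spark.sql.insert.into.operation = insert")

spark.sql("""
  CREATE TABLE hudi_autokey_demo (c1 STRING, c2 STRING, job_id STRING)
  USING hudi
  PARTITIONED BY (job_id)
""")

spark.sql("INSERT INTO hudi_autokey_demo VALUES ('a', 'b', 'job-1')")
```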
Sharing the configurations used:
hudi-table.properties
hoodie.datasource.write.partitionpath.field=job_id
hoodie.spark.sql.insert.into.operation=insert
bootstrap.servers=***
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='***' password='***';
auto.offset.reset=earliest
hoodie.deltastreamer.source.kafka.topic=<topicName>
hoodie.deltastreamer.schemaprovider.source.schema.file=gs://<fullPath>/<schemaName>.avsc
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
spark-submit command
spark-submit \
--class org.apache.hudi.utilities.streamer.HoodieStreamer \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0 \
--properties-file /home/sarfaraz_h/spark-config.properties \
--master yarn \
--deploy-mode cluster \
--driver-memory 12G \
--driver-cores 3 \
--executor-memory 12G \
--executor-cores 3 \
--num-executors 3 \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.sql.shuffle.partitions=18 \
gs://<fullPath>/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
--continuous \
--source-limit 1000000 \
--min-sync-interval-seconds 600 \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--target-base-path gs://<fullPath>/<tableName> \
--target-table <tableName> \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--props gs://<fullPath>/configfolder/hudi-table.properties
Spark version used: 3.3.2
Hi @Sarfaraz-214 - HoodieStreamer automatically takes ts as the default value of hoodie.datasource.write.precombine.field. You need to pass a value for hoodie.datasource.write.precombine.field in the properties file; that field is then used to pick between two records having the same primary key.
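In other words, the suggestion amounts to adding a line like the following to hudi-table.properties (the column name is only a placeholder for an existing field in the schema):

```
hoodie.datasource.write.precombine.field=<existingColumnName>
```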
Hi @Amar1404 - I do not want to have a pre-combine key. I want to dump all the data I get as is.
@Sarfaraz-214 You are right. With autogenerated keys there should not be any requirement for a precombine or ordering field. This works fine with the datasource writer.
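For comparison, a minimal sketch of that datasource write path, assuming a spark-shell with the Hudi 0.14 bundle; the table name, columns, and path are placeholders, and no record key or precombine field is configured:

```scala
// Sketch only: auto-generated record keys, insert operation, no precombine field.
import spark.implicits._

val df = Seq(("a", "b", "job-1")).toDF("c1", "c2", "job_id")

df.write.format("hudi").
  option("hoodie.table.name", "hudi_autokey_demo").
  option("hoodie.datasource.write.partitionpath.field", "job_id").
  option("hoodie.datasource.write.operation", "insert").
  mode("append").
  save("gs://<fullPath>/hudi_autokey_demo")
```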
JIRA - https://issues.apache.org/jira/browse/HUDI-7211
@codope, @ad1happy2go - When is the fix going to land? In a subsequent 0.14.x release or in Hudi 1.x?
For auto record key generation, you need to set the operation type to "INSERT". Can you give that a try? With "UPSERT" I see the same exception as you, but with "INSERT" I don't see any exception.
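Assuming "operation type" here means the HoodieStreamer --op flag (already mentioned at the top of the issue), that would be an extra argument in the spark-submit invocation above, for example:

```
--op INSERT \
```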
Hi @nsivabalan - I am already using INSERT mode; all the configs are shared above:
hoodie.spark.sql.insert.into.operation=insert