
With autogenerated keys, HoodieStreamer fails with error: ts(Part -ts) field not found in record

Open Sarfaraz-214 opened this issue 2 years ago • 4 comments

I am using HoodieStreamer with Hudi 0.14 and trying to leverage autogenerated keys, so I am not passing hoodie.datasource.write.recordkey.field or hoodie.datasource.write.precombine.field. I do not want a pre-combine key; I want to write all incoming data as is.

Additionally, I am passing hoodie.spark.sql.insert.into.operation = insert (instead of --op insert), since the documentation states that no pre-combine key is required with the bulk_insert and insert modes (ref).

With the above, the .hoodie directory gets created, but the data write to GCS fails with this error:

org.apache.hudi.exception.HoodieException: ts(Part -ts) field not found in record. Acceptable fields were :[c1, c2, c3, c4, c5]
        at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:601)

I also see in the hoodie.properties file that the pre-combine key is getting set to ts (hoodie.table.precombine.field=ts). This seems to come from the default value of --source-ordering-field. How can I skip the pre-combine field in this case?
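
For illustration only, one possible (untested) workaround would be to point --source-ordering-field at an existing column, though that still ends up setting a pre-combine field, which I want to avoid. The column c1 below is just a placeholder taken from the error message:

# sketch only, not verified: c1 stands in for any real column from the source schema
spark-submit ... \
    gs://<fullPath>/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
    --source-ordering-field c1 \
    --props gs://<fullPath>/configfolder/hudi-table.properties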

This is happening for both CoW & MoR tables.

This actually runs fine via Spark SQL, but I am facing the issue when using HoodieStreamer.

Sharing the configurations used:

hudi-table.properties

hoodie.datasource.write.partitionpath.field=job_id
hoodie.spark.sql.insert.into.operation=insert
bootstrap.servers=***
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='***' password='***';
auto.offset.reset=earliest
hoodie.deltastreamer.source.kafka.topic=<topicName>
hoodie.deltastreamer.schemaprovider.source.schema.file=gs://<fullPath>/<schemaName>.avsc
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider

spark-submit command

spark-submit \
    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
    --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0 \
    --properties-file /home/sarfaraz_h/spark-config.properties \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 12G \
    --driver-cores 3 \
    --executor-memory 12G \
    --executor-cores 3 \
    --num-executors 3 \
    --conf spark.yarn.maxAppAttempts=1 \
    --conf spark.sql.shuffle.partitions=18 \
    gs://<fullPath>/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
    --continuous \
    --source-limit 1000000 \
    --min-sync-interval-seconds 600 \
    --table-type COPY_ON_WRITE \
    --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
    --target-base-path gs://<fullPath>/<tableName> \
    --target-table <tableName> \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
    --props gs://<fullPath>/configfolder/hudi-table.properties

Spark version used: 3.3.2

Sarfaraz-214 avatar Dec 04 '23 04:12 Sarfaraz-214

Hi @Sarfaraz-214 - HoodieStreamer automatically takes ts as the default value for hoodie.datasource.write.precombine.field. You need to pass a value for hoodie.datasource.write.precombine.field in the properties file; that field is then used to choose between two records having the same primary key.
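
For example, pointing it at an existing column in hudi-table.properties (the column name below is only a placeholder):

# placeholder: replace c1 with a real column from the source schema
hoodie.datasource.write.precombine.field=c1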

Amar1404 avatar Dec 04 '23 04:12 Amar1404

Hi @Amar1404 - I do not want to have a pre-combine key. I want to dump all the data I get as is.

Sarfaraz-214 avatar Dec 04 '23 04:12 Sarfaraz-214

@Sarfaraz-214 You are right. With autogenerated keys there should not be any requirement for a precombine or ordering field. This actually works fine with the datasource writer.

JIRA - https://issues.apache.org/jira/browse/HUDI-7211

ad1happy2go avatar Dec 11 '23 08:12 ad1happy2go

@codope @ad1happy2go When is the fix going to land: in the next 0.14.x release or in Hudi 1.x?

Sarfaraz-214 avatar Apr 30 '24 05:04 Sarfaraz-214

For auto record key gen, you need to set the operation type to "INSERT". Can you give that a try? With "UPSERT" I do see the same exception as you, but with "INSERT" I don't see any exception.
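
For reference, the operation can also be set explicitly on the streamer via the --op flag; a rough sketch against the command shared above (untested):

# sketch: explicit operation type on the HoodieStreamer invocation
spark-submit ... \
    gs://<fullPath>/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
    --op INSERT \
    --table-type COPY_ON_WRITE \
    --props gs://<fullPath>/configfolder/hudi-table.properties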

nsivabalan avatar May 29 '24 22:05 nsivabalan

Hi @nsivabalan, I am already using INSERT mode; I shared all the configs above.

hoodie.spark.sql.insert.into.operation=insert

Sarfaraz-214 avatar May 30 '24 07:05 Sarfaraz-214