
[SUPPORT] Hudi 0.14.0 - deletion from table failing for org.apache.hudi.keygen.TimestampBasedKeyGenerator

Open ShrutiBansal309 opened this issue 1 year ago • 3 comments

Issue: I am using Hudi 0.14.0 and Spark 3.4.0 on an EMR 6.15.0 cluster. I have a service that writes a Dataset<Row> to a Hudi table located on S3. I am facing issues when trying to delete data from this table (following the hard-delete approach from https://hudi.apache.org/docs/0.13.1/quick-start-guide#hard-deletes), which fails with the error below:

java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46) ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46) ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195) ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:76) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:294) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:307) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:330) ~[hudi-utilities-bundle_2.12-0.13.1-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:348) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:412) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:239) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:731) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:959) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at scala.collection.Iterator.isEmpty(Iterator.scala:387) ~[scala-library-2.12.15.jar:?]
	at scala.collection.Iterator.isEmpty$(Iterator.scala:387) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:957) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108) ~[hudi-utilities-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:58) ~[hudi-utilities-bundle_2.12-0.13.1-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_402]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_402]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402]

According to the debug logs, the error comes from the partitioning key, which is of DateType but which Hudi reads back as StringType. (Spark stores DateType values internally as integer day counts, which would explain the Integer-to-UTF8String cast failure when the partition column is appended with a StringType schema.) This is what I see in the debug logs:

24/02/13 01:44:49 DEBUG Spark32PlusHoodieParquetFileFormat: Appending StructType(StructField(yyyymmdate_prt,StringType,true)) [19358]
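
A minimal diagnostic sketch (assuming the same tablePath defined in the reproduction below): reading the table back through the Hudi Spark datasource and printing the schema shows how the partition column is actually exposed on read.

// Read the table back via the Hudi Spark datasource and inspect the schema;
// if the partition column shows up as StringType instead of the declared DateType,
// that matches the mismatch suggested by the debug log above.
val readBack = spark.read.format("hudi").load(tablePath)
readBack.printSchema()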

To Reproduce: Use the code below to reproduce the issue:

sudo spark-shell --jars s3://dmoncloud-voltst-emr-data/testing/patchedbuild/hudi-utilities-bundle_2.12-0.13.1-amzn-0.jar --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

//------
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.spark.sql.types._

// -------

val s3RootLocation = "s3://dmoncloud-voltst-emr-data/sparkhudi"
val tableName = "testhudi11"
val tablePath = s3RootLocation + "/" + tableName

val hudiOptions = Map[String,String](
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.table.name" -> tableName,
  "hoodie.insert.shuffle.parallelism" -> "2",
  "hoodie.upsert.shuffle.parallelism" -> "2",
  "hoodie.bulkinsert.shuffle.parallelism" -> "2",
  "hoodie.datasource.write.partitionpath.field" -> "creation_date",
  "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "id",
  "hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled" -> "true",
  "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
  "hoodie.deltastreamer.keygen.timebased.timestamp.type" -> "SCALAR",
  "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit" -> "DAYS",
  "hoodie.deltastreamer.keygen.timebased.input.dateformat" -> "yyyy-MM-dd",
  "hoodie.deltastreamer.keygen.timebased.output.dateformat" -> "yyyy-MM-dd",
  "hoodie.deltastreamer.keygen.timebased.timezone" -> "GMT+8:00",
  "hoodie.datasource.hive_sync.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.hive_sync.partition_fields" -> "creation_date",
  "hoodie.datasource.hive_sync.database" -> "default",
  "hoodie.datasource.hive_sync.table" -> tableName,
  "hoodie.datasource.hive_sync.support_timestamp" -> "true",
  "hoodie.datasource.hive_sync.mode" -> "hms",
  "hoodie.metadata.enable" -> "false"
)

val customSchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("creation_date", DateType, true)
))

//-------

val df=spark.read.format("csv").option("header","true").schema(customSchema).load(s3RootLocation + "/input")

(df.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"upsert")
    .mode(SaveMode.Append)
    .save(tablePath))

//-------DELETE

val deleteDF=spark.sql("select * from " + tableName)

(deleteDF.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"delete")
    .mode(SaveMode.Append)
    .save(tablePath))

To Resolve: Tried https://github.com/apache/hudi/pull/9273, but it did not work.

CC: @ad1happy2go

ShrutiBansal309 · Mar 05 '24 18:03

@ShrutiBansal309 I was able to reproduce this issue. It occurs even when we just try to read this table.

JIRA - https://issues.apache.org/jira/browse/HUDI-7485

Reproducible code:

from pyspark.sql.functions import expr

# Placeholder table name and base path, assumed for this snippet; adjust to your environment.
tableName = "trips_table"
basePath = "file:///tmp/trips_table"

columns = ["ts","uuid","rider","driver","fare","dt"]
data =[(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"2012-01-01"),
       (1695091554788,"e96c4396-3fad-413a-a942-4cb36106d721","rider-B","driver-L",27.70 ,"2012-01-01"),
       (1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-C","driver-M",33.90 ,"2012-01-01"),
       (1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-C","driver-N",34.15,"2012-01-01")]


inserts = spark.createDataFrame(data).toDF(*columns)

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field' : 'uuid',
    'hoodie.datasource.write.precombine.field' : 'ts',
    'hoodie.datasource.write.partitionpath.field': 'dt',
    'hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled' : 'true',
    'hoodie.datasource.write.keygenerator.class' : 'org.apache.hudi.keygen.TimestampBasedKeyGenerator',
    'hoodie.keygen.timebased.timestamp.type' : 'SCALAR',
    'hoodie.keygen.timebased.timestamp.scalar.time.unit' : 'DAYS',
    'hoodie.keygen.timebased.input.dateformat' : 'yyyy-MM-dd',
    'hoodie.keygen.timebased.output.dateformat' : 'yyyy-MM-dd',
    'hoodie.keygen.timebased.timezone' : 'GMT+8:00',
    'hoodie.datasource.write.hive_style_partitioning' : 'true',

}
# Insert data
inserts.withColumn("dt", expr("CAST(dt as date)")).write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)

deleteDF=spark.read.format("hudi").load(basePath)
deleteDF.show()

ad1happy2go · Mar 06 '24 10:03

I think your timestamp.type should be "DATE_STRING".
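
A sketch of what that change might look like (untested; the key names follow the hoodie.deltastreamer.keygen.timebased.* prefix already used in the reproduction, and the dateStringKeygenOverrides name is just for illustration):

// Hypothetical override of the key-generator settings: switch the timestamp type
// from SCALAR to DATE_STRING; with DATE_STRING input, the input/output date formats
// describe the partition value directly.
val dateStringKeygenOverrides = Map[String, String](
  "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
  "hoodie.deltastreamer.keygen.timebased.timestamp.type" -> "DATE_STRING",
  "hoodie.deltastreamer.keygen.timebased.input.dateformat" -> "yyyy-MM-dd",
  "hoodie.deltastreamer.keygen.timebased.output.dateformat" -> "yyyy-MM-dd",
  "hoodie.deltastreamer.keygen.timebased.timezone" -> "GMT+8:00"
)

// Usage sketch: merge over the original options when writing,
// e.g. df.write.format("hudi").options(hudiOptions ++ dateStringKeygenOverrides)...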

xicm · Mar 28 '24 10:03

I think your timestamp.type should be "DATE_STRING".

Tried setting "hoodie.deltastreamer.keygen.timebased.timestamp.type" -> "DATE_STRING", but got the exception below:

Caused by: java.lang.RuntimeException: hoodie.keygen.timebased.timestamp.scalar.time.unit is not specified but scalar it supplied as time value
	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.convertLongTimeToMillis(TimestampBasedAvroKeyGenerator.java:216)
	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:187)
	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:118)
	... 18 more

After encountering this exception, I removed "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit" -> "DAYS", but the same exception ("Caused by: java.lang.RuntimeException: hoodie.keygen.timebased.timestamp.scalar.time.unit is not specified but scalar it supplied as time value") was still thrown.

Priyanka128 · May 15 '24 07:05