hudi icon indicating copy to clipboard operation
hudi copied to clipboard

Hudi Write Performance

Open p-powell opened this issue 3 years ago • 5 comments

Concerned about performance. How long should the following mocked-up sample take to write to s3? There are 1,369,765 records and 308 columns. It is taking ~10.5min running in docker container on an t2.xlarge ec2 instance using the datamechanics/spark:3.2.0-hadoop-3.3.1-java-11-scala-2.12-python-3.8-latest image. Any suggestions how to increate performance. The sample file generated below is just to illustrate our issue.

Steps to reproduce the behavior:

  1. Start docker container docker run -it datamechanics/spark:3.2.0-hadoop-3.3.1-java-11-scala-2.12-python-3.8-latest /bin/bash

  2. Download sample file cd /tmp wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2021-01.csv

  3. Start spark shell /opt/spark/bin/spark-shell --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hadoop:hadoop-aws:2.7.3 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --driver-memory 16g

  4. run the following code(replace {bucket_} with a valid bucket):

mport org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.row_number

var df = spark.read.option("header","true").csv(file:///tmp/yellow_tripdata_2021-01.csv);

var a = 0; var b = 0;

// Just constructing a table for testing. var cols = df.columns; var num_cols = cols.length;

// duplicating colums to make a larger dataset for( a <- 1 to 16; b <- 0 to num_cols-1){

     var col_name = cols(b);
     var new_col_name = col_name + "_" + a;
     df = df.withColumn(new_col_name, col(col_name));
};

// going to written to one partition val w = Window.partitionBy(lit('A')).orderBy(lit('A')) var df_id = df.withColumn("_id", row_number().over(w)).withColumn("partpath", lit('N'))

val tableName = "hudi_test" val basePath = "s3a://{bucket_}/hudi_test_table"

val starttime = System.nanoTime

df_id.write.format("hudi"). option(PRECOMBINE_FIELD_OPT_KEY, "_id"). option(RECORDKEY_FIELD_OPT_KEY, "_id"). option(PARTITIONPATH_FIELD_OPT_KEY, "partpath"). option("hoodie.datasource.write.operation","upsert"). option("hoodie.datasource.write.table.type","COPY_ON_WRITE"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)

val duration = (System.nanoTime - starttime) / 1e9d print("write time:" + duration )

Expected behavior

Not sure if this is the expected performance that we can expect with this instance size. Or if there are suggestions on how to increase the performance.

Environment Description

  • Hudi version : 0.8.0 / 0.9.0

  • Spark version : 3.0.1

  • Hive version :

  • Hadoop version : 3.3.1

  • Storage (HDFS/S3/GCS..) :

  • Running on Docker? (yes/no) : yes

p-powell avatar Apr 18 '22 20:04 p-powell

@p-powell You can check by writing parquet directly.

df_id.write.format("parquet").mode(Overwrite).save(parquetBasePath)

I tried this and the time taken was comparable. For Hudi, it was 605s and for patquet it was 568s.

codope avatar Apr 19 '22 15:04 codope

@codope Thanks for the response. Strange I get 333s when I call the parquet writer directly using the exact snippet provided. Should hudi write and parquet write be similar in performance? Really new to hudi and not sure if there are some optimization I can make.

p-powell avatar Apr 19 '22 16:04 p-powell

Ok. I tried with latest Hudi master. Can you give build from the latest master? I'll try Hudi 0.9. I think latest EMR already has it.

codope avatar Apr 20 '22 03:04 codope

@codope I built from master and takes 492 secs. Still seems slow.

We have an internal file(2.6m rows ~300col) takes 16min to load into a new table(one partition). If we dump the same df to parquet(gzip) using pandas it takes 2m 4secs.

should df_id.write.format("parquet").mode(Overwrite).save(parquetBasePath) times be similar to pandas parquet write times?

p-powell avatar Apr 21 '22 15:04 p-powell

@p-powell Just an update. There were a couple of fixes in the write path to improve performance. Can you please try out the latest master or release(0.11.1)?

codope avatar Jun 20 '22 15:06 codope

@p-powell : for immutable use-cases, we recommend setting some configs to get better performance. https://hudi.apache.org/docs/performance#bulk-insert

let us know if you are looking for more assistance. would be happy to help.

nsivabalan avatar Nov 04 '22 03:11 nsivabalan