[WIP] Detect new data in GCS buckets via Cloud Pubsub
Start of a feature that aims to replicate on GCS the reliable ingestion of data that already exists for AWS S3 buckets (https://hudi.apache.org/blog/2021/08/23/s3-events-source). Compare with the equivalent code in S3EventsSource: https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java#L38-L44
Currently tested as below; it successfully generates Parquet files corresponding to the file event notification data found in the Pubsub topic:
$ bin/spark-submit \
--conf spark.driver.extraJavaOptions="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=4044" \
--packages org.apache.spark:spark-avro_2.11:2.4.4,com.google.apis:google-api-services-pubsub:v1-rev7-1.20.0 \
--driver-memory 4g \
--executor-memory 4g \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/home/pramod/3workspace/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
--hoodie-conf hoodie.datasource.write.recordkey.field="id,metageneration" \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.datasource.write.partitionpath.field=bucket \
--source-class org.apache.hudi.utilities.sources.GcsEventsSource \
--table-type COPY_ON_WRITE \
--source-ordering-field timeCreated \
--target-base-path file:///home/pramod/2tmp/dl-try/gcs-data \
--target-table gcs_tbl \
--op UPSERT \
--continuous \
--source-limit 50 \
--min-sync-interval-seconds 180
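For context on the config above: the recordkey fields (`id`, `metageneration`), partition path field (`bucket`), and ordering field (`timeCreated`) all come from the GCS object-change notification payload that arrives on the Pubsub topic. The sketch below is illustrative only (not code from this PR, and the payload values are hypothetical); it shows how those fields might be pulled out of one notification message:

```python
# Illustrative sketch: extract from a GCS object-change notification the
# fields referenced by the spark-submit config above. The payload values
# here are hypothetical; real notifications follow the storage#object shape.
import json

# Sample message data shaped like a GCS OBJECT_FINALIZE notification.
payload = json.dumps({
    "kind": "storage#object",
    "id": "my-bucket/data/part-0001.json/1634000000000000",
    "bucket": "my-bucket",
    "name": "data/part-0001.json",
    "metageneration": "1",
    "timeCreated": "2021-10-12T00:53:20.000Z",
    "size": "1024",
})

def extract_record(message_data: str) -> dict:
    """Keep only the fields the DeltaStreamer config above relies on:
    recordkey (id, metageneration), partition path (bucket),
    and source ordering field (timeCreated)."""
    obj = json.loads(message_data)
    return {k: obj[k] for k in ("id", "metageneration", "bucket", "timeCreated")}

record = extract_record(payload)
print(record)
```

Using `id` plus `metageneration` as a composite record key (via ComplexKeyGenerator) means a later metadata update to the same object lands as a distinct record, while `timeCreated` lets upserts pick the latest event.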