Unable to create target format Delta with source format Iceberg when the source table is on S3
I followed the documentation "Creating your first interoperable table" and was able to build utilities-0.1.0-SNAPSHOT-bundled.jar successfully.
Initiated a PySpark session using the command below. The Spark version is 3.4.1 running on Amazon EMR 6.14:
pyspark --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" --conf "spark.sql.catalog.spark_catalog.type=hive"
Created an Iceberg table using the commands below:
data = [("James", "Smith", "01012020", "M", "3000"), ("Michael", "", "02012021", "M", "4000"), ("Robert", "Williams", "03012023", "M", "4000"), ("Maria", "Jones", "04012024", "F", "4000"), ("Jen", "Brown", "05012025", "F", "-1")]
columns = ["firstname", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data, columns)
spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_table (firstname string, lastname string, dob string, gender string, salary string) USING iceberg""")
df.writeTo("iceberg_table").append()
I see the data and metadata directory under the table name on s3.
Created my_config.yaml as mentioned in the documentation (my_config.txt).
Executed the command below and it is failing with metadata/version-hint.text not available:
sudo java -jar ./utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml
2024-04-30 10:24:25 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
2024-04-30 10:24:25 WARN org.apache.iceberg.hadoop.HadoopTableOperations:325 - Error reading version hint file s3:/
I was looking into the documentation and understand that if the source is an Iceberg table I need to include a catalog.yaml as well. But I am not sure what the value for catalogImpl should be in my case. Any insights on this would be very helpful.
catalogImpl: io.my.CatalogImpl
catalogName: name
catalogOptions: # all other options are passed through in a map
  key1: value1
  key2: value2
Hi @rajender07! The error message clarifies the problem: it says the version-hint.text file was not found for the source table format (Iceberg). Do you see it on S3?
This is the metadata file on Iceberg side when used with a Hadoop catalog. XTable would need this file to translate into the target Delta format.
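For context, with a Hadoop catalog version-hint.text simply stores the current table version number N, and readers resolve the live metadata file metadata/vN.metadata.json from it. A minimal sketch of that lookup in plain Python (hypothetical local paths, not XTable's or Iceberg's actual implementation):

```python
import os
import tempfile

def current_metadata_file(table_dir: str) -> str:
    """Resolve the current metadata JSON the way a Hadoop-catalog reader would:
    read the version number from metadata/version-hint.text, then build the
    metadata file name from it."""
    hint_path = os.path.join(table_dir, "metadata", "version-hint.text")
    with open(hint_path) as f:
        version = int(f.read().strip())
    return os.path.join(table_dir, "metadata", f"v{version}.metadata.json")

# Demo with a throwaway table layout mimicking a Hadoop-catalog table.
table = tempfile.mkdtemp()
os.makedirs(os.path.join(table, "metadata"))
with open(os.path.join(table, "metadata", "version-hint.text"), "w") as f:
    f.write("3")
print(current_metadata_file(table))  # path ending in metadata/v3.metadata.json
```

If the hint file is missing (as in your case), this lookup has nothing to start from, which is exactly the warning you saw.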
The important part to understand here is that Iceberg needs a catalog to get started with. Your config currently connects Iceberg with a Hive catalog, but I don't see any Thrift URL or similar here:
pyspark --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" --conf "spark.sql.catalog.spark_catalog.type=hive"
Can you instead use a Hadoop catalog and configure it with something like this:
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = s3a://your-bucket
@dipankarmazumdar, Thank you for looking into the issue. No, I do not see the version-hint.text file on S3. When I looked into the documentation, I understood this file is created while using a Hadoop catalog. Since I was using the Iceberg session catalog, it is not generated.
I will try as you suggested using Hadoop catalog and let you know the findings.
Could you please guide me on solving the issue while using an Iceberg catalog. Should I use a catalog.yaml file? If yes, I am confused about the catalogName that should be used. FYI, I have added Thrift-related properties under /etc/spark/conf/spark-defaults.conf and /etc/spark/conf/hive-site.xml. I have no issues connecting to my metastore and reading/writing data from it.
@rajender07 Which catalog are you using? If it is HMS, the implementation is org.apache.iceberg.hive.HiveCatalog; the other args and the name are used to provide any required configuration for this catalog, such as a URI for your Thrift server.
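For reference, a catalog.yaml for an HMS-backed Iceberg source might look like the sketch below. The catalog name, URI, and warehouse path are placeholders you would replace with your metastore's actual values:

```yaml
catalogImpl: org.apache.iceberg.hive.HiveCatalog
catalogName: hive_catalog                    # placeholder name
catalogOptions:
  uri: thrift://your-metastore-host:9083     # placeholder, use your HMS URI
  warehouse: s3://your-bucket/warehouse      # placeholder warehouse path
```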
@dipankarmazumdar @the-other-tim-brown
I used Hadoop catalog as you mentioned and created a new Iceberg table. Now, I can see version-hint.text file as well.
However, when I executed the sync command, it failed with the error below. Could you please assist with resolving this issue?
sudo java -jar ./utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml
2024-05-13 13:43:04 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync. Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(Lorg/apache/hadoop/fs/statistics/DurationTracker;Lorg/apache/hadoop/util/functional/CallableRaisingIOE;)Ljava/lang/Object;
Here is my my_config.yaml
sourceFormat: ICEBERG
targetFormats:
  - DELTA
datasets:
  - tableBasePath: s3://<>/<>/x4_iceberg_table
    tableName: x4_iceberg_table
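A NoSuchMethodError on a Hadoop class like this usually means two different Hadoop versions ended up on the classpath (for example, the bundled jar and EMR's cluster jars both providing org.apache.hadoop.fs.statistics classes). One way to hunt for the duplicate is to scan candidate jars for the class entry from the stack trace; a sketch, not an official XTable tool, with hypothetical paths:

```python
import os
import zipfile

def jars_containing(jar_dir: str, class_entry: str):
    """Return the jars under jar_dir whose entries include class_entry,
    e.g. 'org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.class'."""
    hits = []
    for root, _dirs, files in os.walk(jar_dir):
        for name in files:
            if not name.endswith(".jar"):
                continue
            path = os.path.join(root, name)
            with zipfile.ZipFile(path) as jar:
                if class_entry in jar.namelist():
                    hits.append(path)
    return hits

# Example (hypothetical EMR jar directory):
# print(jars_containing("/usr/lib/hadoop",
#       "org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.class"))
```

If more than one jar (or a jar plus the bundled utilities jar) provides the class, the JVM may load an older copy that lacks the method in question.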
@rajender07 - I am not really sure about this particular error. However, I tried reproducing this on my end and I was able to translate from ICEBERG to DELTA using the setup I suggested.
ICEBERG TABLE CONFIG & CREATION:
import pyspark
from pyspark.sql import SparkSession
import os
conf = (
pyspark.SparkConf()
.setAppName('app_name')
.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
.set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
.set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
.set('spark.sql.catalog.hdfs_catalog.warehouse', 's3a://my-bucket/new_iceberg/')
.set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")
spark.sql("CREATE TABLE hdfs_catalog.table1 (name string) USING iceberg")
spark.sql("INSERT INTO hdfs_catalog.table1 VALUES ('Alex'), ('Dipankar'), ('Mary')")
my_config.yaml
sourceFormat: ICEBERG
targetFormats:
- DELTA
datasets:
  - tableBasePath: s3://my-bucket/new_iceberg/table1/
    tableDataPath: s3://my-bucket/new_iceberg/table1/data
    tableName: table1
Run Sync
java -jar utilities/target/utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml
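Once the sync succeeds, the table's _delta_log directory should contain at least one numeric JSON commit file (starting with 00000000000000000000.json); an empty _delta_log means the Delta sync never committed. For a locally accessible copy of the table, a quick sanity check might look like this (hypothetical paths, not part of XTable):

```python
import os
import tempfile

def has_delta_commits(table_path: str) -> bool:
    """True if the table's _delta_log directory holds at least one
    Delta commit file (a numeric .json entry)."""
    log_dir = os.path.join(table_path, "_delta_log")
    if not os.path.isdir(log_dir):
        return False
    return any(
        name.endswith(".json") and name.split(".")[0].isdigit()
        for name in os.listdir(log_dir)
    )

# Demo against a throwaway layout mimicking a freshly synced table.
table = tempfile.mkdtemp()
os.makedirs(os.path.join(table, "_delta_log"))
open(os.path.join(table, "_delta_log", "00000000000000000000.json"), "w").close()
print(has_delta_commits(table))  # True
```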
@rajender07 Which catalog are you using? If it is HMS, the implementation is org.apache.iceberg.hive.HiveCatalog, the other args and name are going to be used to configure any required configurations for using this catalog like a uri for your thrift server.

@the-other-tim-brown Referring to this: when I'm using a Hive catalog and passing catalogImpl: org.apache.iceberg.hive.HiveCatalog, I'm getting java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog, while if I use org.apache.iceberg.hadoop.HadoopCatalog I am getting no such error. Is there anything else we need to implement if we are using a Hive catalog for our Iceberg tables?
@rajender07 - LMK if you were able to get past the error with the recommendation.
@rajender07 Can you pull the latest master and try again? This is the fix: https://github.com/apache/incubator-xtable/pull/441
@vinishjail97, Thank you. I will test the fix today and share an update.
It is creating an empty _delta_log dir and erroring out.
--Config File
sourceFormat: ICEBERG
targetFormats:
  - DELTA
datasets:
  - tableBasePath: s3://<>/prod/orders
    tableDataPath: s3://<>/prod/orders/data
    tableName: orders
    namespace: prod.db
--
java -jar /Users/satyak/iceberg/demo/xtable/xtable-utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig /Users/satyak/iceberg/demo/xtable/s3_orders_ice_delta.yaml
WARNING: Runtime environment or build system does not support multi-release JARs. This will impact location-based features.
2024-06-26 15:10:21 INFO org.apache.xtable.utilities.RunSync:148 - Running sync for basePath s3://<>/prod/orders for following table formats [DELTA]
2024-06-26 15:10:22 WARN org.apache.hadoop.util.NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/satyak/iceberg/demo/xtable/xtable-utilities-0.1.0-SNAPSHOT-bundled.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2024-06-26 15:10:22 WARN org.apache.spark.util.Utils:72 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2024-06-26 15:10:24 WARN org.apache.hadoop.metrics2.impl.MetricsConfig:136 - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2024-06-26 15:10:25 WARN org.apache.hadoop.fs.s3a.SDKV2Upgrade:39 - Directly referencing AWS SDK V1 credential provider com.amazonaws.auth.DefaultAWSCredentialsProviderChain. AWS SDK V1 credential providers will be removed once S3A is upgraded to SDK V2
2024-06-26 15:10:25 INFO org.apache.spark.sql.delta.storage.DelegatingLogStore:60 - LogStore LogStoreAdapter(io.delta.storage.S3SingleDriverLogStore) is used for scheme s3
2024-06-26 15:10:26 INFO org.apache.spark.sql.delta.DeltaLog:60 - Creating initial snapshot without metadata, because the directory is empty
2024-06-26 15:10:27 INFO org.apache.spark.sql.delta.InitialSnapshot:60 - [tableId=9f0c6a5d-2170-4167-b464-ec54fee685c3] Created snapshot InitialSnapshot(path=s3://ambaricloudsatya/prod/orders/data/_delta_log, version=-1, metadata=Metadata(e3727e72-0eda-476e-8cd7-bf7f85269529,null,null,Format(parquet,Map()),null,List(),Map(),Some(1719432627880)), logSegment=LogSegment(s3://ambaricloudsatya/prod/orders/data/_delta_log,-1,List(),None,-1), checksumOpt=None)
2024-06-26 15:10:28 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(Lorg/apache/hadoop/fs/statistics/DurationTracker;Lorg/apache/hadoop/util/functional/CallableRaisingIOE;)Ljava/lang/Object;
at org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration(Invoker.java:147)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:282)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:435)
at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:284)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)
at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:408)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:404)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:282)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:326)
at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:427)
at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:545)
at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:320)
at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:104)
at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:84)
at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:94)
at org.apache.xtable.iceberg.IcebergTableManager.lambda$getTable$1(IcebergTableManager.java:58)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
at org.apache.xtable.iceberg.IcebergTableManager.getTable(IcebergTableManager.java:58)
at org.apache.xtable.iceberg.IcebergConversionSource.initSourceTable(IcebergConversionSource.java:81)
at org.apache.xtable.iceberg.IcebergConversionSource.getSourceTable(IcebergConversionSource.java:60)
at org.apache.xtable.iceberg.IcebergConversionSource.getCurrentSnapshot(IcebergConversionSource.java:121)
at org.apache.xtable.spi.extractor.ExtractFromSource.extractSnapshot(ExtractFromSource.java:38)
at org.apache.xtable.conversion.ConversionController.syncSnapshot(ConversionController.java:183)
at org.apache.xtable.conversion.ConversionController.sync(ConversionController.java:121)
at org.apache.xtable.utilities.RunSync.main(RunSync.java:169)