iceberg
Can't write Iceberg format on Hadoop with PyFlink
I added the following jars to $FLINK_HOME/lib:
hadoop-common-3.4.0.jar
hadoop-hdfs-client-3.4.0.jar
iceberg-flink-runtime-1.20-1.10.0.jar
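To double-check which jars are actually sitting in that directory, a small sketch like the one below can list them. It assumes the FLINK_HOME environment variable is set in the shell that runs it; the helper name list_lib_jars is only illustrative and is not part of PyFlink or Iceberg.

```python
import os
from pathlib import Path


def list_lib_jars(flink_home):
    """Return the sorted names of all .jar files directly under <flink_home>/lib."""
    lib_dir = Path(flink_home) / "lib"
    return sorted(p.name for p in lib_dir.glob("*.jar"))


if __name__ == "__main__":
    # Assumption: FLINK_HOME points at the Flink installation being used.
    flink_home = os.environ.get("FLINK_HOME")
    if flink_home:
        for name in list_lib_jars(flink_home):
            print(name)
    else:
        print("FLINK_HOME is not set")
```

This only shows what is present in lib; it does not verify that the jars are compatible with each other or that their own dependencies are available.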
My code is simple:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

t_env.execute_sql(
    """
    CREATE CATALOG iceberg_catalog WITH (
        'type'='iceberg',
        'catalog-type'='jdbc',
        'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog',
        'jdbc.uri' = 'jdbc:postgresql://<my_ip>:5433/iceberg_catalog',
        'jdbc.user' = 'pguser',
        'jdbc.password' = 'pgpass',
        'warehouse'='hdfs://<my_ip>:9000/user/warehouse'
    )
    """
)

t_env.execute_sql(
    """
    CREATE TABLE `iceberg_catalog`.`default`.`sample` (
        id BIGINT COMMENT 'unique id',
        data STRING
    )
    """
)

env.execute("Create Iceberg Catalog")
When I submit the job, I get this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o8.executeSql.
: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.runtime.util.HadoopUtils
Flink version: 1.20
Iceberg version: 1.10
Thanks for your help!