Unable to use GlueCatalog in Flink environments without Hadoop
When attempting to use the GlueCatalog implementation (or really any catalog implementation) in Flink, Hadoop is expected to be on the classpath.
The FlinkCatalogFactory always attempts to load the Hadoop config from Flink, but Flink does not guarantee that a valid Hadoop environment is present. In environments where Hadoop is not available (e.g. AWS Kinesis Data Analytics), this throws java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration.
Presently, most of the catalog implementations implement Configurable, and thus the util functions like loadCatalog expect to be passed a Hadoop Configuration instance (hadoopConf). In catalogs like GlueCatalog and DynamoCatalog, the only reason for the Configurable interface is to enable dynamic FileIO loading.
More context in this Slack thread
If the only reason to have the GlueCatalog implement Configurable is to satisfy the current constraints of dynamic FileIO loading, we can likely change that (I believe dynamic FileIO loading was added around the time the second catalog beyond the basic Hadoop and Hive ones was added).
In fact, on first look, it seems like the GlueCatalog could possibly just pass in null for the hadoopConf if it didn't implement Configurable, as the current dynamic FileIO loading only sets the hadoop configuration if the class implements Configurable.
https://github.com/apache/iceberg/blob/4eb0853cd787bf2f5778195558d22a45ecf6c601/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L189-L191
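Paraphrasing those lines (not a verbatim copy of CatalogUtil), the pattern is essentially:
if (fileIO instanceof Configurable) {
  // Only Hadoop-aware implementations such as HadoopFileIO ever receive the
  // conf, so a catalog whose FileIO is not Configurable should never need a
  // real (non-null) hadoopConf here.
  ((Configurable) fileIO).setConf(conf);
}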
@jackye1995 do you know if GlueCatalog really requires anything from the hadoop configuration? Or have any input on how Flink can be run in Kinesis Data Analytics?
Over the last day or two I wrote a streaming job and packaged it as a fat jar. I had to add a minimal Hadoop dependency, and in the code I had to initialize the Hadoop conf with the constructor that takes a boolean false, telling it not to load any Hadoop config files (since I don't have any in Flink). With that, I finally got the streaming job working; it is running on Flink 1.13.1 (behind Ververica CE). I definitely see a good reason to remove the Hadoop conf dependency from GlueCatalog, which I use via Iceberg's custom catalog support.
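Concretely, the empty conf boils down to something like this (the boolean false just tells Hadoop's Configuration not to load its default resource files):
import org.apache.hadoop.conf.Configuration;
// false = skip loading core-default.xml / core-site.xml, so no Hadoop config
// files need to exist on the Flink cluster
Configuration hadoopConf = new Configuration(false);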
Currently there are three catalog implementations in Flink, and GlueCatalog ends up under the custom catalog implementation. From a design standpoint, the constellation of these three catalog types, and how a specialized one like GlueCatalog fits in, can be confusing the first time you try to use it. It would be nice to see some refactoring here, such as removing the Hadoop conf dependency.
it seems like the GlueCatalog could possibly just pass in null for the hadoopConf
I tried that, but I got a serialization error for the job graph. The workaround was to create an empty Hadoop configuration object with the constructor that takes a boolean false, telling it not to load any config files (since I don't have any); that way the conf object just holds an empty map.
Because of this unnecessary Hadoop conf dependency, my fat jar roughly doubled in size, currently to around 310 MB.
If the only reason to have the GlueCatalog implement Configurable is to satisfy the current constraints of dynamic FileIO loading, we can likely change that (I believe dynamic FileIO loading was added around the time the second catalog beyond the basic Hadoop and Hive ones was added).
It was added because some users still want to use GlueCatalog with HadoopFileIO. One example is users on the EMR file system who have to use the EmrFileSystem plugin to make sure all of their file system access is synchronized. (I know EmrFS is on a deprecation path, but it's a sticky dependency that is not easy to migrate away from.)
With that being said, I agree that Glue catalog should work without a Hadoop installation. This is the most important reason to have such an independent implementation.
One approach I can think of is to make another HadoopGlueCatalog that supports Hadoop configuration, and remove the Configurable aspect of GlueCatalog. Let me put up a PR for this to discuss in more detail.
it seems like the GlueCatalog could possibly just pass in null for the hadoopConf
I tried that, but I got a serialization error for the job graph. The workaround was to create an empty Hadoop configuration object with the constructor that takes a boolean false, telling it not to load any config files (since I don't have any); that way the conf object just holds an empty map.
Because of this unnecessary Hadoop conf dependency, my fat jar roughly doubled in size, currently to around 310 MB.
Oh my apologies. I meant that with some reconfiguration we could potentially just pass in null.
I'm going to take a look at @jackye1995's PR though, as he's the author of the original GlueCatalog, and there's possibly a need for a HadoopConfigurableGlueCatalog (with Spark, for example, the Hadoop configuration is often used for various purposes).
Thanks for putting this PR up so quickly @jackye1995. I think these changes are reasonable, but I have a couple of high-level comments.
- Additional updates to the FlinkCatalogFactory are still needed on top of these changes in order to fully remove the hadoop dependency. One possible solution is to add glue as one of the known catalog-type's and avoid calling clusterHadoopConf() when the user sets this value (I tested this change with a local build and it worked in a non-hadoop environment); a rough sketch is shown after this list.
- While this addresses the issue for the GlueCatalog, it will not fix this issue for other catalog implementations which also should be able to work outside of a hadoop environment (e.g. DynamoCatalog).
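As a rough sketch of the first point (hypothetical helper and names for illustration, not the exact change I tested):
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.CatalogLoader;

// Hypothetical illustration only, not FlinkCatalogFactory's real code.
class GlueAwareLoaderSketch {
  static CatalogLoader create(String name, Map<String, String> properties) {
    if ("glue".equalsIgnoreCase(properties.get("catalog-type"))) {
      // Skip clusterHadoopConf() entirely for glue; an empty conf avoids
      // reading any cluster Hadoop config files and is enough for GlueCatalog.
      return CatalogLoader.custom(name, properties, new Configuration(false),
          "org.apache.iceberg.aws.glue.GlueCatalog");
    }
    // hive/hadoop would continue through the existing clusterHadoopConf() path.
    throw new UnsupportedOperationException("delegate to the existing factory behavior");
  }
}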
I am not completely sure how best to address concern 2. When I look at both the GlueCatalog and DynamoCatalog, it seems the only reason for the Configurable interface is for supporting dynamic loading of FileIO implementations which may or may not need to inspect hadoop conf (currently only HadoopFileIO). HadoopCatalog and HiveCatalog both look for additional configuration settings in the hadoop conf (beyond just for HadoopFileIO).
Perhaps it makes sense to decouple these two dependencies such that a FileIO implementation can be provided to any catalog independent of that catalog's interaction with hadoop (or lack thereof). I think this would also simplify the story for the GlueCatalog without having to have two distinct classes. (There is a potentially strange state a user could get into by trying to use the base GlueCatalog for catalog-impl and HadoopFileIO for io-impl.)
I believe that one potential root issue is that FileIO has leaked out from TableOperations into catalog implementations like GlueCatalog. Theoretically, provided a catalog extending BaseMetastoreCatalog implements newTableOps, all of the FileIO configuration should happen inside TableOperations.
It makes more sense that GlueTableOperations might have multiple implementations (one that explicitly handles HadoopFileIO). Even then, in an ideal case the only class that would explicitly need to handle hadoop conf would be HadoopFileIO. This would require a more complex Catalog initialization implementation (probably some sort of builder pattern) where all of the requisite pieces could be instantiated and configured independently.
Once we sort this out, we should definitely update the docs to clarify how to use Iceberg on Kinesis Data Analytics. We presently mention EMR in depth and have a section that mentions Kinesis (KDA), but the KDA docs don't go into much detail: https://iceberg.apache.org/aws/#run-iceberg-on-aws
There's an open PR for upgrading to Flink 1.13. My larger concern there is not being able to access additional catalog options; I've left some comments on that PR: https://github.com/apache/iceberg/pull/2629
One possible solution is to add glue as one of the known catalog-type's and avoid calling clusterHadoopConf() when the user sets this value (I tested this change with a local build and it worked in a non-hadoop environment).
At present, I think adding to the known catalog-types might be the best path forward to resolve the issue more immediately. I asked on the Flink 1.13 upgrade PR, but generically allowing * as an additional catalog option might not be possible. In that case, we'll definitely need to add glue etc. (or at least all of their possible options).
I believe that one potential root issue is that FileIO has leaked out from TableOperations into catalog implementations like GlueCatalog. Theoretically, provided a catalog extending BaseMetastoreCatalog implements newTableOps, all of the FileIO configuration should happen inside TableOperations.
In the longer term, if these larger changes to the interactions between FileIO, TableOperations, and Catalog are undertaken, we can remove the added catalog-types if need be. But that is a bigger API change that will probably need to be considered a bit more closely.
In the near-term, I think it's most realistic to add the catalogs we support here in the repo to catalog-type to address the immediate issue. They can be removed later on if they're not needed / if the API changes. Plus, the additional configs (even for S3FileIO / AWS Authentication) might need to be added as predeclared to the FlinkCatalogFactoryOptions class as part of the 1.13 upgrade.
If somebody can provide a workaround for KDA, then the issue wouldn't necessarily be as pressing.
At present, I think adding to the known catalog-types might be the best path forward to resolve the issue more immediately.
That is actually something we did not want to do, and that's why the aws module is not a dependency of the flink module. Adding that dependency is likely good for AWS, but as more catalog implementations are added, it becomes unmanageable to have that many dependencies.
I believe that one potential root issue is that FileIO has leaked out from TableOperations into catalog implementations like GlueCatalog.
This was also discussed when implementing the catalog. Having a default FileIO definition at the catalog level is a feature; that's why CatalogProperties.FILE_IO_IMPL was created. Initializing the default FileIO in the catalog allows reusing the same FileIO singleton instead of creating many different ones.
I don't think the problem is solved even if you hide the FileIO creation in TableOperations; because FileIO loading also checks for the Configurable interface, it does not make much difference.
Additional updates to the FlinkCatalogFactory are still needed on top of these changes in order to fully remove the hadoop dependency
Yes, you are right, we can fully remove the dependency in GlueCatalog, but the issue is more on the engine side, which basically requires such a dependency. The Flink catalog entry point CatalogFactory.createCatalog(String name, Map<String, String> properties) has a direct call to createCatalog(name, properties, clusterHadoopConf()) that initializes the Hadoop configuration, and the serialized catalog loader CustomCatalogLoader has SerializableConfiguration as a field, so you are guaranteed to get a serialization exception in Flink if you don't have the Hadoop configuration. This looks like a deeper issue than just a fix on the catalog side.
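Paraphrased rather than verbatim, those two spots look roughly like this:
// FlinkCatalogFactory: the Hadoop configuration is resolved eagerly, before we
// even know whether the chosen catalog needs it
public Catalog createCatalog(String name, Map<String, String> properties) {
  return createCatalog(name, properties, clusterHadoopConf());
}

// CatalogLoader.CustomCatalogLoader: the conf is serialized with the job graph
// (field name paraphrased)
private final SerializableConfiguration hadoopConf;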
I think we should first tackle this on the engine side, and then see what the best way forward is for catalog implementations. This seems like a valid ask for a Flink catalog factory improvement.
Meanwhile, although a bit hacky, why not add just 2 empty classes Configuration and Configurable to your classpath? That removes the need for the entire hadoop jar.
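For reference, the two stubs would be roughly (sketch only; whether Flink's class loader actually resolves these from the user jar is a separate question):
// Configuration.java -- compiled into the job jar under the real package name
package org.apache.hadoop.conf;

public class Configuration {
  public Configuration() {}
  public Configuration(boolean loadDefaults) {}
}

// Configurable.java -- second stub file in the same package
package org.apache.hadoop.conf;

public interface Configurable {
  void setConf(Configuration conf);
  Configuration getConf();
}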
The Flink catalog entry point CatalogFactory.createCatalog(String name, Map<String, String> properties) has a direct call to createCatalog(name, properties, clusterHadoopConf()) that initializes the Hadoop configuration, and the serialized catalog loader CustomCatalogLoader has SerializableConfiguration as a field, so you are guaranteed to get a serialization exception in Flink if you don't have the Hadoop configuration.
Yes, you are right. And I also agree that there is likely a larger issue to tackle on the Flink side longer term.
In the interim, how would you feel about updating the dynamic catalog loader (CatalogUtil.loadCatalog) to accept a SerializableSupplier<Configuration> instead of a concrete instance of Configuration?
This could allow delaying, and potentially avoiding, the call to HadoopUtils.getHadoopConfiguration when it is not required by the Catalog or the FileIO. I was able to get a version of that working locally for Flink without any Hadoop environment.
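A rough shape of what I have in mind (hypothetical names; assuming a small supplier interface that is just Supplier plus Serializable, in the spirit of SerializableSupplier<Configuration>):
import java.io.Serializable;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;

// Assumed helper interface for this sketch.
interface SerializableConfSupplier extends Supplier<Configuration>, Serializable {}

class LazyHadoopConfSketch {
  // Hypothetical shape of the CatalogUtil change: the supplier is only invoked
  // when the loaded catalog opts in via Configurable, so a Hadoop-free job
  // never resolves the cluster Hadoop configuration at all.
  static void applyConfIfNeeded(Catalog catalog, SerializableConfSupplier confSupplier) {
    if (catalog instanceof Configurable) {
      ((Configurable) catalog).setConf(confSupplier.get());
    }
  }
}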
Meanwhile, although a bit hacky, why not add just 2 empty classes Configuration and Configurable to your classpath? That removes the need for the entire hadoop jar.
I wasn't able to get this to work. I think there is something about the way that Flink loads user-supplied jars that makes them unavailable to flink's HadoopUtils.getHadoopConfiguration method?
I opened #3590 to partially address this. That removes the dependency on Hadoop Configuration and Configurable from CatalogUtil. I think it will also require a similar change to FlinkCatalogFactory because that class can't be loaded without Configuration or Flink's HadoopUtils.
I'm having this same issue while trying to use an Iceberg table in Flink along with the JdbcCatalog implementation and S3FileIO. Is there a working workaround that we can configure in code?
Over the last day or two I wrote a streaming job and packaged it as a fat jar. I had to add a minimal Hadoop dependency, and in the code I had to initialize the Hadoop conf with the constructor that takes a boolean false, telling it not to load any Hadoop config files (since I don't have any in Flink).
@tiborkiss, by chance do you have this in GitHub? It seems like there are a number of people struggling with this error and it sounds like you have a workaround.
I found this thread while trying to make Flink, Iceberg, and Glue work together without the Hadoop dependencies, so I'm hoping this is useful for someone else in the future. To answer my own question, this seems to work (@tiborkiss gave me an important clue about the null Hadoop config):
import org.apache.hadoop.conf.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.iceberg.flink.FlinkCatalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.flink.CatalogLoader;
/* skipping other pieces */
// Empty Hadoop conf: false skips loading core-default.xml / core-site.xml,
// so no Hadoop installation or config files are required at runtime.
Configuration hadoopEmpty = new Configuration(false);
// map holds the catalog properties and impl is the catalog class name
// (in my case the Glue catalog implementation).
CatalogLoader catalogLoader = CatalogLoader.custom(catalogName, map, hadoopEmpty, impl);
// Wrap the loader in a Flink catalog and register it with the table environment.
Catalog flinkCatalog = new FlinkCatalog(catalogName, databaseName, Namespace.empty(), catalogLoader, true);
tableEnv.registerCatalog(catalogName, flinkCatalog);
tableEnv.useCatalog(catalogName);
Are there any updates/workarounds for this, in particular any updates on getting this to run on KDA? Thanks!
EDIT:
Also, the docs here specify:
By default, Iceberg ships with Hadoop jars for Hadoop catalog.
But this doesn't seem correct either?
@mgmarino, I had this working with a Java KDA app using the notes I included above. What's the specific issue you're seeing?
Thanks for the response, @matt-slalom. I did get something working, mainly following the comment here, though converting it to Maven. One important point is that we are using the Table/SQL API (also from pyFlink), so we are not explicitly instantiating the FlinkCatalog. As such, we are dependent on this code.
A few things:
- Important here was the relocation/shading, which, for me not being a Java programmer, took some time to get right.
- I still had to include (some) hadoop libraries, but I was able to drop 'org.apache.flink:flink-hadoop-fs' by writing my own HadoopUtils::getHadoopConfiguration that returns new Configuration(false) (a sketch of that replacement class is right after this list). I think it should be possible to "hack" in a Configuration class to avoid pulling in hadoop libraries, but I haven't dug more just yet.
- One important thing I ran across was that Flink does manipulate class loading ("child first" vs "parent first") in general, but explicitly does not do this for hadoop libraries (see here). This could have been the source of problems that some other posters mentioned above.
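For reference, the replacement class is essentially this (a sketch; how it actually wins over Flink's own HadoopUtils depends on your shading/relocation setup):
// Stand-in for org.apache.flink.runtime.util.HadoopUtils, the class that
// FlinkCatalogFactory.clusterHadoopConf() ends up calling.
package org.apache.flink.runtime.util;

import org.apache.hadoop.conf.Configuration;

public class HadoopUtils {
  public static Configuration getHadoopConfiguration(
      org.apache.flink.configuration.Configuration flinkConfiguration) {
    // No cluster Hadoop environment: hand back an empty configuration that
    // does not try to load core-site.xml / hdfs-site.xml.
    return new Configuration(false);
  }
}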
Anyway, I would still classify this as a workaround, but since I had to piece it together, I think it would still make sense to document this until the hadoop dependencies are fully removed. I will try to come back and update this comment once I have wrapped things up.
EDIT:
I've placed the relevant code here for those who want to take a look. The pom.xml file is simple and left as is; I didn't spend any time filtering out the things that aren't needed. However, the relocations/shading configurations are basically all needed. Another note: we did need to explicitly instantiate the Catalog, which is why we introduced a light wrapper in Python that calls through to the JVM. HTH.
@mgmarino - how did you make this work in KDA? Lacking documentation, I keep landing in linkage errors due to the hadoop jars, even though I have shaded and relocated the hadoop classes.
@rajcoolguy I've updated my comment 👆 to include a link to a gist with the relevant code. HTH.
@mgmarino - Thank you so much for the reference; I was able to get the KDA Iceberg sink working.
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
It is my understanding that this issue should be addressed by https://github.com/apache/iceberg/pull/7369.
@mgmarino - I am also trying to use your solution in my pyFlink application running on managed Flink, but the one thing I can't figure out yet is how to incorporate the HadoopUtils::getHadoopConfiguration into the jar file. How do I make super().__init__(j_catalog_factory.createCatalog(name, j_properties)) call my HadoopUtils?
@RoeeDev It needs to be properly shaded in your jar, as it's done in the pom.xml file. The shading process will rewrite the references in the included iceberg jars to point to your HadoopUtils.
@mgmarino Excellent I got it to work, thank you!
I found this thread while trying to make Flink, Iceberg, and Glue work together without the Hadoop dependencies, so I'm hoping this is useful for someone else in the future. To answer my own question, this seems to work (@tiborkiss gave me an important clue about the null Hadoop config).
Thank you for this tip!
For anyone using Scala/SBT, here is what worked for me:
assembly / assemblyShadeRules := Seq(
  // Relocate the bundled Hadoop classes so references resolve to the copies
  // packaged in the fat jar rather than to anything the cluster provides.
  ShadeRule.rename("org.apache.hadoop.**" -> "shadow.org.apache.hadoop.@1").inAll,
  // Relocate Flink's HadoopUtils as well, so the shaded copy in the jar is the
  // one that ends up being called (bypassing Flink's parent-first loading).
  ShadeRule.rename("org.apache.flink.runtime.util.HadoopUtils" -> "shadow.org.apache.flink.runtime.util.HadoopUtils").inAll
),
If there are no plans to fix this, I think it has to be in the documentation, because otherwise the Flink connector is just broken out of the box when using SQL.