Flink restore failed with FileNotFoundException
Apache Iceberg version
0.14.1 (latest release)
Query engine
Flink
Please describe the bug 🐞
We have a Flink job that writes an upsert stream into a partitioned Iceberg V2 table. When the job fails, we restart it from the latest checkpoint, but then we get this exception:
FileNotFoundException: File does not exist: /rbf/warehouse/cupid_bi.db/ads_qixiao_olap_1min/metadata/c918a379b3cc15d7a8193cf27eb8b473-00000-1-38851-10287.avro
This is what I saw in the task log:
2022-10-21 16:57:19,188 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committing append with 2 data files and 0 delete files to table icebergCatalog.xxx
2022-10-21 16:57:19,573 INFO org.apache.iceberg.BaseMetastoreTableOperations [] - Successfully committed to table icebergCatalog.xx
2022-10-21 16:57:19,573 INFO org.apache.iceberg.SnapshotProducer [] - Committed snapshot 7536746147835307981 (MergeAppend)
2022-10-21 16:57:19,594 INFO org.apache.iceberg.BaseMetastoreTableOperations [] - Refreshing table metadata from new version: qbfs://online01/warehouse/xx/metadata/99171-45858db6-5917-4397-afcc-76c10ea80305.metadata.json
2022-10-21 16:57:19,750 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committed in 562 ms without flushing snapshot state to state backend
Is the reason that the checkpoint information in the metadata differs from the snapshot state, since the job failed without flushing the snapshot state to the state backend?
The .avro file might be a manifest file. Do you have the complete stack trace? Which Flink version are you on?
I couldn't find this log line in 1.13 (or 1.14 and 1.15).
2022-10-21 16:57:19,750 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committed in 562 ms without flushing snapshot state to state backend
1.13 has this log line without the part after "Committed in 562 ms".
https://github.com/apache/iceberg/blob/master/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
It is my mistake. The right log should be:
2022-10-21 16:57:19,188 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committing append with 2 data files and 0 delete files to table icebergCatalog.xxx
2022-10-21 16:57:19,573 INFO org.apache.iceberg.BaseMetastoreTableOperations [] - Successfully committed to table icebergCatalog.xx
2022-10-21 16:57:19,573 INFO org.apache.iceberg.SnapshotProducer [] - Committed snapshot 7536746147835307981 (MergeAppend)
2022-10-21 16:57:19,594 INFO org.apache.iceberg.BaseMetastoreTableOperations [] - Refreshing table metadata from new version: qbfs://online01/warehouse/xx/metadata/99171-45858db6-5917-4397-afcc-76c10ea80305.metadata.json
2022-10-21 16:57:19,750 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committed in 562 ms
2022-10-21 16:57:20,090 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Start to flush snapshot state to state backend, table: icebergCatalog.cupid_bi.ads_qixiao_tracking_hawkeye_no_filter_1min, checkpointId: 8227
But in my situation, the task exited without logging "Start to flush snapshot state to state backend", because of a NodeManager restart. When I then restarted the Flink job, it failed with FileNotFoundException. In the HDFS audit log, I never saw that avro file being created.
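To check a task log for the situation described above (a commit with no later flush line), something like the following might help. It is only a sketch: the function name `last_commit_and_flush` and the regular expressions are made up for illustration and simply match the two message formats quoted in this thread. It is not part of Iceberg or Flink.

```python
import re

# Patterns taken from the log lines quoted in this thread (assumed stable formats).
COMMIT_RE = re.compile(r"Committed snapshot (\d+)")
FLUSH_RE = re.compile(
    r"Start to flush snapshot state to state backend.*checkpointId: (\d+)"
)


def last_commit_and_flush(lines):
    """Return (last committed snapshot id, checkpoint id flushed after it).

    The second element is None when no flush line appears after the last
    "Committed snapshot" line, e.g. because the task exited first.
    """
    last_snapshot = None
    flushed_checkpoint = None
    for line in lines:
        commit = COMMIT_RE.search(line)
        if commit:
            last_snapshot = int(commit.group(1))
            flushed_checkpoint = None  # reset: we only care about flushes after this commit
            continue
        flush = FLUSH_RE.search(line)
        if flush and last_snapshot is not None:
            flushed_checkpoint = int(flush.group(1))
    return last_snapshot, flushed_checkpoint


if __name__ == "__main__":
    import sys

    with open(sys.argv[1], encoding="utf-8") as log_file:
        print(last_commit_and_flush(log_file))
```

A result whose second element is None would match the case reported here: the commit went through, but the task stopped before the next snapshot state flush was logged.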
Start to flush snapshot state to state backend
This happens in IcebergFilesCommitter#snapshotState. If checkpoint N didn't complete successfully, the manifest file written for the incomplete checkpoint won't be used, because the last completed checkpoint is N-1.
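The rule stated above can be illustrated with a toy model. This is not Iceberg code: the function `manifests_visible_after_restore` and the manifest file names are hypothetical, and the real committer keeps more state than a single mapping. Only the checkpoint id 8227 comes from the log earlier in this thread.

```python
def manifests_visible_after_restore(written, last_completed):
    """Toy model: which manifests a restored job would still reference.

    written        -- checkpoint id -> manifest path written while that
                      checkpoint was being snapshotted (hypothetical names)
    last_completed -- id of the last checkpoint that completed successfully
    """
    # Restored state comes from the last completed checkpoint, so anything
    # written for a later, incomplete checkpoint is not part of it.
    return {
        checkpoint_id: path
        for checkpoint_id, path in sorted(written.items())
        if checkpoint_id <= last_completed
    }


# Checkpoint 8227 started but never completed, so the job restores from 8226.
written = {8226: "manifest-8226.avro", 8227: "manifest-8227.avro"}
visible = manifests_visible_after_restore(written, last_completed=8226)
```

Under this model, `visible` contains only the entry for checkpoint 8226. A missing manifest for checkpoint 8227 would therefore not explain the failure by itself, which is why the full stack trace matters.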
@stevenzwu if this scenario happens
if checkpoint N didn't complete successfully, the written manifest file for the incomplete checkpoint won't be used because last completed checkpoint is N-1.
What is the best approach to recover the job?
I see similar behavior using Flink 1.14.1 and Iceberg 1.0.0 (V2).
Hi! We're running into this issue with Iceberg 1.5.2 and Flink 1.18.1. Seems like it's not yet fixed. We're happy to dedicate resources to fix it if we could get a pointer on resolving it. @stevenzwu would you please point us in the right direction? Thanks!
- Is there any update here? Is a fix planned for this bug? @jad-grepr @stevenzwu
- Secondly, as @congd123 asked, what is the best approach for recovery here? Once I start seeing these errors, the table becomes inaccessible: I can't perform any operations on it through either Flink or Trino.
We are facing this issue with Flink 1.16 and Iceberg 1.3.1. As @jad-grepr reported, the issue also occurs in later versions. Attaching the stack trace:
{"flowId":"37d1732c-20d4-49b4-99b3-67d2ca10e492","id":"6","type":"Error","time":{"sec":1731996384,"micros":358067},"title":"exception while executing job","level":"ERROR","attributes":{"stackTrace":"org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: s3a://<>/<>/<>.events/metadata/12454-c7fe536d-cdd8-47b4-93d7-a6b1d915ef4c.metadata.json\n\tat org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:185)\n\tat org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:272)\n\tat org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:266)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$0(BaseMetastoreTableOperations.java:189)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$1(BaseMetastoreTableOperations.java:208)\n\tat org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)\n\tat org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)\n\tat org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)\n\tat org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:208)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:185)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:180)\n\tat org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:178)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)\n\tat org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:47)\n\tat org.apache.iceberg.catalog.Catalog.tableExists(Catalog.java:281)\n\tat 
in.zeta.perseus.iceberg.operator.sink.IcebergTableLoader.createTable(IcebergTableLoader.java:107)\n\tat in.zeta.perseus.iceberg.operator.sink.IcebergTableLoader.createIcebergTableIfNotExists(IcebergTableLoader.java:54)\n\tat in.zeta.perseus.iceberg.operator.sink.IcebergTableLoader.getTableLoader(IcebergTableLoader.java:31)\n\tat in.zeta.perseus.iceberg.operator.sink.IcebergSinkOperator.sinkStream(IcebergSinkOperator.java:40)\n\tat in.zeta.perseus.core.job.JobBuilder.sinkStream(JobBuilder.java:225)\n\tat in.zeta.perseus.core.job.JobBuilder.lambda$buildSinkOperator$2(JobBuilder.java:182)\n\tat java.base/java.util.Optional.ifPresent(Unknown Source)\n\tat in.zeta.perseus.core.job.JobBuilder.buildSinkOperator(JobBuilder.java:180)\n\tat in.zeta.perseus.core.job.JobBuilder.executeJobGraphTopologically(JobBuilder.java:111)\n\tat in.zeta.perseus.core.job.JobBuilder.buildAndExecuteJobGraph(JobBuilder.java:69)\n\tat in.zeta.perseus.core.job.JobBuilder.buildJob(JobBuilder.java:56)\n\tat in.zeta.perseus.core.JobDriver.validateAndBuildJob(JobDriver.java:44)\n\tat in.zeta.perseus.core.JobDriver.main(JobDriver.java:25)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)\n\tat java.base/java.lang.reflect.Method.invoke(Unknown Source)\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)\n\tat 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:107)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: java.io.FileNotFoundException: No such file or directory: s3a://<>/<>/<>/metadata/12454-c7fe536d-cdd8-47b4-93d7-a6b1d915ef4c.metadata.json\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.extractOrFetchSimpleFileStatus(S3AFileSystem.java:5401)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1465)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1441)\n\tat org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)\n\tat org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183)\n\t... 41 more\n","message":"Failed to open input stream for file: s3a://<>/<>/<>_events
@rahulkr25 on our end, this issue stopped happening. Not sure if we changed something in how we do deployments, or if something else was fixed with an Iceberg update.
@jad-grepr Thanks for the confirmation. Could you also share the Iceberg and Flink versions you are using in your setup?
@rahulkr25 we're on Flink 1.19 and Iceberg 1.6.1
Thanks for the confirmation, @jad-grepr. In your earlier comment you were facing the issue with Iceberg 1.5.2 and Flink 1.18.1, whereas your current setup is Flink 1.19 and Iceberg 1.6.1.
So possibly this was resolved by the version upgrade (of course, this is only a hypothesis).
I am running into the same issue, even with Flink 1.20 and Iceberg 1.7.0. The issue occurred upon a restart of a Flink CDC job.
Adding to the same context: in our case as well, we have seen this problem mostly upon restart. One thing to note is that out of 4 parallel streaming jobs, only 1 started encountering this issue, and we are not sure what exactly went wrong with that job. @n14zio @stevenzwu @findepi
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'