
Flink: Rewriting all data files (upsert mode, v2 table) leads to TaskManager out of memory

Open Shane-Yu opened this issue 3 years ago • 2 comments

I rewrote small files (whole-file rewrite) with the Flink API on an un-partitioned table, and the job failed with the error log below.

The environment is as follows:

TM memory: 16 GB, numFiles: 717, numRows: 176,450,547

As we can see from the source code, it seems that the equality-delete files are loaded into memory when applyEqDeletes is applied to the records. Could having too many Eq-Delete rows be what leads to the TaskManager OOM?

https://github.com/apache/iceberg/blob/197325f3e31193054ecdd860ac0fbc2af4b64748/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java#L149-L159
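To make the memory behavior concrete, here is a simplified sketch (not the actual Iceberg code, and the `String`-keyed set stands in for Iceberg's `StructLikeSet`) of how the linked `applyEqDeletes` path materializes every equality-delete row into one in-memory set before filtering data rows, so heap usage grows with the total number of delete rows across all Eq-Delete files:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified sketch of the eq-delete application strategy: load ALL
// equality-delete rows into a single set (analogous to
// Deletes.toEqualitySet), then filter each data row by set membership.
public class EqDeleteSketch {
    public static void main(String[] args) {
        // Pretend each inner list is the key column of one equality-delete file.
        List<List<String>> eqDeleteFiles = Arrays.asList(
                Arrays.asList("key-1", "key-2"),
                Arrays.asList("key-2", "key-3"));

        // Heap usage here is O(total delete rows), independent of data size --
        // this is the allocation that can exhaust the TaskManager heap.
        Set<String> deleteSet = new HashSet<>();
        for (List<String> file : eqDeleteFiles) {
            deleteSet.addAll(file);
        }

        // Filtering a data file then becomes one membership test per row.
        List<String> dataRows = Arrays.asList("key-1", "key-4", "key-3");
        List<String> kept = new ArrayList<>();
        for (String row : dataRows) {
            if (!deleteSet.contains(row)) {
                kept.add(row);
            }
        }
        System.out.println(kept); // prints [key-4]
    }
}
```

The set is built once per opened data file task, so the cost that matters is the total delete-row count, not the number of delete files per se.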

  • error log:
Caused by: java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
	at org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:150) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:91) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.stream.ReferencePipeline.map(ReferencePipeline.java:186) ~[?:1.8.0_201]
	at org.apache.iceberg.types.Comparators$StructLikeComparator.<init>(Comparators.java:108) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.types.Comparators$StructLikeComparator.<init>(Comparators.java:102) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.types.Comparators.forType(Comparators.java:53) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.util.StructLikeWrapper.<init>(StructLikeWrapper.java:43) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.util.StructLikeWrapper.forType(StructLikeWrapper.java:34) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.util.StructLikeSet.add(StructLikeSet.java:103) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.util.StructLikeSet.add(StructLikeSet.java:33) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:356) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.deletes.Deletes.toEqualitySet(Deletes.java:79) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:156) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:185) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:126) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.flink.source.RowDataFileScanTaskReader.open(RowDataFileScanTaskReader.java:75) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.flink.source.DataIterator.openTaskIterator(DataIterator.java:84) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:76) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:58) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:130) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:91) ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$184/1365369498.runDefaultAction(Unknown Source) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]

Shane-Yu avatar Mar 11 '22 06:03 Shane-Yu

We hit the same problem in Spark when rewriting a v2 table. We fixed it locally by using Bloom filters in Parquet files; our code will be contributed after the Bloom filter feature is merged.

Zhangg7723 avatar Mar 21 '22 03:03 Zhangg7723

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Sep 18 '22 00:09 github-actions[bot]

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

github-actions[bot] avatar Oct 02 '22 00:10 github-actions[bot]