
[Bug]: ElasticsearchIO templates don't respect retry parameters in a common scenario

Open ggprod opened this issue 3 years ago • 2 comments

Related Template(s)

BigQueryToElasticsearch, GCSToElasticsearch, PubsubToElasticsearch

What happened?

When running these templates against a busy Elasticsearch cluster, request timeout errors are common, because a busy ES cluster can take longer than 30s to respond to a request. Once the same work item has hit this 4 times, the job is killed, regardless of the Elasticsearch retry parameters.
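To show why the Elasticsearch retry settings don't help here, this is a minimal stdlib-only Java model of the runner side. It assumes, based on the "work item has failed 4 times" message quoted in this issue, that a batch work item is attempted at most 4 times. `WorkItemRetrySimulation` and `WorkItem` are hypothetical names, not Dataflow APIs. If the write throws on every attempt, the job fails after the 4th attempt no matter what retry parameters the writer was given, because those parameters are never consulted once the exception leaves user code.

```java
// Hypothetical model of a batch runner that gives up on a work item after a fixed
// number of attempts (4, per the failure message in this issue). Not a Dataflow API.
final class WorkItemRetrySimulation {
    static final int MAX_WORK_ITEM_ATTEMPTS = 4;

    /** A unit of work that may throw, e.g. a bundle whose DoFn calls Elasticsearch. */
    interface WorkItem {
        void run() throws Exception;
    }

    /** Runs the item until it succeeds; returns the attempt count, or throws if all attempts fail. */
    static int runWithRunnerRetries(WorkItem item) {
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= MAX_WORK_ITEM_ATTEMPTS; attempt++) {
            try {
                item.run();
                return attempt;
            } catch (Exception e) {
                // The exception escaped user code, so the runner counts the whole attempt as failed.
                lastFailure = e;
            }
        }
        throw new IllegalStateException(
            "The job failed because a work item has failed " + MAX_WORK_ITEM_ATTEMPTS + " times",
            lastFailure);
    }
}
```

For example, a work item that throws `java.net.SocketTimeoutException` on every attempt fails the job after exactly 4 attempts, while one that only succeeds on its 3rd attempt returns 3.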

Beam Version

Newer than 2.35.0

Relevant log output

First, there will be worker errors like:
```
Error message from worker: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-1 [ACTIVE]
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
    org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:56)
    org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:39)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:117)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.util.concurrent.FutureTask.run(FutureTask.java:266)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-1 [ACTIVE]
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
    com.google.cloud.teleport.v2.transforms.BigQueryConverters$TableRowToJsonFn.processElement(BigQueryConverters.java:198)
    com.google.cloud.teleport.v2.transforms.BigQueryConverters$TableRowToJsonFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
    org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn.processElement(PassThroughThenCleanup.java:84)
    org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead$3.processElement(BigQueryIO.java:1170)
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead$3$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
    org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
    org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
    org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
    org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
    org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
    ... 21 more
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-1 [ACTIVE]
    org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:834)
    org.elasticsearch.client.RestClient.performRequest(RestClient.java:259)
    org.elasticsearch.client.RestClient.performRequest(RestClient.java:246)
    com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1507)
    com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1467)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-1 [ACTIVE]
    org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
    org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
    org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
    org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
    org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
    org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
    org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
    org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    java.lang.Thread.run(Thread.java:748)
```

Then, once enough of these happen back-to-back, the job is killed with a log like:
```
Workflow failed. Causes: S18:ReadFromBigQuery/ReadFromBigQueryWithQuery/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/Read+ReadFromBigQuery/ReadFromBigQueryWithQuery/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/GroupByWindow+ReadFromBigQuery/ReadFromBigQueryWithQuery/Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable+ReadFromBigQuery/ReadFromBigQueryWithQuery/Reshuffle.ViaRandomKey/Values/Values/Map+ReadFromBigQuery/ReadFromBigQueryWithQuery/ReadFiles+ReadFromBigQuery/ReadFromBigQueryWithQuery/PassThroughThenCleanup/ParMultiDo(Identity)+TableRowsToJsonDocument+ReadFromBigQuery/ReadFromBigQueryWithQuery/PassThroughThenCleanup/View.AsIterable/ParDo(ToIsmRecordForGlobalWindow)+WriteToElasticsearch/WriteDocuments/ParDo(Write) failed.,
The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors.
The work item was attempted on these workers:
bigquery-to-elasticsearch-06160046-6l2z-harness-kfzc Root cause: Work item failed.,
bigquery-to-elasticsearch-06160046-6l2z-harness-kfzc Root cause: Work item failed.,
bigquery-to-elasticsearch-06160046-6l2z-harness-kfzc Root cause: Work item failed.,
bigquery-to-elasticsearch-06160046-6l2z-harness-kfzc Root cause: Work item failed.
```

ggprod, Jun 17 '22 17:06

This appears to have two causes. First, the retry parameters are not applied correctly because of this line: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java#L100

Updating that line to:

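```java
// Pass both user-supplied retry parameters (max attempts and max duration) to the writer.
elasticsearchWriter = elasticsearchWriter.withRetryConfiguration(
    ElasticsearchIO.RetryConfiguration.create(
        options().getMaxRetryAttempts(), getDuration(options().getMaxRetryDuration())));
```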
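For reference, a rough sketch of what a policy bounded by both parameters can look like: retry with exponential backoff until either the attempt limit or the time budget runs out, whichever comes first. `BoundedRetryPolicy` and its initial-backoff argument are hypothetical; the sketch only assumes a shape similar to `RetryConfiguration.create(maxAttempts, maxDuration)` and is not Beam's or the template's implementation.

```java
import java.time.Duration;

// Hypothetical retry policy bounded by an attempt count AND a cumulative backoff budget.
// Not Beam's ElasticsearchIO.RetryConfiguration implementation.
final class BoundedRetryPolicy {
    private final int maxAttempts;
    private final Duration maxDuration;
    private final Duration initialBackoff;

    BoundedRetryPolicy(int maxAttempts, Duration maxDuration, Duration initialBackoff) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.maxDuration = maxDuration;
        this.initialBackoff = initialBackoff;
    }

    /** Backoff before retry number {@code retry} (1-based): initialBackoff * 2^(retry - 1). */
    Duration backoffBeforeRetry(int retry) {
        return initialBackoff.multipliedBy(1L << (retry - 1));
    }

    /** True if another attempt is allowed after {@code failedAttempts} failures and {@code waited} total backoff. */
    boolean shouldRetry(int failedAttempts, Duration waited) {
        if (failedAttempts >= maxAttempts) {
            return false; // attempt limit reached
        }
        // Stop if the next backoff would push the cumulative wait past the time budget.
        return waited.plus(backoffBeforeRetry(failedAttempts)).compareTo(maxDuration) <= 0;
    }

    /** Total attempts made if every attempt fails, following the rules above. */
    int attemptsUntilGiveUp() {
        int attempts = 1;
        Duration waited = Duration.ZERO;
        while (shouldRetry(attempts, waited)) {
            waited = waited.plus(backoffBeforeRetry(attempts));
            attempts++;
        }
        return attempts;
    }
}
```

With `maxAttempts = 5`, a 10s budget, and a 1s initial backoff, the cumulative waits are 1s, 3s, and 7s; the next 8s wait would bring the total to 15s, so the time budget stops retrying after 4 attempts even though the attempt limit allows 5.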

fixes that problem. However, the retry parameters would still not be fully respected, because there is no try/catch around the client call to Elasticsearch on this line: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java#L1502 As a result, a SocketTimeoutException thrown by performRequest propagates straight out of the DoFn (as in the stack trace above) instead of going through the retry logic.

This is surprising, because the ElasticsearchIO implementation in the Apache Beam repo wraps all of its calls to performRequest in try/catch.
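A minimal sketch of the missing piece, assuming the client call can be modeled as a function that throws `IOException` (which covers the `SocketTimeoutException` in the log): catch the exception inside the retry loop so a timeout consumes one attempt instead of escaping, and rethrow only after the attempts run out. `RetryingCall`, `IoCall`, and the doubling backoff are hypothetical stand-ins for the template's retry configuration; this is not the code from the PR.

```java
import java.io.IOException;

// Hypothetical helper: route IOExceptions from a request through a bounded retry loop
// instead of letting them propagate on the first failure.
final class RetryingCall {
    /** A request that may fail with an IOException, standing in for a client call. */
    @FunctionalInterface
    interface IoCall<T> {
        T call() throws IOException;
    }

    /** Invokes {@code request} up to {@code maxAttempts} times; rethrows the last IOException. */
    static <T> T callWithRetries(IoCall<T> request, int maxAttempts, long initialBackoffMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return request.call();
            } catch (IOException e) {
                // SocketTimeoutException is an IOException, so a timeout lands here
                // and consumes one attempt rather than escaping the caller.
                lastFailure = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(initialBackoffMillis << (attempt - 1)); // exponential backoff
                }
            }
        }
        throw lastFailure; // every attempt failed: surface the last error
    }
}
```

For example, a call that times out twice and then returns succeeds on its 3rd attempt, while a call that always times out rethrows its last `SocketTimeoutException` after `maxAttempts` tries.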

ggprod, Jul 19 '22 22:07

I've added fixes for the two issues above as part of my PR for other Elasticsearch template improvements: https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/399

ggprod, Jul 19 '22 23:07