
[Bug]: Issues with Watermark propagation in Spark runner (streaming)

Open · mosche opened this issue 3 years ago · 2 comments

What happened?

I ran into this issue while adding support for ValidatesRunner (VR) tests for Spark in streaming mode (https://github.com/apache/beam/pull/22620).

It looks like the runner supports only a single global watermark, which causes issues when running multiple stateful operators. In any case, watermark propagation seems to be highly nondeterministic and prone to race conditions.

In the logs below you can see that timers fire downstream in PAssert$0/GroupGlobally/GroupByKey (at 56,830) after the global watermark is advanced (at 56,793), but before the corresponding timers fire upstream in Group.ByFields/ToKvs/GroupByKey (at 57,005). As a result, the expected element is considered expired when it finally arrives in PAssert$0/GroupGlobally/GroupByKey (at 57,065):

52,112 [worker] INFO  TestSparkRunner  - About to run test pipeline avroschematest0testavropipelinegroupby-mmack-0909103252-2afdba63

55,816 [0] INFO  StateSpecFunctions  - Source 0_0 read 1 values, watermarks: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]

55,899 [JobGenerator] INFO  GlobalWatermarkHolder  - Queued watermarks for source 0: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]

56,297 [1] INFO  StateSpecFunctions  - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]

56,314 [JobGenerator] INFO  GlobalWatermarkHolder  - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]

56,546 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: 1, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,567 [3] INFO  SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,570 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements: 0, state size: 2

56,614 [9] TRACE SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: input elements: 1, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,615 [9] INFO  SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,615 [9] TRACE SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: output elements: 0, state size: 2

56,770 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,771 [15] INFO  SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,771 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements: 0, state size: 2

56,780 [8] INFO  StateSpecFunctions  - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]

56,793 [listener] INFO  GlobalWatermarkHolder  - Batch 2022-09-09T10:32:55.000Z completed, new watermarks: {0=[low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]}

56,793 [JobGenerator] INFO  GlobalWatermarkHolder  - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]

56,829 [20] TRACE SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]
56,829 [20] INFO  SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to 294247-01-10T04:00:54.775Z
56,830 [20] DEBUG SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: eligible timer at 294247-01-09T04:00:54.775Z: TimerData{timerId=0:9223371950454775, timerFamilyId=, namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@1f346ad2), timestamp=294247-01-09T04:00:54.775Z, outputTimestamp=294247-01-09T04:00:54.775Z, domain=EVENT_TIME, deleted=false}
56,835 [20] TRACE SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: output elements: 1, state size: 0

56,850 [20] ERROR Executor  - Exception in task 1.0 in stage 14.0 (TID 20)
org.apache.beam.sdk.util.UserCodeException: java.lang.AssertionError: Group.ByFields/ToRow/ParMultiDo(Anonymous).output: 
Expected: iterable with items [<Row: key:Row: string:mystring,,value:[Row: bool_non_nullable:true,int:43,long:44,float:44.1,double:44.2,string:mystring,bytes:[1, 2, 3, 4],fixed:[B@7cb94f7,date:1979-03-14T00:00:00.000Z,timestampMillis:1979-03-14T00:02:03.004Z,testEnum:enum value: 0,row:Row: BOOL_NON_NULLABLE:true,int:42,,array:[Row: BOOL_NON_NULLABLE:true,int:42,, Row: BOOL_NON_NULLABLE:true,int:42,, ],map:{(k1, Row: BOOL_NON_NULLABLE:true,int:42,), (k2, Row: BOOL_NON_NULLABLE:true,int:42,), },, ],>] in any order but: no item matches: <Row: key:Row: string:mystring,,value:[Row: bool_non_nullable:true,int:43,long:44,float:44.1,double:44.2,string:mystring,bytes:[1, 2, 3, 4],fixed:[B@7cb94f7,date:1979-03-14T00:00:00.000Z,timestampMillis:1979-03-14T00:02:03.004Z,testEnum:enum value: 0,row:Row: BOOL_NON_NULLABLE:true,int:42,,array:[Row: BOOL_NON_NULLABLE:true,int:42,, Row: BOOL_NON_NULLABLE:true,int:42,, ],map:{(k1, Row: BOOL_NON_NULLABLE:true,int:42,), (k2, Row: BOOL_NON_NULLABLE:true,int:42,), },, ],> in []
56,879 [task-result] ERROR TaskSetManager  - Task 1 in stage 14.0 failed 1 times; aborting job
56,893 [JobScheduler] ERROR JobScheduler  - Error running job streaming job 1662719575500 ms.0

57,004 [26] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]
57,004 [26] INFO  SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to 294247-01-10T04:00:54.775Z
57,005 [26] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: eligible timer at 294247-01-09T04:00:54.775Z: TimerData{timerId=0:9223371950454775, timerFamilyId=, namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@1f346ad2), timestamp=294247-01-09T04:00:54.775Z, outputTimestamp=294247-01-09T04:00:54.775Z, domain=EVENT_TIME, deleted=false}
57,005 [26] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements: 1, state size: 0

57,005 [listener] INFO  GlobalWatermarkHolder  - Current watermarks for sourceId 0: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]
57,005 [listener] INFO  GlobalWatermarkHolder  - Batch 2022-09-09T10:32:55.500Z completed, new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]}

57,065 [31] TRACE SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: input elements: 1, expired elements: 1 [current=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]
57,065 [31] INFO  SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from 294247-01-10T04:00:54.775Z to 294247-01-10T04:00:54.775Z

57,174 [listener] INFO  GlobalWatermarkHolder  - Current watermarks for sourceId 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]
57,176 [listener] INFO  GlobalWatermarkHolder  - Batch 2022-09-09T10:32:56.000Z completed: new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]}

57,180 [24] INFO  StateSpecFunctions  - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]

57,190 [JobGenerator] INFO  GlobalWatermarkHolder  - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.500Z]

57,552 [listener] INFO  GlobalWatermarkHolder  - Current watermarks for sourceId 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]
57,555 [listener] INFO  GlobalWatermarkHolder  - Batch 2022-09-09T10:32:56.500Z completed, new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.500Z]}
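For context, the failing pipeline boils down to two chained stateful operators running through the streaming translation. Below is a minimal sketch of that shape; this is my assumption of roughly what the failing test exercises, not the actual test from the PR, and the class name is made up:

```java
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.TestSparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class WatermarkRaceRepro {
  public static void main(String[] args) {
    TestSparkPipelineOptions options =
        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
    options.setRunner(TestSparkRunner.class);
    options.setForceStreaming(true); // run the bounded input through the DStream translation

    Pipeline p = Pipeline.create(options);

    // First stateful operator (stands in for Group.ByFields/ToKvs/GroupByKey).
    PCollection<KV<String, Iterable<Integer>>> grouped =
        p.apply(Create.of(KV.of("mystring", 43)))
            .apply(GroupByKey.create());

    // Second stateful operator: PAssert expands to GroupGlobally/GroupByKey.
    // With a single global watermark its end-of-window timer can fire before
    // the upstream GroupByKey has emitted, so the element arrives "expired".
    PAssert.that(grouped).satisfies(rows -> null); // real assertion elided

    p.run().waitUntilFinish();
  }
}
```

Any pipeline chaining two GroupByKeys in this way should be able to hit the race, since both fire timers against the same global watermark.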

Issue Priority

Priority: 2

Issue Component

Component: runner-spark

mosche · Sep 09 '22

Hmm, this seems like a major problem. There really must be a watermark per PCollection. In fact, to do timers right you need (see the sketch after this list):

  1. a watermark for incoming elements (held back by upstream watermarks and element timestamps; timers fire using this)
  2. a watermark for incoming elements plus queued timers (held back also by timer output timestamps; GC fires using this)
  3. an output watermark for each PCollection (related to the downstream watermarks, but possibly not equal to them if there is a propagation queue)
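A rough sketch of that bookkeeping, tracked per step rather than globally; names and structure are illustrative only, not Beam's or the Spark runner's actual code:

```java
import org.joda.time.Instant;

class StepWatermarks {
  Instant inputWatermark = new Instant(Long.MIN_VALUE);               // (1) timers fire against this
  Instant inputWatermarkWithTimerHolds = new Instant(Long.MIN_VALUE); // (2) GC/window expiry uses this
  Instant outputWatermark = new Instant(Long.MIN_VALUE);              // (3) what downstream steps observe

  void advance(Instant upstreamOutputWatermark,
               Instant oldestPendingElementTimestamp,
               Instant oldestQueuedTimerOutputTimestamp,
               Instant oldestUnpropagatedOutputTimestamp) {
    // (1) held back by the upstream output watermark and buffered element timestamps
    inputWatermark = min(upstreamOutputWatermark, oldestPendingElementTimestamp);
    // (2) additionally held back by queued timers' output timestamps, so state
    // cannot be garbage-collected while a timer still needs to read it
    inputWatermarkWithTimerHolds = min(inputWatermark, oldestQueuedTimerOutputTimestamp);
    // (3) trails (2) while produced elements are still queued for propagation downstream
    outputWatermark = min(inputWatermarkWithTimerHolds, oldestUnpropagatedOutputTimestamp);
  }

  private static Instant min(Instant a, Instant b) {
    return a.isBefore(b) ? a : b;
  }
}
```

With this scheme, the element in the logs above could not expire: PAssert's GroupByKey would only see a watermark as far advanced as the upstream GroupByKey's output watermark allows.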

kennknowles · Sep 13 '22

I agree with @kennknowles. I took a look as well, and it seems the DStream-based Spark runner uses only a global watermark that is updated when the microbatch ends rather than when new elements arrive. That seems similar to the issue we're having on the Dataset runner: see watermark issue
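For illustration, a hypothetical sketch of that problematic pattern; the real GlobalWatermarkHolder is more involved, and the names here are made up:

```java
import org.joda.time.Instant;

class GlobalWatermarkSketch {
  // One value shared by every stateful operator in the pipeline.
  static volatile Instant high = new Instant(Long.MIN_VALUE);

  // Invoked from a batch-completed listener, i.e. once per microbatch,
  // not as elements move between individual PCollections.
  static void onBatchCompleted(Instant newHigh) {
    high = newHigh;
  }
}
```

In the next batch every operator reads the same advanced value, so whichever stateful task happens to be scheduled first fires its timers first, which is exactly the ordering seen in the logs above.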

echauchot · Sep 21 '22