[Bug]: Issues with Watermark propagation in Spark runner (streaming)
What happened?
I ran into this issue while adding support for ValidatesRunner (VR) tests for the Spark runner in streaming mode (https://github.com/apache/beam/pull/22620).
It looks like the runner only supports a single global watermark, which causes issues when running multiple stateful operators. In any case, watermark propagation appears to be highly non-deterministic and prone to race conditions.
In the logs below you can see that timers fire downstream in PAssert$0/GroupGlobally/GroupByKey (at 56,830) after the global watermark is advanced (at 56,793), but before the respective timers fire upstream in Group.ByFields/ToKvs/GroupByKey (at 57,005). As a result, the expected element is considered expired when it finally arrives in PAssert$0/GroupGlobally/GroupByKey (at 57,065):
52,112 [worker] INFO TestSparkRunner - About to run test pipeline avroschematest0testavropipelinegroupby-mmack-0909103252-2afdba63
55,816 [0] INFO StateSpecFunctions - Source 0_0 read 1 values, watermarks: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]
55,899 [JobGenerator] INFO GlobalWatermarkHolder - Queued watermarks for source 0: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]
56,297 [1] INFO StateSpecFunctions - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]
56,314 [JobGenerator] INFO GlobalWatermarkHolder - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]
56,546 [3] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: input elements: 1, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,567 [3] INFO SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,570 [3] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: output elements: 0, state size: 2
56,614 [9] TRACE SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: input elements: 1, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,615 [9] INFO SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,615 [9] TRACE SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: output elements: 0, state size: 2
56,770 [15] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,771 [15] INFO SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,771 [15] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: output elements: 0, state size: 2
56,780 [8] INFO StateSpecFunctions - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]
56,793 [listener] INFO GlobalWatermarkHolder - Batch 2022-09-09T10:32:55.000Z completed, new watermarks: {0=[low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]}
56,793 [JobGenerator] INFO GlobalWatermarkHolder - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]
56,829 [20] TRACE SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]
56,829 [20] INFO SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to 294247-01-10T04:00:54.775Z
56,830 [20] DEBUG SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: eligible timer at 294247-01-09T04:00:54.775Z: TimerData{timerId=0:9223371950454775, timerFamilyId=, namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@1f346ad2), timestamp=294247-01-09T04:00:54.775Z, outputTimestamp=294247-01-09T04:00:54.775Z, domain=EVENT_TIME, deleted=false}
56,835 [20] TRACE SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: output elements: 1, state size: 0
56,850 [20] ERROR Executor - Exception in task 1.0 in stage 14.0 (TID 20)
org.apache.beam.sdk.util.UserCodeException: java.lang.AssertionError: Group.ByFields/ToRow/ParMultiDo(Anonymous).output:
Expected: iterable with items [<Row: key:Row: string:mystring,,value:[Row: bool_non_nullable:true,int:43,long:44,float:44.1,double:44.2,string:mystring,bytes:[1, 2, 3, 4],fixed:[B@7cb94f7,date:1979-03-14T00:00:00.000Z,timestampMillis:1979-03-14T00:02:03.004Z,testEnum:enum value: 0,row:Row: BOOL_NON_NULLABLE:true,int:42,,array:[Row: BOOL_NON_NULLABLE:true,int:42,, Row: BOOL_NON_NULLABLE:true,int:42,, ],map:{(k1, Row: BOOL_NON_NULLABLE:true,int:42,), (k2, Row: BOOL_NON_NULLABLE:true,int:42,), },, ],>] in any order but: no item matches: <Row: key:Row: string:mystring,,value:[Row: bool_non_nullable:true,int:43,long:44,float:44.1,double:44.2,string:mystring,bytes:[1, 2, 3, 4],fixed:[B@7cb94f7,date:1979-03-14T00:00:00.000Z,timestampMillis:1979-03-14T00:02:03.004Z,testEnum:enum value: 0,row:Row: BOOL_NON_NULLABLE:true,int:42,,array:[Row: BOOL_NON_NULLABLE:true,int:42,, Row: BOOL_NON_NULLABLE:true,int:42,, ],map:{(k1, Row: BOOL_NON_NULLABLE:true,int:42,), (k2, Row: BOOL_NON_NULLABLE:true,int:42,), },, ],> in []
56,879 [task-result] ERROR TaskSetManager - Task 1 in stage 14.0 failed 1 times; aborting job
56,893 [JobScheduler] ERROR JobScheduler - Error running job streaming job 1662719575500 ms.0
57,004 [26] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]
57,004 [26] INFO SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to 294247-01-10T04:00:54.775Z
57,005 [26] DEBUG SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: eligible timer at 294247-01-09T04:00:54.775Z: TimerData{timerId=0:9223371950454775, timerFamilyId=, namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@1f346ad2), timestamp=294247-01-09T04:00:54.775Z, outputTimestamp=294247-01-09T04:00:54.775Z, domain=EVENT_TIME, deleted=false}
57,005 [26] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: output elements: 1, state size: 0
57,005 [listener] INFO GlobalWatermarkHolder - Current watermarks for sourceId 0: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]
57,005 [listener] INFO GlobalWatermarkHolder - Batch 2022-09-09T10:32:55.500Z completed, new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]}
57,065 [31] TRACE SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: input elements: 1, expired elements: 1 [current=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]
57,065 [31] INFO SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from 294247-01-10T04:00:54.775Z to 294247-01-10T04:00:54.775Z
57,174 [listener] INFO GlobalWatermarkHolder - Current watermarks for sourceId 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]
57,176 [listener] INFO GlobalWatermarkHolder - Batch 2022-09-09T10:32:56.000Z completed: new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]}
57,180 [24] INFO StateSpecFunctions - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]
57,190 [JobGenerator] INFO GlobalWatermarkHolder - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.500Z]
57,552 [listener] INFO GlobalWatermarkHolder - Current watermarks for sourceId 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]
57,555 [listener] INFO GlobalWatermarkHolder - Batch 2022-09-09T10:32:56.500Z completed, new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.500Z]}
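The ordering problem in the logs can be reduced to a small simulation (plain Java, not Beam APIs; all names here are made up for illustration): because both stateful operators consult one shared global watermark, nothing orders the downstream operator's watermark advance after the upstream operator's timer firings.

```java
// Hypothetical, simplified simulation of the failure mode: one shared
// global watermark, arbitrary ordering between operators. If downstream
// observes the advanced watermark before upstream has fired its timers,
// the element emitted by the upstream timer is already expired on arrival.
import java.util.ArrayList;
import java.util.List;

class GlobalWatermarkRaceSketch {
    // Single shared watermark, as the DStream runner effectively has.
    static long globalWatermark = Long.MIN_VALUE;

    // Downstream GroupByKey: an element is dropped as expired when its
    // timestamp is behind the watermark the operator has already seen.
    static boolean acceptsElement(long elementTimestamp) {
        return elementTimestamp >= globalWatermark;
    }

    static List<String> simulate() {
        List<String> trace = new ArrayList<>();
        long upstreamTimerOutputTs = 100L;

        // Source is exhausted; the global watermark jumps to "end of time"
        // for every operator at once (cf. 56,793 in the logs).
        globalWatermark = Long.MAX_VALUE;

        // Downstream advances first and expires its windows (cf. 56,830)...
        trace.add("downstream advanced");

        // ...and only then does the upstream timer fire (cf. 57,005),
        // emitting an element downstream now rejects (cf. 57,065).
        trace.add(acceptsElement(upstreamTimerOutputTs)
            ? "element accepted" : "element expired");
        return trace;
    }
}
```

With per-edge (per-PCollection) watermarks, downstream could not advance past the upstream operator's output watermark, so this interleaving would be impossible.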
Issue Priority
Priority: 2
Issue Component
Component: runner-spark
Hmm, this seems like a major problem. There really must be a watermark per PCollection. In fact, to do timers right you need:
- watermark for incoming elements (held by upstream and element timestamps, timers fire using this)
- watermark for incoming elements + queued timers (held also by timer output timestamps, GC fires using this)
- output watermark for each PCollection (related to the downstream watermarks but might not be equal if there is propagation queue)
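The three watermarks above could be tracked per PCollection with bookkeeping roughly along these lines (a sketch with illustrative names, not Beam's actual classes; the real logic, e.g. the direct runner's WatermarkManager, is considerably richer):

```java
// Rough sketch of per-PCollection watermark bookkeeping following the
// three bullets above. All names are hypothetical.
class PCollectionWatermarks {
    // Held by upstream output and pending element timestamps;
    // event-time timers fire based on this.
    long inputWatermark = Long.MIN_VALUE;
    // Additionally held by queued timer output timestamps;
    // GC / window expiry fires based on this.
    long inputAndTimerWatermark = Long.MIN_VALUE;
    // What downstream consumers may observe; monotone.
    long outputWatermark = Long.MIN_VALUE;

    void update(long upstreamOutputWatermark,
                long oldestPendingElementTs,
                long oldestQueuedTimerOutputTs) {
        inputWatermark =
            Math.min(upstreamOutputWatermark, oldestPendingElementTs);
        inputAndTimerWatermark =
            Math.min(inputWatermark, oldestQueuedTimerOutputTs);
        // Never ahead of the timer-held input watermark, never regresses.
        outputWatermark = Math.max(outputWatermark, inputAndTimerWatermark);
    }
}
```

The key property is that each operator's timers fire against its own input watermark, which is bounded by its upstream's output watermark, rather than against one global value shared by the whole pipeline.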
I agree with @kennknowles. I took a look as well, and it seems the DStream-based Spark runner uses only a global watermark that is updated when the microbatch ends rather than when new elements arrive. That looks similar to the issue we're having on the Dataset runner (see watermark issue).