WeijieQ
We hit the same issue. Is there any workaround available? FlinkRunner 1.18, Beam 2.57.0, KafkaIO:

```
"Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@5420eacc, splitState.isNull=false, checkpointMark=null}]",
"Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@174df24, splitState.isNull=true, checkpointMark=null}]",
```
hi @je-ik , tried with `--experiments=use_deprecated_read`; the behavior is the same: duplicated Source.Readers are created. A thread dump shows multiple KafkaConsumerPoll threads running, reading from the...
Looks like a very similar issue. We do a normal shutdown with a savepoint and then restore from that savepoint. It causes the duplicate-message issue (all newly produced messages after the restart will be...
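For anyone trying to reproduce this, the shutdown-and-restore cycle described above can be driven with the standard Flink CLI; a minimal sketch, where the job ID, savepoint path, and jar name are placeholders:

```shell
# Gracefully stop the job while taking a savepoint (job ID is a placeholder).
flink stop --savepointPath s3://my-bucket/savepoints <job-id>

# Restart the same pipeline from that savepoint; per the comment above,
# the duplicate Kafka splits show up after this restore step.
flink run -s s3://my-bucket/savepoints/savepoint-xxxx my-beam-pipeline.jar
```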
@je-ik verified, it works, thank you!! When starting without a savepoint, the splits are added with "Starting source". When starting with a savepoint, this start is skipped, thus no duplicate splits are...
@je-ik After applying this change, I noticed a side-effect: when I scale out my job (for example, increasing the parallelism from 4 to 8) and then restart from a savepoint,...
Thanks @je-ik , once you have the updated patch, I can also verify on my local.
@je-ik Confirmed: with `--maxParallelism=32768` (set before scaling out), the additional new splits after scale-out come up and run. So shall we apply this in our env or there...
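For reference, the flag discussed here is passed at job start so that later scale-outs stay within the pinned key-group range; a minimal sketch, assuming a self-contained Beam jar (jar name and the other flag values are placeholders):

```shell
# Pin maxParallelism up front; later rescaling (e.g. 4 -> 8) must stay
# within this bound, so set it before taking the savepoint.
flink run my-beam-pipeline.jar \
  --runner=FlinkRunner \
  --parallelism=8 \
  --maxParallelism=32768
```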
Got it, double-confirming the suggested solution at the moment: 1. apply the patch shared two days ago, 2. set `--maxParallelism` as a start option. Am I right?...