
KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items

Open VictorvandenHoven opened this issue 2 years ago • 13 comments

Flipped the joinSpuriousLookBackTimeMs values, because they were not correct. Moved the "emit non-joined items"-logic after the "joined items"-logic instead of before, because only then you know whether to emit or not. Introduced the windowsAfterIntervalMs field, which is used to determine whether emitting records can be skipped.

KStreamKStreamLeftJoinTest now tests leftJoin with .after(ZERO). (Invalid) unit tests in KStreamKStreamLeftJoinTest and KStreamKStreamOuterJoinTest have been modified according to the new logic.

VictorvandenHoven avatar Sep 22 '23 12:09 VictorvandenHoven

Thanks for the PR. I did not forget about it (sorry for the wait; very busy times...).

Moved the "emit non-joined items"-logic after the "joined items"-logic instead of before, because only then you know whether to emit or not.

Can you elaborate? Not sure if I can follow?

mjsax avatar Oct 06 '23 04:10 mjsax

Thanks for the PR. I did not forget about it (sorry for the wait; very busy times...).

Moved the "emit non-joined items"-logic after the "joined items"-logic instead of before, because only then you know whether to emit or not.

Can you elaborate? Not sure if I can follow?

Sure. If a not-joined record is emitted at window close, but a joined record is also available at that time, then both the not-joined and the joined record would be emitted, wouldn't they? In the joined-item logic the not-joined record in the outerJoinStore is nullified (correctly), but by then it has already been emitted.

If I change the order back to the original, I get the following failure in the unit test "shouldNotEmitLeftJoinResultForAsymmetricAfterWindow" in KStreamKStreamOuterJoinTest.java:

java.lang.AssertionError: the number of outputs:[Record{key=2, value=null+a2, timestamp=101, headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=3, value=null+a3, timestamp=101, headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=2, value=A2+a2, timestamp=201, headers=RecordHeaders(headers = [], isReadOnly = false)}] Expected: is <2> but: was <3>

Here, the a2 record is joined both to null and to A2. I don't think this is logical. To join or not to join.... ;-)

Or am I mistaken?

VictorvandenHoven avatar Oct 06 '23 08:10 VictorvandenHoven
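
As an illustration of the ordering argument in the comment above, here is a toy sketch in plain Java. It is not the Kafka Streams implementation: the class and method names (`JoinOrderSketch`, `emitThenJoin`, `joinThenEmit`, `pendingStore`) are hypothetical, and it assumes that when A2 arrives, both a2 and a3 are eligible to be flushed as non-joined while a2 is still inside A2's join window. Only the record values a2, a3 and A2 are taken from the assertion message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical names, not the real KStreamKStreamJoin code.
class JoinOrderSketch {

    // Old order: flush every pending (unmatched) right-side record first, then join.
    static List<String> emitThenJoin(Map<Integer, String> pending, int key, String leftValue) {
        List<String> out = new ArrayList<>();
        for (String right : pending.values()) {
            out.add("null+" + right);          // emitted as non-joined
        }
        String match = pending.get(key);
        if (match != null) {
            out.add(leftValue + "+" + match);  // the same record is now joined as well
        }
        pending.clear();
        return out;
    }

    // New order: join first and drop the matched record, then flush what is left.
    static List<String> joinThenEmit(Map<Integer, String> pending, int key, String leftValue) {
        List<String> out = new ArrayList<>();
        String match = pending.remove(key);
        if (match != null) {
            out.add(leftValue + "+" + match);
        }
        for (String right : pending.values()) {
            out.add("null+" + right);          // only records that never found a partner
        }
        pending.clear();
        return out;
    }

    // Assumed state of the outer-join store just before A2 (key 2) arrives.
    static Map<Integer, String> pendingStore() {
        Map<Integer, String> pending = new LinkedHashMap<>();
        pending.put(2, "a2");
        pending.put(3, "a3");
        return pending;
    }

    public static void main(String[] args) {
        System.out.println(emitThenJoin(pendingStore(), 2, "A2"));
        System.out.println(joinThenEmit(pendingStore(), 2, "A2"));
    }
}
```

In this model the emit-first order yields three results, with a2 appearing both as null+a2 and as A2+a2, while the join-first order yields two. That mirrors the "Expected: is <2> but: was <3>" failure quoted above, under the stated assumptions.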

Also made a very quick pass, and I think the fix is spot on. It would be great to get this merged sooner.

guozhangwang avatar Nov 19 '23 01:11 guozhangwang

In javaDoc of JoinWindows:

There are three different window configuration supported:
- before = after = time-difference
- before = 0 and after = time-difference
- before = time-difference and after = 0

A join is symmetric in the sense, that a join specification on the first stream returns the same result record as a join specification on the second stream with flipped before and after values.

Both values (before and after) must not result in an "inverse" window, i.e., upper-interval bound cannot be smaller than lower-interval bound.

I think with this change that non-symmetric values for the window configuration can be supported as well now.

VictorvandenHoven avatar Nov 21 '23 08:11 VictorvandenHoven
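
The before/after semantics quoted from the JoinWindows JavaDoc above can be sketched in a few lines of plain Java. This is a toy check, not the library code: `JoinWindowSketch` and `joinable` are hypothetical names, and the 100 ms window size is an assumption chosen to fit the timestamps 101 and 201 from the earlier assertion message.

```java
// Toy check of the JoinWindows interval semantics; names are hypothetical.
class JoinWindowSketch {

    // A secondary-stream record is joinable with a primary-stream record when
    // primaryTs - beforeMs <= secondaryTs <= primaryTs + afterMs.
    static boolean joinable(long primaryTs, long secondaryTs, long beforeMs, long afterMs) {
        return secondaryTs >= primaryTs - beforeMs && secondaryTs <= primaryTs + afterMs;
    }

    public static void main(String[] args) {
        long beforeMs = 100L; // assumed window size
        long afterMs = 0L;    // the .after(ZERO) case
        long leftTs = 201L;
        long rightTs = 101L;

        // Seen from the left stream: the right record is 100 ms earlier, inside [101, 201].
        System.out.println(joinable(leftTs, rightTs, beforeMs, afterMs));
        // Seen from the right stream with before and after flipped: same answer.
        System.out.println(joinable(rightTs, leftTs, afterMs, beforeMs));
        // A right record later than the left one is rejected when after = 0.
        System.out.println(joinable(leftTs, 202L, beforeMs, afterMs));
    }
}
```

The second call shows the symmetry the JavaDoc describes: swapping the roles of the two streams and flipping before and after gives the same result for the same pair of records.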

@mjsax , @guozhangwang , can we merge this?

lihaosky avatar Dec 11 '23 18:12 lihaosky

@mjsax , @guozhangwang , can we merge this?

Since it has been a couple of months, I suppose it will not be merged then?

Can we discuss this?

VictorvandenHoven avatar Jan 08 '24 08:01 VictorvandenHoven

@mjsax , @guozhangwang , can we merge this?

How long does it normally take to get a reaction?

VictorvandenHoven avatar Jan 19 '24 12:01 VictorvandenHoven

Accidentally closed the PR, reopening again

VictorvandenHoven avatar Feb 09 '24 09:02 VictorvandenHoven

Merged the code of KAFKA-16123 into this PR. Everything else left as it was. All the tests still passed.

VictorvandenHoven avatar Feb 09 '24 13:02 VictorvandenHoven

Merged the code of KAFKA-16123 into this PR.

Why? We are mixing up two tickets if we do this (cf https://github.com/apache/kafka/pull/14426#discussion_r1483677544)

Can you remove those changes? Fixing the grace period should be kept separate to get different commits for different fixes.

mjsax avatar Feb 09 '24 22:02 mjsax

All the tests still passed.

What currently has been merged from KAFKA-16123 into this PR wouldn't solve the general case (non null-key records).

In other words, currently the fix in KAFKA-16123 only addresses the issue for null-key records and needs to be generalized for keyed records too.

I agree with @mjsax , I would keep it separate.

florin-akermann avatar Feb 10 '24 10:02 florin-akermann

I now pushed a 'generalized' fix for KAFKA-16123

florin-akermann avatar Feb 10 '24 22:02 florin-akermann

So I reverted the code of KAFKA-16123.

VictorvandenHoven avatar Feb 12 '24 08:02 VictorvandenHoven

@VictorvandenHoven -- it seems KStreamKStreamIntegrationTest.shouldOuterJoin fails consistently. Can you take a look?

mjsax avatar Mar 01 '24 23:03 mjsax

@VictorvandenHoven -- it seems KStreamKStreamIntegrationTest.shouldOuterJoin fails consistently. Can you take a look?

Ouch, didn't test that one.

Apparently, internalProcessorContext.currentSystemTimeMs() behaves differently than expected. MOCK_TIME advances 10 seconds, while internalProcessorContext.currentSystemTimeMs() only advances about 100 ms. Since the "next time to emit interval" is 1000 ms by default, the "next time to emit" is never reached.

When I change the value of the "next time to emit interval" to 0 by adding the following line in the @BeforeEach method, the test succeeds: streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);

I do not understand however, why this test worked before?

Shall I change the test with the EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX?

VictorvandenHoven avatar Mar 04 '24 14:03 VictorvandenHoven

Looking into the test, we create new KafkaStreams(builder.build(streamsConfig), streamsConfig), but we don't pass in the mock time object. KS therefore creates its own Time object, so it's decoupled from the mock time...

But yes, setting streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L); is a valid fix, and we do the same thing for other tests -- in the end, this config is a perf optimization, but for testing we can disable it by setting it to zero.

I do not understand however, why this test worked before?

Not 100% sure TBH.

mjsax avatar Mar 05 '24 00:03 mjsax
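
To illustrate why the emit interval matters in the exchange above, here is a toy model of an emit throttle in plain Java. It is a simplified sketch, not the Kafka Streams internals: `EmitThrottleSketch` and `mayEmit` are hypothetical names, and the 100 ms of elapsed system time and the 1000 ms default interval are taken from the discussion.

```java
// Toy model of an emit-interval throttle; hypothetical names, not Kafka Streams internals.
class EmitThrottleSketch {
    private final long emitIntervalMs;
    private long nextTimeToEmitMs;

    EmitThrottleSketch(long emitIntervalMs, long startMs) {
        this.emitIntervalMs = emitIntervalMs;
        this.nextTimeToEmitMs = startMs + emitIntervalMs;
    }

    // Returns true when non-joined records may be emitted at this system time.
    boolean mayEmit(long systemTimeMs) {
        if (systemTimeMs < nextTimeToEmitMs) {
            return false;                      // throttled: interval not yet elapsed
        }
        nextTimeToEmitMs = systemTimeMs + emitIntervalMs;
        return true;
    }

    public static void main(String[] args) {
        long startMs = 0L;
        long elapsedMs = 100L; // system time advanced only about 100 ms in the test

        // Default-like interval of 1000 ms: the emit point is never reached.
        System.out.println(new EmitThrottleSketch(1000L, startMs).mayEmit(startMs + elapsedMs));
        // Interval of 0: the throttle is effectively disabled.
        System.out.println(new EmitThrottleSketch(0L, startMs).mayEmit(startMs + elapsedMs));
    }
}
```

With an interval of 0 every call passes the check, which matches the description of the config as a performance optimization that can be switched off for testing.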

Thanks for the fix! Merged to trunk.

Really appreciate that you pushed this through. It was more complicated than expected and took way too long to get finished.

mjsax avatar Mar 06 '24 01:03 mjsax