ES IndexerBolt - Fix behaviour of afterBulk

Open FelixEngl opened this issue 3 years ago • 6 comments

Hi @jnioche,

I was looking into https://github.com/DigitalPebble/storm-crawler/pull/989#discussion_r918581042 and reviewed the old code to make sure that I get the intended behaviour (see https://github.com/FelixEngl/storm-crawler/blob/834347e53f79376d3a79f125a6203c91d062e04f/external/elasticsearch/src/main/java/com/digitalpebble/stormcrawler/elasticsearch/bolt/IndexerBolt.java).

Now I am wondering: shouldn't it be enough to process only the first BulkResponseElement encountered for a given id, and for any later element with the same id just print the required LOG events and update the counters accordingly?

Because the old code worked like this (if I got that right):

:START afterBulk

:ITERATION 1
+ waitAck ---------------+
| "A" | [tuple1, tuple3] |
| "B" | [tuple2]         |
+------------------------+

+ bulk_response ---------------+
| 1. (id: "A", state: SUCCESS) |
| 2. (id: "B", state: SUCCESS) |
| 3. (id: "A", state: FAILURE) |
+------------------------------+

response = bulk_response.removeFirst() : (id: "A", state: SUCCESS)
tuples = waitAck.getIfPresent(response.id) : [tuple1, tuple3]
for(tuple in tuples){
    // process all tuples as state: SUCCESS
    ...
}
waitAck.invalidate(response.id) // Immediate removal
:ITERATION 1

:ITERATION 2
+ waitAck -------+
| "B" | [tuple2] |
+----------------+

+ bulk_response ---------------+
| 2. (id: "B", state: SUCCESS) |
| 3. (id: "A", state: FAILURE) |
+------------------------------+

response = bulk_response.removeFirst() : (id: "B", state: SUCCESS)
tuples = waitAck.getIfPresent(response.id) : [tuple2]
for(tuple in tuples){
    // process all tuples as state: SUCCESS
    ...
}
waitAck.invalidate(response.id) // Immediate removal
:ITERATION 2

:ITERATION 3
+ waitAck -------+
+----------------+

+ bulk_response ---------------+
| 3. (id: "A", state: FAILURE) |
+------------------------------+

response = bulk_response.removeFirst() : (id: "A", state: FAILURE)
tuples = waitAck.getIfPresent(response.id) : null
LOG.warn("could not find unacked tuple for A")
:ITERATION 3

:STOP afterBulk
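
As a rough illustration of the trace above, here is a minimal Java sketch. It is not the actual IndexerBolt code: the class AfterBulkSketch, the Item and Outcome types, and the use of a plain HashMap with String tuple names in place of the real waitAck cache and Storm tuples are all assumptions made for this example. What "processing as failure" means in the real bolt is also not modelled; the sketch only records which state each tuple was processed under.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the old afterBulk behaviour described in the trace.
final class AfterBulkSketch {

    /** Hypothetical stand-in for one item of the bulk response. */
    static final class Item {
        final String id;
        final boolean failed;

        Item(String id, boolean failed) {
            this.id = id;
            this.failed = failed;
        }
    }

    /** Records what happened so the outcome can be inspected. */
    static final class Outcome {
        final List<String> processedAsSuccess = new ArrayList<>();
        final List<String> processedAsFailure = new ArrayList<>();
        final List<String> warnings = new ArrayList<>();
    }

    static Outcome afterBulk(Map<String, List<String>> waitAck, List<Item> bulkResponse) {
        Outcome out = new Outcome();
        // Iterate over the response items without removing them from the response.
        for (Item item : bulkResponse) {
            // Assumption: remove() models getIfPresent(id) followed by invalidate(id).
            List<String> tuples = waitAck.remove(item.id);
            if (tuples == null) {
                // A later item with an id that was already handled ends up here.
                out.warnings.add("could not find unacked tuple for " + item.id);
                continue;
            }
            // Every tuple waiting under this id gets the state of the first item seen.
            for (String tuple : tuples) {
                if (item.failed) {
                    out.processedAsFailure.add(tuple);
                } else {
                    out.processedAsSuccess.add(tuple);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> waitAck = new HashMap<>();
        waitAck.put("A", List.of("tuple1", "tuple3"));
        waitAck.put("B", List.of("tuple2"));

        List<Item> bulkResponse = List.of(
                new Item("A", false),
                new Item("B", false),
                new Item("A", true));

        Outcome out = afterBulk(waitAck, bulkResponse);
        System.out.println("success:  " + out.processedAsSuccess);
        System.out.println("failure:  " + out.processedAsFailure);
        System.out.println("warnings: " + out.warnings);
    }
}
```

With the input from the trace, tuple1 and tuple3 are both processed as SUCCESS, and the later FAILURE item for "A" only produces the warning.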

Best Regards

Felix

FelixEngl avatar Jul 16 '22 08:07 FelixEngl

Hi @FelixEngl I have to admit that some of your recent changes have made the code slightly more complicated than it was. Are you trying to simplify the current code?

jnioche avatar Jul 18 '22 08:07 jnioche

> Hi @FelixEngl I have to admit that some of your recent changes have made the code slightly more complicated than it was. Are you trying to simplify the current code?

Yes. To be honest, right now I am not happy with the design I used in the ES part.

It's working fine, but it looks too "wild" and may become problematic to support in the future.

So right now I am working on a redesign, but first I want to make sure that I have understood the ACK logic.

FelixEngl avatar Jul 18 '22 08:07 FelixEngl

> It's working fine, but it looks too "wild" and may become problematic to support in the future.

That's also what I am worried about.

> So right now I am working on a redesign, but first I want to make sure that I have understood the ACK logic.

Your step-by-step description above is correct, although we just iterated over the bulk_response items without removing them.

BTW could you please add some comments to BulkItemResponseToFailedFlag so that readers can understand what it does?

jnioche avatar Jul 18 '22 09:07 jnioche

> > It's working fine, but it looks too "wild" and may become problematic to support in the future.

> That's also what I am worried about.

Yes, that also occurred to me when I started to fix the bug. So I decided to do a "big rewrite" (like Dylan Beattie described in his song "Big Rewrite": https://www.youtube.com/watch?v=xCGu5Z_vaps).

> > So right now I am working on a redesign, but first I want to make sure that I have understood the ACK logic.

> Your step-by-step description above is correct, although we just iterated over the bulk_response items without removing them.

> BTW could you please add some comments to BulkItemResponseToFailedFlag so that readers can understand what it does?

Yes, I only added the removal step to emphasize the iteration. (A queue is easier to visualize than an iterator. 😄) I'll look into the comments; maybe we won't need BulkItemResponseToFailedFlag anymore once I redesign the code. (I hope to get rid of some of the confusing parts, and to remove or at least streamline the streaming part.)

(But at the moment I'm quite busy, so I don't think I can manage it this week. 😖)

FelixEngl avatar Jul 18 '22 09:07 FelixEngl

What about reverting to the previous version and simply adding the lock logic?

jnioche avatar Jul 18 '22 15:07 jnioche

> What about reverting to the previous version and simply adding the lock logic?

That won't do it, because the lock would span too much logic. So I have to rewrite the whole thing anyway.

FelixEngl avatar Jul 19 '22 08:07 FelixEngl