ES IndexerBolt - Fix behaviour of afterBulk
Hi @jnioche,
I was looking into https://github.com/DigitalPebble/storm-crawler/pull/989#discussion_r918581042 and reviewed the old code to make sure that I get the intended behaviour (see https://github.com/FelixEngl/storm-crawler/blob/834347e53f79376d3a79f125a6203c91d062e04f/external/elasticsearch/src/main/java/com/digitalpebble/stormcrawler/elasticsearch/bolt/IndexerBolt.java).
Now I am wondering: shouldn't it be enough to process only the first occurrence of a BulkResponseElement with a given id, and otherwise just emit the required LOG events and update the counters accordingly?
Because the old code worked like this (if I got that right):
:START afterBulk
:ITERATION 1
+ waitAck ---------------+
| "A" | [tuple1, tuple3] |
| "B" | [tuple2] |
+------------------------+
+ bulk_response ---------------+
| 1. (id: "A", state: SUCCESS) |
| 2. (id: "B", state: SUCCESS) |
| 3. (id: "A", state: FAILURE) |
+------------------------------+
response = bulk_response.removeFirst() : (id: "A", state: SUCCESS)
tuples = waitAck.getIfPresent(response.id) : [tuple1, tuple3]
for(tuple in tuples){
// process all tuples as state: SUCCESS
...
}
waitAck.invalidate(response.id) // Immediate removal
:ITERATION 1
:ITERATION 2
+ waitAck -------+
| "B" | [tuple2] |
+----------------+
+ bulk_response ---------------+
| 2. (id: "B", state: SUCCESS) |
| 3. (id: "A", state: FAILURE) |
+------------------------------+
response = bulk_response.removeFirst() : (id: "B", state: SUCCESS)
tuples = waitAck.getIfPresent(response.id) : [tuple2]
for(tuple in tuples){
// process all tuples as state: SUCCESS
...
}
waitAck.invalidate(response.id) // Immediate removal
:ITERATION 2
:ITERATION 3
+ waitAck -------+
+----------------+
+ bulk_response ---------------+
| 3. (id: "A", state: FAILURE) |
+------------------------------+
response = bulk_response.removeFirst() : (id: "A", state: FAILURE)
tuples = waitAck.getIfPresent(response.id) : null
LOG.warn("could not find unacked tuple for A")
:ITERATION 3
:STOP afterBulk
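The trace above can be replayed with a minimal Java sketch. It is only an illustration under stated assumptions: `AfterBulkSketch`, `Item` and the event strings are hypothetical names, a plain `HashMap` stands in for the `waitAck` cache, tuples are represented by their names, and how failed tuples were actually handled in the old bolt is not modelled beyond a `FAIL` marker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AfterBulkSketch {

    /** Hypothetical stand-in for one item of the bulk response. */
    public static final class Item {
        final String id;
        final boolean failed;

        public Item(String id, boolean failed) {
            this.id = id;
            this.failed = failed;
        }
    }

    /**
     * Replays the old logic: the first item seen for an id settles every
     * tuple waiting under that id, and the entry is dropped right away.
     * Later items with the same id find nothing and only produce a warning.
     */
    public static List<String> afterBulk(Map<String, List<String>> waitAck,
                                         List<Item> bulkResponse) {
        List<String> events = new ArrayList<>();
        for (Item item : bulkResponse) {
            // remove() models getIfPresent() followed by invalidate()
            List<String> tuples = waitAck.remove(item.id);
            if (tuples == null) {
                events.add("WARN could not find unacked tuple for " + item.id);
                continue;
            }
            for (String tuple : tuples) {
                // Assumption: every waiting tuple gets the state of this item
                events.add((item.failed ? "FAIL " : "ACK ") + tuple);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, List<String>> waitAck = new HashMap<>();
        waitAck.put("A", List.of("tuple1", "tuple3"));
        waitAck.put("B", List.of("tuple2"));

        List<Item> bulkResponse = List.of(
                new Item("A", false),
                new Item("B", false),
                new Item("A", true));

        afterBulk(waitAck, bulkResponse).forEach(System.out::println);
    }
}
```

With the inputs from the trace, tuple1 and tuple3 are both settled by the first "A" item, so the failure reported by the third item never reaches tuple3 and only triggers the warning.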
Best Regards
Felix
Hi @FelixEngl I have to admit that some of your recent changes have made the code slightly more complicated than it was. Are you trying to simplify the current code?
Yes. To be honest, right now I am not happy with the design I used in the ES part.
It's working fine, but looks too "wild" and may become problematic to support in the future.
So right now I am working on a redesign, but first I want to make sure that I understood the ACK logic.
It's working fine, but looks too "wild" and may become problematic to support in the future.
That's also what I am worried about.
So right now I am working on a redesign, but first I want to make sure that I understood the ACK logic.
Your step-by-step description above is correct, although we just iterated over the bulk_response items without removing them.
BTW could you please add some comments to BulkItemResponseToFailedFlag so that readers can understand what it does?
That's also what I am worried about.
Yes, that also came to me when I started to fix the bug. So I decided to do a "big rewrite" (like Dylan Beattie described in his song "Big Rewrite": https://www.youtube.com/watch?v=xCGu5Z_vaps).
Your step-by-step description above is correct, although we just iterated over the bulk_response items without removing them.
BTW could you please add some comments to BulkItemResponseToFailedFlag so that readers can understand what it does?
Yes, I only added this step to emphasize the iteration. (A queue is easier to visualize than an iterator. 😄) And I'll look into that; maybe we won't need the BulkItemResponseToFailedFlag flag anymore once I redesign the code. (I hope to get rid of some of the confusing parts, and to remove or at least streamline the streaming part.)
(But at the moment I'm quite busy, so I don't think I can manage it this week. 😖)
What about reverting to the previous version and simply adding the lock logic?
That won't do it, because the lock would span too much logic. So I have to rewrite the whole thing anyway.