elastigo icon indicating copy to clipboard operation
elastigo copied to clipboard

Flush() in BulkIndexor is funky

Open dmichael opened this issue 12 years ago • 3 comments

This issue is really for @araddon

This works, but has some idiosyncrasies. For instance, in the bulk_test.go file for func TestBulkSmallBatch, if you do not "sleep" and only Flush(), the test fails. So it seems that in order to properly Flush, you have to wait for some unspecified duration of time so that the channel can receive and format the messages - at least I think this is what is happening.

I would not mind if the Flush method actually handled this wait time, but I don't really have any clue as to how to calculate a reasonable wait time.

Does this make sense?

dmichael avatar Jul 29 '13 17:07 dmichael

Yes, I think the thing to do is instead of a "amount of time", maybe a wait group on the startHttpSendor https://github.com/mattbaird/elastigo/blob/master/core/bulk.go#L155

araddon avatar Aug 03 '13 17:08 araddon

Although fixed at some point, this problem seems to have resurfaced. In my tests, I want to be able to:

  • bulk index some data
  • flush the bulk indexer
  • refresh the index
  • search

I don't expect to be able to do this in production, but it is a reasonable expectation for testing.

I may well be doing something wrong, but it seems that without a (rather long) delay of between flushing and refreshing, this doesn't work.

Here is the code from the old version:

// Flush all current documents to ElasticSearch
func (b *BulkIndexer) Flush() {
    b.mu.Lock()
    if b.docCt > 0 {
        b.send(b.buf)
    }
    b.mu.Unlock()
    for {
        select {
        case <-wgChan(b.sendWg):
            // done
            return
        case <-time.After(time.Second * time.Duration(MAX_SHUTDOWN_SECS)):
            // timeout!
            return
        }
    }
}

and the current version:

// Flush all current documents to ElasticSearch
func (b *BulkIndexer) Flush() {
    b.mu.Lock()
    if b.docCt > 0 {
        b.send(b.buf)
    }
    b.mu.Unlock()
}

bejayoharen avatar Oct 12 '16 19:10 bejayoharen

Just a note that this problem also hinders checking for errors at the end of a bulk index operation. Ideally, a developer would like to use a strategy like this:

  • start a bulk operation
  • push a bunch of data to the bulk operation
  • flush the bulk operation
  • check for errors
  • if errors, try again (the bulk operations would have to be idempotent)

However, flushing currently only ensures that the data has been passed to the write thread (if I understand correctly), not that it's been written and appropriate errors have been processed, so you don't know if you've gotten the errors yet.

At the very least, the flush operation should be fixed (ideally with some way to collect errors). I can provide a patch and merge request, but I want to make sure I understand the problem correctly. I would first try to fix the flush method in a way that is similar to the old code, and then attempt to provide an additional flush method that returns accumulated errors. Shall I proceed?

bejayoharen avatar Oct 13 '16 19:10 bejayoharen