[EventHub] Fix race condition when buffered mode is enabled
Description
This PR implements a fix to the race condition described in #34711.
The approach here is twofold:
- Require the `lock` while trying to add new events to the internal `EventBatch` buffer
- Remove unnecessary clean-up code
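The first change can be illustrated with a minimal sketch. `SketchBufferedProducer` and `_Batch` are hypothetical simplifications, not the SDK's classes; only the attribute names (`_cur_batch`, `_cur_buffered_len`, `_buffered_queue`, `_lock`) are taken from this PR. The point is that every read and write of `self._cur_batch` happens while the lock is held.

```python
import queue
import threading


class _Batch:
    """Hypothetical stand-in for an EventBatch with a fixed event capacity."""

    def __init__(self, max_events: int) -> None:
        self.max_events = max_events
        self.events: list = []

    def add(self, event) -> None:
        # Assumption: a full batch signals with ValueError.
        if len(self.events) >= self.max_events:
            raise ValueError("batch is full")
        self.events.append(event)


class SketchBufferedProducer:
    """Hypothetical, simplified model of a buffered producer's shared state."""

    def __init__(self, max_events_per_batch: int = 3) -> None:
        self._lock = threading.Lock()
        self._max = max_events_per_batch
        self._cur_batch = _Batch(self._max)
        self._cur_buffered_len = 0
        self._buffered_queue: "queue.Queue[_Batch]" = queue.Queue()

    def put_events(self, events) -> None:
        # All access to self._cur_batch is serialized by the lock, so a
        # concurrent flush cannot swap the batch out in the middle of an add.
        with self._lock:
            for event in events:
                try:
                    self._cur_batch.add(event)
                except ValueError:
                    # Current batch is full: enqueue it and start a new one.
                    self._buffered_queue.put(self._cur_batch)
                    self._cur_batch = _Batch(self._max)
                    self._cur_batch.add(event)
                self._cur_buffered_len += 1
```

With a capacity of 3, putting 7 events leaves two full batches in `_buffered_queue` and one event in `_cur_batch`, while `_cur_buffered_len` counts all 7.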
Implementing either of these would be enough to fix the related issue, but I would argue that both are necessary to keep things consistent:
- Change 1 helps prevent future race conditions, since it properly protects the shared resource `self._cur_batch`. This is especially important for the synchronous `BufferedProducer`, since it relies on multi-threading, which can be even more unpredictable than an event loop.
- Change 2 improves the consistency of `self._cur_batch` and `self._cur_buffered_len`:
  - `_cur_batch` is "reset" (i.e., assigned to a new `EventBatch` object) when and only when it gets enqueued to `self._buffered_queue`.
  - `_cur_buffered_len` is decreased solely as a consequence of a `self._producer.send`. PR #25406 previously introduced a manual reset of this number, but it did not consider the timeout-based exit condition: https://github.com/Azure/azure-sdk-for-python/blob/23121a583108b6a09f2d45e3c7d1e99e70c365da/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py#L190-L200
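The two invariants of the second change can be sketched on the flush side. This is a hypothetical model, not the SDK's flush logic: batches are plain lists, and `send` is a caller-supplied callable standing in for `self._producer.send`. It shows why a manual reset of the counter is unsafe once the loop can exit early on a timeout.

```python
import queue
import threading
import time


class SketchBufferedProducer:
    """Hypothetical model illustrating the two invariants of the second change."""

    def __init__(self, send) -> None:
        self._lock = threading.Lock()
        self._send = send  # hypothetical stand-in for self._producer.send
        self._cur_batch: list = []
        self._cur_buffered_len = 0
        self._buffered_queue: "queue.Queue[list]" = queue.Queue()

    def put_events(self, events) -> None:
        with self._lock:
            self._cur_batch.extend(events)
            self._cur_buffered_len += len(events)

    def flush(self, timeout: float) -> None:
        deadline = time.monotonic() + timeout
        with self._lock:
            # Invariant 1: _cur_batch is reset only when it is enqueued.
            if self._cur_batch:
                self._buffered_queue.put(self._cur_batch)
                self._cur_batch = []
            while not self._buffered_queue.empty():
                if time.monotonic() >= deadline:
                    # Timeout-based exit: unsent batches stay queued and counted.
                    break
                batch = self._buffered_queue.get()
                self._send(batch)
                # Invariant 2: the counter only shrinks because of a send.
                self._cur_buffered_len -= len(batch)
        # No manual `self._cur_buffered_len = 0` here: after an early timeout
        # exit it would under-count the events still in _buffered_queue.
```

With `timeout=0` the batch is enqueued but never sent, and `_cur_buffered_len` still reports it; a blanket reset to zero would have hidden it.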
A last note regarding Change 1:
- It seems that `azure-sdk-for-net` also updates its equivalent of `self._cur_batch` with a synchronization tool; see `RunPublishingAsync`.
I also have another small set of changes that could make the sync and async `BufferedProducer` codebases more consistent. Since they do not affect functionality, I thought it would be better to submit them in a follow-up PR.
All SDK Contribution checklist:
- [x] The pull request does not introduce breaking changes.
- [x] CHANGELOG is updated for new features, bug fixes or other significant changes.
- [x] I have read the contribution guidelines.
General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.
Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
It would indeed be interesting to have some test code here, though I think it's hard to write deterministic tests for this potential race condition. We could consider something based on this test, though it is currently skipped:
https://github.com/Azure/azure-sdk-for-python/blob/23121a583108b6a09f2d45e3c7d1e99e70c365da/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_buffered_producer_async.py#L529-L532
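A non-deterministic stress test is one option. The sketch below uses a hypothetical, in-memory `SketchBufferedProducer` (not the SDK class, and not the skipped live test linked above): several writer threads put events while a background thread flushes, and the test checks that no event is lost or duplicated. Passing does not prove the absence of a race; with the lock it should pass reliably, but removing the lock only raises the odds of a failure.

```python
import threading


class SketchBufferedProducer:
    """Hypothetical, simplified producer: one lock guards all shared state."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._cur_batch: list = []
        self._cur_buffered_len = 0
        self.sent: list = []

    def put_events(self, events) -> None:
        with self._lock:
            self._cur_batch.extend(events)
            self._cur_buffered_len += len(events)

    def flush(self) -> None:
        with self._lock:
            batch, self._cur_batch = self._cur_batch, []
            self.sent.extend(batch)  # in-memory stand-in for a real send
            self._cur_buffered_len -= len(batch)


def stress(num_threads: int = 8, events_per_thread: int = 1000) -> SketchBufferedProducer:
    producer = SketchBufferedProducer()
    stop = threading.Event()

    def writer(tid: int) -> None:
        for i in range(events_per_thread):
            producer.put_events([(tid, i)])

    def flusher() -> None:
        # Flush roughly every millisecond until the writers are done.
        while not stop.wait(0.001):
            producer.flush()

    flush_thread = threading.Thread(target=flusher)
    flush_thread.start()
    writers = [threading.Thread(target=writer, args=(t,)) for t in range(num_threads)]
    for w in writers:
        w.start()
    for w in writers:
        w.join()
    stop.set()
    flush_thread.join()
    producer.flush()  # drain whatever arrived after the last background flush
    return producer


if __name__ == "__main__":
    p = stress()
    # No event lost or duplicated, and the counter agrees with what is left.
    assert len(p.sent) == 8 * 1000
    assert len(set(p.sent)) == 8 * 1000
    assert p._cur_buffered_len == 0
```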
Thank you for your contribution @falcaopetri! We will review the pull request and get back to you soon.
@microsoft-github-policy-service agree
/azp run python - eventhub - tests
Azure Pipelines successfully started running 1 pipeline(s).
Thank you so much for taking the time to add this @falcaopetri! Much appreciated :)