NIFI-7085 add flowFile batching to ConsumeJMS and PublishJMS
Summary
This PR adds batching to PublishJMS and ConsumeJMS in the form of a new property. ConsumeJMS can read more than one message from the JMS broker per triggered task, and PublishJMS can publish more than one flowFile to the JMS broker per triggered task. This enables higher message throughput to and from JMS in busy NiFi environments.
The SupportsBatching annotation was considered, but it is not appropriate for processors that send/receive data from an external source.
The new Maximum Batch Size property has a default value of 1 to match the old behavior. ConsumeJMS in record processing mode previously had an implicit 10,000 maximum batch size, which is now replaced by this property. On upgrade, ConsumeJMS will migrate Maximum Batch Size to 10000 if the property was not set and the processor was in record processing mode.
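The migration rule above can be sketched as follows. This is a hypothetical, self-contained illustration using a plain map in place of NiFi's actual property-migration API, and the "Record Reader" key used to detect record mode is an assumption; the property names "Maximum Batch Size" and the 10000 value come from the PR description.

```java
import java.util.Map;

// Hypothetical sketch of the ConsumeJMS property migration described above,
// using a plain map rather than NiFi's configuration API. If the processor
// was in record processing mode and Maximum Batch Size was never set, the
// old implicit 10,000 record batch limit is preserved by writing it
// explicitly; otherwise the new default of 1 applies.
public class ConsumeJmsMigrationSketch {
    static void migrate(Map<String, String> properties) {
        boolean recordMode = properties.containsKey("Record Reader"); // assumed marker of record mode
        boolean batchSizeSet = properties.containsKey("Maximum Batch Size");
        if (recordMode && !batchSizeSet) {
            properties.put("Maximum Batch Size", "10000"); // old record-mode limit
        }
        // Non-record configurations fall through and keep the new default of 1,
        // matching the old one-message-per-task behavior.
    }
}
```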
Fixed a bug in JMSConsumer.consumeMessageSet() where it could read, but not acknowledge or commit, an extra message when batchCounter reached MAX_MESSAGES_PER_FLOW_FILE.
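The shape of that off-by-one fix can be sketched as below. The constant name MAX_MESSAGES_PER_FLOW_FILE comes from the PR; the loop structure and the queue standing in for the JMS consumer are a hypothetical illustration, not the actual JMSConsumer code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustration of the consumeMessageSet fix: the loop must check the batch
// counter *before* receiving, so it never pulls a message it cannot include
// in the batch. The buggy shape received first and checked afterward, which
// read one extra message that was then never acknowledged or committed.
public class BatchReceiveSketch {
    static final int MAX_MESSAGES_PER_FLOW_FILE = 3;

    static List<String> consumeMessageSet(Queue<String> broker) {
        List<String> batch = new ArrayList<>();
        int batchCounter = 0;
        while (batchCounter < MAX_MESSAGES_PER_FLOW_FILE) { // check before receive
            String message = broker.poll(); // stands in for consumer.receive(timeout)
            if (message == null) {
                break; // no message available within the timeout
            }
            batch.add(message);
            batchCounter++;
        }
        return batch; // every message in the batch can now be acknowledged/committed
    }
}
```

With the check placed before the receive, the fourth message stays on the broker for the next triggered task instead of being silently dropped.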
Fixed JMSPublisherConsumerIT integration test which didn't account for when consumeMessage() did not read a message.
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
- [x] Apache NiFi Jira issue created
Pull Request Tracking
- [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
- [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
Pull Request Formatting
- [x] Pull Request based on current revision of the `main` branch
- [x] Pull Request refers to a feature branch with one commit containing changes
Verification
Please indicate the verification steps performed prior to pull request creation.
Build
- [x] Build completed using `mvn clean install -P contrib-check`
- [x] JDK 21
Licensing
- [n/a] New dependencies are compatible with the Apache License 2.0 according to the License Policy
- [n/a] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
Documentation
- [x] Documentation formatting appears as expected in rendered files
Rebased to account for nifi-nar-bundles moving to nifi-extension-bundles. Removed the JMSPublisherConsumerIT testMultipleThreads integration test fixes since that test itself was removed.
This PR may not seem like an appealing change, but ConsumeJMS already supported batching messages when using a Record Reader/Writer. This PR makes batching available in all configurations. And as a reminder, this PR fixes a bug in JMSConsumer consumeMessageSet() where it could read then drop an extra message when batchCounter reaches MAX_MESSAGES_PER_FLOW_FILE.
Rebased onto main to resolve latest conflicts.
I reinstated the JMSPublisherConsumerIT testMultipleThreads integration test, with changes to test consuming a batch of messages and fixes to make it reliable.
@mosermw Sorry, not sure why this has sat so long. From a quick visual scan it seems ok. I'm building locally and will run all the tests, integration and otherwise.
Can you share more on whether you tested this fix before/after to verify that, in fact, performance is demonstrably better?
Thanks for looking at this @joewitt. I was going to ask for eyes after the 2.0.0 push was over, so I appreciate you finding this now.
I probably spent >60 hours testing this, and my notes are on my work computer so I'll try to remember. My setup was activemq and nifi on separate EC2 instances, with PublishJMS pushing messages as fast as it can and ConsumeJMS reading messages in various scenarios. Under ideal conditions (100B message size, nothing else running in NiFi), 1 batch size versus 25 did 160k versus 350k (messages per 1 minute), so roughly double. Under more real-world conditions (5KB message size, NiFi busy doing lots of things) 1 versus 25 batch size did 30k versus 300k, which was more noticeable. Super big batch size like 10,000 didn't perform much different than 25 in my environment.
I even put a speedbump latency generator between activemq and nifi, but results were predictable. Though it did seem like PublishJMS was more affected by latency than ConsumeJMS, I didn't dive deeper there.
[edited - I remember my measurements were per minute, not per 5 mins]