
NIFI-7085 add flowFile batching to ConsumeJMS and PublishJMS

Open · mosermw opened this pull request 1 year ago · 2 comments

Summary

NIFI-7085

This PR adds batching to PublishJMS and ConsumeJMS in the form of a new property. ConsumeJMS can read more than one message from the JMS broker per triggered task, and PublishJMS can publish more than one FlowFile to the JMS broker per triggered task. This helps achieve higher message throughput to JMS in busy NiFi environments.
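To illustrate the idea, here is a minimal sketch of a batched consume loop, with a plain in-memory queue standing in for the JMS broker. The class and method names are hypothetical, not the actual NiFi code; the point is that a single triggered task drains up to Maximum Batch Size messages instead of exactly one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch only; the real ConsumeJMS uses a JMS session with
// receive timeouts and acknowledges/commits once per batch.
class BatchedConsumeSketch {
    // Drains up to maxBatchSize messages from the source per triggered task.
    static List<String> consumeBatch(Queue<String> broker, int maxBatchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize) {
            String msg = broker.poll();   // a receive-with-timeout in real code
            if (msg == null) {
                break;                    // no message available; stop early
            }
            batch.add(msg);
        }
        return batch;                     // caller commits once for the whole batch
    }
}
```

With Maximum Batch Size left at its default of 1, this loop degenerates to the old one-message-per-trigger behavior.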

The SupportsBatching annotation was considered, but it is not appropriate for processors that send/receive data from an external source.

The new Maximum Batch Size property has a default value of 1 to match the old behavior. ConsumeJMS in record processing mode previously used a fixed 10,000 maximum batch size, which is now governed by this property. During property migration, ConsumeJMS sets Maximum Batch Size to 10000 if the property was not set and the processor was in record processing mode.

Fixed a bug in JMSConsumer consumeMessageSet() where it could read, but not acknowledge or commit, an extra message when batchCounter reached MAX_MESSAGES_PER_FLOW_FILE.
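The off-by-one can be sketched as follows. This is a hypothetical simplification, not the actual JMSConsumer code: if the loop receives first and only then checks the counter, it fetches one message past the limit, and that extra message is never acknowledged or committed. Testing the bound before receiving avoids it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration of the consumeMessageSet fix (not actual NiFi code).
class ConsumeSetSketch {
    static List<String> consumeMessageSet(Queue<String> broker, int maxMessagesPerFlowFile) {
        List<String> messages = new ArrayList<>();
        int batchCounter = 0;
        // Fixed form: check the bound BEFORE receiving, so no extra message
        // is pulled off the broker only to be dropped uncommitted.
        while (batchCounter < maxMessagesPerFlowFile) {
            String msg = broker.poll();
            if (msg == null) {
                break;
            }
            messages.add(msg);
            batchCounter++;
        }
        return messages;
    }
}
```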

Fixed the JMSPublisherConsumerIT integration test, which did not account for the case where consumeMessage() read no message.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • [x] Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • [x] Pull Request based on current revision of the main branch
  • [x] Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • [x] Build completed using mvn clean install -P contrib-check
    • [x] JDK 21

Licensing

  • [n/a] New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • [n/a] New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • [x] Documentation formatting appears as expected in rendered files

mosermw avatar Mar 29 '24 17:03 mosermw

Rebased to account for nifi-nar-bundles moving to nifi-extension-bundles. Removed the JMSPublisherConsumerIT testMultipleThreads integration test fixes since that test itself was removed.

This PR may not seem like an appealing change, but ConsumeJMS already supported batching messages when using a Record Reader/Writer. This PR makes batching available in all configurations. And as a reminder, this PR fixes a bug in JMSConsumer consumeMessageSet() where it could read and then drop an extra message when batchCounter reached MAX_MESSAGES_PER_FLOW_FILE.

mosermw avatar May 13 '24 14:05 mosermw

Rebased onto main to resolve latest conflicts.

mosermw avatar Jun 04 '24 14:06 mosermw

I reinstated the JMSPublisherConsumerIT testMultipleThreads integration test, with changes to test consuming a batch of messages and fixes to make it reliable.

mosermw avatar Jul 26 '24 18:07 mosermw

@mosermw Sorry not sure why this has sat so long. From a quick visual scan it seems ok. Am building locally and will run all the tests/integration and otherwise.

Can you share more on whether you tested this fix before/after to verify that, in fact, performance is demonstrably better?

joewitt avatar Oct 18 '24 22:10 joewitt

Thanks for looking at this @joewitt. I was going to ask for eyes after the 2.0.0 push was over, so I appreciate you finding this now.

I probably spent >60 hours testing this, and my notes are on my work computer, so I'll try to remember. My setup was ActiveMQ and NiFi on separate EC2 instances, with PublishJMS pushing messages as fast as it could and ConsumeJMS reading messages in various scenarios. Under ideal conditions (100B message size, nothing else running in NiFi), a batch size of 1 versus 25 did 160k versus 350k messages per minute, so roughly double. Under more real-world conditions (5KB message size, NiFi busy doing lots of things), a batch size of 1 versus 25 did 30k versus 300k, a much larger improvement. A very large batch size like 10,000 didn't perform much differently than 25 in my environment.

I even put a speedbump latency generator between ActiveMQ and NiFi, but the results were predictable. It did seem like PublishJMS was more affected by latency than ConsumeJMS, though I didn't dive deeper there.

[edited - I remember my measurements were per minute, not per 5 mins]

mosermw avatar Oct 19 '24 00:10 mosermw