NIFI-10251 Add v5 protocol support for existing MQTT processors
Summary
NIFI-10251 This pr adds v5 protocol support for existing MQTT processors. For v5 connections the processor from now on uses HiveMQ Client library while in case of v3.1.x connections it uses the existing Paho library. HiveMQ Client could have been used for v3.1.x connections but it seemed to be safer to use the existing library for compatibility reasons.
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
- [x] Apache NiFi Jira issue created
Pull Request Tracking
- [x] Pull Request title starts with Apache NiFi Jira issue number, such as
NIFI-00000 - [x] Pull Request commit message starts with Apache NiFi Jira issue number, as such
NIFI-00000
Pull Request Formatting
- [x] Pull Request based on current revision of the
mainbranch - [x] Pull Request refers to a feature branch with one commit containing changes
Verification
Please indicate the verification steps performed prior to pull request creation.
Build
- [x] Build completed using
mvn clean install -P contrib-check- [x] JDK 8
- [ ] JDK 11
- [ ] JDK 17
Licensing
- [x] New dependencies are compatible with the Apache License 2.0 according to the License Policy
- [x] New dependencies are documented in applicable
LICENSEandNOTICEfiles
Documentation
- [ ] Documentation formatting appears as expected in rendered files
Thanks for the contribution @nandorsoma!
On initial review, the
Static Analysischeck failed due to multiple files missing the standard Apache License header. Please review the output of that check and add the header to the files indicated.
Interesting, because I've run the build with contrib-check enabled which I thought checks for that. Nevertheless I will add them of course!
Edit: I see now. The presence of the license header is checked with rat plugin, but unfortunately I need to disable it locally otherwise it fails on .iml files...
@nandorsoma Thanks for adding v5 protocol support to MQTT processors!
I started to review / test this PR and found that the v5 client cannot stop properly:
2022-07-28 07:53:40,084 ERROR [Timer-Driven Process Thread-4] org.apache.nifi.util.ReflectionUtils Failed while invoking annotated method 'public void org.apache.nifi.processors.mqtt.ConsumeMQTT.onUnscheduled(org.apache.nifi.processor.ProcessContext)' with arguments '[org.apache.nifi.processor.StandardProcessContext@5f360d4e]'.
com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
at com.hivemq.client.internal.mqtt.MqttBlockingClient.disconnect(MqttBlockingClient.java:195)
at com.hivemq.client.internal.mqtt.MqttBlockingClient.disconnect(MqttBlockingClient.java:186)
at org.apache.nifi.processors.mqtt.paho.HiveMqV5ClientAdapter.close(HiveMqV5ClientAdapter.java:99)
at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onStopped(AbstractMQTTProcessor.java:292)
at org.apache.nifi.processors.mqtt.ConsumeMQTT.onUnscheduled(ConsumeMQTT.java:348)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:145)
at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:133)
at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:316)
at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:93)
at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1877)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Could you please fix it first? Due to this error, the processor cannot create a new client and NiFi restart needed.
Thank you for the review @turcsanyip, @tpalfy and @exceptionfactory! I've tried to address your comments!
Rebased on top of current main because CI failed on nifi-security-utils that was referenced with 1.17.0-SNAPSHOT version, that's why the force push.
After a discussion I've removed commit [52204e4]. Probably I will open a separate pr for that change.
Thank you for your additional review @exceptionfactory! Please see my latest commit!
@exceptionfactory Thanks for the review! Your latest minor suggestions will be addressed in an upcoming PR (NIFI-10411) in order to avoid the extra build cycle here.
Merging to main...