NIFI-10251 Add v5 protocol support for existing MQTT processors

Open nandorsoma opened this issue 3 years ago • 2 comments

Summary

NIFI-10251 This PR adds v5 protocol support to the existing MQTT processors. For v5 connections the processors now use the HiveMQ Client library, while v3.1.x connections continue to use the existing Paho library. HiveMQ Client could also have been used for v3.1.x connections, but keeping the existing library seemed safer for compatibility reasons.
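The library selection described above could be sketched roughly as follows. This is a minimal illustration only, not the code in this PR: the enum, the interface, the two adapter classes and the factory are hypothetical stand-ins, and the real adapters would wrap the Paho and HiveMQ clients rather than return a name.

```java
import java.util.Objects;

// Hypothetical protocol versions; the names are assumptions for illustration.
enum MqttProtocolVersion { V310, V311, V500 }

// Hypothetical common interface that hides which client library is in use.
interface MqttClientAdapter {
    String libraryName();
}

// Stand-in for an adapter backed by the existing Paho library (v3.1.x).
final class PahoClientAdapter implements MqttClientAdapter {
    @Override
    public String libraryName() {
        return "Paho";
    }
}

// Stand-in for an adapter backed by the HiveMQ Client library (v5).
final class HiveMqV5Adapter implements MqttClientAdapter {
    @Override
    public String libraryName() {
        return "HiveMQ";
    }
}

final class MqttClientAdapterFactory {
    private MqttClientAdapterFactory() {
    }

    // Picks the adapter from the configured protocol version.
    static MqttClientAdapter create(MqttProtocolVersion version) {
        Objects.requireNonNull(version, "version");
        switch (version) {
            case V500:
                return new HiveMqV5Adapter();
            case V310:
            case V311:
                // v3.1.x stays on the existing library for compatibility.
                return new PahoClientAdapter();
            default:
                throw new IllegalArgumentException("Unsupported MQTT version: " + version);
        }
    }
}
```

Keeping both libraries behind one interface means the processors themselves do not need to know which client is active.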

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • [x] Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • [x] Pull Request based on current revision of the main branch
  • [x] Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • [x] Build completed using mvn clean install -P contrib-check
    • [x] JDK 8
    • [ ] JDK 11
    • [ ] JDK 17

Licensing

  • [x] New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • [x] New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • [ ] Documentation formatting appears as expected in rendered files

nandorsoma avatar Jul 19 '22 21:07 nandorsoma

Thanks for the contribution @nandorsoma!

On initial review, the Static Analysis check failed due to multiple files missing the standard Apache License header. Please review the output of that check and add the header to the files indicated.

Interesting, because I ran the build with contrib-check enabled, which I thought checked for that. Nevertheless, I will add them, of course!

Edit: I see now. The presence of the license header is checked by the RAT plugin, but unfortunately I have to disable it locally, otherwise it fails on .iml files...

nandorsoma avatar Jul 21 '22 09:07 nandorsoma

@nandorsoma Thanks for adding v5 protocol support to MQTT processors!

I started to review / test this PR and found that the v5 client cannot stop properly:

2022-07-28 07:53:40,084 ERROR [Timer-Driven Process Thread-4] org.apache.nifi.util.ReflectionUtils Failed while invoking annotated method 'public void org.apache.nifi.processors.mqtt.ConsumeMQTT.onUnscheduled(org.apache.nifi.processor.ProcessContext)' with arguments '[org.apache.nifi.processor.StandardProcessContext@5f360d4e]'.
com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
	at com.hivemq.client.internal.mqtt.MqttBlockingClient.disconnect(MqttBlockingClient.java:195)
	at com.hivemq.client.internal.mqtt.MqttBlockingClient.disconnect(MqttBlockingClient.java:186)
	at org.apache.nifi.processors.mqtt.paho.HiveMqV5ClientAdapter.close(HiveMqV5ClientAdapter.java:99)
	at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onStopped(AbstractMQTTProcessor.java:292)
	at org.apache.nifi.processors.mqtt.ConsumeMQTT.onUnscheduled(ConsumeMQTT.java:348)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:145)
	at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:133)
	at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:316)
	at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:93)
	at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1877)
	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Could you please fix it first? Because of this error, the processor cannot create a new client, and a NiFi restart is needed.
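One possible way to avoid this kind of failure is to check the connection state before disconnecting, so that stopping an already disconnected client becomes a no-op. The sketch below is only an illustration and not necessarily the fix applied in this PR: BlockingClient and SafeClientCloser are hypothetical stand-ins, and the stand-in is assumed to signal "not connected" with an IllegalStateException.

```java
// Hypothetical stand-in for a blocking MQTT client; only the two calls used here are modeled.
interface BlockingClient {
    boolean isConnected();

    // Assumed to throw IllegalStateException when the client is not connected.
    void disconnect();
}

final class SafeClientCloser {
    private SafeClientCloser() {
    }

    // Returns true only if a disconnect was actually performed.
    static boolean closeQuietly(BlockingClient client) {
        if (client == null || !client.isConnected()) {
            // Nothing to disconnect, so stopping the processor must not fail.
            return false;
        }
        try {
            client.disconnect();
            return true;
        } catch (IllegalStateException e) {
            // The connection dropped between the check and the call; treat it as closed.
            return false;
        }
    }
}
```

With a guard like this, the stop callback can always finish its cleanup, and a later start can create a fresh client.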

turcsanyip avatar Jul 28 '22 05:07 turcsanyip

Thank you for the review @turcsanyip, @tpalfy and @exceptionfactory! I've tried to address your comments!

nandorsoma avatar Aug 23 '22 17:08 nandorsoma

Rebased on top of the current main because CI failed on nifi-security-utils, which was referenced with the 1.17.0-SNAPSHOT version. That is the reason for the force push.

nandorsoma avatar Aug 24 '22 01:08 nandorsoma

After a discussion, I've removed commit [52204e4]. I will probably open a separate PR for that change.

nandorsoma avatar Aug 25 '22 11:08 nandorsoma

Thank you for your additional review @exceptionfactory! Please see my latest commit!

nandorsoma avatar Aug 29 '22 23:08 nandorsoma

@exceptionfactory Thanks for the review! Your latest minor suggestions will be addressed in an upcoming PR (NIFI-10411) in order to avoid the extra build cycle here.

Merging to main...

turcsanyip avatar Aug 30 '22 08:08 turcsanyip