NIFI-10317 Taking care of NullPointerException if AMQP header value is null
Summary
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
- [ ] Apache NiFi Jira issue created
Pull Request Tracking
- [ ] Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
- [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000
Pull Request Formatting
- [ ] Pull Request based on current revision of the main branch
- [ ] Pull Request refers to a feature branch with one commit containing changes
Verification
Please indicate the verification steps performed prior to pull request creation.
Locally tested change
Build
- [ ] Build completed using mvn clean install -P contrib-check
- [ ] JDK 8
- [ ] JDK 11
- [ ] JDK 17
Licensing
- [ ] New dependencies are compatible with the Apache License 2.0 according to the License Policy
- [ ] New dependencies are documented in applicable LICENSE and NOTICE files
Documentation
- [ ] Documentation formatting appears as expected in rendered files
@nandorsoma As you suggested, the header is now added to the header string even when its value is null, and the change avoids the NPE that the existing code throws in that case. Please review and merge.
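To make the idea in the comment above concrete, here is a minimal sketch of null-safe header formatting. It is not the NiFi implementation (which is Java): the function name `format_headers`, the `,` separator and the literal `null` placeholder are all assumptions chosen for illustration.

```python
def format_headers(headers, separator=","):
    """Join headers as key=value pairs, keeping keys whose value is None.

    Hypothetical helper: a None value is rendered as the text "null"
    instead of being dereferenced, which is the step that would fail.
    """
    parts = []
    for key, value in headers.items():
        # Substitute a placeholder rather than calling methods on None.
        text = "null" if value is None else str(value)
        parts.append(f"{key}={text}")
    return separator.join(parts)


if __name__ == "__main__":
    print(format_headers({"x-retry-count": None, "app": "test-nifi"}))
```

The point of the sketch is only that the key survives when the value is missing, so downstream consumers can still see that the header was present.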
I'm interested in getting this one merged so I have pushed a small change to the tests for null/empty headers. Let me know what else should be improved.
I've updated the tests as recommended
Please hold off on merging. I wanted to test it with RabbitMQ, and it seems that headers with an empty value are not transferred. When the header is set to foo1=bar,bar|foo2=bar,bar|foo3=null|foo4 on the publisher side, it arrives as {foo1=bar,bar,foo2=bar,bar,foo3=null} on the consumer side. Do we know which AMQP provider sends a header with a null value?
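The behaviour described in the comment above can be pictured with a small sketch that splits a `|`-separated header string and skips entries that have no `=`. This only mimics the observed result; `parse_header_string` is a hypothetical name and the parsing rules are assumptions, not the publisher's actual code.

```python
def parse_header_string(raw, separator="|"):
    """Split 'k1=v1|k2=v2' into a dict, skipping entries without a value part.

    Hypothetical helper for illustration only.
    """
    headers = {}
    for entry in raw.split(separator):
        key, sep, value = entry.partition("=")
        # An entry such as 'foo4' has no '=' and is dropped.
        if not sep or not key:
            continue
        # Note that 'null' stays an ordinary string, not a missing value.
        headers[key] = value
    return headers


if __name__ == "__main__":
    print(parse_header_string("foo1=bar,bar|foo2=bar,bar|foo3=null|foo4"))
```

Under these assumptions the text `null` is just a string value, so a publisher that builds headers this way never produces a true null header, which would explain why the problem did not show up in that test.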
Hello @nandorsoma,
I don't know if this really addresses your concern, but below is an example Python script that breaks ConsumeAMQP when used with RabbitMQ.
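```python
import pika

# Connect to a local RabbitMQ broker on the default port and virtual host.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
channel = connection.channel()

# Declare the exchange and queue, then bind them with a routing key.
channel.exchange_declare('my_exchange')
channel.queue_declare('my_queue')
channel.queue_bind('my_queue', 'my_exchange', routing_key='my_routing_key')

# A header whose value is None is what breaks ConsumeAMQP.
hdrs = {
    'x-retry-count': None,
}

channel.basic_publish(
    exchange='my_exchange',
    routing_key='my_routing_key',
    properties=pika.BasicProperties(
        app_id='test-nifi',
        headers=hdrs,
    ),
    body='test',
)

connection.close()
```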
Hey @fabien-sarcel! Previously I didn't use NiFi for publishing, that's why I didn't encounter this problem. Apparently PublishMQTT logs a warning when the value is null and does not add the header to the header list, which misled me. With your script the processor now runs correctly. Sorry for the inconvenience!
I was able to verify the error described with Fabien's script, and that it is now fixed with these changes. +1 LGTM, will merge. Thank you everyone for your contributions!