[Bug]: negative ack
Bug Description
I'm using NACK here, and fail and reject behave the same way: in both cases the messages are NACKed and remain in the queue. In the fail case, when we run the redelivery script the messages are consumed, but they are then deleted from the queue. In the reject case, according to the Solace documentation the messages should be deleted from the queue, but in my case they stay in the queue.
Expected Behavior
Fail and reject must behave differently.
Steps to Reproduce
```python
import os
import platform
import time
import argparse
from dotenv import load_dotenv

from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, ServiceEvent
from solace.messaging.resources.queue import Queue
from solace.messaging.config.retry_strategy import RetryStrategy
from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver
from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError
from solace.messaging.config.missing_resources_creation_configuration import MissingResourcesCreationStrategy
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.config.authentication_strategy import ClientCertificateAuthentication

# Load environment variables from a .env file (if available)
load_dotenv()

# Set up the environment variable for disabling stdout buffering on Windows
if platform.uname().system == 'Windows':
    os.environ["PYTHONUNBUFFERED"] = "1"  # Disable stdout buffer

# Parse command-line arguments using argparse
parser = argparse.ArgumentParser(description="BTP AEM consumer CLI")
parser.add_argument('-q', '--queue', type=str, required=True, help='Valid AEM queue name')
parser.add_argument('-a', '--ack', type=str, choices=['accept', 'reject', 'fail'], default='accept', help='Acknowledgement mode (accept, reject, fail)')
args = parser.parse_args()

QUEUE_NAME = args.queue  # Get queue name from CLI argument
ACK_MODE = args.ack  # Get ack mode from CLI argument

# Handle received messages
class MessageHandlerImpl(MessageHandler):
    def __init__(self, persistent_receiver: PersistentMessageReceiver):
        self.receiver: PersistentMessageReceiver = persistent_receiver

    def on_message(self, message: InboundMessage):
        payload = message.get_payload_as_string() if message.get_payload_as_string() is not None else message.get_payload_as_bytes()
        if isinstance(payload, bytearray):
            print(f"Received a message of type: {type(payload)}. Decoding to string")
            payload = payload.decode()

        topic = message.get_destination_name()
        print("\n" + f"Received message on: {topic}")
        print("\n" + f"Message payload: {payload} \n")

        # Acknowledge based on ack mode passed via CLI
        if ACK_MODE == 'accept':
            self.receiver.ack(message)
        elif ACK_MODE == 'reject':
            self.receiver.reject(message)
        elif ACK_MODE == 'fail':
            self.receiver.fail(message)

# Inner classes for error handling
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener):
    def on_reconnected(self, e: ServiceEvent):
        print("\non_reconnected")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

    def on_reconnecting(self, e: "ServiceEvent"):
        print("\non_reconnecting")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

    def on_service_interrupted(self, e: "ServiceEvent"):
        print("\non_service_interrupted")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

# Broker Config from environment variables
broker_props = {
    "solace.messaging.transport.host": os.getenv("AEM_HOST"),
    "solace.messaging.service.vpn-name": os.getenv("AEM_VPN_NAME"),
}

# Set up SSL Context for client certificate authentication
ssl_cert_dir = os.getenv("AEM_SSL_CERT_DIR")
client_cert_file = os.getenv("AEM_CERT_FILE")
client_key_file = os.getenv("AEM_CLIENT_KEY_FILE")
client_key_password = os.getenv("AEM_CLIENT_KEY_PASSWORD")

# Configure the transport security with client certificate authentication
transport_security = TLS.create() \
    .with_certificate_validation(True, validate_server_name=False, trust_store_file_path=ssl_cert_dir)

# Configure client certificate authentication
client_cert_authentication = ClientCertificateAuthentication.of(
    certificate_file=client_cert_file,
    key_file=client_key_file,
    key_password=client_key_password
)

# Build MessagingService with authentication strategy and transport security
messaging_service = MessagingService.builder() \
    .from_properties(broker_props) \
    .with_transport_security_strategy(transport_security) \
    .with_authentication_strategy(client_cert_authentication) \
    .build()

# Event Handler for the messaging service
service_handler = ServiceEventHandler()
messaging_service.add_reconnection_listener(service_handler)
messaging_service.add_reconnection_attempt_listener(service_handler)
messaging_service.add_service_interruption_listener(service_handler)

# Blocking connect thread
messaging_service.connect()
print(f'Messaging Service connected? {messaging_service.is_connected}')

# Event Handling for the messaging service
# Queue name from the command-line argument
durable_exclusive_queue = Queue.durable_exclusive_queue(QUEUE_NAME)

try:
    # Build a receiver and bind it to the durable exclusive queue
    persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder() \
        .with_missing_resources_creation_strategy(MissingResourcesCreationStrategy.CREATE_ON_START) \
        .build(durable_exclusive_queue)
    persistent_receiver.start()

    # Callback for received messages
    persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver))
    print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('\nKeyboardInterrupt received')

except PubSubPlusClientError as exception:
    print(f'\nMake sure queue {QUEUE_NAME} exists on the broker!')

finally:
    if persistent_receiver and persistent_receiver.is_running():
        print('\nTerminating receiver')
        persistent_receiver.terminate(grace_period=0)
    print('\nDisconnecting Messaging Service')
    messaging_service.disconnect()
```
Solace Product
Solace PubSub+ Event Portal
Solace Broker version
No response
Solace API
python
Solace API version
No response
Hi @sowjanya1117, Can you verify that you are setting the proper outcomes? I don't see that in your code. Example here: https://github.com/SolaceSamples/solace-samples-python/blob/bb8631cc6ee90f41efa7a0df0bfc5a03218beefc/howtos/pubsub/how_to_consume_persistent_message_with_negative_acknowledgement.py#L37
If you are, please explain a bit further what the issue is, as your wording is a bit hard to follow.
Hi @sowjanya1117 - can you please confirm if you were able to follow the suggested solution in this issue https://github.com/SolaceSamples/solace-samples-python/issues/54
This is the script I'm using (identical to the script in the issue description above).
@TamimiGitHub I have tried that solution but I am still getting the same error.

1) This is for fail:

```
python test5.py -q "SBTest.Q1" -a fail
Messaging Service connected? True
PERSISTENT receiver started... Bound to Queue [SBTest.Q1]

Received message on: SBTest/T1

Message payload: test + 0

2025-04-22 23:04:25,765 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:192] [owner: [SERVICE: 0x100abc780] [RECEIVER: 0x100cb3230]; thread PersistentMessageReceiverThread: 0x100cb3920] Failed to dispatch the callback [<class '__main__.MessageHandlerImpl'>] '_PersistentMessageReceiver' object has no attribute 'fail'

Received message on: SBTest/T1

Message payload: test + 1

2025-04-22 23:04:25,866 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:192] [owner: [SERVICE: 0x100abc780] [RECEIVER: 0x100cb3230]; thread PersistentMessageReceiverThread: 0x100cb3920] Failed to dispatch the callback [<class '__main__.MessageHandlerImpl'>] '_PersistentMessageReceiver' object has no attribute 'fail'
```

2) This is for reject:

```
python test5.py -q "SBTest.Q1" -a reject
Messaging Service connected? True
PERSISTENT receiver started... Bound to Queue [SBTest.Q1]

Received message on: SBTest/T1

Message payload: test + 0

2025-04-22 23:05:51,010 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:192] [owner: [SERVICE: 0x101414780] [RECEIVER: 0x10218f1a0]; thread PersistentMessageReceiverThread: 0x10218f860] Failed to dispatch the callback [<class '__main__.MessageHandlerImpl'>] '_PersistentMessageReceiver' object has no attribute 'reject'

Received message on: SBTest/T1

Message payload: test + 1

2025-04-22 23:05:51,114 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:192] [owner: [SERVICE: 0x101414780] [RECEIVER: 0x10218f1a0]; thread PersistentMessageReceiverThread: 0x10218f860] Failed to dispatch the callback [<class '__main__.MessageHandlerImpl'>] '_PersistentMessageReceiver' object has no attribute 'reject'
```
@sowjanya1117 can you please wrap your code in code blocks for easier reading and debugging? Read more about it here: https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/creating-and-highlighting-code-blocks
Also, I realized that on March 10, 2025 you opened a similar issue, https://github.com/SolaceSamples/solace-samples-python/issues/52, and you are still using a function that does not exist in your script above. Could you please try the solution proposed in that issue and see if you are still getting the same problem? Thanks!
```python
import os
import platform
import time
import argparse
from dotenv import load_dotenv

from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, ServiceEvent
from solace.messaging.resources.queue import Queue
from solace.messaging.config.retry_strategy import RetryStrategy
from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver
from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError
from solace.messaging.config.missing_resources_creation_configuration import MissingResourcesCreationStrategy
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.config.authentication_strategy import ClientCertificateAuthentication

# Load environment variables from a .env file (if available)
load_dotenv()

# Set up the environment variable for disabling stdout buffering on Windows
if platform.uname().system == 'Windows':
    os.environ["PYTHONUNBUFFERED"] = "1"  # Disable stdout buffer

# Parse command-line arguments using argparse
parser = argparse.ArgumentParser(description="BTP AEM consumer CLI")
parser.add_argument('-q', '--queue', type=str, required=True, help='Valid AEM queue name')
parser.add_argument('-a', '--ack', type=str, choices=['accept', 'reject', 'fail'], default='accept', help='Acknowledgement mode (accept, reject, fail)')
args = parser.parse_args()

QUEUE_NAME = args.queue  # Get queue name from CLI argument
ACK_MODE = args.ack  # Get ack mode from CLI argument

# Handle received messages
class MessageHandlerImpl(MessageHandler):
    def __init__(self, persistent_receiver: PersistentMessageReceiver):
        self.receiver: PersistentMessageReceiver = persistent_receiver

    def on_message(self, message: InboundMessage):
        payload = message.get_payload_as_string() if message.get_payload_as_string() is not None else message.get_payload_as_bytes()
        if isinstance(payload, bytearray):
            print(f"Received a message of type: {type(payload)}. Decoding to string")
            payload = payload.decode()

        topic = message.get_destination_name()
        print("\n" + f"Received message on: {topic}")
        print("\n" + f"Message payload: {payload} \n")

        # Acknowledge based on ack mode passed via CLI
        if ACK_MODE == 'accept':
            self.receiver.ack(message)
        elif ACK_MODE == 'reject':
            self.receiver.reject(message)
        elif ACK_MODE == 'fail':
            self.receiver.fail(message)

# Inner classes for error handling
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener):
    def on_reconnected(self, e: ServiceEvent):
        print("\non_reconnected")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

    def on_reconnecting(self, e: "ServiceEvent"):
        print("\non_reconnecting")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

    def on_service_interrupted(self, e: "ServiceEvent"):
        print("\non_service_interrupted")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

# Broker Config from environment variables
broker_props = {
    "solace.messaging.transport.host": os.getenv("AEM_HOST"),
    "solace.messaging.service.vpn-name": os.getenv("AEM_VPN_NAME"),
}

# Set up SSL Context for client certificate authentication
ssl_cert_dir = os.getenv("AEM_SSL_CERT_DIR")
client_cert_file = os.getenv("AEM_CERT_FILE")
client_key_file = os.getenv("AEM_CLIENT_KEY_FILE")
client_key_password = os.getenv("AEM_CLIENT_KEY_PASSWORD")

# Configure the transport security with client certificate authentication
transport_security = TLS.create() \
    .with_certificate_validation(True, validate_server_name=False, trust_store_file_path=ssl_cert_dir)

# Configure client certificate authentication
client_cert_authentication = ClientCertificateAuthentication.of(
    certificate_file=client_cert_file,
    key_file=client_key_file,
    key_password=client_key_password
)

# Build MessagingService with authentication strategy and transport security
messaging_service = MessagingService.builder() \
    .from_properties(broker_props) \
    .with_transport_security_strategy(transport_security) \
    .with_authentication_strategy(client_cert_authentication) \
    .build()

# Event Handler for the messaging service
service_handler = ServiceEventHandler()
messaging_service.add_reconnection_listener(service_handler)
messaging_service.add_reconnection_attempt_listener(service_handler)
messaging_service.add_service_interruption_listener(service_handler)

# Blocking connect thread
messaging_service.connect()
print(f'Messaging Service connected? {messaging_service.is_connected}')

# Event Handling for the messaging service
# Queue name from the command-line argument
durable_exclusive_queue = Queue.durable_exclusive_queue(QUEUE_NAME)

try:
    # Build a receiver and bind it to the durable exclusive queue
    persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder() \
        .with_missing_resources_creation_strategy(MissingResourcesCreationStrategy.CREATE_ON_START) \
        .build(durable_exclusive_queue)
    persistent_receiver.start()

    # Callback for received messages
    persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver))
    print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('\nKeyboardInterrupt received')

except PubSubPlusClientError as exception:
    print(f'\nMake sure queue {QUEUE_NAME} exists on the broker!')

finally:
    if persistent_receiver and persistent_receiver.is_running():
        print('\nTerminating receiver')
        persistent_receiver.terminate(grace_period=0)
    print('\nDisconnecting Messaging Service')
    messaging_service.disconnect()
```
Hey @sowjanya1117 - can you please refer to the solution proposed in the linked issue from my previous comment and let me know if using the supported functions does not work. Thanks!
Hi @TamimiGitHub! Sorry for asking so many times, but somehow I'm not able to find the problem. I tried the solution that was provided previously, but I'm getting multiple errors while using that script. If you don't mind, please correct the script that I provided in my previous message.
No worries @sowjanya1117, I'm here to help figure out what issue you are facing. In the previous issue, I pointed out that you are trying to use a function that does not exist in the Solace Python API: `self.receiver.reject(message)`. I am guessing you are attempting to implement negative acknowledgement in your script?
As per the Feature Support in the Solace Messaging APIs section of the Solace documentation, negative acknowledgement was introduced in the Python API in version 1.9+, so please also make sure you are using the latest Solace Python API.
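One way to check the installed version against that requirement is sketched below. It assumes the package is installed from PyPI under the name `solace-pubsubplus` and takes the 1.9 minimum from the comment above rather than from the release notes; the helper names are hypothetical.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Minimum version quoted in this thread for NACK support (an assumption to verify).
MIN_NACK_VERSION = (1, 9)


def parse_version(text):
    """Return up to three leading numeric components of a version string."""
    return tuple(int(part) for part in re.findall(r"\d+", text)[:3])


def supports_nack(installed, minimum=MIN_NACK_VERSION):
    """True if the installed version string is at least the minimum version."""
    return parse_version(installed) >= minimum


def installed_solace_version(package="solace-pubsubplus"):
    """Return the installed package version, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_solace_version()
    if found is None:
        print("solace-pubsubplus is not installed")
    else:
        print(f"solace-pubsubplus {found}, NACK supported: {supports_nack(found)}")
```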
If you would like to implement NACK functionality (to "fail" the processing of the message and reject it), I recommend checking the sample how_to_consume_persistent_message_with_negative_acknowledgement, particularly the settlement section. To settle the outcome of message processing, use the `receiver.settle` method and pass the message along with the outcome you want to achieve. So in your example, you will have to do something along the lines of:
```python
from solace.messaging.config.message_acknowledgement_configuration import Outcome

# Inside on_message: settle the message based on the ack mode passed via CLI
if ACK_MODE == 'accept':
    self.receiver.settle(message, Outcome.ACCEPTED)
elif ACK_MODE == 'reject':
    self.receiver.settle(message, Outcome.REJECTED)
elif ACK_MODE == 'fail':
    # Negatively acknowledge the message as FAILED
    print(f"Message failed: {payload}")
    self.receiver.settle(message, Outcome.FAILED)
```
[!CAUTION] I have not run the above snippet locally to test it.
From the API documentation, learn more about:
Try to change that section in your message handler and let me know if you face any issues with this
@TamimiGitHub, I used the above code and ran my script. I can see the messages are moved to the unacknowledged state. Below is the output. Is this the expected behaviour?

```
python negative_ack.py -q "SBTest.Q1" -a fail
Messaging Service connected? True
PERSISTENT receiver started... Bound to Queue [SBTest.Q1]

Received message on: SBTest/T1

Message payload: test + 0

Message failed: test + 0
2025-04-24 17:36:22,024 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:856] [[SERVICE: 0x104] [RECEIVER: 0x110]] Caller Description: PersistentMessageReceiver->settle. Error Info Sub code: [93]. Error: [solClient_flow_settleMsg ignoring SOLCLIENT_OUTCOME_FAILED, flow was created without SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED.session/flowId '(c0,s1)_general-us10-dev'/1067']. Sub code: [SOLCLIENT_SUBCODE_INVALID_FLOW_OPERATION]. Return code: [Fail]
2025-04-24 17:36:22,024 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:192] [owner: [SERVICE: 0x104fb86f0] [RECEIVER: 0x1101276e0]; thread PersistentMessageReceiverThread: 0x110127bf0] Failed to dispatch the callback [<class '__main__.MessageHandlerImpl'>] Caller Description: PersistentMessageReceiver->settle. Error Info Sub code: [93]. Error: [solClient_flow_settleMsg ignoring SOLCLIENT_OUTCOME_FAILED, flow was created without SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED.session/flowId '(c0,s1)_general-us10-dev'/1067']. Sub code: [SOLCLIENT_SUBCODE_INVALID_FLOW_OPERATION]. Return code: [Fail]
```
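The warning above says the flow was created without `SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED`, which suggests the receiver was built without declaring which NACK outcomes it intends to use. The sample linked earlier in this thread declares the outcomes on the receiver builder. A minimal sketch of that step, assuming the builder method is `with_required_message_outcome_support` as in that sample (verify it against your installed API version); `build_nack_capable_receiver` is a hypothetical helper name:

```python
def build_nack_capable_receiver(messaging_service, queue, outcomes):
    """Build a persistent receiver that declares the NACK outcomes it will settle with.

    `outcomes` is an iterable of Outcome values, for example
    (Outcome.FAILED, Outcome.REJECTED). The builder method name is an
    assumption taken from the linked negative-acknowledgement sample.
    """
    return (
        messaging_service.create_persistent_message_receiver_builder()
        .with_required_message_outcome_support(*outcomes)
        .build(queue)
    )


# Intended usage with the script from this issue (not executed here):
#
#   from solace.messaging.config.message_acknowledgement_configuration import Outcome
#
#   persistent_receiver = build_nack_capable_receiver(
#       messaging_service, durable_exclusive_queue, (Outcome.FAILED, Outcome.REJECTED)
#   )
#   persistent_receiver.start()
```

The script in this issue also calls `with_missing_resources_creation_strategy(...)` on the same builder; that call can stay in the chain alongside the outcome declaration.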
@TamimiGitHub for negative ack, reject is working the same as fail: the message goes through the maximum number of redeliveries and only then moves to the DMQ. Is there any property that sends rejected messages directly to the DMQ instead of redelivering them?
@TamimiGitHub
1) The issue I'm facing is that the reject condition works the same as fail: the message goes through the maximum number of redeliveries and only then moves to the DMQ. The expected behaviour of reject is that the message moves directly to the DMQ.
2) Below is the condition I'm using for NACK:

```python
if ACK_MODE == 'accept':
    self.receiver.settle(message, Outcome.ACCEPTED)
elif ACK_MODE == 'reject':
    print(f"Rejecting message: {payload}")
    self.receiver.settle(message, Outcome.REJECTED)
elif ACK_MODE == 'fail':
    print(f"Failing message: {payload}")
    self.receiver.settle(message, Outcome.FAILED)
```
Hey @sowjanya1117 - I would recommend checking the documentation over here https://docs.solace.com/API/API-Developer-Guide/Acknowledging-Messages.htm#negative-acknowledgments-for-specific-messages as it breaks down the difference between accept, reject, and fail outcomes. From the documentation:
[!NOTE] REJECTED—This NACK notifies the event broker that your client application could process the message but it was not accepted (for example, failed validation). When the event broker receives this NACK it removes the message from its queue and then moves the message to the dead message queue (DMQ) if it is configured.
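To make the difference concrete, here is a toy in-memory model of the three outcomes. It is a sketch of the documented semantics, not the Solace API: `Outcome` is a local stand-in enum, `ToyQueue` is hypothetical, a DMQ is assumed to be configured, and the FAILED branch (redeliver until a maximum redelivery count is exceeded, then move to the DMQ) reflects the behaviour described in this thread rather than the note quoted above.

```python
from collections import deque
from enum import Enum


class Outcome(Enum):
    """Local stand-in for the settlement outcomes (not the Solace class)."""
    ACCEPTED = "accepted"
    FAILED = "failed"
    REJECTED = "rejected"


class ToyQueue:
    """Toy broker queue with a dead message queue (DMQ) and a redelivery limit."""

    def __init__(self, max_redelivery=3):
        self.pending = deque()   # messages waiting for delivery
        self.dmq = []            # dead message queue
        self.failures = {}       # message -> number of FAILED settlements
        self.max_redelivery = max_redelivery

    def publish(self, message):
        self.pending.append(message)

    def deliver(self):
        """Return the next message without removing it, or None if empty."""
        return self.pending[0] if self.pending else None

    def settle(self, message, outcome):
        self.pending.remove(message)
        if outcome is Outcome.ACCEPTED:
            return                    # processed: removed for good
        if outcome is Outcome.REJECTED:
            self.dmq.append(message)  # removed and moved straight to the DMQ
            return
        # FAILED: requeue for redelivery until the limit is exceeded
        self.failures[message] = self.failures.get(message, 0) + 1
        if self.failures[message] > self.max_redelivery:
            self.dmq.append(message)
        else:
            self.pending.append(message)


if __name__ == "__main__":
    queue = ToyQueue(max_redelivery=2)
    queue.publish("rejected-msg")
    queue.publish("failed-msg")
    queue.settle(queue.deliver(), Outcome.REJECTED)
    deliveries = 0
    while queue.deliver() is not None:
        queue.settle(queue.deliver(), Outcome.FAILED)
        deliveries += 1
    print(f"DMQ: {queue.dmq}, deliveries of failed-msg: {deliveries}")
```

In this model a rejected message reaches the DMQ after a single settlement, while a failed message is redelivered until the limit is exceeded.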
Closing as resolved from @TamimiGitHub's comments