solace-samples-python icon indicating copy to clipboard operation
solace-samples-python copied to clipboard

[Bug]: negative ack

Open sowjanya1117 opened this issue 9 months ago • 12 comments

Bug Description

i'm using the NACK here both fail and reject is working as same like both cases messages are going to be nack messages are in queue ,for fail condition when we run the redelivery script messages are consumed but it's deleting from the queue reject case messages deleted from the queue based on solace concept but my case messages are in queue

Expected Behavior

Fail and reject must work on differently

Steps to Reproduce

import os import platform import time import argparse from dotenv import load_dotenv

from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, ServiceEvent from solace.messaging.resources.queue import Queue from solace.messaging.config.retry_strategy import RetryStrategy from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError from solace.messaging.config.missing_resources_creation_configuration import MissingResourcesCreationStrategy from solace.messaging.config.transport_security_strategy import TLS from solace.messaging.config.authentication_strategy import ClientCertificateAuthentication

Load environment variables from a .env file (if available)

load_dotenv()

Set up the environment variable for disabling stdout buffering on Windows

if platform.uname().system == 'Windows': os.environ["PYTHONUNBUFFERED"] = "1" # Disable stdout buffer

Parse command-line arguments using argparse

parser = argparse.ArgumentParser(description="BTP AEM consumer CLI") parser.add_argument('-q', '--queue', type=str, required=True, help='Valid AEM queue name') parser.add_argument('-a', '--ack', type=str, choices=['accept', 'reject', 'fail'], default='accept', help='Acknowledgement mode (accept, reject, fail)') args = parser.parse_args()

QUEUE_NAME = args.queue # Get queue name from CLI argument ACK_MODE = args.ack # Get ack mode from CLI argument

Handle received messages

class MessageHandlerImpl(MessageHandler): def init(self, persistent_receiver: PersistentMessageReceiver): self.receiver: PersistentMessageReceiver = persistent_receiver

def on_message(self, message: InboundMessage):
    payload = message.get_payload_as_string() if message.get_payload_as_string() is not None else message.get_payload_as_bytes()
    if isinstance(payload, bytearray):
        print(f"Received a message of type: {type(payload)}. Decoding to string")
        payload = payload.decode()

    topic = message.get_destination_name()
    print("\n" + f"Received message on: {topic}")
    print("\n" + f"Message payload: {payload} \n")
    
    # Acknowledge based on ack mode passed via CLI
    if ACK_MODE == 'accept':
        self.receiver.ack(message)
    elif ACK_MODE == 'reject':
        self.receiver.reject(message)
    elif ACK_MODE == 'fail':
        self.receiver.fail(message)

Inner classes for error handling

class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener): def on_reconnected(self, e: ServiceEvent): print("\non_reconnected") print(f"Error cause: {e.get_cause()}") print(f"Message: {e.get_message()}")

def on_reconnecting(self, e: "ServiceEvent"):
    print("\non_reconnecting")
    print(f"Error cause: {e.get_cause()}")
    print(f"Message: {e.get_message()}")

def on_service_interrupted(self, e: "ServiceEvent"):
    print("\non_service_interrupted")
    print(f"Error cause: {e.get_cause()}")
    print(f"Message: {e.get_message()}")

Broker Config from environment variables

broker_props = { "solace.messaging.transport.host": os.getenv("AEM_HOST"), "solace.messaging.service.vpn-name": os.getenv("AEM_VPN_NAME"), }

Set up SSL Context for client certificate authentication

ssl_cert_dir = os.getenv("AEM_SSL_CERT_DIR") client_cert_file = os.getenv("AEM_CERT_FILE") client_key_file = os.getenv("AEM_CLIENT_KEY_FILE") client_key_password = os.getenv("AEM_CLIENT_KEY_PASSWORD")

Configure the transport security with client certificate authentication

transport_security = TLS.create()
.with_certificate_validation(True, validate_server_name=False, trust_store_file_path=ssl_cert_dir)

Configure client certificate authentication

client_cert_authentication = ClientCertificateAuthentication.of( certificate_file=client_cert_file, key_file=client_key_file, key_password=client_key_password )

Build MessagingService with authentication strategy and transport security

messaging_service = MessagingService.builder()
.from_properties(broker_props)
.with_transport_security_strategy(transport_security)
.with_authentication_strategy(client_cert_authentication)
.build()

Event Handler for the messaging service

service_handler = ServiceEventHandler() messaging_service.add_reconnection_listener(service_handler) messaging_service.add_reconnection_attempt_listener(service_handler) messaging_service.add_service_interruption_listener(service_handler)

Blocking connect thread

messaging_service.connect() print(f'Messaging Service connected? {messaging_service.is_connected}')

Event Handling for the messaging service

Queue name from the command-line argument

durable_exclusive_queue = Queue.durable_exclusive_queue(QUEUE_NAME)

try: # Build a receiver and bind it to the durable exclusive queue persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()
.with_missing_resources_creation_strategy(MissingResourcesCreationStrategy.CREATE_ON_START)
.build(durable_exclusive_queue) persistent_receiver.start()

# Callback for received messages
persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver))
print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')

try: 
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print('\nKeyboardInterrupt received')

except PubSubPlusClientError as exception: print(f'\nMake sure queue {QUEUE_NAME} exists on the broker!')

finally: if persistent_receiver and persistent_receiver.is_running(): print('\nTerminating receiver') persistent_receiver.terminate(grace_period=0) print('\nDisconnecting Messaging Service') messaging_service.disconnect()

Solace Product

Solace PubSub+ Event Portal

Solace Broker version

No response

Solace API

python

Solace API version

No response

sowjanya1117 avatar Apr 21 '25 17:04 sowjanya1117

Hi @sowjanya1117, Can you verify that you are setting the proper outcomes? I don't see that in your code. Example here: https://github.com/SolaceSamples/solace-samples-python/blob/bb8631cc6ee90f41efa7a0df0bfc5a03218beefc/howtos/pubsub/how_to_consume_persistent_message_with_negative_acknowledgement.py#L37

If you are please explain a bit further what the issue is as your verbiage is a bit hard to follow.

Mrc0113 avatar Apr 21 '25 17:04 Mrc0113

Hi @sowjanya1117 - can you please confirm if you were able to follow the suggested solution in this issue https://github.com/SolaceSamples/solace-samples-python/issues/54

TamimiGitHub avatar Apr 22 '25 01:04 TamimiGitHub

This is the script i'm using import os import platform import time import argparse from dotenv import load_dotenv

from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, ServiceEvent from solace.messaging.resources.queue import Queue from solace.messaging.config.retry_strategy import RetryStrategy from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError from solace.messaging.config.missing_resources_creation_configuration import MissingResourcesCreationStrategy from solace.messaging.config.transport_security_strategy import TLS from solace.messaging.config.authentication_strategy import ClientCertificateAuthentication

Load environment variables from a .env file (if available)

load_dotenv()

Set up the environment variable for disabling stdout buffering on Windows

if platform.uname().system == 'Windows': os.environ["PYTHONUNBUFFERED"] = "1" # Disable stdout buffer

Parse command-line arguments using argparse

parser = argparse.ArgumentParser(description="BTP AEM consumer CLI") parser.add_argument('-q', '--queue', type=str, required=True, help='Valid AEM queue name') parser.add_argument('-a', '--ack', type=str, choices=['accept', 'reject', 'fail'], default='accept', help='Acknowledgement mode (accept, reject, fail)') args = parser.parse_args()

QUEUE_NAME = args.queue # Get queue name from CLI argument ACK_MODE = args.ack # Get ack mode from CLI argument

Handle received messages

class MessageHandlerImpl(MessageHandler): def init(self, persistent_receiver: PersistentMessageReceiver): self.receiver: PersistentMessageReceiver = persistent_receiver

def on_message(self, message: InboundMessage):
    payload = message.get_payload_as_string() if message.get_payload_as_string() is not None else message.get_payload_as_bytes()
    if isinstance(payload, bytearray):
        print(f"Received a message of type: {type(payload)}. Decoding to string")
        payload = payload.decode()

    topic = message.get_destination_name()
    print("\n" + f"Received message on: {topic}")
    print("\n" + f"Message payload: {payload} \n")
    
    # Acknowledge based on ack mode passed via CLI
    if ACK_MODE == 'accept':
        self.receiver.ack(message)
    elif ACK_MODE == 'reject':
        self.receiver.reject(message)
    elif ACK_MODE == 'fail':
        self.receiver.fail(message)

Inner classes for error handling

class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener): def on_reconnected(self, e: ServiceEvent): print("\non_reconnected") print(f"Error cause: {e.get_cause()}") print(f"Message: {e.get_message()}")

def on_reconnecting(self, e: "ServiceEvent"):
    print("\non_reconnecting")
    print(f"Error cause: {e.get_cause()}")
    print(f"Message: {e.get_message()}")

def on_service_interrupted(self, e: "ServiceEvent"):
    print("\non_service_interrupted")
    print(f"Error cause: {e.get_cause()}")
    print(f"Message: {e.get_message()}")

Broker Config from environment variables

broker_props = { "solace.messaging.transport.host": os.getenv("AEM_HOST"), "solace.messaging.service.vpn-name": os.getenv("AEM_VPN_NAME"), }

Set up SSL Context for client certificate authentication

ssl_cert_dir = os.getenv("AEM_SSL_CERT_DIR") client_cert_file = os.getenv("AEM_CERT_FILE") client_key_file = os.getenv("AEM_CLIENT_KEY_FILE") client_key_password = os.getenv("AEM_CLIENT_KEY_PASSWORD")

Configure the transport security with client certificate authentication

transport_security = TLS.create()
.with_certificate_validation(True, validate_server_name=False, trust_store_file_path=ssl_cert_dir)

Configure client certificate authentication

client_cert_authentication = ClientCertificateAuthentication.of( certificate_file=client_cert_file, key_file=client_key_file, key_password=client_key_password )

Build MessagingService with authentication strategy and transport security

messaging_service = MessagingService.builder()
.from_properties(broker_props)
.with_transport_security_strategy(transport_security)
.with_authentication_strategy(client_cert_authentication)
.build()

Event Handler for the messaging service

service_handler = ServiceEventHandler() messaging_service.add_reconnection_listener(service_handler) messaging_service.add_reconnection_attempt_listener(service_handler) messaging_service.add_service_interruption_listener(service_handler)

Blocking connect thread

messaging_service.connect() print(f'Messaging Service connected? {messaging_service.is_connected}')

Event Handling for the messaging service

Queue name from the command-line argument

durable_exclusive_queue = Queue.durable_exclusive_queue(QUEUE_NAME)

try: # Build a receiver and bind it to the durable exclusive queue persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()
.with_missing_resources_creation_strategy(MissingResourcesCreationStrategy.CREATE_ON_START)
.build(durable_exclusive_queue) persistent_receiver.start()

# Callback for received messages
persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver))
print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')

try: 
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print('\nKeyboardInterrupt received')

except PubSubPlusClientError as exception: print(f'\nMake sure queue {QUEUE_NAME} exists on the broker!')

finally: if persistent_receiver and persistent_receiver.is_running(): print('\nTerminating receiver') persistent_receiver.terminate(grace_period=0) print('\nDisconnecting Messaging Service') messaging_service.disconnect()

sowjanya1117 avatar Apr 22 '25 17:04 sowjanya1117

@TamimiGitHub i have tried that solution but still getting same error 1)This for Fail python test5.py -q "SBTest.Q1" -a fail Messaging Service connected? True PERSISTENT receiver started... Bound to Queue [SBTest.Q1]

Received message on: SBTest/T1

Message payload: test + 0

2025-04-22 23:04:25,765 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:192] [owner: [SERVICE: 0x100abc780] [RECEIVER: 0x100cb3230]; thread PersistentMessageReceiverThread: 0x100cb3920] Failed to dispatch the callback [<class 'main.MessageHandlerImpl'>] '_PersistentMessageReceiver' object has no attribute 'fail'

Received message on: SBTest/T1

Message payload: test + 1

2025-04-22 23:04:25,866 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:192] [owner: [SERVICE: 0x100abc780] [RECEIVER: 0x100cb3230]; thread PersistentMessageReceiverThread: 0x100cb3920] Failed to dispatch the callback [<class 'main.MessageHandlerImpl'>] '_PersistentMessageReceiver' object has no attribute 'fail'

2)This is for reject python test5.py -q "SBTest.Q1" -a reject Messaging Service connected? True PERSISTENT receiver started... Bound to Queue [SBTest.Q1]

Received message on: SBTest/T1

Message payload: test + 0

2025-04-22 23:05:51,010 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:192] [owner: [SERVICE: 0x101414780] [RECEIVER: 0x10218f1a0]; thread PersistentMessageReceiverThread: 0x10218f860] Failed to dispatch the callback [<class 'main.MessageHandlerImpl'>] '_PersistentMessageReceiver' object has no attribute 'reject'

Received message on: SBTest/T1

Message payload: test + 1

2025-04-22 23:05:51,114 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:192] [owner: [SERVICE: 0x101414780] [RECEIVER: 0x10218f1a0]; thread PersistentMessageReceiverThread: 0x10218f860] Failed to dispatch the callback [<class 'main.MessageHandlerImpl'>] '_PersistentMessageReceiver' object has no attribute 'reject'

sowjanya1117 avatar Apr 22 '25 17:04 sowjanya1117

@sowjanya1117 can you please encapsulate your code in code format blocks for easier reading and debugging. Read more about it here https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/creating-and-highlighting-code-blocks

TamimiGitHub avatar Apr 23 '25 02:04 TamimiGitHub

Also, I realized that on March 10, 2025 you opened a similar issue https://github.com/SolaceSamples/solace-samples-python/issues/52 and still using a function that does not exist in your script above. Could you please try the solution proposed in that issue and see if you are still getting the same problem. Thanks!

TamimiGitHub avatar Apr 23 '25 03:04 TamimiGitHub

import os
import platform
import time
import argparse
from dotenv import load_dotenv

from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, ServiceEvent
from solace.messaging.resources.queue import Queue
from solace.messaging.config.retry_strategy import RetryStrategy
from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver
from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError
from solace.messaging.config.missing_resources_creation_configuration import MissingResourcesCreationStrategy
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.config.authentication_strategy import ClientCertificateAuthentication

# Load environment variables from a .env file (if available)
load_dotenv()

# Set up the environment variable for disabling stdout buffering on Windows
if platform.uname().system == 'Windows':
    os.environ["PYTHONUNBUFFERED"] = "1"  # Disable stdout buffer

# Parse command-line arguments using argparse
parser = argparse.ArgumentParser(description="BTP AEM consumer CLI")
parser.add_argument('-q', '--queue', type=str, required=True, help='Valid AEM queue name')
parser.add_argument('-a', '--ack', type=str, choices=['accept', 'reject', 'fail'], default='accept', help='Acknowledgement mode (accept, reject, fail)')
args = parser.parse_args()

QUEUE_NAME = args.queue  # Get queue name from CLI argument
ACK_MODE = args.ack  # Get ack mode from CLI argument

# Handle received messages
class MessageHandlerImpl(MessageHandler):
    def __init__(self, persistent_receiver: PersistentMessageReceiver):
        self.receiver: PersistentMessageReceiver = persistent_receiver

    def on_message(self, message: InboundMessage):
        payload = message.get_payload_as_string() if message.get_payload_as_string() is not None else message.get_payload_as_bytes()
        if isinstance(payload, bytearray):
            print(f"Received a message of type: {type(payload)}. Decoding to string")
            payload = payload.decode()

        topic = message.get_destination_name()
        print("\n" + f"Received message on: {topic}")
        print("\n" + f"Message payload: {payload} \n")
        
        # Acknowledge based on ack mode passed via CLI
        if ACK_MODE == 'accept':
            self.receiver.ack(message)
        elif ACK_MODE == 'reject':
            self.receiver.reject(message)
        elif ACK_MODE == 'fail':
            self.receiver.fail(message)

# Inner classes for error handling
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener):
    def on_reconnected(self, e: ServiceEvent):
        print("\non_reconnected")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")
    
    def on_reconnecting(self, e: "ServiceEvent"):
        print("\non_reconnecting")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

    def on_service_interrupted(self, e: "ServiceEvent"):
        print("\non_service_interrupted")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

# Broker Config from environment variables
broker_props = {
    "solace.messaging.transport.host": os.getenv("AEM_HOST"),
    "solace.messaging.service.vpn-name": os.getenv("AEM_VPN_NAME"),
}

# Set up SSL Context for client certificate authentication
ssl_cert_dir = os.getenv("AEM_SSL_CERT_DIR")
client_cert_file = os.getenv("AEM_CERT_FILE")
client_key_file = os.getenv("AEM_CLIENT_KEY_FILE")
client_key_password = os.getenv("AEM_CLIENT_KEY_PASSWORD")

# Configure the transport security with client certificate authentication
transport_security = TLS.create() \
    .with_certificate_validation(True, validate_server_name=False, trust_store_file_path=ssl_cert_dir)

# Configure client certificate authentication
client_cert_authentication = ClientCertificateAuthentication.of(
    certificate_file=client_cert_file,
    key_file=client_key_file,
    key_password=client_key_password
)

# Build MessagingService with authentication strategy and transport security
messaging_service = MessagingService.builder() \
    .from_properties(broker_props) \
    .with_transport_security_strategy(transport_security) \
    .with_authentication_strategy(client_cert_authentication) \
    .build()

# Event Handler for the messaging service
service_handler = ServiceEventHandler()
messaging_service.add_reconnection_listener(service_handler)
messaging_service.add_reconnection_attempt_listener(service_handler)
messaging_service.add_service_interruption_listener(service_handler)

# Blocking connect thread
messaging_service.connect()
print(f'Messaging Service connected? {messaging_service.is_connected}')

# Event Handling for the messaging service

# Queue name from the command-line argument
durable_exclusive_queue = Queue.durable_exclusive_queue(QUEUE_NAME)

try:
    # Build a receiver and bind it to the durable exclusive queue
    persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\
        .with_missing_resources_creation_strategy(MissingResourcesCreationStrategy.CREATE_ON_START)\
        .build(durable_exclusive_queue)
    persistent_receiver.start()

    # Callback for received messages
    persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver))
    print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')
    
    try: 
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('\nKeyboardInterrupt received')
    
except PubSubPlusClientError as exception:
    print(f'\nMake sure queue {QUEUE_NAME} exists on the broker!')

finally:
    if persistent_receiver and persistent_receiver.is_running():
        print('\nTerminating receiver')
        persistent_receiver.terminate(grace_period=0)
    print('\nDisconnecting Messaging Service')
    messaging_service.disconnect()

sowjanya1117 avatar Apr 23 '25 16:04 sowjanya1117

Hey @sowjanya1117 - can you please refer to the solution proposed in the linked issue from my previous comment and let me know if using the supported functions does not work. Thanks!

TamimiGitHub avatar Apr 24 '25 04:04 TamimiGitHub

Hi @TamimiGitHub ! sorry for asking many times but somehow i'm not able to find the problem, i tried the solution that provided previously but i'm getting multiple error while using that script. if you don't mind please correct the script that i provided in previous message

sowjanya1117 avatar Apr 24 '25 07:04 sowjanya1117

No worries @sowjanya1117, here to help figure out what issue you are facing. So in the previous issue, I pointed out that you are trying to use a function that does not exist in the Solace Python API self.receiver.reject(message). I am guessing you are attempting to implement a negative acknowledgement functionality in your script?

As per the Feature Support in the Solace Messaging APIs section in the solace documentations, negative acknowledgement was introduced in the Python API in version 1.9+, so please make sure as well you are using the latest Solace Python API

If you would like to implement a NACK functionality (to "fail" the processing of the message and reject it) I would recommend to check this sample on how_to_consume_persistent_message_with_negative_acknowledgement. Particularly the settlement section. In order to settle the outcome of message processing you need to use the receiver.settle method and pass the message along with the outcome settlement you want to achieve. So in your example, you will have to do something along the lines of:

from solace.messaging.config.message_acknowledgement_configuration import Outcome

    if ACK_MODE == 'accept':
        self.receiver.settle(message, Outcome.ACCEPTED)
    elif ACK_MODE == 'reject':
        self.receiver.settle(message, Outcome.REJECTED)
    elif ACK_MODE == 'fail':
        # Instead of fail, you can reject the message or handle it differently
        print(f"Message failed: {payload}")
        self.receiver.settle(message, Outcome.FAILED)  # Reject it instead of failing

[!CAUTION] I have not ran the above snippet locally to test it

From the API documentation, learn more about:

Try to change that section in your message handler and let me know if you face any issues with this

TamimiGitHub avatar Apr 24 '25 11:04 TamimiGitHub

@TamimiGitHub , i used the above code and executed my script, i can see messages are moved to unacknowledge state Below is the output is it expected behaviour .

python negative_ack.py -q "SBTest.Q1" -a fail Messaging Service connected? True PERSISTENT receiver started... Bound to Queue [SBTest.Q1]

Received message on: SBTest/T1

Message payload: test + 0

Message failed: test + 0 2025-04-24 17:36:22,024 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:856] [[SERVICE: 0x104] [RECEIVER: 0x110]] Caller Description: PersistentMessageReceiver->settle. Error Info Sub code: [93]. Error: [solClient_flow_settleMsg ignoring SOLCLIENT_OUTCOME_FAILED, flow was created without SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED.session/flowId '(c0,s1)_general-us10-dev'/1067']. Sub code: [SOLCLIENT_SUBCODE_INVALID_FLOW_OPERATION]. Return code: [Fail] 2025-04-24 17:36:22,024 [WARNING] solace.messaging.receiver: [_persistent_message_receiver.py:192] [owner: [SERVICE: 0x104fb86f0] [RECEIVER: 0x1101276e0]; thread PersistentMessageReceiverThread: 0x110127bf0] Failed to dispatch the callback [<class 'main.MessageHandlerImpl'>] Caller Description: PersistentMessageReceiver->settle. Error Info Sub code: [93]. Error: [solClient_flow_settleMsg ignoring SOLCLIENT_OUTCOME_FAILED, flow was created without SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED.session/flowId '(c0,s1)_general-us10-dev'/1067']. Sub code: [SOLCLIENT_SUBCODE_INVALID_FLOW_OPERATION]. Return code: [Fail]

sowjanya1117 avatar Apr 24 '25 17:04 sowjanya1117

@TamimiGitHub for negative ack reject is working same as fail means it taking maximum redelivery then only it's moving to DMQ is there any property which goes the reject messages directly to DMQ instead on redelivery

sowjanya1117 avatar Apr 29 '25 07:04 sowjanya1117

@TamimiGitHub 1)Actually i'm facing issue that reject condition is working same as fail which means it taking maximum redelivery then only moving to DMQ ,the actually behaviour of reject is directly moves to DMQ 2)below condition i'm using for Nack if ACK_MODE == 'accept': self.receiver.settle(message, Outcome.ACCEPTED) elif ACK_MODE == 'reject': print(f"Rejecting message: {payload}") self.receiver.settle(message, Outcome.REJECTED) elif ACK_MODE == 'fail': print(f"Failing message: {payload}") self.receiver.settle(message, Outcome.FAILED)

sowjanya1117 avatar May 15 '25 09:05 sowjanya1117

Hey @sowjanya1117 - I would recommend checking the documentation over here https://docs.solace.com/API/API-Developer-Guide/Acknowledging-Messages.htm#negative-acknowledgments-for-specific-messages as it breaks down the difference between accept, reject, and fail outcomes. From the documentation:

[!NOTE] REJECTED—This NACK notifies the event broker that your client application could process the message but it was not accepted (for example, failed validation). When the event broker receives this NACK it removes the message from its queue and then moves the message to the dead message queue (DMQ) if it is configured.

TamimiGitHub avatar May 30 '25 04:05 TamimiGitHub

Closing as resolved from @TamimiGitHub's comments

Mrc0113 avatar Jul 23 '25 14:07 Mrc0113