Implement dead letter queue for unacked batched messages
Currently automatically sending unacked messages to dead letter queue only works with single messages, for batched messages the implementation appears to be postponed (there is a TODO message https://github.com/wyyerd/pulsar-rs/blob/master/src/consumer.rs#L1164). Pulsar-admin and our other producer implementation appears to be sending messages batched, so I had to implement this myself using a local counter. Having this coded in DB would be more optimal.
Would you be able to provide a PR for it, or at least share the code you've been working with?
Hi, although this is no longer being required for our case, as we opted out of using batched messages, here is our previous code that we used as a workaround. Note that this keeps a local count per job id, so they get redelivered lots of times (depending on the replica count of deployment) before they are sent to dead letter queue.
Batched messages were also very problematic for us, they weren't being properly acknowledged and they were getting redelivered even after being acknowledged. Luckily we needed to delayed delivery (jobs were not immediately processable for us), and pulsar wasn't using batched messages for messages with delayed delivery. See this issue I opened for that case: https://github.com/wyyerd/pulsar-rs/issues/175 (using pulsar-client produce -n ... you can easily reproduce this issue, without having to close the consumer, produce command sends messages as batched if I understood correctly)
We have stripped the dead-letter queue related code from the below snippet and relying on the mechanism provided by pulsar-rs since we no longer receive batched messages.
/// Processes the given pulsar message with given processing function, handles all
/// errors by logging, only propagates errors from pulsar.
#[instrument(
skip_all,
fields(
job_id = tracing::field::Empty,
request_id = %uuid::Uuid::new_v4(),
),
)]
async fn process_message<F, FF>(
&self,
message: Message<JobMessage>,
job_consumer: &mut JobConsumer,
ack_producer: &mut AckProducer,
observed_jobs_cache: &mut lru::LruCache<String, usize>,
process: &F,
) -> Result<()>
where
F: Fn(JobMessage) -> FF,
FF: Future<Output = Result<()>>,
{
debug!(
message.message_id = ?message.message_id(),
message.metadata = ?message.metadata(),
"Received message."
);
match message.deserialize() {
Err(error) => {
// TODO: this can potentially log some credentials :-/
let maybe_body = String::from_utf8(message.payload.data.clone());
error!(
?maybe_body,
?error,
"Deserialization failed, dropping message by acknowledging."
);
job_consumer.ack(&message).await?;
}
Ok(job_message) => {
Span::current().record("job_id", &job_message.job_id.as_str());
info!(?job_message, "Got job message.");
let process_result = process(job_message.clone()).in_current_span().await;
match process_result {
Err(error) => {
// TODO: when pulsar-rs fixes dead letter queue issue, remove this workaround
let retries = match observed_jobs_cache.get_mut(&job_message.job_id) {
Some(count) => {
*count += 1;
*count
}
None => {
observed_jobs_cache.put(job_message.job_id.clone(), 1);
1
}
};
if retries > self.config.redeliver_max_times {
let dlq_topic = self.dead_letter_topic();
error!(
?job_message,
retries,
%dlq_topic,
?error,
"Processing failed, acknowledging, sending to dead-letter queue, no more retries left."
);
if self.config.job_dead_letter_enabled {
self.pulsar.send(dlq_topic, job_message).await?;
}
job_consumer.ack(&message).await?;
} else {
error!(
?job_message,
?error,
retries,
"Processing failed, not acknowledging, will retry later."
);
}
}
Ok(()) => {
info!("Successfully processed message, acknowledging.");
ack_producer.send(job_message).await?;
job_consumer.ack(&message).await?;
}
};
}
};
Ok(())
}