spring-cloud-aws

fix: context propagation fifo

Open igormq opened this issue 5 months ago • 4 comments

:loudspeaker: Type of change

  • [x] Bugfix
  • [ ] New feature
  • [ ] Enhancement
  • [ ] Refactoring

:scroll: Description

Fixes trace context propagation for FIFO queues when using SqsTemplate.sendAsync(). On the first send to a FIFO queue, trace headers (traceparent, tracestate, etc.) were not propagated, because the observation was created on the wrong thread: the thread that completed the async queue-attribute lookup, rather than the calling thread.

:bulb: Motivation and Context

When using SqsTemplate.sendAsync() to send messages to a FIFO queue (.fifo), the trace context (traceId/spanId from Micrometer Observation or distributed tracing) was not propagated on the first call, but was propagated correctly on subsequent calls, once the queue attributes had been cached.

Observed Behavior

// Test starts with traceId: 32dc16b916b8efd6d701100c325a157e

// First message - WRONG traceId (trace headers missing or new trace generated)
thread=[sdk-async-thread] traceId=e9e60bf0ee86c01ff6fcc78339dd9145 message=Processing message...

// Second message - CORRECT traceId (matches calling thread's context)
thread=[main] traceId=32dc16b916b8efd6d701100c325a157e message=Processing message...

Root Cause Analysis

Since Spring Cloud AWS 3.0.1 (PR #799), SqsTemplate automatically detects whether a FIFO queue has ContentBasedDeduplication enabled. This detection is asynchronous and involves calling the AWS SQS API to fetch queue attributes.

The Bug

The observation (which captures trace context and adds trace headers) was created in AbstractMessagingTemplate.observeAndSendAsync(), which is called in a thenCompose callback:

// BEFORE (Broken)
return preProcessMessageForSendAsync(endpointToUse, message)  // Async for FIFO queues
        .thenCompose(messageToUse -> observeAndSendAsync(messageToUse, endpointToUse));
        //                           ↑ Observation created HERE - wrong thread!

Why It Failed on First Call But Worked on Second Call

CompletableFuture.thenCompose() behavior:

  • First call (queue attributes not cached): The future completes on the SDK async thread (sdk-async-*), so the thenCompose callback runs on that thread. The observation is created on the SDK thread, which doesn't have the calling thread's trace context.
  • Second call (queue attributes cached): The future is already completed, so thenCompose runs synchronously on the calling thread, which has the correct trace context.
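The thread hop described above can be reproduced without AWS at all. The sketch below is a minimal, self-contained illustration, assuming a plain ThreadLocal as a stand-in for the tracer's thread-bound context; TRACE_ID, observeAndSend, firstCall and secondCall are hypothetical names, not framework APIs. On the first call, thenCompose is registered on a future that a separate "sdk-async-thread" completes later, so the callback runs there; on the second call, the future is already complete, so the callback runs on the caller.

```java
import java.util.concurrent.CompletableFuture;

public class ThenComposeThreadDemo {

    // Hypothetical stand-in for a tracer's thread-bound context (not a real tracing API).
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Simulates observation creation: reads whatever trace id the *current* thread holds.
    static CompletableFuture<String> observeAndSend(String message) {
        return CompletableFuture.completedFuture(message + " traceId=" + TRACE_ID.get());
    }

    // First call: queue attributes not cached, so preprocessing completes later on another thread.
    static String firstCall() {
        TRACE_ID.set("caller-trace");
        try {
            CompletableFuture<String> preprocessed = new CompletableFuture<>();
            CompletableFuture<String> sent = preprocessed.thenCompose(ThenComposeThreadDemo::observeAndSend);
            // Completed only after thenCompose is registered, so the callback runs on this new thread.
            new Thread(() -> preprocessed.complete("msg"), "sdk-async-thread").start();
            return sent.join();
        } finally {
            TRACE_ID.remove();
        }
    }

    // Second call: attributes cached, so the future is already complete and the callback runs inline.
    static String secondCall() {
        TRACE_ID.set("caller-trace");
        try {
            return CompletableFuture.completedFuture("msg")
                    .thenCompose(ThenComposeThreadDemo::observeAndSend)
                    .join();
        } finally {
            TRACE_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstCall());  // msg traceId=null
        System.out.println(secondCall()); // msg traceId=caller-trace
    }
}
```

Because the ThreadLocal is not inherited by the new thread, the first call observes no trace id, which mirrors the mismatched traceId in the observed logs.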

The Fix

The fix moves observation creation and trace-header capture ahead of the async preprocessing, so both always happen on the calling thread, which holds the correct trace context:

// AFTER (Fixed)
// Step 1: Create observation and capture trace headers on calling thread
AbstractTemplateObservation.Context context = this.observationSpecifics.createContext(message, endpointToUse);
Observation observation = startObservation(context);
Map<String, Object> carrier = Objects.requireNonNull(context.getCarrier(), "No carrier found in context.");
Message<T> messageWithObservationHeaders = MessageHeaderUtils.addHeadersIfAbsent(message, carrier);

// Step 2: Pass message with trace headers through async preprocessing
return preProcessMessageForSendAsync(endpointToUse, messageWithObservationHeaders)
        .thenCompose(messageToUse -> doSendAndCompleteObservation(messageToUse, endpointToUse, context, observation));
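The same stand-ins can illustrate why capturing early works. In this minimal sketch, assuming the message is modeled as an immutable Map<String, String> and the trace id lives in a ThreadLocal, the trace header is copied into the message on the calling thread, so the send step reads it from the message rather than from whichever thread runs the callback. addTraceHeaders and doSend are hypothetical names loosely standing in for the MessageHeaderUtils.addHeadersIfAbsent step and doSendAndCompleteObservation(); the "traceparent" value is a placeholder, not a W3C-formatted header.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CaptureBeforeAsyncDemo {

    // Hypothetical stand-in for the tracer's thread-bound context.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Runs on the calling thread: copies the current trace id into a new, immutable
    // message (the "carrier") before any async work starts.
    static Map<String, String> addTraceHeaders(Map<String, String> message) {
        Map<String, String> copy = new HashMap<>(message);
        copy.putIfAbsent("traceparent", String.valueOf(TRACE_ID.get()));
        return Map.copyOf(copy);
    }

    // The send step reads the trace header from the message, not from the current thread.
    static CompletableFuture<String> doSend(Map<String, String> message) {
        return CompletableFuture.completedFuture("traceparent=" + message.get("traceparent"));
    }

    // Simulates the first FIFO call: preprocessing completes later on another thread.
    static String firstCall() {
        TRACE_ID.set("caller-trace");
        try {
            Map<String, String> withTrace = addTraceHeaders(Map.of("payload", "msg"));
            CompletableFuture<Map<String, String>> preprocessed = new CompletableFuture<>();
            CompletableFuture<String> sent = preprocessed.thenCompose(CaptureBeforeAsyncDemo::doSend);
            // The callback runs on "sdk-async-thread", where TRACE_ID is empty.
            new Thread(() -> preprocessed.complete(withTrace), "sdk-async-thread").start();
            return sent.join();
        } finally {
            TRACE_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstCall()); // traceparent=caller-trace
    }
}
```

Even though the callback still runs on the other thread, the trace id survives because it travels inside the message.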

Key Changes

File: spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/AbstractMessagingTemplate.java

  1. Moved observation creation (startObservation()) to execute before preProcessMessageForSendAsync()
  2. Added trace headers to the message before async operations
  3. Introduced doSendAndCompleteObservation() helper method that sends and completes an existing observation (rather than creating a new one)
  4. Removed observeAndSendAsync()
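Change 3 describes a helper that finishes an observation started earlier instead of opening a new one. A minimal sketch of that shape, assuming a hypothetical FakeObservation type that only mimics the error(...)/stop() pair of Micrometer's Observation (it is not that class), could attach completion to the send future with whenComplete. The method name doSendAndComplete is likewise illustrative.

```java
import java.util.concurrent.CompletableFuture;

public class CompleteExistingObservationSketch {

    // Hypothetical stand-in for an observation that was already started on the calling thread.
    static final class FakeObservation {
        Throwable error;
        boolean stopped;

        void error(Throwable t) { this.error = t; }

        void stop() { this.stopped = true; }
    }

    // Hypothetical helper: completes the existing observation when the send finishes,
    // recording the failure (if any) before stopping it. No new observation is created here.
    static <T> CompletableFuture<T> doSendAndComplete(CompletableFuture<T> send, FakeObservation observation) {
        return send.whenComplete((result, error) -> {
            if (error != null) {
                observation.error(error);
            }
            observation.stop();
        });
    }

    public static void main(String[] args) {
        FakeObservation observation = new FakeObservation();
        CompletableFuture<String> failed = CompletableFuture.failedFuture(new IllegalStateException("send failed"));
        doSendAndComplete(failed, observation).exceptionally(e -> null).join();
        System.out.println(observation.stopped + " " + (observation.error != null)); // true true
    }
}
```

Using whenComplete rather than handle leaves the send result or failure unchanged for the caller while still stopping the observation once.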

:green_heart: How did you test it?

  • Added SqsTemplateFifoTracingIntegrationTest. When run against the old code, its first test, sendAsync_toFifoQueue_shouldPropagateObservationScopeOnFirstCall, fails.

:pencil: Checklist

  • [x] I reviewed submitted code
  • [x] I added tests to verify changes
  • [ ] I updated reference documentation to reflect the change - Not needed
  • [x] All tests passing
  • [x] No breaking changes

:crystal_ball: Next steps

igormq, Nov 24 '25 11:11

If you agree with this PR, can we include it in the next release?

igormq, Nov 24 '25 11:11

@tomazfernandes, after your review we can include this in 4.0.0-M2.

4.0.0-M1 is about to be released, and unfortunately we won't have time to review and merge this before then.

MatejNedic, Nov 24 '25 11:11

Hey @igormq, thanks for the detailed analysis and PR.

You're correct that, when TemplateContentBasedDeduplication is set to AUTO, the first call to fetch ContentBasedDeduplication settings changes threads and hence loses tracing / observation information.

The implementation you propose addresses that, but because Message is immutable, the ObservationContext is created from the original message, without the FIFO-specific headers the framework adds during preprocessing. Please check the failing observesMessageFifo test.

A short-term workaround is to explicitly set TemplateContentBasedDeduplication to ENABLED and supply your own MessageDeduplicationId header, or to DISABLED and let the framework generate one.

For context, are you using sendAsync as part of a CompletableFuture chain, or as a fire-and-forget inside a synchronous flow?

Let me know your thoughts, thanks.

tomazfernandes, Nov 30 '25 22:11

Hey @tomazfernandes, thanks for the detailed review and for identifying the issue!

You're absolutely right about the problem - my implementation creates the ObservationContext before the FIFO-specific headers are added by the framework, which means those headers are missing from the observation.

To answer your question about the use case: I'm using send inside a synchronous flow where I care about whether the message was sent successfully (so not fire-and-forget).

I understand the short-term workaround you suggested (explicitly setting TemplateContentBasedDeduplication to ENABLED or DISABLED), but this would require users to be aware of this limitation and work around it, which isn't ideal.

The proper fix is to:

  1. Capture the parent observation on the calling thread
  2. Allow preProcessMessageForSendAsync to add the FIFO headers asynchronously
  3. Create the observation context AFTER the FIFO headers are added (so it captures them correctly)
  4. Link the new observation to the parent observation to maintain trace continuity
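The four steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not the eventual PR code: Obs is a hypothetical record standing in for an observation, CURRENT is a ThreadLocal standing in for the tracer's current-observation scope, and preProcessAsync simulates preProcessMessageForSendAsync by returning a new immutable message with a placeholder FIFO header from another thread. The parent is captured on the calling thread (step 1), the child is created only after preprocessing has produced the new message (steps 2 and 3), and the child keeps a reference to the captured parent (step 4). In real code, step 4 might map to Micrometer's parent-observation support, but that mapping is an assumption here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ParentLinkedObservationSketch {

    // Hypothetical, minimal observation: a name, the message headers it saw, and its parent.
    record Obs(String name, Map<String, String> headers, Obs parent) {}

    // Hypothetical stand-in for the tracer's thread-bound "current observation".
    static final ThreadLocal<Obs> CURRENT = new ThreadLocal<>();

    // Simulates FIFO preprocessing: returns a NEW immutable message with an extra header,
    // completed on a separate thread. Header name and value are placeholders, not the
    // framework's actual header key.
    static CompletableFuture<Map<String, String>> preProcessAsync(Map<String, String> message) {
        CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
        new Thread(() -> {
            Map<String, String> copy = new HashMap<>(message);
            copy.put("MessageDeduplicationId", "generated-id");
            future.complete(Map.copyOf(copy));
        }, "sdk-async-thread").start();
        return future;
    }

    static CompletableFuture<Obs> send(Map<String, String> message) {
        // Step 1: capture the parent while still on the calling thread.
        Obs parent = CURRENT.get();
        // Steps 2-4: once preprocessing has added its headers, create the child from the
        // processed message and link it to the captured parent. The thread that runs this
        // callback no longer matters, because it reads nothing from CURRENT.
        return preProcessAsync(message)
                .thenApply(processed -> new Obs("sqs-send", processed, parent));
    }

    // Runs one send under a caller-side parent and returns the resulting child observation.
    static Obs runDemo() {
        Obs caller = new Obs("caller", Map.of(), null);
        CURRENT.set(caller);
        try {
            return send(Map.of("payload", "msg")).join();
        } finally {
            CURRENT.remove();
        }
    }

    public static void main(String[] args) {
        Obs child = runDemo();
        System.out.println(child.parent().name());                          // caller
        System.out.println(child.headers().get("MessageDeduplicationId")); // generated-id
    }
}
```

This keeps the trace continuity of the first version of the PR while letting the observation see the framework-generated FIFO headers, which is what the failing observesMessageFifo test checks for.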

I'll update the PR with this approach. Let me know if you have any concerns with this solution!

igormq, Dec 04 '25 16:12