Propagate traces and log context to non-main threads
Powertools for AWS Lambda (Java) should support multithreading properly, especially but not only:
- with log context and MDC, so that custom keys are not lost between threads
- with traces and X-Ray, so that subsegments have the correct parent segment when execution crosses threads
Note that both of these components rely on ThreadLocal.
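A minimal pure-JDK sketch of the underlying problem: a value stored in a plain ThreadLocal (which is what MDC and the X-Ray SDK use internally) is invisible to a newly started thread.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadLocalDemo {
    // Stand-in for what MDC and the X-Ray SDK do internally
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        CONTEXT.set("requestId=42");          // set on the "main" thread
        AtomicReference<String> seenInChild = new AtomicReference<>();

        Thread child = new Thread(() -> seenInChild.set(CONTEXT.get()));
        child.start();
        child.join();

        System.out.println("main sees:  " + CONTEXT.get());      // requestId=42
        System.out.println("child sees: " + seenInChild.get());  // null
    }
}
```

Nothing here is Powertools-specific; any utility that stores per-request state this way loses it the moment work hops to another thread.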
What were you trying to accomplish?
Expected Behavior
- X-Ray traces should have the correct hierarchy: subsegments created in worker threads should appear under the segment created in the 'main' thread
- MDC keys (for logs) should be preserved across all threads.
Current Behavior
- Using the new feature (#1620), we can see in X-Ray that the thread sub-segments don't have the correct parent (the handler).
- MDC keys are lost between threads.
Steps to Reproduce (for bugs)
- Use the powertools-examples-batch with SQS
- See traces:
Possible Solution
For logs, a temporary workaround has been implemented (MultiThreadMDC).
We should think about a more "generic" or "universal" way of managing multithreading in Powertools, not only to make logging and tracing work correctly, but also to let users adopt multithreading more easily.
Initial thoughts:
- Have a `ThreadAware` interface in the common module and a static list of `ThreadAware` classes.
- The `ThreadAware` interface should have at least 2 methods to act before the thread starts and within the thread execution, to be able to copy data from the main thread (or perform another task, like getting the parent X-Ray segment) and reuse it within the thread.
- When working with threads, we should iterate over this list and call these methods. Maybe a util class in common could simplify this too.
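To make the idea concrete, here is one hypothetical shape such a `ThreadAware` contract could take. The names, the static registry, and the `wrap` helper are all illustrative assumptions, not an existing Powertools API:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public interface ThreadAware {

    // Hypothetical static registry the common module could hold; each utility
    // (logging, tracing) would register its own ThreadAware implementation here.
    List<ThreadAware> REGISTRY = new CopyOnWriteArrayList<>();

    /** Called on the submitting ("main") thread, before the worker thread starts. */
    Object captureContext();

    /** Called inside the worker thread with whatever captureContext() returned. */
    void applyContext(Object context);

    // Possible util helper: wrap a task so that every registered utility's context
    // is captured on the caller's thread and re-applied inside the worker thread.
    static Runnable wrap(Runnable task) {
        List<ThreadAware> snapshot = List.copyOf(REGISTRY);
        List<Object> captured = snapshot.stream().map(ThreadAware::captureContext).toList();
        return () -> {
            for (int i = 0; i < snapshot.size(); i++) {
                snapshot.get(i).applyContext(captured.get(i));
            }
            task.run();
        };
    }
}
```

The key property is the two-phase split: `captureContext` runs on the caller's thread (where the ThreadLocal values still exist), and `applyContext` runs inside the worker, so each utility decides for itself what "context" means (an MDC map copy, a parent X-Ray segment, ...).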
Environment
- Powertools for AWS Lambda (Java) version used: 2.0.0-SNAPSHOT
x-ray specific #1670
@scottgerring on X-Ray specifically, and independent of Powertools, I wonder if anything can be done in https://github.com/aws/aws-xray-sdk-java (I don't really have thoughts/solutions on this), but if you are thinking about it and find a nice solution, maybe we should see whether it could fit into the X-Ray SDK itself
I know it's not a solution for now, but the moment I came across Scoped Values (https://openjdk.org/jeps/464), I felt like: why are we not in the future already, with logging context and X-Ray using that 😄
> @scottgerring on X-Ray specifically, and independent of Powertools, I wonder if anything can be done in https://github.com/aws/aws-xray-sdk-java (I don't really have thoughts/solutions on this), but if you are thinking about it and find a nice solution, maybe we should see whether it could fit into the X-Ray SDK itself
It looks like X-Ray used to use InheritableThreadLocal (see 1.0.6-beta) to store trace context, which may have just worked. Now it's suggesting providing a SegmentContext impl. Chasing after this I've stumbled across LambdaSegmentContext in the x-ray package itself. Because this is pulling the trace header from the environment, I wonder if providing this, or encouraging our users to use this, will be enough to jam all the subsegments under the current request together?
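For illustration, the mechanism behind that old behavior is easy to see with plain JDK classes: an InheritableThreadLocal copies the parent's value into every thread created afterwards, which is why it "may have just worked" for fork-per-request code:

```java
public class InheritableDemo {
    // InheritableThreadLocal copies the parent's value into each child thread
    // at the moment the child thread is constructed
    private static final InheritableThreadLocal<String> TRACE = new InheritableThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        TRACE.set("trace-id-1");

        // This thread is created after the value was set, so it inherits a copy
        Thread child = new Thread(() -> System.out.println("child sees: " + TRACE.get()));
        child.start();
        child.join();
        // prints: child sees: trace-id-1
    }
}
```

The catch is that inheritance happens only at Thread construction: pooled threads are spawned once and reused, so in a warm Lambda container they would keep the trace context of whatever request was live when the pool created them, which is exactly the wrong behavior for per-request trace IDs.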
> I know it's not a solution for now, but the moment I came across Scoped Values (https://openjdk.org/jeps/464), I felt like: why are we not in the future already, with logging context and X-Ray using that 😄
This is super jazzy, but probably not helpful for a few years yet sadly 😄
There's a great comment from @humanzz that outlines some of the issues that happen with thread pooling in general in a Java-Lambda environment.
Potentially, we can provide users with a PowertoolsThreadFactory implementing ThreadFactory. This would be a flexible approach because a thread factory can be passed to the various concurrency models in Java (see example below). Also, it would not "force" customers into adopting any logic in their threads if they don't want to use it. Taking @jeromevdl's ThreadAware idea, this ThreadFactory would iterate over all (installed) utilities that implement the ThreadAware interface and perform the corresponding action (e.g. propagating log context). This way, users can also adopt features such as log context propagation outside of Powertools utilities like parallel batch processing.
Here is an example of how this experience could look using PowertoolsThreadFactory. Of course, we need to be careful not to add too much overhead in the thread initialization phase in our custom factory; it needs careful performance testing.
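PowertoolsThreadFactory itself doesn't exist yet; a minimal hypothetical sketch, assuming plain SLF4J MDC propagation (the real version would iterate over ThreadAware utilities instead), could look like this:

```java
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.MDC;

// Hypothetical sketch, not an existing Powertools class: snapshots the creating
// thread's MDC and installs it in each thread the factory produces.
public class PowertoolsThreadFactory implements ThreadFactory {
    private final String namePrefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public PowertoolsThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Captured on the caller's thread; may be null if no MDC keys were set
        Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap();
        Runnable wrapped = () -> {
            if (mdcSnapshot != null) {
                MDC.setContextMap(mdcSnapshot);
            }
            try {
                task.run();
            } finally {
                MDC.clear();
            }
        };
        return new Thread(wrapped, namePrefix + "-" + counter.getAndIncrement());
    }
}
```

One subtlety worth flagging: executors create worker threads lazily, so `newThread` runs (and the MDC snapshot is taken) when the pool spawns the worker, not at each task submission. True per-task propagation would require wrapping the submitted tasks or the ExecutorService itself, which is part of why this needs careful testing.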
Example with parallelStream(), CompletableFuture, and ForkJoinPool
```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class DataProcessingService {
    private static final Logger logger = LoggerFactory.getLogger(DataProcessingService.class);

    private final ExecutorService executor;
    private final PowertoolsContextAwareForkJoinPool forkJoinPool;

    public DataProcessingService() {
        // Initialize with PowertoolsThreadFactory
        this.executor = Executors.newFixedThreadPool(
                10, new PowertoolsThreadFactory("service-worker"));

        // For the ForkJoinPool used by parallelStream() we need our own implementation
        this.forkJoinPool = new PowertoolsContextAwareForkJoinPool(
                Runtime.getRuntime().availableProcessors());
    }

    public void processRequest(String requestId, List<String> items) {
        // Set MDC for this request
        MDC.put("requestId", requestId);
        MDC.put("itemCount", String.valueOf(items.size()));
        logger.info("Starting processing for request");

        // Use executor for async operations
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            logger.info("Preparing data in background");
            prepareData();
        }, executor);

        // Use parallel stream with custom pool for batch processing
        future.thenRun(() -> {
            try {
                forkJoinPool.submit(() -> {
                    List<String> results = items.parallelStream()
                            .map(item -> {
                                logger.info("Processing item: {}", item);
                                return processItem(item);
                            })
                            .collect(Collectors.toList());
                    logger.info("Processed {} items", results.size());
                    saveResults(results);
                }).get();
            } catch (Exception e) {
                logger.error("Error in parallel processing", e);
            }
        });

        logger.info("Request processing initiated");
    }

    private void prepareData() {
        // Simulate preparation work
    }

    private String processItem(String item) {
        // Simulate item processing
        return item + "-processed";
    }

    private void saveResults(List<String> results) {
        // Simulate saving results
        logger.info("Saved {} results", results.size());
    }

    public void shutdown() {
        executor.shutdown();
        forkJoinPool.shutdown();
    }
}
```
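The PowertoolsContextAwareForkJoinPool referenced above is likewise not defined anywhere yet. One plausible pure-JDK sketch, again assuming SLF4J MDC as the context being propagated, is a ForkJoinPool whose worker-thread factory copies the creating thread's MDC into each worker:

```java
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import org.slf4j.MDC;

// Hypothetical sketch, not an existing Powertools class: a ForkJoinPool whose
// workers start with a copy of the MDC of the thread that created them.
public class PowertoolsContextAwareForkJoinPool extends ForkJoinPool {

    public PowertoolsContextAwareForkJoinPool(int parallelism) {
        super(parallelism, MdcCopyingWorkerThread::new, null, false);
    }

    private static final class MdcCopyingWorkerThread extends ForkJoinWorkerThread {
        private final Map<String, String> mdcSnapshot;

        private MdcCopyingWorkerThread(ForkJoinPool pool) {
            super(pool);
            // Captured on the thread that creates this worker, before it runs
            this.mdcSnapshot = MDC.getCopyOfContextMap();
        }

        @Override
        protected void onStart() {
            super.onStart();
            if (mdcSnapshot != null) {
                MDC.setContextMap(mdcSnapshot);
            }
        }
    }
}
```

The same lazy-creation caveat applies as for the thread factory: ForkJoinPool spawns workers on demand, so the snapshot reflects the MDC at worker-creation time, not at task-submission time, and a production implementation would need to account for worker reuse across requests in a warm Lambda container.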