spring_json_header_types not updated with tracing header types after migration from Sleuth to Micrometer
Describe the issue
After migrating from Spring Boot 2.7.x to Spring Boot 3.x.x the spring_json_header_types header is no longer updated with the types of the tracing headers such as b3 or traceparent or any of the baggage headers with the Kafka binder. With Sleuth we used to see the following Kafka message headers:
- b3:d501ea8b2acdcd56-1f7c7b3a48f28941-0
- scst_partition:1
- baggagesample:baggageValue
- spring_json_header_types:{"b3":"java.lang.String","scst_partition":"java.lang.Integer","baggagesample":"java.lang.String"}
With Micrometer we see the following:
- b3:65cf4648a9deb2968c23a2070084596e-f2194f78ae26054d-0-8c23a2070084596e
- scst_partition:1
- baggagesample:baggageValue
- baggage:baggageSample=baggageValue
- spring_json_header_types:{"scst_partition":"java.lang.Integer"}
As you can see, the tracing header types are missing from the spring_json_header_types value. This changes how the message can be consumed, which breaks our consumers and leads to type-mapping issues.
To Reproduce
Steps to reproduce the behavior:
- Use the latest Spring Boot 3.2.x app to publish with Spring Cloud Stream (e.g. via StreamBridge) with Micrometer Tracing enabled, e.g.:
@PostMapping("/")
public Mono<Void> test() {
return Mono.fromRunnable(() -> {
try (BaggageInScope scope = tracer.createBaggageInScope("baggageSample", "baggageValue")) {
streamBridge.send("output", MessageBuilder
.withPayload(Map.of("hello", "world"))
.build());
}
});
}
Version of the framework
Spring Boot 3.2.2 with the 2023.0.0 Spring Cloud dependencies (including Spring Cloud Stream)
Expected behavior
spring_json_header_types contains all the tracing header types.
Since spring_json_header_types is a Spring for Apache Kafka feature, I assume you are using the Kafka binder.
With the new Observation API, we populate those headers into the ProducerRecord only after the DefaultKafkaHeaderMapper has already run:
public KafkaRecordSenderContext(ProducerRecord<?, ?> record, String beanName, Supplier<String> clusterId) {
    super((carrier, key, value) -> record.headers().add(key,
            value == null ? null : value.getBytes(StandardCharsets.UTF_8)));
    // ...
}
Therefore it is no surprise that the tracing header types are not present in that spring_json_header_types.
On the other hand, they do not need to be there anyway: they are plain strings, so there is no reason to record their types in the spring_json_header_types collector.
The Kafka consumer side must rely on the respective observation support in Spring for Apache Kafka: https://docs.spring.io/spring-kafka/reference/appendix/micrometer.html.
Technically the target application must not deal with tracing headers at all: everything should be left to the framework and its ability to propagate such information internally.
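For illustration, here is a minimal sketch of enabling that support with plain Spring for Apache Kafka (the configuration class, factory bean, and generic types are assumptions for the example; with the Kafka binder the same flag can be reached through a ListenerContainerCustomizer):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaObservationConfig {

    // Enabling observation on the listener container lets the framework restore
    // the trace from the record headers (via KafkaRecordReceiverContext), so the
    // application never has to touch b3/traceparent itself.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setObservationEnabled(true);
        return factory;
    }
}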
Unfortunately, the framework does not handle the b3 header properly when the spring_json_header_types header does not indicate that b3 is of type String. Without that entry, the b3 value in Kafka messages is treated as a byte array. That causes a problem in the constructor of org.springframework.integration.support.management.observation.MessageReceiverContext, where the value read from the b3 header is cast directly to String. The result is the error: java.lang.IllegalArgumentException: Incorrect type specified for header 'b3'. Expected [class java.lang.String] but actual type is [class [B].
public MessageReceiverContext(Message<?> message, @Nullable String handlerName) {
    super((carrier, key) -> carrier.getHeaders().get(key, String.class));
    this.message = message;
    this.handlerName = handlerName != null ? handlerName : "unknown";
}
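For illustration only, a lenient header lookup along the following lines would avoid that cast failure; TracingHeaderUtils and getHeaderLeniently are hypothetical names for this sketch, not part of the framework:

import java.nio.charset.StandardCharsets;

import org.springframework.messaging.Message;

final class TracingHeaderUtils {

    private TracingHeaderUtils() {
    }

    // Hypothetical sketch: read a tracing header leniently, converting a raw
    // byte[] value (as received when spring_json_header_types carries no type
    // hint) to a String instead of failing a direct String cast.
    static String getHeaderLeniently(Message<?> message, String key) {
        Object value = message.getHeaders().get(key);
        if (value instanceof byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
        return (String) value;
    }
}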
So, how did it turn out that Spring Integration is involved here?
Why doesn't the regular Spring for Apache Kafka KafkaRecordReceiverContext do the trick to reinstate an observation on the consumer side?
I see that we are talking in the context of Spring Cloud Stream, so any chance you can share a simple project to reproduce and play with?
Thanks
However, I agree with you that Spring Integration should be flexible enough to support raw byte[] for those headers as well.
What I want is to find some workaround for you.
For example, the DefaultKafkaHeaderMapper can be configured with:
/**
 * Set the headers to not perform any conversion on (except {@code String} to
 * {@code byte[]} for outbound). Inbound headers that match will be mapped as
 * {@code byte[]} unless the corresponding boolean in the map value is true,
 * in which case it will be mapped as a String.
 * @param rawMappedHeaders the header names to not convert and
 * @since 2.2.5
 * @see #setCharset(Charset)
 * @see #setMapAllStringsOut(boolean)
 */
public void setRawMappedHeaders(Map<String, Boolean> rawMappedHeaders) {
    // ...
}
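A minimal sketch of wiring that into the Kafka binder, assuming the binder picks up a KafkaHeaderMapper bean named kafkaBinderHeaderMapper (as described in the Spring Cloud Stream Kafka binder documentation); a map value of true asks for the inbound header to be mapped as a String:

import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;

@Configuration
public class KafkaHeaderMapperConfig {

    // Treat the tracing headers as raw: no JSON conversion outbound, and map
    // them back as String (value = true) on the inbound side.
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        mapper.setRawMappedHeaders(Map.of(
                "b3", true,
                "traceparent", true,
                "baggage", true));
        return mapper;
    }
}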
> So, how did it turn out that Spring Integration is involved here?

Yes, you are right.
> Why doesn't the regular Spring for Apache Kafka KafkaRecordReceiverContext do the trick to reinstate an observation on the consumer side?

Regular Spring for Apache Kafka does reinstate the observation on the consumer side, and that works well. The problem lies in how message reception from Kafka is handled by spring-cloud-stream-kafka: besides the observation with the KafkaRecordReceiverContext, an additional CONSUMER observation with the MessageReceiverContext is started in the sendMessage() method of the KafkaMessageDrivenChannelAdapter class.
protected void sendMessage(Message<?> message) {
    if (message == null) {
        throw new MessagingException("cannot send a null message");
    }
    try {
        IntegrationObservation.HANDLER.observation(
                this.observationConvention,
                DefaultMessageReceiverObservationConvention.INSTANCE,
                () -> new MessageReceiverContext(message, getComponentName()),
                this.observationRegistry)
            .observe(() -> this.messagingTemplate.send(getRequiredOutputChannel(), trackMessageIfAny(message)));
    }
    catch (RuntimeException ex) {
        if (!sendErrorMessageIfNecessary(message, ex)) {
            throw ex;
        }
    }
}
> What I want is to find some workaround for you.
@artembilan THANK YOU VERY MUCH for your advice! It helped me a lot.
Closed in favor of: https://github.com/spring-projects/spring-kafka/pull/3286. Plus there is going to be a respective fix in Spring Integration.