spring-ai

Support outputting reasoning_content for deepseek-r1

Open stillmoon opened this issue 11 months ago • 10 comments


Expected Behavior

chatModel.stream(xxx)
        .subscribe(f -> {
            System.out.println(f.getResult().getOutput().getContent());
            // Output reasoning_content (requested accessor, does not exist yet)
            System.out.println(f.getResult().getOutput().getReasoningContent());
        });

Current Behavior

chatModel.stream(xxx)
        .subscribe(f -> {
            // reasoning_content cannot be output; only content is available
            System.out.println(f.getResult().getOutput().getContent());
        });

Context

When running LLM Q&A through ChatModel, deepseek-r1 returns its thinking in the reasoning_content field. Currently, the Message output exposes only the content field, so the model's reasoning cannot be retrieved. Please add a corresponding field.
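To make the request concrete, here is a minimal, dependency-free sketch of what a caller would do with the proposed field: accumulate the answer deltas and the reasoning deltas of a stream separately. `Delta`, `Result` and `ReasoningStreamCollector` are hypothetical stand-ins, not Spring AI types.

```java
import java.util.List;

/** Hypothetical sketch: keeps streamed answer text and reasoning text apart. */
public final class ReasoningStreamCollector {

    /** Stand-in for one streamed chunk carrying content and reasoning_content deltas. */
    public record Delta(String content, String reasoningContent) {}

    /** Accumulated result with the two texts kept separate. */
    public record Result(String content, String reasoningContent) {}

    private ReasoningStreamCollector() {}

    public static Result collect(List<Delta> deltas) {
        StringBuilder content = new StringBuilder();
        StringBuilder reasoning = new StringBuilder();
        for (Delta d : deltas) {
            // Either field may be null in any given chunk
            if (d.reasoningContent() != null) {
                reasoning.append(d.reasoningContent());
            }
            if (d.content() != null) {
                content.append(d.content());
            }
        }
        return new Result(content.toString(), reasoning.toString());
    }
}
```

With a real `Flux<ChatResponse>`, each response would first be mapped to a `Delta` (which needs the requested reasoning accessor) and the list then collected, for example via `collectList()` followed by `collect(...)`.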

stillmoon, Feb 20 '25 03:02

+1. Why is chain-of-thought still not supported? So annoying.

Ltyro, Feb 20 '25 10:02

Related documentation: https://api-docs.deepseek.com/guides/reasoning_model

dev-jonghoonpark, Feb 20 '25 22:02

Currently, the chain of thought (reasoning_content) is merged with the actual content, which also causes the structured output converters to fail. I created the rudimentary custom converter below as a temporary workaround, but yes, this functionality should be natively supported by the library.

Attachments: DeepSeekModelOutputConverter.java, ChatbotService.java
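A converter of this kind essentially has to remove the reasoning before the text reaches a structured output converter. A minimal sketch, assuming the merged text wraps the reasoning in `<think>...</think>` tags (the same convention the WebClient interceptor later in this thread produces); `ThinkTagSplitter` is a hypothetical name and is not the attached DeepSeekModelOutputConverter.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: separates "<think>reasoning</think>answer" into its two parts. */
public final class ThinkTagSplitter {

    // DOTALL lets the reasoning span several lines; the reluctant .*? stops at the first </think>
    private static final Pattern THINK = Pattern.compile("<think>(.*?)</think>", Pattern.DOTALL);

    public record Split(String reasoning, String answer) {}

    private ThinkTagSplitter() {}

    public static Split split(String raw) {
        if (raw == null) {
            return new Split("", "");
        }
        Matcher m = THINK.matcher(raw);
        if (!m.find()) {
            // No think block: everything is answer text
            return new Split("", raw.trim());
        }
        String reasoning = m.group(1).trim();
        // Drop the whole think block and keep the surrounding text as the answer
        String answer = (raw.substring(0, m.start()) + raw.substring(m.end())).trim();
        return new Split(reasoning, answer);
    }
}
```

Only `split(text).answer()` would then be handed to a structured output converter such as Spring AI's `BeanOutputConverter.convert(String)`, so the JSON parser never sees the chain of thought.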

hardikSinghBehl, Feb 21 '25 04:02

This issue is being addressed by PR https://github.com/spring-projects/spring-ai/pull/2192

Example of a deepseek-reasoner call and response:

request:

curl -v -X POST "https://api.deepseek.com/chat/completions" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer sk-your-api-key" \
  -d '{
    "model": "deepseek-reasoner",
    "messages": [
      {"role": "system", "content": "You are a helpful assistant."},
      {"role": "user", "content": "What is the answer to the Great Question of Life, the Universe, and Everything?"}
    ],
    "stream": false
  }'
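For calling the API from plain Java, the same non-streaming request can be sent with the JDK's built-in `java.net.http.HttpClient` (Java 11+; the text block needs Java 15+). This is a sketch: the `DEEPSEEK_API_KEY` environment variable name is an assumption, and the raw JSON is printed rather than parsed.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class DeepSeekReasonerCall {

    public static final String ENDPOINT = "https://api.deepseek.com/chat/completions";

    // Same body as the curl example (non-streaming)
    static final String BODY = """
            {
              "model": "deepseek-reasoner",
              "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "What is the answer to the Great Question of Life, the Universe, and Everything?"}
              ],
              "stream": false
            }
            """;

    public static HttpRequest buildRequest(String apiKey) {
        return HttpRequest.newBuilder(URI.create(ENDPOINT))
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer " + apiKey)
                .POST(HttpRequest.BodyPublishers.ofString(BODY))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // DEEPSEEK_API_KEY is an assumed variable name
        String apiKey = System.getenv("DEEPSEEK_API_KEY");
        if (apiKey == null || apiKey.isBlank()) {
            throw new IllegalStateException("Set DEEPSEEK_API_KEY first");
        }
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(apiKey), HttpResponse.BodyHandlers.ofString());
        // The raw JSON contains both "content" and "reasoning_content"
        System.out.println(response.body());
    }
}
```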

response:

{
  "id":"6090f86a-12aa-4xxxx-89af-85xxxxxx",
  "object":"chat.completion",
  "created":1740134353,
  "model":"deepseek-reasoner",
  "choices":[
    {
      "index":0,
      "message":{
        "role":"assistant",
        "content":"The answer to the Great Question of Life, the Universe, and Everything, as famously depicted in Douglas Adams' *The Hitchhiker's Guide to the Galaxy*, is **42**. \n\nHowever, the story humorously reveals that while the supercomputer Deep Thought calculated this answer over millions of years, the actual *question* corresponding to it remains ambiguous—highlighting the absurdity of seeking absolute meaning in a vast, chaotic universe. 😊",
        "reasoning_content":"Okay, let's see. The user is asking about the answer to the Great Question of Life, the Universe, and Everything. Hmm, I remember that this is a reference to \"The Hitchhiker's Guide to the Galaxy\" by Douglas Adams. In the book, a supercomputer named Deep Thought was built to calculate the answer to this ultimate question. After a lot of time and processing, the computer comes up with the number 42. But then the characters realize they didn't actually know what the question was. So the answer is 42, but the joke is that the question isn't really known.\n\nWait, but maybe the user is looking for a more philosophical answer? Like, not just the fictional reference. But given the way the question is phrased, \"the Great Question of Life, the Universe, and Everything\" is almost certainly pointing to the Hitchhiker's Guide joke. The answer is famously 42. I should confirm that I'm not missing any other context here. Maybe check if there's another interpretation, but I don't think so. This is a well-known pop culture reference. So the answer is 42, and maybe a brief explanation about the book reference to be helpful."
      },
      "logprobs":null,
      "finish_reason":"stop"
    }
  ],
  "usage":{
    "prompt_tokens":28,
    "completion_tokens":338,
    "total_tokens":366,
    "prompt_tokens_details":{
      "cached_tokens":0
    },
    "completion_tokens_details":{
      "reasoning_tokens":247
    },
    "prompt_cache_hit_tokens":0,
    "prompt_cache_miss_tokens":28
  },
  "system_fingerprint":"fp_5417b77867_prod"
}
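The `usage` block also shows how the reasoning is billed: 28 + 338 = 366, so the 247 `reasoning_tokens` are counted inside `completion_tokens`, leaving 338 - 247 = 91 tokens for the visible answer. A small sketch of that arithmetic; `ReasonerUsage` is a hypothetical record, not a DeepSeek or Spring AI type.

```java
/** Hypothetical mirror of the "usage" object in the response. */
public record ReasonerUsage(int promptTokens, int completionTokens, int totalTokens, int reasoningTokens) {

    /** Tokens spent on the visible answer, given that reasoning_tokens is part of completion_tokens. */
    public int answerTokens() {
        return completionTokens - reasoningTokens;
    }

    /** True when total_tokens == prompt_tokens + completion_tokens. */
    public boolean totalsConsistent() {
        return totalTokens == promptTokens + completionTokens;
    }
}
```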

apappascs, Feb 21 '25 11:02

@Ltyro I have an alternative solution:
We can add a WebClient interceptor that moves the reasoning_content field returned by the official DeepSeek API into the content field, wrapped in <think></think> tags. The chain of thought can then be parsed back out of the content field. This is a temporary workaround until Spring AI supports retrieving the reasoning_content field.


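import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.web.reactive.function.client.WebClientCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

@Slf4j
@Configuration
public class WebClientConfiguration {
    @Bean
    @Scope("prototype")
    @ConditionalOnMissingBean
    WebClientCustomizer webClientCustomizer() {
        return webClientBuilder -> {
            // Request filter: log the method and URL of each outgoing request
            ExchangeFilterFunction requestFilter = ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
                log.info("Request Method: {}", clientRequest.method());
                log.info("Request URL: {}", clientRequest.url());
                return Mono.just(clientRequest);
            });

            // Response filter
            ExchangeFilterFunction responseFilter = ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
                log.info("Intercepted Response Status: {}", clientResponse.statusCode());

                // Per-response flag: true while reasoning_content deltas are being forwarded
                final boolean[] thinking = {false};

                // Process the SSE event stream.
                // Assumes each DataBuffer holds exactly one complete "data: ..." event.
                Flux<DataBuffer> modifiedBody = clientResponse.bodyToFlux(DataBuffer.class)
                        .map(buf -> {
                            byte[] bytes = new byte[buf.readableByteCount()];
                            buf.read(bytes);
                            DataBufferUtils.release(buf);
                            return new String(bytes, StandardCharsets.UTF_8);
                        })
                        .flatMap(eventString -> {
                            log.debug(eventString);
                            if (eventString.startsWith("data: ")) {
                                String jsonPart = eventString.substring("data: ".length()).trim();
                                // The "data: " prefix is already stripped, so the end marker is just "[DONE]"
                                if (!"[DONE]".equals(jsonPart) && JSONUtil.isTypeJSON(jsonPart)) {
                                    JSONObject retJson;
                                    try {
                                        retJson = JSONUtil.parseObj(jsonPart);
                                    } catch (Exception e) {
                                        log.warn("Failed to parse SSE payload", e);
                                        return Mono.just(eventString);
                                    }

                                    // Rewrite the content field of every choice
                                    JSONArray choices = retJson.getJSONArray("choices");
                                    if (choices == null) {
                                        return Mono.just(eventString);
                                    }
                                    for (int i = 0; i < choices.size(); i++) {
                                        JSONObject choice = choices.getJSONObject(i);
                                        JSONObject delta = (choice == null) ? null : choice.getJSONObject("delta");
                                        if (delta == null) {
                                            continue; // skip this choice but keep processing the rest
                                        }
                                        String reasoningContent = delta.getStr("reasoning_content");
                                        String content = delta.getStr("content");
                                        // isNotEmpty (not isNotBlank) so whitespace-only reasoning deltas stay inside <think>
                                        if (StrUtil.isNotEmpty(reasoningContent)) {
                                            if (!thinking[0]) {
                                                // First reasoning delta opens the <think> block
                                                thinking[0] = true;
                                                delta.set("content", "<think>" + reasoningContent);
                                            } else {
                                                delta.set("content", reasoningContent);
                                            }
                                        } else if (thinking[0]) {
                                            // First non-reasoning delta closes the <think> block
                                            thinking[0] = false;
                                            delta.set("content", "</think>" + (content == null ? "" : content));
                                        }
                                    }
                                    // Re-serialize the event
                                    String modifiedJson = retJson.toString();
                                    return Mono.just("data: " + modifiedJson + "\n\n");
                                }
                                return Mono.just(eventString);
                            }
                            return Mono.just(eventString);
                        })
                        .map(str -> {
                            byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
                            return new DefaultDataBufferFactory().wrap(bytes);
                        });

                // Build a new ClientResponse with the rewritten body; drop CONTENT_LENGTH since the length changed
                ClientResponse modifiedResponse = ClientResponse.from(clientResponse)
                        .headers(headers -> headers.remove(HttpHeaders.CONTENT_LENGTH))
                        .body(modifiedBody)
                        .build();

                return Mono.just(modifiedResponse);
            });

            // Register both filters on the WebClient.Builder
            webClientBuilder.filter(requestFilter).filter(responseFilter);
        };
    }
}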
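Stripped of the WebClient plumbing, the filter is a small two-state machine. The sketch below isolates it so it can be unit-tested; `ThinkTagWrapper` is a hypothetical name. It treats any non-null `reasoning_content` as reasoning, so whitespace-only deltas such as "\n\n" do not close the `<think>` block early.

```java
/** Hypothetical helper reproducing the filter's <think> wrapping for one response stream. */
public final class ThinkTagWrapper {

    // True while reasoning deltas are being emitted
    private boolean thinking = false;

    /** Returns the text to place in delta.content for one streamed delta. */
    public String rewrite(String reasoningContent, String content) {
        if (reasoningContent != null) {
            if (!thinking) {
                // First reasoning delta: open the block
                thinking = true;
                return "<think>" + reasoningContent;
            }
            return reasoningContent;
        }
        if (thinking) {
            // First answer delta after reasoning: close the block
            thinking = false;
            return "</think>" + (content == null ? "" : content);
        }
        return content;
    }
}
```

One instance should be created per response, which is why the filter keeps its flag inside `ofResponseProcessor` rather than in a field.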

KonngExplorer, Mar 01 '25 06:03


If you use a non-streaming API, you can use the RestClient interceptor to do something similar.

KonngExplorer, Mar 01 '25 06:03

That's a nice one! Thank you for sharing @KonngExplorer

apappascs, Mar 01 '25 14:03

(Quoting @KonngExplorer's WebClient interceptor solution above.)

Thanks a lot! There were a few small issues, so I adjusted it:

import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.web.reactive.function.client.WebClientCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@Configuration
public class WebClientConfiguration {

    @Bean
    @ConditionalOnMissingBean
    WebClientCustomizer webClientCustomizer() {
        return webClientBuilder -> {
            // Request filter: log the method and URL of each outgoing request
            ExchangeFilterFunction requestFilter = ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
                log.info("Request Method: {}", clientRequest.method());
                log.info("Request URL: {}", clientRequest.url());
                return Mono.just(clientRequest);
            });

            // Response filter
            ExchangeFilterFunction responseFilter = ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
                log.info("Intercepted Response Status: {}", clientResponse.statusCode());

                // Per-response flag: true while reasoning_content deltas are being forwarded
                AtomicBoolean thinkingFlag = new AtomicBoolean(false);

                // Process the SSE event stream
                Flux<DataBuffer> modifiedBody = clientResponse.bodyToFlux(DataBuffer.class)
                        .map(buf -> {
                            byte[] bytes = new byte[buf.readableByteCount()];
                            buf.read(bytes);
                            DataBufferUtils.release(buf);
                            return new String(bytes, StandardCharsets.UTF_8);
                        })
                        .flatMap(eventString -> {
                            log.debug("Raw payload: ==> [{}]", eventString);
                            // A single chunk may contain several "data:" lines.
                            // Note: a line split across two chunks is still not reassembled here.
                            String[] list = eventString.split("\\n", -1);

                            List<String> lines = new ArrayList<>();
                            for (String line : list) {
                                if (line.startsWith("data: ")) {
                                    String jsonPart = line.substring("data: ".length()).trim();
                                    if (!"[DONE]".equals(jsonPart) && JSONUtil.isTypeJSON(jsonPart)) {
                                        JSONObject retJson;
                                        try {
                                            retJson = JSONUtil.parseObj(jsonPart);
                                        } catch (Exception e) {
                                            // Keep the unparseable line as-is and move on (break would drop the rest)
                                            lines.add(line);
                                            continue;
                                        }

                                        // Rewrite the content field of every choice
                                        JSONArray choices = retJson.getJSONArray("choices");
                                        for (int i = 0; choices != null && i < choices.size(); i++) {
                                            JSONObject choice = choices.getJSONObject(i);
                                            JSONObject delta = (choice == null) ? null : choice.getJSONObject("delta");
                                            if (delta == null) {
                                                continue;
                                            }
                                            String reasoningContent = delta.getStr("reasoning_content");
                                            String content = delta.getStr("content");
                                            if (reasoningContent != null) {
                                                if (!thinkingFlag.get()) {
                                                    // First reasoning delta opens the <think> block
                                                    thinkingFlag.set(true);
                                                    delta.set("content", "<think>" + reasoningContent);
                                                } else {
                                                    delta.set("content", reasoningContent);
                                                }
                                            } else if (thinkingFlag.get()) {
                                                // First non-reasoning delta closes the <think> block
                                                thinkingFlag.set(false);
                                                delta.set("content", "</think>" + (content == null ? "" : content));
                                            }
                                        }
                                        // Re-serialize the event line
                                        lines.add("data: " + retJson.toString());
                                    } else {
                                        lines.add(line);
                                    }
                                } else {
                                    lines.add(line);
                                }
                            }

                            String finalLine = String.join("\n", lines);
                            log.debug("Processed payload: ==> [{}]", finalLine);
                            return Mono.just(finalLine);
                        })
                        .map(str -> {
                            byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
                            return new DefaultDataBufferFactory().wrap(bytes);
                        });

                // Build a new ClientResponse with the rewritten body; drop CONTENT_LENGTH since the length changed
                ClientResponse modifiedResponse = ClientResponse.from(clientResponse)
                        .headers(headers -> headers.remove(HttpHeaders.CONTENT_LENGTH))
                        .body(modifiedBody)
                        .build();

                return Mono.just(modifiedResponse);
            });

            // Register both filters on the WebClient.Builder
            webClientBuilder.filter(requestFilter).filter(responseFilter);
        };
    }
}
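Splitting each chunk on `\n` handles several events per chunk, but an event can also be cut across two chunks, in which case neither half parses. One way to cover that is to carry the incomplete tail over to the next chunk. A minimal stdlib sketch; `SseLineBuffer` is hypothetical and assumes one instance per response and LF (optionally CRLF) line endings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: turns arbitrary text chunks into complete SSE lines. */
public final class SseLineBuffer {

    // Text received after the last '\n', waiting for the rest of its line
    private final StringBuilder pending = new StringBuilder();

    /** Appends a chunk and returns every line it completes, without line terminators. */
    public List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> complete = new ArrayList<>();
        int start = 0;
        int nl;
        while ((nl = pending.indexOf("\n", start)) >= 0) {
            String line = pending.substring(start, nl);
            // Tolerate CRLF line endings
            if (line.endsWith("\r")) {
                line = line.substring(0, line.length() - 1);
            }
            complete.add(line);
            start = nl + 1;
        }
        // Keep only the unfinished tail for the next chunk
        pending.delete(0, start);
        return complete;
    }

    /** Text still waiting for a line terminator. */
    public String remainder() {
        return pending.toString();
    }
}
```

In the filter, a fresh instance per response could be plugged in with Reactor's `concatMapIterable(buffer::feed)` so the JSON rewriting only ever sees complete lines. This still decodes each DataBuffer to a String first, so a multi-byte UTF-8 character split across buffers would need handling at the byte level.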

EyoungShieh1992, Mar 05 '25 07:03

@KonngExplorer thanks for your solution. At the time, I simply used deepseek-spring-boot-starter and reassembled the data it returns into the same structure as ChatResponse. In other words, I rewrote ChatResponse, Generation and AssistantMessage as, say, ReasonChatResponse, ReasonGeneration and ReasonAssistantMessage, where ReasonAssistantMessage just adds a reasoningContent field. The conversion looks like this:

generateDeepSeekStream(param)
  .map(ccc -> {
      // Map each streamed chunk onto Reason* types that mirror Spring AI's ChatResponse structure
      List<ReasonGeneration> generations = ccc.choices().stream()
              .filter(c -> c.delta() != null)
              .map(choice -> {
                  // Answer text and reasoning text are kept in separate fields
                  ReasonAssistantMessage msg = new ReasonAssistantMessage(choice.delta().content());
                  msg.setReasoningContent(choice.delta().reasoningContent());
                  ChatGenerationMetadata metadata = ChatGenerationMetadata.builder()
                          .finishReason(choice.finishReason()).build();
                  return new ReasonGeneration(msg, metadata);
              })
              .toList();
      ChatResponseMetadata metadata = ChatResponseMetadata.builder()
              .id(ccc.id()).model(ccc.model()).build();
      return new ReasonChatResponse(generations, metadata);
  })

This way, once you switch to an upgraded Spring AI version, the frontend won't need any changes.

Ltyro, Mar 11 '25 07:03

@Ltyro I have an alternative solution: We can set up a WebClient interceptor to redirect the reasoning_content field from the DeepSeek official API into the content field, wrapping it with <think></think> tags. Afterwards, we can parse the chain-of-thought content from the content field. This temporarily resolves the issue of the current Spring-AI version not supporting the retrieval of the reasoning_content field. @slf4j @configuration public class WebClientConfiguration { @bean @scope("prototype") @ConditionalOnMissingBean WebClientCustomizer webClientCustomizer() { return webClientBuilder -> { ExchangeFilterFunction requestFilter = ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { log.info("Request Method: {}", clientRequest.method()); log.info("Request URL: {}", clientRequest.url()); return Mono.just(clientRequest); });

        // 响应拦截器
        ExchangeFilterFunction responseFilter = ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
            log.info("Intercepted Response Status: {}", clientResponse.statusCode());

            final boolean[] thinking = {false};

            // 处理SSE事件流
            Flux<DataBuffer> modifiedBody = clientResponse.bodyToFlux(DataBuffer.class)
                    .map(buf -> {
                        byte[] bytes = new byte[buf.readableByteCount()];
                        buf.read(bytes);
                        DataBufferUtils.release(buf);
                        return new String(bytes, StandardCharsets.UTF_8);
                    })
                    .flatMap(eventString -> {
                        log.debug(eventString);
                        if (eventString.startsWith("data: ")) {
                            String jsonPart = eventString.substring("data: ".length()).trim();
                            // jsonPart has the "data: " prefix stripped, so compare it against "[DONE]"
                            if (JSONUtil.isTypeJSON(jsonPart) && !"[DONE]".equals(jsonPart)) {
                                JSONObject retJson;
                                try {
                                    retJson = JSONUtil.parseObj(jsonPart);
                                } catch (Exception e) {
                                    log.warn("Failed to parse SSE event, passing it through unchanged", e);
                                    return Mono.just(eventString);
                                }

                                // Rewrite the content field of each choice
                                JSONArray choices = retJson.getJSONArray("choices");
                                for (int i = 0; i < choices.size(); i++) {
                                    JSONObject choice = choices.getJSONObject(i);
                                    if (choice == null) {
                                        continue;
                                    }
                                    JSONObject delta = choice.getJSONObject("delta");
                                    if (delta == null) {
                                        continue;
                                    }
                                    String reasoningContent = delta.getStr("reasoning_content");
                                    String content = delta.getStr("content");
                                    // isNotEmpty rather than isNotBlank, so whitespace-only reasoning chunks ("\n") do not close the tag early
                                    if (StrUtil.isNotEmpty(reasoningContent)) {
                                        if (!thinking[0]) {
                                            thinking[0] = true;
                                            delta.set("content", "<think>" + reasoningContent);
                                        } else {
                                            delta.set("content", reasoningContent);
                                        }

                                    } else {
                                        if (thinking[0]) {
                                            thinking[0] = false;
                                            delta.set("content", "</think>" + (content == null ? "" : content));
                                        }
                                    }
                                }
                                // Rebuild the event string
                                String modifiedJson = retJson.toString();
                                return Mono.just("data: " + modifiedJson + "\n\n");
                            }
                            return Mono.just(eventString);
                        }
                        return Mono.just(eventString);
                    })
                    .map(str -> {
                        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
                        return new DefaultDataBufferFactory().wrap(bytes);
                    });

            // Build a new ClientResponse and drop the CONTENT_LENGTH header
            ClientResponse modifiedResponse = ClientResponse.from(clientResponse)
                    .headers(headers -> headers.remove(HttpHeaders.CONTENT_LENGTH))
                    .body(modifiedBody)
                    .build();

            return Mono.just(modifiedResponse);
        });

        // Register the interceptors on the WebClient.Builder
        webClientBuilder.filter(requestFilter).filter(responseFilter);
    };
}

}

If you use the non-streaming API, you can do something similar with a RestClient interceptor.
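Stripped of the WebClient and JSON plumbing, the streaming filter above is a small per-response state machine: the first non-empty `reasoning_content` opens a `<think>` tag, further reasoning chunks are copied into `content`, and the first chunk without reasoning closes the tag. For a non-streaming call, a RestClient interceptor would only need a one-shot merge of `message.reasoning_content` and `message.content`. A minimal sketch of both string transformations, assuming the JSON fields have already been read with a library of your choice; `ThinkTagMerger` is a hypothetical helper, not part of Spring AI.

```java
/**
 * Hypothetical helper reproducing the tag-merging logic on plain strings,
 * independent of WebClient/RestClient and of any JSON library.
 */
final class ThinkTagMerger {
    // true while reasoning chunks are being forwarded; use one instance per response
    private boolean thinking = false;

    /**
     * Streaming: call once per delta with its reasoning_content and content
     * (either may be null). Returns the value to write back into delta.content.
     */
    String onDelta(String reasoningContent, String content) {
        if (reasoningContent != null && !reasoningContent.isEmpty()) {
            if (!thinking) {
                thinking = true;
                return "<think>" + reasoningContent; // first reasoning chunk opens the tag
            }
            return reasoningContent; // later reasoning chunks pass through as content
        }
        if (thinking) {
            thinking = false;
            // first answer chunk after reasoning closes the tag
            return "</think>" + (content == null ? "" : content);
        }
        return content; // ordinary answer chunk, unchanged
    }

    /** Non-streaming: reasoning and answer arrive together in choices[i].message. */
    static String mergeMessage(String reasoningContent, String content) {
        String answer = content == null ? "" : content;
        if (reasoningContent == null || reasoningContent.isEmpty()) {
            return answer;
        }
        return "<think>" + reasoningContent + "</think>" + answer;
    }
}
```

Treating only empty (not blank) reasoning as "no reasoning" keeps whitespace-only chunks such as `"\n"` inside the `<think>` block instead of closing and reopening it.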

Thanks a lot! There were a few small problems, so I adjusted it:

import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.web.reactive.function.client.WebClientCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@Configuration
public class WebClientConfiguration {

@Bean
@ConditionalOnMissingBean
WebClientCustomizer webClientCustomizer() {
    return webClientBuilder -> {
        ExchangeFilterFunction requestFilter = ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
            log.info("Request Method: {}", clientRequest.method());
            log.info("Request URL: {}", clientRequest.url());
            return Mono.just(clientRequest);
        });

        // Response interceptor
        ExchangeFilterFunction responseFilter = ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
            log.info("Intercepted Response Status: {}", clientResponse.statusCode());

            AtomicBoolean thinkingFlag = new AtomicBoolean(false);

            // Process the SSE event stream
            Flux<DataBuffer> modifiedBody = clientResponse.bodyToFlux(DataBuffer.class)
                    .map(buf -> {
                        byte[] bytes = new byte[buf.readableByteCount()];
                        buf.read(bytes);
                        DataBufferUtils.release(buf);
                        return new String(bytes, StandardCharsets.UTF_8);
                    })
                    .flatMap(eventString -> {
                        log.debug("Raw payload: ==> [{}]", eventString);
                        // A single chunk may contain several data lines
                        String[] list = eventString.split("\\n", -1);

                        List<String> lines = new ArrayList<>();
                        for (String line : list) {
                            if (line.startsWith("data: ")) {
                                String jsonPart = line.substring("data: ".length()).trim();
                                if (JSONUtil.isTypeJSON(jsonPart) && !"data: [DONE]".equals(line)) {
                                    JSONObject retJson;
                                    try {
                                        retJson = JSONUtil.parseObj(jsonPart);
                                    } catch (Exception e) {
                                        // Keep the unparsable line as-is and move on; break would drop the rest of the chunk
                                        lines.add(line);
                                        continue;
                                    }

                                    // Rewrite the content field of each choice
                                    JSONArray choices = retJson.getJSONArray("choices");
                                    for (int i = 0; i < choices.size(); i++) {
                                        JSONObject choice = choices.getJSONObject(i);
                                        if (choice == null) {
                                            continue;
                                        }
                                        JSONObject delta = choice.getJSONObject("delta");
                                        if (delta == null) {
                                            continue;
                                        }
                                        String reasoningContent = delta.getStr("reasoning_content");
                                        String content = delta.getStr("content");
                                        if (reasoningContent != null) {
                                            if (!thinkingFlag.get()) {
                                                thinkingFlag.set(true);
                                                delta.set("content", "<think>" + reasoningContent);
                                            } else {
                                                delta.set("content", reasoningContent);
                                            }
                                        } else {
                                            if (thinkingFlag.get()) {
                                                thinkingFlag.set(false);
                                                delta.set("content", "</think>" + (content == null ? "" : content));
                                            }
                                        }
                                    }
                                    // Rebuild the event line
                                    String modifiedJson = retJson.toString();
                                    lines.add("data: " + modifiedJson);
                                } else {
                                    lines.add(line);
                                }
                            } else {
                                lines.add(line);
                            }
                        }

                        String finalLine = StringUtils.join(lines, "\n");
                        log.debug("Processed payload: ==> [{}]", finalLine);
                        return Mono.just(finalLine);
                    })
                    .map(str -> {
                        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
                        return new DefaultDataBufferFactory().wrap(bytes);
                    });

            // Build a new ClientResponse and drop the CONTENT_LENGTH header
            ClientResponse modifiedResponse = ClientResponse.from(clientResponse)
                    .headers(headers -> headers.remove(HttpHeaders.CONTENT_LENGTH))
                    .body(modifiedBody)
                    .build();

            return Mono.just(modifiedResponse);
        });

        // Register the interceptors on the WebClient.Builder
        webClientBuilder.filter(requestFilter).filter(responseFilter);
    };
}

}
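Both versions above assume every `DataBuffer` ends on a line boundary. The second one already handles several `data:` lines in one buffer, but network chunking can also split a single `data:` line across two buffers; the two halves are then forwarded unmodified, so that event's `reasoning_content` is not merged and a `<think>` or `</think>` tag can go missing. Decoding each buffer to a `String` on its own can likewise corrupt a multi-byte UTF-8 character that straddles the boundary, which this sketch does not address. A minimal sketch of line reassembly only, assuming buffers are decoded to strings as the filters do; `SseLineBuffer` is a hypothetical helper, not a Spring or Reactor API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: reassembles SSE lines that arrive split across chunks.
 * Each call returns only complete lines (without the trailing newline);
 * an unterminated tail is kept for the next call.
 */
final class SseLineBuffer {
    private final StringBuilder pending = new StringBuilder();

    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            String line = pending.substring(0, newline);
            // tolerate CRLF line endings
            if (line.endsWith("\r")) {
                line = line.substring(0, line.length() - 1);
            }
            lines.add(line);
            pending.delete(0, newline + 1);
        }
        return lines;
    }
}
```

One instance would live next to the thinking flag, one per response; in the Reactor pipeline it could be plugged in with `flatMapIterable(buffer::feed)`, rewriting each line and appending `"\n"` again before re-encoding. A tail left without a final newline when the stream ends is not emitted by this sketch.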

How should this be used? I'm a beginner and don't really understand it. Any help would be appreciated, thanks!

Zheng-Shuo avatar Apr 02 '25 14:04 Zheng-Shuo
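A minimal sketch of how the pieces could fit together, under some assumptions. Put the `WebClientConfiguration` class in a package covered by component scanning: Spring Boot applies `WebClientCustomizer` beans to the `WebClient.Builder` it auto-configures, and Spring AI's streaming chat client should be built from that builder, so `ChatModel.stream(...)` then yields content chunks carrying the `<think>`/`</think>` tags (non-streaming calls go through `RestClient` and are unaffected). Note that `@ConditionalOnMissingBean` on the second version skips the bean if another `WebClientCustomizer` bean is already defined. On the consuming side, `ThinkSplitter` is a hypothetical helper that separates the reasoning from the answer again; it assumes a tag only ever appears at the start of a chunk, which is how the filters emit it.

```java
/**
 * Hypothetical consumer-side helper: feed it the content chunks of a stream
 * in order; text between <think> and </think> goes to the reasoning buffer,
 * everything else to the answer buffer.
 */
final class ThinkSplitter {
    private static final String OPEN = "<think>";
    private static final String CLOSE = "</think>";

    private final StringBuilder reasoning = new StringBuilder();
    private final StringBuilder answer = new StringBuilder();
    private boolean inThink = false;

    void accept(String chunk) {
        if (chunk == null) {
            return; // some deltas carry no content at all
        }
        String text = chunk;
        if (text.startsWith(OPEN)) {
            inThink = true;
            text = text.substring(OPEN.length());
        } else if (text.startsWith(CLOSE)) {
            inThink = false;
            text = text.substring(CLOSE.length());
        }
        (inThink ? reasoning : answer).append(text);
    }

    String reasoning() {
        return reasoning.toString();
    }

    String answer() {
        return answer.toString();
    }
}
```

In a reactive pipeline you could call `accept` from `doOnNext` with the text of each streamed `ChatResponse`, then read `reasoning()` and `answer()` once the stream completes.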

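import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.reactive.function.client.WebClientCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.util.Objects;

// JSON, JSONObject and JSONArray appear to be Alibaba fastjson classes (their import is not shown);
// Const.THINK_TAG, THINKING_BEGIN and THINKING_END are constants from the author's own project (not shown).
@Slf4j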
@Configuration
public class WebClientConfiguration {

    /**
     * Chat path of DeepSeek-R1 on ByteDance Volcano Engine (Volcengine)
     */
    private static final String chatUrl = "/api/v3/chat/completions";

    private static final String DATA_PREFIX = "data: ";

    private static final String DATA_DONE = DATA_PREFIX + "[DONE]";

    private final ExchangeFilterFunction requestFilter = ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
        log.info("web client Request URL: {}", clientRequest.url());
        return Mono.just(clientRequest);
    });


    @Bean
    WebClientCustomizer webClientCustomizer() {
        return webClientBuilder -> {

            ExchangeFilterFunction responseFilter = ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
                if (clientResponse.request().getURI().toString().endsWith(chatUrl)) {
                    log.info("handler DeepSeek reasoning content");
                    final boolean[] isThinkStart = {true};
                    final boolean[] isThinkEnd = {true};
                    Flux<String> eventStream = clientResponse.bodyToFlux(DataBuffer.class)
                            .flatMap(dataBuffer -> {
                                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                                dataBuffer.read(bytes);
                                DataBufferUtils.release(dataBuffer);
                                String chunk = new String(bytes, StandardCharsets.UTF_8);
                                return Flux.fromArray(chunk.split("\n\n"));
                            })
                            .map(s -> {
                                // Rewrite only complete "data: {...}" events; anything else passes through unchanged
                                if (s.startsWith(DATA_PREFIX) && !DATA_DONE.equals(s)) {
                                    String jsonData = s.substring(DATA_PREFIX.length());
                                    JSONObject jsonObject = JSON.parseObject(jsonData);
                                    JSONArray choices = jsonObject.getJSONArray("choices");
                                    choices.forEach(o -> {
                                        JSONObject choice = (JSONObject) o;
                                        JSONObject delta = choice.getJSONObject("delta");
                                        if (delta != null) {
                                            String reasoning_content = delta.getString("reasoning_content");
                                            String content = delta.getString("content");
                                            // Depending on the provider, content is null or "" while the model is reasoning
                                            boolean contentEmpty = content == null || content.isEmpty();
                                            if (reasoning_content != null && !reasoning_content.isEmpty() && contentEmpty) {
                                                // Reasoning content
                                                if (isThinkStart[0]) {
                                                    isThinkStart[0] = false;
                                                    // Reasoning has started
                                                    delta.put("content", Const.THINK_TAG + THINKING_BEGIN + "\n" + reasoning_content);
                                                } else {
                                                    delta.put("content", Const.THINK_TAG + reasoning_content);
                                                }
                                            } else if (!isThinkStart[0] && isThinkEnd[0]) {
                                                // First non-reasoning chunk after reasoning started: emit the end marker once
                                                isThinkEnd[0] = false;
                                                delta.put("content", Const.THINK_TAG + THINKING_END + "\n" + (Objects.isNull(content) ? "" : content));
                                            }
                                        }
                                    });
                                    s = DATA_PREFIX + jsonObject.toJSONString();
                                } else if (DATA_DONE.equals(s)) {
                                    log.info("DATA_DONE");
                                }
                                return s + "\n\n";
                            });

                    Flux<DataBuffer> modifiedBody = eventStream.map(s ->
                            new DefaultDataBufferFactory().wrap(s.getBytes(StandardCharsets.UTF_8))
                    );

                    ClientResponse modifiedResponse = ClientResponse.create(clientResponse.statusCode())
                            .headers(headers -> {
                                headers.addAll(clientResponse.headers().asHttpHeaders());
                                headers.remove(HttpHeaders.CONTENT_LENGTH);
                            })
                            .body(modifiedBody)
                            .build();
                    return Mono.just(modifiedResponse);
                } else {
                    return Mono.just(clientResponse);
                }
            });
            webClientBuilder.filter(requestFilter).filter(responseFilter);
        };
    }

}

nullpointerxyz avatar Apr 22 '25 08:04 nullpointerxyz

@apappascs it seems like a community project to get active incubation and feedback would be the most effective route.

markpollack avatar Apr 30 '25 13:04 markpollack

@stillmoon this issue has been resolved: Spring AI now has a separate DeepSeek module https://github.com/spring-projects/spring-ai/blob/main/models/spring-ai-deepseek/src/main/java/org/springframework/ai/deepseek/api/DeepSeekApi.java

apappascs avatar May 27 '25 13:05 apappascs