[Bug] [Flink SQL UDF] No match found for function signature wBRUTransV39(<CHARACTER>, <CHARACTER>, <CHARACTER>)
Search before asking
- [x] I had searched in the issues and found no similar issues.
What happened
package com.heckerstone.udf;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import okhttp3.*; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.types.extraction.TypeInferenceExtractor; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;
public class WBRUTransV38 extends AsyncTableFunction<Row> { private static final Logger logger = LoggerFactory.getLogger(WBRUTransV38.class); private static final String GEMINI_API_KEY = "sk-or-v1-xxxxx"; private static final Row EMPTY_ROW = Row.of("", "", "");
// 实例级连接池(非静态)
private transient OkHttpClient client;
// --- 关键修改2:实现类型推导接口 ---
// @Override // public TypeInference getTypeInference(DataTypeFactory typeFactory) { // return TypeInference.newBuilder() // .inputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getInputTypeStrategy()) // .outputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getOutputTypeStrategy()) // .build(); // }
// --- 资源生命周期管理 ---
/**
 * Creates the per-instance OkHttp client that {@code eval} uses for its HTTP calls.
 *
 * <p>Connect, read and write timeouts are each set to 10 seconds, and the client is given a
 * connection pool built as {@code new ConnectionPool(20, 5, TimeUnit.MINUTES)}.
 *
 * @param context function context passed in by the runtime; it is not read here
 */
@Override
public void open(FunctionContext context) {
    final long timeoutSeconds = 10;

    // Dedicated pool so connections are reused across calls.
    ConnectionPool pool = new ConnectionPool(20, 5, TimeUnit.MINUTES);

    OkHttpClient.Builder builder = new OkHttpClient.Builder();
    builder.connectTimeout(timeoutSeconds, TimeUnit.SECONDS);
    builder.readTimeout(timeoutSeconds, TimeUnit.SECONDS);
    builder.writeTimeout(timeoutSeconds, TimeUnit.SECONDS);
    builder.connectionPool(pool);

    this.client = builder.build();
}
/**
 * Releases the HTTP client's resources when the {@code client} field is non-null:
 * shuts down the dispatcher's executor service and evicts all pooled connections.
 */
@Override
public void close() {
    if (client == null) {
        return;
    }
    // Stop the dispatcher's thread pool.
    client.dispatcher().executorService().shutdown();
    // Drop pooled connections.
    client.connectionPool().evictAll();
}
// --- 核心逻辑(含完整提示词)---
/**
 * Asynchronously requests a Chinese translation of one question record from the OpenRouter
 * completions endpoint and completes {@code resultFuture} with exactly one row.
 *
 * <p>The future is completed with {@code EMPTY_ROW} when the HTTP call fails, when the response
 * status is not successful, or when handling the response throws. On success the response body
 * is handed to {@code parseResponse}, which completes the future itself. The inputs are not
 * checked for {@code null} here.
 *
 * <p>NOTE(review): the reported "No match found for function signature" error is raised while
 * the SQL is being validated, i.e. before this method is ever invoked. Whether the planner in
 * use accepts an {@code AsyncTableFunction} inside {@code LATERAL TABLE} should be confirmed.
 *
 * @param resultFuture future that receives the single-row result collection
 * @param product_name_ru Russian product name
 * @param text_ru Russian question text
 * @param answer_ru Russian answer text
 */
@FunctionHint(
        input = {@DataTypeHint("STRING"), @DataTypeHint("STRING"), @DataTypeHint("STRING")},
        output = @DataTypeHint("ROW<product_name_zh STRING, text_zh STRING, answer_zh STRING>")
)
public void eval(CompletableFuture<Collection<Row>> resultFuture, String product_name_ru, String text_ru, String answer_ru) {
    String prompt = buildTranslationPrompt(product_name_ru, text_ru, answer_ru);
    logger.debug("提示词组装完成"); // The prompt text itself is not logged, to keep logs small.
    Request request = buildTranslationRequest(prompt);

    // Non-blocking call; one of the two callbacks below completes the future.
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            // The trailing throwable argument makes SLF4J record the stack trace as well.
            logger.error("HTTP请求失败: {}", e.getMessage(), e);
            resultFuture.complete(Collections.singleton(EMPTY_ROW));
        }

        @Override
        public void onResponse(Call call, Response response) {
            // try-with-resources closes the response body on every path.
            try (ResponseBody body = response.body()) {
                if (!response.isSuccessful()) {
                    logger.error("HTTP状态码异常: {}", response.code());
                    resultFuture.complete(Collections.singleton(EMPTY_ROW));
                    return;
                }
                parseResponse(body.string(), resultFuture);
            } catch (Exception e) {
                logger.error("响应处理异常: {}", e.getMessage(), e);
                resultFuture.complete(Collections.singleton(EMPTY_ROW));
            }
        }
    });
}

/** Builds the full classification-and-translation prompt for one record. */
private static String buildTranslationPrompt(String productNameRu, String textRu, String answerRu) {
    // Record to be processed, embedded into the prompt as JSON.
    JSONObject payload = new JSONObject();
    payload.put("product_name_ru", productNameRu);
    payload.put("text", textRu);
    payload.put("answer_text", answerRu);

    // JSON skeleton the model is told to fill in.
    String outputTemplate = "{\"answer_text\":\"\",\"answer_text_zh\":\"\",\"id\":\"\",\"productDetails_brandName\":\"\",\"productDetails_brandName_zh\":\"\",\"productDetails_imtId\":\"\",\"productDetails_nmId\":\"\",\"productDetails_productName\":\"\",\"productDetails_productName_zh\":\"\",\"productDetails_supplierArticle\":\"\",\"productDetails_supplierName\":\"\",\"productDetails_supplierName_zh\":\"\",\"text\":\"\",\"text_zh\":\"\",\"wasViewed\":\"\"}";

    return "你是一个专业的俄语电商客服专家,精通俄语和中文。你的任务是分析俄罗斯电商场景下的客户问题,并按照指定格式输出结果。\n" +
            "核心规则:\n" +
            "1. 你必须结合产品标题和客户问题的完整信息来综合判断,不能孤立地分析任何一方。\n" +
            "2. 分类唯一性: category的值必须是下面提供的15个选项之一,不能创造新的选项。\n" +
            "3. 优先级原则: 当一个问题可以归入多个分类时,请选择最核心、最直接的那个分类。例如,询问“收到的产品坏了,怎么退货?”应优先归为售后问题,而不是退货流程或产品质量,因为它是一个已经发生问题的售后场景。\n" +
            "任务1:问题分类\n" +
            "分类选项: 产品型问题, 付款问题, 售后问题, 下单方法, 库存型, 生产地, 配送时效, 发货周期, 价格问题, 物流信息, 退货流程, 产品质量, 保修时间, 产品型号问题, 其他通用型问题。\n" +
            "任务2:翻译\n" +
            "将所有俄语翻译为中文\n" +
            "结合产品标题(productName)的上下文,翻译为易于理解的中文\n" +
            "需要处理的内容为: " + payload.toString() + "\n" +
            "输出格式:\n" +
            "严格按照以下JSON格式输出,不要添加任何额外的解释或文字,当json中字段没有值时请将该字段赋值为空字符串:\n" +
            outputTemplate;
}

/** Wraps the prompt in the JSON body and POST request sent to the completions endpoint. */
private static Request buildTranslationRequest(String prompt) {
    JSONObject requestBodyJson = new JSONObject();
    requestBodyJson.put("model", "google/gemini-2.0-flash-001");
    requestBodyJson.put("prompt", prompt);
    requestBodyJson.put("transform", "middle-out");

    return new Request.Builder()
            .url("https://openrouter.ai/api/v1/completions")
            .addHeader("Authorization", "Bearer " + GEMINI_API_KEY)
            .post(RequestBody.create(
                    requestBodyJson.toJSONString(),
                    MediaType.parse("application/json")
            ))
            .build();
}
// --- 响应解析(完整逻辑)---
/**
 * Parses a completions response body and completes {@code resultFuture} with one row.
 *
 * <p>On success the row holds the translated product name, question text and answer text.
 * If the body cannot be parsed or lacks the expected content, the failure is logged together
 * with its stack trace and the future is completed with {@code EMPTY_ROW} instead.
 *
 * @param responseBody raw HTTP response body
 * @param resultFuture future that receives the single-row result collection
 */
private void parseResponse(String responseBody, CompletableFuture<Collection<Row>> resultFuture) {
    Row output;
    try {
        output = extractTranslatedRow(responseBody);
    } catch (Exception e) {
        // Boundary catch: the future must be completed no matter what went wrong.
        logger.error("解析失败: {}", e.getMessage(), e);
        resultFuture.complete(Collections.singleton(EMPTY_ROW));
        return;
    }
    resultFuture.complete(Collections.singleton(output));
}

/**
 * Pulls the model's JSON answer out of {@code choices[0].text} and maps it to a row.
 *
 * @throws IllegalStateException if the response is empty, has no choices, has no text, or the
 *     text contains no {@code {...}} fragment
 */
private static Row extractTranslatedRow(String responseBody) {
    JSONObject jsonResponse = JSON.parseObject(responseBody);
    if (jsonResponse == null) {
        throw new IllegalStateException("响应体为空");
    }

    JSONArray choices = jsonResponse.getJSONArray("choices");
    if (choices == null || choices.isEmpty()) {
        throw new IllegalStateException("choices数组为空");
    }

    JSONObject firstChoice = choices.getJSONObject(0);
    String rawText = firstChoice == null ? null : firstChoice.getString("text");
    if (rawText == null || rawText.trim().isEmpty()) {
        throw new IllegalStateException("AI返回文本为空");
    }

    // Keep only the span from the first '{' to the last '}', discarding any surrounding text.
    int start = rawText.indexOf('{');
    int end = rawText.lastIndexOf('}');
    if (start == -1 || end <= start) {
        throw new IllegalStateException("无效的JSON边界: start=" + start + ", end=" + end);
    }

    // Raw line breaks are removed from the fragment before it is parsed.
    String jsonStr = rawText.substring(start, end + 1)
            .replace("\n", "")
            .replace("\r", "");
    JSONObject resultJson = JSON.parseObject(jsonStr);

    // Missing fields become empty strings so the row never carries nulls.
    return Row.of(
            emptyIfNull(resultJson.getString("productDetails_productName_zh")),
            emptyIfNull(resultJson.getString("text_zh")),
            emptyIfNull(resultJson.getString("answer_text_zh"))
    );
}

/** Returns {@code value}, or the empty string when it is {@code null}. */
private static String emptyIfNull(String value) {
    return value == null ? "" : value;
}
}
flinkSQL:
-- DROP FUNCTION IF EXISTS WBRUTransV28;
CREATE TEMPORARY FUNCTION wBRUTransV39 AS 'com.heckerstone.udf.WBRUTransV38' LANGUAGE JAVA;
-- Copies every source question into the translated table, filling the *_zh columns from the
-- translation UDF and replacing missing values with fixed placeholders.
INSERT INTO kl_boot.wb_questions_translated
SELECT
    -- Date-like fields: NULL is replaced with the default date.
    COALESCE(NULLIF(TRIM(original.create_date), '1970-01-01'), '1970-01-01') AS create_date,
    COALESCE(NULLIF(original.product_nmid, ''), '00') AS product_nmid,
    -- Text fields: NULL or empty values are replaced with the placeholder '00'.
    COALESCE(NULLIF(original.product_name_ru, ''), '00') AS product_name_ru,
    COALESCE(NULLIF(original.text_ru, ''), '00') AS text_ru,
    COALESCE(NULLIF(TRIM(original.question_id), ''), '00') AS question_id,
    COALESCE(NULLIF(original.answer_ru, ''), '00') AS answer_ru,
    COALESCE(NULLIF(T.answer_zh, ''), '00') AS answer_zh,
    COALESCE(NULLIF(TRIM(original.answer_date), '1970-01-01'), '1970-01-01') AS answer_date,
    -- Boolean/numeric fields: NULL is replaced with 0.
    COALESCE(original.is_warned, 0) AS is_warned,
    COALESCE(NULLIF(T.text_zh, ''), '00') AS text_zh,
    COALESCE(NULLIF(original.state, ''), '00') AS state,
    COALESCE(original.was_viewed, 0) AS was_viewed,
    COALESCE(NULLIF(original.product_suppliername, ''), '00') AS product_suppliername,
    COALESCE(NULLIF(original.product_suppliername_zh, ''), '00') AS product_suppliername_zh,
    COALESCE(NULLIF(TRIM(original.product_imt_id), ''), '00') AS product_imt_id,
    COALESCE(NULLIF(original.product_brandname, ''), '00') AS product_brandname,
    COALESCE(NULLIF(original.product_brandname_zh, ''), '00') AS product_brandname_zh,
    COALESCE(NULLIF(T.product_name_zh, ''), '00') AS product_name_zh,
    COALESCE(NULLIF(original.product_size, ''), '00') AS product_size,
    COALESCE(NULLIF(original.product_supplierarticle, ''), '00') AS product_supplierarticle
FROM kl_boot.wb_questions_source AS original
LEFT JOIN LATERAL TABLE (
    wBRUTransV39 (
        CONCAT('', COALESCE(original.product_name_ru, '00')),
        CONCAT('', COALESCE(original.text_ru, '00')),
        CONCAT('', COALESCE(original.answer_ru, '00'))
    )
) AS T (product_name_zh, text_zh, answer_zh) ON TRUE;
运行报错: Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature wBRUTransV39(<CHARACTER>, <CHARACTER>, <CHARACTER>) at sun.reflect.GeneratedConstructorAccessor234.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:599) ... 207 more
What you expected to happen
package com.heckerstone.udf;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import okhttp3.*; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.types.extraction.TypeInferenceExtractor; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;
public class WBRUTransV38 extends AsyncTableFunction<Row> { private static final Logger logger = LoggerFactory.getLogger(WBRUTransV38.class); private static final String GEMINI_API_KEY = "sk-or-v1-xxxxx"; private static final Row EMPTY_ROW = Row.of("", "", "");
// 实例级连接池(非静态)
private transient OkHttpClient client;
// --- 关键修改2:实现类型推导接口 ---
// @Override // public TypeInference getTypeInference(DataTypeFactory typeFactory) { // return TypeInference.newBuilder() // .inputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getInputTypeStrategy()) // .outputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getOutputTypeStrategy()) // .build(); // }
// --- 资源生命周期管理 ---
/**
 * Creates the per-instance OkHttp client that {@code eval} uses for its HTTP calls.
 *
 * <p>Connect, read and write timeouts are each set to 10 seconds, and the client is given a
 * connection pool built as {@code new ConnectionPool(20, 5, TimeUnit.MINUTES)}.
 *
 * @param context function context passed in by the runtime; it is not read here
 */
@Override
public void open(FunctionContext context) {
    final long timeoutSeconds = 10;

    // Dedicated pool so connections are reused across calls.
    ConnectionPool pool = new ConnectionPool(20, 5, TimeUnit.MINUTES);

    OkHttpClient.Builder builder = new OkHttpClient.Builder();
    builder.connectTimeout(timeoutSeconds, TimeUnit.SECONDS);
    builder.readTimeout(timeoutSeconds, TimeUnit.SECONDS);
    builder.writeTimeout(timeoutSeconds, TimeUnit.SECONDS);
    builder.connectionPool(pool);

    this.client = builder.build();
}
/**
 * Releases the HTTP client's resources when the {@code client} field is non-null:
 * shuts down the dispatcher's executor service and evicts all pooled connections.
 */
@Override
public void close() {
    if (client == null) {
        return;
    }
    // Stop the dispatcher's thread pool.
    client.dispatcher().executorService().shutdown();
    // Drop pooled connections.
    client.connectionPool().evictAll();
}
// --- 核心逻辑(含完整提示词)---
/**
 * Asynchronously requests a Chinese translation of one question record from the OpenRouter
 * completions endpoint and completes {@code resultFuture} with exactly one row.
 *
 * <p>The future is completed with {@code EMPTY_ROW} when the HTTP call fails, when the response
 * status is not successful, or when handling the response throws. On success the response body
 * is handed to {@code parseResponse}, which completes the future itself. The inputs are not
 * checked for {@code null} here.
 *
 * <p>NOTE(review): the reported "No match found for function signature" error is raised while
 * the SQL is being validated, i.e. before this method is ever invoked. Whether the planner in
 * use accepts an {@code AsyncTableFunction} inside {@code LATERAL TABLE} should be confirmed.
 *
 * @param resultFuture future that receives the single-row result collection
 * @param product_name_ru Russian product name
 * @param text_ru Russian question text
 * @param answer_ru Russian answer text
 */
@FunctionHint(
        input = {@DataTypeHint("STRING"), @DataTypeHint("STRING"), @DataTypeHint("STRING")},
        output = @DataTypeHint("ROW<product_name_zh STRING, text_zh STRING, answer_zh STRING>")
)
public void eval(CompletableFuture<Collection<Row>> resultFuture, String product_name_ru, String text_ru, String answer_ru) {
    String prompt = buildTranslationPrompt(product_name_ru, text_ru, answer_ru);
    logger.debug("提示词组装完成"); // The prompt text itself is not logged, to keep logs small.
    Request request = buildTranslationRequest(prompt);

    // Non-blocking call; one of the two callbacks below completes the future.
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            // The trailing throwable argument makes SLF4J record the stack trace as well.
            logger.error("HTTP请求失败: {}", e.getMessage(), e);
            resultFuture.complete(Collections.singleton(EMPTY_ROW));
        }

        @Override
        public void onResponse(Call call, Response response) {
            // try-with-resources closes the response body on every path.
            try (ResponseBody body = response.body()) {
                if (!response.isSuccessful()) {
                    logger.error("HTTP状态码异常: {}", response.code());
                    resultFuture.complete(Collections.singleton(EMPTY_ROW));
                    return;
                }
                parseResponse(body.string(), resultFuture);
            } catch (Exception e) {
                logger.error("响应处理异常: {}", e.getMessage(), e);
                resultFuture.complete(Collections.singleton(EMPTY_ROW));
            }
        }
    });
}

/** Builds the full classification-and-translation prompt for one record. */
private static String buildTranslationPrompt(String productNameRu, String textRu, String answerRu) {
    // Record to be processed, embedded into the prompt as JSON.
    JSONObject payload = new JSONObject();
    payload.put("product_name_ru", productNameRu);
    payload.put("text", textRu);
    payload.put("answer_text", answerRu);

    // JSON skeleton the model is told to fill in.
    String outputTemplate = "{\"answer_text\":\"\",\"answer_text_zh\":\"\",\"id\":\"\",\"productDetails_brandName\":\"\",\"productDetails_brandName_zh\":\"\",\"productDetails_imtId\":\"\",\"productDetails_nmId\":\"\",\"productDetails_productName\":\"\",\"productDetails_productName_zh\":\"\",\"productDetails_supplierArticle\":\"\",\"productDetails_supplierName\":\"\",\"productDetails_supplierName_zh\":\"\",\"text\":\"\",\"text_zh\":\"\",\"wasViewed\":\"\"}";

    return "你是一个专业的俄语电商客服专家,精通俄语和中文。你的任务是分析俄罗斯电商场景下的客户问题,并按照指定格式输出结果。\n" +
            "核心规则:\n" +
            "1. 你必须结合产品标题和客户问题的完整信息来综合判断,不能孤立地分析任何一方。\n" +
            "2. 分类唯一性: category的值必须是下面提供的15个选项之一,不能创造新的选项。\n" +
            "3. 优先级原则: 当一个问题可以归入多个分类时,请选择最核心、最直接的那个分类。例如,询问“收到的产品坏了,怎么退货?”应优先归为售后问题,而不是退货流程或产品质量,因为它是一个已经发生问题的售后场景。\n" +
            "任务1:问题分类\n" +
            "分类选项: 产品型问题, 付款问题, 售后问题, 下单方法, 库存型, 生产地, 配送时效, 发货周期, 价格问题, 物流信息, 退货流程, 产品质量, 保修时间, 产品型号问题, 其他通用型问题。\n" +
            "任务2:翻译\n" +
            "将所有俄语翻译为中文\n" +
            "结合产品标题(productName)的上下文,翻译为易于理解的中文\n" +
            "需要处理的内容为: " + payload.toString() + "\n" +
            "输出格式:\n" +
            "严格按照以下JSON格式输出,不要添加任何额外的解释或文字,当json中字段没有值时请将该字段赋值为空字符串:\n" +
            outputTemplate;
}

/** Wraps the prompt in the JSON body and POST request sent to the completions endpoint. */
private static Request buildTranslationRequest(String prompt) {
    JSONObject requestBodyJson = new JSONObject();
    requestBodyJson.put("model", "google/gemini-2.0-flash-001");
    requestBodyJson.put("prompt", prompt);
    requestBodyJson.put("transform", "middle-out");

    return new Request.Builder()
            .url("https://openrouter.ai/api/v1/completions")
            .addHeader("Authorization", "Bearer " + GEMINI_API_KEY)
            .post(RequestBody.create(
                    requestBodyJson.toJSONString(),
                    MediaType.parse("application/json")
            ))
            .build();
}
// --- 响应解析(完整逻辑)---
/**
 * Parses a completions response body and completes {@code resultFuture} with one row.
 *
 * <p>On success the row holds the translated product name, question text and answer text.
 * If the body cannot be parsed or lacks the expected content, the failure is logged together
 * with its stack trace and the future is completed with {@code EMPTY_ROW} instead.
 *
 * @param responseBody raw HTTP response body
 * @param resultFuture future that receives the single-row result collection
 */
private void parseResponse(String responseBody, CompletableFuture<Collection<Row>> resultFuture) {
    Row output;
    try {
        output = extractTranslatedRow(responseBody);
    } catch (Exception e) {
        // Boundary catch: the future must be completed no matter what went wrong.
        logger.error("解析失败: {}", e.getMessage(), e);
        resultFuture.complete(Collections.singleton(EMPTY_ROW));
        return;
    }
    resultFuture.complete(Collections.singleton(output));
}

/**
 * Pulls the model's JSON answer out of {@code choices[0].text} and maps it to a row.
 *
 * @throws IllegalStateException if the response is empty, has no choices, has no text, or the
 *     text contains no {@code {...}} fragment
 */
private static Row extractTranslatedRow(String responseBody) {
    JSONObject jsonResponse = JSON.parseObject(responseBody);
    if (jsonResponse == null) {
        throw new IllegalStateException("响应体为空");
    }

    JSONArray choices = jsonResponse.getJSONArray("choices");
    if (choices == null || choices.isEmpty()) {
        throw new IllegalStateException("choices数组为空");
    }

    JSONObject firstChoice = choices.getJSONObject(0);
    String rawText = firstChoice == null ? null : firstChoice.getString("text");
    if (rawText == null || rawText.trim().isEmpty()) {
        throw new IllegalStateException("AI返回文本为空");
    }

    // Keep only the span from the first '{' to the last '}', discarding any surrounding text.
    int start = rawText.indexOf('{');
    int end = rawText.lastIndexOf('}');
    if (start == -1 || end <= start) {
        throw new IllegalStateException("无效的JSON边界: start=" + start + ", end=" + end);
    }

    // Raw line breaks are removed from the fragment before it is parsed.
    String jsonStr = rawText.substring(start, end + 1)
            .replace("\n", "")
            .replace("\r", "");
    JSONObject resultJson = JSON.parseObject(jsonStr);

    // Missing fields become empty strings so the row never carries nulls.
    return Row.of(
            emptyIfNull(resultJson.getString("productDetails_productName_zh")),
            emptyIfNull(resultJson.getString("text_zh")),
            emptyIfNull(resultJson.getString("answer_text_zh"))
    );
}

/** Returns {@code value}, or the empty string when it is {@code null}. */
private static String emptyIfNull(String value) {
    return value == null ? "" : value;
}
}
flinkSQL:
-- DROP FUNCTION IF EXISTS WBRUTransV28;
CREATE TEMPORARY FUNCTION wBRUTransV39 AS 'com.heckerstone.udf.WBRUTransV38' LANGUAGE JAVA;
-- Copies every source question into the translated table, filling the *_zh columns from the
-- translation UDF and replacing missing values with fixed placeholders.
INSERT INTO kl_boot.wb_questions_translated
SELECT
    -- Date-like fields: NULL is replaced with the default date.
    COALESCE(NULLIF(TRIM(original.create_date), '1970-01-01'), '1970-01-01') AS create_date,
    COALESCE(NULLIF(original.product_nmid, ''), '00') AS product_nmid,
    -- Text fields: NULL or empty values are replaced with the placeholder '00'.
    COALESCE(NULLIF(original.product_name_ru, ''), '00') AS product_name_ru,
    COALESCE(NULLIF(original.text_ru, ''), '00') AS text_ru,
    COALESCE(NULLIF(TRIM(original.question_id), ''), '00') AS question_id,
    COALESCE(NULLIF(original.answer_ru, ''), '00') AS answer_ru,
    COALESCE(NULLIF(T.answer_zh, ''), '00') AS answer_zh,
    COALESCE(NULLIF(TRIM(original.answer_date), '1970-01-01'), '1970-01-01') AS answer_date,
    -- Boolean/numeric fields: NULL is replaced with 0.
    COALESCE(original.is_warned, 0) AS is_warned,
    COALESCE(NULLIF(T.text_zh, ''), '00') AS text_zh,
    COALESCE(NULLIF(original.state, ''), '00') AS state,
    COALESCE(original.was_viewed, 0) AS was_viewed,
    COALESCE(NULLIF(original.product_suppliername, ''), '00') AS product_suppliername,
    COALESCE(NULLIF(original.product_suppliername_zh, ''), '00') AS product_suppliername_zh,
    COALESCE(NULLIF(TRIM(original.product_imt_id), ''), '00') AS product_imt_id,
    COALESCE(NULLIF(original.product_brandname, ''), '00') AS product_brandname,
    COALESCE(NULLIF(original.product_brandname_zh, ''), '00') AS product_brandname_zh,
    COALESCE(NULLIF(T.product_name_zh, ''), '00') AS product_name_zh,
    COALESCE(NULLIF(original.product_size, ''), '00') AS product_size,
    COALESCE(NULLIF(original.product_supplierarticle, ''), '00') AS product_supplierarticle
FROM kl_boot.wb_questions_source AS original
LEFT JOIN LATERAL TABLE (
    wBRUTransV39 (
        CONCAT('', COALESCE(original.product_name_ru, '00')),
        CONCAT('', COALESCE(original.text_ru, '00')),
        CONCAT('', COALESCE(original.answer_ru, '00'))
    )
) AS T (product_name_zh, text_zh, answer_zh) ON TRUE;
运行报错: Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature wBRUTransV39(<CHARACTER>, <CHARACTER>, <CHARACTER>) at sun.reflect.GeneratedConstructorAccessor234.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:599) ... 207 more
How to reproduce
package com.heckerstone.udf;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import okhttp3.*; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.types.extraction.TypeInferenceExtractor; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;
public class WBRUTransV38 extends AsyncTableFunction<Row> { private static final Logger logger = LoggerFactory.getLogger(WBRUTransV38.class); private static final String GEMINI_API_KEY = "sk-or-v1-xxxxx"; private static final Row EMPTY_ROW = Row.of("", "", "");
// 实例级连接池(非静态)
private transient OkHttpClient client;
// --- 关键修改2:实现类型推导接口 ---
// @Override // public TypeInference getTypeInference(DataTypeFactory typeFactory) { // return TypeInference.newBuilder() // .inputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getInputTypeStrategy()) // .outputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getOutputTypeStrategy()) // .build(); // }
// --- 资源生命周期管理 ---
/**
 * Creates the per-instance OkHttp client that {@code eval} uses for its HTTP calls.
 *
 * <p>Connect, read and write timeouts are each set to 10 seconds, and the client is given a
 * connection pool built as {@code new ConnectionPool(20, 5, TimeUnit.MINUTES)}.
 *
 * @param context function context passed in by the runtime; it is not read here
 */
@Override
public void open(FunctionContext context) {
    final long timeoutSeconds = 10;

    // Dedicated pool so connections are reused across calls.
    ConnectionPool pool = new ConnectionPool(20, 5, TimeUnit.MINUTES);

    OkHttpClient.Builder builder = new OkHttpClient.Builder();
    builder.connectTimeout(timeoutSeconds, TimeUnit.SECONDS);
    builder.readTimeout(timeoutSeconds, TimeUnit.SECONDS);
    builder.writeTimeout(timeoutSeconds, TimeUnit.SECONDS);
    builder.connectionPool(pool);

    this.client = builder.build();
}
/**
 * Releases the HTTP client's resources when the {@code client} field is non-null:
 * shuts down the dispatcher's executor service and evicts all pooled connections.
 */
@Override
public void close() {
    if (client == null) {
        return;
    }
    // Stop the dispatcher's thread pool.
    client.dispatcher().executorService().shutdown();
    // Drop pooled connections.
    client.connectionPool().evictAll();
}
// --- 核心逻辑(含完整提示词)---
/**
 * Asynchronously requests a Chinese translation of one question record from the OpenRouter
 * completions endpoint and completes {@code resultFuture} with exactly one row.
 *
 * <p>The future is completed with {@code EMPTY_ROW} when the HTTP call fails, when the response
 * status is not successful, or when handling the response throws. On success the response body
 * is handed to {@code parseResponse}, which completes the future itself. The inputs are not
 * checked for {@code null} here.
 *
 * <p>NOTE(review): the reported "No match found for function signature" error is raised while
 * the SQL is being validated, i.e. before this method is ever invoked. Whether the planner in
 * use accepts an {@code AsyncTableFunction} inside {@code LATERAL TABLE} should be confirmed.
 *
 * @param resultFuture future that receives the single-row result collection
 * @param product_name_ru Russian product name
 * @param text_ru Russian question text
 * @param answer_ru Russian answer text
 */
@FunctionHint(
        input = {@DataTypeHint("STRING"), @DataTypeHint("STRING"), @DataTypeHint("STRING")},
        output = @DataTypeHint("ROW<product_name_zh STRING, text_zh STRING, answer_zh STRING>")
)
public void eval(CompletableFuture<Collection<Row>> resultFuture, String product_name_ru, String text_ru, String answer_ru) {
    String prompt = buildTranslationPrompt(product_name_ru, text_ru, answer_ru);
    logger.debug("提示词组装完成"); // The prompt text itself is not logged, to keep logs small.
    Request request = buildTranslationRequest(prompt);

    // Non-blocking call; one of the two callbacks below completes the future.
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            // The trailing throwable argument makes SLF4J record the stack trace as well.
            logger.error("HTTP请求失败: {}", e.getMessage(), e);
            resultFuture.complete(Collections.singleton(EMPTY_ROW));
        }

        @Override
        public void onResponse(Call call, Response response) {
            // try-with-resources closes the response body on every path.
            try (ResponseBody body = response.body()) {
                if (!response.isSuccessful()) {
                    logger.error("HTTP状态码异常: {}", response.code());
                    resultFuture.complete(Collections.singleton(EMPTY_ROW));
                    return;
                }
                parseResponse(body.string(), resultFuture);
            } catch (Exception e) {
                logger.error("响应处理异常: {}", e.getMessage(), e);
                resultFuture.complete(Collections.singleton(EMPTY_ROW));
            }
        }
    });
}

/** Builds the full classification-and-translation prompt for one record. */
private static String buildTranslationPrompt(String productNameRu, String textRu, String answerRu) {
    // Record to be processed, embedded into the prompt as JSON.
    JSONObject payload = new JSONObject();
    payload.put("product_name_ru", productNameRu);
    payload.put("text", textRu);
    payload.put("answer_text", answerRu);

    // JSON skeleton the model is told to fill in.
    String outputTemplate = "{\"answer_text\":\"\",\"answer_text_zh\":\"\",\"id\":\"\",\"productDetails_brandName\":\"\",\"productDetails_brandName_zh\":\"\",\"productDetails_imtId\":\"\",\"productDetails_nmId\":\"\",\"productDetails_productName\":\"\",\"productDetails_productName_zh\":\"\",\"productDetails_supplierArticle\":\"\",\"productDetails_supplierName\":\"\",\"productDetails_supplierName_zh\":\"\",\"text\":\"\",\"text_zh\":\"\",\"wasViewed\":\"\"}";

    return "你是一个专业的俄语电商客服专家,精通俄语和中文。你的任务是分析俄罗斯电商场景下的客户问题,并按照指定格式输出结果。\n" +
            "核心规则:\n" +
            "1. 你必须结合产品标题和客户问题的完整信息来综合判断,不能孤立地分析任何一方。\n" +
            "2. 分类唯一性: category的值必须是下面提供的15个选项之一,不能创造新的选项。\n" +
            "3. 优先级原则: 当一个问题可以归入多个分类时,请选择最核心、最直接的那个分类。例如,询问“收到的产品坏了,怎么退货?”应优先归为售后问题,而不是退货流程或产品质量,因为它是一个已经发生问题的售后场景。\n" +
            "任务1:问题分类\n" +
            "分类选项: 产品型问题, 付款问题, 售后问题, 下单方法, 库存型, 生产地, 配送时效, 发货周期, 价格问题, 物流信息, 退货流程, 产品质量, 保修时间, 产品型号问题, 其他通用型问题。\n" +
            "任务2:翻译\n" +
            "将所有俄语翻译为中文\n" +
            "结合产品标题(productName)的上下文,翻译为易于理解的中文\n" +
            "需要处理的内容为: " + payload.toString() + "\n" +
            "输出格式:\n" +
            "严格按照以下JSON格式输出,不要添加任何额外的解释或文字,当json中字段没有值时请将该字段赋值为空字符串:\n" +
            outputTemplate;
}

/** Wraps the prompt in the JSON body and POST request sent to the completions endpoint. */
private static Request buildTranslationRequest(String prompt) {
    JSONObject requestBodyJson = new JSONObject();
    requestBodyJson.put("model", "google/gemini-2.0-flash-001");
    requestBodyJson.put("prompt", prompt);
    requestBodyJson.put("transform", "middle-out");

    return new Request.Builder()
            .url("https://openrouter.ai/api/v1/completions")
            .addHeader("Authorization", "Bearer " + GEMINI_API_KEY)
            .post(RequestBody.create(
                    requestBodyJson.toJSONString(),
                    MediaType.parse("application/json")
            ))
            .build();
}
// --- 响应解析(完整逻辑)---
/**
 * Parses a completions response body and completes {@code resultFuture} with one row.
 *
 * <p>On success the row holds the translated product name, question text and answer text.
 * If the body cannot be parsed or lacks the expected content, the failure is logged together
 * with its stack trace and the future is completed with {@code EMPTY_ROW} instead.
 *
 * @param responseBody raw HTTP response body
 * @param resultFuture future that receives the single-row result collection
 */
private void parseResponse(String responseBody, CompletableFuture<Collection<Row>> resultFuture) {
    Row output;
    try {
        output = extractTranslatedRow(responseBody);
    } catch (Exception e) {
        // Boundary catch: the future must be completed no matter what went wrong.
        logger.error("解析失败: {}", e.getMessage(), e);
        resultFuture.complete(Collections.singleton(EMPTY_ROW));
        return;
    }
    resultFuture.complete(Collections.singleton(output));
}

/**
 * Pulls the model's JSON answer out of {@code choices[0].text} and maps it to a row.
 *
 * @throws IllegalStateException if the response is empty, has no choices, has no text, or the
 *     text contains no {@code {...}} fragment
 */
private static Row extractTranslatedRow(String responseBody) {
    JSONObject jsonResponse = JSON.parseObject(responseBody);
    if (jsonResponse == null) {
        throw new IllegalStateException("响应体为空");
    }

    JSONArray choices = jsonResponse.getJSONArray("choices");
    if (choices == null || choices.isEmpty()) {
        throw new IllegalStateException("choices数组为空");
    }

    JSONObject firstChoice = choices.getJSONObject(0);
    String rawText = firstChoice == null ? null : firstChoice.getString("text");
    if (rawText == null || rawText.trim().isEmpty()) {
        throw new IllegalStateException("AI返回文本为空");
    }

    // Keep only the span from the first '{' to the last '}', discarding any surrounding text.
    int start = rawText.indexOf('{');
    int end = rawText.lastIndexOf('}');
    if (start == -1 || end <= start) {
        throw new IllegalStateException("无效的JSON边界: start=" + start + ", end=" + end);
    }

    // Raw line breaks are removed from the fragment before it is parsed.
    String jsonStr = rawText.substring(start, end + 1)
            .replace("\n", "")
            .replace("\r", "");
    JSONObject resultJson = JSON.parseObject(jsonStr);

    // Missing fields become empty strings so the row never carries nulls.
    return Row.of(
            emptyIfNull(resultJson.getString("productDetails_productName_zh")),
            emptyIfNull(resultJson.getString("text_zh")),
            emptyIfNull(resultJson.getString("answer_text_zh"))
    );
}

/** Returns {@code value}, or the empty string when it is {@code null}. */
private static String emptyIfNull(String value) {
    return value == null ? "" : value;
}
}
flinkSQL:
-- DROP FUNCTION IF EXISTS WBRUTransV28;
CREATE TEMPORARY FUNCTION wBRUTransV39 AS 'com.heckerstone.udf.WBRUTransV38' LANGUAGE JAVA;
-- Copies every source question into the translated table, filling the *_zh columns from the
-- translation UDF and replacing missing values with fixed placeholders.
INSERT INTO kl_boot.wb_questions_translated
SELECT
    -- Date-like fields: NULL is replaced with the default date.
    COALESCE(NULLIF(TRIM(original.create_date), '1970-01-01'), '1970-01-01') AS create_date,
    COALESCE(NULLIF(original.product_nmid, ''), '00') AS product_nmid,
    -- Text fields: NULL or empty values are replaced with the placeholder '00'.
    COALESCE(NULLIF(original.product_name_ru, ''), '00') AS product_name_ru,
    COALESCE(NULLIF(original.text_ru, ''), '00') AS text_ru,
    COALESCE(NULLIF(TRIM(original.question_id), ''), '00') AS question_id,
    COALESCE(NULLIF(original.answer_ru, ''), '00') AS answer_ru,
    COALESCE(NULLIF(T.answer_zh, ''), '00') AS answer_zh,
    COALESCE(NULLIF(TRIM(original.answer_date), '1970-01-01'), '1970-01-01') AS answer_date,
    -- Boolean/numeric fields: NULL is replaced with 0.
    COALESCE(original.is_warned, 0) AS is_warned,
    COALESCE(NULLIF(T.text_zh, ''), '00') AS text_zh,
    COALESCE(NULLIF(original.state, ''), '00') AS state,
    COALESCE(original.was_viewed, 0) AS was_viewed,
    COALESCE(NULLIF(original.product_suppliername, ''), '00') AS product_suppliername,
    COALESCE(NULLIF(original.product_suppliername_zh, ''), '00') AS product_suppliername_zh,
    COALESCE(NULLIF(TRIM(original.product_imt_id), ''), '00') AS product_imt_id,
    COALESCE(NULLIF(original.product_brandname, ''), '00') AS product_brandname,
    COALESCE(NULLIF(original.product_brandname_zh, ''), '00') AS product_brandname_zh,
    COALESCE(NULLIF(T.product_name_zh, ''), '00') AS product_name_zh,
    COALESCE(NULLIF(original.product_size, ''), '00') AS product_size,
    COALESCE(NULLIF(original.product_supplierarticle, ''), '00') AS product_supplierarticle
FROM kl_boot.wb_questions_source AS original
LEFT JOIN LATERAL TABLE (
    wBRUTransV39 (
        CONCAT('', COALESCE(original.product_name_ru, '00')),
        CONCAT('', COALESCE(original.text_ru, '00')),
        CONCAT('', COALESCE(original.answer_ru, '00'))
    )
) AS T (product_name_zh, text_zh, answer_zh) ON TRUE;
运行报错: Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature wBRUTransV39(<CHARACTER>, <CHARACTER>, <CHARACTER>) at sun.reflect.GeneratedConstructorAccessor234.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:599) ... 207 more
Anything else
No response
Version
dev
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
Hello @, this issue has not been active for more than 30 days. This issue will be closed in 7 days if there is no response. If you have any questions, you can comment and reply.
你好 @, 这个 issue 30 天内没有活跃,7 天后将关闭,如需回复,可以评论回复。