dinky icon indicating copy to clipboard operation
dinky copied to clipboard

[Bug] [Module Name] Bug title No match found for function signature

Open heckerstone opened this issue 7 months ago • 1 comments

Search before asking

  • [x] I had searched in the issues and found no similar issues.

What happened

package com.heckerstone.udf;

import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import okhttp3.*; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.types.extraction.TypeInferenceExtractor; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;

public class WBRUTransV38 extends AsyncTableFunction<Row> { private static final Logger logger = LoggerFactory.getLogger(WBRUTransV38.class); private static final String GEMINI_API_KEY = "sk-or-v1-xxxxx"; private static final Row EMPTY_ROW = Row.of("", "", "");

// 实例级连接池(非静态)
private transient OkHttpClient client;

// --- 关键修改2:实现类型推导接口 ---

// @Override // public TypeInference getTypeInference(DataTypeFactory typeFactory) { // return TypeInference.newBuilder() // .inputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getInputTypeStrategy()) // .outputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getOutputTypeStrategy()) // .build(); // }

// --- 资源生命周期管理 ---
/**
 * Creates the HTTP client used by {@code eval}.
 *
 * <p>Connect, read and write timeouts are each 10 seconds, and the client is given its own
 * {@code ConnectionPool(20, 5, TimeUnit.MINUTES)} so connections can be reused.
 *
 * @param context function context passed in by the caller; not used here
 */
@Override
public void open(FunctionContext context) {
    // Dedicated pool for this instance (connection reuse).
    ConnectionPool pool = new ConnectionPool(20, 5, TimeUnit.MINUTES);

    OkHttpClient.Builder builder = new OkHttpClient.Builder();
    builder.connectTimeout(10, TimeUnit.SECONDS);
    builder.readTimeout(10, TimeUnit.SECONDS);
    builder.writeTimeout(10, TimeUnit.SECONDS);
    builder.connectionPool(pool);

    client = builder.build();
}

/**
 * Releases the HTTP client's resources; does nothing if no client was created.
 */
@Override
public void close() {
    if (client == null) {
        return;
    }
    // Shut down the dispatcher's thread pool, then drop the pooled connections.
    client.dispatcher().executorService().shutdown();
    client.connectionPool().evictAll();
}

// --- 核心逻辑(含完整提示词)---
/**
 * Sends one record to the OpenRouter completions endpoint for translation and completes
 * {@code resultFuture} asynchronously.
 *
 * <p>The three string arguments are wrapped in a JSON object, embedded in a fixed prompt and
 * POSTed through {@code client}. A successful response body is handed to
 * {@code parseResponse}. A transport failure, an unsuccessful HTTP status or any exception
 * while handling the response completes the future with a singleton {@code EMPTY_ROW}
 * instead of failing it.
 *
 * @param resultFuture    future that receives the result collection
 * @param product_name_ru value sent to the model under JSON key {@code product_name_ru}
 * @param text_ru         value sent to the model under JSON key {@code text}
 * @param answer_ru       value sent to the model under JSON key {@code answer_text}
 */
@FunctionHint(
        input = {@DataTypeHint("STRING"), @DataTypeHint("STRING"), @DataTypeHint("STRING")},
        output = @DataTypeHint("ROW<product_name_zh STRING, text_zh STRING, answer_zh STRING>")
)
public void eval(CompletableFuture<Collection<Row>> resultFuture, String product_name_ru, String text_ru, String answer_ru) {
    // 1. NULL input check -- currently disabled; no argument is validated before use.
    //    Earlier Row-based guard, kept for reference:
    // if (inputRow == null ||
    //         inputRow.getField("product_name_ru") == null ||
    //         inputRow.getField("text_ru") == null ||
    //         inputRow.getField("answer_ru") == null
    // ) {
    //     logger.warn("input contains NULL values, returning an empty row");
    //     resultFuture.complete(Collections.singleton(EMPTY_ROW));
    //     return;
    // }

    // 2. Assemble the data sent to the model (all three fields are kept)
    JSONObject aiJSON = new JSONObject();
    aiJSON.put("product_name_ru", product_name_ru);
    aiJSON.put("text", text_ru);
    aiJSON.put("answer_text", answer_ru);

    // 3. Assemble the full prompt (nothing omitted).
    //    isAnsweredStr is the JSON template the model is told to fill in.
    //    NOTE(review): the prompt asks for a "category", but this template has no category key.
    String isAnsweredStr = "{\"answer_text\":\"\",\"answer_text_zh\":\"\",\"id\":\"\",\"productDetails_brandName\":\"\",\"productDetails_brandName_zh\":\"\",\"productDetails_imtId\":\"\",\"productDetails_nmId\":\"\",\"productDetails_productName\":\"\",\"productDetails_productName_zh\":\"\",\"productDetails_supplierArticle\":\"\",\"productDetails_supplierName\":\"\",\"productDetails_supplierName_zh\":\"\",\"text\":\"\",\"text_zh\":\"\",\"wasViewed\":\"\"}";

    String prompt = "你是一个专业的俄语电商客服专家,精通俄语和中文。你的任务是分析俄罗斯电商场景下的客户问题,并按照指定格式输出结果。\n" +
            "核心规则:\n" +
            "1. 你必须结合产品标题和客户问题的完整信息来综合判断,不能孤立地分析任何一方。\n" +
            "2. 分类唯一性: category的值必须是下面提供的15个选项之一,不能创造新的选项。\n" +
            "3. 优先级原则: 当一个问题可以归入多个分类时,请选择最核心、最直接的那个分类。例如,询问“收到的产品坏了,怎么退货?”应优先归为售后问题,而不是退货流程或产品质量,因为它是一个已经发生问题的售后场景。\n" +
            "任务1:问题分类\n" +
            "分类选项: 产品型问题, 付款问题, 售后问题, 下单方法, 库存型, 生产地, 配送时效, 发货周期, 价格问题, 物流信息, 退货流程, 产品质量, 保修时间, 产品型号问题, 其他通用型问题。\n" +
            "任务2:翻译\n" +
            "将所有俄语翻译为中文\n" +
            "结合产品标题(productName)的上下文,翻译为易于理解的中文\n" +
            "需要处理的内容为: " + aiJSON.toString() + "\n" +
            "输出格式:\n" +
            "严格按照以下JSON格式输出,不要添加任何额外的解释或文字,当json中字段没有值时请将该字段赋值为空字符串:\n" +
            isAnsweredStr;
    logger.debug("提示词组装完成"); // the prompt itself is not logged, to keep the log small

    // 4. Build the HTTP request
    JSONObject requestBodyJson = new JSONObject();
    requestBodyJson.put("model", "google/gemini-2.0-flash-001");
    requestBodyJson.put("prompt", prompt);
    requestBodyJson.put("transform", "middle-out");

    Request request = new Request.Builder()
            .url("https://openrouter.ai/api/v1/completions")
            .addHeader("Authorization", "Bearer " + GEMINI_API_KEY)
            .post(RequestBody.create(
                    requestBodyJson.toJSONString(),
                    MediaType.parse("application/json")
            ))
            .build();

    // 5. Asynchronous call: the future is completed from the callback, not from this method
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            // The request failed with an IOException: answer with the empty row
            logger.error("HTTP请求失败: {}", e.getMessage());
            resultFuture.complete(Collections.singleton(EMPTY_ROW));
        }

        @Override
        public void onResponse(Call call, Response response) {
            // try-with-resources closes the response body on every path
            try (ResponseBody body = response.body()) {
                if (!response.isSuccessful()) {
                    logger.error("HTTP状态码异常: {}", response.code());
                    resultFuture.complete(Collections.singleton(EMPTY_ROW));
                    return;
                }
                parseResponse(body.string(), resultFuture);
            } catch (Exception e) {
                // Any failure while reading/handling the body also yields the empty row
                logger.error("响应处理异常: {}", e.getMessage());
                resultFuture.complete(Collections.singleton(EMPTY_ROW));
            }
        }
    });
}

// --- 响应解析(完整逻辑)---
/**
 * Extracts the translated fields from a completions response and completes
 * {@code resultFuture} with a single row.
 *
 * <p>The text of the first entry in {@code choices} is searched for its first '{' and last
 * '}'; that span (with CR/LF removed) is parsed as JSON. The keys
 * {@code productDetails_productName_zh}, {@code text_zh} and {@code answer_text_zh} become
 * the three row fields, with missing values replaced by empty strings.
 *
 * <p>Any malformed or unexpected response is logged together with its stack trace and
 * answered with {@code EMPTY_ROW}, so the future is completed on every path.
 *
 * @param responseBody raw HTTP response body
 * @param resultFuture future to complete with a singleton collection
 */
private void parseResponse(String responseBody, CompletableFuture<Collection<Row>> resultFuture) {
    try {
        JSONObject jsonResponse = JSON.parseObject(responseBody);
        if (jsonResponse == null) {
            // parseObject can return null (e.g. for an empty body); fail with a clear message
            throw new IllegalStateException("响应体为空或不是JSON对象");
        }

        JSONArray choices = jsonResponse.getJSONArray("choices");
        if (choices == null || choices.isEmpty()) {
            throw new IllegalStateException("choices数组为空");
        }

        // A null first element is treated the same as a missing text
        JSONObject firstChoice = choices.getJSONObject(0);
        String rawText = firstChoice == null ? null : firstChoice.getString("text");
        if (rawText == null || rawText.trim().isEmpty()) {
            throw new IllegalStateException("AI返回文本为空");
        }

        // Cut out the JSON fragment; the model may wrap it in extra text
        int start = rawText.indexOf("{");
        int end = rawText.lastIndexOf("}");
        if (start == -1 || end <= start) {
            throw new IllegalStateException("无效的JSON边界: start=" + start + ", end=" + end);
        }
        String jsonStr = rawText.substring(start, end + 1)
                .replace("\n", "")
                .replace("\r", "");

        JSONObject resultJson = JSON.parseObject(jsonStr);
        Row output = Row.of(
                resultJson.getString("productDetails_productName_zh"),
                resultJson.getString("text_zh"),
                resultJson.getString("answer_text_zh")
        );

        // Fallback: never emit null fields
        for (int i = 0; i < output.getArity(); i++) {
            if (output.getField(i) == null) {
                output.setField(i, "");
            }
        }
        resultFuture.complete(Collections.singleton(output));

    } catch (Exception e) {
        // Broad catch is deliberate: this runs in an async callback and must always
        // complete the future. The throwable is passed as the last argument so the
        // stack trace is logged, not just the message.
        logger.error("解析失败: {}", e.getMessage(), e);
        resultFuture.complete(Collections.singleton(EMPTY_ROW));
    }
}

}

flinkSQL:
-- DROP FUNCTION IF EXISTS WBRUTransV28;
CREATE TEMPORARY FUNCTION wBRUTransV39 AS 'com.heckerstone.udf.WBRUTransV38' LANGUAGE JAVA;

-- Copies every source question into the translated table, filling NULL/empty values with
-- defaults and joining in the three translated columns produced by the wBRUTransV39 UDF.
INSERT INTO kl_boot.wb_questions_translated
SELECT
    -- Date fields: NULL is replaced with the default date
    COALESCE(NULLIF(TRIM(original.create_date), '1970-01-01'), '1970-01-01') AS create_date,
    COALESCE(NULLIF(original.product_nmid, ''), '00') AS product_nmid,
    -- Text fields: NULL or empty string is replaced with '00'
    COALESCE(NULLIF(original.product_name_ru, ''), '00') AS product_name_ru,
    COALESCE(NULLIF(original.text_ru, ''), '00') AS text_ru,
    COALESCE(NULLIF(TRIM(original.question_id), ''), '00') AS question_id,
    COALESCE(NULLIF(original.answer_ru, ''), '00') AS answer_ru,
    COALESCE(NULLIF(T.answer_zh, ''), '00') AS answer_zh,
    COALESCE(NULLIF(TRIM(original.answer_date), '1970-01-01'), '1970-01-01') AS answer_date,
    -- Boolean/numeric fields: NULL is replaced with the default value (0/false)
    COALESCE(original.is_warned, 0) AS is_warned,
    COALESCE(NULLIF(T.text_zh, ''), '00') AS text_zh,
    COALESCE(NULLIF(original.state, ''), '00') AS state,
    COALESCE(original.was_viewed, 0) AS was_viewed,
    COALESCE(NULLIF(original.product_suppliername, ''), '00') AS product_suppliername,
    COALESCE(NULLIF(original.product_suppliername_zh, ''), '00') AS product_suppliername_zh,
    COALESCE(NULLIF(TRIM(original.product_imt_id), ''), '00') AS product_imt_id,
    COALESCE(NULLIF(original.product_brandname, ''), '00') AS product_brandname,
    COALESCE(NULLIF(original.product_brandname_zh, ''), '00') AS product_brandname_zh,
    COALESCE(NULLIF(T.product_name_zh, ''), '00') AS product_name_zh,
    COALESCE(NULLIF(original.product_size, ''), '00') AS product_size,
    COALESCE(NULLIF(original.product_supplierarticle, ''), '00') AS product_supplierarticle
FROM kl_boot.wb_questions_source AS original
LEFT JOIN LATERAL TABLE (
    wBRUTransV39 (
        CONCAT('', COALESCE(original.product_name_ru, '00')),
        CONCAT('', COALESCE(original.text_ru, '00')),
        CONCAT('', COALESCE(original.answer_ru, '00'))
    )
) AS T (product_name_zh, text_zh, answer_zh) ON TRUE;

运行报错: Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature wBRUTransV39(<CHARACTER>, <CHARACTER>, <CHARACTER>) at sun.reflect.GeneratedConstructorAccessor234.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:599) ... 207 more

What you expected to happen

package com.heckerstone.udf;

import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import okhttp3.*; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.types.extraction.TypeInferenceExtractor; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;

public class WBRUTransV38 extends AsyncTableFunction<Row> { private static final Logger logger = LoggerFactory.getLogger(WBRUTransV38.class); private static final String GEMINI_API_KEY = "sk-or-v1-xxxxx"; private static final Row EMPTY_ROW = Row.of("", "", "");

// 实例级连接池(非静态)
private transient OkHttpClient client;

// --- 关键修改2:实现类型推导接口 ---

// @Override // public TypeInference getTypeInference(DataTypeFactory typeFactory) { // return TypeInference.newBuilder() // .inputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getInputTypeStrategy()) // .outputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getOutputTypeStrategy()) // .build(); // }

// --- 资源生命周期管理 ---
/**
 * Creates the HTTP client used by {@code eval}.
 *
 * <p>Connect, read and write timeouts are each 10 seconds, and the client is given its own
 * {@code ConnectionPool(20, 5, TimeUnit.MINUTES)} so connections can be reused.
 *
 * @param context function context passed in by the caller; not used here
 */
@Override
public void open(FunctionContext context) {
    // Dedicated pool for this instance (connection reuse).
    ConnectionPool pool = new ConnectionPool(20, 5, TimeUnit.MINUTES);

    OkHttpClient.Builder builder = new OkHttpClient.Builder();
    builder.connectTimeout(10, TimeUnit.SECONDS);
    builder.readTimeout(10, TimeUnit.SECONDS);
    builder.writeTimeout(10, TimeUnit.SECONDS);
    builder.connectionPool(pool);

    client = builder.build();
}

/**
 * Releases the HTTP client's resources; does nothing if no client was created.
 */
@Override
public void close() {
    if (client == null) {
        return;
    }
    // Shut down the dispatcher's thread pool, then drop the pooled connections.
    client.dispatcher().executorService().shutdown();
    client.connectionPool().evictAll();
}

// --- 核心逻辑(含完整提示词)---
/**
 * Sends one record to the OpenRouter completions endpoint for translation and completes
 * {@code resultFuture} asynchronously.
 *
 * <p>The three string arguments are wrapped in a JSON object, embedded in a fixed prompt and
 * POSTed through {@code client}. A successful response body is handed to
 * {@code parseResponse}. A transport failure, an unsuccessful HTTP status or any exception
 * while handling the response completes the future with a singleton {@code EMPTY_ROW}
 * instead of failing it.
 *
 * @param resultFuture    future that receives the result collection
 * @param product_name_ru value sent to the model under JSON key {@code product_name_ru}
 * @param text_ru         value sent to the model under JSON key {@code text}
 * @param answer_ru       value sent to the model under JSON key {@code answer_text}
 */
@FunctionHint(
        input = {@DataTypeHint("STRING"), @DataTypeHint("STRING"), @DataTypeHint("STRING")},
        output = @DataTypeHint("ROW<product_name_zh STRING, text_zh STRING, answer_zh STRING>")
)
public void eval(CompletableFuture<Collection<Row>> resultFuture, String product_name_ru, String text_ru, String answer_ru) {
    // 1. NULL input check -- currently disabled; no argument is validated before use.
    //    Earlier Row-based guard, kept for reference:
    // if (inputRow == null ||
    //         inputRow.getField("product_name_ru") == null ||
    //         inputRow.getField("text_ru") == null ||
    //         inputRow.getField("answer_ru") == null
    // ) {
    //     logger.warn("input contains NULL values, returning an empty row");
    //     resultFuture.complete(Collections.singleton(EMPTY_ROW));
    //     return;
    // }

    // 2. Assemble the data sent to the model (all three fields are kept)
    JSONObject aiJSON = new JSONObject();
    aiJSON.put("product_name_ru", product_name_ru);
    aiJSON.put("text", text_ru);
    aiJSON.put("answer_text", answer_ru);

    // 3. Assemble the full prompt (nothing omitted).
    //    isAnsweredStr is the JSON template the model is told to fill in.
    //    NOTE(review): the prompt asks for a "category", but this template has no category key.
    String isAnsweredStr = "{\"answer_text\":\"\",\"answer_text_zh\":\"\",\"id\":\"\",\"productDetails_brandName\":\"\",\"productDetails_brandName_zh\":\"\",\"productDetails_imtId\":\"\",\"productDetails_nmId\":\"\",\"productDetails_productName\":\"\",\"productDetails_productName_zh\":\"\",\"productDetails_supplierArticle\":\"\",\"productDetails_supplierName\":\"\",\"productDetails_supplierName_zh\":\"\",\"text\":\"\",\"text_zh\":\"\",\"wasViewed\":\"\"}";

    String prompt = "你是一个专业的俄语电商客服专家,精通俄语和中文。你的任务是分析俄罗斯电商场景下的客户问题,并按照指定格式输出结果。\n" +
            "核心规则:\n" +
            "1. 你必须结合产品标题和客户问题的完整信息来综合判断,不能孤立地分析任何一方。\n" +
            "2. 分类唯一性: category的值必须是下面提供的15个选项之一,不能创造新的选项。\n" +
            "3. 优先级原则: 当一个问题可以归入多个分类时,请选择最核心、最直接的那个分类。例如,询问“收到的产品坏了,怎么退货?”应优先归为售后问题,而不是退货流程或产品质量,因为它是一个已经发生问题的售后场景。\n" +
            "任务1:问题分类\n" +
            "分类选项: 产品型问题, 付款问题, 售后问题, 下单方法, 库存型, 生产地, 配送时效, 发货周期, 价格问题, 物流信息, 退货流程, 产品质量, 保修时间, 产品型号问题, 其他通用型问题。\n" +
            "任务2:翻译\n" +
            "将所有俄语翻译为中文\n" +
            "结合产品标题(productName)的上下文,翻译为易于理解的中文\n" +
            "需要处理的内容为: " + aiJSON.toString() + "\n" +
            "输出格式:\n" +
            "严格按照以下JSON格式输出,不要添加任何额外的解释或文字,当json中字段没有值时请将该字段赋值为空字符串:\n" +
            isAnsweredStr;
    logger.debug("提示词组装完成"); // the prompt itself is not logged, to keep the log small

    // 4. Build the HTTP request
    JSONObject requestBodyJson = new JSONObject();
    requestBodyJson.put("model", "google/gemini-2.0-flash-001");
    requestBodyJson.put("prompt", prompt);
    requestBodyJson.put("transform", "middle-out");

    Request request = new Request.Builder()
            .url("https://openrouter.ai/api/v1/completions")
            .addHeader("Authorization", "Bearer " + GEMINI_API_KEY)
            .post(RequestBody.create(
                    requestBodyJson.toJSONString(),
                    MediaType.parse("application/json")
            ))
            .build();

    // 5. Asynchronous call: the future is completed from the callback, not from this method
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            // The request failed with an IOException: answer with the empty row
            logger.error("HTTP请求失败: {}", e.getMessage());
            resultFuture.complete(Collections.singleton(EMPTY_ROW));
        }

        @Override
        public void onResponse(Call call, Response response) {
            // try-with-resources closes the response body on every path
            try (ResponseBody body = response.body()) {
                if (!response.isSuccessful()) {
                    logger.error("HTTP状态码异常: {}", response.code());
                    resultFuture.complete(Collections.singleton(EMPTY_ROW));
                    return;
                }
                parseResponse(body.string(), resultFuture);
            } catch (Exception e) {
                // Any failure while reading/handling the body also yields the empty row
                logger.error("响应处理异常: {}", e.getMessage());
                resultFuture.complete(Collections.singleton(EMPTY_ROW));
            }
        }
    });
}

// --- 响应解析(完整逻辑)---
/**
 * Extracts the translated fields from a completions response and completes
 * {@code resultFuture} with a single row.
 *
 * <p>The text of the first entry in {@code choices} is searched for its first '{' and last
 * '}'; that span (with CR/LF removed) is parsed as JSON. The keys
 * {@code productDetails_productName_zh}, {@code text_zh} and {@code answer_text_zh} become
 * the three row fields, with missing values replaced by empty strings.
 *
 * <p>Any malformed or unexpected response is logged together with its stack trace and
 * answered with {@code EMPTY_ROW}, so the future is completed on every path.
 *
 * @param responseBody raw HTTP response body
 * @param resultFuture future to complete with a singleton collection
 */
private void parseResponse(String responseBody, CompletableFuture<Collection<Row>> resultFuture) {
    try {
        JSONObject jsonResponse = JSON.parseObject(responseBody);
        if (jsonResponse == null) {
            // parseObject can return null (e.g. for an empty body); fail with a clear message
            throw new IllegalStateException("响应体为空或不是JSON对象");
        }

        JSONArray choices = jsonResponse.getJSONArray("choices");
        if (choices == null || choices.isEmpty()) {
            throw new IllegalStateException("choices数组为空");
        }

        // A null first element is treated the same as a missing text
        JSONObject firstChoice = choices.getJSONObject(0);
        String rawText = firstChoice == null ? null : firstChoice.getString("text");
        if (rawText == null || rawText.trim().isEmpty()) {
            throw new IllegalStateException("AI返回文本为空");
        }

        // Cut out the JSON fragment; the model may wrap it in extra text
        int start = rawText.indexOf("{");
        int end = rawText.lastIndexOf("}");
        if (start == -1 || end <= start) {
            throw new IllegalStateException("无效的JSON边界: start=" + start + ", end=" + end);
        }
        String jsonStr = rawText.substring(start, end + 1)
                .replace("\n", "")
                .replace("\r", "");

        JSONObject resultJson = JSON.parseObject(jsonStr);
        Row output = Row.of(
                resultJson.getString("productDetails_productName_zh"),
                resultJson.getString("text_zh"),
                resultJson.getString("answer_text_zh")
        );

        // Fallback: never emit null fields
        for (int i = 0; i < output.getArity(); i++) {
            if (output.getField(i) == null) {
                output.setField(i, "");
            }
        }
        resultFuture.complete(Collections.singleton(output));

    } catch (Exception e) {
        // Broad catch is deliberate: this runs in an async callback and must always
        // complete the future. The throwable is passed as the last argument so the
        // stack trace is logged, not just the message.
        logger.error("解析失败: {}", e.getMessage(), e);
        resultFuture.complete(Collections.singleton(EMPTY_ROW));
    }
}

}

flinkSQL:
-- DROP FUNCTION IF EXISTS WBRUTransV28;
CREATE TEMPORARY FUNCTION wBRUTransV39 AS 'com.heckerstone.udf.WBRUTransV38' LANGUAGE JAVA;

-- Copies every source question into the translated table, filling NULL/empty values with
-- defaults and joining in the three translated columns produced by the wBRUTransV39 UDF.
INSERT INTO kl_boot.wb_questions_translated
SELECT
    -- Date fields: NULL is replaced with the default date
    COALESCE(NULLIF(TRIM(original.create_date), '1970-01-01'), '1970-01-01') AS create_date,
    COALESCE(NULLIF(original.product_nmid, ''), '00') AS product_nmid,
    -- Text fields: NULL or empty string is replaced with '00'
    COALESCE(NULLIF(original.product_name_ru, ''), '00') AS product_name_ru,
    COALESCE(NULLIF(original.text_ru, ''), '00') AS text_ru,
    COALESCE(NULLIF(TRIM(original.question_id), ''), '00') AS question_id,
    COALESCE(NULLIF(original.answer_ru, ''), '00') AS answer_ru,
    COALESCE(NULLIF(T.answer_zh, ''), '00') AS answer_zh,
    COALESCE(NULLIF(TRIM(original.answer_date), '1970-01-01'), '1970-01-01') AS answer_date,
    -- Boolean/numeric fields: NULL is replaced with the default value (0/false)
    COALESCE(original.is_warned, 0) AS is_warned,
    COALESCE(NULLIF(T.text_zh, ''), '00') AS text_zh,
    COALESCE(NULLIF(original.state, ''), '00') AS state,
    COALESCE(original.was_viewed, 0) AS was_viewed,
    COALESCE(NULLIF(original.product_suppliername, ''), '00') AS product_suppliername,
    COALESCE(NULLIF(original.product_suppliername_zh, ''), '00') AS product_suppliername_zh,
    COALESCE(NULLIF(TRIM(original.product_imt_id), ''), '00') AS product_imt_id,
    COALESCE(NULLIF(original.product_brandname, ''), '00') AS product_brandname,
    COALESCE(NULLIF(original.product_brandname_zh, ''), '00') AS product_brandname_zh,
    COALESCE(NULLIF(T.product_name_zh, ''), '00') AS product_name_zh,
    COALESCE(NULLIF(original.product_size, ''), '00') AS product_size,
    COALESCE(NULLIF(original.product_supplierarticle, ''), '00') AS product_supplierarticle
FROM kl_boot.wb_questions_source AS original
LEFT JOIN LATERAL TABLE (
    wBRUTransV39 (
        CONCAT('', COALESCE(original.product_name_ru, '00')),
        CONCAT('', COALESCE(original.text_ru, '00')),
        CONCAT('', COALESCE(original.answer_ru, '00'))
    )
) AS T (product_name_zh, text_zh, answer_zh) ON TRUE;

运行报错: Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature wBRUTransV39(<CHARACTER>, <CHARACTER>, <CHARACTER>) at sun.reflect.GeneratedConstructorAccessor234.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:599) ... 207 more

How to reproduce

package com.heckerstone.udf;

import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import okhttp3.*; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.types.extraction.TypeInferenceExtractor; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;

public class WBRUTransV38 extends AsyncTableFunction<Row> { private static final Logger logger = LoggerFactory.getLogger(WBRUTransV38.class); private static final String GEMINI_API_KEY = "sk-or-v1-xxxxx"; private static final Row EMPTY_ROW = Row.of("", "", "");

// 实例级连接池(非静态)
private transient OkHttpClient client;

// --- 关键修改2:实现类型推导接口 ---

// @Override // public TypeInference getTypeInference(DataTypeFactory typeFactory) { // return TypeInference.newBuilder() // .inputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getInputTypeStrategy()) // .outputTypeStrategy(TypeInferenceExtractor.forAsyncTableFunction(typeFactory, this.getClass()).getOutputTypeStrategy()) // .build(); // }

// --- 资源生命周期管理 ---
/**
 * Creates the HTTP client used by {@code eval}.
 *
 * <p>Connect, read and write timeouts are each 10 seconds, and the client is given its own
 * {@code ConnectionPool(20, 5, TimeUnit.MINUTES)} so connections can be reused.
 *
 * @param context function context passed in by the caller; not used here
 */
@Override
public void open(FunctionContext context) {
    // Dedicated pool for this instance (connection reuse).
    ConnectionPool pool = new ConnectionPool(20, 5, TimeUnit.MINUTES);

    OkHttpClient.Builder builder = new OkHttpClient.Builder();
    builder.connectTimeout(10, TimeUnit.SECONDS);
    builder.readTimeout(10, TimeUnit.SECONDS);
    builder.writeTimeout(10, TimeUnit.SECONDS);
    builder.connectionPool(pool);

    client = builder.build();
}

/**
 * Releases the HTTP client's resources; does nothing if no client was created.
 */
@Override
public void close() {
    if (client == null) {
        return;
    }
    // Shut down the dispatcher's thread pool, then drop the pooled connections.
    client.dispatcher().executorService().shutdown();
    client.connectionPool().evictAll();
}

// --- 核心逻辑(含完整提示词)---
/**
 * Sends one record to the OpenRouter completions endpoint for translation and completes
 * {@code resultFuture} asynchronously.
 *
 * <p>The three string arguments are wrapped in a JSON object, embedded in a fixed prompt and
 * POSTed through {@code client}. A successful response body is handed to
 * {@code parseResponse}. A transport failure, an unsuccessful HTTP status or any exception
 * while handling the response completes the future with a singleton {@code EMPTY_ROW}
 * instead of failing it.
 *
 * @param resultFuture    future that receives the result collection
 * @param product_name_ru value sent to the model under JSON key {@code product_name_ru}
 * @param text_ru         value sent to the model under JSON key {@code text}
 * @param answer_ru       value sent to the model under JSON key {@code answer_text}
 */
@FunctionHint(
        input = {@DataTypeHint("STRING"), @DataTypeHint("STRING"), @DataTypeHint("STRING")},
        output = @DataTypeHint("ROW<product_name_zh STRING, text_zh STRING, answer_zh STRING>")
)
public void eval(CompletableFuture<Collection<Row>> resultFuture, String product_name_ru, String text_ru, String answer_ru) {
    // 1. NULL input check -- currently disabled; no argument is validated before use.
    //    Earlier Row-based guard, kept for reference:
    // if (inputRow == null ||
    //         inputRow.getField("product_name_ru") == null ||
    //         inputRow.getField("text_ru") == null ||
    //         inputRow.getField("answer_ru") == null
    // ) {
    //     logger.warn("input contains NULL values, returning an empty row");
    //     resultFuture.complete(Collections.singleton(EMPTY_ROW));
    //     return;
    // }

    // 2. Assemble the data sent to the model (all three fields are kept)
    JSONObject aiJSON = new JSONObject();
    aiJSON.put("product_name_ru", product_name_ru);
    aiJSON.put("text", text_ru);
    aiJSON.put("answer_text", answer_ru);

    // 3. Assemble the full prompt (nothing omitted).
    //    isAnsweredStr is the JSON template the model is told to fill in.
    //    NOTE(review): the prompt asks for a "category", but this template has no category key.
    String isAnsweredStr = "{\"answer_text\":\"\",\"answer_text_zh\":\"\",\"id\":\"\",\"productDetails_brandName\":\"\",\"productDetails_brandName_zh\":\"\",\"productDetails_imtId\":\"\",\"productDetails_nmId\":\"\",\"productDetails_productName\":\"\",\"productDetails_productName_zh\":\"\",\"productDetails_supplierArticle\":\"\",\"productDetails_supplierName\":\"\",\"productDetails_supplierName_zh\":\"\",\"text\":\"\",\"text_zh\":\"\",\"wasViewed\":\"\"}";

    String prompt = "你是一个专业的俄语电商客服专家,精通俄语和中文。你的任务是分析俄罗斯电商场景下的客户问题,并按照指定格式输出结果。\n" +
            "核心规则:\n" +
            "1. 你必须结合产品标题和客户问题的完整信息来综合判断,不能孤立地分析任何一方。\n" +
            "2. 分类唯一性: category的值必须是下面提供的15个选项之一,不能创造新的选项。\n" +
            "3. 优先级原则: 当一个问题可以归入多个分类时,请选择最核心、最直接的那个分类。例如,询问“收到的产品坏了,怎么退货?”应优先归为售后问题,而不是退货流程或产品质量,因为它是一个已经发生问题的售后场景。\n" +
            "任务1:问题分类\n" +
            "分类选项: 产品型问题, 付款问题, 售后问题, 下单方法, 库存型, 生产地, 配送时效, 发货周期, 价格问题, 物流信息, 退货流程, 产品质量, 保修时间, 产品型号问题, 其他通用型问题。\n" +
            "任务2:翻译\n" +
            "将所有俄语翻译为中文\n" +
            "结合产品标题(productName)的上下文,翻译为易于理解的中文\n" +
            "需要处理的内容为: " + aiJSON.toString() + "\n" +
            "输出格式:\n" +
            "严格按照以下JSON格式输出,不要添加任何额外的解释或文字,当json中字段没有值时请将该字段赋值为空字符串:\n" +
            isAnsweredStr;
    logger.debug("提示词组装完成"); // the prompt itself is not logged, to keep the log small

    // 4. Build the HTTP request
    JSONObject requestBodyJson = new JSONObject();
    requestBodyJson.put("model", "google/gemini-2.0-flash-001");
    requestBodyJson.put("prompt", prompt);
    requestBodyJson.put("transform", "middle-out");

    Request request = new Request.Builder()
            .url("https://openrouter.ai/api/v1/completions")
            .addHeader("Authorization", "Bearer " + GEMINI_API_KEY)
            .post(RequestBody.create(
                    requestBodyJson.toJSONString(),
                    MediaType.parse("application/json")
            ))
            .build();

    // 5. Asynchronous call: the future is completed from the callback, not from this method
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            // The request failed with an IOException: answer with the empty row
            logger.error("HTTP请求失败: {}", e.getMessage());
            resultFuture.complete(Collections.singleton(EMPTY_ROW));
        }

        @Override
        public void onResponse(Call call, Response response) {
            // try-with-resources closes the response body on every path
            try (ResponseBody body = response.body()) {
                if (!response.isSuccessful()) {
                    logger.error("HTTP状态码异常: {}", response.code());
                    resultFuture.complete(Collections.singleton(EMPTY_ROW));
                    return;
                }
                parseResponse(body.string(), resultFuture);
            } catch (Exception e) {
                // Any failure while reading/handling the body also yields the empty row
                logger.error("响应处理异常: {}", e.getMessage());
                resultFuture.complete(Collections.singleton(EMPTY_ROW));
            }
        }
    });
}

// --- 响应解析(完整逻辑)---
/**
 * Extracts the translated fields from a completions response and completes
 * {@code resultFuture} with a single row.
 *
 * <p>The text of the first entry in {@code choices} is searched for its first '{' and last
 * '}'; that span (with CR/LF removed) is parsed as JSON. The keys
 * {@code productDetails_productName_zh}, {@code text_zh} and {@code answer_text_zh} become
 * the three row fields, with missing values replaced by empty strings.
 *
 * <p>Any malformed or unexpected response is logged together with its stack trace and
 * answered with {@code EMPTY_ROW}, so the future is completed on every path.
 *
 * @param responseBody raw HTTP response body
 * @param resultFuture future to complete with a singleton collection
 */
private void parseResponse(String responseBody, CompletableFuture<Collection<Row>> resultFuture) {
    try {
        JSONObject jsonResponse = JSON.parseObject(responseBody);
        if (jsonResponse == null) {
            // parseObject can return null (e.g. for an empty body); fail with a clear message
            throw new IllegalStateException("响应体为空或不是JSON对象");
        }

        JSONArray choices = jsonResponse.getJSONArray("choices");
        if (choices == null || choices.isEmpty()) {
            throw new IllegalStateException("choices数组为空");
        }

        // A null first element is treated the same as a missing text
        JSONObject firstChoice = choices.getJSONObject(0);
        String rawText = firstChoice == null ? null : firstChoice.getString("text");
        if (rawText == null || rawText.trim().isEmpty()) {
            throw new IllegalStateException("AI返回文本为空");
        }

        // Cut out the JSON fragment; the model may wrap it in extra text
        int start = rawText.indexOf("{");
        int end = rawText.lastIndexOf("}");
        if (start == -1 || end <= start) {
            throw new IllegalStateException("无效的JSON边界: start=" + start + ", end=" + end);
        }
        String jsonStr = rawText.substring(start, end + 1)
                .replace("\n", "")
                .replace("\r", "");

        JSONObject resultJson = JSON.parseObject(jsonStr);
        Row output = Row.of(
                resultJson.getString("productDetails_productName_zh"),
                resultJson.getString("text_zh"),
                resultJson.getString("answer_text_zh")
        );

        // Fallback: never emit null fields
        for (int i = 0; i < output.getArity(); i++) {
            if (output.getField(i) == null) {
                output.setField(i, "");
            }
        }
        resultFuture.complete(Collections.singleton(output));

    } catch (Exception e) {
        // Broad catch is deliberate: this runs in an async callback and must always
        // complete the future. The throwable is passed as the last argument so the
        // stack trace is logged, not just the message.
        logger.error("解析失败: {}", e.getMessage(), e);
        resultFuture.complete(Collections.singleton(EMPTY_ROW));
    }
}

}

flinkSQL:
-- DROP FUNCTION IF EXISTS WBRUTransV28;
CREATE TEMPORARY FUNCTION wBRUTransV39 AS 'com.heckerstone.udf.WBRUTransV38' LANGUAGE JAVA;

-- Copies every source question into the translated table, filling NULL/empty values with
-- defaults and joining in the three translated columns produced by the wBRUTransV39 UDF.
INSERT INTO kl_boot.wb_questions_translated
SELECT
    -- Date fields: NULL is replaced with the default date
    COALESCE(NULLIF(TRIM(original.create_date), '1970-01-01'), '1970-01-01') AS create_date,
    COALESCE(NULLIF(original.product_nmid, ''), '00') AS product_nmid,
    -- Text fields: NULL or empty string is replaced with '00'
    COALESCE(NULLIF(original.product_name_ru, ''), '00') AS product_name_ru,
    COALESCE(NULLIF(original.text_ru, ''), '00') AS text_ru,
    COALESCE(NULLIF(TRIM(original.question_id), ''), '00') AS question_id,
    COALESCE(NULLIF(original.answer_ru, ''), '00') AS answer_ru,
    COALESCE(NULLIF(T.answer_zh, ''), '00') AS answer_zh,
    COALESCE(NULLIF(TRIM(original.answer_date), '1970-01-01'), '1970-01-01') AS answer_date,
    -- Boolean/numeric fields: NULL is replaced with the default value (0/false)
    COALESCE(original.is_warned, 0) AS is_warned,
    COALESCE(NULLIF(T.text_zh, ''), '00') AS text_zh,
    COALESCE(NULLIF(original.state, ''), '00') AS state,
    COALESCE(original.was_viewed, 0) AS was_viewed,
    COALESCE(NULLIF(original.product_suppliername, ''), '00') AS product_suppliername,
    COALESCE(NULLIF(original.product_suppliername_zh, ''), '00') AS product_suppliername_zh,
    COALESCE(NULLIF(TRIM(original.product_imt_id), ''), '00') AS product_imt_id,
    COALESCE(NULLIF(original.product_brandname, ''), '00') AS product_brandname,
    COALESCE(NULLIF(original.product_brandname_zh, ''), '00') AS product_brandname_zh,
    COALESCE(NULLIF(T.product_name_zh, ''), '00') AS product_name_zh,
    COALESCE(NULLIF(original.product_size, ''), '00') AS product_size,
    COALESCE(NULLIF(original.product_supplierarticle, ''), '00') AS product_supplierarticle
FROM kl_boot.wb_questions_source AS original
LEFT JOIN LATERAL TABLE (
    wBRUTransV39 (
        CONCAT('', COALESCE(original.product_name_ru, '00')),
        CONCAT('', COALESCE(original.text_ru, '00')),
        CONCAT('', COALESCE(original.answer_ru, '00'))
    )
) AS T (product_name_zh, text_zh, answer_zh) ON TRUE;

运行报错: Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature wBRUTransV39(<CHARACTER>, <CHARACTER>, <CHARACTER>) at sun.reflect.GeneratedConstructorAccessor234.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:599) ... 207 more

Anything else

No response

Version

dev

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

heckerstone avatar Jul 14 '25 03:07 heckerstone

Hello @, this issue has not been active for more than 30 days. This issue will be closed in 7 days if there is no response. If you have any questions, you can comment and reply.

你好 @, 这个 issue 30 天内没有活跃,7 天后将关闭,如需回复,可以评论回复。

github-actions[bot] avatar Sep 01 '25 00:09 github-actions[bot]