iotdb icon indicating copy to clipboard operation
iotdb copied to clipboard

[Bug] 写UDAF的时候,如果不加group by level就能正常接收参数,如果加了group by level就报301

Open sky-sheep opened this issue 1 year ago • 1 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

1.3.2

Describe the bug and provide the minimal reproduce step

写了一个UDAF,用来统计数量,加了一个参数filter用来过滤数据,如果不加group by level就能够正常执行,如果加了level就不能够正常执行,抛出异常301 不加的sql如下所示: 881715313071_ pic 如果加了 group by level就会报错,如下所示: 871715313067_ pic 我用count这种内置的聚合函数就没问题,即使参数filter这种没有定义的,而且,这个countFill也能够正常接收参数,filter能够传进去, 951715313417_ pic 报错日志如下: 1141715318978_ pic UDAF代码如下: `package org.apache.iotdb.udf.api;

import org.apache.iotdb.udf.api.customizer.config.UDAFConfigurations; import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator; import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.udf.api.type.Type; import org.apache.iotdb.udf.api.utils.ExecuteEval; import org.apache.iotdb.udf.api.utils.ResultValue; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.utils.BitMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List;

/**

  • 个数统计函数

  • 获取所有符合条件的个数

  • @author ylx */ public class CountFill implements UDAF {

    private Logger logger = LoggerFactory.getLogger(CountFill.class);

    private Type dataType;

    private List<String> filter; static class CountState implements State {

     Long count;
    
     @Override
     public void reset() {
         count = 0L;
     }
    
     @Override
     public byte[] serialize() {
         ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES + Long.BYTES);
         buffer.putLong(count);
    
         return buffer.array();
     }
    
     @Override
     public void deserialize(byte[] bytes) {
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
         count = buffer.getLong();
     }
    

    }

    /**

    • 在初始化方法beforeStart调用前执行,用于检测UDFParameters中用户输入的参数是否合法。该方法与 UDTF 的validate相同。 */ @Override public void validate(UDFParameterValidator validator) throws Exception { validator .validateInputSeriesNumber(1) .validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE,Type.TEXT); }

    /**

    • 初始化方法,在 UDAF 处理输入数据前,调用用户自定义的初始化行为。与 UDTF 不同的是,这里的 configuration 是 UDAFConfiguration 类型
    • @param parameters
    • @param configurations */ @Override public void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) { String inputFilter = parameters.getString("filter"); if (inputFilter !=null ){ filter = (Arrays.asList(inputFilter.split(","))); } dataType = parameters.getDataType(0); configurations.setOutputDataType(Type.INT64); }

    /**

    • 创建State对象,一般只需要调用默认构造函数,然后按需修改默认的初始值即可。 */ @Override public State createState() { CountState countState = new CountState(); countState.count = 0L; return countState; }

    /**

    • 根据传入的数据Column[]批量地更新State对象,注意 column[0] 总是代表时间列。另外BitMap表示之前已经被过滤掉的数据,您在编写该方法时需要手动判断对应的数据是否被过滤掉
    • values
    • @param state state to be updated
    • @param columns input columns from IoTDB TsBlock, time column is always the last column, the
    •            remaining columns are their parameter value columns
      
    • @param bitMap define some filtered position in columns */ @Override public void addInput(State state, Column[] columns, BitMap bitMap) { CountState countState = (CountState) state; switch (dataType) { case INT32: case INT64: case FLOAT: case DOUBLE: case TEXT: case BOOLEAN: default: addIntInput(countState, columns, bitMap); break; } }

    public void addIntInput(CountState state, Column[] columns, BitMap bitMap) { int count = columns[0].getPositionCount(); for (int i = 0; i < count; i++) { Boolean filterFlag = true; if (bitMap != null && !bitMap.isMarked(i)) { continue; } if (filter != null && filter.size()>0){ for (String fill : filter) { String comparisonOperators = ExecuteEval.findComparisonOperators(fill); String replace = fill.replace(comparisonOperators, "").trim(); if (ExecuteEval.isString(replace,dataType)){ // 如果是字符串,则将其拼接上单引号 replace = ("'"+replace+"'").trim(); switch (dataType){ case INT32: // int类型 filterFlag =ExecuteEval.evalString("('"+columns[0].getInt(i)+"'"+comparisonOperators+replace+")?true:false"); break; case INT64: filterFlag =ExecuteEval.evalString("('"+columns[0].getLong(i)+"'"+comparisonOperators+replace+")?true:false"); break; case FLOAT: filterFlag =ExecuteEval.evalString("('"+columns[0].getFloat(i)+"'"+comparisonOperators+replace+")?true:false"); break; case DOUBLE: filterFlag =ExecuteEval.evalString("('"+columns[0].getDouble(i)+"'"+comparisonOperators+replace+")?true:false"); break; case TEXT: filterFlag =ExecuteEval.evalString("('"+columns[0].getObject(i)+"'"+comparisonOperators+replace+")?true:false"); break; default: break; }

                 }else {
                     switch (dataType){
                         case INT32:
                             // int类型
                             filterFlag =ExecuteEval.evalString("("+columns[0].getInt(i)+comparisonOperators+replace+")?true:false");
                             break;
                         case INT64:
                             filterFlag =ExecuteEval.evalString("("+columns[0].getLong(i)+comparisonOperators+replace+")?true:false");
                             break;
                         case FLOAT:
                             filterFlag =ExecuteEval.evalString("("+columns[0].getFloat(i)+comparisonOperators+replace+")?true:false");
                             break;
                         case DOUBLE:
                             filterFlag =ExecuteEval.evalString("("+columns[0].getDouble(i)+comparisonOperators+replace+")?true:false");
                             break;
                         case TEXT:
                             filterFlag =ExecuteEval.evalString("("+columns[0].getObject(i)+comparisonOperators+replace+")?true:false");
                             break;
                         default:
                             break;
                     }
                 }
             }
         }
         if (!columns[0].isNull(i) && filterFlag) {
             state.count++;
         }
     }
    

    }

    /**

    • 将rhs状态合并至state状态中。在分布式场景下,同一组的数据可能分布在不同节点上,IoTDB 会为每个节点上的部分数据生成一个State对象,然后调用该方法合并成完整的State。

    • @param state current state

    • @param rhs right-hand-side state to be merged */ @Override public void combineState(State state, State rhs) { CountState avgState = (CountState) state; CountState avgRhs = (CountState) rhs;

      avgState.count += avgRhs.count; }

    /**

    • 根据State中的数据,计算出最终的聚合结果。注意根据聚合的语义,每一组只能输出一个值。

    • @param state final state

    • @param resultValue used to collect output data points */ @Override public void outputFinal(State state, ResultValue resultValue) { CountState avgState = (CountState) state;

      resultValue.setLong(avgState.count); }

}`

What did you expect to see?

What did you see instead?

Anything else?

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

sky-sheep avatar May 11 '24 04:05 sky-sheep

Hi, this is your first issue in IoTDB project. Thanks for your report. Welcome to join the community!

github-actions[bot] avatar May 11 '24 04:05 github-actions[bot]