[Bug] A UDAF receives its parameters normally without group by level, but throws error 301 once group by level is added
Search before asking
- [X] I searched in the issues and found nothing similar.
Version
1.3.2
Describe the bug and provide the minimal reproduce step
I wrote a UDAF that counts values, with a `filter` parameter used to filter the data. It executes normally without `group by level`, but fails with error code 301 once `group by level` is added.
The SQL without `group by level` is shown below:
With `group by level` added, the query fails, as shown below:
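The screenshots with the exact queries did not come through; hypothetical queries of roughly this shape (series paths and the registered function name are assumptions, not taken from the report) would illustrate the two cases:

```sql
-- Hypothetical series paths; the real queries were in the elided screenshots.
-- Executes normally:
select countFill(s1, 'filter'='>10') from root.sg.d1
-- Throws error code 301:
select countFill(s1, 'filter'='>10') from root.sg.* group by level = 1
```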
A built-in aggregate such as `count` has no such problem, even when passed an undefined parameter like `filter`. Moreover, this `countFill` does receive its parameters correctly: the `filter` value is passed through.
The error log is as follows:
The UDAF code is as follows:
```java
package org.apache.iotdb.udf.api;

import org.apache.iotdb.udf.api.customizer.config.UDAFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.udf.api.type.Type;
import org.apache.iotdb.udf.api.utils.ExecuteEval;
import org.apache.iotdb.udf.api.utils.ResultValue;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.BitMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

/**
 * Count function: counts all values that match the given filter condition.
 *
 * @author ylx
 */
public class CountFill implements UDAF {

  private Logger logger = LoggerFactory.getLogger(CountFill.class);
  private Type dataType;
  private List<String> filter;

  static class CountState implements State {
    Long count;

    @Override
    public void reset() {
      count = 0L;
    }

    @Override
    public byte[] serialize() {
      // NOTE: Long.BYTES alone would suffice here; the extra Double.BYTES is unused.
      ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES + Long.BYTES);
      buffer.putLong(count);
      return buffer.array();
    }

    @Override
    public void deserialize(byte[] bytes) {
      ByteBuffer buffer = ByteBuffer.wrap(bytes);
      count = buffer.getLong();
    }
  }

  /**
   * Called before beforeStart to check whether the user-supplied UDFParameters are valid.
   * Identical to the UDTF validate method.
   */
  @Override
  public void validate(UDFParameterValidator validator) throws Exception {
    validator
        .validateInputSeriesNumber(1)
        .validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE, Type.TEXT);
  }

  /**
   * Initialization method, invoked before the UDAF processes any input data. Unlike a UDTF,
   * the configuration here is of type UDAFConfigurations.
   *
   * @param parameters
   * @param configurations
   */
  @Override
  public void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) {
    String inputFilter = parameters.getString("filter");
    if (inputFilter != null) {
      filter = Arrays.asList(inputFilter.split(","));
    }
    dataType = parameters.getDataType(0);
    configurations.setOutputDataType(Type.INT64);
  }

  /**
   * Creates the State object; usually the default constructor plus the desired initial
   * values is enough.
   */
  @Override
  public State createState() {
    CountState countState = new CountState();
    countState.count = 0L;
    return countState;
  }

  /**
   * Batch-updates the State object from the input Column[]. The BitMap marks positions that
   * were already filtered out earlier, so this method must check it manually.
   *
   * @param state state to be updated
   * @param columns input columns from IoTDB TsBlock; the time column is always the last
   *     column, the remaining columns are the parameter value columns
   * @param bitMap marks filtered positions in columns
   */
  @Override
  public void addInput(State state, Column[] columns, BitMap bitMap) {
    CountState countState = (CountState) state;
    switch (dataType) {
      case INT32:
      case INT64:
      case FLOAT:
      case DOUBLE:
      case TEXT:
      case BOOLEAN:
      default:
        addIntInput(countState, columns, bitMap);
        break;
    }
  }

  public void addIntInput(CountState state, Column[] columns, BitMap bitMap) {
    int count = columns[0].getPositionCount();
    for (int i = 0; i < count; i++) {
      Boolean filterFlag = true;
      if (bitMap != null && !bitMap.isMarked(i)) {
        continue;
      }
      if (filter != null && filter.size() > 0) {
        for (String fill : filter) {
          String comparisonOperators = ExecuteEval.findComparisonOperators(fill);
          String replace = fill.replace(comparisonOperators, "").trim();
          if (ExecuteEval.isString(replace, dataType)) {
            // If the operand is a string, wrap it in single quotes.
            replace = ("'" + replace + "'").trim();
            switch (dataType) {
              case INT32:
                filterFlag = ExecuteEval.evalString(
                    "('" + columns[0].getInt(i) + "'" + comparisonOperators + replace + ")?true:false");
                break;
              case INT64:
                filterFlag = ExecuteEval.evalString(
                    "('" + columns[0].getLong(i) + "'" + comparisonOperators + replace + ")?true:false");
                break;
              case FLOAT:
                filterFlag = ExecuteEval.evalString(
                    "('" + columns[0].getFloat(i) + "'" + comparisonOperators + replace + ")?true:false");
                break;
              case DOUBLE:
                filterFlag = ExecuteEval.evalString(
                    "('" + columns[0].getDouble(i) + "'" + comparisonOperators + replace + ")?true:false");
                break;
              case TEXT:
                filterFlag = ExecuteEval.evalString(
                    "('" + columns[0].getObject(i) + "'" + comparisonOperators + replace + ")?true:false");
                break;
              default:
                break;
            }
          } else {
            switch (dataType) {
              case INT32:
                filterFlag = ExecuteEval.evalString(
                    "(" + columns[0].getInt(i) + comparisonOperators + replace + ")?true:false");
                break;
              case INT64:
                filterFlag = ExecuteEval.evalString(
                    "(" + columns[0].getLong(i) + comparisonOperators + replace + ")?true:false");
                break;
              case FLOAT:
                filterFlag = ExecuteEval.evalString(
                    "(" + columns[0].getFloat(i) + comparisonOperators + replace + ")?true:false");
                break;
              case DOUBLE:
                filterFlag = ExecuteEval.evalString(
                    "(" + columns[0].getDouble(i) + comparisonOperators + replace + ")?true:false");
                break;
              case TEXT:
                filterFlag = ExecuteEval.evalString(
                    "(" + columns[0].getObject(i) + comparisonOperators + replace + ")?true:false");
                break;
              default:
                break;
            }
          }
        }
      }
      if (!columns[0].isNull(i) && filterFlag) {
        state.count++;
      }
    }
  }

  /**
   * Merges the rhs state into state. In a distributed deployment, data of the same group may
   * live on different nodes; IoTDB builds a State for each node's partial data and then calls
   * this method to merge them into the complete State.
   *
   * @param state current state
   * @param rhs right-hand-side state to be merged
   */
  @Override
  public void combineState(State state, State rhs) {
    CountState avgState = (CountState) state;
    CountState avgRhs = (CountState) rhs;
    avgState.count += avgRhs.count;
  }

  /**
   * Computes the final aggregate result from the State. By aggregation semantics, each group
   * outputs exactly one value.
   *
   * @param state final state
   * @param resultValue used to collect output data points
   */
  @Override
  public void outputFinal(State state, ResultValue resultValue) {
    CountState avgState = (CountState) state;
    resultValue.setLong(avgState.count);
  }
}
```
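As a side note unrelated to the 301 error: `serialize` above allocates `Double.BYTES + Long.BYTES` but only writes a single `long`. A standalone sketch of the round-trip, using plain `java.nio` with no IoTDB dependency, shows that `Long.BYTES` is sufficient:

```java
import java.nio.ByteBuffer;

// Standalone round-trip of the CountState serialization logic from the issue.
// Long.BYTES is enough for the single count value; the original's extra
// Double.BYTES is simply unused padding at the end of the array.
public class CountStateRoundTrip {

  // Mirrors CountState.serialize(): one long written into a byte array.
  static byte[] serialize(long count) {
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
    buffer.putLong(count);
    return buffer.array();
  }

  // Mirrors CountState.deserialize(): read the long back from the front.
  static long deserialize(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  public static void main(String[] args) {
    long original = 42L;
    byte[] bytes = serialize(original);
    System.out.println(deserialize(bytes)); // prints 42
  }
}
```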
What did you expect to see?
None.
What did you see instead?
None.
Anything else?
None.
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!