
Decimal128 support for statistical aggregations

Open · metesynnada opened this issue on Sep 21, 2022 · 2 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do. Inside the Accumulator implementation of VarianceAccumulator (and many others, such as CovarianceAccumulator), the update_batch method casts every input data type into a Float64Array by default. As a result, statistical aggregations are unsupported for Decimal128Array.
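For illustration, the pattern described above looks roughly like the sketch below. This is a simplified stand-in and not the actual DataFusion source; `update_batch_sketch` and the running sum are hypothetical. The input array is funneled through arrow's cast kernel into a Float64Array before anything is accumulated, which is why only Float64-coercible types are accepted.

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Float64Array, Int64Array};
use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;

/// Hypothetical sketch of the shape of the accumulator update path:
/// everything is coerced to Float64Array before accumulation.
fn update_batch_sketch(values: &[ArrayRef]) -> Result<f64> {
    // Cast the incoming column to Float64 unconditionally.
    let values = cast(&values[0], &DataType::Float64)?;
    let arr = values
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("cast produced a Float64Array");
    // Accumulate a running sum over non-null values as a stand-in
    // for the actual variance update.
    Ok(arr.iter().flatten().sum())
}

fn main() -> Result<()> {
    let input: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    assert_eq!(update_batch_sketch(&[input])?, 6.0);
    Ok(())
}
```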

Describe the solution you'd like If we implement the necessary traits for the ScalarValue struct, such as std::ops::Div and std::ops::Mul, we can safely use ScalarValue for both Decimal128 and Float64 calculations without coercing to Float64 by default.
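A minimal sketch of the proposed direction, using a simplified stand-in enum (`Scalar`) rather than DataFusion's real ScalarValue, whose Decimal128 variant also tracks precision and has many more variants. The point is only to show how a std::ops::Mul impl lets the arithmetic stay in the decimal representation instead of coercing to Float64:

```rust
use std::ops::Mul;

// Simplified stand-in for ScalarValue; the real type also carries precision
// for Decimal128 and supports many more variants.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Float64(Option<f64>),
    // Value stored as an i128 with a fixed scale (number of fractional digits).
    Decimal128(Option<i128>, u8),
}

impl Mul for Scalar {
    type Output = Scalar;

    fn mul(self, rhs: Scalar) -> Scalar {
        match (self, rhs) {
            (Scalar::Float64(a), Scalar::Float64(b)) => {
                Scalar::Float64(a.zip(b).map(|(a, b)| a * b))
            }
            // Multiplying decimals with scales s1 and s2 yields scale s1 + s2;
            // a real implementation would also check precision overflow.
            (Scalar::Decimal128(a, s1), Scalar::Decimal128(b, s2)) => {
                Scalar::Decimal128(a.zip(b).map(|(a, b)| a * b), s1 + s2)
            }
            // Mixed operands would either coerce explicitly or return an error.
            _ => unimplemented!("mixed-type multiplication not sketched here"),
        }
    }
}

fn main() {
    let x = Scalar::Decimal128(Some(12_50), 2); // 12.50
    let y = Scalar::Decimal128(Some(3_00), 2);  // 3.00
    // 12.50 * 3.00 = 37.5000 at scale 4, with no float coercion.
    assert_eq!(x * y, Scalar::Decimal128(Some(37_5000), 4));
}
```

In the real ScalarValue, these operations would also need to handle precision overflow and mixed Decimal128/Float64 operands, which is where most of the work lies.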

Describe alternatives you've considered N/A

Additional context https://github.com/apache/arrow-datafusion/blob/12f047e365a56399bf6fc1feaf25708c2afc5ba7/datafusion/physical-expr/src/aggregate/variance.rs#L223

Steps to reproduce the behavior:

use std::sync::Arc;

use datafusion::arrow::array::Decimal128Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::test]
async fn statistical_agg_decimal() -> Result<()> {
    // define a schema with a single nullable Decimal128(10, 2) column
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Decimal128(10, 2),
        true,
    )]));

    // define data in a single partition: 1..100 with one null at i == 2
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(
            (1..100)
                .map(|i| if i == 2 { None } else { Some(i) })
                .collect::<Decimal128Array>()
                .with_precision_and_scale(10, 2)?,
        )],
    )?;

    // declare a new context. In the Spark API, this corresponds to a new Spark SQL session.
    let ctx = SessionContext::new();

    // declare a table in memory. In the Spark API, this corresponds to createDataFrame(...).
    let provider = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("t", Arc::new(provider))?;

    let sql = "SELECT \
               VAR(a) OVER() \
               FROM t";

    let df = ctx.sql(sql).await?;
    df.show().await?;
    Ok(())
}

produces

Error: Plan("The function Variance does not support inputs of type Decimal128(10, 2).") 

metesynnada · Sep 21 '22 13:09

There's a similar issue here -> #3481. I'd like to support the same, for PG compatibility, and as a stepping stone toward handling SQL literals as decimals rather than floats.

kmitchener · Sep 21 '22 13:09

I can help review this in the aggregate functions.

liukun4515 · Sep 21 '22 14:09