Move min and max to user defined aggregate function

Open edmondop opened this issue 1 year ago • 18 comments

Which issue does this PR close?

Closes #10943.

  • [x] Sliding window accumulator for min and max

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

edmondop avatar Jun 19 '24 18:06 edmondop

I have something that's starting to look reasonable, but some optimizer tests are now failing for reasons I can't understand:

running 1 test
test custom_sources_cases::optimizers_catch_all_statistics ... FAILED

successes:

successes:

failures:

---- custom_sources_cases::optimizers_catch_all_statistics stdout ----
thread 'custom_sources_cases::optimizers_catch_all_statistics' panicked at datafusion/core/tests/custom_sources_cases/mod.rs:274:5:
Expected aggregate_statistics optimizations missing: AggregateExec { mode: Single, group_by: PhysicalGroupBy { expr: [], null_expr: [], groups: [[]] }, aggr_expr: [AggregateFunctionExpr { fun: AggregateUDF { inner: Count { name: "COUNT", signature: Signature { type_signature: VariadicAny, volatility: Immutable } } }, args: [Literal { value: Int64(1) }], logical_args: [Literal(Int64(1))], data_type: Int64, name: "COUNT(*)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int64 }, AggregateFunctionExpr { fun: AggregateUDF { inner: Min { signature: Signature { type_signature: Numeric(1), volatility: Immutable }, aliases: ["min"] } }, args: [Column { name: "c1", index: 0 }], logical_args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c1" })], data_type: Int32, name: "MIN(test.c1)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int32 }, AggregateFunctionExpr { fun: AggregateUDF { inner: Max { signature: Signature { type_signature: Numeric(1), volatility: Immutable }, aliases: ["max"] } }, args: [Column { name: "c1", index: 0 }], logical_args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c1" })], data_type: Int32, name: "MAX(test.c1)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int32 }], filter_expr: [None, None, None], limit: None, input: CustomExecutionPlan { projection: Some([0]), cache: PlanProperties { 
eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }, schema: Schema { fields: [Field { name: "COUNT(*)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MIN(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MAX(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, input_schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, required_input_ordering: None, input_order_mode: Linear, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "COUNT(*)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MIN(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MAX(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }

edmondop avatar Jun 20 '24 20:06 edmondop

I guess the aggregate statistics optimization is now being skipped for min/max:

https://github.com/apache/datafusion/blob/18042fd69138e19613844580408a71a200ea6caa/datafusion/core/src/physical_optimizer/aggregate_statistics.rs#L177-L224

You might need to check whether the AggregateExpr is min/max in take_optimizable_min and take_optimizable_max.

jayzhan211 avatar Jun 21 '24 02:06 jayzhan211

I fixed this, but now two optimizer tests fail. This is one of them:

---- single_distinct_to_groupby::tests::two_distinct_and_one_common stdout ----
thread 'single_distinct_to_groupby::tests::two_distinct_and_one_common' panicked at datafusion/optimizer/src/test/mod.rs:200:5:
assertion `left == right` failed
  left: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"
 right: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias1)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias1):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

That suggests the optimizer cannot use, or does not understand, the existing aliases that provide DISTINCT test.b. I'm looking into it; any tip would be highly appreciated.

edmondop avatar Jun 21 '24 16:06 edmondop

I think we should add distinct support for MIN/MAX, so that we still get the distinct after the group by is converted to a distinct function.

But I think there is no difference between MIN and MIN(DISTINCT), so maybe we could remove the distinct from MIN/MAX beforehand by introducing an EliminateDistinct optimizer rule for MIN/MAX?
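
The claim that DISTINCT does not change the result of MIN/MAX can be checked with a small standalone sketch. It is a toy model over plain slices, not DataFusion code; the function names are hypothetical.

```rust
use std::collections::BTreeSet;

/// MIN over every input value, duplicates included.
fn min_all(values: &[u32]) -> Option<u32> {
    values.iter().copied().min()
}

/// MIN(DISTINCT ...): deduplicate first, then take the minimum.
fn min_distinct(values: &[u32]) -> Option<u32> {
    let distinct: BTreeSet<u32> = values.iter().copied().collect();
    distinct.into_iter().min()
}

/// MAX over every input value, duplicates included.
fn max_all(values: &[u32]) -> Option<u32> {
    values.iter().copied().max()
}

/// MAX(DISTINCT ...): deduplicate first, then take the maximum.
fn max_distinct(values: &[u32]) -> Option<u32> {
    let distinct: BTreeSet<u32> = values.iter().copied().collect();
    distinct.into_iter().max()
}
```

Deduplication only removes repeated values, so the smallest and largest elements are unchanged, which is why a rewrite that drops the DISTINCT from MIN/MAX would preserve results.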

jayzhan211 avatar Jun 22 '24 00:06 jayzhan211

Is this part of the optimizer, i.e. https://github.com/edmondop/arrow-datafusion/blob/main/datafusion/optimizer/src/replace_distinct_aggregate.rs ? Thank you for your help, by the way.

edmondop avatar Jun 22 '24 01:06 edmondop

I don't think so. Distinct / Distinct On is different from the distinct inside an aggregate function.

jayzhan211 avatar Jun 22 '24 01:06 jayzhan211

@jayzhan211 I have started experimenting with an optimizer rule, but removing the distinct results in this error:

running 2 tests
test eliminate_distinct::tests::eliminate_distinct_from_min_expr ... FAILED
test eliminate_nested_union::tests::eliminate_distinct_nothing ... ok

failures:

---- eliminate_distinct::tests::eliminate_distinct_from_min_expr stdout ----
Transformed yes true
Error: Context("Optimizer rule 'eliminate_distinct' failed", Context("eliminate_distinct", Internal("Failed due to a difference in schemas, original schema: DFSchema { inner: Schema { fields: [Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"MIN(DISTINCT test.b)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"test\" }), None], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"MIN(test.b)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"test\" }), None], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } }")))

Do I also need to change the equivalence rules?

edmondop avatar Jun 23 '24 20:06 edmondop

Regarding eliminate_distinct_from_min_expr: you can take single_distinct_to_groupby as a reference; it adds an alias to keep the schema equivalent. Also, I suggest we introduce this rule in another PR rather than mixing it with the MIN/MAX UDAF.
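
The schema error comes from the output column name changing from MIN(DISTINCT test.b) to MIN(test.b). A minimal sketch of the alias idea, using a hypothetical toy `Expr` type rather than DataFusion's own, could look like this:

```rust
/// Toy expression tree; a hypothetical stand-in, not DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Min { arg: Box<Expr>, distinct: bool },
    Alias { expr: Box<Expr>, name: String },
}

impl Expr {
    /// Name this expression contributes to the output schema.
    fn output_name(&self) -> String {
        match self {
            Expr::Column(name) => name.clone(),
            Expr::Min { arg, distinct } => {
                let prefix = if *distinct { "DISTINCT " } else { "" };
                format!("MIN({}{})", prefix, arg.output_name())
            }
            Expr::Alias { name, .. } => name.clone(),
        }
    }
}

/// Drops DISTINCT from MIN, then aliases the result back to the original
/// name so the output schema is identical before and after the rewrite.
fn eliminate_distinct(expr: Expr) -> Expr {
    match expr {
        Expr::Min { arg, distinct: true } => {
            let original_name = Expr::Min { arg: arg.clone(), distinct: true }.output_name();
            Expr::Alias {
                expr: Box::new(Expr::Min { arg, distinct: false }),
                name: original_name,
            }
        }
        other => other,
    }
}
```

Without the alias, the rewritten expression would report MIN(test.b) and fail the optimizer's check that a rule leaves the schema unchanged.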

jayzhan211 avatar Jun 23 '24 23:06 jayzhan211

Thanks. I guess I wasn't clear in my comment here https://github.com/apache/datafusion/pull/11013#issuecomment-2183027880 . How should that test failure be addressed? It seems that the min/max UDAF uses other aliases and does not reuse the intermediate results that are already available.

edmondop avatar Jun 24 '24 00:06 edmondop

If we eliminate the distinct from min/max before single_distinct_to_group_by runs, we don't expect to see a distinct min/max at this point, so we should rewrite the test to use another function, such as sum.

jayzhan211 avatar Jun 24 '24 06:06 jayzhan211

About single_distinct_to_groupby::tests::two_distinct_and_one_common: wouldn't eliminating it require the optimizer rule? Or do you suggest I update the test case, or the expected value?

edmondop avatar Jun 24 '24 11:06 edmondop

Yes, I suggest we update the test like this:

    #[test]
    fn one_distinct_and_two_common() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(
                vec![col("a")],
                vec![sum(col("c")), count_distinct(col("b")), max(col("b"))],
            )?
            .build()?;
        // Should work
        let expected = "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

        assert_optimized_plan_equal(plan, expected)
    }
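
To see what the expected plan above computes, here is a toy model of the two-level aggregation over in-memory rows. It is only a sketch of the plan's shape under the assumption that rows are `(a, b, c)` tuples; it is not how DataFusion executes the plan, and all names are hypothetical.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// One input row of the toy table `test(a, b, c)`.
type Row = (u32, u32, u32);

/// Result per group `a`: (sum(c), count(distinct b), max(b)).
type Output = BTreeMap<u32, (u64, i64, u32)>;

/// Mirrors the rewritten plan: an inner aggregate grouped by (a, b),
/// followed by an outer aggregate grouped by a.
fn two_phase(rows: &[Row]) -> Output {
    // Inner: groupBy=[a, b AS alias1], aggr=[sum(c) AS alias2, max(b) AS alias3]
    let mut inner: BTreeMap<(u32, u32), (u64, u32)> = BTreeMap::new();
    for &(a, b, c) in rows {
        let entry = inner.entry((a, b)).or_insert((0, b));
        entry.0 += u64::from(c);
        entry.1 = entry.1.max(b);
    }

    // Outer: groupBy=[a], aggr=[sum(alias2), count(alias1), max(alias3)]
    let mut outer: Output = BTreeMap::new();
    for (&(a, _alias1), &(alias2, alias3)) in &inner {
        let entry = outer.entry(a).or_insert((0, 0, alias3));
        entry.0 += alias2;
        entry.1 += 1; // each (a, b) pair occurs once here, so this counts distinct b
        entry.2 = entry.2.max(alias3);
    }
    outer
}

/// Reference: compute the same aggregates in a single pass.
fn direct(rows: &[Row]) -> Output {
    let mut groups: BTreeMap<u32, (u64, BTreeSet<u32>, u32)> = BTreeMap::new();
    for &(a, b, c) in rows {
        let entry = groups.entry(a).or_insert_with(|| (0, BTreeSet::new(), b));
        entry.0 += u64::from(c);
        entry.1.insert(b);
        entry.2 = entry.2.max(b);
    }
    groups
        .into_iter()
        .map(|(a, (sum, distinct, max))| (a, (sum, distinct.len() as i64, max)))
        .collect()
}
```

In this model the inner aggregate carries max(b) as a separate alias3 value even though b is already a grouping key there, which corresponds to the extra alias3 column in the expected plan.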

jayzhan211 avatar Jun 24 '24 11:06 jayzhan211

There seems to be a column added to the Aggregate node in the logical plan. Can that affect performance and/or memory footprint? This is why I didn't update the test in the first place.

This is a subset of the new plan

aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

while this is the subset from the previous plan

Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

An alias3:UInt32 column gets added.

edmondop avatar Jun 24 '24 12:06 edmondop

If you remove the Min/Max matching in is_single_distinct_agg, the extra alias goes away.

jayzhan211 avatar Jun 24 '24 14:06 jayzhan211

I think that as long as you can explain to me how to resolve the current test failure, I should be fine. I agree that using names to unwrap min and max is not very robust.

edmondop avatar Jun 27 '24 20:06 edmondop

Now that I have spent some more time working with this PR I see it still needs some additional work -- sorry for the noise

I started with merging up from main and resolving the conflicts: https://github.com/edmondop/arrow-datafusion/pull/1

Once that is merged / ready I think we could keep hacking at this PR together

Alternately, we could potentially make some smaller PRs to remove the barriers / unblock this one -- for example we could remove the direct use of the Min/Max PhysicalExprs

For example in https://github.com/apache/datafusion/pull/11013#discussion_r1657767157

As well as here: https://github.com/apache/datafusion/blob/f58df32753a06dd1b67597a12cdf68007e249338/datafusion/physical-plan/src/aggregates/mod.rs#L485-L494

If you are interested, I can file tickets explaining how to do those smaller tasks.

alamb avatar Jun 27 '24 21:06 alamb

Yes 🙏

edmondop avatar Jun 27 '24 21:06 edmondop

Ok, I filed https://github.com/apache/datafusion/issues/11153 and then some starting tasks like this

Task List

  • [ ] https://github.com/apache/datafusion/issues/11152
  • [ ] https://github.com/apache/datafusion/issues/11153

Hopefully that helps

alamb avatar Jun 28 '24 00:06 alamb

Let me help with this. I think the current blocker is the failing tests in CI?

jayzhan211 avatar Jul 17 '24 12:07 jayzhan211

optimizers_catch_all_statistics failed because aggregate_statistics does not check for the UDAF versions of min/max:

// Returns true if the aggregate expression is the MIN UDAF (matched by name).
fn is_min(agg_expr: &dyn AggregateExpr) -> bool {
    if let Some(expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
        return expr.fun().name() == "MIN";
    }

    false
}

// Returns true if the aggregate expression is the MAX UDAF (matched by name).
fn is_max(agg_expr: &dyn AggregateExpr) -> bool {
    if let Some(expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
        return expr.fun().name() == "MAX";
    }

    false
}
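
The downcast pattern in these helpers can be tried in isolation. The sketch below uses hypothetical, simplified stand-ins for `AggregateExpr` and `AggregateFunctionExpr` (the real DataFusion types carry much more state), and the case-insensitive comparison is an assumption added here for illustration, not something the thread settles on.

```rust
use std::any::Any;

/// Hypothetical stand-in for the physical aggregate expression trait.
trait AggregateExpr {
    fn as_any(&self) -> &dyn Any;
}

/// Hypothetical stand-in for the wrapper that holds a UDAF.
struct AggregateFunctionExpr {
    name: String,
}

impl AggregateFunctionExpr {
    fn name(&self) -> &str {
        &self.name
    }
}

impl AggregateExpr for AggregateFunctionExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Some other aggregate that is not wrapped as a UDAF.
struct BuiltinSum;

impl AggregateExpr for BuiltinSum {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// True when `agg_expr` is a UDAF wrapper whose function name matches
/// `expected`, ignoring ASCII case.
fn is_named(agg_expr: &dyn AggregateExpr, expected: &str) -> bool {
    agg_expr
        .as_any()
        .downcast_ref::<AggregateFunctionExpr>()
        .map_or(false, |expr| expr.name().eq_ignore_ascii_case(expected))
}

fn is_min(agg_expr: &dyn AggregateExpr) -> bool {
    is_named(agg_expr, "min")
}

fn is_max(agg_expr: &dyn AggregateExpr) -> bool {
    is_named(agg_expr, "max")
}
```

Matching on a name string stays fragile, since any other function registered under the same name would also match, which is the robustness concern raised earlier in the thread.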

jayzhan211 avatar Jul 17 '24 12:07 jayzhan211

I think we need to make get_minmax_desc a UDAF method before moving on.

jayzhan211 avatar Jul 17 '24 13:07 jayzhan211

~btw, there is nullable in min/max too, should we keep it 🤔~ It is always true, so not an issue

jayzhan211 avatar Jul 17 '24 13:07 jayzhan211

We also need to add a get_minmax_desc method to AggregateUDFImpl.

jayzhan211 avatar Jul 17 '24 13:07 jayzhan211

This change was made in https://github.com/apache/datafusion/pull/11261/files

edmondop avatar Jul 17 '24 13:07 edmondop

@jayzhan211 I am missing a piece: the AggregateExpr implementation doesn't include AggregateUDFImpl. Can you clarify?

edmondop avatar Jul 18 '24 23:07 edmondop

@edmondop CI is still red. By the way, the PR has grown quite large; maybe you can split the changes into several PRs if you can't get CI to pass.

jayzhan211 avatar Jul 21 '24 00:07 jayzhan211

@jayzhan211 yes, it's still unclear to me what the fix is for the single_distinct_group. I think if you guide me there we can get this ready soon

edmondop avatar Jul 21 '24 01:07 edmondop

@edmondop Regarding AggregateExpr not including AggregateUDFImpl: yes, that makes sense.

All the UDAFs are wrapped in AggregateFunctionExpr, and you can reach the AggregateUDFImpl methods from it.
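
A rough sketch of that layering, combined with the get_minmax_desc idea from earlier in the thread, is shown below. Everything here is a simplified, hypothetical model: the trait is not the real `AggregateUDFImpl`, and the `Option<bool>` return type (whether the ordering is descending) is an assumed simplification of whatever the real helper returns.

```rust
use std::sync::Arc;

/// Hypothetical, simplified stand-in for the UDAF implementation trait.
trait AggregateUDFImpl {
    fn name(&self) -> &str;

    /// Assumed hook: `Some(descending)` for min/max-style aggregates,
    /// `None` for everything else. The default leaves other UDAFs untouched.
    fn get_minmax_desc(&self) -> Option<bool> {
        None
    }
}

struct Min;
struct Max;
struct Sum;

impl AggregateUDFImpl for Min {
    fn name(&self) -> &str {
        "min"
    }
    fn get_minmax_desc(&self) -> Option<bool> {
        Some(false)
    }
}

impl AggregateUDFImpl for Max {
    fn name(&self) -> &str {
        "max"
    }
    fn get_minmax_desc(&self) -> Option<bool> {
        Some(true)
    }
}

impl AggregateUDFImpl for Sum {
    fn name(&self) -> &str {
        "sum"
    }
}

/// Hypothetical stand-in for the wrapper that holds the UDAF.
struct AggregateFunctionExpr {
    fun: Arc<dyn AggregateUDFImpl>,
}

impl AggregateFunctionExpr {
    /// Gives callers access to the wrapped implementation's methods.
    fn fun(&self) -> &dyn AggregateUDFImpl {
        self.fun.as_ref()
    }
}

/// Asks the wrapped UDAF directly instead of matching on its name.
fn minmax_desc(expr: &AggregateFunctionExpr) -> Option<bool> {
    expr.fun().get_minmax_desc()
}
```

With a trait method like this, callers would not need to compare name strings; each function declares its own behavior and every other UDAF inherits the default.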

jayzhan211 avatar Jul 21 '24 01:07 jayzhan211

I think it is because you removed distinct from min. Do you plan to add the eliminate-distinct rule for min/max in another PR first? Alternatively, you could handle distinct in this PR first.

jayzhan211 avatar Jul 21 '24 02:07 jayzhan211

I will eliminate distinct min/max in a separate PR, to avoid growing this too much. Where did I remove distinct from min?

edmondop avatar Jul 21 '24 02:07 edmondop