Move min and max to user defined aggregate function
## Which issue does this PR close?
Closes #10943.
- [x] Sliding window accumulator for min and max
## Rationale for this change
## What changes are included in this PR?
## Are these changes tested?
## Are there any user-facing changes?
I do have something that's starting to look reasonable, but some optimizer tests are now failing for reasons I can't understand:
running 1 test
test custom_sources_cases::optimizers_catch_all_statistics ... FAILED
successes:
successes:
failures:
---- custom_sources_cases::optimizers_catch_all_statistics stdout ----
thread 'custom_sources_cases::optimizers_catch_all_statistics' panicked at datafusion/core/tests/custom_sources_cases/mod.rs:274:5:
Expected aggregate_statistics optimizations missing: AggregateExec { mode: Single, group_by: PhysicalGroupBy { expr: [], null_expr: [], groups: [[]] }, aggr_expr: [AggregateFunctionExpr { fun: AggregateUDF { inner: Count { name: "COUNT", signature: Signature { type_signature: VariadicAny, volatility: Immutable } } }, args: [Literal { value: Int64(1) }], logical_args: [Literal(Int64(1))], data_type: Int64, name: "COUNT(*)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int64 }, AggregateFunctionExpr { fun: AggregateUDF { inner: Min { signature: Signature { type_signature: Numeric(1), volatility: Immutable }, aliases: ["min"] } }, args: [Column { name: "c1", index: 0 }], logical_args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c1" })], data_type: Int32, name: "MIN(test.c1)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int32 }, AggregateFunctionExpr { fun: AggregateUDF { inner: Max { signature: Signature { type_signature: Numeric(1), volatility: Immutable }, aliases: ["max"] } }, args: [Column { name: "c1", index: 0 }], logical_args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c1" })], data_type: Int32, name: "MAX(test.c1)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int32 }], filter_expr: [None, None, None], limit: None, input: CustomExecutionPlan { projection: Some([0]), cache: PlanProperties { 
eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }, schema: Schema { fields: [Field { name: "COUNT(*)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MIN(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MAX(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, input_schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, required_input_ordering: None, input_order_mode: Linear, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "COUNT(*)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MIN(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MAX(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }
I guess the aggregate statistics optimization is being skipped for min/max:
https://github.com/apache/datafusion/blob/18042fd69138e19613844580408a71a200ea6caa/datafusion/core/src/physical_optimizer/aggregate_statistics.rs#L177-L224
You might need to check whether the AggregateExpr is min/max in take_optimizable_min and take_optimizable_max.
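For context, the aggregate_statistics rule can answer MIN/MAX from exact table statistics instead of scanning rows. A toy, DataFusion-free sketch of that idea (the `ColumnStats` struct and the function signatures here are simplified stand-ins, not the actual DataFusion types):

```rust
// Toy model of the aggregate_statistics idea: if exact column statistics
// are available, a MIN/MAX aggregate can be answered without scanning rows.
#[derive(Clone, Copy)]
struct ColumnStats {
    min: Option<i64>,
    max: Option<i64>,
    exact: bool, // only exact statistics may replace the aggregate
}

// Hypothetical, simplified stand-ins for take_optimizable_min / take_optimizable_max.
fn take_optimizable_min(stats: &ColumnStats) -> Option<i64> {
    if stats.exact { stats.min } else { None }
}

fn take_optimizable_max(stats: &ColumnStats) -> Option<i64> {
    if stats.exact { stats.max } else { None }
}

fn main() {
    let exact = ColumnStats { min: Some(1), max: Some(99), exact: true };
    let inexact = ColumnStats { min: Some(1), max: Some(99), exact: false };
    // Exact stats let us substitute the aggregate result directly.
    assert_eq!(take_optimizable_min(&exact), Some(1));
    assert_eq!(take_optimizable_max(&exact), Some(99));
    // Inexact stats must not be substituted.
    assert_eq!(take_optimizable_min(&inexact), None);
}
```

The point of the tip above is that once MIN/MAX become UDAFs, the rule's matching step has to recognize them before this substitution can fire.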
I fixed this, but now I have a test that doesn't pass on the optimizer (there are two, actually):
---- single_distinct_to_groupby::tests::two_distinct_and_one_common stdout ----
thread 'single_distinct_to_groupby::tests::two_distinct_and_one_common' panicked at datafusion/optimizer/src/test/mod.rs:200:5:
assertion `left == right` failed
left: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"
right: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\n Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias1)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias1):UInt32;N]\n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"
That suggests the optimizer cannot use, or doesn't understand, the existing aliases that provide DISTINCT test.b. I'm looking into it; any tip would be highly appreciated.
I think we should add distinct support for MIN/MAX so we can keep the DISTINCT after the group-by is converted to a distinct aggregate function.
But since there is no difference between MIN and DISTINCT MIN, maybe we could remove the DISTINCT for MIN/MAX beforehand, i.e. introduce an EliminateDistinct optimizer rule for MIN/MAX.
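The claim that DISTINCT is a no-op for MIN/MAX can be sanity-checked with a small standalone snippet (plain Rust, no DataFusion):

```rust
use std::collections::BTreeSet;

fn main() {
    let values = vec![3_i64, 1, 4, 1, 5, 9, 2, 6, 5, 3];

    // MIN over all values
    let min_all = values.iter().min().copied();

    // MIN over the distinct values only
    let distinct: BTreeSet<i64> = values.iter().copied().collect();
    let min_distinct = distinct.iter().min().copied();

    // Deduplication never changes the extreme values, so
    // MIN(DISTINCT x) == MIN(x), and likewise for MAX.
    assert_eq!(min_all, min_distinct);
    println!("{:?} {:?}", min_all, min_distinct); // Some(1) Some(1)
}
```

This is why rewriting MIN(DISTINCT x) to MIN(x) (or dropping the distinct flag) is semantics-preserving.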
Is this part of the optimizer, i.e. https://github.com/edmondop/arrow-datafusion/blob/main/datafusion/optimizer/src/replace_distinct_aggregate.rs ? Thank you for your help, btw.
I don't think so; Distinct/DISTINCT ON is different from DISTINCT inside an aggregate function.
@jayzhan211 I have started experimenting with an optimizer rule, but removing the distinct results in this error:
running 2 tests
test eliminate_distinct::tests::eliminate_distinct_from_min_expr ... FAILED
test eliminate_nested_union::tests::eliminate_distinct_nothing ... ok
failures:
---- eliminate_distinct::tests::eliminate_distinct_from_min_expr stdout ----
Transformed yes true
Error: Context("Optimizer rule 'eliminate_distinct' failed", Context("eliminate_distinct", Internal("Failed due to a difference in schemas, original schema: DFSchema { inner: Schema { fields: [Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"MIN(DISTINCT test.b)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"test\" }), None], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"MIN(test.b)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"test\" }), None], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } }")))
Do I also need to change the equivalence rules?
> eliminate_distinct_from_min_expr

You can take single_distinct_to_groupby as a reference; it uses aliases to maintain schema equivalence.
Also, I suggest we introduce this rule in another PR, rather than mixing it with the MIN/MAX UDAF change.
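For what it's worth, the schema error above boils down to the rewritten plan producing a field named MIN(test.b) where the original schema expected MIN(DISTINCT test.b). A minimal sketch of the aliasing idea, using a hypothetical helper on plain strings (not DataFusion code):

```rust
// Hypothetical sketch: when rewriting MIN(DISTINCT x) to MIN(x), alias the
// result back to the original field name so the plan schema is unchanged.
fn rewrite_distinct_min(column: &str) -> (String, String) {
    let original_name = format!("MIN(DISTINCT {column})");
    let rewritten_expr = format!("MIN({column})");
    // (expression, output alias): the alias preserves schema equivalence
    (rewritten_expr, original_name)
}

fn main() {
    let (expr, alias) = rewrite_distinct_min("test.b");
    assert_eq!(expr, "MIN(test.b)");
    assert_eq!(alias, "MIN(DISTINCT test.b)");
    println!("{expr} AS \"{alias}\"");
}
```

In the real rule this would be an `Expr::alias(...)` on the rewritten aggregate expression, mirroring what single_distinct_to_groupby does with its alias1/alias2 columns.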
Thanks. I guess I wasn't clear in my comment here: https://github.com/apache/datafusion/pull/11013#issuecomment-2183027880 . How should that test failure be addressed? It seems that the min/max UDAF uses other aliases and does not reuse the intermediate results already available.
If we eliminate distinct min/max prior to single_distinct_to_group_by, we don't expect to see a distinct min/max at that point, so we should rewrite the test to use another function, like sum.
> ---- single_distinct_to_groupby::tests::two_distinct_and_one_common

Wouldn't eliminating it require the optimizer rule? Or do you suggest I update the test case, or the expected value?
Yes, I suggest we update the test like this:

```rust
#[test]
fn one_distinct_and_two_common() -> Result<()> {
    let table_scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(table_scan)
        .aggregate(
            vec![col("a")],
            vec![sum(col("c")), count_distinct(col("b")), max(col("b"))],
        )?
        .build()?;
    // Should work
    let expected = "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
    assert_optimized_plan_equal(plan, expected)
}
```
There seems to be a column added to the Aggregate node in the logical plan; can that affect performance and/or memory footprint? That was the reason why I didn't update the test in the first place.
This is a subset of the new plan
aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"
while this is the subset from the previous plan
Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"
there is an alias3:UInt32 that gets added
If you remove the Min/Max matching in is_single_distinct_agg, the extra alias goes away.
I think as long as you can explain to me how to resolve the current test failure I should be fine. I agree that using names for min/max unwrapping is not very robust.
Now that I have spent some more time working with this PR I see it still needs some additional work -- sorry for the noise
I started with merging up from main and resolving the conflicts: https://github.com/edmondop/arrow-datafusion/pull/1
Once that is merged / ready I think we could keep hacking at this PR together
Alternatively, we could make some smaller PRs to remove the barriers and unblock this one; for example, we could remove the direct use of the Min/Max PhysicalExprs.
For example in https://github.com/apache/datafusion/pull/11013#discussion_r1657767157
As well as here: https://github.com/apache/datafusion/blob/f58df32753a06dd1b67597a12cdf68007e249338/datafusion/physical-plan/src/aggregates/mod.rs#L485-L494
If you are interested, I can file tickets explaining how to do those smaller tasks.
Yes 🙏
Ok, I filed https://github.com/apache/datafusion/issues/11153 and some starting tasks like this:
Task List
- [ ] https://github.com/apache/datafusion/issues/11152
- [ ] https://github.com/apache/datafusion/issues/11153
Hopefully that helps
Let me help with this. I think the current blockers are the failing tests in CI?
optimizers_catch_all_statistics failed because aggregate_statistics is not checking for UDAFs:

```rust
fn is_min(agg_expr: &dyn AggregateExpr) -> bool {
    if let Some(expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
        return expr.fun().name() == "MIN";
    }
    false
}

fn is_max(agg_expr: &dyn AggregateExpr) -> bool {
    if let Some(expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
        return expr.fun().name() == "MAX";
    }
    false
}
```
I think we need to make get_minmax_desc a UDAF method before moving on.
~btw, there is nullable in min/max too, should we keep it 🤔~ It is always true, so not an issue
We also need to add a get_minmax_desc method to AggregateUDFImpl.
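A minimal sketch of what that trait hook could look like (a simplified stand-in trait with a hypothetical method name; the real AggregateUDFImpl signature may differ):

```rust
// Simplified stand-in for AggregateUDFImpl with a default min/max hook,
// illustrating the idea of exposing min/max info as a trait method instead
// of matching on concrete PhysicalExpr types.
trait AggregateUDFImpl {
    fn name(&self) -> &str;
    // Hypothetical: Some(true) for MAX, Some(false) for MIN, None otherwise.
    fn is_descending(&self) -> Option<bool> {
        None
    }
}

struct Min;
impl AggregateUDFImpl for Min {
    fn name(&self) -> &str { "min" }
    fn is_descending(&self) -> Option<bool> { Some(false) }
}

struct Max;
impl AggregateUDFImpl for Max {
    fn name(&self) -> &str { "max" }
    fn is_descending(&self) -> Option<bool> { Some(true) }
}

fn main() {
    // Callers can now ask any UDAF whether it is a min/max-style aggregate
    // without downcasting to a specific implementation type.
    assert_eq!(Min.is_descending(), Some(false));
    assert_eq!(Max.is_descending(), Some(true));
}
```

A default implementation returning None keeps the change non-breaking for existing UDAFs.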
This change was addressed here: https://github.com/apache/datafusion/pull/11261/files
@jayzhan211 I am missing a piece: the AggregateExpr implementation doesn't include AggregateUDFImpl. Can you clarify?
@edmondop CI is still red. Btw, the PR has grown quite large; maybe you can try to split the changes into several PRs if you can't pass the CI.
@jayzhan211 yes, it's still unclear to me what the fix is for single_distinct_to_groupby. I think if you guide me there we can get this ready soon.
Yes, it makes sense.
All the UDAFs are wrapped in AggregateFunctionExpr, and you can reach the AggregateUDFImpl methods from it.
I think it is because you removed distinct from min. Do you plan to add the eliminate-distinct-in-min/max rule in another PR first? Or you could also deal with distinct in this PR.
I will eliminate distinct min/max in a separate PR, to avoid growing this one too much. Where did I remove distinct from min?