Custom window frame logic (support `ROWS`, `RANGE`, `PRECEDING` and `FOLLOWING` for window functions)
Which issue does this PR close?
This PR offers a partial implementation of window functions with custom window frames and improves the situation described in #361.
As a team, this is our first contribution to Datafusion, and we hope to contribute further to both Datafusion and Ballista in the future. Since this is our first PR for the project, we would like feedback on how we are doing in terms of code quality, alignment with the project roadmap, etc. We would also like your ideas on how to close this issue completely, since we are providing a partial implementation as a first step. You can see which cases we cover in the integration tests.
Rationale for this change
Datafusion currently does not support custom window frames, but it is on the roadmap.
What changes are included in this PR?
For now, we implemented `ROWS` and `RANGE` modes supporting `PRECEDING` and `FOLLOWING`.
As a draft, we currently do not support:

- `GROUPS` mode
- Timestamp ranges, e.g. `RANGE BETWEEN '1 day' PRECEDING AND '10 days' FOLLOWING`, since the logical planner does not support types other than integers.
- Frame exclusion, i.e. `EXCLUDE CURRENT ROW`

Next steps

- `GROUPS` mode implementation, by extending the `calculate_current_window` method.
- Frame exclusion, by extending the logical planner and adapting the `calculate_current_window` method.
Observations
- Some aggregation function implementations are not generic, but use `f64`. This can create issues with statistical aggregation functions like `CORR(x, y)` when greater precision is required. Fortunately, they can be enhanced to support other data types, similar to the `SUM(x)` aggregation. Also, `evaluate()` of the `CovarianceAccumulator` should change as follows to become compatible with PostgreSQL:

  ```diff
  @@ -374,12 +374,6 @@ impl Accumulator for CovarianceAccumulator {
           };
  
           if count <= 1 {
  -            return Err(DataFusionError::Internal(
  -                "At least two values are needed to calculate covariance".to_string(),
  -            ));
  -        }
  -
  -        if self.count == 0 {
               Ok(ScalarValue::Float64(None))
           } else {
               Ok(ScalarValue::Float64(Some(self.algo_const / count as f64)))
  ```

  However, these issues are separate from this PR and we can discuss them under a new issue. For this reason, we deferred supporting functions like `CORR(x, y)` to the future.
- Since unstable sorting is used, some queries output different results than PostgreSQL. We therefore use only unique columns in `ORDER BY` clauses while testing `ROWS` mode (a small sketch of why ties can reorder follows this list). An example query:

  ```sql
  SELECT c2, c3,
         SUM(c2) OVER(ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation2,
         SUM(c3) OVER(ORDER BY c2 ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation3,
         SUM(c3) OVER(ORDER BY c1 ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation4
  FROM test
  LIMIT 10;
  ```

  The Datafusion output is:

  ```
  +----+-----+------------+------------+------------+
  | c2 | c3  | summation2 | summation3 | summation4 |
  +----+-----+------------+------------+------------+
  | 1  | 12  | 2          | 132        | -13        |
  | 1  | 120 | 3          | 203        | 263        |
  | 1  | 71  | 4          | 118        | 447        |
  | 1  | -85 | 5          | 154        | 37         |
  | 1  | 36  | 5          | 43         | 358        |
  | 1  | -99 | 5          | 48         | -81        |
  | 1  | 125 | 5          | -31        | 247        |
  | 1  | -8  | 5          | 111        | 215        |
  | 1  | 57  | 5          | 3          | 238        |
  | 1  | -72 | 5          | 140        | 247        |
  +----+-----+------------+------------+------------+
  ```

  and in PostgreSQL it is:

  ```
  +----+-----+------------+------------+------------+
  | c2 | c3  | summation2 | summation3 | summation4 |
  +----+-----+------------+------------+------------+
  | 1  | -85 | 2          | -49        | -251       |
  | 1  | 36  | 3          | 71         | 330        |
  | 1  | 120 | 4          | 46         | 284        |
  | 1  | -25 | 5          | 149        | -184       |
  | 1  | 103 | 5          | 305        | -15        |
  | 1  | 71  | 5          | 323        | 251        |
  | 1  | 54  | 5          | 286        | 48         |
  | 1  | 83  | 5          | 255        | 166        |
  | 1  | -56 | 5          | 222        | -79        |
  | 1  | 70  | 5          | 180        | -233       |
  +----+-----+------------+------------+------------+
  ```
- There is a minor problem in the logical planner: it should run

  ```sql
  SELECT SUM(c2) OVER(ORDER BY c5, c6 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum FROM test;
  ```

  without a problem; however, it produces

  ```
  Error: Plan("With window frame of type RANGE, the order by expression must be of length 1, got 2")
  ```
@alamb, mentioning you since you showed interest in this. Once this gets some feedback, we will perform any fixes if necessary and then add GROUPS functionality so that this work can get merged.
Note that `EXCLUDE CURRENT ROW` and timestamp handling require changes to the planning logic, so we will leave those to another PR.
Thanks @ozankabak -- I will try and find time to review this later this week, though it may not be until the weekend
I took a brief look through this PR, and it looks quite cool -- will provide detailed comments later. For now I have started the CI checks
Seems like we forgot the license header in one file 🙂 CI checks are passing except for that. We will fix the license header along with any other changes based on the review.
Thanks for the reviews, everybody! We are working on changes that address your reviews and plan to send them your way in a day or two.
Hello again! We resolved almost all the points you've mentioned. Additionally, we identified some challenging corner cases regarding NULL treatment and updated our code to handle these cases, along with new unit tests for these corner cases. Four main discussion points remain:
PostgreSQL compatibility of new unit tests
Expected values of all the new unit tests were sanity-checked against PostgreSQL. To remain in sync with PostgreSQL, we also think it could be a good idea to add psql-parity tests for the NULL-treatment cases. If you agree, let's make this the subject of another PR. We can open an issue to track this (and will be happy to work on a PR to resolve it).
Changes to ScalarValue
At this point, we do not really have any new functionality or complexity added to this type. We just tidied up/moved some `ScalarValue`-only code that was scattered across multiple modules into `scalar.rs`. For example, the coercions you saw were already a part of the code here:
https://github.com/apache/arrow-datafusion/blob/87faf868f2276b84e63cad6721ca08bd79ed9cb8/datafusion/physical-expr/src/aggregate/sum.rs#L261
We agree that there should be no coercions at this stage in general -- any necessary work along these lines should be handled earlier in the code path. However, we don't want to extend the scope of this PR to remove already-existing coercions from the codebase. If there is no existing issue about this, we are happy to open one and help resolve it in the near future, along with similar issues like #3511.
Some teaser: I have done some preliminary tests and it seems like removing all the coercions will not be a tough task at all. However, removing this kind of code always requires more careful testing than just preliminary testing and deserves a careful study/separate PR 🙂
Bisect
We considered using the `partition_point` function. This requires converting `&[ArrayRef]` to `&[&[ScalarValue]]`, where each inner `&[ScalarValue]` corresponds to a row -- we are searching for the insertion place of a target `&[ScalarValue]` among rows. Unless we are missing something, doing this would require unnecessary memory usage and CPU time since it involves copying. In contrast, our implementation takes the data in its original columnar format and materializes a row from an index only when a comparison is made. Since we do log(n) comparisons on average, we need log(n) copies instead of n copies. If you have any suggestions for adding bisect functionality without changing the columnar orientation of the data, we can create an issue for this and help resolve it in a subsequent PR.
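To illustrate the idea, here is a simplified, hypothetical sketch (plain `Vec<i64>` columns stand in for `ArrayRef`/`ScalarValue`; this is not the PR's actual code): the data stays columnar, and a row is built only at the moment it is compared, so a binary search copies only O(log n) rows instead of all n.

```rust
/// Returns the leftmost insertion index for `target` among the rows of
/// `columns`, where row i is (columns[0][i], columns[1][i], ...) and rows
/// are assumed to be sorted lexicographically.
fn bisect_left(columns: &[Vec<i64>], target: &[i64]) -> usize {
    let n_rows = columns.first().map_or(0, |c| c.len());
    // Materialize a single row lazily from the columnar data.
    let row_at = |idx: usize| -> Vec<i64> { columns.iter().map(|col| col[idx]).collect() };
    let (mut low, mut high) = (0, n_rows);
    while low < high {
        let mid = low + (high - low) / 2;
        // Lexicographic slice comparison; only rows visited by the binary
        // search are ever copied out of the columns.
        if row_at(mid).as_slice() < target {
            low = mid + 1;
        } else {
            high = mid;
        }
    }
    low
}

fn main() {
    // Two sorted columns; rows are (c1, c2) pairs read across the columns:
    // (1,10), (1,20), (2,5), (2,15), (3,0).
    let columns = vec![vec![1, 1, 2, 2, 3], vec![10, 20, 5, 15, 0]];
    assert_eq!(bisect_left(&columns, &[2, 10]), 3);
    println!("insertion point found without flattening the columns into rows");
}
```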
Overflows
Overflows are not handled well by the `ScalarValue` type in general. Since we all agree that we do not want to change this type too much in this PR, we left a TODO in the window frame boundary code so that we can upgrade the overflow-sensitive part of the code in the future, once there is agreement on how to fix it. Currently, if an overflow happens, the whole process panics and quits, as there is no machinery in the codebase to handle it. If you let us know your thinking on this matter, we will be happy to help get this fixed in general in a subsequent PR so that other code can benefit too.
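For illustration only, here is a hedged sketch of the direction we have in mind (plain integer types rather than the actual `ScalarValue`-based code): boundary arithmetic could use checked operations and surface an error for the caller to propagate, instead of panicking.

```rust
/// Hypothetical helper: compute the end index of a `... AND n FOLLOWING`
/// frame, reporting overflow as an error rather than panicking.
fn frame_end(current_row: u64, following: u64) -> Result<u64, String> {
    current_row
        .checked_add(following)
        .ok_or_else(|| format!("window frame end overflows: {current_row} + {following}"))
}

fn main() {
    assert_eq!(frame_end(10, 3), Ok(13));
    // Instead of a process-wide panic, the overflow becomes a recoverable error.
    assert!(frame_end(u64::MAX, 1).is_err());
}
```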
Thanks again for all the reviews. Excited to see this merged and put to use!
Thanks @metesynnada and @ozankabak -- I'll check this out more carefully tomorrow or Friday
Codecov Report
Merging #3570 (b1b27d2) into master (11abfb9) will increase coverage by 0.16%. The diff coverage is 85.76%.
```diff
@@            Coverage Diff             @@
##           master    #3570      +/-   ##
==========================================
+ Coverage   85.91%   86.07%   +0.16%
==========================================
  Files         301      301
  Lines       56218    56884     +666
==========================================
+ Hits        48301    48965     +664
- Misses       7917     7919       +2
```
| Impacted Files | Coverage Δ | |
|---|---|---|
| datafusion/common/src/lib.rs | 0.00% <ø> (ø) | |
| datafusion/expr/src/accumulator.rs | 58.33% <0.00%> (-19.45%) | :arrow_down: |
| datafusion/expr/src/window_frame.rs | 96.19% <ø> (+2.91%) | :arrow_up: |
| ...tafusion/physical-expr/src/aggregate/covariance.rs | 91.91% <0.00%> (-6.80%) | :arrow_down: |
| datafusion/physical-expr/src/aggregate/variance.rs | 93.17% <0.00%> (-5.14%) | :arrow_down: |
| datafusion/sql/src/planner.rs | 81.25% <ø> (+<0.01%) | :arrow_up: |
| ...afusion/physical-expr/src/aggregate/correlation.rs | 96.00% <42.85%> (-1.96%) | :arrow_down: |
| datafusion/physical-expr/src/window/aggregate.rs | 80.64% <79.25%> (+5.17%) | :arrow_up: |
| datafusion/core/tests/sql/mod.rs | 97.59% <90.90%> (-0.19%) | :arrow_down: |
| datafusion/common/src/scalar.rs | 85.84% <92.94%> (+0.72%) | :arrow_up: |
| ... and 68 more | | |
I'll plan to merge this tomorrow unless I hear otherwise or anyone else would like longer to review. Thanks again @metesynnada and @ozankabak -- this looks epic
Thanks @alamb! In our first follow-up PR, we will tweak the test you noted to increase coverage and also enrich the comments to explain subtle points such as the one you found.
🚀 -- great first contribution
Benchmark runs are scheduled for baseline = a9f7cacce7f22e1b645821d0ce9b0323895721ae and contender = 65a5c6bcdd27e6b9677af3071f65901dc4230800. 65a5c6bcdd27e6b9677af3071f65901dc4230800 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.

Conbench compare runs links:

- [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
- [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
- [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
- [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q

Buildkite builds:

Supported benchmarks:

- ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
- test-mac-arm: Supported benchmark langs: C++, Python, R
- ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
- ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java