
Custom window frame logic (support `ROWS`, `RANGE`, `PRECEDING` and `FOLLOWING` for window functions)


Which issue does this PR close?

This PR offers a partial implementation of window functions with custom window frames and makes progress on #361.

This is our team's first contribution to DataFusion, and we hope to contribute further to both DataFusion and Ballista in the future. Since this is our first PR to the project, we would like feedback on how we are doing in terms of code quality, alignment with the project roadmap, etc. We would also like your ideas on how to close this issue completely, since we are providing a partial implementation as a first step. You can see which cases we cover in the integration tests.

Rationale for this change

DataFusion currently does not support custom window frames, but this feature is on the roadmap.

What changes are included in this PR?

For now, we have implemented the ROWS and RANGE modes with support for PRECEDING and FOLLOWING bounds (a brief usage sketch follows the list below).

As a draft, we currently do not support:

  • GROUPS mode
  • Timestamp ranges, e.g. RANGE BETWEEN '1 day' PRECEDING AND '10 days' FOLLOWING, since the logical planner does not yet support types other than integers.
  • Frame exclusion, i.e. EXCLUDE CURRENT ROW
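
For illustration, here is a minimal sketch of how the supported frames can be exercised through the DataFusion SQL API (the table name, file path, and column names are hypothetical placeholders for this sketch, not part of this PR):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Hypothetical CSV file with integer columns c2 and c3.
        ctx.register_csv("test", "tests/data/test.csv", CsvReadOptions::new())
            .await?;

        // ROWS mode: the frame bounds count physical rows around the current row.
        ctx.sql("SELECT c2, SUM(c2) OVER(ORDER BY c3 ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) AS s FROM test")
            .await?
            .show()
            .await?;

        // RANGE mode: the frame bounds are offsets on the ORDER BY value
        // (only integer offsets are supported for now).
        ctx.sql("SELECT c2, SUM(c2) OVER(ORDER BY c3 RANGE BETWEEN 3 PRECEDING AND 1 FOLLOWING) AS s FROM test")
            .await?
            .show()
            .await?;

        Ok(())
    }

Both queries compute a running SUM over the frame; RANGE differs from ROWS when the ORDER BY column contains ties, since all peer rows within the value offset are included in the frame.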

Next steps

  • GROUPS mode implementation by extending the calculate_current_window method.
  • Frame exclusion, via a logical planner extension and by adapting the calculate_current_window method.

Observations

  • Some aggregation function implementations are not generic; they use f64 internally. This can create issues for statistical aggregation functions like CORR(x, y) when greater precision is required. Fortunately, they can be enhanced to support other data types, similarly to the SUM(x) aggregation.

    Also, evaluate() of the CovarianceAccumulator should be

    @@ -374,12 +374,6 @@ impl Accumulator for CovarianceAccumulator {
             };
     
             if count <= 1 {
    -            return Err(DataFusionError::Internal(
    -                "At least two values are needed to calculate covariance".to_string(),
    -            ));
    -        }
    -
    -        if self.count == 0 {
                 Ok(ScalarValue::Float64(None))
             } else {
                 Ok(ScalarValue::Float64(Some(self.algo_const / count as f64)))
    

    to become compatible with PostgreSQL. However, these issues are separate from this PR, and we can discuss them under a new issue. For this reason, we deferred support for functions like CORR(x, y) to future work.

  • Since an unstable sort is used, rows that compare equal under ORDER BY can appear in a different order than in PostgreSQL, so some queries produce different results. We therefore use only unique columns in ORDER BY clauses when testing ROWS mode.

    An example query:

    SELECT c2, c3,
       SUM(c2) OVER(ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation2,
       SUM(c3) OVER(ORDER BY c2 ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation3,
       SUM(c3) OVER(ORDER BY c1 ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation4
    FROM test
    LIMIT 10;
    

    The DataFusion output is:

    +----+-----+------------+------------+------------+
    | c2 | c3  | summation2 | summation3 | summation4 |
    +----+-----+------------+------------+------------+
    | 1  | 12  | 2          | 132        | -13        |
    | 1  | 120 | 3          | 203        | 263        |
    | 1  | 71  | 4          | 118        | 447        |
    | 1  | -85 | 5          | 154        | 37         |
    | 1  | 36  | 5          | 43         | 358        |
    | 1  | -99 | 5          | 48         | -81        |
    | 1  | 125 | 5          | -31        | 247        |
    | 1  | -8  | 5          | 111        | 215        |
    | 1  | 57  | 5          | 3          | 238        |
    | 1  | -72 | 5          | 140        | 247        |
    +----+-----+------------+------------+------------+
    

    while the PostgreSQL output is:

    +----+-----+------------+------------+------------+
    | c2 | c3  | summation2 | summation3 | summation4 |
    +----+-----+------------+------------+------------+
    | 1  | -85 | 2          | -49        | -251       |
    | 1  | 36  | 3          | 71         | 330        |
    | 1  | 120 | 4          | 46         | 284        |
    | 1  | -25 | 5          | 149        | -184       |
    | 1  | 103 | 5          | 305        | -15        |
    | 1  | 71  | 5          | 323        | 251        |
    | 1  | 54  | 5          | 286        | 48         |
    | 1  | 83  | 5          | 255        | 166        |
    | 1  | -56 | 5          | 222        | -79        |
    | 1  | 70  | 5          | 180        | -233       |
    +----+-----+------------+------------+------------+
    
  • There is a minor problem in the logical planner: since both frame bounds are unbounded, the ORDER BY expressions do not actually affect the frame, so it should run

    SELECT 
      SUM(c2) OVER(ORDER BY c5, c6 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum
    FROM test;
    

    without a problem. However, it currently produces

    Error: Plan("With window frame of type RANGE, the order by expression must be of length 1, got 2")
    

metesynnada avatar Sep 21 '22 11:09 metesynnada

@alamb, mentioning you since you showed interest in this. Once this gets some feedback, we will make any necessary fixes and then add GROUPS functionality so that this work can get merged.

Note that EXCLUDE CURRENT ROW and timestamp handling require changes to the planning logic, so we will leave those to another PR.

ozankabak avatar Sep 21 '22 11:09 ozankabak

Thanks @ozankabak -- I will try and find time to review this later this week, though it may not be until the weekend

alamb avatar Sep 21 '22 17:09 alamb

I took a brief look through this PR, and it looks quite cool -- will provide detailed comments later. For now I have started the CI checks

alamb avatar Sep 21 '22 17:09 alamb

Seems like we forgot the license header in one file 🙂 CI checks are passing except for that. We will fix the license header along with any other changes based on the review.

ozankabak avatar Sep 21 '22 19:09 ozankabak

Thanks for the reviews, everybody! We are working on changes that address your reviews and plan to send them your way in a day or two.

ozankabak avatar Sep 26 '22 15:09 ozankabak

Hello again! We resolved almost all the points you mentioned. Additionally, we identified some challenging corner cases regarding NULL treatment and updated our code to handle them, adding new unit tests to cover these cases. Four main discussion points remain:

PostgreSQL compatibility of new unit tests

Expected values of all the new unit tests were sanity-checked against PostgreSQL. To remain in sync with PostgreSQL, we also think it could be a good idea to add psql-parity tests for the NULL-treatment cases. If you agree, let's make this the subject of another PR. We can open an issue to track it (and will be happy to work on a PR to resolve it).

Changes to ScalarValue

At this point, we have not added any new functionality or complexity to this type. We just tidied up and moved some ScalarValue-only code that was scattered across multiple modules into scalar.rs. For example, the coercions you saw were already a part of the code here:

https://github.com/apache/arrow-datafusion/blob/87faf868f2276b84e63cad6721ca08bd79ed9cb8/datafusion/physical-expr/src/aggregate/sum.rs#L261

We agree that there should be no coercions at this stage in general -- any necessary work along these lines should be handled earlier in the code path. However, we don't want to extend the scope of this PR to removing already-existing coercions from the codebase. If there is no existing issue tracking this, we are happy to open one and help resolve it in the near future, along with similar issues like #3511.

A teaser: I have done some preliminary tests, and it seems that removing all the coercions will not be a tough task at all. However, removing this kind of code always requires more careful testing than preliminary checks, so it deserves a careful study in a separate PR 🙂

Bisect

We considered using the partition_point function. This would require converting &[ArrayRef] to &[&[ScalarValue]], where each inner &[ScalarValue] corresponds to a row, since we are searching for the insertion point of a target &[ScalarValue] among rows. Unless we are missing something, doing so would incur unnecessary memory usage and CPU time because it involves copying. In contrast, our implementation keeps the data in its original columnar format and materializes a row from an index only when a comparison is performed. Since a binary search makes O(log n) comparisons, we copy only O(log n) rows instead of n. If you have any suggestions for adding bisect functionality without changing the columnar orientation of the data, we can create an issue for this and help resolve it in a subsequent PR.
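
To make the trade-off concrete, here is a minimal sketch of the idea (the function name, signature, and comparison details below are ours for illustration and do not necessarily match the code in this PR). Only the rows probed by the binary search are materialized as ScalarValues, so the data stays in its columnar layout:

    use std::cmp::Ordering;

    use arrow::array::ArrayRef;
    use datafusion_common::{Result, ScalarValue};

    /// Returns the insertion point of `target` among the rows of `columns`,
    /// assuming the rows are sorted in ascending order on these columns.
    /// Expects at least one column and one target value.
    fn bisect_left(columns: &[ArrayRef], target: &[ScalarValue]) -> Result<usize> {
        let mut low = 0;
        let mut high = columns[0].len();
        while low < high {
            let mid = low + (high - low) / 2;
            // Materialize only the probed row as ScalarValues.
            let row = columns
                .iter()
                .map(|col| ScalarValue::try_from_array(col, mid))
                .collect::<Result<Vec<_>>>()?;
            // Lexicographic comparison of the probed row against the target row.
            let ordering = row
                .iter()
                .zip(target.iter())
                .map(|(probe, key)| probe.partial_cmp(key).unwrap_or(Ordering::Equal))
                .find(|ord| *ord != Ordering::Equal)
                .unwrap_or(Ordering::Equal);
            if ordering == Ordering::Less {
                low = mid + 1;
            } else {
                high = mid;
            }
        }
        Ok(low)
    }

With n rows in the partition, the search performs O(log n) probes, and each probe copies exactly one row's worth of ScalarValues.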

Overflows

Overflows are not handled well by the ScalarValue type in general. Since we all agree that we do not want to change this type too much in this PR, we left a TODO in the window frame boundary code so that we can upgrade the overflow-sensitive parts in the future once there is agreement on how to fix it. Currently, if an overflow happens, the whole process panics and quits, as there is no machinery in the codebase to handle it. If you let us know your thinking on this matter, we will be happy to help fix this in general in a subsequent PR so that other code can benefit too.
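
For reference, one possible direction (a sketch with a hypothetical helper name, not the code in this PR) is to compute frame boundaries with checked arithmetic and return an error instead of panicking:

    use datafusion_common::{DataFusionError, Result};

    /// Hypothetical helper: computes the end index of a `<delta> FOLLOWING`
    /// ROWS frame, clamped to the partition length, and returns an error on
    /// overflow instead of panicking.
    fn following_bound(current: usize, delta: usize, partition_len: usize) -> Result<usize> {
        current
            .checked_add(delta)
            .map(|end| end.min(partition_len))
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "window frame boundary calculation overflowed".to_string(),
                )
            })
    }

Whether such cases should surface as errors or saturate to the partition bounds is exactly the kind of decision we would like to agree on before touching the overflow-sensitive code.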

Thanks again for all the reviews. We are excited to see this merged and put to use!

ozankabak avatar Sep 28 '22 12:09 ozankabak

Thanks @metesynnada and @ozankabak -- I'll check this out more carefully tomorrow or Friday

alamb avatar Sep 28 '22 21:09 alamb

Codecov Report

Merging #3570 (b1b27d2) into master (11abfb9) will increase coverage by 0.16%. The diff coverage is 85.76%.

@@            Coverage Diff             @@
##           master    #3570      +/-   ##
==========================================
+ Coverage   85.91%   86.07%   +0.16%     
==========================================
  Files         301      301              
  Lines       56218    56884     +666     
==========================================
+ Hits        48301    48965     +664     
- Misses       7917     7919       +2     
Impacted Files                                           Coverage Δ
datafusion/common/src/lib.rs                             0.00% <ø> (ø)
datafusion/expr/src/accumulator.rs                       58.33% <0.00%> (-19.45%) ↓
datafusion/expr/src/window_frame.rs                      96.19% <ø> (+2.91%) ↑
...tafusion/physical-expr/src/aggregate/covariance.rs    91.91% <0.00%> (-6.80%) ↓
datafusion/physical-expr/src/aggregate/variance.rs       93.17% <0.00%> (-5.14%) ↓
datafusion/sql/src/planner.rs                            81.25% <ø> (+<0.01%) ↑
...afusion/physical-expr/src/aggregate/correlation.rs    96.00% <42.85%> (-1.96%) ↓
datafusion/physical-expr/src/window/aggregate.rs         80.64% <79.25%> (+5.17%) ↑
datafusion/core/tests/sql/mod.rs                         97.59% <90.90%> (-0.19%) ↓
datafusion/common/src/scalar.rs                          85.84% <92.94%> (+0.72%) ↑
... and 68 more


codecov-commenter avatar Sep 28 '22 21:09 codecov-commenter

I'll plan to merge this tomorrow unless I hear otherwise or anyone else would like longer to review. Thanks again @metesynnada and @ozankabak -- this looks epic

alamb avatar Sep 29 '22 12:09 alamb

Thanks @alamb! In our first follow-up PR, we will tweak the test you noted to increase coverage and also enrich the comments to explain subtle points such as the one you found.

ozankabak avatar Sep 29 '22 13:09 ozankabak

🚀 -- great first contribution

alamb avatar Sep 30 '22 10:09 alamb

Benchmark runs are scheduled for baseline = a9f7cacce7f22e1b645821d0ce9b0323895721ae and contender = 65a5c6bcdd27e6b9677af3071f65901dc4230800. 65a5c6bcdd27e6b9677af3071f65901dc4230800 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.

Conbench compare runs links:

  • [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
  • [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
  • [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
  • [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q

Buildkite builds:

Supported benchmarks:

  • ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
  • test-mac-arm: Supported benchmark langs: C++, Python, R
  • ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
  • ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

ursabot avatar Sep 30 '22 10:09 ursabot