
Optimize `NestedLoopJoinExec` Memory Usage

Open UBarney opened this issue 7 months ago • 6 comments

Is your feature request related to a problem or challenge?

The current Nested Loop Join implementation follows this simplified logic:

  1. Buffer the Build Side: All data from the left (build) side of the join is collected and held in memory.
  2. Iterate the Probe Side: The operator iterates through the right (probe) side, processing one RecordBatch at a time.
  3. Process Batch Pairs: For each buffered RecordBatch from the left side and the incoming RecordBatch from the right side, the core join logic is executed:
    1. Create the Cartesian product of the two input batches and apply the filter -> (left_side_indices, right_side_indices)
    2. adjust_indices_by_join_type
    3. build_batch_from_indices(left_side_indices, right_side_indices)
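The steps above can be sketched with plain `Vec`s (a simplified model for illustration, not DataFusion's Arrow-based code); the point is that the index vectors in step 3.i are fully materialized at `left.len() * right.len()` entries before the filter ever runs:

```rust
/// Step 3.i: Cartesian product of row indices, materialized all at once.
/// Both output vectors have left_rows * right_rows entries.
fn cartesian_indices(left_rows: usize, right_rows: usize) -> (Vec<usize>, Vec<usize>) {
    let mut left_idx = Vec::with_capacity(left_rows * right_rows);
    let mut right_idx = Vec::with_capacity(left_rows * right_rows);
    for l in 0..left_rows {
        for r in 0..right_rows {
            left_idx.push(l);
            right_idx.push(r);
        }
    }
    (left_idx, right_idx)
}

/// Steps 3.i-3.iii collapsed into one function: evaluate the filter on every
/// index pair, then gather the matching rows (the equivalent of
/// build_batch_from_indices). Single i64 columns stand in for RecordBatches.
fn join_batch_pair(
    left: &[i64],
    right: &[i64],
    filter: impl Fn(i64, i64) -> bool,
) -> Vec<(i64, i64)> {
    let (li, ri) = cartesian_indices(left.len(), right.len());
    li.iter()
        .zip(ri.iter())
        .filter(|&(&l, &r)| filter(left[l], right[r]))
        .map(|(&l, &r)| (left[l], right[r]))
        .collect()
}
```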

This approach has the following problems:

  • It materializes all indices of the Cartesian product of the two input batches in memory, with length `left_batch.num_rows() * right_batch.num_rows()`
  • It may create an extremely large `RecordBatch` at a time
    • In 3.i, if the query has a filter, it calls `build_batch_from_indices(left_side_indices, right_side_indices) -> RecordBatch`, which returns a batch with `left_side_indices.len()` rows (the full `left_batch.num_rows() * right_batch.num_rows()` before filtering)
    • In 3.iii it returns at most `left_side_indices.len()` rows

I added some logging to demonstrate this:

```
> select t1.value from range(40960) t1 join range(8192) t2 on t1.value + t2.value < t1.value * t2.value;
[datafusion/physical-plan/src/joins/utils.rs:864:5] intermediate_batch.num_rows() = 335544320
[datafusion/physical-plan/src/joins/utils.rs:864:5] intermediate_batch.get_array_memory_size() = 5368709312
[datafusion/physical-plan/src/joins/nested_loop_join.rs:906:17] result.num_rows() = 335446019
[datafusion/physical-plan/src/joins/nested_loop_join.rs:906:17] result.get_array_memory_size() = 2683568248
```
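The logged numbers are consistent with full materialization: the intermediate batch holds `40960 * 8192` rows, and with two `Int64` columns (`t1.value`, `t2.value`) at 8 bytes per value that comes to roughly the logged ~5 GiB (the small difference from `5368709312` is Arrow buffer/metadata overhead). A quick back-of-envelope check:

```rust
/// Rows in the fully materialized Cartesian product of one batch pair.
fn product_rows(left_rows: u64, right_rows: u64) -> u64 {
    left_rows * right_rows
}

/// Approximate size of the intermediate batch: two Int64 columns,
/// 8 bytes per value. Ignores Arrow buffer/metadata overhead.
fn approx_intermediate_bytes(rows: u64) -> u64 {
    rows * 2 * 8
}
```

`product_rows(40_960, 8_192)` gives 335544320, matching `intermediate_batch.num_rows()`, and `approx_intermediate_bytes` of that is 5368709120 bytes, within 192 bytes of the logged value.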

Describe the solution you'd like

  1. Process the Cartesian Product Incrementally: at any given time, only generate the Cartesian product of a slice of the left batch against a slice of the right batch.

    1. Special Handling for Right Joins: For Right, RightSemi, and RightAnti joins, a probe_side_bitmap must be maintained. This bitmap is populated as each chunk is processed to track which rows from the right side have found a match. After all chunks have been evaluated, this bitmap is used to generate the final output for the unmatched right-side rows.
  2. Limit intermediate_batch Size During Filtering: When applying the join filter in apply_join_filter_to_indices, avoid creating a single, massive intermediate_batch for evaluation. Instead, process the indices in batches:

    1. Iteratively build smaller intermediate batches by calling build_batch_from_indices on slices of the input indices (e.g., build_indices.slice(i, N) and probe_indices.slice(i, N)).
    2. Apply the filter expression to each small intermediate batch.
    3. Concatenate the filtered results from each chunk to produce the final set of matched indices.
  3. Yield Partial Batches on Demand: on each call, the stream will use its cursor to process the next chunk of indices (e.g., from i to i + N). It will then build and return a small RecordBatch from only that slice.

(we can do 2, 3 first)
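A rough sketch of what proposals 2 and 3 could look like, using illustrative names and plain `Vec`s rather than DataFusion's actual types (`CHUNK` stands in for the configured batch size; the `keep` closure stands in for filter evaluation on a small intermediate batch):

```rust
const CHUNK: usize = 4; // stand-in for the configured batch size

/// Proposal 2: walk the paired index arrays in fixed-size slices
/// (the analog of build_indices.slice(i, N)), evaluate the filter per
/// slice, and concatenate the surviving index pairs. Peak memory per
/// step is bounded by CHUNK instead of the full product length.
fn filter_in_chunks(
    build_indices: &[usize],
    probe_indices: &[usize],
    keep: impl Fn(usize, usize) -> bool,
) -> (Vec<usize>, Vec<usize>) {
    let mut kept_build = Vec::new();
    let mut kept_probe = Vec::new();
    let mut i = 0;
    while i < build_indices.len() {
        let end = (i + CHUNK).min(build_indices.len());
        // Equivalent of building one small intermediate batch from
        // indices[i..end] and applying the filter expression to it.
        for (b, p) in build_indices[i..end].iter().zip(&probe_indices[i..end]) {
            if keep(*b, *p) {
                kept_build.push(*b);
                kept_probe.push(*p);
            }
        }
        i = end;
    }
    (kept_build, kept_probe)
}

/// Proposal 3: a cursor over the matched pairs that yields at most
/// CHUNK rows per poll, instead of one giant output batch.
struct PartialBatchCursor {
    pairs: Vec<(usize, usize)>,
    pos: usize,
}

impl PartialBatchCursor {
    fn next_batch(&mut self) -> Option<&[(usize, usize)]> {
        if self.pos >= self.pairs.len() {
            return None;
        }
        let end = (self.pos + CHUNK).min(self.pairs.len());
        let out = &self.pairs[self.pos..end];
        self.pos = end;
        Some(out)
    }
}
```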

Describe alternatives you've considered

No response

Additional context

No response

UBarney avatar Jun 11 '25 03:06 UBarney

take

UBarney avatar Jun 11 '25 03:06 UBarney

Yeah, this was much needed. Thanks for bringing this up, @UBarney

jonathanc-n avatar Jun 11 '25 23:06 jonathanc-n

Thanks for working on it.

3.i is definitely not efficient for memory usage. 🤦🏼 Perhaps limiting the intermediate result to ~1 batch size is enough to keep the performance.

Also, it's a good idea to ignore right* joins in the first patch, but to avoid a regression, it seems you can swap the join children during planning and always make them left* joins? 🤔

2010YOUY01 avatar Jun 12 '25 03:06 2010YOUY01

> limiting the intermediate result to ~1 batch size is enough to keep the performance.

Do you mean we should also limit the `num_rows` of `left_side` and `right_side` to ~1 batch size? By joining only one left row with the right batch at a time?

This is the easiest way to implement it, and it is the only limitation we need to apply.

I thought that increasing the size of the intermediate result for (left_side, right_side) might lead to better performance. ~~But then I realized that if we truly want better performance, we should avoid using NLJ~~ We can implement it first and then do some benchmarking. @2010YOUY01

UBarney avatar Jun 13 '25 02:06 UBarney

> > limiting the intermediate result to ~1 batch size is enough to keep the performance.
>
> Do you mean we should also limit the `num_rows` of `left_side` and `right_side` to ~1 batch size? By joining only one left row with the right batch at a time?
>
> This is the easiest way to implement it, and it is the only limitation we need to apply.
>
> I thought that increasing the size of the intermediate result for (left_side, right_side) might lead to better performance. ~~But then I realized that if we truly want better performance, we should avoid using NLJ~~ We can implement it first and then do some benchmarking. @2010YOUY01

Adding micro-benches for NLJ sounds great. We can enumerate scenarios like:

  • small build side + large probe side + less selective join filter
  • large build side + small probe side + selective join filter
  • ...

And later decide what to implement according to the bench results.

2010YOUY01 avatar Jun 13 '25 07:06 2010YOUY01

> By joining only one left row with the right batch at a time?

However, when `right_side_ordered == true` we need to maintain the right-side order:
https://github.com/apache/datafusion/blob/69dfe6c499d39563f4e6d9835fcdf3793f7d98c8/datafusion/physical-plan/src/joins/nested_loop_join.rs#L727-L728

Perhaps we can "Process the Cartesian Product Incrementally" by generating the Cartesian product of `(left_batch, right_batch[i, i+N])` one slice at a time.
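A minimal model of that idea, with illustrative names and plain slices rather than DataFusion's actual types: joining the whole left batch against `right_batch[i, i+N]` slice by slice emits matches in right-side order, and a `probe_matched` bitmap (the `probe_side_bitmap` from the proposal) records which right rows matched, so Right/RightSemi/RightAnti output can be produced after the last slice.

```rust
const SLICE: usize = 3; // stand-in for the slice width N

/// Join the full left batch against the probe side one slice at a time.
/// Iterating right-major keeps the output in right-side order, and the
/// returned bitmap marks which probe rows found a match.
fn join_probe_slices(
    left: &[i64],
    right: &[i64],
    matches: impl Fn(i64, i64) -> bool,
) -> (Vec<(i64, i64)>, Vec<bool>) {
    let mut out = Vec::new();
    let mut probe_matched = vec![false; right.len()]; // probe_side_bitmap
    let mut start = 0;
    while start < right.len() {
        let end = (start + SLICE).min(right.len());
        // Cartesian product of (left_batch, right_batch[start..end]) only;
        // nothing larger than left.len() * SLICE pairs exists at once.
        for r in start..end {
            for &l in left {
                if matches(l, right[r]) {
                    out.push((l, right[r]));
                    probe_matched[r] = true;
                }
            }
        }
        start = end;
    }
    (out, probe_matched)
}
```

After the loop, unmatched entries in `probe_matched` are exactly the rows a Right/RightAnti join still needs to emit.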

I plan to implement "Limit intermediate_batch Size During Filtering" and "Yield Partial Batches on Demand" first, and then I will work on "Process the Cartesian Product Incrementally".

cc @2010YOUY01 @xudong963

UBarney avatar Jun 16 '25 08:06 UBarney

> I plan to implement "Limit intermediate_batch Size During Filtering" and "Yield Partial Batches on Demand" first, and then I will work on "Process the Cartesian Product Incrementally".

https://github.com/apache/datafusion/pull/16443

UBarney avatar Jun 24 '25 02:06 UBarney