Optimize `NestedLoopJoinExec` Memory Usage
### Is your feature request related to a problem or challenge?
The current Nested Loop Join implementation follows this simplified logic:

1. Buffer the Build Side: all data from the left (build) side of the join is collected and held in memory.
2. Iterate the Probe Side: the operator iterates through the right (probe) side, processing one `RecordBatch` at a time.
3. Process Batch Pairs: for each buffered `RecordBatch` from the left side and the incoming `RecordBatch` from the right side, the core join logic is executed:
   i. Create the Cartesian product of the two input batches and apply the filter -> `(left_side_indices, right_side_indices)`
   ii. `adjust_indices_by_join_type`
   iii. `build_batch_from_indices(left_side_indices, right_side_indices)`
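Stripped of Arrow details, the index generation in step 3.i can be sketched like this (a simplified illustration using plain `Vec`s in place of Arrow index arrays; not the actual DataFusion code):

```rust
/// Sketch of step 3.i's index generation: every (left, right) row pair
/// of one batch pair is materialized as two parallel index vectors
/// before any filtering happens.
fn cartesian_indices(left_rows: u32, right_rows: u32) -> (Vec<u32>, Vec<u32>) {
    let n = (left_rows as usize) * (right_rows as usize);
    // All left_rows * right_rows index pairs are allocated at once --
    // this is the memory hotspot described below.
    let mut left = Vec::with_capacity(n);
    let mut right = Vec::with_capacity(n);
    for l in 0..left_rows {
        for r in 0..right_rows {
            left.push(l);
            right.push(r);
        }
    }
    (left, right)
}
```

Both vectors grow to `left_rows * right_rows` entries regardless of how selective the join filter is, since the filter is only applied afterwards.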
It has the following problems:
- It puts all indices of the Cartesian product of the two input batches in memory, with length `left_batch.num_rows() * right_batch.num_rows()`
- It may create an extremely large `RecordBatch` at a time
- In 3.i, if the query has a filter, it will call `build_batch_from_indices(left_side_indices, right_side_indices) -> RecordBatch`, which returns `left_side_indices.len() * right_side_indices.len()` rows
- In 3.iii it will return at most `left_side_indices.len() * right_side_indices.len()` rows
I added some logging to show this:

```
> select t1.value from range(40960) t1 join range(8192) t2 on t1.value + t2.value < t1.value * t2.value;
[datafusion/physical-plan/src/joins/utils.rs:864:5] intermediate_batch.num_rows() = 335544320
[datafusion/physical-plan/src/joins/utils.rs:864:5] intermediate_batch.get_array_memory_size() = 5368709312
[datafusion/physical-plan/src/joins/nested_loop_join.rs:906:17] result.num_rows() = 335446019
[datafusion/physical-plan/src/joins/nested_loop_join.rs:906:17] result.get_array_memory_size() = 2683568248
```
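The logged numbers line up with the Cartesian product size; a quick back-of-the-envelope check (assuming the intermediate batch carries two u64 columns, `t1.value` and `t2.value`):

```rust
// One intermediate row per (left, right) index pair.
pub const LEFT_ROWS: u64 = 40_960;
pub const RIGHT_ROWS: u64 = 8_192;
pub const INTERMEDIATE_ROWS: u64 = LEFT_ROWS * RIGHT_ROWS; // = 335_544_320

// ~5 GiB logged for the intermediate batch works out to 16 bytes per row,
// i.e. two u64 columns, all materialized before the filter prunes anything.
pub const BYTES_PER_ROW: u64 = 5_368_709_312 / INTERMEDIATE_ROWS; // = 16
```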
### Describe the solution you'd like
1. Process the Cartesian Product Incrementally: at any given time, only generate the Cartesian product for a slice of the left batch against a slice of the right batch.
   - Special Handling for Right Joins: for Right, RightSemi, and RightAnti joins, a `probe_side_bitmap` must be maintained. This bitmap is populated as each chunk is processed to track which rows from the right side have found a match. After all chunks have been evaluated, this bitmap is used to generate the final output for the unmatched right-side rows.
2. Limit `intermediate_batch` Size During Filtering: when applying the join filter in `apply_join_filter_to_indices`, avoid creating a single, massive `intermediate_batch` for evaluation. Instead, process the indices in batches:
   - Iteratively build smaller intermediate batches by calling `build_batch_from_indices` on slices of the input indices (e.g., `build_indices.slice(i, N)` and `probe_indices.slice(i, N)`).
   - Apply the filter expression to each small intermediate batch.
   - Concatenate the filtered results from each chunk to produce the final set of matched indices.
3. Yield Partial Batches on Demand: on each call, the stream will use its cursor to process the next chunk of indices (e.g., from `i` to `i + N`). It will then build and return a small `RecordBatch` from only that slice.
(we can do 2, 3 first)
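The chunked filtering of idea 2 can be sketched as follows (a minimal illustration with plain `Vec`s standing in for Arrow index arrays and a closure standing in for the compiled filter expression; `filter_in_chunks` is a hypothetical name, not the actual DataFusion API):

```rust
/// Apply `filter` to paired index vectors in chunks of at most
/// `chunk_size`, so only one chunk-sized "intermediate batch" is ever
/// materialized at a time; the surviving pairs are concatenated.
/// `chunk_size` must be non-zero.
fn filter_in_chunks(
    left: &[u32],
    right: &[u32],
    chunk_size: usize,
    filter: impl Fn(u32, u32) -> bool,
) -> (Vec<u32>, Vec<u32>) {
    let mut out_left = Vec::new();
    let mut out_right = Vec::new();
    for start in (0..left.len()).step_by(chunk_size) {
        let end = (start + chunk_size).min(left.len());
        // In the real operator, this slice would become a small
        // intermediate RecordBatch that the filter expression evaluates.
        for i in start..end {
            if filter(left[i], right[i]) {
                out_left.push(left[i]);
                out_right.push(right[i]);
            }
        }
    }
    (out_left, out_right)
}
```

Idea 3 is the same slicing pattern applied one level up: instead of concatenating everything at the end, the stream would keep a cursor and return one chunk-sized `RecordBatch` per poll.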
### Describe alternatives you've considered

No response

### Additional context

No response
take
Yeah this was much needed thanks for bringing this up @UBarney
Thanks for working on it.
3.i. is definitely not efficient for memory usage. 🤦🏼 Perhaps limiting the intermediate result to ~1 batch size is enough to keep the performance.
Also it's a good idea to ignore right* joins in the first patch, but to avoid regressions, it seems you can swap the join children during planning and always make them left* joins? 🤔
> limiting the intermediate result to ~1 batch size is enough to keep the performance.
Do you mean we should also limit `num_rows` of `left_side`, `right_side` to ~1 batch size? By joining only one left row with the right batch at a time?
This is the easiest way to implement it. This is the only limitation we need to apply.
I thought that increasing the size of the intermediate result for (left_side, right_side) might lead to better performance. ~~But then I realized that if we truly want better performance, we should avoid using nlj~~ We can implement it first then do some benchmarking. @2010YOUY01
Adding micro-benches for NLJ sounds great, we can enumerate scenarios like:
- small build side + large probe side + less selective join filter
- large build side + small probe side + selective join filter
- ...

And later decide what to implement according to bench results.
> By joining only one left row with the right batch at a time?
However, when `right_side_ordered == true` we need to maintain the right_side order.
https://github.com/apache/datafusion/blob/69dfe6c499d39563f4e6d9835fcdf3793f7d98c8/datafusion/physical-plan/src/joins/nested_loop_join.rs#L727-L728
Perhaps we can "Process the Cartesian Product Incrementally" by generating the Cartesian product of `(left_batch, right_batch[i, i+N])` at a time.
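That slicing scheme also preserves the right-side order, since the output can be emitted right-major per slice. A sketch under those assumptions (`product_for_right_slice` is a hypothetical helper, not the actual implementation):

```rust
/// Generate the Cartesian product of all left rows against one slice of
/// the right batch, iterating right rows in the outer loop so that
/// concatenating the per-slice outputs keeps right-side row order.
/// (Sketch only; the real operator works on Arrow RecordBatches.)
fn product_for_right_slice(
    left_rows: u32,
    right_start: u32,
    right_len: u32,
) -> (Vec<u32>, Vec<u32>) {
    let mut left = Vec::new();
    let mut right = Vec::new();
    for r in right_start..right_start + right_len {
        for l in 0..left_rows {
            left.push(l);
            right.push(r);
        }
    }
    (left, right)
}
```

Peak memory per step is then bounded by `left_rows * N` index pairs instead of the full `left_rows * right_rows` product.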
I plan to implement "Limit intermediate_batch Size During Filtering" and "Yield Partial Batches on Demand" first, and then I will work on "Process the Cartesian Product Incrementally".
cc @2010YOUY01 @xudong963
> I plan to implement "Limit intermediate_batch Size During Filtering" and "Yield Partial Batches on Demand" first, and then I will work on "Process the Cartesian Product Incrementally".
https://github.com/apache/datafusion/pull/16443