Support joins over nested fields with indexes on nested fields

Open · andrei-ionescu opened this issue 4 years ago · 1 comment

What is the context for this pull request?

  • Proposal: #347
  • Discussion: #312
  • Dependency: #379, #380
  • Fixes: #347

What changes were proposed in this pull request?

This PR adds support for hybrid scans when filtering over nested fields using indexes created over nested fields.

Given the nestedDataset dataset with the following schema

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- nst: struct (nullable = true)
 |    |    |-- field1: string (nullable = true)
 |    |    |-- field2: string (nullable = true)

and the following data

+---+-----+-----------------+
|id |name |nested           |
+---+-----+-----------------+
|2  |name2|[va2, [wa2, wb2]]|
|1  |name1|[va1, [wa1, wb1]]|
+---+-----+-----------------+
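
For readers who want to reproduce the example, here is a minimal sketch (not part of this PR) of how a dataset with this shape could be built in Spark; the case class and variable names are illustrative.

import org.apache.spark.sql.SparkSession

// Illustrative case classes matching the schema above.
case class Nst(field1: String, field2: String)
case class Nested(field1: String, nst: Nst)
case class Record(id: Int, name: String, nested: Nested)

val spark = SparkSession.builder()
  .appName("nested-fields-example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val nestedDataset = Seq(
  Record(2, "name2", Nested("va2", Nst("wa2", "wb2"))),
  Record(1, "name1", Nested("va1", Nst("wa1", "wb1")))
).toDF()

nestedDataset.printSchema()           // matches the schema shown above
nestedDataset.show(truncate = false)  // matches the data shown above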

The transformed plans for hybrid scans need to accommodate the union between the latest files arriving in the dataset, which have the schema above, and the index, which has the schema below

root
 |-- __hs_nested.nested.nst.field1: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- __hs_nested.nested.nst.field2: string (nullable = true)
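
A rough sketch of how an index like idx_nested and the query behind the plans below could be set up, assuming the Hyperspace Scala API (Hyperspace, IndexConfig, createIndex, enableHyperspace); the Parquet path, variable names, and the exact nested-column syntax are illustrative, not taken from this PR.

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.IndexConfig

// Hyperspace indexes file-backed relations, so persist the data and read it back.
nestedDataset.write.mode("overwrite").parquet("/tmp/tableN2")
val df = spark.read.parquet("/tmp/tableN2")

val hs = new Hyperspace(spark)

// Index a nested leaf field and include the columns the query projects.
hs.createIndex(
  df,
  IndexConfig(
    "idx_nested",
    indexedColumns = Seq("nested.nst.field1"),
    includedColumns = Seq("id", "name", "nested.nst.field2")))

// Enable the Hyperspace optimizer rules so eligible plans are rewritten to use the index.
spark.enableHyperspace()

// The query whose optimized and Spark plans are shown in the sections below.
val query = df
  .filter(df("nested.nst.field1") === "wa1")
  .select("id", "name", "nested.nst.field2")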

Files Appended

The optimized plan

Union
:- Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
:  +- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
:     +- Relation[__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3] 
:        Hyperspace(Type: CI, Name: idx_nested, LogVersion: 1)
+- Project [id#100, name#101, nested#102.nst.field2]
   +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
      +- Relation[id#100,name#101,nested#102] parquet

The Spark plan

Union
:- Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
:  +- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
:     +- FileScan parquet [__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3] 
:        Batched: false, Format: Parquet, 
:        Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0], 
:        PartitionFilters: [], PushedFilters: [IsNotNull(__hs_nested.nested.nst.field1)], ReadSchema: 
:        struct<__hs_nested.nested.nst.field1:string,id:int,name:string,__hs_nested.nested.nst.field2:string>
+- Project [id#100, name#101, nested#102.nst.field2]
   +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
      +- FileScan parquet [id#100,name#101,nested#102] 
         Batched: false, Format: Parquet, 
         Location: InMemoryFileIndex[file:/..../tableN2/appended_file.parquet], 
         PartitionFilters: [], PushedFilters: [IsNotNull(nested)], ReadSchema: 
         struct<id:int,name:string,nested:struct<field1:string,nst:struct<field1:string,field2:string>>>
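
A sketch of the appended-files scenario, reusing the definitions from the sketches above: hybrid scan is switched on via the spark.hyperspace.index.hybridscan.enabled setting and new rows are written to the source location without refreshing the index; the appended row and path are illustrative.

// Allow the index to be applied even though the source now contains unindexed files.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")

// Append a file to the source location without refreshing idx_nested.
Seq(Record(3, "name3", Nested("va3", Nst("wa1", "wb3"))))
  .toDF()
  .write.mode("append").parquet("/tmp/tableN2")

// Rerunning the query yields the Union plan above: the index covers the originally
// indexed files, and the raw Parquet scan covers the appended file.
val dfAppended = spark.read.parquet("/tmp/tableN2")
dfAppended
  .filter(dfAppended("nested.nst.field1") === "wa1")
  .select("id", "name", "nested.nst.field2")
  .explain(true)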

Files Deleted

The optimized plan

Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
+- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
   +- Project [__hs_nested.nested.nst.field1#0, id#1, name#2, __hs_nested.nested.nst.field2#3]
      +- Filter NOT (_data_file_id#368L = 3)
         +- Relation[__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3,_data_file_id#368L] 
            Hyperspace(Type: CI, Name: indexWithLineage, LogVersion: 1)

The Spark plan

Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
+- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
   +- Project [__hs_nested.nested.nst.field1#0, id#1, name#2, __hs_nested.nested.nst.field2#3]
      +- Filter NOT (_data_file_id#368L = 3)
         +- FileScan parquet [__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3] 
            Batched: false, Format: Parquet, 
            Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0], 
            PartitionFilters: [], PushedFilters: [IsNotNull(__hs_nested.nested.nst.field1)], ReadSchema: 
            struct<__hs_nested.nested.nst.field1:string,id:int,name:string,__hs_nested.nested.nst.field2:string>
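
The deleted-files case relies on index lineage. Below is a sketch, reusing hs and df from the earlier sketches and assuming the spark.hyperspace.index.lineage.enabled setting, of how an index such as indexWithLineage from the plan above could be built so that each index row records its source file (the _data_file_id column).

// Record source-file lineage in the index so rows from deleted files can be excluded later.
spark.conf.set("spark.hyperspace.index.lineage.enabled", "true")

hs.createIndex(
  df,
  IndexConfig(
    "indexWithLineage",
    indexedColumns = Seq("nested.nst.field1"),
    includedColumns = Seq("id", "name", "nested.nst.field2")))

// After a source file is deleted, rerunning the query still uses the index but adds
// a NOT (_data_file_id = ...) filter for the deleted file, as in the plan above.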

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Existing unit and integration tests, to verify that existing functionality is not broken.
  • Unit and integration tests added for the new functionality.

andrei-ionescu · Mar 11 '21

@sezruby, @imback82 Here is the third PR, which completes support for nested fields in filters.

andrei-ionescu · Mar 11 '21