hyperspace icon indicating copy to clipboard operation
hyperspace copied to clipboard

[WIP] Caching index data

Open sezruby opened this issue 5 years ago • 1 comments

What is the context for this pull request?

  • Tracking Issue: TBD
  • Parent Issue: TBD
  • Dependencies: TBD

What changes were proposed in this pull request?

Prototyping index cache.

Spark's cacheManager checks and apply cached data before the optimizer, so after replacing the source plan as cached data, index cannot be applied.

Query result utilizing index data can be cached if hyperspace is enabled at caching time.

  • only "applied" index data can be cached - with specific filter conditions and/or project columns; it will be rarely reused with different conditions / project columns. If hyperspace is disabled or there's no candidate index for the plan, cached data won't utilize the index data.
  • Once {query plan, result} is cached, it won't be applied any of indexes even if there is newly built index.

Therefore, this PR introduces new APIs to cache index data.

// withBucketSpec; true for Join index (removing shuffle), false for Filter index (enhancing parallelism)
hs.cache("indexName", withBucketSpec=true/false) 
hs.uncache("indexName", withBucketSpec=true/false)

These APIs still uses Spark's cache manager which is scalable and also can retain outputPartitioning (bucketing) - ref. This PR uses the default config (MEMORY + DISK) option; https://spark.apache.org/docs/2.4.0/rdd-programming-guide.html#rdd-persistence

The cache key is the replaced logical relation of index data, and the cached data will be whole index data without any project/filter condition so that the cached index can be reused for different filter conditions and columns.

  • As the logical plan is the key of caching, different bucket spec when loading index data can result in caching the same index data twice.

Does this PR introduce any user-facing change?

Yes

Example)

The index here is called productIndex. Once you cache the index, you will notice that sparkPlan below shows InMemoryRelation indicating that the plan is reading the index from memory.

hyperspace.cache("productIndex", false)
Caching index: Relation[name#426,qty#427,date#428] parquet

spark.enableHyperspace
res64: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@144ab639

val query = testDF.filter("name = 'banana'")
query: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, qty: int ... 1 more field]

query.queryExecution.sparkPlan
res65: org.apache.spark.sql.execution.SparkPlan =
Filter (isnotnull(name#133) && (name#133 = banana))
+- InMemoryTableScan [name#133, qty#134, date#135], [isnotnull(name#133), (name#133 = banana)]
      +- InMemoryRelation [name#133, qty#134, date#135], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [name#336, qty#337, date#338]
               +- *(1) FileScan parquet [name#336,date#338,qty#337] Batched: true, Format: Parquet, Location: InMemoryFileIndex[<PATH_TO_INDEX>..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,date:string,qty:int>

We could now uncache and as expected, sparkPlan will still utilize the index (since Hyperspace is still enabled) but will read it from remote store.

hyperspace.uncache("productIndex", false)
Uncaching index: Relation[name#426,qty#427,date#428] parquet

val query = testDF.filter("name = 'banana'")
query: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, qty: int ... 1 more field]

query.queryExecution.sparkPlan
res68: org.apache.spark.sql.execution.SparkPlan =
Project [name#133, qty#134, date#135]
+- Filter (isnotnull(name#133) && (name#133 = banana))
   +- FileScan parquet [name#133,date#135,qty#134] Batched: true, Format: Parquet, Location: InMemoryFileIndex[PATH_TO_INDEX..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct<name:string,date:string,qty:int>

How was this patch tested?

sezruby avatar Nov 16 '20 02:11 sezruby

Thank you! Can you add some example plans for clarity?

rapoth avatar Nov 20 '20 01:11 rapoth