
[HUDI-7246] Fix Data Skipping Issue: No Results When Query Conditions Involve Both Columns with and without Column Stats


In the current code version, column stats support has not yet been extended to complex nested data types such as maps. Take the table tbl as an example, defined with three fields: an integer field id, a string field name, and a map-type field attributes. The id and name fields support column stats, so Hudi generates column stats index entries for these two fields when data is written; no corresponding index is generated for the attributes field. The table creation statement is as follows:

create table tbl (
  id int,
  name string,
  attributes map<string, string>
) ...

To elaborate further, consider the following insert operation:

insert into tbl values
(1, 'a1', map('color', 'red', 'size', 'M')),
(2, 'a2', map('color', 'blue', 'size', 'L'));

After the execution of the insert, the content of the column stats should be as follows:

a.parquet   id     min: 1   max: 1   null: 0
b.parquet   id     min: 2   max: 2   null: 0
a.parquet   name   min: 'a1' max: 'a1' null: 0
b.parquet   name   min: 'a2' max: 'a2' null: 0

This means that there is no column stats index for the attributes column. Now consider what happens when we execute a query against tbl:

Queries containing both columns supported and not supported by column stats

Consider the following query: select * from tbl where attributes.color = 'red' and name = 'a1'. At this point, queryReferencedColumns includes attributes and name, and the queryFilters are as follows:

isnotnull(attributes#95)
isnotnull(name#94)
(attributes#95[color] = red)
(name#94 = a1)
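
As a side note, one way to see the filter expressions Spark derives for this query is to inspect the Filter node of the optimized logical plan. A minimal sketch, assuming a SparkSession with tbl registered (illustrative only, not Hudi code):

import org.apache.spark.sql.catalyst.plans.logical.Filter

// Print the filter condition Spark derives for the query; it carries the
// isnotnull / GetMapValue / EqualTo conjuncts listed above.
val plan = spark.sql("select * from tbl where attributes.color = 'red' and name = 'a1'")
  .queryExecution.optimizedPlan
plan.collect { case f: Filter => f.condition }.foreach(c => println(c.sql))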

The content of transposedColStatsDF is shown below:

+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
|            fileName|valueCount|attributes_minValue|attributes_maxValue|attributes_nullCount|name_minValue|name_maxValue|name_nullCount|
+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
|3363b50e-ca25-4fb...|         1|               null|               null|                   1|           a2|           a2|             0|
|876af2ed-d529-4df...|         1|               null|               null|                   1|           a1|           a1|             0|
+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+

targetIndexedColumns is the intersection of targetColumnNames and indexedColumns. By default, when no column stats column list has been explicitly configured, indexedColumns includes every field of the table schema (tableSchema.fieldNames.toSet), including the map-type attributes. targetColumnNames contains the columns referenced by the query, so the intersection still contains both attributes and name.

Returning to the code that builds transposedRows: because there are no column stats for attributes, rows with min = null, max = null, and null_count = valueCount are added to avoid errors during the subsequent where filtering. This is where attributes_minValue, attributes_maxValue, and attributes_nullCount in the resulting DataFrame come from.

Now let's revisit getCandidateFiles and look at how indexFilter is assembled. The queryFilters listed above are processed in translateIntoColumnStatsIndexFilterExpr, which transforms each expression and defaults to True when an expression cannot be converted. We can skip name, since it is a string type and can be converted directly. For the map type there are two expressions. The first, isnotnull(attributes#95), hits this case:

case IsNotNull(attribute: AttributeReference) =>
  getTargetIndexedColumnName(attribute, indexSchema)
    .map(colName => LessThan(genColNumNullsExpr(colName), genColValueCountExpr))

The corresponding translated expression is 'attributes_nullCount < 'valueCount. The second, the EqualTo expression (attributes#95[color] = red), should in theory match the equalTo case, but it does not, because AllowedTransformationExpression does not support GetMapValue; the result is None and no match. So nothing matches, and the overall expression for attributes ends up as 'attributes_nullCount < 'valueCount And True.

Herein lies the problem. Because of the padding

acc ++= Seq(null, null, valueCount)

the null_count for attributes always equals valueCount, so 'attributes_nullCount < 'valueCount is always False. As a result, prunedCandidateFileNames is always empty. However, since indexDf does contain rows, allIndexedFileNames is not empty, which in turn makes notIndexedFileNames empty. The end result is that getCandidateFiles always returns an empty set! In other words, the indexFilter prunes every file, because the null_count for attributes is always set equal to valueCount, rendering the condition perpetually false.
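
The effect can be reproduced outside Hudi with a minimal sketch: build a DataFrame shaped like the transposedColStatsDF above and apply the translated predicate to it (column names mirror the earlier table; this is illustrative, not Hudi code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[1]").appName("hudi-7246-repro").getOrCreate()
import spark.implicits._

// Rows padded for the un-indexed `attributes` column: min = null, max = null,
// nullCount = valueCount, i.e. the `acc ++= Seq(null, null, valueCount)` padding.
val transposed = Seq(
  ("a.parquet", 1L, Option.empty[String], Option.empty[String], 1L, "a1", "a1", 0L),
  ("b.parquet", 1L, Option.empty[String], Option.empty[String], 1L, "a2", "a2", 0L)
).toDF("fileName", "valueCount",
  "attributes_minValue", "attributes_maxValue", "attributes_nullCount",
  "name_minValue", "name_maxValue", "name_nullCount")

// isnotnull(attributes) translates to attributes_nullCount < valueCount, which can
// never hold because the padding sets them equal, so no file survives the filter.
val candidates = transposed.filter(
  col("attributes_nullCount") < col("valueCount") &&
  col("name_minValue") <= "a1" && col("name_maxValue") >= "a1")

println(candidates.count())   // 0 -> prunedCandidateFileNames is empty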

Problem Summary

Under the existing code implementation, when data skipping is enabled and the index columns have not been explicitly specified in the parameters, if the query conditions include both columns that are supported by column stats and those that are not, no data will ever be retrieved!
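
A hedged end-to-end reproduction of this summary, assuming the usual config keys (hoodie.metadata.enable, hoodie.metadata.index.column.stats.enable, hoodie.enable.data.skipping; adjust to your Hudi version):

// Enable the metadata table, the column stats index, and data skipping (assumed config keys).
spark.sql("set hoodie.metadata.enable=true")
spark.sql("set hoodie.metadata.index.column.stats.enable=true")
spark.sql("set hoodie.enable.data.skipping=true")

// Expected: the row (1, 'a1', ...). Observed with the bug: zero rows, because every
// file is pruned by the always-false attributes_nullCount < valueCount predicate.
spark.sql("select * from tbl where attributes.color = 'red' and name = 'a1'").show()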

Problem Fix

We can see that the variables directly causing the issue are indexDf and indexFilter:

  • indexDf includes unsupported columns, filled with inappropriate placeholder values.
  • indexFilter also contains query conditions that the index does not support, and indexDf is filled with those inappropriate values to accommodate them.
  • Moreover, the unsupported query conditions in indexFilter are obtained by filtering against the schema derived from indexDf.

A more straightforward fix would be to set the null_count for non-existent columns directly to null (refer to [HUDI-4250][HUDI-4202]) and incorporate this check into the isNotNull evaluation. That way there would be no erroneous filtering and no exceptions. However, I'm not sure what issues were encountered with null before. [HUDI-5557] removed this scenario, but setting it to value_count can also lead to errors. I would like to ask whether the issues with null and value_count are similar? @rfyu @alexeykudinkin
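
For illustration, a rough sketch of that direction (not the final patch; it reuses the helper names from the translator snippet above, and the padding line only shows the intent):

// 1) When a column has no column stats, pad the transposed row with nulls instead of
//    pretending that null_count == valueCount:
//      acc ++= Seq(null, null, null)
//
// 2) Make the isNotNull translation tolerate a null null_count, so a missing index
//    entry never collapses into a constant-false predicate:
case IsNotNull(attribute: AttributeReference) =>
  getTargetIndexedColumnName(attribute, indexSchema)
    .map { colName =>
      val numNullsExpr = genColNumNullsExpr(colName)
      Or(IsNull(numNullsExpr), LessThan(numNullsExpr, genColValueCountExpr))
    }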

Change Logs

Filter the columns when performing data skipping.

Impact

None

Risk level (write none, low medium or high below)

medium

Documentation Update

None

Contributor's checklist

  • [ ] Read through contributor's guide
  • [ ] Change Logs and Impact were stated clearly
  • [ ] Adequate tests were added if applicable
  • [ ] CI passed

majian1998 avatar Dec 21 '23 06:12 majian1998

@hudi-bot run azure

majian1998 avatar Dec 27 '23 08:12 majian1998

Another suggestion: maybe we can make the composition logic of the column stats index (csi) filter expressions clearer. Something like:

case IsNotNull(attribute: AttributeReference) =>
  getTargetIndexedColumnName(attribute, indexSchema)
    .map(colName => Or(
      LessThan(genColNumNullsExpr(colName), genColValueCountExpr),
      IsNull(genColNumNullsExpr(colName))
    ))

In the code you fixed, genColValueCountExpr can be null. The expression LessThan(leftExpr, rightExpr) looks like it works, but we cannot rely on the result when comparing a value with null, and the expression comes from Spark. So I think the following code would be better:

case IsNotNull(attribute: AttributeReference) =>
  getTargetIndexedColumnName(attribute, indexSchema)
    .map { colName =>
      val numNullExpr = genColNumNullsExpr(colName)
      val valueCountExpr = genColValueCountExpr
      Or(Or(IsNull(numNullExpr), IsNull(valueCountExpr)), LessThan(numNullExpr, valueCountExpr))
    }

Along the same lines, other expressions such as EqualTo, Not, In, and so on could also have their logic made clearer.
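
For instance, the same null guard applied to EqualTo might look roughly like this (the min/max helper names are assumed to mirror genColNumNullsExpr and may differ from the actual translator):

case EqualTo(attribute: AttributeReference, value: Literal) =>
  getTargetIndexedColumnName(attribute, indexSchema)
    .map { colName =>
      val minExpr = genColMinValueExpr(colName)   // assumed helper name
      val maxExpr = genColMaxValueExpr(colName)   // assumed helper name
      // Treat missing (null) stats as "cannot prune" instead of letting the
      // comparison silently evaluate to null/false.
      Or(Or(IsNull(minExpr), IsNull(maxExpr)),
        And(LessThanOrEqual(minExpr, value), GreaterThanOrEqual(maxExpr, value)))
    }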

KnightChess avatar Dec 29 '23 03:12 KnightChess

@KnightChess: Your suggestions are quite helpful! I will address these issues. However, regarding the modifications to expressions, I believe we could make the necessary format adjustments within the isNotNull function. As for the other expressions, might it be better to keep them as they are? Excessive untested changes could introduce new, unforeseen issues, especially since we are not yet certain about the desired behavior when other expressions encounter null values. Let's ensure our modifications are minimal, affecting only what's necessary to solve the problem at hand.

majian1998 avatar Dec 29 '23 03:12 majian1998

@majian1998 Agreed, we can open another PR to make the logic clearer.

KnightChess avatar Dec 29 '23 03:12 KnightChess

@danny0405: I've revised the annotations to more accurately convey their meaning. In essence, the column stats are set to null if they do not exist, and the index filter is configured to be true for such a column in that scenario. It will not have any additional impact. Could you please review it again and see if there are any further suggestions?

majian1998 avatar Jan 04 '24 05:01 majian1998

Can we exclude the filters on complex data types during column stats data skipping, and keep those filters for the regular evaluation afterwards?

danny0405 avatar Jan 12 '24 10:01 danny0405

@danny0405: Actually, my initial fix was like this (skipping the complex data types; see the sketch below). But such a fix might not address the root cause of the problem: the conflict between obtaining the index schema and constructing the index filter. This change resolves that logical conflict.
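
For reference, a sketch of that earlier direction, assuming queryFilters and the table schema as inputs (names are illustrative, not the actual Hudi code):

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

// Drop query filters that reference complex-typed (non-indexable) columns before
// translating them into column-stats expressions; Spark still applies them at read time.
def pruneComplexTypeFilters(queryFilters: Seq[Expression], schema: StructType): Seq[Expression] = {
  val indexable = schema.fields.collect {
    case StructField(name, dt, _, _) if !dt.isInstanceOf[MapType] &&
      !dt.isInstanceOf[ArrayType] && !dt.isInstanceOf[StructType] => name
  }.toSet
  queryFilters.filter(_.references.toSeq.map(_.name).forall(indexable.contains))
}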

majian1998 avatar Jan 16 '24 09:01 majian1998

Looks good to me, just take care of the test failures.

danny0405 avatar Jan 17 '24 03:01 danny0405

@hudi-bot run azure

stream2000 avatar Jan 17 '24 09:01 stream2000

@hudi-bot run azure

majian1998 avatar Jan 18 '24 03:01 majian1998

CI report:

  • 248df7c04d611c5f521f309732aa21351161fa8b UNKNOWN
  • 9f4774b8e838237a44003381a56affe67d2a26b0 Azure: SUCCESS
Bot commands
@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

hudi-bot avatar Jan 18 '24 12:01 hudi-bot

Merging it as CI is green.

stream2000 avatar Jan 18 '24 12:01 stream2000