
Arrow: Fix indexing in Parquet dictionary encoded values readers

Open wypoon opened this issue 1 year ago • 8 comments

This fixes https://github.com/apache/iceberg/issues/11221.

There is a bug in VectorizedDictionaryEncodedParquetValuesReader.BaseDictEncodedReader::nextBatch where nextVal of the BaseDictEncodedReader subclass is called with the incorrect index for certain subclasses (in particular, for FixedSizeBinaryDictEncodedReader), so the value is set at the incorrect index in the FieldVector that holds the values. E.g., for a Decimal column that requires 16 bytes to store, the values are stored in 16-byte fixed length byte arrays and the typeWidth is 16. FixedSizeBinaryDictEncodedReader::nextVal is then called with index 0, 16, 32, 48, etc. instead of 0, 1, 2, 3, etc. The fix is to not premultiply the index by the typeWidth before calling nextVal and, instead, to account for the typeWidth as appropriate in each nextVal method.
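To make the symptom concrete, here is a minimal, self-contained sketch. It does not use Arrow or Iceberg classes: `FixedWidthSlots` is a hypothetical stand-in for a fixed-width vector whose `set(index, value)` takes an element index and computes the byte offset itself, and `write` is a hypothetical driver loop. The buffer is deliberately oversized so that the premultiplied writes land in the wrong slots instead of failing.

```java
import java.util.Arrays;

public class PremultipliedIndexDemo {
  // Hypothetical stand-in for a fixed-width vector: set/get take an ELEMENT index
  // and compute the byte offset (index * typeWidth) internally.
  static final class FixedWidthSlots {
    private final int typeWidth;
    private final byte[] data;

    FixedWidthSlots(int typeWidth, int capacity) {
      this.typeWidth = typeWidth;
      this.data = new byte[typeWidth * capacity];
    }

    void set(int index, byte[] value) {
      System.arraycopy(value, 0, data, index * typeWidth, typeWidth);
    }

    byte[] get(int index) {
      return Arrays.copyOfRange(data, index * typeWidth, (index + 1) * typeWidth);
    }
  }

  // Builds a typeWidth-byte value whose bytes all equal n, so slots are easy to tell apart.
  static byte[] filled(int typeWidth, int n) {
    byte[] value = new byte[typeWidth];
    Arrays.fill(value, (byte) n);
    return value;
  }

  // Writes the values 1..numValues. With premultiply == true the index is scaled by
  // typeWidth before the element-indexed set call, which is the behavior described above.
  static FixedWidthSlots write(int typeWidth, int numValues, boolean premultiply) {
    FixedWidthSlots slots = new FixedWidthSlots(typeWidth, typeWidth * numValues);
    for (int idx = 0; idx < numValues; idx++) {
      int index = premultiply ? idx * typeWidth : idx;
      slots.set(index, filled(typeWidth, idx + 1));
    }
    return slots;
  }

  public static void main(String[] args) {
    FixedWidthSlots buggy = write(16, 4, true);
    FixedWidthSlots fixed = write(16, 4, false);
    System.out.println("premultiplied, slot 1:  " + buggy.get(1)[0]);  // 0 (never written)
    System.out.println("premultiplied, slot 16: " + buggy.get(16)[0]); // 2 (second value)
    System.out.println("plain index, slot 1:    " + fixed.get(1)[0]);  // 2
  }
}
```

With a typeWidth of 16, the second value ends up in slot 16 rather than slot 1, which matches the 0, 16, 32, 48 pattern described above.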

A test is included that fails without the fix and passes with it.

wypoon avatar Oct 02 '24 04:10 wypoon

The bug occurs when reading a Parquet column chunk with multiple pages where some but not all of the pages are dictionary encoded. In particular, Impala tries to use dictionary encoding where possible when writing Parquet data, and when the number of values in the dictionary exceeds 40,000, it then switches to plain encoding. The data file uploaded in https://github.com/apache/iceberg/issues/11221 is such a data file written by Impala. I wanted to write data programmatically from the test, and I tried to get the Java (parquet-mr) Parquet writer to write such data, but I was not able to. With the v1 writer, I was not able to get it to write dictionary encoded data at all for decimal values stored as fixed length byte arrays. With the v2 writer, I was able to get it to write some pages with RLE dictionary encoding and some not dictionary encoded, but those other pages use a delta encoding, which Iceberg does not support (and thus I cannot use it to reproduce the bug). Eventually, I used pyarrow's C++ writer to write a small data file (less than 4 KB) with 2 pages, each with 200 rows, one RLE dictionary encoded and one plain encoded. I think this is acceptably small to be checked in.

wypoon avatar Oct 05 '24 20:10 wypoon

CI failed with

Execution failed for task ':iceberg-flink:iceberg-flink-1.20:compileJmhJava'.
> Java heap space

I have seen this kind of Java heap space failure in the CI before. It is unrelated to my change.

wypoon avatar Oct 05 '24 20:10 wypoon

@nastra @rymurr the bug was introduced in https://github.com/apache/iceberg/commit/2842f0b5e4934f9a102a5298536efaaff0af6b45 (https://github.com/apache/iceberg/pull/2746). Assuming the code was correct before that refactoring, an analysis of the refactoring will show that my change is correct. Before that refactoring, there was a lot of duplicated code that began

    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);

and then, in many (but not all) cases, calls of the form vector.getDataBuffer().setXXX(idx * typeWidth, ...) are made. For those, we want to use idx * typeWidth. However, there are cases where we want to use idx in setting the value in the vector. E.g., in readBatchOfDictionaryEncodedFixedSizeBinary, we do

           ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);

What the refactoring did was to use index = idx * typeWidth and call the overridden nextVal methods with that index, so that in this case we end up doing ((FixedSizeBinaryVector) vector).set(index, vectorBytes), where index is idx * typeWidth rather than idx.

My change is to not premultiply idx by typeWidth before making the nextVal calls and, in each nextVal implementation, to account for the typeWidth (which is passed to nextVal anyway).
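The shape of that change can be sketched as follows. This is a simplified illustration, not the actual Iceberg code: the `ValueSetter` interface, the two setter constants, and `setElement` are hypothetical names, and a plain `java.nio.ByteBuffer` stands in for the Arrow data buffer. The point is only that the batch loop passes the unscaled idx, and each implementation decides whether the API it calls wants a byte offset or an element index.

```java
import java.nio.ByteBuffer;

public class NextValIndexSketch {
  // Hypothetical simplification of a per-type nextVal: idx is always an element index.
  interface ValueSetter {
    void nextVal(ByteBuffer data, int idx, int typeWidth, int value);
  }

  // Raw-buffer style: the call takes a byte offset, so this reader multiplies by typeWidth.
  static final ValueSetter OFFSET_BASED =
      (data, idx, typeWidth, value) -> data.putInt(idx * typeWidth, value);

  // Vector style: the call takes an element index and scales it internally,
  // so this reader must pass idx through unchanged.
  static final ValueSetter ELEMENT_BASED =
      (data, idx, typeWidth, value) -> setElement(data, idx, typeWidth, value);

  // Stand-in for an element-indexed setter; fills one typeWidth-byte slot with a marker byte.
  static void setElement(ByteBuffer data, int index, int typeWidth, int value) {
    for (int b = 0; b < typeWidth; b++) {
      data.put(index * typeWidth + b, (byte) value);
    }
  }

  // Simplified batch loop: idx is NOT premultiplied before calling nextVal.
  static void nextBatch(
      ValueSetter setter, ByteBuffer data, int startOffset, int typeWidth, int[] values) {
    for (int i = 0; i < values.length; i++) {
      setter.nextVal(data, startOffset + i, typeWidth, values[i]);
    }
  }

  public static void main(String[] args) {
    ByteBuffer ints = ByteBuffer.allocate(12);
    nextBatch(OFFSET_BASED, ints, 0, 4, new int[] {7, 8, 9});
    System.out.println(ints.getInt(4)); // 8

    ByteBuffer fixed = ByteBuffer.allocate(48);
    nextBatch(ELEMENT_BASED, fixed, 0, 16, new int[] {1, 2, 3});
    System.out.println(fixed.get(16)); // 2
  }
}
```

Keeping the multiplication inside each implementation means the loop does not need to know which kind of setter it is driving.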

cc @rdblue

wypoon avatar Oct 05 '24 20:10 wypoon

@nastra thank you for reviewing.

I have added a test that exercises VectorizedDictionaryEncodedParquetValuesReader.VarWidthBinaryDictEncodedReader::nextVal. This is a test that actually passes both before and after the fix, for this is a case where typeWidth == -1.

The int backed, long backed, and fixed length decimal readers are all dead (never called) code. I'll put up a separate PR to remove the dead code.

wypoon avatar Oct 07 '24 23:10 wypoon

@nastra please see https://github.com/apache/iceberg/pull/11276 for the dead code removal.

wypoon avatar Oct 08 '24 01:10 wypoon

@amogh-jahagirdar I see that you plan to review; can you please do so?

wypoon avatar Oct 16 '24 18:10 wypoon

@wypoon Sure, I will take a look tomorrow!

amogh-jahagirdar avatar Oct 17 '24 04:10 amogh-jahagirdar

@amogh-jahagirdar to recap:

This change fixes a bug introduced by a refactoring (see comment). An analysis of the refactor will show the correctness of this change. This is where your second pair of eyes would be helpful.

The fix affects the readers in VectorizedDictionaryEncodedParquetValuesReader. On the face of it, (with this change) the logic appears to have changed for a number of readers: FixedLengthDecimalDictEncodedReader, VarWidthBinaryDictEncodedReader, IntBackedDecimalDictEncodedReader, LongBackedDecimalDictEncodedReader, and FixedSizeBinaryDictEncodedReader.

FixedLengthDecimalDictEncodedReader, IntBackedDecimalDictEncodedReader and LongBackedDecimalDictEncodedReader are dead code and are removed in https://github.com/apache/iceberg/pull/11276. That leaves VarWidthBinaryDictEncodedReader and FixedSizeBinaryDictEncodedReader. I added tests for these two. In actuality, the logic has not changed for VarWidthBinaryDictEncodedReader because this is the case covered before the change by

          int index = idx * typeWidth;
          if (typeWidth == -1) {
            index = idx;
          }

The readers in VectorizedDictionaryEncodedParquetValuesReader are only called when dictionaryDecodeMode in VectorizedPageIterator is EAGER, which means the dictionary encoded values are decoded and the decoded values are set in the Arrow vector used to store values for creating the Spark ColumnarBatch. The dictionaryDecodeMode is EAGER if and only if some, but not all, pages in the column chunk are dictionary encoded. I have explained in this thread why I am not able to write such a Parquet data file programmatically from Java for the fixed length byte array type.
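The "some but not all pages" condition can be written down as a small predicate. This is only a sketch of the rule as stated above: `requiresEagerDecode` and its list-of-booleans input are hypothetical and do not reflect how VectorizedPageIterator actually tracks page encodings.

```java
import java.util.Arrays;
import java.util.List;

public class EagerDecodeSketch {
  // Hypothetical helper: each entry says whether one page of the column chunk
  // is dictionary encoded. Eager decoding applies only to mixed column chunks.
  static boolean requiresEagerDecode(List<Boolean> pageIsDictionaryEncoded) {
    boolean anyDictionary = pageIsDictionaryEncoded.contains(true);
    boolean allDictionary = !pageIsDictionaryEncoded.contains(false);
    return anyDictionary && !allDictionary;
  }

  public static void main(String[] args) {
    // One dictionary encoded page followed by one plain page, like the checked-in test file.
    System.out.println(requiresEagerDecode(Arrays.asList(true, false)));  // true
    System.out.println(requiresEagerDecode(Arrays.asList(true, true)));   // false
    System.out.println(requiresEagerDecode(Arrays.asList(false, false))); // false
  }
}
```

A file with one RLE dictionary encoded page and one plain encoded page falls in the first case, which is why such a file is needed to exercise these readers.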

wypoon avatar Oct 21 '24 00:10 wypoon

@RussellSpitzer thanks for merging this.

We need to drop the dead code from our vectorized read path https://github.com/apache/iceberg/issues/11370

FYI, this is already done in https://github.com/apache/iceberg/pull/11276 (I mentioned it in my recap).

wypoon avatar Oct 21 '24 21:10 wypoon