
Add example Variant data

Open alamb opened this issue 9 months ago • 7 comments

Use Case (What are you trying to do?)

We are trying to organize the implementation of Variant in the Rust implementation of parquet and arrow:

  • https://github.com/apache/arrow-rs/issues/6736

We would like to make sure the Rust implementation is compatible with other implementations (which seem to be mostly JVM / Spark focused at the moment).

From what I can tell, the JVM based implementations are tested by verifying round trips to and from JSON. For example, the ParquetVariantShreddingSuite: https://github.com/apache/spark/blob/418cfd1f78014698ac4baac21156341a11b771b3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala#L30

There are several limitations with this approach:

  1. it doesn't ensure compatibility across language implementations (it only ensures consistency between reader/writer)
  2. VARIANTs have more types than JSON (e.g. timestamps, etc.), so using JSON limits the range of types that can be tested

What I want

I would like example data in the parquet-testing repository that contains:

  1. Example binary variant data (e.g. metadata and data fields)
  2. A parquet file with a column that stores variant data (but does not "shred" any of the columns)
  3. A parquet file with the same data as 2, but that stores several of the columns "shredded" (i.e. some of the fields in their own columns, as described in 'VariantShredding' when storing in parquet files)

Each of the above should:

  1. have some sort of human-interpretable description of the encoded values to help verify comparisons (e.g. text, markdown or json)
  2. cover the variant scalar types
  3. cover the variant nested types (struct, etc)

I recommend keeping the scalar and nested types in separate files / columns to make it easier to incrementally implement variant support (starting with non-nested types and then nested types).

Having the above data would permit other parquet implementations to start with a reader that can handle the basic types and then move on to more complex parts (like nested types and shredding). This is similar to how alltypes_plain.parquet is used today.

Suggestions

@cashmand David Cashman suggests on the Parquet Dev list: https://lists.apache.org/thread/22dvcnm7v5d30slzc3hp8d9qq8syj1dq

Hi Andrew, you should be able to create shredded files using OSS Spark 4.0. I think the only issue is that it doesn't have the logical type annotation yet, so readers wouldn't be able to distinguish it from a non-variant struct that happens to have the same schema. (Spark is able to infer that it is a Variant from the org.apache.spark.sql.parquet.row.metadata metadata.)

The ParquetVariantShreddingSuite in Spark has some tests that write and read shredded parquet files. Below is an example that translates the first test into code that runs in spark-shell and writes a Parquet file. The shredding schema is set via conf. If you want to test types that Spark doesn't infer in parse_json (e.g. timestamp, binary), you can use to_variant_object to cast structured values to Variant.

I won't have time to work on this in the next couple of weeks, but am happy to answer any questions.

Thanks, David

scala> import org.apache.spark.sql.internal.SQLConf
scala> spark.conf.set(SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key, true)
scala> spark.conf.set(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key, true)
scala> spark.conf.set(SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key, "a int, b string, c decimal(15, 1)")
scala> val df = spark.sql(
     |       """
     |         | select case
     |         | when id = 0 then parse_json('{"a": 1, "b": "2", "c": 3.3, "d": 4.4}')
     |         | when id = 1 then parse_json('{"a": [1,2,3], "b": "hello", "c": {"x": 0}}')
     |         | when id = 2 then parse_json('{"A": 1, "c": 1.23}')
     |         | end v from range(3)
     |         |""".stripMargin)
scala> df.write.mode("overwrite").parquet("/tmp/shredded_test")
scala> spark.read.parquet("/tmp/shredded_test").show
+--------------------+
|                   v|
+--------------------+
|{"a":1,"b":"2","c...|
|{"a":[1,2,3],"b":...|
|    {"A":1,"c":1.23}|
+--------------------+
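
To exercise the types David mentions that parse_json cannot produce from JSON text (e.g. timestamp, binary), a minimal spark-shell sketch, assuming Spark 4.0's to_variant_object and typed SQL literals behave as described above, might look like this (the output path is arbitrary):

// Sketch only: to_variant_object converts a struct into a Variant object, so typed
// SQL literals (timestamp, binary, decimal) can be embedded without going through JSON.
val df = spark.sql("""
  SELECT to_variant_object(named_struct(
    'ts',  timestamp'2025-01-01 12:00:00',
    'bin', X'CAFEBABE',
    'dec', 12345.6BD
  )) AS v
""")
df.write.mode("overwrite").parquet("/tmp/variant_extra_types")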

alamb avatar Apr 07 '25 10:04 alamb

I'm attaching two simple test files for reference. I named them .txt since I can't attach *.parquet directly.

primitive.parquet.txt: the file contains an integer id and a variant var, where the variant is the integer 34, unshredded. Schema:

message table {
  required int32 id;
  required group var {
    required binary metadata;
    required binary value;
  }
}

array_simple.parquet.txt: the file contains an integer id and a variant var, where the variant is the array ['comedy', 'drama'], shredded.

Schema:

message table {
  required int32 id;
  optional group var {
    required binary metadata;
    optional binary value;
    optional group typed_value {
      required group a {
        optional binary value;
        optional group typed_value (LIST) {
          repeated group list {
            required group element {
              optional binary value;
              optional binary typed_value (STRING);
            }
          }
        }
      }
    }
  }
}

aihuaxu avatar Apr 07 '25 20:04 aihuaxu

Thanks @aihuaxu !

Can you share how you made the files, or a description of what is contained in each (so we know what we are looking for)?

alamb avatar Apr 07 '25 20:04 alamb

Update from the mailing list:

I captured them when working on the Iceberg tests in https://github.com/apache/iceberg/blob/main/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java#L172 .

You can change it to OutputFile outputFile = Files.localOutput("primitive.parquet"); to capture them, but you can probably follow what David mentioned.

alamb avatar Apr 08 '25 09:04 alamb

I looked into primitive.parquet.txt and array_simple.parquet.txt from @aihuaxu above ❤

primitive.parquet:

CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/Users/andrewlamb/Downloads/primitive.parquet"
);
DESCRIBE parquetTable;
SELECT * from parquetTable

Yields this output (somehow spark doesn't see the file as variant 🤔 )

spark-sql (default)> DESCRIBE parquetTable;
id                  	int
var                 	struct<metadata:binary,value:binary>
Time taken: 0.051 seconds, Fetched 2 row(s)
spark-sql (default)> SELECT * from parquetTable;
1	{"metadata":,"value":
                             "}
Time taken: 0.066 seconds, Fetched 1 row(s)

array_simple.parquet:

CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/Users/andrewlamb/Downloads/array_simple.parquet"
);
DESCRIBE parquetTable;
SELECT * from parquetTable
spark-sql (default)> DESCRIBE parquetTable;
id                  	int
var                 	struct<metadata:binary,value:binary,typed_value:struct<a:struct<value:binary,typed_value:array<struct<value:binary,typed_value:string>>>>>
Time taken: 0.108 seconds, Fetched 2 row(s)
spark-sql (default)> SELECT * from parquetTable
                   > ;
1	{"metadata":abcde,"value":null,"typed_value":{"a":{"value":null,"typed_value":[{"value":null,"typed_value":"comedy"},{"value":null,"typed_value":"drama"}]}}}
Time taken: 0.313 seconds, Fetched 1 row(s)

alamb avatar Apr 13 '25 19:04 alamb

I played around with variant in spark 4.0 preview a bit today and figured out how to generate variant columns:

Here is an example of how to make variant columns

-- Run in spark 4.0 preview
--
-- Remove local catalog first
-- rm -rf spark-warehouse/

DROP TABLE IF EXISTS T;
CREATE TABLE T (id INT, variant_col VARIANT);
INSERT INTO T VALUES (1, parse_json('{"foo": "bar", "baz": 42}'));
INSERT INTO T VALUES (2, parse_json('{"baz": 32}'));

Generated Parquet File:

The variant_col is stored in parquet as a Struct with two fields:

  • value: Binary
  • metadata: Binary
> describe 'part-00000-c13d3cac-027c-4ffc-acdd-c5ba41e2f6b7-c000.snappy.parquet';
+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| column_name | data_type                                                                                                                                                                                                                                 | is_nullable |
+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| id          | Int32                                                                                                                                                                                                                                     | NO          |
| variant_col | Struct([Field { name: "value", data_type: Binary, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "metadata", data_type: Binary, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) | NO          |
+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
2 row(s) fetched.
Elapsed 0.003 seconds.

BTW here is how to access variant fields (using try_variant_get):

-- https://docs.databricks.com/aws/en/sql/language-manual/functions/try_variant_get
SELECT try_variant_get(variant_col, '$.foo') from T;
SELECT try_variant_get(variant_col, '$.foo', 'string') from T;
SELECT try_variant_get(variant_col, '$.foo', 'timestamp') from T;
SELECT try_variant_get(variant_col, '$.baz', 'timestamp') from T;

alamb avatar Apr 13 '25 19:04 alamb

So my plan is to create a directory of variant values, and for each value, provide 3 files:

  1. name.metadata -- the binary contents of the metadata field
  2. name.value -- the binary contents of the value field
  3. name.json -- the equivalent JSON (as much as possible)
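
As a sketch of how a conformance test might consume that layout (the file names and the decodeVariantToJson helper below are hypothetical placeholders for whatever implementation is under test):

import java.nio.file.{Files, Paths}

// One (metadata, value, json) triple per example value; paths are illustrative.
val metadata     = Files.readAllBytes(Paths.get("primitive_types/int32.metadata"))
val value        = Files.readAllBytes(Paths.get("primitive_types/int32.value"))
val expectedJson = new String(Files.readAllBytes(Paths.get("primitive_types/int32.json")), "UTF-8")

// decodeVariantToJson stands in for the reader being validated:
// assert(decodeVariantToJson(metadata, value) == expectedJson)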

Example values, covering the values in VariantEncoding.md:

There should be at least these sets:

  1. primitive_types/<type> -- examples of each of the 21 types in https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#value-data-for-primitive-type-basic_type0
  2. short_string -- example of a short string (less than 2^6 = 64 bytes) https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#value-header-for-short-string-basic_type1
  3. object_primitive -- an object with 21 fields, one for each primitive type
  4. array_primitive -- an array of 10 strings (maybe also arrays of other primitive types)
  5. object_nested -- an object with several primitive type fields, and a field with a nested object (with primitives and another object) and an array of primitives
  6. array_nested -- an array of objects

Other examples that are probably needed:

  1. object with 300 fields (more than 2^8 = 256 fields), which requires more than 1 byte for num_elements
  2. object with 66,000 fields (more than 2^16 = 65,536 fields), which requires a 3 byte field id / offset
  3. object with 17M fields (more than 2^24 = 16,777,216 fields), which requires a 4 byte field id / offset

I am a little worried about 2 and 3, as the objects will be non-trivial in size -- they may need to be gzipped or something to put them into the repository
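
For item 1 above, a spark-shell sketch (assuming parse_json keeps all 300 distinct fields) that generates an object wide enough to need multi-byte num_elements / field ids; items 2 and 3 would scale the range up, at the cost of the sizes mentioned above:

// 300 distinct field names is more than 2^8, so the object header should need
// more than one byte for num_elements and for the field ids.
val wideJson = (0 until 300).map(i => s""""field_$i": $i""").mkString("{", ", ", "}")
spark.sql(s"SELECT parse_json('$wideJson') AS v")
  .write.mode("overwrite").parquet("/tmp/variant_wide_object")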

alamb avatar Apr 13 '25 20:04 alamb

I have created a proposed PR with examples of the binary encodings here

  • https://github.com/apache/parquet-testing/pull/76

Once we get that in, I will move on to making a PR that has actual example parquet files with variant -- both shredded and not.

alamb avatar Apr 16 '25 14:04 alamb

I think https://github.com/apache/parquet-testing/pull/76 is close to mergeable (thank you @emkornfield for the review).

However, as @neilechao pointed out on a recent Parquet call, even after https://github.com/apache/parquet-testing/pull/76 there are still no example parquet files that have variant values. As described above, it is important to have actual parquet files too.

The reason I didn't add parquet files is that I could not figure out how to create a properly annotated parquet file with Apache Spark. Perhaps we could use parquet-cpp library now that https://github.com/apache/arrow/pull/45375 (also from @neilechao) has merged 🤔

What I have tried so far

I tried to get Spark to write a Parquet file with correct annotations using the regen.py script from https://github.com/apache/parquet-testing/pull/76:

spark.sql("SELECT * FROM output").repartition(1).write.parquet('variant.parquet')

This results in this file (needed .txt extension to upload to github): part-00000-6df14c7c-3b09-4678-b311-0ac6199a7857-c000.snappy.parquet.txt

This file's schema doesn't have the Variant logical annotation (source), which we can see by dumping the schema. For example:

parquet-dump-schema  variant.parquet/part-00000-855cbfbf-1e1b-4557-87bb-b6e83aa5fb9c-c000.snappy.parquet

required group field_id=-1 spark_schema {
  optional binary field_id=-1 name (String);
  optional group field_id=-1 variant_col {     <-- this field should have the Variant logical type annotation
    required binary field_id=-1 value;
    required binary field_id=-1 metadata;
  }
  optional binary field_id=-1 json_col (String);
}

It DOES have some spark specific metadata, which I think is how Spark detects that the column contains a variant:

Key: org.apache.spark.sql.parquet.row.metadata, Value:

 {
 "type":"struct",
 "fields":[
    {"name":"name","type":"string","nullable":true,"metadata":{"__CHAR_VARCHAR_TYPE_STRING":"varchar(2000)"}}, 
    {"name":"variant_col","type":"variant","nullable":true,"metadata":{}},
    {"name":"json_col","type":"string","nullable":true,"metadata":{}}
  ]
}
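
For reference, a sketch of dumping that key-value metadata directly, assuming the parquet-hadoop ParquetFileReader API (available on the spark-shell classpath); the part file name is a placeholder:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Open the footer and print the Spark schema string stored as key-value metadata.
val input  = HadoopInputFile.fromPath(new Path("variant.parquet/part-00000-<uuid>-c000.snappy.parquet"), new Configuration())
val reader = ParquetFileReader.open(input)
println(reader.getFooter.getFileMetaData.getKeyValueMetaData.get("org.apache.spark.sql.parquet.row.metadata"))
reader.close()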

alamb avatar May 02 '25 18:05 alamb

In case it isn't obvious, I would like the introduction of Variant into Parquet to be a model of how to extend the spec and get wide adoption across the ecosystem.

alamb avatar May 02 '25 18:05 alamb

@rdblue mentioned today on the parquet sync that the parquet-mr code has a test suite that verifies that a bunch of variant values can be round tripped (written to and read back from) a parquet file using a particular shredding scheme.

Here is a potentially related PR (I think it contains the tests that Ryan mentioned on the call):

  • https://github.com/apache/parquet-java/pull/3221

It looks like there are several example Variant values that are programmatically constructed in parquet-avro/src/test/java/org/apache/parquet/avro/TestWriteVariant.java

It would be great to add some actual parquet files with these shredded values to parquet-testing (to verify that other readers can read shredded parquet files).

alamb avatar Jun 25 '25 17:06 alamb

There are some example Variant files in this duckdb PR

  • https://github.com/duckdb/duckdb/pull/18224

These were supposedly made by the Apache Iceberg test suite somehow.

Perhaps we can (re)use those or that method for parquet-testing

alamb avatar Jul 23 '25 09:07 alamb

Here's a PR with Iceberg's test cases that you can use for validation: https://github.com/apache/parquet-testing/pull/90

rdblue avatar Jul 24 '25 15:07 rdblue

Thank you @rdblue -- in my opinion once we merge #90 we can close this ticket as well

alamb avatar Aug 07 '25 18:08 alamb

I think all the major work here is done (there are some nice-to-haves left over, but I think the important ones are merged).

alamb avatar Sep 08 '25 17:09 alamb