
#45 Implement arrow -> json writer

Open syucream opened this issue 5 years ago • 15 comments

https://github.com/reproio/columnify/issues/45

TODO

  • [x] prototyping for PoC: (various inputs) -> maps -> arrow -> maps -> json -> parquet
  • [x] remove parquet writing side Go intermediates: (various inputs) -> maps -> arrow -> json -> parquet (a sketch of the maps -> arrow step follows this list)
  • [x] remove input side Go intermediates: (various inputs) -> arrow -> json -> parquet
  • [x] performance tests
  • [x] record type validations https://github.com/reproio/columnify/issues/27
  • [x] reporting benchmark results
  • [ ] check if we have any tuning points
  • [ ] finalize these changes (some refactoring, tests, docs)
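
For reference, here is a minimal sketch of the maps -> arrow step using the arrow Go record builders. The field set and values are illustrative only, not columnify's actual schema handling:

```go
package main

import (
	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/memory"
)

// buildExampleRecord builds a tiny Arrow record in memory; in columnify the
// schema and rows would come from the schema file and the decoded input records.
func buildExampleRecord() array.Record {
	pool := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "boolean", Type: arrow.FixedWidthTypes.Boolean},
		{Name: "string", Type: arrow.BinaryTypes.String},
	}, nil)

	b := array.NewRecordBuilder(pool, schema)
	defer b.Release()

	// One Append per field builder adds one row's worth of values.
	b.Field(0).(*array.BooleanBuilder).Append(true)
	b.Field(1).(*array.StringBuilder).Append("hello")

	return b.NewRecord() // the caller must Release() the record when done
}
```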

syucream avatar Jul 14 '20 15:07 syucream

Oh, I missed that the existing arrjson package might support the arrow -> json conversion for the arrow -> maps -> json part: https://github.com/apache/arrow/tree/master/go/arrow/internal/arrjson

It's actually an internal package, but I guess it's reusable for this use case. The idea is roughly the sketch below.
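
A hedged sketch of the arrow -> json direction: walk an Arrow record column by column and emit one JSON object per row. The real writer (arrow/json/writer.go in this PR, or arrjson) differs, and only a couple of column types are handled here for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"

	"github.com/apache/arrow/go/arrow/array"
)

// recordToJSONLines writes one JSON object per row of rec to w.
func recordToJSONLines(w io.Writer, rec array.Record) error {
	schema := rec.Schema()
	enc := json.NewEncoder(w)
	for row := 0; row < int(rec.NumRows()); row++ {
		obj := make(map[string]interface{}, len(schema.Fields()))
		for col, f := range schema.Fields() {
			arr := rec.Column(col)
			if arr.IsNull(row) {
				obj[f.Name] = nil
				continue
			}
			switch a := arr.(type) {
			case *array.Boolean:
				obj[f.Name] = a.Value(row)
			case *array.String:
				obj[f.Name] = a.Value(row)
			default:
				return fmt.Errorf("unsupported column type: %s", f.Type.Name())
			}
		}
		if err := enc.Encode(obj); err != nil {
			return err
		}
	}
	return nil
}
```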

syucream avatar Jul 21 '20 16:07 syucream

I finally examined a memory pprof result. It shows lower usage than the current version's ( https://github.com/reproio/columnify/issues/44#issuecomment-654917629 ). The reduction is 543.88MB -> 97.45MB, about 18% of the original size! But I'm not sure whether there's any other high memory consumer ...

$ ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
$ go tool pprof /tmp/columnify.mem.prof
Type: inuse_space
Time: Jul 26, 2020 at 11:18pm (JST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 97.45MB, 100% of 97.45MB total
Showing top 10 nodes out of 47
      flat  flat%   sum%        cum   cum%
   96.08MB 98.60% 98.60%    96.08MB 98.60%  github.com/apache/arrow/go/arrow/memory.(*GoAllocator).Allocate (inline)
    1.37MB  1.40%   100%     1.37MB  1.40%  github.com/klauspost/compress/zstd.encoderOptions.encoder
         0     0%   100%    72.60MB 74.50%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Append
         0     0%   100%     8.59MB  8.81%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Reserve
         0     0%   100%     8.59MB  8.81%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Resize
         0     0%   100%     0.57MB  0.58%  github.com/apache/arrow/go/arrow/array.(*BooleanBuilder).Append
         0     0%   100%     0.57MB  0.58%  github.com/apache/arrow/go/arrow/array.(*BooleanBuilder).Reserve
         0     0%   100%     0.57MB  0.58%  github.com/apache/arrow/go/arrow/array.(*BooleanBuilder).Resize
         0     0%   100%     3.82MB  3.92%  github.com/apache/arrow/go/arrow/array.(*Float32Builder).NewArray
         0     0%   100%     3.82MB  3.92%  github.com/apache/arrow/go/arrow/array.(*Float32Builder).NewFloat32Array
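
For context, a heap profile like the one above can be captured with the standard runtime/pprof package; this is a generic sketch rather than columnify's actual wiring (the output path is just an example):

```go
package main

import (
	"os"
	"runtime"
	"runtime/pprof"
)

// writeHeapProfile dumps the current heap profile to path (e.g.
// /tmp/columnify.mem.prof) so it can be inspected with `go tool pprof`.
func writeHeapProfile(path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()
	runtime.GC() // run a GC first so the heap statistics are up to date
	return pprof.WriteHeapProfile(f)
}
```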

syucream avatar Jul 26 '20 14:07 syucream

To resolve https://github.com/reproio/columnify/issues/27, we need https://github.com/reproio/columnify/issues/49 ... 😭 . I created another pull request, https://github.com/reproio/columnify/pull/50, to address that.

syucream avatar Jul 26 '20 15:07 syucream

Here's a quick performance test.

I generated the dummy input Avro file below:

$ java -jar ~/tools/avro-tools-1.8.2.jar random --schema-file examples/primitives.avsc --count 1000000 tmp.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
test.seed=1595854640329
$ ls -lh tmp.avro
-rw-r--r-- 1 ryo staff 80M  7 27 21:57 tmp.avro

With the current version (0.0.3):

$ time ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType  41.95s user 1.05s system 132% cpu 32.354 total

With the latest (this pull request's) version:

$ time ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType  62.20s user 2.54s system 137% cpu 47.068 total

The elapsed time increased by about 1.5x ... The latest version's pipeline contains an additional (inputs) -> map -> arrow conversion, so it's not so strange; we could possibly reduce the time by removing the redundant (inputs) -> map conversion layer.

syucream avatar Jul 27 '20 13:07 syucream

record type validations #27

Finally supported! If we have this schema:

$ cat columnifier/testdata/schema/primitives.avsc
{
  "type": "record",
  "name": "Primitives",
  "fields" : [
    {"name": "boolean", "type": "boolean"},
    {"name": "int",     "type": "int"},
    {"name": "long",    "type": "long"},
    {"name": "float",   "type": "float"},
    {"name": "double",  "type": "double"},
    {"name": "bytes",   "type": "bytes"},
    {"name": "string",  "type": "string"}
  ]
}

And these record values, which partially match the schema's field names and value types but contain some null values that the schema does not allow:

$ java -jar ~/tools/avro-tools-1.8.2.jar getschema columnifier/testdata/record/nullables.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{
  "type" : "record",
  "name" : "Nullables",
  "fields" : [ {
    "name" : "boolean",
    "type" : [ "null", "boolean" ],
    "default" : null
  }, {
    "name" : "int",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "long",
    "type" : [ "null", "long" ],
    "default" : null
  }, {
    "name" : "float",
    "type" : [ "null", "float" ],
    "default" : null
  }, {
    "name" : "double",
    "type" : [ "null", "double" ],
    "default" : null
  }, {
    "name" : "bytes",
    "type" : [ "null", "bytes" ],
    "default" : null
  }, {
    "name" : "string",
    "type" : [ "null", "string" ],
    "default" : null
  } ]
}
$ java -jar ~/tools/avro-tools-1.8.2.jar tojson columnifier/testdata/record/nullables.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{"boolean":null,"int":{"int":2049911329},"long":{"long":93517433735957},"float":null,"double":null,"bytes":null,"string":null}
{"boolean":null,"int":{"int":-1494730473},"long":{"long":-202289580949163},"float":null,"double":{"double":0.08069785324756118},"bytes":{"bytes":""},"string":{"string":"tpwmyxc"}}
{"boolean":{"boolean":true},"int":{"int":-1949023704},"long":{"long":516734426921889},"float":null,"double":{"double":0.6583549661805351},"bytes":null,"string":null}
{"boolean":{"boolean":false},"int":null,"long":{"long":-867000385846723},"float":{"float":0.13172472},"double":{"double":0.007504294905384068},"bytes":null,"string":null}
{"boolean":{"boolean":true},"int":null,"long":{"long":-163096126488560},"float":{"float":0.08742553},"double":{"double":0.5728205289212072},"bytes":{"bytes":"bytes5"},"string":null}
{"boolean":{"boolean":true},"int":null,"long":null,"float":null,"double":null,"bytes":{"bytes":"bytes6"},"string":{"string":"s"}}
{"boolean":{"boolean":false},"int":{"int":170755098},"long":{"long":714762663965379},"float":{"float":0.7437153},"double":null,"bytes":null,"string":null}
{"boolean":null,"int":null,"long":null,"float":null,"double":{"double":0.22171424755307045},"bytes":null,"string":{"string":"uusutbymi"}}
{"boolean":{"boolean":true},"int":{"int":-433672812},"long":{"long":460231500089382},"float":{"float":0.43936086},"double":{"double":0.4923838260209136},"bytes":{"bytes":"bytes9"},"string":null}
{"boolean":null,"int":null,"long":null,"float":null,"double":{"double":0.24505978464315714},"bytes":null,"string":null}

Then columnify fails on the schema mismatch with this error message:

$ ./columnify -schemaType avro -schemaFile columnifier/testdata/schema/primitives.avsc -recordType avro columnifier/testdata/record/nullables.avro > /dev/null
2020/07/28 00:16:38 Failed to write: unexpected input <nil> typed <nil> as bool: input record is unable to convert

By the way, the latest release version (0.0.3) throws a very unhelpful error message:

$ ./columnify -schemaType avro -schemaFile columnifier/testdata/schema/primitives.avsc -recordType avro columnifier/testdata/record/nullables.avro > /dev/null
2020/07/28 00:17:24 Failed to write: reflect: call of reflect.Value.Type on zero Value
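
The check behind these messages boils down to a type assertion when a decoded Go value is appended to an Arrow builder. A hedged sketch for the boolean case (the helper and error names are illustrative, not columnify's exact code):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/apache/arrow/go/arrow/array"
)

// ErrUnconvertibleRecord stands in for columnify's conversion error here.
var ErrUnconvertibleRecord = errors.New("input record is unable to convert")

// appendBool validates that v really is a bool before appending it, so a nil
// value against a non-nullable boolean field yields a descriptive error
// instead of the reflect panic seen in v0.0.3.
func appendBool(b *array.BooleanBuilder, v interface{}) error {
	val, ok := v.(bool)
	if !ok {
		return fmt.Errorf("unexpected input %v typed %T as bool: %w", v, v, ErrUnconvertibleRecord)
	}
	b.Append(val)
	return nil
}
```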

syucream avatar Jul 27 '20 15:07 syucream

The elapsed time increased by 1.5x

I will profile CPU usage next.

syucream avatar Jul 27 '20 15:07 syucream

Codecov Report

Merging #47 into master will decrease coverage by 11.69%. The diff coverage is 39.08%.

@@             Coverage Diff             @@
##           master      #47       +/-   ##
===========================================
- Coverage   70.05%   58.36%   -11.70%     
===========================================
  Files          19       18        -1     
  Lines         875     1237      +362     
===========================================
+ Hits          613      722      +109     
- Misses        203      462      +259     
+ Partials       59       53        -6     
Flag Coverage Δ
#unittests 58.36% <39.08%> (-11.70%) :arrow_down:

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
parquet/discard.go 0.00% <0.00%> (ø)
parquet/stdio.go 80.00% <ø> (ø)
schema/avro.go 100.00% <ø> (ø)
schema/bigquery.go 100.00% <ø> (ø)
schema/parquet.go 73.56% <ø> (ø)
record/arrow.go 18.28% <4.48%> (-25.47%) :arrow_down:
record/avro.go 77.77% <57.14%> (+5.05%) :arrow_up:
record/jsonl.go 75.00% <66.66%> (+18.75%) :arrow_up:
arrow/json/writer.go 74.75% <74.75%> (ø)
columnifier/parquet.go 80.32% <75.00%> (-4.86%) :arrow_down:
... and 7 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Powered by Codecov. Last update f6d5b93...f0f8a32.

codecov-commenter avatar Jul 28 '20 16:07 codecov-commenter

I added a benchmark and profilings to the CI job. The CPU profile was as follows:

      flat  flat%   sum%        cum   cum%
     5.59s 14.32% 14.32%      9.55s 24.46%  runtime.scanobject
     2.20s  5.64% 19.95%      2.59s  6.63%  runtime.findObject
     2.07s  5.30% 25.26%      6.60s 16.91%  runtime.mallocgc
     1.73s  4.43% 29.69%      1.73s  4.43%  runtime.futex
     0.90s  2.31% 31.99%      1.05s  2.69%  runtime.heapBitsSetType
     0.82s  2.10% 34.09%      0.82s  2.10%  runtime.nextFreeFast (inline)
...

And it has some high cum% consumers:

...
     0.21s  0.54% 56.79%      7.63s 19.54%  github.com/xitongsys/parquet-go/marshal.MarshalJSON
...
     0.02s 0.051% 73.64%      6.47s 16.57%  github.com/xitongsys/parquet-go/writer.(*ParquetWriter).WriteStop
...
     0.04s   0.1% 70.62%     12.16s 31.15%  github.com/xitongsys/parquet-go/writer.(*ParquetWriter).flushObjs.func1
...
     0.04s   0.1% 71.13%     13.10s 33.56%  runtime.systemstack
...

It seems we don't have many tuning points on our side now. So what we can tackle next, I guess, is in the dependent modules, mainly parquet-go.
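
For reference, this kind of benchmark can be expressed as a standard Go benchmark and profiled from CI with `go test -bench=. -benchmem -cpuprofile cpu.prof -memprofile mem.prof`; the helpers below are hypothetical stand-ins for columnify's real entry points:

```go
package columnify_test

import "testing"

func BenchmarkAvroToParquet(b *testing.B) {
	input := loadTestAvro(b) // hypothetical fixture loader
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// hypothetical wrapper around the avro -> arrow -> parquet conversion
		if err := convertToParquet(input); err != nil {
			b.Fatal(err)
		}
	}
}
```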

syucream avatar Jul 28 '20 16:07 syucream

@okkez can you check whether these changes reduce memory usage? I replaced the naive intermediate data structure with Apache Arrow records, so it should be reduced. (ctx: https://github.com/reproio/columnify/issues/43 )

syucream avatar Jul 28 '20 16:07 syucream

I cannot convert msgpack to parquet using columnify with this PR, so I didn't measure memory usage.

$ ./columnify -recordType msgpack -schemaType avro -schemaFile ./rails_access_log.avsc rails-small.msgpack > /dev/null 
2020/07/29 08:36:09 Failed to write: unexpected input 200 typed string as int32: input record is unable to convert

But v0.0.3 can convert it.

I used the following schema and data.

{
  "name": "RailsAccessLog",
  "type": "record",
  "fields": [
    {
      "name": "container_id",
      "type": "string"
    },
    {
      "name": "container_name",
      "type": "string"
    },
    {
      "name": "source",
      "type": "string"
    },
    {
      "name": "log",
      "type": "string"
    },
    {
      "name": "__fluentd_address__",
      "type": "string"
    },
    {
      "name": "__fluentd_host__",
      "type": "string"
    },
    {
      "name": "action",
      "type": ["null", "string"]
    },
    {
      "name": "controller",
      "type": ["null", "string"]
    },
    {
      "name": "role",
      "type": "string"
    },
    {
      "name": "host",
      "type": "string"
    },
    {
      "name": "location",
      "type": ["null", "string"]
    },
    {
      "name": "severity",
      "type": ["null", "string"],
      "default": "INFO"
    },
    {
      "name": "status",
      "type": "int"
    },
    {
      "name": "db",
      "type": ["null", "float"]
    },
    {
      "name": "view",
      "type": ["null", "float"]
    },
    {
      "name": "duration",
      "type": ["null", "float"]
    },
    {
      "name": "method",
      "type": "string"
    },
    {
      "name": "path",
      "type": "string"
    },
    {
      "name": "format",
      "type": ["null", "string"]
    },
    {
      "name": "error",
      "type": ["null", "string"]
    },
    {
      "name": "remote_ip",
      "type": ["null", "string"]
    },
    {
      "name": "agent",
      "type": ["null", "string"]
    },
    {
      "name": "authenticated_user_id",
      "type": ["null", "string"]
    },
    {
      "name": "params",
      "type": ["null", "string"]
    },
    {
      "name": "tag",
      "type": "string"
    },
    {
      "name": "time",
      "type": "string"
    }
  ]
}

rails-small.msgpack.gz

I can convert msgpack to parquet after replacing int/float with string in the schema file.

okkez avatar Jul 28 '20 23:07 okkez

@okkez I wonder why some values are encoded as string fields ... anyway, I re-enabled converting these values to int/float types in https://github.com/reproio/columnify/pull/47/commits/f0f8a3213afdba29d810ce1f77f216e0c367ca29 How about that?
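
The re-enabled coercion is essentially: when the schema declares int/float but the decoded msgpack value arrives as a string, parse it. A hedged sketch for the int32 case (names are illustrative, not columnify's actual code):

```go
package main

import (
	"fmt"
	"strconv"
)

// toInt32 accepts values that are already integers as well as numeric strings
// like "200", and rejects everything else with the kind of error shown above.
func toInt32(v interface{}) (int32, error) {
	switch val := v.(type) {
	case int32:
		return val, nil
	case int64:
		return int32(val), nil
	case string:
		n, err := strconv.ParseInt(val, 10, 32)
		if err != nil {
			return 0, fmt.Errorf("unexpected input %v typed %T as int32", v, v)
		}
		return int32(n), nil
	default:
		return 0, fmt.Errorf("unexpected input %v typed %T as int32", v, v)
	}
}
```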

syucream avatar Jul 29 '20 13:07 syucream

@okkez I wonder why some values are encoded as string fields ... anyway, I re-enabled converting these values to int/float types in f0f8a32 How about that?

Works well. But memory usage is not reduced: converting a 223MB msgpack file to Parquet uses about 2.0GB of memory (RSS via the ps command).

okkez avatar Jul 30 '20 05:07 okkez

I could reproduce that; RSS is actually still quite high (though I found that the memprofile result is not so terrible, which is curious). Anyway, I would like to find another way to reduce it. Finally supporting streaming conversion ... ? That's not the easy way, but it would be more effective.
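
Conceptually, the streaming approach converts and flushes the input in fixed-size batches so that only one batch of rows is resident at a time. A rough sketch with entirely hypothetical interfaces (the real design in #52 may differ):

```go
package main

import "io"

// RecordReader and BatchWriter are hypothetical interfaces used only to
// illustrate the streaming flow; they are not columnify's actual types.
type RecordReader interface {
	Next() (map[string]interface{}, error) // returns io.EOF at end of input
}

type BatchWriter interface {
	WriteBatch(rows []map[string]interface{}) error
	Close() error
}

func streamConvert(r RecordReader, w BatchWriter, batchSize int) error {
	batch := make([]map[string]interface{}, 0, batchSize)
	for {
		rec, err := r.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		batch = append(batch, rec)
		if len(batch) == batchSize {
			if err := w.WriteBatch(batch); err != nil {
				return err
			}
			batch = batch[:0] // reuse the backing array; only one batch in memory
		}
	}
	if len(batch) > 0 {
		if err := w.WriteBatch(batch); err != nil {
			return err
		}
	}
	return w.Close()
}
```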

syucream avatar Aug 03 '20 16:08 syucream

@okkez I created another patch with small changes, separated from this pull request, which focuses on using Arrow. It supports stream-like IO and should reduce memory usage. Can you take a look? https://github.com/reproio/columnify/pull/52

syucream avatar Aug 06 '20 14:08 syucream

Even when using Arrow instead of the naive map[string]interface{} intermediate representation ... there's no way to reduce memory usage effectively on the parquet-go side. Eventually, I think we should re-implement parquet-go. 🤔

syucream avatar Aug 06 '20 14:08 syucream