#45 Implement arrow -> json writer
https://github.com/reproio/columnify/issues/45
TODO
- [x] prototyping for PoC: (various inputs) -> maps -> arrow -> maps -> json -> parquet
- [x] remove parquet-writing-side Go intermediates: (various inputs) -> maps -> arrow -> json -> parquet
- [x] remove input-side Go intermediates: (various inputs) -> arrow -> json -> parquet
- [x] performance tests
- [x] record type validations https://github.com/reproio/columnify/issues/27
- [x] reporting benchmark results
- [ ] check if we have any tuning points
- [ ] finalize these changes (some refactoring, tests, docs)
Oh, I missed that the existing arrjson package might support arrow -> json conversion for the arrow -> maps -> json part.
https://github.com/apache/arrow/tree/master/go/arrow/internal/arrjson
It's actually an internal package, but I guess it's reusable for this use case.
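For reference, here's a minimal sketch of what that arrow -> json direction could look like against the old github.com/apache/arrow/go/arrow API; the package name, the `WriteJSONLines` function, and the handful of handled types are illustrative assumptions, not the actual arrow/json/writer.go added in this PR:

```go
package arrowjson

import (
	"encoding/json"
	"fmt"
	"io"

	"github.com/apache/arrow/go/arrow/array"
)

// WriteJSONLines emits each row of rec as a JSON object on its own line,
// which could then be fed to the existing json -> parquet path.
func WriteJSONLines(w io.Writer, rec array.Record) error {
	enc := json.NewEncoder(w)
	for row := 0; row < int(rec.NumRows()); row++ {
		obj := make(map[string]interface{}, int(rec.NumCols()))
		for col := 0; col < int(rec.NumCols()); col++ {
			name := rec.ColumnName(col)
			arr := rec.Column(col)
			if arr.IsNull(row) {
				obj[name] = nil
				continue
			}
			// Only a few primitive types are handled in this sketch.
			switch a := arr.(type) {
			case *array.Boolean:
				obj[name] = a.Value(row)
			case *array.Int64:
				obj[name] = a.Value(row)
			case *array.Float64:
				obj[name] = a.Value(row)
			case *array.String:
				obj[name] = a.Value(row)
			default:
				return fmt.Errorf("unsupported column type %s", arr.DataType().Name())
			}
		}
		if err := enc.Encode(obj); err != nil {
			return err
		}
	}
	return nil
}
```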
I finally examined a mem pprof result. It shows lower usage than the current version's ( https://github.com/reproio/columnify/issues/44#issuecomment-654917629 ). The reduction is 543.88MB -> 97.45MB, about 18% of the original size! But I'm not sure if there's any other high memory consumer ...
```
$ ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
$ go tool pprof /tmp/columnify.mem.prof
Type: inuse_space
Time: Jul 26, 2020 at 11:18pm (JST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 97.45MB, 100% of 97.45MB total
Showing top 10 nodes out of 47
      flat  flat%   sum%        cum   cum%
   96.08MB 98.60% 98.60%    96.08MB 98.60%  github.com/apache/arrow/go/arrow/memory.(*GoAllocator).Allocate (inline)
    1.37MB  1.40%   100%     1.37MB  1.40%  github.com/klauspost/compress/zstd.encoderOptions.encoder
         0     0%   100%    72.60MB 74.50%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Append
         0     0%   100%     8.59MB  8.81%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Reserve
         0     0%   100%     8.59MB  8.81%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Resize
         0     0%   100%     0.57MB  0.58%  github.com/apache/arrow/go/arrow/array.(*BooleanBuilder).Append
         0     0%   100%     0.57MB  0.58%  github.com/apache/arrow/go/arrow/array.(*BooleanBuilder).Reserve
         0     0%   100%     0.57MB  0.58%  github.com/apache/arrow/go/arrow/array.(*BooleanBuilder).Resize
         0     0%   100%     3.82MB  3.92%  github.com/apache/arrow/go/arrow/array.(*Float32Builder).NewArray
         0     0%   100%     3.82MB  3.92%  github.com/apache/arrow/go/arrow/array.(*Float32Builder).NewFloat32Array
```
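As a side note, a heap profile like the one above can be captured with the standard runtime/pprof package; this is only a minimal sketch (the main wrapper and where the workload runs are assumptions, not columnify's actual profiling flag handling), using the same /tmp/columnify.mem.prof path as the command above:

```go
package main

import (
	"log"
	"os"
	"runtime"
	"runtime/pprof"
)

func main() {
	f, err := os.Create("/tmp/columnify.mem.prof")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// ... run the avro -> arrow -> parquet conversion here ...

	runtime.GC() // flush up-to-date allocation stats before dumping
	if err := pprof.WriteHeapProfile(f); err != nil {
		log.Fatal(err)
	}
}
```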
To resolve https://github.com/reproio/columnify/issues/27, we need https://github.com/reproio/columnify/issues/49 ... 😭 I created another pull request, https://github.com/reproio/columnify/pull/50, to address that.
Here's a quick performance test.
I generated the dummy input Avro file below:
$ java -jar ~/tools/avro-tools-1.8.2.jar random --schema-file examples/primitives.avsc --count 1000000 tmp.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
test.seed=1595854640329
$ ls -lh tmp.avro
-rw-r--r-- 1 ryo staff 80M 7 27 21:57 tmp.avro
With the current version (0.0.3):
$ time ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType 41.95s user 1.05s system 132% cpu 32.354 total
With the latest (this pull request) version:
$ time ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType 62.20s user 2.54s system 137% cpu 47.068 total
The elapsed time increased by about 1.5x ... The latest version's result includes the additional (inputs) -> map -> arrow conversion, so it's not so strange, and we can possibly reduce the time if we remove the redundant (inputs) -> map conversion layer.
record type validations #27
Finally supported! If we have this schema:
$ cat columnifier/testdata/schema/primitives.avsc
{
"type": "record",
"name": "Primitives",
"fields" : [
{"name": "boolean", "type": "boolean"},
{"name": "int", "type": "int"},
{"name": "long", "type": "long"},
{"name": "float", "type": "float"},
{"name": "double", "type": "double"},
{"name": "bytes", "type": "bytes"},
{"name": "string", "type": "string"}
]
}
And these record values: they partially match the schema's field names and value types, but some values are null, which isn't allowed by the schema:
$ java -jar ~/tools/avro-tools-1.8.2.jar getschema columnifier/testdata/record/nullables.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{
"type" : "record",
"name" : "Nullables",
"fields" : [ {
"name" : "boolean",
"type" : [ "null", "boolean" ],
"default" : null
}, {
"name" : "int",
"type" : [ "null", "int" ],
"default" : null
}, {
"name" : "long",
"type" : [ "null", "long" ],
"default" : null
}, {
"name" : "float",
"type" : [ "null", "float" ],
"default" : null
}, {
"name" : "double",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "bytes",
"type" : [ "null", "bytes" ],
"default" : null
}, {
"name" : "string",
"type" : [ "null", "string" ],
"default" : null
} ]
}
$ java -jar ~/tools/avro-tools-1.8.2.jar tojson columnifier/testdata/record/nullables.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{"boolean":null,"int":{"int":2049911329},"long":{"long":93517433735957},"float":null,"double":null,"bytes":null,"string":null}
{"boolean":null,"int":{"int":-1494730473},"long":{"long":-202289580949163},"float":null,"double":{"double":0.08069785324756118},"bytes":{"bytes":""},"string":{"string":"tpwmyxc"}}
{"boolean":{"boolean":true},"int":{"int":-1949023704},"long":{"long":516734426921889},"float":null,"double":{"double":0.6583549661805351},"bytes":null,"string":null}
{"boolean":{"boolean":false},"int":null,"long":{"long":-867000385846723},"float":{"float":0.13172472},"double":{"double":0.007504294905384068},"bytes":null,"string":null}
{"boolean":{"boolean":true},"int":null,"long":{"long":-163096126488560},"float":{"float":0.08742553},"double":{"double":0.5728205289212072},"bytes":{"bytes":"bytes5"},"string":null}
{"boolean":{"boolean":true},"int":null,"long":null,"float":null,"double":null,"bytes":{"bytes":"bytes6"},"string":{"string":"s"}}
{"boolean":{"boolean":false},"int":{"int":170755098},"long":{"long":714762663965379},"float":{"float":0.7437153},"double":null,"bytes":null,"string":null}
{"boolean":null,"int":null,"long":null,"float":null,"double":{"double":0.22171424755307045},"bytes":null,"string":{"string":"uusutbymi"}}
{"boolean":{"boolean":true},"int":{"int":-433672812},"long":{"long":460231500089382},"float":{"float":0.43936086},"double":{"double":0.4923838260209136},"bytes":{"bytes":"bytes9"},"string":null}
{"boolean":null,"int":null,"long":null,"float":null,"double":{"double":0.24505978464315714},"bytes":null,"string":null}
Then columnify failed on the schema mismatch with this error message:
$ ./columnify -schemaType avro -schemaFile columnifier/testdata/schema/primitives.avsc -recordType avro columnifier/testdata/record/nullables.avro > /dev/null
2020/07/28 00:16:38 Failed to write: unexpected input <nil> typed <nil> as bool: input record is unable to convert
By the way, the latest release version 0.0.3 throws a very naive error message:
$ ./columnify -schemaType avro -schemaFile columnifier/testdata/schema/primitives.avsc -recordType avro columnifier/testdata/record/nullables.avro > /dev/null
2020/07/28 00:17:24 Failed to write: reflect: call of reflect.Value.Type on zero Value
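For context, here's a minimal sketch of the kind of per-field validation that produces the new error above; the helper and error names are hypothetical, not the actual record/arrow.go code in this PR. A non-nullable Arrow boolean column rejects a nil (or otherwise non-bool) input value and reports the offending value and expected type:

```go
package record

import (
	"errors"
	"fmt"

	"github.com/apache/arrow/go/arrow/array"
)

var ErrUnconvertibleRecord = errors.New("input record is unable to convert")

func appendBool(b *array.BooleanBuilder, v interface{}) error {
	vv, ok := v.(bool)
	if !ok {
		// e.g. v == nil => "unexpected input <nil> typed <nil> as bool: ..."
		return fmt.Errorf("unexpected input %v typed %T as bool: %w", v, v, ErrUnconvertibleRecord)
	}
	b.Append(vv)
	return nil
}
```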
> The elapsed time increased by about 1.5x

I will profile CPU usage next.
Codecov Report
Merging #47 into master will decrease coverage by 11.69%. The diff coverage is 39.08%.
```diff
@@            Coverage Diff             @@
##           master      #47       +/-   ##
===========================================
- Coverage   70.05%   58.36%   -11.70%
===========================================
  Files          19       18        -1
  Lines         875     1237      +362
===========================================
+ Hits          613      722      +109
- Misses        203      462      +259
+ Partials       59       53        -6
```
| Flag | Coverage Δ | |
|---|---|---|
| #unittests | 58.36% <39.08%> (-11.70%) | :arrow_down: |

Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ | |
|---|---|---|
| parquet/discard.go | 0.00% <0.00%> (ø) | |
| parquet/stdio.go | 80.00% <ø> (ø) | |
| schema/avro.go | 100.00% <ø> (ø) | |
| schema/bigquery.go | 100.00% <ø> (ø) | |
| schema/parquet.go | 73.56% <ø> (ø) | |
| record/arrow.go | 18.28% <4.48%> (-25.47%) | :arrow_down: |
| record/avro.go | 77.77% <57.14%> (+5.05%) | :arrow_up: |
| record/jsonl.go | 75.00% <66.66%> (+18.75%) | :arrow_up: |
| arrow/json/writer.go | 74.75% <74.75%> (ø) | |
| columnifier/parquet.go | 80.32% <75.00%> (-4.86%) | :arrow_down: |
| ... and 7 more | | |
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
I added benchmarking and profiling to the CI job. The CPU profile result was:
```
      flat  flat%   sum%        cum   cum%
     5.59s 14.32% 14.32%      9.55s 24.46%  runtime.scanobject
     2.20s  5.64% 19.95%      2.59s  6.63%  runtime.findObject
     2.07s  5.30% 25.26%      6.60s 16.91%  runtime.mallocgc
     1.73s  4.43% 29.69%      1.73s  4.43%  runtime.futex
     0.90s  2.31% 31.99%      1.05s  2.69%  runtime.heapBitsSetType
     0.82s  2.10% 34.09%      0.82s  2.10%  runtime.nextFreeFast (inline)
     ...
```
And it has some high cum% consumers:
```
     ...
     0.21s  0.54% 56.79%      7.63s 19.54%  github.com/xitongsys/parquet-go/marshal.MarshalJSON
     ...
     0.02s 0.051% 73.64%      6.47s 16.57%  github.com/xitongsys/parquet-go/writer.(*ParquetWriter).WriteStop
     ...
     0.04s   0.1% 70.62%     12.16s 31.15%  github.com/xitongsys/parquet-go/writer.(*ParquetWriter).flushObjs.func1
     ...
     0.04s   0.1% 71.13%     13.10s 33.56%  runtime.systemstack
     ...
```
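Like the heap profile, a CPU profile like this can be collected with runtime/pprof; again just a minimal sketch (the output path and the placement inside main are assumptions, not columnify's actual profiling hook):

```go
package main

import (
	"log"
	"os"
	"runtime/pprof"
)

func main() {
	f, err := os.Create("/tmp/columnify.cpu.prof")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	if err := pprof.StartCPUProfile(f); err != nil {
		log.Fatal(err)
	}
	defer pprof.StopCPUProfile()

	// ... run the avro -> arrow -> parquet conversion here ...
}
```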
It seems that we don't have many tuning points on our side now. So what we can work on next is, I guess, the parts related to dependency modules, mainly parquet-go.
@okkez can you check whether these changes reduce memory usage? I replaced the naive intermediate data structure with Apache Arrow records, so memory usage is possibly reduced. (ctx: https://github.com/reproio/columnify/issues/43)
I cannot convert msgpack to parquet using columnify with this PR, so I didn't measure memory usage.
$ ./columnify -recordType msgpack -schemaType avro -schemaFile ./rails_access_log.avsc rails-small.msgpack > /dev/null
2020/07/29 08:36:09 Failed to write: unexpected input 200 typed string as int32: input record is unable to convert
But v0.0.3 can convert it.
I used the following schema and data.
{
"name": "RailsAccessLog",
"type": "record",
"fields": [
{
"name": "container_id",
"type": "string"
},
{
"name": "container_name",
"type": "string"
},
{
"name": "source",
"type": "string"
},
{
"name": "log",
"type": "string"
},
{
"name": "__fluentd_address__",
"type": "string"
},
{
"name": "__fluentd_host__",
"type": "string"
},
{
"name": "action",
"type": ["null", "string"]
},
{
"name": "controller",
"type": ["null", "string"]
},
{
"name": "role",
"type": "string"
},
{
"name": "host",
"type": "string"
},
{
"name": "location",
"type": ["null", "string"]
},
{
"name": "severity",
"type": ["null", "string"],
"default": "INFO"
},
{
"name": "status",
"type": "int"
},
{
"name": "db",
"type": ["null", "float"]
},
{
"name": "view",
"type": ["null", "float"]
},
{
"name": "duration",
"type": ["null", "float"]
},
{
"name": "method",
"type": "string"
},
{
"name": "path",
"type": "string"
},
{
"name": "format",
"type": ["null", "string"]
},
{
"name": "error",
"type": ["null", "string"]
},
{
"name": "remote_ip",
"type": ["null", "string"]
},
{
"name": "agent",
"type": ["null", "string"]
},
{
"name": "authenticated_user_id",
"type": ["null", "string"]
},
{
"name": "params",
"type": ["null", "string"]
},
{
"name": "tag",
"type": "string"
},
{
"name": "time",
"type": "string"
}
]
}
I can convert msgpack to parquet after I replaced int/float with string in the schema file.
@okkez I wonder why some values are encoded as string fields ... Anyway, I re-enabled converting these values to int/float types in https://github.com/reproio/columnify/pull/47/commits/f0f8a3213afdba29d810ce1f77f216e0c367ca29. How about that?
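For illustration, a minimal sketch of the kind of lenient coercion being discussed; the helper name is hypothetical and this is not the code in the linked commit:

```go
package record

import (
	"fmt"
	"strconv"
)

// coerceToInt32 accepts an int-typed schema field whose value arrived as a
// string (e.g. "200" for an HTTP status). Overflow of plain int/int64 inputs
// is not checked in this sketch.
func coerceToInt32(v interface{}) (int32, error) {
	switch vv := v.(type) {
	case int32:
		return vv, nil
	case int:
		return int32(vv), nil
	case int64:
		return int32(vv), nil
	case string:
		n, err := strconv.ParseInt(vv, 10, 32)
		if err != nil {
			return 0, fmt.Errorf("unexpected input %v typed %T as int32: %w", v, v, err)
		}
		return int32(n), nil
	default:
		return 0, fmt.Errorf("unexpected input %v typed %T as int32", v, v)
	}
}
```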
Works well. But memory usage is not reduced: converting a 223MB msgpack file to parquet uses about 2.0GB of memory (RSS from the ps command).
I could reproduce that; RSS is indeed still that high (though I found that the memprofile result is not so terrible, which is curious). Anyway, I would like to find another way to reduce it. Maybe finally supporting streaming conversion ...? That's not an easy way, but it would be more effective.
@okkez I created another patch with small changes, separated from this pull request (which focuses on using Arrow). It supports stream-like I/O and should reduce memory usage. Can you take a look? https://github.com/reproio/columnify/pull/52
Even using Arrow instead of the naive map[string]interface{} intermediate representation ... there's no way to reduce memory usage effectively on the parquet-go side. In the end, I think we should re-implement parquet-go. 🤔