Allow for file appends
Hello,
I'm currently dealing with a problem where I need to be able to append to a *.arrow file. The data needs to be added row by row because it's bigger than memory and can only be loaded row by row through a stream.
A good example of what I want to do is this:
using Arrow
using DataFrames
using Tables

open("out.arrow", "a") do file
    for i in massive_stream_iterator
        X = rand(1, 1000) |> Tables.table
        Arrow.write(file, X)
    end
end
Reading from this file generates a 1x1000 table, not an Nx1000 table.
df = Arrow.Table("out.arrow") |> DataFrame
@info size(df)
Is it currently possible for Arrow.jl to do this? I've looked all over the documentation, and the closest thing to this was using Tables.partitioner, which I can't use since I'm iterating through a massive, bigger-than-memory iterator.
So the tricky thing here is that the arrow format is columnar, so rows have to be collected and columns have to be "built" at some point to write the data out. I.e. it's not physically possible to write out an arrow file row by row.
Now, we do have some new tools in the ecosystem that might help in this case. For Arrow, if you're working with larger-than-memory data, the solution is to write the data in "batches", which Arrow.write supports when the table source implements Tables.partitions. This allows writing out each "partition" as a separate record batch in the arrow file.
So you're correct in thinking Tables.partitioner could help here; it allows "partitioning" an input lazily, like a lazy map that produces a valid table on each iteration. So the usage in your example above would be more like:
Arrow.write("out.arrow", Tables.partitioner(x->Iterators.take(x, 1000), massive_stream_iterator) )
What this will do is spin up a writing thread for each of Threads.nthreads(); each will call Iterators.take(massive_stream_iterator, 1000), which will iterate/return 1000 elements, and those 1000 elements will be made into columns and written out as a single record batch. So the massive_stream_iterator will be written out in record batches of 1000 elements each. This assumes that each element iterated from massive_stream_iterator is a valid "row" according to the Tables.jl interface (i.e. some object that supports at least propertynames(x) and getproperty(x, nm)).
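A minimal sketch of that row requirement, assuming a NamedTuple-producing generator stands in for massive_stream_iterator (the file name and chunk size are placeholders):

using Arrow, Tables

# stand-in for massive_stream_iterator: a lazy generator of NamedTuple rows,
# each supporting propertynames/getproperty as Tables.jl requires
rowstream = ((a = i, b = i^2) for i in 1:10_000)

# group rows into chunks of 1000; each chunk (a vector of NamedTuples) is a
# valid row table, which Tables.columntable turns into columns for one record batch
Arrow.write("out.arrow",
    Tables.partitioner(Tables.columntable, Iterators.partition(rowstream, 1000)))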
Anyway, let me know if that sounds like something in the right direction. I think it could also be possible to support something like Arrow.append(file, x) if we really needed it: we would open the file, parse the existing schema, verify that the new table x matches that schema, and then write the single table out as a record batch.
Thanks for replying.
I did a slight variation of your post (on a single thread):
using Tables
using Arrow
# Processing to 1xN table
function f(x)
    vec = map(x -> parse(Int64, x), split(x, ','))
    Tables.table(hcat(vec...))
end

iostream = open("myfile.csv", "r") # 1000 x 100 file
Arrow.write("out.arrow", Tables.partitioner(f, eachline(iostream))) # Equivalent to take(x, 1) |> f ?
Looking at my activity monitor, it seems like it's loading the entire iterable into memory before doing a single write, which defeats the purpose of what I'm trying to do (massive-sized file conversions).
Am I misinterpreting your post or is this the expected outcome?
I can technically rely on virtual memory for these massive files; however, I have no idea whether that's viable on a colleague's PC or in production. Appending would be more reliable for these massive operations IMO, but if that's a limitation of the Arrow format, then it is what it is.
I don't think it's entirely unexpected; I think it's just a matter of the "processing" work going so much faster than the IO work, which isn't too surprising. I.e. it's processing each line, parsing, and creating the record batch so quickly, then passing it off to the writing thread, which can't keep up, so you end up getting large chunks of the processing "in memory" at the same time.
If you're willing, I'd be interested to see if you can try a new PR I just put up to limit this: https://github.com/JuliaData/Arrow.jl/pull/106. If you call Arrow.write(file, tbl; ntasks=4), then that should only allow up to 4 record batches at a time to be in memory before being written out and then, hopefully, garbage collected.
I tried your branch, and it gets what I wanted done, so +1 and thank you.
However, there are two major problems.
I'm dealing with a CSV that's Int64[1000 x 10].
function f(x)
    print(1)
    vec = map(x -> parse(Int64, x), split(x, ','))
    Tables.table(hcat(vec...))
end

# Input csv = 39 kb
@time Arrow.write(output, Tables.partitioner(f, eachline(input)), ntasks=5)   # 7.582670 seconds, 681 kb arrow file
@time Arrow.write(output, Tables.partitioner(f, eachline(input)), ntasks=500) # 7.582670 seconds, 681 kb arrow file
@time Arrow.write(output, Tables.partitioner(f, eachline(myfile)) |> collect) # 0.002 seconds, 43 kb arrow file
Opening these files with Arrow.Table |> DataFrame shows that they are the same size.
So the two problems are:
- Serious performance issues (compared with just collecting the Tables.partitioner iterator)
- Bigger file sizes for some reason
I'm pretty sure the last call, Arrow.write(output, Tables.partitioner(f, eachline(myfile)) |> collect), isn't generating the same output "table" you're expecting.
Hmm, I see. Here is a version with working code:
using Arrow
using Tables
using DataFrames

function f(x)
    print(1)
    vec = map(x -> parse(Int64, x), split(x, ','))
    Tables.table(hcat(vec...))
end

# Writing using ntasks
input = "small.csv" # Int[1000 x 10]
@info stat(input).size # 38930
output1 = "output1.arrow"
@time Arrow.write(output1, Tables.partitioner(f, eachline(input)), ntasks=5) # 7.58 seconds
@info stat(output1).size # 681234

# Writing by collecting first
input = "small.csv"
output2 = "output2.arrow"
@time Arrow.write(output2, Tables.partitions(((map(f, eachline(input)) |> collect)...)...)) # 0.03
@info stat(output2).size # 38930

# Comparison
output1_table = Arrow.Table(output1) |> DataFrame
output2_table = Arrow.Table(output2) |> DataFrame
@info output1_table == output2_table # true
Same output, different file size, different time required.
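For what it's worth, the slow time and larger file are consistent with each CSV line becoming its own one-row record batch, each carrying its own metadata. A minimal sketch, not from the thread, of batching lines into larger chunks before writing; the parserow/chunktable helpers, chunksize, and file names are illustrative:

using Arrow, Tables

parserow(line) = parse.(Int64, split(line, ','))

# turn a chunk of lines into a k x 10 table: rows stacked as matrix columns,
# then transposed back to row order
chunktable(lines) = Tables.table(permutedims(reduce(hcat, map(parserow, lines))))

chunksize = 1000
@time Arrow.write("out_chunked.arrow",
    Tables.partitioner(chunktable, Iterators.partition(eachline("small.csv"), chunksize)),
    ntasks = 4)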
I put up #277 to support appending to Arrow files that are either in the Arrow IPC stream format (already supported thanks to @tanmaykm's #160) or the Arrow IPC file format (currently unsupported). @quinnj, would you mind reviewing my PR? Many thanks.
This is supported now.
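A minimal usage sketch of the append workflow, assuming a recent Arrow.jl where Arrow.append is available; Arrow.append requires the existing data to be in the IPC stream format, and the file=false keyword plus the toy tables are illustrative:

using Arrow

file = "out.arrow"

# write the first batch in stream format (file=false) so it can be appended to later
Arrow.write(file, (a = [1, 2], b = ["x", "y"]); file = false)

# each call appends the table as another record batch with the same schema
Arrow.append(file, (a = [3, 4], b = ["z", "w"]))

Arrow.Table(file) # 4 rows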