[R][C++] Repartitioning on a new variable uses all my RAM and crashes
Describe the bug, including details regarding any error messages, version, and platform.
I'm trying to repartition a ~10 GB dataset based on a new variable, but I can't work out whether this is a bug or expected behaviour due to how things are implemented internally. Here's the R code I've been running:
library(arrow)
library(dplyr)

open_dataset("data/pums/person") |>
  mutate(
    age_group = case_when(
      AGEP < 25 ~ "Under 25",
      AGEP < 35 ~ "25-34",
      AGEP < 45 ~ "35-44",
      AGEP < 55 ~ "45-54",
      AGEP < 65 ~ "55-64",
      TRUE ~ "65+"
    )
  ) |>
  write_dataset(
    path = "./data/pums/person-age-partitions",
    partitioning = c("year", "location", "age_group")
  )
The data is in Parquet format and is already partitioned by "year" and "location". When I try to run this, it gradually uses more and more of my RAM until it crashes.
If I run it with the debugger attached, it all looks fine, but eventually dies with the message Program terminated with signal SIGKILL, Killed.
This is using Arrow C++ library version 14.0.2 and R package version 14.0.2.1.
When I try this with a different variable that already exists in the dataset, it uses a lot of RAM, but seems to back off before it gets too high, e.g.
open_dataset("data/pums/person") |>
  write_dataset("data/pums/person-cow-partition", partitioning = c("year", "COW"))
I think I'm missing something here in terms of what's going on, rather than this being a bug?
Component(s)
C++
@westonpace Reckon this is a bug?
Is it possible to print the query plan that gets generated? I agree that this should work.
Here's the query plan (the dataset has a lot of columns):
ExecPlan with 3 nodes:
2:SinkNode{}
1:ProjectNode{projection=[SPORDER, RT, SERIALNO, PUMA, ST, ADJUST, PWGTP, AGEP, CIT, COW, DDRS, DEYE, DOUT, DPHY, DREM, DWRK, ENG, FER, GCL, GCM, GCR, INTP, JWMNP, JWRIP, JWTR, LANX, MAR, MIG, MIL, MILY, MLPA, MLPB, MLPC, MLPD, MLPE, MLPF, MLPG, MLPH, MLPI, MLPJ, MLPK, NWAB, NWAV, NWLA, NWLK, NWRE, OIP, PAP, REL, RETP, SCH, SCHG, SCHL, SEMP, SEX, SSIP, SSP, WAGP, WKHP, WKL, WKW, YOEP, UWRK, ANC, ANC1P, ANC2P, DECADE, DRIVESP, DS, ESP, ESR, HISP, INDP, JWAP, JWDP, LANP, MIGPUMA, MIGSP, MSP, NAICSP, NATIVITY, OC, OCCP, PAOC, PERNP, PINCP, POBP, POVPIP, POWPUMA, POWSP, QTRBIR, RAC1P, RAC2P, RAC3P, RACAIAN, RACASN, RACBLK, RACNHPI, RACNUM, RACSOR, RACWHT, RC, SFN, SFR, SOCP, VPS, WAOB, FAGEP, FANCP, FCITP, FCOWP, FDDRSP, FDEYEP, FDOUTP, FDPHYP, FDREMP, FDWRKP, FENGP, FESRP, FFERP, FGCLP, FGCMP, FGCRP, FHISP, FINDP, FINTP, FJWDP, FJWMNP, FJWRIP, FJWTRP, FLANP, FLANXP, FMARP, FMIGP, FMIGSP, FMILPP, FMILSP, FMILYP, FOCCP, FOIP, FPAP, FPOBP, FPOWSP, FRACP, FRELP, FRETP, FSCHGP, FSCHLP, FSCHP, FSEMP, FSEXP, FSSIP, FSSP, FWAGP, FWKHP, FWKLP, FWKWP, FYOEP, PWGTP1, PWGTP2, PWGTP3, PWGTP4, PWGTP5, PWGTP6, PWGTP7, PWGTP8, PWGTP9, PWGTP10, PWGTP11, PWGTP12, PWGTP13, PWGTP14, PWGTP15, PWGTP16, PWGTP17, PWGTP18, PWGTP19, PWGTP20, PWGTP21, PWGTP22, PWGTP23, PWGTP24, PWGTP25, PWGTP26, PWGTP27, PWGTP28, PWGTP29, PWGTP30, PWGTP31, PWGTP32, PWGTP33, PWGTP34, PWGTP35, PWGTP36, PWGTP37, PWGTP38, PWGTP39, PWGTP40, PWGTP41, PWGTP42, PWGTP43, PWGTP44, PWGTP45, PWGTP46, PWGTP47, PWGTP48, PWGTP49, PWGTP50, PWGTP51, PWGTP52, PWGTP53, PWGTP54, PWGTP55, PWGTP56, PWGTP57, PWGTP58, PWGTP59, PWGTP60, PWGTP61, PWGTP62, PWGTP63, PWGTP64, PWGTP65, PWGTP66, PWGTP67, PWGTP68, PWGTP69, PWGTP70, PWGTP71, PWGTP72, PWGTP73, PWGTP74, PWGTP75, PWGTP76, PWGTP77, PWGTP78, PWGTP79, PWGTP80, NOP, ADJINC, CITWP, DEAR, DRAT, DRATX, HINS1, HINS2, HINS3, HINS4, HINS5, HINS6, HINS7, MARHD, MARHM, MARHT, MARHW, MARHYP, DIS, HICOV, PRIVCOV, PUBCOV, FCITWP, FDEARP, FDRATP, FDRATXP, FHINS1P, FHINS2P, FHINS3P, FHINS4P, FHINS5P, FHINS6P, FHINS7P, FMARHDP, FMARHMP, FMARHTP, FMARHWP, FMARHYP, WRK, FOD1P, FOD2P, SCIENGP, SCIENGRLP, FFODP, FHINS3C, FHINS4C, FHINS5C, RELP, FWRKP, FDISP, FPERNP, FPINCP, FPRIVCOVP, FPUBCOVP, RACNH, RACPI, SSPA, MLPCD, MLPFG, FHICOVP, DIVISION, REGION, HIMRKS, JWTRNS, RELSHIPP, WKWN, FHIMRKSP, FJWTRNSP, FRELSHIPP, FWKWNP, MLPIK, year, location, "age_group": case_when({1=(AGEP < 25), 2=(AGEP < 35), 3=(AGEP < 45), 4=(AGEP < 55), 5=(AGEP < 65), 6=true}, "Under 25", "25-34", "35-44", "45-54", "55-64", "65+")]}
0:SourceNode{}
Hmm, could be an R bug or something already solved actually; I ran the following (different query, but similarly problematic in R) with pyarrow:
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

dataset = ds.dataset("data/pums/person", format="parquet")
ds.write_dataset(dataset, "data/pums/person-puma-partition", partitioning=["year", "PUMA"])
and RAM usage never goes above 80%.
(It is a newer Arrow C++ version though, so I'll need to check and see if it's the same on 14.0.0)
The data is in Parquet format and is already partitioned by "year" and "location". When I try to run this, it gradually uses more and more of my RAM until it crashes.
How were you measuring RAM? Were you looking at the RSS of the process? Or were you looking at the amount of free/available memory?
I would also be surprised if that plan itself was leaking / accumulating memory.
If it is R specific then maybe R is accumulating everything before the call to write_dataset? I seem to remember that being an R fallback at some point when creating plans.
In python the write_dataset call can take as input a record batch reader. I think you actually end up with two acero plans. The first is the one you shared and the second is just source -> write (where the first plan's output is the "source" node in the second plan).
However, in R, it might be more natural to make a source -> project -> write plan instead of a source -> project -> sink plan in this situation.
OK, this is the actual plan:
ExecPlan with 3 nodes:
2:ConsumingSinkNode{}
1:ProjectNode{projection=[SPORDER, RT, SERIALNO, PUMA, ST, ADJUST, PWGTP, AGEP, CIT, COW, DDRS, DEYE, DOUT, DPHY, DREM, DWRK, ENG, FER, GCL, GCM, GCR, INTP, JWMNP, JWRIP, JWTR, LANX, MAR, MIG, MIL, MILY, MLPA, MLPB, MLPC, MLPD, MLPE, MLPF, MLPG, MLPH, MLPI, MLPJ, MLPK, NWAB, NWAV, NWLA, NWLK, NWRE, OIP, PAP, REL, RETP, SCH, SCHG, SCHL, SEMP, SEX, SSIP, SSP, WAGP, WKHP, WKL, WKW, YOEP, UWRK, ANC, ANC1P, ANC2P, DECADE, DRIVESP, DS, ESP, ESR, HISP, INDP, JWAP, JWDP, LANP, MIGPUMA, MIGSP, MSP, NAICSP, NATIVITY, OC, OCCP, PAOC, PERNP, PINCP, POBP, POVPIP, POWPUMA, POWSP, QTRBIR, RAC1P, RAC2P, RAC3P, RACAIAN, RACASN, RACBLK, RACNHPI, RACNUM, RACSOR, RACWHT, RC, SFN, SFR, SOCP, VPS, WAOB, FAGEP, FANCP, FCITP, FCOWP, FDDRSP, FDEYEP, FDOUTP, FDPHYP, FDREMP, FDWRKP, FENGP, FESRP, FFERP, FGCLP, FGCMP, FGCRP, FHISP, FINDP, FINTP, FJWDP, FJWMNP, FJWRIP, FJWTRP, FLANP, FLANXP, FMARP, FMIGP, FMIGSP, FMILPP, FMILSP, FMILYP, FOCCP, FOIP, FPAP, FPOBP, FPOWSP, FRACP, FRELP, FRETP, FSCHGP, FSCHLP, FSCHP, FSEMP, FSEXP, FSSIP, FSSP, FWAGP, FWKHP, FWKLP, FWKWP, FYOEP, PWGTP1, PWGTP2, PWGTP3, PWGTP4, PWGTP5, PWGTP6, PWGTP7, PWGTP8, PWGTP9, PWGTP10, PWGTP11, PWGTP12, PWGTP13, PWGTP14, PWGTP15, PWGTP16, PWGTP17, PWGTP18, PWGTP19, PWGTP20, PWGTP21, PWGTP22, PWGTP23, PWGTP24, PWGTP25, PWGTP26, PWGTP27, PWGTP28, PWGTP29, PWGTP30, PWGTP31, PWGTP32, PWGTP33, PWGTP34, PWGTP35, PWGTP36, PWGTP37, PWGTP38, PWGTP39, PWGTP40, PWGTP41, PWGTP42, PWGTP43, PWGTP44, PWGTP45, PWGTP46, PWGTP47, PWGTP48, PWGTP49, PWGTP50, PWGTP51, PWGTP52, PWGTP53, PWGTP54, PWGTP55, PWGTP56, PWGTP57, PWGTP58, PWGTP59, PWGTP60, PWGTP61, PWGTP62, PWGTP63, PWGTP64, PWGTP65, PWGTP66, PWGTP67, PWGTP68, PWGTP69, PWGTP70, PWGTP71, PWGTP72, PWGTP73, PWGTP74, PWGTP75, PWGTP76, PWGTP77, PWGTP78, PWGTP79, PWGTP80, year, location, "age_group": case_when({1=(AGEP < 25), 2=(AGEP < 35), 3=(AGEP < 45), 4=(AGEP < 55), 5=(AGEP < 65), 6=true}, "Under 25", "25-34", "35-44", "45-54", "55-64", "65+")]}
0:SourceNode{}
It would be nice if the ConsumingSinkNode printed the values of the WriteNodeOptions so we could compare with pyarrow. But glancing at the defaults, they look the same (more or less?)
How were you measuring RAM? Were you looking at the RSS of the process? Or were you looking at the amount of free/available memory?
I was just looking at free/available memory - would RSS of the process be better?
If it is R specific then maybe R is accumulating everything before the call to write_dataset? I seem to remember that being an R fallback at some point when creating plans.
Thanks, I'll take a closer look into the code to see if I can find something.
In python the write_dataset call can take as input a record batch reader. I think you actually end up with two acero plans. The first is the one you shared and the second is just source -> write (where the first plan's output is the "source" node in the second plan).
However, in R, it might be more natural to make a source -> project -> write plan instead of a source -> project -> sink plan in this situation.
Sorry, I'm a bit lost here; what are the implications of write versus sink here?
Sorry, I'm a bit lost here; what are the implications of write versus sink here?
I suppose it is more about "two plans" vs. "one plan".
A plan's output can be a record batch reader (the final node is a sink node). A plan's input can be a record batch reader. A write plan has no output (the final node is a write node).
So, to scan and rewrite a dataset, you can have two plans:
Plan 1: Scan(files) -> Project -> Sink
Plan 2: Scan(record_batch_reader) -> Write
Or you can do it all in one plan:
Combined: Scan(files) -> Project -> Write
In python, since there is no equivalent of dplyr, the user cannot write a "single statement" like open_dataset |> mutate(...) |> write_dataset(...). Instead the user has to do something like...
projected = dataset.to_reader(...) # Creates an acero plan
ds.write_dataset(projected) # Creates a second acero plan
However, since you have a single dplyr plan in R, it might be possible to make a single acero plan. That being said, I don't expect it to have much impact.
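For concreteness, here's a hedged sketch of that "two plans" shape using public pyarrow APIs (the paths, the derived under_25 column standing in for age_group, and the short column list are placeholders; a real projection would also carry through every data column you want to keep):

import pyarrow as pa
import pyarrow.dataset as ds

dataset = ds.dataset("data/pums/person", format="parquet")

# Plan 1: scan + project, exposed as a RecordBatchReader.
reader = dataset.scanner(
    columns={
        "year": ds.field("year"),
        "location": ds.field("location"),
        "AGEP": ds.field("AGEP"),
        # Derived column, cast to string so it can be used as a partition key.
        "under_25": (ds.field("AGEP") < 25).cast(pa.string()),
    }
).to_reader()

# Plan 2: the reader becomes the source of a second scan -> write plan.
ds.write_dataset(
    reader,
    "data/pums/person-age-partitions",
    format="parquet",
    partitioning=["year", "location", "under_25"],
)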
I was just looking at free/available memory - would RSS of the process be better?
When writing a dataset, the system will generally use all available memory. This is because a "write" call just copies data from the process RSS into the kernel page cache; it doesn't block until all the data is persisted to disk. So seeing it "gradually uses more and more of my RAM" is expected behavior. If you are looking at the output of free:
total used free shared buff/cache available
Mem: 31Gi 9.8Gi 10Gi 262Mi 11Gi 20Gi
Swap: 63Gi 5.5Gi 58Gi
You should see free drop to 0 and buff/cache grow to consume all RAM. This is normal.
However, you should not see the RSS of the process increase without bound. You should not see used increase without bound. You also shouldn't see Program terminated with signal SIGKILL, Killed.
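If it helps to tell the two apart, here's a minimal monitoring sketch (assuming the psutil package; the process id and 5-second interval are placeholders) that logs the writer process's RSS alongside system-available memory while the write runs:

import time
import psutil

def watch_memory(pid, interval=5):
    """Log process RSS vs. system-available memory until the process exits."""
    proc = psutil.Process(pid)
    while proc.is_running():
        rss_gb = proc.memory_info().rss / 1e9               # memory held by the process itself
        avail_gb = psutil.virtual_memory().available / 1e9  # free memory plus reclaimable page cache
        print(f"RSS: {rss_gb:.2f} GB | available: {avail_gb:.2f} GB")
        time.sleep(interval)

If RSS keeps climbing while available memory falls, the process itself is accumulating data; if RSS stays flat while buff/cache grows, that's just the page cache doing its job.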
Thanks! And when you say "increase without bound", how would I know that's happening?
OK, so I've been experimenting with various combinations of this, and have found that it happens with both Python and R, so looks like a C++ issue.
I'm running this in a Docker container I've created based off ubuntu:latest, with 8 GB of RAM, 2 GB of swap, and 50% of my CPU.
Here's what I've found so far:
- everything is fine when I partition on an existing variable plus a new one created via projection (so I think the new-column thing was a red herring); even if it's slow, it eventually completes
- as soon as I partition on 3 variables, it eventually crashes (both in Python and R)
Here's an example in pyarrow using the NYC taxi dataset (this should result in 924 partitions):
import pyarrow.dataset as ds
import pyarrow as pa
dataset = ds.dataset("data", partitioning="hive")
target_dir = "data2"
ds.write_dataset(
dataset,
target_dir,
partitioning=["year", "month", "rate_code"]
)
I was wondering if it was related to the number of partitions, though when I run this example (which should have fewer partitions - 396), it also eats memory until the Python process is killed.
import pyarrow.dataset as ds
import pyarrow as pa
dataset = ds.dataset("data", partitioning="hive")
target_dir = "data2"
ds.write_dataset(
dataset,
target_dir,
partitioning=["year", "month", "vendor_name"]
)
Happy to try to investigate further and try to get some output, but I wasn't sure whether it'd be more useful to log the memory via free or run with the debugger attached and log the output?
I think we might be rehashing some of the conversation already had a long time ago in https://github.com/apache/arrow/issues/18944#issuecomment-1377665189
Is the relevant issue the number of partition variables, or the number of groups/partitions that result? I.e., if you partitioned on one variable that had 1000 distinct values, would you have the same problem?
I tried it with mta_tax, which has 385 distinct values, and it also crashes. I'd expect that, though, since the data isn't already partitioned on that variable, so it would all need to be in memory to work out how many distinct partitions are needed. Actually, no, perhaps I wouldn't, given that we can call distinct() on a column without having all the data in memory - though I don't know whether the same mechanism is used when working out which data goes in which partition.
But, I don't know how this works internally here.
Maybe I'm overthinking this and it is just as simple as number of total partitions...
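If it does come down to the total number of partitions, one way to check how many a given column combination would produce (a sketch using the taxi example's columns; only those three columns are pulled into memory) is:

import pyarrow.dataset as ds

dataset = ds.dataset("data", partitioning="hive")
# Read just the candidate partition columns and count distinct combinations.
keys = dataset.scanner(columns=["year", "month", "vendor_name"]).to_table()
n_partitions = keys.group_by(["year", "month", "vendor_name"]).aggregate([]).num_rows
print(n_partitions)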
Is the data in these examples something I can easily download? I will try and reproduce / study this on the weekend.
Thanks @westonpace! I set up a private repo with all the Dockerfiles etc. I've been using, plus notes about the different experiments I've been running - you should have an invitation to that repo now. data is the NYC taxi dataset from the VD bucket.
Thank you! I looked at this today. It's a bug. It was probably introduced when we switched the dataset writer over to using some more generic tools for backpressure (the async task scheduler). Apologies in advance for the long boring explanation :)
The dataset writer does not assume the underlying file writer is re-entrant (I can't remember if the parquet writer is reentrant or not). When a batch comes in for file X and a write task is already running on file X, we queue that batch up. We call this data "in flight" and we have a special throttle for how many rows we can have in flight (it's not configurable and is set to 8Mi). When this throttle is full it sends a signal to the source to pause.
All of this is actually working correctly. The problem is that, when it pauses the source, a few extra tasks leak in because they were already running. This is kind of ok, but then it unpauses, fills up, and pauses again, and a few more extra tasks leak in. This process repeats...a lot. By the time it crashed on my machine there were over a thousand extra tasks. Because all these tasks are getting in, the source thinks the data is being consumed and it keeps reading. This becomes uncontrolled memory growth and everything crashes.
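As a purely illustrative toy model (not Arrow's actual scheduler; the batch size and leak count below are made up), the arithmetic of that ratchet looks roughly like this:

# Each pause/unpause cycle lets a few already-running tasks deliver batches
# that the throttle never waits for, so the backlog creeps upward even though
# the throttle itself behaves correctly.
THROTTLE_ROWS = 8 * 1024 * 1024      # the fixed in-flight limit mentioned above
BATCH_ROWS = 32 * 1024               # made-up batch size
LEAKED_BATCHES_PER_PAUSE = 3         # made-up count of tasks already running at pause time

unaccounted_rows = 0                 # rows that slip in after each pause signal
for cycle in range(1, 1001):
    # Throttle fills, source pauses, but in-flight tasks still hand over their batches.
    unaccounted_rows += LEAKED_BATCHES_PER_PAUSE * BATCH_ROWS
    # The throttle drains and resumes the source without waiting for those extras,
    # so the next cycle starts from a slightly higher backlog.
    if cycle % 250 == 0:
        print(f"after {cycle} cycles: ~{unaccounted_rows / 2**20:.1f} Mi rows of unthrottled backlog")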
I suspect this is related to partitioning because you end up with lots of tiny writes and the in-flight throttle fills and empties A LOT during execution. You might be able to get things to pass if you set min_rows_per_group to some largish value, although then you run into a different kind of throttle (max rows "staged"), so it might not help.
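If you want to experiment with that workaround, here's a hedged sketch against the taxi example from earlier (the 250,000 value is an arbitrary guess, not a recommendation):

import pyarrow.dataset as ds

dataset = ds.dataset("data", partitioning="hive")
ds.write_dataset(
    dataset,
    "data2",
    partitioning=["year", "month", "rate_code"],
    min_rows_per_group=250_000,    # batch up the tiny per-partition writes before flushing
    max_rows_per_group=1_000_000,  # keep row groups bounded as well
)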
The proper fix would be to not release the throttle until the extra tasks that snuck in the last time it was paused have been launched. I am doing some arrow-cpp work this week and will try and get a look at this.
Cheers for looking into it!
Hah, after spending time running various experiments and testing hypotheses based on guesswork mental models, reading the long explanation is certainly not boring - it's satisfying to know these things, so thank you!
@thisisnic I believe I have come up with a fix (https://github.com/apache/arrow/pull/40722) if you want to try and test it out on your setup.
Thanks @westonpace, will take a look over the weekend!
Issue resolved by pull request 40722 https://github.com/apache/arrow/pull/40722