
[R][C++] Repartitioning on a new variable uses all my RAM and crashes

Open thisisnic opened this issue 1 year ago • 9 comments

Describe the bug, including details regarding any error messages, version, and platform.

I'm trying to repartition a ~10 GB dataset based on a new variable, but I can't work out whether this is a bug or expected behaviour due to how things are implemented internally. Here's the R code I've been running:

open_dataset("data/pums/person") |>
  mutate(
    age_group = case_when(
      AGEP < 25 ~ "Under 25",
      AGEP < 35 ~ "25-34",
      AGEP < 45 ~ "35-44",
      AGEP < 55 ~ "45-54",
      AGEP < 65 ~ "55-64",
      TRUE ~ "65+"
    )
  ) |>
  write_dataset(
    path = "./data/pums/person-age-partitions",
    partitioning = c("year", "location", "age_group")
  )

The data is in Parquet format and is already partitioned by "year" and "location". When I try to run this, it gradually uses more and more of my RAM until it crashes.

If I run it with the debugger attached, it all looks fine, but eventually dies with the message Program terminated with signal SIGKILL, Killed.

This is using Arrow C++ library version 14.0.2, and R package version 14.0.2.1

When I try this with a different variable that already exists in the dataset, it uses a lot of RAM, but seems to back off before it gets too high, e.g.

open_dataset("data/pums/person") |>
  write_dataset("data/pums/person-cow-partition", partitioning = c("year", "COW"))

I think I'm missing something here in terms of what's going on, rather than this being a bug?

Component(s)

C++

thisisnic avatar Feb 24 '24 15:02 thisisnic

@westonpace Reckon this is a bug?

thisisnic avatar Feb 24 '24 17:02 thisisnic

Is it possible to print the query plan that gets generated? I agree that this should work.

westonpace avatar Feb 24 '24 18:02 westonpace

Here's the query plan (the dataset has a lot of columns):

ExecPlan with 3 nodes:
2:SinkNode{}
  1:ProjectNode{projection=[SPORDER, RT, SERIALNO, PUMA, ST, ADJUST, PWGTP, AGEP, CIT, COW, DDRS, DEYE, DOUT, DPHY, DREM, DWRK, ENG, FER, GCL, GCM, GCR, INTP, JWMNP, JWRIP, JWTR, LANX, MAR, MIG, MIL, MILY, MLPA, MLPB, MLPC, MLPD, MLPE, MLPF, MLPG, MLPH, MLPI, MLPJ, MLPK, NWAB, NWAV, NWLA, NWLK, NWRE, OIP, PAP, REL, RETP, SCH, SCHG, SCHL, SEMP, SEX, SSIP, SSP, WAGP, WKHP, WKL, WKW, YOEP, UWRK, ANC, ANC1P, ANC2P, DECADE, DRIVESP, DS, ESP, ESR, HISP, INDP, JWAP, JWDP, LANP, MIGPUMA, MIGSP, MSP, NAICSP, NATIVITY, OC, OCCP, PAOC, PERNP, PINCP, POBP, POVPIP, POWPUMA, POWSP, QTRBIR, RAC1P, RAC2P, RAC3P, RACAIAN, RACASN, RACBLK, RACNHPI, RACNUM, RACSOR, RACWHT, RC, SFN, SFR, SOCP, VPS, WAOB, FAGEP, FANCP, FCITP, FCOWP, FDDRSP, FDEYEP, FDOUTP, FDPHYP, FDREMP, FDWRKP, FENGP, FESRP, FFERP, FGCLP, FGCMP, FGCRP, FHISP, FINDP, FINTP, FJWDP, FJWMNP, FJWRIP, FJWTRP, FLANP, FLANXP, FMARP, FMIGP, FMIGSP, FMILPP, FMILSP, FMILYP, FOCCP, FOIP, FPAP, FPOBP, FPOWSP, FRACP, FRELP, FRETP, FSCHGP, FSCHLP, FSCHP, FSEMP, FSEXP, FSSIP, FSSP, FWAGP, FWKHP, FWKLP, FWKWP, FYOEP, PWGTP1, PWGTP2, PWGTP3, PWGTP4, PWGTP5, PWGTP6, PWGTP7, PWGTP8, PWGTP9, PWGTP10, PWGTP11, PWGTP12, PWGTP13, PWGTP14, PWGTP15, PWGTP16, PWGTP17, PWGTP18, PWGTP19, PWGTP20, PWGTP21, PWGTP22, PWGTP23, PWGTP24, PWGTP25, PWGTP26, PWGTP27, PWGTP28, PWGTP29, PWGTP30, PWGTP31, PWGTP32, PWGTP33, PWGTP34, PWGTP35, PWGTP36, PWGTP37, PWGTP38, PWGTP39, PWGTP40, PWGTP41, PWGTP42, PWGTP43, PWGTP44, PWGTP45, PWGTP46, PWGTP47, PWGTP48, PWGTP49, PWGTP50, PWGTP51, PWGTP52, PWGTP53, PWGTP54, PWGTP55, PWGTP56, PWGTP57, PWGTP58, PWGTP59, PWGTP60, PWGTP61, PWGTP62, PWGTP63, PWGTP64, PWGTP65, PWGTP66, PWGTP67, PWGTP68, PWGTP69, PWGTP70, PWGTP71, PWGTP72, PWGTP73, PWGTP74, PWGTP75, PWGTP76, PWGTP77, PWGTP78, PWGTP79, PWGTP80, NOP, ADJINC, CITWP, DEAR, DRAT, DRATX, HINS1, HINS2, HINS3, HINS4, HINS5, HINS6, HINS7, MARHD, MARHM, MARHT, MARHW, MARHYP, DIS, HICOV, PRIVCOV, PUBCOV, FCITWP, FDEARP, FDRATP, FDRATXP, FHINS1P, FHINS2P, FHINS3P, FHINS4P, FHINS5P, FHINS6P, FHINS7P, FMARHDP, FMARHMP, FMARHTP, FMARHWP, FMARHYP, WRK, FOD1P, FOD2P, SCIENGP, SCIENGRLP, FFODP, FHINS3C, FHINS4C, FHINS5C, RELP, FWRKP, FDISP, FPERNP, FPINCP, FPRIVCOVP, FPUBCOVP, RACNH, RACPI, SSPA, MLPCD, MLPFG, FHICOVP, DIVISION, REGION, HIMRKS, JWTRNS, RELSHIPP, WKWN, FHIMRKSP, FJWTRNSP, FRELSHIPP, FWKWNP, MLPIK, year, location, "age_group": case_when({1=(AGEP < 25), 2=(AGEP < 35), 3=(AGEP < 45), 4=(AGEP < 55), 5=(AGEP < 65), 6=true}, "Under 25", "25-34", "35-44", "45-54", "55-64", "65+")]}
    0:SourceNode{}

thisisnic avatar Feb 24 '24 18:02 thisisnic

Hmm, could be an R bug or something already solved actually; I ran the following (different query, but similarly problematic in R) with pyarrow:

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

dataset = ds.dataset("data/pums/person", format="parquet")
ds.write_dataset("data/pums/person-puma-partition", partitioning = ["year", "PUMA"])

and RAM usage never goes above 80%.

(It is a newer Arrow C++ version though, so I'll need to check and see if it's the same on 14.0.0)

thisisnic avatar Feb 24 '24 19:02 thisisnic

The data is in Parquet format and is already partitioned by "year" and "location". When I try to run this, it gradually uses more and more of my RAM until it crashes.

How were you measuring RAM? Were you looking at the RSS of the process? Or were you looking at the amount of free/available memory?

I would also be surprised if that plan itself was leaking / accumulating memory.

If it is R specific then maybe R is accumulating everything before the call to write_dataset? I seem to remember that being an R fallback at some point when creating plans.

In python the write_dataset call can take as input a record batch reader. I think you actually end up with two acero plans. The first is the one you shared and the second is just source -> write (where the first plan's output is the "source" node in the second plan).

However, in R, it might be more natural to make a source -> project -> write plan instead of a source -> project -> sink plan in this situation.

westonpace avatar Feb 28 '24 13:02 westonpace

OK, this is the actual plan:

ExecPlan with 3 nodes:
2:ConsumingSinkNode{}
  1:ProjectNode{projection=[SPORDER, RT, SERIALNO, PUMA, ST, ADJUST, PWGTP, AGEP, CIT, COW, DDRS, DEYE, DOUT, DPHY, DREM, DWRK, ENG, FER, GCL, GCM, GCR, INTP, JWMNP, JWRIP, JWTR, LANX, MAR, MIG, MIL, MILY, MLPA, MLPB, MLPC, MLPD, MLPE, MLPF, MLPG, MLPH, MLPI, MLPJ, MLPK, NWAB, NWAV, NWLA, NWLK, NWRE, OIP, PAP, REL, RETP, SCH, SCHG, SCHL, SEMP, SEX, SSIP, SSP, WAGP, WKHP, WKL, WKW, YOEP, UWRK, ANC, ANC1P, ANC2P, DECADE, DRIVESP, DS, ESP, ESR, HISP, INDP, JWAP, JWDP, LANP, MIGPUMA, MIGSP, MSP, NAICSP, NATIVITY, OC, OCCP, PAOC, PERNP, PINCP, POBP, POVPIP, POWPUMA, POWSP, QTRBIR, RAC1P, RAC2P, RAC3P, RACAIAN, RACASN, RACBLK, RACNHPI, RACNUM, RACSOR, RACWHT, RC, SFN, SFR, SOCP, VPS, WAOB, FAGEP, FANCP, FCITP, FCOWP, FDDRSP, FDEYEP, FDOUTP, FDPHYP, FDREMP, FDWRKP, FENGP, FESRP, FFERP, FGCLP, FGCMP, FGCRP, FHISP, FINDP, FINTP, FJWDP, FJWMNP, FJWRIP, FJWTRP, FLANP, FLANXP, FMARP, FMIGP, FMIGSP, FMILPP, FMILSP, FMILYP, FOCCP, FOIP, FPAP, FPOBP, FPOWSP, FRACP, FRELP, FRETP, FSCHGP, FSCHLP, FSCHP, FSEMP, FSEXP, FSSIP, FSSP, FWAGP, FWKHP, FWKLP, FWKWP, FYOEP, PWGTP1, PWGTP2, PWGTP3, PWGTP4, PWGTP5, PWGTP6, PWGTP7, PWGTP8, PWGTP9, PWGTP10, PWGTP11, PWGTP12, PWGTP13, PWGTP14, PWGTP15, PWGTP16, PWGTP17, PWGTP18, PWGTP19, PWGTP20, PWGTP21, PWGTP22, PWGTP23, PWGTP24, PWGTP25, PWGTP26, PWGTP27, PWGTP28, PWGTP29, PWGTP30, PWGTP31, PWGTP32, PWGTP33, PWGTP34, PWGTP35, PWGTP36, PWGTP37, PWGTP38, PWGTP39, PWGTP40, PWGTP41, PWGTP42, PWGTP43, PWGTP44, PWGTP45, PWGTP46, PWGTP47, PWGTP48, PWGTP49, PWGTP50, PWGTP51, PWGTP52, PWGTP53, PWGTP54, PWGTP55, PWGTP56, PWGTP57, PWGTP58, PWGTP59, PWGTP60, PWGTP61, PWGTP62, PWGTP63, PWGTP64, PWGTP65, PWGTP66, PWGTP67, PWGTP68, PWGTP69, PWGTP70, PWGTP71, PWGTP72, PWGTP73, PWGTP74, PWGTP75, PWGTP76, PWGTP77, PWGTP78, PWGTP79, PWGTP80, year, location, "age_group": case_when({1=(AGEP < 25), 2=(AGEP < 35), 3=(AGEP < 45), 4=(AGEP < 55), 5=(AGEP < 65), 6=true}, "Under 25", "25-34", "35-44", "45-54", "55-64", "65+")]}
    0:SourceNode{}

thisisnic avatar Mar 01 '24 17:03 thisisnic

It would be nice if the ConsumingSinkNode printed the values of the WriteNodeOptions so we could compare with pyarrow. But glancing at the defaults, they look the same (more or less?)

thisisnic avatar Mar 01 '24 17:03 thisisnic

How were you measuring RAM? Were you looking at the RSS of the process? Or were you looking at the amount of free/available memory?

I was just looking at free/available memory - would RSS of the process be better?

If it is R specific then maybe R is accumulating everything before the call to write_dataset? I seem to remember that being an R fallback at some point when creating plans.

Thanks, I'll take a closer look into the code to see if I can find something.

In python the write_dataset call can take as input a record batch reader. I think you actually end up with two acero plans. The first is the one you shared and the second is just source -> write (where the first plan's output is the "source" node in the second plan).

However, in R, it might be more natural to make a source -> project -> write plan instead of a source -> project -> sink plan in this situation.

Sorry, I'm a bit lost here; what are the implications of write versus sink here?

thisisnic avatar Mar 11 '24 19:03 thisisnic

Sorry, I'm a bit lost here; what are the implications of write versus sink here?

I suppose it is more about "two plans" vs. "one plan".

A plan's output can be a record batch reader (the final node is a sink node). A plan's input can be a record batch reader. A write plan has no output (the final node is a write node).

So, to scan and rewrite a dataset, you can have two plans:

Plan 1: Scan(files) -> Project -> Sink
Plan 2: Scan(record_batch_reader) -> Write

Or you can do it all in one plan:

Combined: Scan(files) -> Project -> Write

In python, since there is no equivalent of dplyr, the user cannot write a "single statement" like open_dataset |> mutate(...) |> write_dataset(...). Instead the user has to do something like...

projected = dataset.to_reader(...) # Creates an acero plan
ds.write_dataset(projected) # Creates a second acero plan

However, since you have a single dplyr plan in R, it might be possible to make a single acero plan. That being said, I don't expect it to have much impact.
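
For concreteness, here's a rough sketch of the "two plan" version in pyarrow. This isn't the exact code from above; the paths, the hive partitioning, and the derived under_25 column are just stand-ins for illustration (the real query projects age_group with case_when):

import pyarrow.dataset as ds

dataset = ds.dataset("data/pums/person", format="parquet", partitioning="hive")

# Plan 1: scan + project, exposed as a record batch reader (nothing is
# materialized yet; batches are produced lazily)
projected = dataset.scanner(
    columns={
        "AGEP": ds.field("AGEP"),
        "year": ds.field("year"),
        "location": ds.field("location"),
        "under_25": ds.field("AGEP") < 25,  # simplified stand-in for case_when
    }
).to_reader()

# Plan 2: a separate source -> write plan that consumes the reader
ds.write_dataset(
    projected,
    "data/pums/person-age-partitions",
    format="parquet",
    partitioning=["year", "location", "under_25"],
)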

I was just looking at free/available memory - would RSS of the process be better?

When writing a dataset, the system will generally use all available memory. This is because a "write" call just copies data from the process RSS into the kernel page cache; it doesn't block until all the data has been persisted to disk. So seeing it "gradually use more and more RAM" is expected behavior. If you are looking at the output of free:

               total        used        free      shared  buff/cache   available
Mem:            31Gi       9.8Gi        10Gi       262Mi        11Gi        20Gi
Swap:           63Gi       5.5Gi        58Gi

You should see free drop to 0 and buff/cache grow to consume all RAM. This is normal.

However, you should not see the RSS of the process increase without bound. You should not see used increase without bound. You also shouldn't see Program terminated with signal SIGKILL, Killed.
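
If it helps, here's a small sketch (Linux-only, and just one way of doing it; the log_memory helper is hypothetical, not part of pyarrow) for watching the process RSS and Arrow's own allocations while the write runs, rather than watching free memory:

import os
import threading
import time

import pyarrow as pa

def log_memory(interval=5):
    # Print the process RSS (read from /proc, so Linux-only) and the bytes
    # currently held by Arrow's default memory pool.
    page_size = os.sysconf("SC_PAGE_SIZE")
    while True:
        with open("/proc/self/statm") as f:
            rss = int(f.read().split()[1]) * page_size
        print(f"RSS: {rss / 2**20:.0f} MiB, "
              f"arrow pool: {pa.total_allocated_bytes() / 2**20:.0f} MiB")
        time.sleep(interval)

# Start logging in the background, then run write_dataset() as usual.
# If both numbers keep climbing until the process is killed, that's
# "increasing without bound".
threading.Thread(target=log_memory, daemon=True).start()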

westonpace avatar Mar 12 '24 14:03 westonpace

Thanks! And when you say "increase without bound", how would I know that's happening?

thisisnic avatar Mar 13 '24 22:03 thisisnic

OK, so I've been experimenting with various combinations of this, and have found that it happens with both Python and R, so looks like a C++ issue.

I'm running this in a Docker container I've created based on ubuntu:latest, with 8 GB of RAM, 2 GB of swap, and 50% of my CPU.

Here's what I've found so far:

  • everything is fine when I partition on an existing variable and a new one that I've created via projection (so I think the new column thing was a red herring); even if it's slow, it eventually completes
  • as soon as I partition on 3 variables, it eventually crashes (both in Python and R)

Here's an example in pyarrow using the NYC taxi dataset (this should result in 924 partitions):

import pyarrow.dataset as ds
import pyarrow as pa

dataset = ds.dataset("data", partitioning="hive")

target_dir = "data2"

ds.write_dataset(
    dataset,
    target_dir,
    partitioning=["year", "month", "rate_code"]
)

I was wondering if it was related to the number of partitions, though when I run this example (which should have fewer partitions - 396), it also eats memory until the Python process is killed.

import pyarrow.dataset as ds
import pyarrow as pa

dataset = ds.dataset("data", partitioning="hive")

target_dir = "data2"

ds.write_dataset(
    dataset,
    target_dir,
    partitioning=["year", "month", "vendor_name"]
)

Happy to investigate further and try to get some output, but I wasn't sure whether it'd be more useful to log memory via free or to run with the debugger attached and log its output.

thisisnic avatar Mar 14 '24 20:03 thisisnic

I think we might be rehashing some of the conversation we already had a long time ago in https://github.com/apache/arrow/issues/18944#issuecomment-1377665189

thisisnic avatar Mar 14 '24 22:03 thisisnic

Is the relevant issue the number of partition variables, or the number of groups/partitions that result? I.e. if you partitioned on one variable that had 1000 distinct values, would you have the same problem?

nealrichardson avatar Mar 14 '24 22:03 nealrichardson

I tried it with mta_tax, which has 385 distinct values, and it also crashes. But I'd expect that, seeing as the data isn't already partitioned on that variable and it'd all need to be in memory to work out how many distinct partitions it needs. Actually, no, perhaps I wouldn't, given that we can call distinct() on a column without having all the data in memory; though I don't know if the same mechanism that allows that is used when working out what data goes in what partition.

But, I don't know how this works internally here.

Maybe I'm overthinking this and it is just as simple as number of total partitions...

thisisnic avatar Mar 14 '24 23:03 thisisnic

Here's an example in pyarrow using the NYC taxi dataset [...]

Is the data in these examples something I can easily download? I will try and reproduce / study this on the weekend.

westonpace avatar Mar 15 '24 04:03 westonpace

Thanks @westonpace! I set up a private repo with all the Dockerfiles etc. I've been using, and notes about the different experiments I've been running; you should have an invitation to that repo now. The data directory is the NYC taxi dataset from the VD bucket.

thisisnic avatar Mar 15 '24 15:03 thisisnic

Thank you! I looked at this today. It's a bug. It was probably introduced when we switched the dataset writer over to using some more generic tools for backpressure (the async task scheduler). Apologies in advance for the long boring explanation :)

The dataset writer does not assume the underlying file writer is reentrant (I can't remember if the parquet writer is reentrant or not). When a batch comes in for file X and a write task is already running on file X then we queue that batch up. We call this data "in flight" and we have a special throttle for how many rows we can have in flight (it's not configurable and is set to 8Mi). When this throttle is full it sends a signal to the source to pause.

All of this is actually working correctly. The problem is that, when it pauses the source, a few extra tasks leak in because they were already running. This is kind of OK, but then it unpauses, fills up, and pauses again, and a few more extra tasks leak in. This process repeats...a lot. By the time it crashed on my machine there were over a thousand extra tasks. Because all these tasks are getting in, the source thinks the data is being consumed and it keeps reading. This becomes uncontrolled memory growth and everything crashes.
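
As a rough toy model (plain Python, not Arrow code, with made-up numbers) of why those leaked tasks matter: only a handful of batches slip past the throttle on each pause/unpause cycle, but over thousands of cycles the excess grows without bound:

# Toy illustration only; none of these numbers come from Arrow's internals.
throttle_rows = 8 * 1024 * 1024       # the 8Mi in-flight limit described above
rows_per_batch = 50_000
leaked_batches_per_cycle = 3          # tasks already running when the pause lands

extra_rows = 0
for cycle in range(1, 2001):
    # throttle fills -> source pauses, but a few in-flight tasks still deliver
    extra_rows += leaked_batches_per_cycle * rows_per_batch
    # writes drain -> source unpauses -> the cycle repeats
    if cycle % 500 == 0:
        print(f"after {cycle} cycles: ~{extra_rows / throttle_rows:.1f}x the throttle has leaked in")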

I suspect this is related to partitioning because you end up with lots of tiny writes and the in-flight throttle fills and empties A LOT during execution. You might be able to get things to pass if you set min_rows_per_group to some largish value, although then you run into a different kind of throttle (max rows "staged"), so it might not help.
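
For anyone who wants to try that workaround, it's the min_rows_per_group argument on write_dataset; here's a sketch with made-up values (and, as noted above, it may just shift the pressure onto the staged-rows throttle):

import pyarrow.dataset as ds

dataset = ds.dataset("data", partitioning="hive")

ds.write_dataset(
    dataset,
    "data2",
    partitioning=["year", "month", "rate_code"],
    min_rows_per_group=64 * 1024,     # batch up small writes before flushing
    max_rows_per_group=1024 * 1024,   # must be >= min_rows_per_group
)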

The proper fix would be to not release the throttle until the extra tasks that snuck in the last time it was paused have been launched. I am doing some arrow-cpp work this week and will try and get a look at this.

westonpace avatar Mar 18 '24 05:03 westonpace

Cheers for looking into it!

Hah, after spending time running various experiments and testing hypotheses based on guesswork mental models, reading the long explanation is certainly not boring - it's satisfying to know these things, so thank you!

thisisnic avatar Mar 18 '24 12:03 thisisnic

@thisisnic I believe I have come up with a fix (https://github.com/apache/arrow/pull/40722) if you want to try and test it out on your setup.

westonpace avatar Mar 22 '24 15:03 westonpace

Thanks @westonpace, will take a look over the weekend!

thisisnic avatar Mar 22 '24 21:03 thisisnic

Issue resolved by pull request 40722 https://github.com/apache/arrow/pull/40722

pitrou avatar Apr 04 '24 09:04 pitrou