
Potential memory issue when using COPY with PARTITIONED BY

Open hveiga opened this issue 1 year ago • 5 comments

Describe the bug

Memory does not get freed after executing multiple COPY ... TO ... PARTITIONED BY ... queries. I have not been able to identify what is causing this behavior.

To Reproduce

The behavior can be observed using datafusion-cli. I have been monitoring the memory usage through Activity Monitor.

  1. Download test parquet file (120MB): https://file.io/eKiHwu4waHVN
  2. Run datafusion-cli
  3. Create an external table:
CREATE EXTERNAL TABLE my_table
        (
            col1 VARCHAR NOT NULL,
            timestamp BIGINT NOT NULL,
            col2 VARCHAR NOT NULL,
            col3 VARCHAR NOT NULL,
            col4 VARCHAR NOT NULL,
            col5 VARCHAR NOT NULL,
            col6 VARCHAR NOT NULL,
            col7 VARCHAR NOT NULL,
            col8 VARCHAR NOT NULL,
            col9 VARCHAR NOT NULL,
            col10 VARCHAR NOT NULL,
            col11 VARCHAR NOT NULL,
            col12 DOUBLE
        )
        WITH ORDER (col1 ASC, timestamp ASC) STORED AS PARQUET LOCATION 'test_file.parquet';
  4. Execute COPY .. PARTITIONED BY query:
COPY (SELECT col1, timestamp, col10, col12 FROM my_table ORDER BY col1 ASC, timestamp ASC)
TO './output' STORED AS PARQUET PARTITIONED BY (col1) OPTIONS (compression 'uncompressed');
  5. Monitor memory usage.
  6. Repeat execution of COPY .. PARTITIONED BY query and continue monitoring memory usage.
  7. Observation: memory does not get released.

Expected behavior

My expectation is to be able to run the COPY command multiple times without memory usage increasing every time.

Additional context

There is more context of what I am trying to do in Discord: https://discord.com/channels/885562378132000778/1166447479609376850/1253419900043526236

I am also experiencing the same behavior when running my application in Kubernetes. K8s terminates my pod once it exceeds the pod memory limits:

[image: Screenshot 2024-06-20 at 8 46 20 PM]

hveiga avatar Jun 21 '24 03:06 hveiga

Does the issue happen if you do COPY without PARTITIONED BY?

The partitioning code spawns a potentially large number of tokio tasks (one for each partition), so if those tasks are not being cleaned up properly, it could lead to a memory leak.

devinjdangelo avatar Jun 21 '24 12:06 devinjdangelo

Yes, I can see the same behavior by running the following query multiple times:

COPY (SELECT col1, timestamp, col10, col12 FROM my_table ORDER BY col1 ASC, timestamp ASC)
TO './output/output.parquet' STORED AS PARQUET OPTIONS (compression 'uncompressed');

However, the memory increase is smaller and it takes many more queries to make it noticeable (10+).

hveiga avatar Jun 21 '24 20:06 hveiga

@devinjdangelo do you have any suggestions on how to investigate this issue further? I am happy to take the lead on it. I was chatting with @alamb yesterday and he suggested using heaptrack, but I was wondering if you would suggest other options. Thanks!

hveiga avatar Jun 25 '24 16:06 hveiga

A self contained example script may be helpful. I have used peak_alloc crate in the past as a very simple way to measure how much memory is being consumed.

Heaptrack will provide more detail and will likely help narrow down the source of the issue faster, but a self contained script I think is useful for demonstration and sanity checking.

devinjdangelo avatar Jun 25 '24 22:06 devinjdangelo
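
As a rough illustration of the "self contained script" idea, here is a minimal sketch of a counting global allocator built only on the Rust standard library. It is not the peak_alloc crate's API; it only shows the same general technique of wrapping the system allocator. The names `TrackingAlloc`, `current_bytes`, `peak_bytes`, and `run_workload_once` are hypothetical, and `run_workload_once` is a stand-in for whatever would actually execute the COPY query.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Wraps the system allocator and counts live and peak heap bytes.
struct TrackingAlloc;

static CURRENT: AtomicUsize = AtomicUsize::new(0);
static PEAK: AtomicUsize = AtomicUsize::new(0);

unsafe impl GlobalAlloc for TrackingAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            // Record the new live total and raise the peak if needed.
            let now = CURRENT.fetch_add(layout.size(), Ordering::Relaxed) + layout.size();
            PEAK.fetch_max(now, Ordering::Relaxed);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) };
        CURRENT.fetch_sub(layout.size(), Ordering::Relaxed);
    }
}

#[global_allocator]
static GLOBAL: TrackingAlloc = TrackingAlloc;

/// Bytes currently allocated and not yet freed.
fn current_bytes() -> usize {
    CURRENT.load(Ordering::Relaxed)
}

/// Highest value `current_bytes` has reached so far.
fn peak_bytes() -> usize {
    PEAK.load(Ordering::Relaxed)
}

/// Hypothetical stand-in for one run of the workload under test.
/// In a real reproduction this would execute the COPY query.
fn run_workload_once() {
    let buf: Vec<u8> = vec![0u8; 8 * 1024 * 1024];
    std::hint::black_box(&buf);
}

fn main() {
    for i in 0..5 {
        let before = current_bytes();
        run_workload_once();
        let after = current_bytes();
        println!(
            "iteration {i}: live before={before} after={after} peak={}",
            peak_bytes()
        );
    }
}
```

If the live byte count after each iteration keeps climbing instead of returning close to its starting value, that points at retained heap memory rather than allocator or OS behavior. This only counts bytes requested through the Rust allocator, so it will not match the RSS figure shown by Activity Monitor.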

BTW something we have seen in InfluxDB, especially for very compressible data, was that the arrow writer was consuming substantial memory.

Something that might be worth testing would be to set the parquet writer's data_page_row_limit option to something like 20,000

By default it is unlimited. We just changed the default upstream in arrow-rs https://github.com/apache/arrow-rs/pull/5957 but that is not yet released

alamb avatar Jun 26 '24 00:06 alamb

Possibly related to https://github.com/apache/datafusion/issues/11344 where the memory tracking for the parquet writing could be improved

alamb avatar Jul 09 '24 16:07 alamb

Thanks for the suggestion @alamb. I tested with a different value for data_page_row_limit but got the same result.

In general I have been having a hard time trying to debug this since there is no heaptrack for Mac and the build process for heaptrack_gui is also broken at the moment as I cannot install one of the required dependencies. I did try valgrind and the RustRover profiler, but could not find anything relevant.

I'll wait for https://github.com/apache/datafusion/issues/11344 to land and test again.

hveiga avatar Jul 09 '24 16:07 hveiga

In general I have been having a hard time trying to debug this since there is no heaptrack for Mac and the build process for heaptrack_gui is also broken at the moment as I cannot install one of the required dependencies. I did try valgrind and the RustRover profiler, but could not find anything relevant.

I have had good luck with the Instruments tool that comes with XCode / Mac. Specifically the "allocations" tool:

[image: Screenshot 2024-07-09 at 1 58 38 PM]

But I think @wiedld said she didn't have good luck with it so your mileage may vary

alamb avatar Jul 09 '24 17:07 alamb

But I think @wiedld said she didn't have good luck with it so your mileage may vary

While using the xcode allocations tool, I was getting <10% of the allocations vs the peak measured with the time builtin. (Note: our process was an application-triggered datafusion query and not via the datafusion-cli.) As a result I ended up using heaptrack.

In general I have been having a hard time trying to debug this since there is no heaptrack for Mac and the build process for heaptrack_gui is also broken

I ran into the same problems. Here is the work-around I used (I'm sure there are others):

  1. ran heaptrack on a linux vm. Instead of using the recommended cargo-heaptrack, I was building & running a rust project using cargo-with. Below is a rough idea (dependencies may differ for you).

    sudo apt install git-all
    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
    source "$HOME/.cargo/env"
    sudo apt-get install gcc build-essential time heaptrack
    cargo install cargo-with
    cargo with 'heaptrack' -- run --profile=quick-release --no-default-features -- <process cmd>
    
    # results in <heaptrack_output_file>.zst
    
  2. use heaptrack_print to convert the output files into stack files for flamegraph

    heaptrack_print heaptrack_output_file.zst --flamegraph-cost-type peak -F my_stack_file.txt
    
    # results in output file, and summary stats
    total runtime: 27.98s.
    calls to allocation functions: 12432130 (444322/s)
    temporary memory allocations: 1214337 (43400/s)
    peak heap memory consumption: 1.26G
    peak RSS (including heaptrack overhead): 3.22G
    total memory leaked: 1.88M
    
  3. confirm the heaptrack peak memory (summary stats from above) is ~= time builtin

    /usr/bin/time -l -h -p <process>
    
  4. Move files to your macos, then build & analyze the flamegraphs.

    git clone git@github.com:brendangregg/FlameGraph.git
    cd FlameGraph
    
    # generate flamegraph files
    ./flamegraph.pl --title "heaptrack: peak memory" --colors mem --countname peak_bytes < my_stack_file.txt > my_flamegraph.svg
    open my_flamegraph.svg
    

If you run into any issues, or find any better alternatives, please let me know @hveiga.

wiedld avatar Jul 09 '24 22:07 wiedld

Ah, I forgot to mention a key point. When extracting data via heaptrack_print, I was looking at memory peaks and hence used --flamegraph-cost-type peak. You may want to check other options for investigating leaks.

wiedld avatar Jul 09 '24 22:07 wiedld

I finally have some time to continue investigating this issue. I have not been able to make heaptrack work (yet!) but I did try using dhat and I got an interesting lead:

[image: Screenshot 2024-07-16 at 7 22 55 PM]

I won't claim I am experienced with this tool but I was curious why it was highlighting dict_encoder.rs. When writing parquet from a COPY query the option DICTIONARY_ENABLED is enabled by default. I decided to try disabling it using DICTIONARY_ENABLED false.

After disabling it I see the memory increasing only marginally for every invocation (in the 100-200MB range), while with DICTIONARY_ENABLED true each invocation increases the memory usage by multiple GB (2-3GB) and it seems it never gets freed again.

I don't have a root cause of the issue yet but wanted to share this behavior in case somebody else might find this pattern familiar. This might also only be a red herring and not the actual issue.

I also found https://github.com/apache/arrow-rs/issues/5828 which might be related and/or relevant.

hveiga avatar Jul 17 '24 02:07 hveiga

I also found https://github.com/apache/arrow-rs/issues/5828 which might be related and/or relevant.

I would expect that the memory usage highlighted in https://github.com/apache/arrow-rs/issues/5828 would be directly reduced by setting the data_page_row_limit.

After disabling it I see the memory increasing only marginally for every invocation (in the 100-200MB range) while with DICTIONARY_ENABLED true each invocation increases the memory usage in multiple GBs (2-3GB) and it seems it never gets freed again.

I wonder if this could be related to DataFusion overriding the data_page_row_limit setting in https://github.com/apache/datafusion/issues/11367 (that @wiedld is working on)

I think you can set this option like

COPY (SELECT col1, timestamp, col10, col12 FROM my_table ORDER BY col1 ASC, timestamp ASC)
TO './output' STORED AS PARQUET PARTITIONED BY (col1) 
OPTIONS (
  compression 'uncompressed', 
  'format.parquet.data_pagesize_limit' 20000
);

alamb avatar Jul 17 '24 10:07 alamb

I wonder if this could be related to DataFusion overriding the data_page_row_limit setting in https://github.com/apache/datafusion/issues/11367 (that @wiedld is working on)

@alamb is mentioning the data_page_row_limit since in our own work we found that the dict_encoder used a lot more memory with the datafusion default data_page_row_limit=usize::MAX. It was only once we set data_page_row_limit=20k that we fixed the memory issue. As a result, we decided to change the arrow-rs/parquet default to 20k.

The current gotchas with using the defaults in COPY TO

So the arrow-rs/parquet writer now has a default data_page_row_limit=20k, so we should see that default when we use datafusion COPY TO, right?

Wrong. How the datafusion session's write configuration is treated in arrow-rs/parquet's ArrowWriter (i.e. whether or not it's used in the actual parquet writing) is a little confusing IMO:

  • In some cases, the datafusion defaults override the arrow-rs/parquet defaults.
    • e.g. datafusion default data_page_row_limit=usize::MAX, overwrites arrow-rs/parquet data_page_row_limit=20k.
    • short term fix: do as alamb suggests and configure the sql COPY TO for data_page_row_limit=20k.
  • In other cases, the datafusion defaults get ignored in arrow-rs/parquet.
    • specifically, these are in cases when the datafusion default is None.
    • e.g. datafusion default dictionary_enabled=None gets overridden, and the actual default behavior is to turn it on (as @hveiga noticed above).
    • short term fix: explicitly set dictionary_enabled=Some(false)

The PR description here gives an overview of how the default datafusion settings are treated in arrow-rs/parquet.

wiedld avatar Jul 17 '24 20:07 wiedld
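
The two cases described above can be sketched with a toy model. This is not DataFusion or arrow-rs code: `SessionOptions`, `WriterProps`, and `build_writer_props` are hypothetical names, and the default values are only the ones quoted in this thread. The sketch assumes the session option for the row limit is a plain value (so it is always forwarded) while the dictionary option is an `Option` (so `None` falls through to the writer's own default).

```rust
/// Toy stand-in for the writer library's effective properties (hypothetical).
#[derive(Debug, Clone, PartialEq)]
struct WriterProps {
    data_page_row_limit: usize,
    dictionary_enabled: bool,
}

impl Default for WriterProps {
    fn default() -> Self {
        // Writer-side defaults as quoted in the thread: 20k rows, dictionary on.
        Self {
            data_page_row_limit: 20_000,
            dictionary_enabled: true,
        }
    }
}

/// Toy stand-in for the session-level write options (hypothetical).
struct SessionOptions {
    /// Plain value: there is always something to forward to the writer.
    data_page_row_limit: usize,
    /// Optional value: `None` means "not set at the session level".
    dictionary_enabled: Option<bool>,
}

/// Combine session options with writer defaults.
fn build_writer_props(opts: &SessionOptions) -> WriterProps {
    let defaults = WriterProps::default();
    WriterProps {
        // Case 1: the session value always replaces the writer default.
        data_page_row_limit: opts.data_page_row_limit,
        // Case 2: `None` is ignored, so the writer default applies.
        dictionary_enabled: opts
            .dictionary_enabled
            .unwrap_or(defaults.dictionary_enabled),
    }
}

fn main() {
    // Session defaults as described in the thread.
    let session_defaults = SessionOptions {
        data_page_row_limit: usize::MAX,
        dictionary_enabled: None,
    };
    println!("defaults: {:?}", build_writer_props(&session_defaults));

    // The two short-term fixes applied explicitly.
    let tuned = SessionOptions {
        data_page_row_limit: 20_000,
        dictionary_enabled: Some(false),
    };
    println!("tuned:    {:?}", build_writer_props(&tuned));
}
```

In this model the session defaults yield an unlimited row count with dictionary encoding on, which is the combination reported as expensive earlier in the thread; setting both values explicitly is the only way to get anything else.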

Also spoiler alert is that @wiedld is in the process of harmonizing the default configuration for the parquet writer and the arrow settings as part of https://github.com/apache/datafusion/issues/11367

alamb avatar Jul 17 '24 20:07 alamb

I also found https://github.com/apache/arrow-rs/issues/5828 which might be related and/or relevant.

@hveiga is correct that this is one suspected place with extra memory usage (specifically in the dict_encoder) when processing many rows per page. But that is not the only possible impact from very-many-rows-per-page, and as such that's why we focused on changing the config setting data_page_row_limit=20k.

wiedld avatar Jul 17 '24 20:07 wiedld

Thanks for all the feedback, we are actively testing this and can report our findings later today.

Regarding data_page_row_limit=20k what is the right key to pass in the COPY command? I can see the Write Options docs say format.data_pagesize_limit but https://github.com/apache/datafusion/pull/11524 talks about data_page_row_limit. If this different naming is an issue, I am happy to PR a change to consolidate.

hveiga avatar Jul 17 '24 21:07 hveiga

data_page_row_limit is the right one (based on rows)

There is another similarly named one called data_pagesize_limit (I would have expected it to be called data_page_size_limit) that is a limit in bytes

Sorry this is so confusing

alamb avatar Jul 17 '24 21:07 alamb

I believe the right one is data_page_row_count_limit instead of data_page_row_limit. So many similar configs :)

hveiga avatar Jul 17 '24 21:07 hveiga
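
To make the difference between the row-based and byte-based limits concrete, here is a toy model of page buffering. It is a sketch under stated assumptions, not the parquet writer's actual logic: `simulate_pages` is a hypothetical function, a page is assumed to close as soon as either limit is reached, and the 1 MiB byte limit and 1 byte per encoded row are illustrative values chosen to mimic very compressible data.

```rust
/// Toy model: returns (number of pages, largest row count held in one page).
/// A page is closed as soon as either the row limit or the byte limit is hit.
fn simulate_pages(
    rows: usize,
    encoded_bytes_per_row: usize,
    row_count_limit: usize,
    size_limit_bytes: usize,
) -> (usize, usize) {
    let mut pages = 0;
    let mut max_rows_in_page = 0;
    let mut buffered_rows = 0;
    let mut buffered_bytes = 0;

    for _ in 0..rows {
        buffered_rows += 1;
        buffered_bytes += encoded_bytes_per_row;
        if buffered_rows >= row_count_limit || buffered_bytes >= size_limit_bytes {
            pages += 1;
            max_rows_in_page = max_rows_in_page.max(buffered_rows);
            buffered_rows = 0;
            buffered_bytes = 0;
        }
    }
    // Flush whatever is left as a final, partially filled page.
    if buffered_rows > 0 {
        pages += 1;
        max_rows_in_page = max_rows_in_page.max(buffered_rows);
    }
    (pages, max_rows_in_page)
}

fn main() {
    let rows = 1_000_000;
    let size_limit = 1024 * 1024; // illustrative byte limit

    // Row limit effectively disabled: only the byte limit can close a page.
    let unlimited = simulate_pages(rows, 1, usize::MAX, size_limit);
    // Row limit of 20k, as suggested in the thread.
    let limited = simulate_pages(rows, 1, 20_000, size_limit);

    println!("row limit = usize::MAX: {unlimited:?}");
    println!("row limit = 20_000:     {limited:?}");
}
```

With tiny encoded rows the byte limit is never reached in this model, so the unlimited case buffers every row in a single page, while the 20k row limit splits the same input into many small pages. That is one plausible reading of why the row-based setting matters for very compressible data.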