
Memory leak

Open timborden opened this issue 10 months ago • 13 comments

Operating System

OSX & Linux

Programming Languages

Python

CCXT Version

4.4.58

Description

We've got a handful of workers running in a DigitalOcean App that restart frequently after consuming all the memory.

Using Python's tracemalloc, I did some profiling (see the Code section below), and it looks like the deep_extend function in base/exchange.py (https://github.com/ccxt/ccxt/blob/go/v4.4.58/python/ccxt/base/exchange.py#L920-L931) is accumulating memory. Please note the increase in the ccxt total over time.
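
For reference, deep_extend is essentially a recursive dict merge that allocates new nested dicts on every call, which is the kind of allocation tracemalloc attributes to those lines. A paraphrased sketch for illustration only, not the exact ccxt source:

def deep_extend(*args):
    # Recursively merge any number of dicts; non-dict values overwrite.
    result = None
    for arg in args:
        if isinstance(arg, dict):
            if not isinstance(result, dict):
                result = {}
            for key in arg:
                result[key] = deep_extend(result.get(key), arg[key])
        else:
            result = arg
    return result

merged = deep_extend({'a': {'x': 1}}, {'a': {'y': 2}, 'b': 3})
print(merged)  # {'a': {'x': 1, 'y': 2}, 'b': 3}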

Code

Profiling code:

import time
import tracemalloc
import logging

from worker import run_job

logger = logging.getLogger(__name__)

tracemalloc.start()  # tracing must be started before take_snapshot() can be used
while True:
    before_snapshot = tracemalloc.take_snapshot()

    run_job()

    after_snapshot = tracemalloc.take_snapshot()
    stats = after_snapshot.compare_to(before_snapshot, 'lineno')

    total_size = 0
    ccxt_stats = []
    for stat in stats:
        if "ccxt" in stat.traceback[0].filename:
            total_size += stat.size
            ccxt_stats.append(stat)

    if total_size:
        logger.info(f"ccxt total - {total_size / 1024:.1f} KB")
        for stat in ccxt_stats[:5]:
            filename = str(stat.traceback[0].filename)
            print(f" -> {filename}:{stat.traceback[0].lineno} - {stat.size / 1024:.1f} KB (+{stat.size_diff / 1024:.1f} KB)")

    time.sleep(60)

Profiling results:

2025-03-11 13:12:21,050 - ccxt total = 1583.3 KB
 -> ccxt/base/exchange.py:928 = 805.5 KB (+805.5 KB)
 -> ccxt/base/exchange.py:926 = 221.0 KB (+221.0 KB)
 -> ccxt/base/exchange.py:426 = 177.5 KB (+177.5 KB)

2025-03-11 13:13:15,615 - ccxt total = 957.7 KB
 -> ccxt/base/exchange.py:928 = 537.9 KB (+537.9 KB)
 -> ccxt/base/exchange.py:926 = 171.9 KB (+171.9 KB)
 -> ccxt/base/exchange.py:426 = 63.4 KB (+63.4 KB)

2025-03-11 13:13:27,594 - ccxt total = 1666.4 KB
 -> ccxt/base/exchange.py:928 = 951.9 KB (+414.0 KB)
 -> ccxt/base/exchange.py:926 = 312.8 KB (+140.9 KB)
 -> ccxt/base/exchange.py:426 = 114.1 KB (+50.7 KB)

2025-03-11 13:14:22,109 - ccxt total = 1162.4 KB
 -> ccxt/base/exchange.py:928 = 583.8 KB (+583.8 KB)
 -> ccxt/base/exchange.py:926 = 164.0 KB (+164.0 KB)
 -> ccxt/base/exchange.py:426 = 126.8 KB (+126.8 KB)

2025-03-11 13:15:21,291 - ccxt total = 4018.0 KB
 -> ccxt/base/exchange.py:928 = 2253.4 KB (+1669.6 KB)
 -> ccxt/base/exchange.py:926 = 676.0 KB (+512.0 KB)
 -> ccxt/base/exchange.py:426 = 291.7 KB (+164.9 KB)

2025-03-11 13:16:22,970 - ccxt total = 4396.1 KB
 -> ccxt/base/exchange.py:928 = 2453.2 KB (+199.8 KB)
 -> ccxt/base/exchange.py:926 = 747.8 KB (+71.8 KB)

2025-03-11 13:17:15,891 - ccxt total = 4397.4 KB
 -> ccxt/base/exchange.py:928 = 2450.4 KB (+-2.3 KB)
 
2025-03-11 13:18:17,199 - ccxt total = 4799.3 KB
 -> ccxt/base/exchange.py:928 = 2680.6 KB (+230.2 KB)
 -> ccxt/base/exchange.py:926 = 804.8 KB (+57.0 KB)
 -> ccxt/base/exchange.py:426 = 355.1 KB (+50.7 KB)

2025-03-11 13:19:24,288 - ccxt total = 5510.4 KB
 -> ccxt/base/exchange.py:928 = 3101.8 KB (+421.3 KB)
 -> ccxt/base/exchange.py:926 = 930.4 KB (+125.6 KB)
 -> ccxt/base/exchange.py:426 = 418.5 KB (+63.4 KB)

2025-03-11 13:20:24,173 - ccxt total = 5524.3 KB
 -> ccxt/base/exchange.py:928 = 3111.3 KB (+9.5 KB)

2025-03-11 13:21:21,759 - ccxt total = 5510.7 KB
 -> ccxt/base/exchange.py:928 = 3104.7 KB (+-6.2 KB)

2025-03-11 13:22:19,133 - ccxt total = 5509.1 KB
 -> ccxt/base/exchange.py:928 = 3102.2 KB (+-1.1 KB)

timborden avatar Mar 11 '25 13:03 timborden

Hello @timborden,

Thanks a lot for your detailed report; this is indeed worrying. We will analyze it as soon as possible and get back to you.

carlosmiei avatar Mar 11 '25 14:03 carlosmiei

Thanks for reporting @timborden ,

Could you by any chance provide a small snippet or some details of the job you are running, so we can try to reproduce it locally?

pcriadoperez avatar Mar 12 '25 03:03 pcriadoperez

Sure @pcriadoperez

We've got a long-running process that monitors a set of exchanges and pairs. Some of the pairs aren't listed on the exchanges yet, so we've overridden the Exchange class's load_markets method to periodically retry fetch_currencies, fetch_markets and set_markets, so that when a pair is listed on an exchange we pick it up (see the sketch below for the general shape).
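
A simplified sketch of that override (illustrative only; the real code covers several exchanges, and binance is just a placeholder here):

import ccxt

class RefreshingBinance(ccxt.binance):
    # Hypothetical override: refresh market data on every load_markets call
    # so that newly listed pairs are eventually picked up.
    def load_markets(self, reload=False, params={}):
        currencies = self.fetch_currencies()
        markets = self.fetch_markets()
        return self.set_markets(markets, currencies)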

Looking at set_markets, I can see it uses deep_extend (the problem area tracemalloc identified), so I'm guessing that is likely why we're seeing the memory leak.

Apologies for not including these details in the original description; it only dawned on me overnight.

Hope that helps.

timborden avatar Mar 12 '25 08:03 timborden

Hi @timborden,

I looked at the deep_extend function and couldn't see how it would cause a memory leak. It does increase memory for a moment, because it creates a new object instead of overwriting the existing one, but the previous object should be garbage collected afterwards.

Just in case, I wrote a snippet that calls fetchMarkets and then set_markets and ran it about 500 times. You are right that the memory did increase over the first few runs, but then it seems to stabilize, and I think there is just some variation depending on the garbage collector.

Do you know if there is anything else I could do to reproduce it?

Here is a snippet of my logs

Running job 156/5
Memory after run 156: 415.4 MB (delta: 340.9 MB)

Running job 157/5
Memory after run 157: 415.6 MB (delta: 341.1 MB)

Running job 158/5
Memory after run 158: 416.2 MB (delta: 341.7 MB)

... 

Running job 293/5
Memory after run 293: 419.4 MB (delta: 344.9 MB)

Running job 294/5
Memory after run 294: 417.8 MB (delta: 343.3 MB)

...


Running job 355/5
Memory after run 355: 419.9 MB (delta: 345.4 MB)

Running job 356/5
Memory after run 356: 419.2 MB (delta: 344.7 MB)

...
Running job 437/5
Memory after run 437: 413.8 MB (delta: 339.3 MB)

Running job 438/5
Memory after run 438: 413.3 MB (delta: 338.8 MB)

Running job 439/5
Memory after run 439: 393.6 MB (delta: 319.1 MB)

Running job 440/5
Memory after run 440: 399.8 MB (delta: 325.2 MB)

Running job 441/5
Memory after run 441: 398.2 MB (delta: 323.7 MB)

Running job 442/5
Memory after run 442: 398.2 MB (delta: 323.7 MB)

....


Running job 493/5
Memory after run 493: 414.8 MB (delta: 340.3 MB)

Running job 494/5
Memory after run 494: 413.8 MB (delta: 339.3 MB)

pcriadoperez avatar Mar 13 '25 10:03 pcriadoperez

Thanks @pcriadoperez for taking a look and sharing your results.

Curious: how many exchanges were you instantiating? 400 MB of RAM seems like a lot.

Also, is 156 runs roughly where the memory consumption plateaus? What was the memory consumption over the first few iterations?

timborden avatar Mar 13 '25 11:03 timborden

Just following up @pcriadoperez: would you mind sharing how you produced the output "Memory after run 156: 415.4 MB (delta: 340.9 MB)"? I'd like to see if I can replicate it on our end and compare results.

timborden avatar Mar 17 '25 10:03 timborden

@timborden sorry for the delay. Here is the script I was running; apologies for it being a little messy, as it includes leftovers from different attempts:

import time
import tracemalloc
import logging
import sys
import os
import psutil
import gc
sys.path.append(os.path.dirname('./python'))
import ccxt

# Configure logging to print to console
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',
    stream=sys.stdout
)
logger = logging.getLogger(__name__)
exchange = ccxt.binance({
        'enableRateLimit': False  # Disable rate limiting for testing
})
def get_process_memory():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # Convert to MB

def run_job():
    # Reuses the single module-level exchange instance; the tracemalloc
    # lines below are leftovers from earlier attempts and stay commented out.

    
    gc.collect()
    # snapshot1 = tracemalloc.take_snapshot()
    markets = exchange.fetch_markets()
    gc.collect()
    exchange.set_markets(markets)
    # snapshot2 = tracemalloc.take_snapshot()
    
    # stats = snapshot2.compare_to(snapshot1, 'lineno')
    
    total_size = 0
    # ccxt_stats = []
    # for stat in stats:
    #     if "ccxt" in stat.traceback[0].filename:
    #         total_size += stat.size
    #         ccxt_stats.append(stat)

    if total_size:
        # logger.info(f"Memory change after set_markets - {total_size / 1024:.1f} KB")
        for stat in ccxt_stats[:5]:
            filename = str(stat.traceback[0].filename)
            # print(f" -> {filename}:{stat.traceback[0].lineno} - {stat.size / 1024:.1f} KB (+{stat.size_diff / 1024:.1f} KB)")

# tracemalloc.start()
gc.collect()  # Initial garbage collection
initial_memory = get_process_memory()
logger.info(f"Initial process memory: {initial_memory:.1f} MB")

# Run multiple times to check for memory leaks
for i in range(1000):
    print(f"\nRunning job {i+1}/5")
    run_job()
    gc.collect()  # Force garbage collection after each run
    current_memory = get_process_memory()
    logger.info(f"Memory after run {i+1}: {current_memory:.1f} MB (delta: {current_memory - initial_memory:.1f} MB)")

final_memory = get_process_memory()
logger.info(f"\nFinal process memory: {final_memory:.1f} MB (total delta: {final_memory - initial_memory:.1f} MB)")

pcriadoperez avatar Mar 24 '25 02:03 pcriadoperez

@pcriadoperez, here's an updated script for isolating the issue. It looks like the memory leak is connected to exchange instances creating new connection pool managers in requests.Session.

import gc
import logging
import os
import sys

import ccxt
import psutil

from pympler import asizeof

# Configure logging to print to console
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',
    stream=sys.stdout
)
logger = logging.getLogger(__name__)

previous_sessions = []
previous_poolmanagers = []

def get_process_memory():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # Convert to MB

def run_job(i, reuse_session=False):
    exchange = ccxt.binance({'enableRateLimit': False})
    markets = exchange.fetch_markets()
    exchange.set_markets(markets)

    if reuse_session:
        session = previous_sessions[-1] if previous_sessions else None
        if session:
            exchange.session = session
        else:
            logger.info(f"\nCreating new session for run {i+1}:")
    else:
        logger.info(f"\nCreating new session for run {i+1}:")

    logger.info(f"Inspecting session after run {i+1}:")

    try:
        session = getattr(exchange, 'session', None)
        if session:
            logger.info(f"exchange.session: type={type(session).__name__}, id={id(session)}, repr={repr(session)[:200]}")
            session_size = asizeof.asizeof(session)
            logger.info(f"exchange.session memory size: {session_size / 1024:.1f} KB")
    except Exception as e:
        logger.warning(f"Could not inspect session: {e}")

    if hasattr(exchange, "session"):
        try:
            adapter = exchange.session.adapters.get('https://')
            if adapter:
                previous_poolmanagers.append(adapter.poolmanager)
            previous_sessions.append(exchange.session)
            total_session_mem = asizeof.asizeof(previous_sessions)
            total_pool_mgr_mem = asizeof.asizeof(previous_poolmanagers)
            logger.info(f"Cumulative previous sessions memory: {total_session_mem / 1024:.1f} KB")
            logger.info(f"Cumulative previous pool managers memory: {total_pool_mgr_mem / 1024:.1f} KB")
            exchange.session.adapters.clear()  # Clear adapters to drop pools
            exchange.session.close()
        except Exception as e:
            logger.warning(f"Failed to fully release session: {e}")

    del exchange
    gc.collect()

gc.collect()  # Initial garbage collection
initial_memory = get_process_memory()
logger.info(f"Initial process memory: {initial_memory:.1f} MB")

# Phase 1: Run with new session each time to reproduce memory leak
logger.info("\n--- Phase 1: New session each run ---")
for i in range(10):
    run_job(i)
    current_memory = get_process_memory()
    logger.info(f"Memory after run {i+1}: {current_memory:.1f} MB (delta: {current_memory - initial_memory:.1f} MB)")

# Phase 2: Run with reused session to prove leak disappears
logger.info("\n--- Phase 2: Reused session ---")
for i in range(10):
    run_job(i, reuse_session=True)
    current_memory = get_process_memory()
    logger.info(f"Memory after run {i+1}: {current_memory:.1f} MB (delta: {current_memory - initial_memory:.1f} MB)")

final_memory = get_process_memory()
logger.info(f"\nFinal process memory: {final_memory:.1f} MB (total delta: {final_memory - initial_memory:.1f} MB)")

...and the results:

Initial process memory: 77.1 MB

--- Phase 1: New session each run ---

Creating new session for run 1:
Inspecting session after run 1:
exchange.session: type=Session, id=4576452512, repr=<requests.sessions.Session object at 0x110c71fa0>
exchange.session memory size: 58.9 KB
Cumulative previous sessions memory: 59.0 KB
Cumulative previous pool managers memory: 49.5 KB
Memory after run 1: 156.3 MB (delta: 79.2 MB)

Creating new session for run 2:
Inspecting session after run 2:
exchange.session: type=Session, id=4398982080, repr=<requests.sessions.Session object at 0x1063323c0>
exchange.session memory size: 57.4 KB
Cumulative previous sessions memory: 59.9 KB
Cumulative previous pool managers memory: 85.8 KB
Memory after run 2: 184.6 MB (delta: 107.6 MB)

Creating new session for run 3:
Inspecting session after run 3:
exchange.session: type=Session, id=4397763520, repr=<requests.sessions.Session object at 0x106208bc0>
exchange.session memory size: 56.5 KB
Cumulative previous sessions memory: 61.4 KB
Cumulative previous pool managers memory: 121.3 KB
Memory after run 3: 196.5 MB (delta: 119.5 MB)

Creating new session for run 4:
Inspecting session after run 4:
exchange.session: type=Session, id=4862378736, repr=<requests.sessions.Session object at 0x121d202f0>
exchange.session memory size: 56.3 KB
Cumulative previous sessions memory: 63.5 KB
Cumulative previous pool managers memory: 157.5 KB
Memory after run 4: 206.6 MB (delta: 129.6 MB)

Creating new session for run 5:
Inspecting session after run 5:
exchange.session: type=Session, id=4874398288, repr=<requests.sessions.Session object at 0x122896a50>
exchange.session memory size: 56.1 KB
Cumulative previous sessions memory: 65.6 KB
Cumulative previous pool managers memory: 193.7 KB
Memory after run 5: 217.6 MB (delta: 140.6 MB)

Creating new session for run 6:
Inspecting session after run 6:
exchange.session: type=Session, id=4564587568, repr=<requests.sessions.Session object at 0x110121430>
exchange.session memory size: 55.8 KB
Cumulative previous sessions memory: 67.8 KB
Cumulative previous pool managers memory: 229.7 KB
Memory after run 6: 229.6 MB (delta: 152.5 MB)

Creating new session for run 7:
Inspecting session after run 7:
exchange.session: type=Session, id=4573464048, repr=<requests.sessions.Session object at 0x1109985f0>
exchange.session memory size: 55.7 KB
Cumulative previous sessions memory: 69.9 KB
Cumulative previous pool managers memory: 265.5 KB
Memory after run 7: 236.6 MB (delta: 159.5 MB)

Creating new session for run 8:
Inspecting session after run 8:
exchange.session: type=Session, id=4581715520, repr=<requests.sessions.Session object at 0x111176e40>
exchange.session memory size: 55.5 KB
Cumulative previous sessions memory: 72.1 KB
Cumulative previous pool managers memory: 301.4 KB
Memory after run 8: 247.4 MB (delta: 170.3 MB)

Creating new session for run 9:
Inspecting session after run 9:
exchange.session: type=Session, id=4878434672, repr=<requests.sessions.Session object at 0x122c70170>
exchange.session memory size: 55.4 KB
Cumulative previous sessions memory: 74.3 KB
Cumulative previous pool managers memory: 337.6 KB
Memory after run 9: 252.6 MB (delta: 175.5 MB)

Creating new session for run 10:
Inspecting session after run 10:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 55.4 KB
Cumulative previous sessions memory: 76.5 KB
Cumulative previous pool managers memory: 373.6 KB
Memory after run 10: 258.3 MB (delta: 181.2 MB)

--- Phase 2: Reused session ---

Inspecting session after run 1:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 4.7 KB
Cumulative previous sessions memory: 25.8 KB
Cumulative previous pool managers memory: 373.3 KB
Memory after run 1: 257.3 MB (delta: 180.3 MB)

Inspecting session after run 2:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 4.7 KB
Cumulative previous sessions memory: 25.7 KB
Cumulative previous pool managers memory: 373.0 KB
Memory after run 2: 257.0 MB (delta: 179.9 MB)

Inspecting session after run 3:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 4.7 KB
Cumulative previous sessions memory: 25.6 KB
Cumulative previous pool managers memory: 373.0 KB
Memory after run 3: 259.4 MB (delta: 182.3 MB)

Inspecting session after run 4:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 4.7 KB
Cumulative previous sessions memory: 25.5 KB
Cumulative previous pool managers memory: 373.0 KB
Memory after run 4: 261.1 MB (delta: 184.0 MB)

Inspecting session after run 5:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 4.7 KB
Cumulative previous sessions memory: 25.4 KB
Cumulative previous pool managers memory: 373.0 KB
Memory after run 5: 261.5 MB (delta: 184.5 MB)

Inspecting session after run 6:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 4.7 KB
Cumulative previous sessions memory: 25.4 KB
Cumulative previous pool managers memory: 373.0 KB
Memory after run 6: 261.6 MB (delta: 184.5 MB)

Inspecting session after run 7:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 4.6 KB
Cumulative previous sessions memory: 25.4 KB
Cumulative previous pool managers memory: 373.0 KB
Memory after run 7: 261.5 MB (delta: 184.4 MB)

Inspecting session after run 8:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 4.6 KB
Cumulative previous sessions memory: 25.4 KB
Cumulative previous pool managers memory: 373.0 KB
Memory after run 8: 262.9 MB (delta: 185.9 MB)

Inspecting session after run 9:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 4.6 KB
Cumulative previous sessions memory: 25.4 KB
Cumulative previous pool managers memory: 373.0 KB
Memory after run 9: 262.2 MB (delta: 185.2 MB)

Inspecting session after run 10:
exchange.session: type=Session, id=4585455936, repr=<requests.sessions.Session object at 0x111508140>
exchange.session memory size: 4.6 KB
Cumulative previous sessions memory: 25.4 KB
Cumulative previous pool managers memory: 373.0 KB
Memory after run 10: 262.8 MB (delta: 185.7 MB)

Final process memory: 262.8 MB (total delta: 185.7 MB)

Hope that helps.

timborden avatar Mar 24 '25 10:03 timborden

Just following up @pcriadoperez, was the updated script helpful to replicate the issue on your end?

timborden avatar Apr 03 '25 11:04 timborden

Hi @timborden, sorry, I kept trying but still no luck.

I believe in this case you are seeing an increase in the allocated memory for the sessions because you are storing references to them, so the garbage collector cannot delete them.

I commented out the references to the sessions, and over 50-100 jobs I still see the memory go back to its initial level.
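
(As an illustration of that point, here is a minimal, self-contained sketch of how holding objects in a module-level list keeps them alive; FakeSession is just a stand-in, not requests.Session:)

import gc
import weakref

class FakeSession:
    """Stand-in for requests.Session, only to illustrate reference holding."""
    pass

kept = []                   # analogous to previous_sessions in the script above
s = FakeSession()
probe = weakref.ref(s)

kept.append(s)              # the list now holds a strong reference
del s
gc.collect()
print(probe() is not None)  # True: still alive because 'kept' references it

kept.clear()                # drop the reference, as when it is commented out
gc.collect()
print(probe() is None)      # True: the object can now be collected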

I was also curious because the number did seem high, but most of the memory is used for storing the exchange's market data:

before fetch_markets memory: 90.7 MB
after fetch_markets memory: 297.9 MB
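
(A quick way to confirm how much of that is the parsed market data, reusing pympler as in the earlier script; this is just a sketch using the usual attribute names on ccxt's base Exchange:)

from pympler import asizeof
import ccxt

exchange = ccxt.binance()
exchange.load_markets()

for name in ('markets', 'markets_by_id', 'currencies'):
    obj = getattr(exchange, name, None)
    if obj is not None:
        # Deep size of the parsed structures held on the instance.
        print(f"{name}: {asizeof.asizeof(obj) / 1024 / 1024:.1f} MB")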

Also, I checked both the async and sync versions of exchange.py, and they do seem to close the session before the exchange is deleted:

    async def close(self):
        await self.ws_close()
        if self.session is not None:
            if self.own_session:
                await self.session.close()
            self.session = None
        await self.close_connector()
        await self.close_proxy_sessions()
        await self.sleep(self.timeout_on_exit)
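
(For long-running async code, the usual pattern is to close the exchange explicitly so the aiohttp session and connector are released; a minimal example:)

import asyncio
import ccxt.async_support as ccxt

async def main():
    exchange = ccxt.binance()
    try:
        await exchange.load_markets()
    finally:
        # Releases the aiohttp session/connector owned by the instance.
        await exchange.close()

asyncio.run(main())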

pcriadoperez avatar Apr 08 '25 04:04 pcriadoperez

I am also getting an issue like this with 750M calls to deep_extend.

[image attachment]

pattty847 avatar Jun 05 '25 02:06 pattty847

@pattty847 Are you also seeing a memory leak? Could you elaborate a bit more, please?

carlosmiei avatar Jun 05 '25 08:06 carlosmiei

I would suggest being very specific here (as in: what exact Python version was used, what the exact versions of the dependencies are, essentially a pip freeze on a hopefully otherwise clean environment). Neither of those details seems to have been shared as part of this issue, which I think may have hindered proper reproduction; I hope it has been shared in other channels, because otherwise a reproduction attempt can easily fail (what leaks on 3.11 may work fine on 3.10 or 3.12).

I did experience a similar leak, though with async code only, and only on 3.12 in combination with newer aiohttp versions (and only on binance). I'm still not sure about the actual cause, but for me the fix was to not return the markets data through loop.run_until_complete(), and instead to read the .markets data from the exchange class afterwards (roughly the pattern sketched below). Whether this is a memory leak in Python itself (which is what I'd suspect, but then why does it not happen with older aiohttp versions?), in aiohttp, or in ccxt is pretty difficult to say for sure, at least for me.
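
(My own sketch of that workaround, with details guessed; the exchange, the loop handling and the reload flag are assumptions, not the exact code from that project:)

import asyncio
import ccxt.async_support as ccxt

def refresh_markets(exchange, loop):
    # Load the markets inside the loop, but do not carry the large result
    # dict out of run_until_complete(); read it off the instance instead.
    loop.run_until_complete(exchange.load_markets(reload=True))
    return exchange.markets

loop = asyncio.new_event_loop()
exchange = ccxt.binance()
try:
    markets = refresh_markets(exchange, loop)
    print(len(markets))
finally:
    loop.run_until_complete(exchange.close())
    loop.close()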

xmatthias avatar Jun 05 '25 17:06 xmatthias

@xmatthias When you need to reload the markets, what do you do if the .market() method is unavailable or not working?

yyywwt avatar Jul 12 '25 13:07 yyywwt

Well, you need to handle errors. That didn't leak for me, at least not in an obvious way.

xmatthias avatar Jul 12 '25 15:07 xmatthias