numcodecs.blosc mutex leaked semaphore warning
Minimal, reproducible code sample, a copy-pastable example if possible
import zarr
Problem description
When using multiprocessing in a Cython context, I get a leaked semaphore warning if zarr has been imported.
I suspect that the global mutex variable (the lock) in numcodecs.blosc is being garbage collected before the multiprocessing finalizer is called, so the weakref is dropped and the lock is never unregistered. In a multiprocessing environment this results in a leaked semaphore warning. I don't think there's a functional issue, but I would like the warning to go away.
https://github.com/zarr-developers/numcodecs/blob/master/numcodecs/blosc.pyx#L78
Version and installation information
Please provide the following:
- numcodecs 0.6.4
- Python 3.8.0
- Linux, macOS
- conda
Thanks @rc-conway. Do you have any suggestions for how to avoid this?
Could we move it to init? Would that help?
@jakirkham That would prevent the issues caused by having zarr on the import path.
Just in case: maybe blosc.destroy should also set mutex = None to clean it up?
For reference, we call init() here which will be called during zarr (numcodecs) import. We also register a call to destroy at exit (here).
I'm guessing there is also something weird going on when Cython cleans up this object (or doesn't...).
Let's try moving it to init 🙂
Added PR ( https://github.com/zarr-developers/numcodecs/pull/234 ) to move initialization of the mutex to init. Also mutex is overwritten in destroy, which should trigger reference counting to cleanup the Lock (there doesn't appear to be a method to cleanup the Lock directly). Please take a look and give it a try 🙂
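The idea described in the PR can be sketched in plain Python roughly as follows. This is only an illustration of the pattern, not the actual numcodecs code: the `lock_factory` parameter and the `is_initialized` helper are hypothetical and exist so the sketch can be tried with a `threading.Lock`, which does not allocate a named semaphore.

```python
import multiprocessing
import threading

# Module-level lock; created in init() rather than at import time.
mutex = None


def init(lock_factory=multiprocessing.Lock):
    """Create the global lock (hypothetical stand-in for blosc.init)."""
    global mutex
    if mutex is None:
        mutex = lock_factory()


def destroy():
    """Drop the global lock so reference counting can clean it up."""
    global mutex
    mutex = None


def is_initialized():
    """Report whether the global lock currently exists (illustration only)."""
    return mutex is not None
```

With this layout, importing the module alone does not create a lock; one only exists between a call to `init()` and the matching `destroy()`.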
Since PR #234 was closed, I had another look at this. The following shows a minimal reproduction of the problem, together with potential fixes (which come with caveats, discussed below):
import multiprocessing as mp
from contextlib import nullcontext

if __name__ == '__main__':
    # This must happen before importing blosc,
    # and must be guarded by the if.
    mp.set_start_method("spawn")

from numcodecs import blosc

### With the following line, leaked semaphores are no longer reported:
# blosc.mutex = mp.get_context("fork").Lock()
### Or this, removing the mutex completely, only using
### the _get_use_threads() check
# from contextlib import nullcontext
# blosc.mutex = nullcontext()

def f():
    print("inner")

if __name__ == '__main__':
    print("outer")
    p = mp.Process(target=f)
    p.start()
    p.join()
This outputs "There appear to be 1 leaked semaphores to clean up at shutdown". Uncommenting either of the two fixes removes this warning. Using the first solution with mp.get_context("fork").Lock() has two caveats:
- When using fork, leaked resources are simply not shown, so they might still leak; only the warning is gone. From the Python docs:
  "On Unix using the spawn or forkserver start methods will also start a resource tracker process which tracks the unlinked named system resources (such as named semaphores or SharedMemory objects) created by processes of the program. When all processes have exited the resource tracker unlinks any remaining tracked object. Usually there should be none, but if a process was killed by a signal there may be some “leaked” resources. (Neither leaked semaphores nor shared memory segments will be automatically unlinked until the next reboot. This is problematic for both objects because the system allows only a limited number of named semaphores, and shared memory segments occupy some space in the main memory.)"
- Using a lock from a fork context with spawned processes does not work. Also from the Python docs:
  "Alternatively, you can use get_context() to obtain a context object. Context objects have the same API as the multiprocessing module, and allow one to use multiple start methods in the same program. … Note that objects related to one context may not be compatible with processes for a different context. In particular, locks created using the fork context cannot be passed to processes started using the spawn or forkserver start methods."
However, this might not be a problem, since the blosc calls that use the global context are guarded by _get_use_threads, which ensures that the global context is only accessed from within a single-threaded, single-process program (from this code comment), and not from any subprocesses or threads. Those only use the non-threaded ctx version of the calls, so the fork-only lock should never actually be used in spawned subprocesses.
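To make the notion of separate contexts more concrete, here is a small sketch that uses only the standard library. It lists the start methods available on the current platform and the context class behind each one; a lock is always created from one specific context. The helper name `context_types` is made up for this illustration.

```python
import multiprocessing as mp


def context_types():
    """Map each start method available on this platform to its context class name."""
    return {
        method: type(mp.get_context(method)).__name__
        for method in mp.get_all_start_methods()
    }


if __name__ == "__main__":
    # "fork" and "forkserver" are not available on Windows; "spawn" always is.
    print(context_types())
```

Calling `get_context()` this way does not change the global start method, which is why it can be combined with a different global setting.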
Because of the latter point, I'm wondering whether the mutex is actually needed at all. It would be great if someone with more knowledge of the blosc internals could comment on that. If it is not needed, simply removing the mutex would be fine. This can be simulated by setting it to contextlib.nullcontext(), as shown in the second commented fix, which then relies only on the rest of the _get_use_threads logic to guard access to the global context.
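The reason nullcontext() can stand in for the lock is that both are context managers, so code written as `with mutex:` works with either. A minimal sketch, independent of blosc (`run_guarded` is a hypothetical helper, and `threading.Lock` is used here only as an example of a real lock):

```python
import threading
from contextlib import nullcontext


def run_guarded(mutex, func, *args):
    """Call func(*args) inside `with mutex:`, whatever kind of context manager it is."""
    with mutex:
        return func(*args)


# A real lock and nullcontext() are interchangeable at the call site;
# nullcontext() simply performs no locking at all.
with_lock = run_guarded(threading.Lock(), len, b"abc")
without_lock = run_guarded(nullcontext(), len, b"abc")
```

Whether dropping the locking is actually safe depends on the blosc internals, which this sketch does not address.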
Here is some more code I used for further testing:
import os
import multiprocessing as mp

if __name__ == '__main__':
    # This must happen before importing blosc,
    # and must be guarded by the if.
    mp.set_start_method("spawn")

from numcodecs import blosc

### With the following line, leaked semaphores are no longer reported:
# blosc.mutex = mp.get_context(method="fork").Lock()
### Or this, removing the mutex completely, only using
### the _get_use_threads() check
from contextlib import nullcontext
blosc.mutex = nullcontext()

def get_random_bytes():
    LENGTH = 10**9
    return b"\x00" + os.urandom(LENGTH) + b"\x00"

def f():
    print("inner")
    codec = blosc.Blosc("zstd")
    msg = get_random_bytes()
    print("use threads (inner)", blosc._get_use_threads(), flush=True)
    print("before inner encode", flush=True)
    encoded = codec.encode(msg)
    print("after inner encode", flush=True)
    decoded = codec.decode(encoded)
    assert decoded == msg

if __name__ == '__main__':
    print("outer")
    # When using fork in the global scope,
    # one can still use spawn locally,
    # without leaked semaphore warnings:
    # mp = mp.get_context("spawn")
    p = mp.Process(target=f)
    p.start()
    blosc.set_nthreads(1)  # limit threads to allow subprocess to start
    codec = blosc.Blosc("lz4")
    msg = get_random_bytes()
    print("use threads (outer)", blosc._get_use_threads(), flush=True)
    print("before outer encode", flush=True)
    encoded = codec.encode(msg)
    print("after outer encode", flush=True)
    decoded = codec.decode(encoded)
    assert decoded == msg
    p.join()
FWIW the PR was closed more because of trying to address issue ( https://github.com/zarr-developers/zarr-python/issues/777 ). So a new PR with those changes or other changes could be opened.
Do you have a preference as to which change, @jakirkham?
I apologize if I appear to be piling on, but running python 3.9, zarr-py version 2.13.3, I cannot even get this to run without emitting the warning about leaked semaphore objects and hanging (commenting out the zarr import resolves the problem).
import zarr
import multiprocessing

def fn(x):
    return x**2

def main():
    p_list = []
    for ii in range(4):
        p = multiprocessing.Process(target=fn, args=(ii,))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()

if __name__ == "__main__":
    main()
While I agree with the original poster that results do not actually seem to be affected by this problem, a fix would be greatly appreciated.
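For anyone who wants to check a candidate fix against reproductions like the ones in this thread, one possible approach is to run the script in a fresh interpreter and look for the warning on stderr. This is a rough sketch, not part of zarr or numcodecs: the helper name `emits_leak_warning` is made up, and the substring check assumes the warning text contains "leaked semaphore", as in the messages quoted above.

```python
import subprocess
import sys


def emits_leak_warning(python_args, timeout=60):
    """Run `python <python_args>` and report whether stderr mentions a leaked semaphore.

    Raises subprocess.TimeoutExpired if the child hangs for longer than `timeout` seconds.
    """
    result = subprocess.run(
        [sys.executable, *python_args],
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return "leaked semaphore" in result.stderr
```

For example, `emits_leak_warning(["repro.py"])` would run a file named repro.py (a placeholder name) and return True if the warning was printed.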
No piling on perceived, @danielsf. Extra data points welcome. Ping, @jakirkham.