numcodecs.blosc mutex leaked semaphore warning #230


Open

rc-conway opened this issue May 4, 2020 · 11 comments

rc-conway commented May 4, 2020

Minimal, reproducible code sample (copy-pastable if possible)

import zarr

Problem description

When using multiprocessing in a Cython context, I get a leaked semaphore warning if zarr is anywhere in the import space.

I suspect the global mutex variable (the lock) in numcodecs.blosc is being garbage collected before the multiprocessing finalizer is called, so the weakref is dropped and the lock is never unregistered. This results in a leaked semaphore warning when running in a multiprocessing environment. I don't think there is a functional issue, but I would like the warning to go away.

https://github.com/zarr-developers/numcodecs/blob/master/numcodecs/blosc.pyx#L78
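
For context, the linked line creates the lock at module import time. Paraphrased (not the verbatim blosc.pyx source), the pattern is roughly:

import multiprocessing

# Global lock used to serialize access to the global blosc context;
# created as a side effect of importing numcodecs.blosc.
mutex = multiprocessing.Lock()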

Version and installation information

Please provide the following:

  • numcodecs 0.6.4
  • Python 3.8.0
  • Linux, MacOS
  • conda
@alimanfoo (Member)

Thanks @rc-conway. Do you have any suggestions for how to avoid this?

@jakirkham (Member)

Could we move it to init? Would that help?

@rc-conway (Author)

@jakirkham That would prevent the issues caused by having zarr on the import path.

Just in case, blosc.destroy might also need to set mutex = None to clean it up?

@alimanfoo (Member)

For reference, we call init() here, which runs during zarr (numcodecs) import. We also register a call to destroy at exit (here).
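
In other words, the lifecycle is roughly the following (paraphrased; this mirrors what importing numcodecs already does, it is not something users need to write):

import atexit
from numcodecs import blosc

blosc.init()                    # numcodecs calls this at import time
atexit.register(blosc.destroy)  # destroy() is registered to run at interpreter exit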

@jakirkham (Member)

I'm guessing there is also something weird going on when Cython cleans up this object (or doesn't...).

Let's try moving it to init 🙂

@jakirkham (Member)

Added PR ( #234 ) to move initialization of the mutex to init. Also, mutex is overwritten in destroy, which should trigger reference counting to clean up the Lock (there doesn't appear to be a method to clean up the Lock directly). Please take a look and give it a try 🙂
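
The shape of the change is roughly this sketch (paraphrased, not the actual PR diff):

import multiprocessing

mutex = None

def init():
    global mutex
    # Create the lock in init() instead of at module import time.
    mutex = multiprocessing.Lock()

def destroy():
    global mutex
    # Overwrite the only reference so reference counting releases the Lock.
    mutex = None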

@jstriebel (Member) commented Oct 21, 2022

Since PR #234 was closed, I had another look at this. The following is a minimal reproduction of the problem, together with two potential fixes (each coming with caveats, discussed below):

import multiprocessing as mp
from contextlib import nullcontext

if __name__ == '__main__':
    # This must happen before importing blosc,
    # and must be guarded by the if.
    mp.set_start_method("spawn")

from numcodecs import blosc

### With the following line, the leaked-semaphore warning no longer appears:
# blosc.mutex = mp.get_context("fork").Lock()

### Or this, which removes the mutex completely and relies only on
### the _get_use_threads() check (nullcontext is imported above):
# blosc.mutex = nullcontext()

def f():
    print("inner")

if __name__ == '__main__':
    print("outer")
    p = mp.Process(target=f)
    p.start()
    p.join()

This outputs "There appear to be 1 leaked semaphores to clean up at shutdown"; uncommenting either of the two fixes removes the warning. Using the first fix, with mp.get_context("fork").Lock(), has two caveats:

  1. When using fork, leaked resources are simply not shown, so they might still leak; only the warning is gone. From the Python docs:

    On Unix using the spawn or forkserver start methods will also start a resource tracker process which tracks the unlinked named system resources (such as named semaphores or SharedMemory objects) created by processes of the program. When all processes have exited the resource tracker unlinks any remaining tracked object. Usually there should be none, but if a process was killed by a signal there may be some “leaked” resources. (Neither leaked semaphores nor shared memory segments will be automatically unlinked until the next reboot. This is problematic for both objects because the system allows only a limited number of named semaphores, and shared memory segments occupy some space in the main memory.)

  2. Using a lock from a fork context with spawned processes does not work. Also from the Python docs:

    Alternatively, you can use get_context() to obtain a context object. Context objects have the same API as the multiprocessing module, and allow one to use multiple start methods in the same program.

    Note that objects related to one context may not be compatible with processes for a different context. In particular, locks created using the fork context cannot be passed to processes started using the spawn or forkserver start methods.

    However, it seems that this might not be a problem, since the blosc calls that use the global context are guarded by _get_use_threads, which ensures that the global context is only accessed

    from within a single-threaded, single-process program.

    (from this code comment), and never from subprocesses or subthreads. Those only use the non-threaded ctx-versions of the calls, so the fork-only lock should never actually be used in spawned subprocesses.

Because of the latter point, I'm wondering whether the mutex is needed at all. It would be great if someone with more knowledge of the blosc internals could comment on that. If it is not needed, simply removing the mutex would be fine, which can be simulated by setting it to contextlib.nullcontext() as shown in the second commented fix; that leaves only the rest of the _get_use_threads logic to guard access to the global context.
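
To make that guard concrete, the dispatch looks roughly like the following sketch (paraphrased; _compress_global and _compress_ctx are placeholder names, not the actual numcodecs internals):

from numcodecs import blosc

def _compress_global(source):
    ...  # placeholder: compress using the global blosc context

def _compress_ctx(source):
    ...  # placeholder: compress using a per-call blosc context

def compress(source):
    if blosc._get_use_threads():
        # Single-threaded, single-process program: the global blosc
        # context may be used, serialized by the module-level mutex.
        with blosc.mutex:
            return _compress_global(source)
    # Subprocess or subthread: the per-call ctx-version is used instead,
    # which never touches blosc.mutex.
    return _compress_ctx(source)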

Here is some more code I used for further testing:
import os
import multiprocessing as mp


if __name__ == '__main__':
    # This must happen before importing blosc,
    # and must be guarded by the if.
    mp.set_start_method("spawn")

from numcodecs import blosc

### With the following line, the leaked-semaphore warning no longer appears:
# blosc.mutex = mp.get_context(method="fork").Lock()

### Or this, which removes the mutex completely and relies only on
### the _get_use_threads() check:
from contextlib import nullcontext
blosc.mutex = nullcontext()


def get_random_bytes():
    LENGTH = 10**9
    return b"\x00" + os.urandom(LENGTH) + b"\x00"


def f():
    print("inner")
    codec = blosc.Blosc("zstd")
    msg = get_random_bytes()
    print("use threads (inner)", blosc._get_use_threads(), flush=True)
    print("before inner encode", flush=True)
    encoded = codec.encode(msg)
    print("after inner encode", flush=True)
    decoded = codec.decode(encoded)
    assert decoded == msg


if __name__ == '__main__':
    print("outer")
    # When using fork in the global scope,
    # one can still use spawn locally,
    # without leaked semaphore warnings:
    # mp = mp.get_context("spawn")
    p = mp.Process(target=f)
    p.start()
    blosc.set_nthreads(1)  # limit threads to allow subprocess to start
    codec = blosc.Blosc("lz4")
    msg = get_random_bytes()
    print("use threads (outer)", blosc._get_use_threads(), flush=True)
    print("before outer encode", flush=True)
    encoded = codec.encode(msg)
    print("after outer encode", flush=True)
    decoded = codec.decode(encoded)
    assert decoded == msg
    p.join()

@jakirkham (Member)

FWIW, the PR was closed mainly because it was trying to address issue zarr-developers/zarr-python#777. A new PR with those changes, or other changes, could be opened.

@joshmoore (Member)

Do you have a preference as to which change, @jakirkham?

@danielsf

I apologize if I appear to be piling on, but running Python 3.9 with zarr version 2.13.3, I cannot even get the following to run without it emitting the leaked-semaphore warning and hanging (commenting out the zarr import resolves the problem).

import zarr
import multiprocessing

def fn(x):
    return x**2

def main():
    p_list = []
    for ii in range(4):
        p = multiprocessing.Process(target=fn, args=(ii,))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()

if __name__ == "__main__":
    main()

While I agree with the original poster that results do not actually seem to be affected by this problem, a fix would be greatly appreciated.
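
Until a fix lands, a possible workaround based on jstriebel's findings above (with the caveats noted there; leaks are hidden, not fixed) would be:

import multiprocessing as mp
from numcodecs import blosc

# Replace the module-level lock with one from a fork context so the
# resource tracker no longer reports it at shutdown.
blosc.mutex = mp.get_context("fork").Lock()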

@joshmoore (Member)

No piling on perceived, @danielsf. Extra data points welcome. Ping, @jakirkham.

@dstansby added the bug label Oct 28, 2024