ProcessSynchronizer not write safe #857
I'll add that if this is an actual bug, I have some bandwidth to help contribute toward fixing it. Please let me know if I can help.
Hi, I'm experiencing the same issue on Windows, zarr 2.10.2. After digging a bit, it appears that the lock mechanism is applied on a per-chunk basis, in lines 1991 to 1993 in 5c71212.
Maybe it should be done before, i.e. somewhere between calling
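For context, the per-chunk locking pattern I'm referring to looks roughly like the sketch below (paraphrased from memory, not the exact source): the synchronizer is only consulted once a single chunk key has been computed, so a selection spanning several chunks is written chunk by chunk under separate locks.

def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
    if self._synchronizer is None:
        self._chunk_setitem_nosync(chunk_coords, chunk_selection, value, fields=fields)
    else:
        ckey = self._chunk_key(chunk_coords)   # lock key is a single chunk key
        lock = self._synchronizer[ckey]        # synchronizer hands back one chunk-scoped lock
        with lock:
            self._chunk_setitem_nosync(chunk_coords, chunk_selection, value, fields=fields)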
Ditto. Naively, I assume we'd need to look at a throughput metric. Are either of you up for opening a PR so we can evaluate from a test-passing state? cc: @alimanfoo
@joshmoore I can work on this. I just need to think of a more pragmatic example for a unit test, the code I shared above is too stochastic to be useful.
Maybe this is acceptable for a test? I create a matrix of shape (10, 10) with (2, 2) chunks and initialize it with zeros. I then create ten parallel processes, each of which writes a repeating set of numbers diagonally down the upper triangle of the matrix. The expected output should look like this:
As processes write to the store, reads show inconsistencies across chunks where numbers might not yet be fully populated. A unit test would assert that matrix diagonals are always uniform. Examples of failed assertions:
What isn't fully intuitive to me yet is that even though there is a bit of a delay, writes across chunks always seem to become correct eventually. So, refining my thinking, it might be more appropriate to say that "reading from a Zarr store is not guaranteed to be process synchronized."

Example code:

import zarr
import numpy as np
from multiprocess import Process
import numcodecs

numcodecs.blosc.use_threads = False

synchronizer = zarr.sync.ProcessSynchronizer('foo.sync')
z = zarr.open_array(
    store='foo.zarr', mode='w',
    shape=(10, 10), chunks=(2, 2), fill_value=0,
    compressor=None, dtype='i4', synchronizer=synchronizer
)
np.fliplr(z[...])

def write_across_chunks(i):
    # each process writes the value i along one anti-diagonal, which spans several chunks
    zarrMultiprocess = zarr.open_array('foo.zarr', mode='r+', synchronizer=synchronizer)
    zarrMultiprocess.set_coordinate_selection((list(range(i)), list(reversed(range(i)))), [i] * i)
    mm = np.fliplr(zarrMultiprocess[:])
    # each diagonal of the flipped view should hold a single, uniform value
    if not np.all([np.all(mm.diagonal(offset=k)[0] == mm.diagonal(offset=k)) for k in range(mm.shape[1])]):
        print(mm)  # TODO: assertion would fail here

indices = list(range(1, 11))
processes = []
for index in indices:
    process = Process(target=write_across_chunks, args=[index])
    processes.append(process)
    process.start()

print(f"Processes: {len(processes)}")
for i, process in enumerate(processes):
    process.join()
np.fliplr(z)

Any thoughts on this?
If it's showing any failing behavior, then yeah, it would make sense to get that into a PR so we can start trying to pinpoint the issue.
In my mind, this would make sense since it matches the (general) expectations of object storage.
Thanks @joshmoore. I'll work on forking the repo and adding the test.
I think I've managed to convince myself that @jcafhe is 100% correct. That is:
There is probably room for the documentation to be more explicit but I believe this is consistent with what is stated there, specifically:
A value assignment that spans two chunks needs support for more than chunk-level synchronization: it needs multiple-chunk synchronization, and Zarr currently has no support for this. The synchronizer interface currently only allows for the retrieval of chunk-scoped locks, and looking at all the synchronizer uses in

Consequently, I'd currently posit that having Zarr assert the concurrency guarantees I understand you desire would require substantial refactoring to achieve. Unless you really need this functionality and are able to drive an implementation, it is likely going to be substantially easier to observe chunk boundaries algorithmically in your code that uses Zarr (see the sketch below).

PS: If it's still a medium- or long-term goal that "...larger data processing that will happen in the cloud..." and the desired backend is object storage, the
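To illustrate what I mean by observing chunk boundaries algorithmically, here is a minimal sketch (the store path, shape, and chunking are illustrative only): each writer is handed a region whose start and stop are aligned to the array's chunk size, so no two writers ever touch the same chunk and no cross-chunk locking is needed.

import zarr
import numpy as np

z = zarr.open_array('example.zarr', mode='w', shape=(1000,), chunks=(100,), dtype='i4')

def chunk_aligned_slices(length, chunk):
    # yield slices whose start and stop fall exactly on chunk boundaries
    for start in range(0, length, chunk):
        yield slice(start, min(start + chunk, length))

def write_region(region):
    # each writer touches only whole chunks, so no write ever spans a chunk
    # shared with another writer and no synchronizer is required
    arr = zarr.open_array('example.zarr', mode='r+')
    arr[region] = np.arange(region.start, region.stop, dtype='i4')

for region in chunk_aligned_slices(z.shape[0], z.chunks[0]):
    write_region(region)  # in real use, each call would run in its own process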
An Amazon Elastic File System can be shared between Lambdas within a Virtual Private Cloud. The attached image gives a rough overview showing the synchronizer accessible to all lambdas (unless I am misunderstanding something):
Yes. From the toy code I put together and the documentation shared above, it is making more sense for me as a user to orchestrate writes to the store by some means in addition to the synchronizer.
Yes it can, however that just feels like using a sledgehammer to kill a fly. Obviously, I'm not intimately aware of your use case, but just be really careful with the semantics of EFS and Lambda if you decide to go that route: additional Lambda cold start time for mounting EFS, performance of EFS with small amounts of data, EFS lock quotas, etc. If it were me, I'd pick one of the myriad Redis-based distributed lock implementations (there is a fully managed Redis product in AWS if you don't want to run your own), which have very well defined semantics, and implement a synchronizer tailored to my use case.
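For reference, a synchronizer only has to hand back a context-manager lock for a given chunk key, so a Redis-backed one could look roughly like the following sketch (untested; the key prefix, timeout, and host are illustrative assumptions):

import redis

class RedisSynchronizer:
    """Hand back a distributed lock per chunk key, mirroring how zarr
    looks locks up via synchronizer[chunk_key] (sketch only, untested)."""

    def __init__(self, host='localhost', port=6379, prefix='zarr-lock:', timeout=30):
        self._client = redis.Redis(host=host, port=port)
        self._prefix = prefix
        self._timeout = timeout  # seconds before a stale lock auto-expires

    def __getitem__(self, item):
        # redis-py's lock object is usable as a context manager, like the
        # locks returned by zarr's built-in synchronizers
        return self._client.lock(self._prefix + item, timeout=self._timeout)

# usage (illustrative):
# z = zarr.open_array('foo.zarr', mode='r+', synchronizer=RedisSynchronizer())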
Hi, sorry for the late reply. In hindsight, I agree that the current behaviour is consistent with what is described in the documentation:
Also, after some refactoring of my code, I don't use concurrent writes anymore, so right now I'm OK with the current behavior of zarr.
Some time ago, I tried to monkey-patch the

Basically, what I've tried is just about getting all the chunk keys involved in the current set operation and acquiring all the locks at the "same time". It's a bit tricky when using a lock as a context manager, because entering an unknown number of contexts in the same scope requires a bit of a hack (cf.

def _set_selection(self, indexer, value, fields=None):
    ...
    # iterate over chunks in range
    if not hasattr(self.store, "setitems") or self._synchronizer is not None \
            or any(map(lambda x: x == 0, self.shape)):

        # get all chunk keys involved in the set operation
        chunk_keys = []
        if self._synchronizer is not None:
            chunk_keys = [self._chunk_key(chunk_coords) for (chunk_coords, *_) in indexer]

        # acquire the locks by entering multiple contexts
        with contextlib.ExitStack() as stacks:
            for ckey in chunk_keys:
                stacks.enter_context(self._synchronizer[ckey])

            # iterative approach
            for chunk_coords, chunk_selection, out_selection in indexer:
                # extract data to store (chunk_value)
                ...
                # put data with no sync
                self._chunk_setitem_nosync(chunk_coords, chunk_selection, chunk_value,
                                           fields=fields)

I must say that this subject is far beyond me, but here are my thoughts about it. I suspect this approach is too naive, and some deadlocks could occur because all the locks cannot be acquired at the exact same time. E.g., we could imagine two threads/processes, A and B, trying to write into two different slices that involve the same two adjacent chunks, chunks 1 and 2, with their respective locks L1 and L2. If the order of lock acquisition is random, we could end up with the following timeline and a possible deadlock:
So IMO, the order of acquisition of the chunk locks should at least be consistent across all threads/processes, or maybe it would be necessary to control access to these locks with a global lock, which leads to more and more complexity and edge cases.
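For what it's worth, the usual way to make the acquisition order consistent is to sort the keys before entering the contexts; a minimal, standalone sketch of that idea (using plain multiprocessing locks as stand-ins for synchronizer[ckey], not zarr internals) is below:

import contextlib
from multiprocessing import Lock

# one lock per chunk key, as a stand-in for synchronizer[ckey]
locks = {'0.0': Lock(), '0.1': Lock(), '1.0': Lock(), '1.1': Lock()}

def write_with_ordered_locks(chunk_keys, do_write):
    # sorting gives every thread/process the same acquisition order, so two
    # writers contending for overlapping chunk sets cannot deadlock
    with contextlib.ExitStack() as stack:
        for ckey in sorted(chunk_keys):
            stack.enter_context(locks[ckey])
        do_write()

write_with_ordered_locks(['1.0', '0.1'], lambda: print('writing chunks 0.1 and 1.0'))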
Thanks, @jcafhe. Certainly the general consensus on how to deal with deadlock for multiple-lock-acquisition scenarios like the one you mention is to formalize the lock acquisition order. However, the fact that the

I'm certainly not saying these problems cannot be addressed, but rather wholeheartedly agree with your statement: "...leads to more and more complexity...". Concurrency in a distributed system is a hard problem, and the complexity it adds to software attempting to address concurrent programming use cases is significant.
I am just getting started with zarr, xarray and dask on kubernetes and have been considering using the redlock implemented in pottery. Just chiming in since this seems kind of critical for anyone writing to zarr from dask in a distributed setting. I know it is probably not ideal to use a PVC for locking. Something like this:
and I am simply setting up redis with helm:
Any immediate concerns with this approach @chris-allan?
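A rough sketch of the kind of synchronizer this would need (untested; it assumes pottery's documented Redlock(key=..., masters={...}) context-manager usage and a placeholder in-cluster Redis URL):

from redis import Redis
from pottery import Redlock

class RedlockSynchronizer:
    """Return one Redlock per chunk key, so any process that can reach
    Redis shares the same locks (sketch only, untested)."""

    def __init__(self, url='redis://redis-master:6379/0', prefix='zarr:'):
        self._masters = {Redis.from_url(url)}
        self._prefix = prefix

    def __getitem__(self, item):
        # Redlock works as a context manager, like zarr's built-in locks
        return Redlock(key=self._prefix + item, masters=self._masters)

# usage (illustrative):
# z = zarr.open_array(store, mode='r+', synchronizer=RedlockSynchronizer())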
If you are using a Dask distributed cluster, you can use a distributed Lock to solve this problem; communication between processes is handled by the Dask scheduler. In the Pangeo Forge project, we use this to manage potentially overlapping distributed writes to Zarr stores in object storage: https://github.com/pangeo-forge/pangeo-forge-recipes/blob/master/pangeo_forge_recipes/utils.py#L86 Our implementation is somewhat specific to our use case, but it could easily be generalized.
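As a generic illustration of that idea (this is not the Pangeo Forge code), a scheduler-managed dask.distributed.Lock keyed by chunk index can serialize overlapping writers; the store path, shape, and chunk naming below are made up for the example:

import numpy as np
import zarr
from dask.distributed import Client, Lock

def write_region(start, stop, value):
    # one scheduler-managed lock per chunk index keeps overlapping writers
    # from touching the same chunk at the same time
    z = zarr.open_array('example.zarr', mode='r+')
    chunk_size = z.chunks[0]
    for chunk_index in range(start // chunk_size, (stop - 1) // chunk_size + 1):
        with Lock(f'example.zarr-chunk-{chunk_index}'):
            lo = max(start, chunk_index * chunk_size)
            hi = min(stop, (chunk_index + 1) * chunk_size)
            z[lo:hi] = value

if __name__ == '__main__':
    client = Client()  # locks are coordinated through this scheduler
    zarr.open_array('example.zarr', mode='w', shape=(100,), chunks=(10,), dtype='i4')
    futures = [client.submit(write_region, i * 15, (i + 1) * 15, i) for i in range(6)]
    client.gather(futures)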
Minimal, reproducible code sample:
Output:
Problem description:
I am prototyping for some larger data processing that will happen in the cloud and have been testing Zarr's parallel computing and synchronization capabilities on my local machine. The synchronization tools are not process-safe in the way I would expect.
Below is a toy example that creates a Zarr store using a ProcessSynchronizer to coordinate read and write protection when multiple processes access the same chunk in the store. The code initializes a store with 12 elements and a chunk size of 4, for a total of 3 chunks. I spawn a number of processes, each of which writes across two chunks at a time. My expectation is that each of the print statements above would show a consistent set of numbers across chunks, e.g.:
Anywhere marked "problem!" above in the output is a situation where the supposedly process-safe writes return corrupted data.
Additional notes:
Blosc threading is disabled and no compression is defined for the store. I have seen in other issues that this can be a problem, so it shouldn't be affecting reads/writes in my example.
I have also tested and am getting similar results for a threaded application using the zarr.ThreadSynchronizer.
Version and installation information: