From 7f64a28b5699c3aaa94e39df421684d9e1272454 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 18 Nov 2020 12:15:12 -0500 Subject: [PATCH 1/2] Fixes https://github.com/pydata/xarray/issues/4591 --- fsspec/caching.py | 6 ++++++ fsspec/implementations/http.py | 17 +++++++++++++---- fsspec/implementations/tests/test_http.py | 22 ++++++++++++++++++++++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/fsspec/caching.py b/fsspec/caching.py index 545caff1b..781d40aeb 100644 --- a/fsspec/caching.py +++ b/fsspec/caching.py @@ -22,6 +22,7 @@ class BaseCache(object): size: int How big this file is """ + name = "none" def __init__(self, blocksize, fetcher, size): self.blocksize = blocksize @@ -46,6 +47,7 @@ class MMapCache(BaseCache): This cache method might only work on posix """ + name = "mmap" def __init__(self, blocksize, fetcher, size, location=None, blocks=None): super().__init__(blocksize, fetcher, size) @@ -115,6 +117,7 @@ class ReadAheadCache(BaseCache): fill holes in the cache or keep fragments alive. It is best suited to many small reads in a sequential order (e.g., reading lines from a file). """ + name = "readahead" def __init__(self, blocksize, fetcher, size): super().__init__(blocksize, fetcher, size) @@ -170,6 +173,7 @@ class BlockCache(BaseCache): The maximum number of blocks to cache for. The maximum memory use for this cache is then ``blocksize * maxblocks``. """ + name = "blockcache" def __init__(self, blocksize, fetcher, size, maxblocks=32): super().__init__(blocksize, fetcher, size) @@ -291,6 +295,7 @@ class BytesCache(BaseCache): As we read more data, whether to discard the start of the buffer when we are more than a blocksize ahead of it. """ + name = "bytes" def __init__(self, blocksize, fetcher, size, trim=True): super().__init__(blocksize, fetcher, size) @@ -366,6 +371,7 @@ def __len__(self): class AllBytes(BaseCache): """Cache entire contents of the file""" + name = "all" def __init__(self, blocksize=None, fetcher=None, size=None, data=None): super().__init__(blocksize, fetcher, size) diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py index 4eca88737..b7c55f78f 100644 --- a/fsspec/implementations/http.py +++ b/fsspec/implementations/http.py @@ -205,6 +205,7 @@ def _open( autocommit=None, # XXX: This differs from the base class. cache_type=None, cache_options=None, + size=None, **kwargs ): """Make a file-like object @@ -227,7 +228,7 @@ def _open( kw = self.kwargs.copy() kw["asynchronous"] = self.asynchronous kw.update(kwargs) - size = self.size(path) + size = size or self.size(path) if block_size and size: return HTTPFile( self, @@ -434,9 +435,14 @@ async def async_fetch_range(self, start, end): def close(self): pass + def __reduce__(self): + return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name, + self.size) + -async def get(session, url, **kwargs): - return await session.get(url, **kwargs) +def reopen(fs, url, mode, blocksize, cache_type, size=None): + return fs.open(url, mode=mode, block_size=blocksize, cache_type=cache_type, + size=size) class HTTPStreamFile(AbstractBufferedFile): @@ -449,7 +455,7 @@ def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs): raise ValueError self.details = {"name": url, "size": None} super().__init__(fs=fs, path=url, mode=mode, cache_type="none", **kwargs) - self.r = sync(self.loop, get, self.session, url, **kwargs) + self.r = sync(self.loop, self.session.get, url, **kwargs) def seek(self, *args, **kwargs): raise ValueError("Cannot seek streaming HTTP file") @@ -467,6 +473,9 @@ async def _close(self): def close(self): asyncio.run_coroutine_threadsafe(self._close(), self.loop) + def __reduce__(self): + return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name) + async def get_range(session, url, start, end, file=None, **kwargs): # explicit get a range when we know it must be safe diff --git a/fsspec/implementations/tests/test_http.py b/fsspec/implementations/tests/test_http.py index d3a98a92c..ff56b7767 100644 --- a/fsspec/implementations/tests/test_http.py +++ b/fsspec/implementations/tests/test_http.py @@ -125,6 +125,28 @@ def test_read(server): assert f.read(100) + f.read() == data +def test_file_pickle(server): + import pickle + + # via HTTPFile + h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true"}) + out = server + "/index/realfile" + with h.open(out, "rb") as f: + pic = pickle.dumps(f) + assert f.read() == data + with pickle.loads(pic) as f: + assert f.read() == data + + # via HTTPStreamFile + h = fsspec.filesystem("http") + out = server + "/index/realfile" + with h.open(out, "rb") as f: + out = pickle.dumps(f) + assert f.read() == data + with pickle.loads(out) as f: + assert f.read() == data + + def test_methods(server): h = fsspec.filesystem("http") url = server + "/index/realfile" From 791cc1dedebde7ab3cedfcc3c4f2d4c1239498e5 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 18 Nov 2020 12:15:51 -0500 Subject: [PATCH 2/2] black --- fsspec/caching.py | 6 ++++++ fsspec/implementations/http.py | 15 +++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/fsspec/caching.py b/fsspec/caching.py index 781d40aeb..85ea05320 100644 --- a/fsspec/caching.py +++ b/fsspec/caching.py @@ -22,6 +22,7 @@ class BaseCache(object): size: int How big this file is """ + name = "none" def __init__(self, blocksize, fetcher, size): @@ -47,6 +48,7 @@ class MMapCache(BaseCache): This cache method might only work on posix """ + name = "mmap" def __init__(self, blocksize, fetcher, size, location=None, blocks=None): @@ -117,6 +119,7 @@ class ReadAheadCache(BaseCache): fill holes in the cache or keep fragments alive. It is best suited to many small reads in a sequential order (e.g., reading lines from a file). """ + name = "readahead" def __init__(self, blocksize, fetcher, size): @@ -173,6 +176,7 @@ class BlockCache(BaseCache): The maximum number of blocks to cache for. The maximum memory use for this cache is then ``blocksize * maxblocks``. """ + name = "blockcache" def __init__(self, blocksize, fetcher, size, maxblocks=32): @@ -295,6 +299,7 @@ class BytesCache(BaseCache): As we read more data, whether to discard the start of the buffer when we are more than a blocksize ahead of it. """ + name = "bytes" def __init__(self, blocksize, fetcher, size, trim=True): @@ -371,6 +376,7 @@ def __len__(self): class AllBytes(BaseCache): """Cache entire contents of the file""" + name = "all" def __init__(self, blocksize=None, fetcher=None, size=None, data=None): diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py index b7c55f78f..029e4b963 100644 --- a/fsspec/implementations/http.py +++ b/fsspec/implementations/http.py @@ -436,13 +436,20 @@ def close(self): pass def __reduce__(self): - return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name, - self.size) + return reopen, ( + self.fs, + self.url, + self.mode, + self.blocksize, + self.cache.name, + self.size, + ) def reopen(fs, url, mode, blocksize, cache_type, size=None): - return fs.open(url, mode=mode, block_size=blocksize, cache_type=cache_type, - size=size) + return fs.open( + url, mode=mode, block_size=blocksize, cache_type=cache_type, size=size + ) class HTTPStreamFile(AbstractBufferedFile):