diff --git a/fsspec/caching.py b/fsspec/caching.py
index 545caff1b..85ea05320 100644
--- a/fsspec/caching.py
+++ b/fsspec/caching.py
@@ -23,6 +23,8 @@ class BaseCache(object):
         How big this file is
     """
 
+    name = "none"
+
     def __init__(self, blocksize, fetcher, size):
         self.blocksize = blocksize
         self.fetcher = fetcher
@@ -47,6 +49,8 @@ class MMapCache(BaseCache):
     This cache method might only work on posix
     """
 
+    name = "mmap"
+
     def __init__(self, blocksize, fetcher, size, location=None, blocks=None):
         super().__init__(blocksize, fetcher, size)
         self.blocks = set() if blocks is None else blocks
@@ -116,6 +120,8 @@ class ReadAheadCache(BaseCache):
     many small reads in a sequential order (e.g., reading lines from a file).
     """
 
+    name = "readahead"
+
     def __init__(self, blocksize, fetcher, size):
         super().__init__(blocksize, fetcher, size)
         self.cache = b""
@@ -171,6 +177,8 @@ class BlockCache(BaseCache):
     use for this cache is then ``blocksize * maxblocks``.
     """
 
+    name = "blockcache"
+
     def __init__(self, blocksize, fetcher, size, maxblocks=32):
         super().__init__(blocksize, fetcher, size)
         self.nblocks = math.ceil(size / blocksize)
@@ -292,6 +300,8 @@ class BytesCache(BaseCache):
     we are more than a blocksize ahead of it.
     """
 
+    name = "bytes"
+
     def __init__(self, blocksize, fetcher, size, trim=True):
         super().__init__(blocksize, fetcher, size)
         self.cache = b""
@@ -367,6 +377,8 @@ def __len__(self):
 class AllBytes(BaseCache):
     """Cache entire contents of the file"""
 
+    name = "all"
+
     def __init__(self, blocksize=None, fetcher=None, size=None, data=None):
         super().__init__(blocksize, fetcher, size)
         if data is None:
diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py
index 4eca88737..029e4b963 100644
--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -205,6 +205,7 @@ def _open(
         autocommit=None,  # XXX: This differs from the base class.
         cache_type=None,
         cache_options=None,
+        size=None,
         **kwargs
     ):
         """Make a file-like object
@@ -227,7 +228,7 @@ def _open(
         kw = self.kwargs.copy()
         kw["asynchronous"] = self.asynchronous
         kw.update(kwargs)
-        size = self.size(path)
+        size = size or self.size(path)
         if block_size and size:
             return HTTPFile(
                 self,
@@ -434,9 +435,21 @@ async def async_fetch_range(self, start, end):
     def close(self):
         pass
 
+    def __reduce__(self):
+        return reopen, (
+            self.fs,
+            self.url,
+            self.mode,
+            self.blocksize,
+            self.cache.name,
+            self.size,
+        )
+
 
-async def get(session, url, **kwargs):
-    return await session.get(url, **kwargs)
+def reopen(fs, url, mode, blocksize, cache_type, size=None):
+    return fs.open(
+        url, mode=mode, block_size=blocksize, cache_type=cache_type, size=size
+    )
 
 
 class HTTPStreamFile(AbstractBufferedFile):
@@ -449,7 +462,7 @@ def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs):
             raise ValueError
         self.details = {"name": url, "size": None}
         super().__init__(fs=fs, path=url, mode=mode, cache_type="none", **kwargs)
-        self.r = sync(self.loop, get, self.session, url, **kwargs)
+        self.r = sync(self.loop, self.session.get, url, **kwargs)
 
     def seek(self, *args, **kwargs):
         raise ValueError("Cannot seek streaming HTTP file")
@@ -467,6 +480,9 @@ async def _close(self):
     def close(self):
         asyncio.run_coroutine_threadsafe(self._close(), self.loop)
 
+    def __reduce__(self):
+        return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name)
+
 
 async def get_range(session, url, start, end, file=None, **kwargs):
     # explicit get a range when we know it must be safe
diff --git a/fsspec/implementations/tests/test_http.py b/fsspec/implementations/tests/test_http.py
index d3a98a92c..ff56b7767 100644
--- a/fsspec/implementations/tests/test_http.py
+++ b/fsspec/implementations/tests/test_http.py
@@ -125,6 +125,28 @@ def test_read(server):
         assert f.read(100) + f.read() == data
 
 
+def test_file_pickle(server):
+    import pickle
+
+    # via HTTPFile
+    h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true"})
+    out = server + "/index/realfile"
+    with h.open(out, "rb") as f:
+        pic = pickle.dumps(f)
+        assert f.read() == data
+    with pickle.loads(pic) as f:
+        assert f.read() == data
+
+    # via HTTPStreamFile
+    h = fsspec.filesystem("http")
+    out = server + "/index/realfile"
+    with h.open(out, "rb") as f:
+        out = pickle.dumps(f)
+        assert f.read() == data
+    with pickle.loads(out) as f:
+        assert f.read() == data
+
+
 def test_methods(server):
     h = fsspec.filesystem("http")
     url = server + "/index/realfile"