Skip to content

Http serialising files #477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions fsspec/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class BaseCache(object):
How big this file is
"""

name = "none"

def __init__(self, blocksize, fetcher, size):
self.blocksize = blocksize
self.fetcher = fetcher
Expand All @@ -47,6 +49,8 @@ class MMapCache(BaseCache):
This cache method might only work on posix
"""

name = "mmap"

def __init__(self, blocksize, fetcher, size, location=None, blocks=None):
super().__init__(blocksize, fetcher, size)
self.blocks = set() if blocks is None else blocks
Expand Down Expand Up @@ -116,6 +120,8 @@ class ReadAheadCache(BaseCache):
many small reads in a sequential order (e.g., reading lines from a file).
"""

name = "readahead"

def __init__(self, blocksize, fetcher, size):
super().__init__(blocksize, fetcher, size)
self.cache = b""
Expand Down Expand Up @@ -171,6 +177,8 @@ class BlockCache(BaseCache):
use for this cache is then ``blocksize * maxblocks``.
"""

name = "blockcache"

def __init__(self, blocksize, fetcher, size, maxblocks=32):
super().__init__(blocksize, fetcher, size)
self.nblocks = math.ceil(size / blocksize)
Expand Down Expand Up @@ -292,6 +300,8 @@ class BytesCache(BaseCache):
we are more than a blocksize ahead of it.
"""

name = "bytes"

def __init__(self, blocksize, fetcher, size, trim=True):
super().__init__(blocksize, fetcher, size)
self.cache = b""
Expand Down Expand Up @@ -367,6 +377,8 @@ def __len__(self):
class AllBytes(BaseCache):
"""Cache entire contents of the file"""

name = "all"

def __init__(self, blocksize=None, fetcher=None, size=None, data=None):
super().__init__(blocksize, fetcher, size)
if data is None:
Expand Down
24 changes: 20 additions & 4 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ def _open(
autocommit=None, # XXX: This differs from the base class.
cache_type=None,
cache_options=None,
size=None,
**kwargs
):
"""Make a file-like object
Expand All @@ -227,7 +228,7 @@ def _open(
kw = self.kwargs.copy()
kw["asynchronous"] = self.asynchronous
kw.update(kwargs)
size = self.size(path)
size = size or self.size(path)
if block_size and size:
return HTTPFile(
self,
Expand Down Expand Up @@ -434,9 +435,21 @@ async def async_fetch_range(self, start, end):
def close(self):
pass

def __reduce__(self):
    # Pickle support: serialize as a call to the module-level `reopen`
    # helper rather than trying to pickle the live aiohttp session.
    # Passing self.size lets reopen skip re-probing the remote file's
    # length on restore — NOTE(review): assumes `_open` honors a `size`
    # argument (it does per this PR's change to `_open`).
    return reopen, (
        self.fs,
        self.url,
        self.mode,
        self.blocksize,
        self.cache.name,
        self.size,
    )


async def get(session, url, **kwargs):
    """Awaitable wrapper around ``session.get`` so it can be driven by ``sync``."""
    pending = session.get(url, **kwargs)
    return await pending
def reopen(fs, url, mode, blocksize, cache_type, size=None):
    """Unpickling helper: re-open *url* on *fs* with the recorded parameters.

    Used as the callable in ``__reduce__`` so HTTP file objects can be
    reconstructed in another process without pickling live sessions.
    """
    open_kwargs = dict(
        mode=mode,
        block_size=blocksize,
        cache_type=cache_type,
        size=size,
    )
    return fs.open(url, **open_kwargs)


class HTTPStreamFile(AbstractBufferedFile):
Expand All @@ -449,7 +462,7 @@ def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs):
raise ValueError
self.details = {"name": url, "size": None}
super().__init__(fs=fs, path=url, mode=mode, cache_type="none", **kwargs)
self.r = sync(self.loop, get, self.session, url, **kwargs)
self.r = sync(self.loop, self.session.get, url, **kwargs)

def seek(self, *args, **kwargs):
raise ValueError("Cannot seek streaming HTTP file")
Expand All @@ -467,6 +480,9 @@ async def _close(self):
def close(self):
asyncio.run_coroutine_threadsafe(self._close(), self.loop)

def __reduce__(self):
    # Pickle support: rebuild through the module-level `reopen` helper.
    # Unlike HTTPFile.__reduce__, no size is recorded — a streaming file's
    # length is unknown up front, so `reopen` falls back to size=None.
    return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name)


async def get_range(session, url, start, end, file=None, **kwargs):
# explicit get a range when we know it must be safe
Expand Down
22 changes: 22 additions & 0 deletions fsspec/implementations/tests/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,28 @@ def test_read(server):
assert f.read(100) + f.read() == data


def test_file_pickle(server):
    """Round-trip open HTTP file objects through pickle and re-read them.

    Covers both implementations: HTTPFile (server advertises a length via
    give_length/head_ok, so a random-access file is created) and
    HTTPStreamFile (no length headers, streaming file).
    """
    import pickle

    url = server + "/index/realfile"

    # via HTTPFile — headers make the server report a size up front.
    h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true"})
    with h.open(url, "rb") as f:
        # Pickle before reading so we capture a freshly-opened file's state.
        pickled = pickle.dumps(f)
        assert f.read() == data
    with pickle.loads(pickled) as f2:
        assert f2.read() == data

    # via HTTPStreamFile — default filesystem, size unknown.
    h = fsspec.filesystem("http")
    with h.open(url, "rb") as f:
        pickled = pickle.dumps(f)
        assert f.read() == data
    with pickle.loads(pickled) as f2:
        assert f2.read() == data


def test_methods(server):
h = fsspec.filesystem("http")
url = server + "/index/realfile"
Expand Down