
Commit 741565f

Merge remote-tracking branch 'upstream/master' into remove-deprecated-filesystem

2 parents: 8e5075e + 7fb2faa

15 files changed: +228 −44 lines

fsspec/asyn.py

Lines changed: 23 additions & 11 deletions
@@ -6,7 +6,7 @@
 import sys
 import threading

-from .utils import other_paths
+from .utils import other_paths, is_exception
 from .spec import AbstractFileSystem

 # this global variable holds whether this thread is running async or not
@@ -16,7 +16,8 @@

 def _run_until_done(coro):
     """execute coroutine, when already in the event loop"""
-    loop = asyncio.get_running_loop()
+    loop = asyncio.get_event_loop()
+    assert loop.is_running()
     task = asyncio.current_task()
     asyncio.tasks._unregister_task(task)
     current_task = asyncio.tasks._current_tasks.get(loop)
@@ -72,7 +73,7 @@ async def f():
 def maybe_sync(func, self, *args, **kwargs):
     """Make function call into coroutine or maybe run

-    If we are running async, returns the coroutine object so that it can be awaited;
+    If we are running async, run coroutine on current loop until done;
     otherwise runs it on the loop (if is a coroutine already) or directly. Will guess
     we are running async if either "self" has an attribute asynchronous which is True,
     or thread_state does (this gets set in ``sync()`` itself, to avoid nesting loops).
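
The dispatch described in this docstring can be illustrated from the caller's side. A hedged sketch, not part of the commit: the "http" backend and example.com URLs are placeholders, and the async branch assumes the pattern used in the tests further down (asynchronous=True plus an explicit set_session() before any request).

import asyncio
import fsspec

# Blocking context: neither self.asynchronous nor thread_state is set, so
# maybe_sync runs the coroutine on the filesystem's own loop and returns the result.
fs = fsspec.filesystem("http")
# data = fs.cat("http://example.com/somefile")   # placeholder URL

# Async context: asynchronous=True marks the instance, so the underscore-prefixed
# coroutine methods (here _cat) are awaited directly on the caller's running loop.
async def main():
    loop = asyncio.get_event_loop()
    afs = fsspec.filesystem("http", asynchronous=True, loop=loop)
    await afs.set_session()                      # create the aiohttp client first
    return await afs._cat(["http://example.com/somefile"])

# asyncio.run(main())
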
@@ -105,7 +106,7 @@ async def _run_as_coroutine(func, *args, **kwargs):


 def sync_wrapper(func, obj=None):
-    """Given a function, make so can be called in async or bocking contexts
+    """Given a function, make so can be called in async or blocking contexts

     Leave obj=None if defining within a class. Pass the instance if attaching
     as an attribute of the instance.
@@ -189,7 +190,7 @@ async def _rm(self, path, recursive=False, **kwargs):

     def rm(self, path, recursive=False, **kwargs):
         path = self.expand_path(path, recursive=recursive)
-        sync(self.loop, self._rm, path, **kwargs)
+        maybe_sync(self._rm, self, path, **kwargs)

     async def _copy(self, paths, path2, **kwargs):
         await asyncio.gather(
@@ -199,7 +200,7 @@ async def _copy(self, paths, path2, **kwargs):
     def copy(self, path1, path2, recursive=False, **kwargs):
         paths = self.expand_path(path1, recursive=recursive)
         path2 = other_paths(paths, path2)
-        sync(self.loop, self._copy, paths, path2, **kwargs)
+        maybe_sync(self._copy, self, paths, path2)

     async def _pipe(self, path, value=None, **kwargs):
         if isinstance(path, str):
@@ -213,18 +214,27 @@ async def _cat(self, paths, **kwargs):
             *[
                 asyncio.ensure_future(self._cat_file(path, **kwargs), loop=self.loop)
                 for path in paths
-            ]
+            ],
+            return_exceptions=True
         )

-    def cat(self, path, recursive=False, **kwargs):
+    def cat(self, path, recursive=False, on_error="raise", **kwargs):
         paths = self.expand_path(path, recursive=recursive)
-        out = sync(self.loop, self._cat, paths, **kwargs)
+        out = maybe_sync(self._cat, self, paths, **kwargs)
+        if on_error == "raise":
+            ex = next(filter(is_exception, out), False)
+            if ex:
+                raise ex
         if (
             len(paths) > 1
             or isinstance(path, list)
             or paths[0] != self._strip_protocol(path)
         ):
-            return {k: v for k, v in zip(paths, out)}
+            return {
+                k: v
+                for k, v in zip(paths, out)
+                if on_error != "omit" or not is_exception(v)
+            }
         else:
             return out[0]
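
The new on_error parameter controls how per-path failures gathered by _cat surface. A hedged usage sketch, not part of the commit; the "http" backend and URLs are placeholders:

import fsspec

fs = fsspec.filesystem("http")
urls = ["http://example.com/good", "http://example.com/missing"]  # placeholders

# on_error="raise" (the default): the first exception collected by _cat is re-raised.
# on_error="omit": failed paths are silently dropped from the returned dict.
# good_only = fs.cat(urls, on_error="omit")
# Any other value (e.g. "return") leaves the exception objects in the mapping,
# keyed by path, for the caller to inspect.
# mixed = fs.cat(urls, on_error="return")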

@@ -245,9 +255,11 @@ def put(self, lpath, rpath, recursive=False, **kwargs):
         fs = LocalFileSystem()
         lpaths = fs.expand_path(lpath, recursive=recursive)
         rpaths = other_paths(lpaths, rpath)
-        sync(self.loop, self._put, lpaths, rpaths, **kwargs)
+        maybe_sync(self._put, self, lpaths, rpaths, **kwargs)

     async def _get(self, rpaths, lpaths, **kwargs):
+        dirs = [os.path.dirname(lp) for lp in lpaths]
+        [os.makedirs(d, exist_ok=True) for d in dirs]
         return await asyncio.gather(
             *[
                 self._get_file(rpath, lpath, **kwargs)
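
With the two added lines, _get creates missing parent directories before downloading, so callers no longer have to pre-create the local tree. A hedged sketch of the effect (it assumes the async "http" backend routes get() through this _get; the URL and paths are placeholders):

import fsspec

fs = fsspec.filesystem("http")
# "local/nested/dir/" need not exist: os.makedirs(..., exist_ok=True) is called
# for each target's parent before the per-file downloads are gathered.
# fs.get("http://example.com/realfile", "local/nested/dir/realfile")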

fsspec/caching.py

Lines changed: 1 addition & 1 deletion
@@ -218,7 +218,7 @@ def _fetch(self, start, end):

         # these are cached, so safe to do multiple calls for the same start and end.
         for block_number in range(start_block_number, end_block_number + 1):
-            self._fetch_block(block_number)
+            self._fetch_block_cached(block_number)

         return self._read_cache(
             start,
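
The comment in the loop assumes memoisation, which only the cached variant provides, hence the switch. A minimal sketch of the memoised-block pattern, assuming a fetcher(start, end) callable; this is not the real fsspec class:

import functools


class BlockCacheSketch:
    def __init__(self, fetcher, blocksize, maxblocks=32):
        self.fetcher = fetcher          # callable(start, end) -> bytes
        self.blocksize = blocksize
        # memoise per block number, so re-requesting the same block is a no-op
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def _fetch_block(self, block_number):
        start = block_number * self.blocksize
        return self.fetcher(start, start + self.blocksize)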

fsspec/implementations/cached.py

Lines changed: 10 additions & 6 deletions
@@ -294,7 +294,7 @@ def _open(
             # TODO: action where partial file exists in read-only cache
             logger.debug("Opening partially cached copy of %s" % path)
         else:
-            hash = hash_name(path, self.same_names)
+            hash = self.hash_name(path, self.same_names)
             fn = os.path.join(self.storage[-1], hash)
             blocks = set()
             detail = {
@@ -339,6 +339,9 @@ def _open(
             self.save_cache()
         return f

+    def hash_name(self, path, same_name):
+        return hash_name(path, same_name=same_name)
+
     def close_and_update(self, f, close):
         """Called when a file is closing, so store the set of blocks"""
         path = self._strip_protocol(f.path)
@@ -376,6 +379,7 @@ def __getattribute__(self, item):
             "_paths_from_path",
             "open_many",
             "commit_many",
+            "hash_name",
         ]:
             # all the methods defined in this class. Note `open` here, since
             # it calls `_open`, but is actually in superclass
@@ -442,7 +446,7 @@ def open_many(self, open_files):
         downpath = [p for p, d in zip(paths, details) if not d]
         downstore = [p for p, d in zip(store_paths, details) if not d]
         downfn0 = [
-            os.path.join(self.storage[-1], hash_name(p, self.same_names))
+            os.path.join(self.storage[-1], self.hash_name(p, self.same_names))
             for p, d in zip(paths, details)
         ]  # keep these path names for opening later
         downfn = [fn for fn, d in zip(downfn0, details) if not d]
@@ -453,7 +457,7 @@ def open_many(self, open_files):
         # update metadata - only happens when downloads are successful
         newdetail = [
             {
-                "fn": hash_name(path, self.same_names),
+                "fn": self.hash_name(path, self.same_names),
                 "blocks": True,
                 "time": time.time(),
                 "uid": self.fs.ukey(path),
@@ -504,7 +508,7 @@ def _open(self, path, mode="rb", **kwargs):
                 "as a wholly cached file" % path
             )
         else:
-            hash = hash_name(path, self.same_names)
+            hash = self.hash_name(path, self.same_names)
             fn = os.path.join(self.storage[-1], hash)
             detail = {
                 "fn": hash,
@@ -576,7 +580,7 @@ def __init__(self, **kwargs):

     def _check_file(self, path):
         self._check_cache()
-        sha = hash_name(path, self.same_names)
+        sha = self.hash_name(path, self.same_names)
         for storage in self.storage:
             fn = os.path.join(storage, sha)
             if os.path.exists(fn):
@@ -602,7 +606,7 @@ def _open(self, path, mode="rb", **kwargs):
         if fn:
             return open(fn, mode)

-        sha = hash_name(path, self.same_names)
+        sha = self.hash_name(path, self.same_names)
         fn = os.path.join(self.storage[-1], sha)
         logger.debug("Copying %s to local cache" % path)
         kwargs["mode"] = mode
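
Routing every hash_name call through a method (and whitelisting it in __getattribute__ so it is not proxied to the wrapped filesystem) makes the local naming scheme overridable. A hedged sketch of such an override; the subclass, its protocol string, and the naming rule are hypothetical:

from fsspec.implementations.cached import WholeFileCacheFileSystem


class FlatNameCacheFileSystem(WholeFileCacheFileSystem):
    protocol = "flatcache"  # made-up name for this sketch

    def hash_name(self, path, same_name):
        # store cached copies under a sanitised copy of the remote path
        # instead of the default hash
        return path.replace("://", "_").replace("/", "_")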

fsspec/implementations/http.py

Lines changed: 7 additions & 0 deletions
@@ -2,6 +2,7 @@

 import aiohttp
 import asyncio
+import logging
 import re
 import requests
 import weakref
@@ -14,6 +15,7 @@
 # https://stackoverflow.com/a/15926317/3821154
 ex = re.compile(r"""<a\s+(?:[^>]*?\s+)?href=(["'])(.*?)\1""")
 ex2 = re.compile(r"""(http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
+logger = logging.getLogger("fsspec.http")


 async def get_client(**kwargs):
@@ -101,6 +103,7 @@ async def _ls(self, url, detail=True, **kwargs):
         # ignoring URL-encoded arguments
         kw = self.kwargs.copy()
         kw.update(kwargs)
+        logger.debug(url)
         async with self.session.get(url, **self.kwargs) as r:
             r.raise_for_status()
             text = await r.text()
@@ -145,6 +148,7 @@ async def _ls(self, url, detail=True, **kwargs):
     async def _cat_file(self, url, **kwargs):
         kw = self.kwargs.copy()
         kw.update(kwargs)
+        logger.debug(url)
         async with self.session.get(url, **kw) as r:
             if r.status == 404:
                 raise FileNotFoundError(url)
@@ -155,6 +159,7 @@ async def _cat_file(self, url, **kwargs):
     async def _get_file(self, rpath, lpath, chunk_size=5 * 2 ** 20, **kwargs):
         kw = self.kwargs.copy()
         kw.update(kwargs)
+        logger.debug(rpath)
         async with self.session.get(rpath, **self.kwargs) as r:
             if r.status == 404:
                 raise FileNotFoundError(rpath)
@@ -169,6 +174,7 @@ async def _exists(self, path, **kwargs):
         kw = self.kwargs.copy()
         kw.update(kwargs)
         try:
+            logger.debug(path)
             r = await self.session.get(path, **kw)
             async with r:
                 return r.status < 400
@@ -370,6 +376,7 @@ async def async_fetch_range(self, start, end):
         kwargs = self.kwargs.copy()
         headers = kwargs.pop("headers", {}).copy()
         headers["Range"] = "bytes=%i-%i" % (start, end - 1)
+        logger.debug(self.url + " : " + headers["Range"])
         r = await self.session.get(self.url, headers=headers, **kwargs)
         async with r:
             if r.status == 416:
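
The new "fsspec.http" logger only emits if the application opts in. A minimal sketch of turning the request-level debug output on; the handler and format choices here are arbitrary, not part of the commit:

import logging

logging.basicConfig(format="%(asctime)s %(name)s %(message)s")
logging.getLogger("fsspec.http").setLevel(logging.DEBUG)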

fsspec/implementations/memory.py

Lines changed: 2 additions & 1 deletion
@@ -2,6 +2,7 @@

 from io import BytesIO
 from datetime import datetime
+from errno import ENOTEMPTY
 from fsspec import AbstractFileSystem
 import logging

@@ -102,7 +103,7 @@ def rmdir(self, path):
             if not self.ls(path):
                 self.pseudo_dirs.remove(path)
             else:
-                raise OSError("Directory %s not empty" % path)
+                raise OSError(ENOTEMPTY, "Directory not empty", path)
         else:
             raise FileNotFoundError(path)
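
Raising OSError with an errno (rather than only a message) lets callers branch on the failure reason instead of parsing text. A small sketch of that handling pattern; the helper name and the fs argument are illustrative, not from the commit:

import errno


def remove_dir_if_empty(fs, path):
    try:
        fs.rmdir(path)
        return True
    except OSError as e:
        if e.errno == errno.ENOTEMPTY:
            return False  # directory still has contents; leave it alone
        raise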

fsspec/implementations/tests/test_http.py

Lines changed: 33 additions & 2 deletions
@@ -12,6 +12,7 @@
 data = b"\n".join([b"some test data"] * 1000)
 realfile = "http://localhost:%i/index/realfile" % port
 index = b'<a href="%s">Link</a>' % realfile.encode()
+win = os.name == "nt"


 class HTTPTestHandler(BaseHTTPRequestHandler):
@@ -240,9 +241,12 @@ async def _():
         loop = asyncio.get_event_loop()
         fs = fsspec.filesystem("http", asynchronous=True, loop=loop)

+        # fails because client creation has not yet been awaited
+        assert isinstance(
+            (await fs._cat([server + "/index/realfile"]))[0], RuntimeError
+        )
         with pytest.raises(RuntimeError):
-            # fails because client creation has not yet been awaited
-            await fs._cat([server + "/index/realfile"])
+            fs.cat([server + "/index/realfile"])

         await fs.set_session()  # creates client

@@ -251,3 +255,30 @@ async def _():
         assert out == [data]

     asyncio.run(_())
+
+
+def _inner_pass(fs, q, fn):
+    # pass the s3 instance, but don't use it; in new process, the instance
+    # cache should be skipped to make a new instance
+    fs = fsspec.filesystem("http")
+    q.put(fs.cat(fn))
+
+
+@pytest.mark.skipif(
+    bool(os.environ.get("TRAVIS", "")), reason="Travis is weird in many ways"
+)
+@pytest.mark.parametrize("method", ["spawn", "forkserver", "fork"])
+def test_processes(server, method):
+    import multiprocessing as mp
+
+    if win and method != "spawn":
+        pytest.skip("Windows can only spawn")
+    ctx = mp.get_context(method)
+    fn = server + "/index/realfile"
+    fs = fsspec.filesystem("http")
+
+    q = ctx.Queue()
+    p = ctx.Process(target=_inner_pass, args=(fs, q, fn))
+    p.start()
+    assert q.get() == fs.cat(fn)
+    p.join()
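
The new test leans on fsspec's per-process instance cache: fsspec.filesystem() hands back a cached instance within one process, and a spawned or forked worker must build its own rather than inherit the parent's live session. A minimal illustration of the caching side, using the dependency-free "memory" backend so it runs without a server:

import fsspec

fs1 = fsspec.filesystem("memory")
fs2 = fsspec.filesystem("memory")
assert fs1 is fs2  # same process: the instance cache returns the same object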
