22
33import fsspec
44import pytest
5+ from upath import UPath
56
67from zarr .buffer import Buffer , default_buffer_prototype
78from zarr .store import RemoteStore
9+ from zarr .sync import sync
810from zarr .testing .store import StoreTests
911
1012s3fs = pytest .importorskip ("s3fs" )
# Names and connection details for the moto-backed S3 test server.
test_bucket_name = "test"
secure_bucket_name = "test-secure"
port = 5555
# NOTE: the trailing slash is part of the URL — path fragments are appended directly.
endpoint_url = f"http://127.0.0.1:{port}/"
2022
2123
2224@pytest .fixture (scope = "module" )
@@ -40,18 +42,33 @@ def get_boto3_client():
4042
4143 # NB: we use the sync botocore client for setup
4244 session = Session ()
43- return session .create_client ("s3" , endpoint_url = endpoint_uri )
45+ return session .create_client ("s3" , endpoint_url = endpoint_url )
4446
4547
@pytest.fixture(autouse=True, scope="function")
def s3(s3_base):
    """
    Provide a fresh, function-scoped s3fs filesystem against the moto server.

    Quoting Martin Durant:
    pytest-asyncio creates a new event loop for each async test.
    When an async-mode s3fs instance is made from async, it will be assigned to the loop from
    which it is made. That means that if you use s3fs again from a subsequent test,
    you will have the same identical instance, but be running on a different loop - which fails.

    For the rest: it's very convenient to clean up the state of the store between tests,
    make sure we start off blank each time.

    https://github.com/zarr-developers/zarr-python/pull/1785#discussion_r1634856207
    """
    client = get_boto3_client()
    client.create_bucket(Bucket=test_bucket_name, ACL="public-read")
    # Drop cached instances so the filesystem binds to the current test's event loop.
    s3fs.S3FileSystem.clear_instance_cache()
    s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_url})
    session = sync(s3.set_session())
    s3.invalidate_cache()
    yield s3
    # Reset moto's server-side state between tests. ``endpoint_url`` already ends
    # with "/", so don't insert another slash (previously produced "...//moto-api/reset").
    requests.post(f"{endpoint_url}moto-api/reset")
    client.close()
    sync(session.close())
5572
5673
5774# ### end from s3fs ### #
@@ -65,7 +82,7 @@ async def alist(it):
6582
6683
6784async def test_basic ():
68- store = RemoteStore (f"s3://{ test_bucket_name } " , mode = "w" , endpoint_url = endpoint_uri , anon = False )
85+ store = RemoteStore (f"s3://{ test_bucket_name } " , mode = "w" , endpoint_url = endpoint_url , anon = False )
6986 assert not await alist (store .list ())
7087 assert not await store .exists ("foo" )
7188 data = b"hello"
@@ -81,31 +98,51 @@ async def test_basic():
8198class TestRemoteStoreS3 (StoreTests [RemoteStore ]):
8299 store_cls = RemoteStore
83100
84- @pytest .fixture (scope = "function" )
85- def store_kwargs (self ) -> dict [str , str | bool ]:
86- return {
87- "mode" : "w" ,
88- "endpoint_url" : endpoint_uri ,
89- "anon" : False ,
90- "url" : f"s3://{ test_bucket_name } " ,
91- }
101+ @pytest .fixture (scope = "function" , params = ("use_upath" , "use_str" ))
102+ def store_kwargs (self , request ) -> dict [str , str | bool ]:
103+ url = f"s3://{ test_bucket_name } "
104+ anon = False
105+ mode = "w"
106+ if request .param == "use_upath" :
107+ return {"mode" : mode , "url" : UPath (url , endpoint_url = endpoint_url , anon = anon )}
108+ elif request .param == "use_str" :
109+ return {"url" : url , "mode" : mode , "anon" : anon , "endpoint_url" : endpoint_url }
110+
111+ raise AssertionError
92112
93113 @pytest .fixture (scope = "function" )
94114 def store (self , store_kwargs : dict [str , str | bool ]) -> RemoteStore :
95- self ._fs , _ = fsspec .url_to_fs (asynchronous = False , ** store_kwargs )
96- out = self .store_cls (asynchronous = True , ** store_kwargs )
115+ url = store_kwargs ["url" ]
116+ mode = store_kwargs ["mode" ]
117+ if isinstance (url , UPath ):
118+ out = self .store_cls (url = url , mode = mode )
119+ else :
120+ endpoint_url = store_kwargs ["endpoint_url" ]
121+ out = self .store_cls (url = url , asynchronous = True , mode = mode , endpoint_url = endpoint_url )
97122 return out
98123
99124 def get (self , store : RemoteStore , key : str ) -> Buffer :
100- return Buffer .from_bytes (self ._fs .cat (f"{ store .path } /{ key } " ))
125+ # make a new, synchronous instance of the filesystem because this test is run in sync code
126+ fs , _ = fsspec .url_to_fs (
127+ url = store ._url ,
128+ asynchronous = False ,
129+ anon = store ._fs .anon ,
130+ endpoint_url = store ._fs .endpoint_url ,
131+ )
132+ return Buffer .from_bytes (fs .cat (f"{ store .path } /{ key } " ))
101133
102134 def set (self , store : RemoteStore , key : str , value : Buffer ) -> None :
103- self ._fs .write_bytes (f"{ store .path } /{ key } " , value .to_bytes ())
135+ # make a new, synchronous instance of the filesystem because this test is run in sync code
136+ fs , _ = fsspec .url_to_fs (
137+ url = store ._url ,
138+ asynchronous = False ,
139+ anon = store ._fs .anon ,
140+ endpoint_url = store ._fs .endpoint_url ,
141+ )
142+ fs .write_bytes (f"{ store .path } /{ key } " , value .to_bytes ())
104143
105144 def test_store_repr (self , store : RemoteStore ) -> None :
106- rep = str (store )
107- assert "fsspec" in rep
108- assert store .path in rep
145+ assert str (store ) == f"s3://{ test_bucket_name } "
109146
110147 def test_store_supports_writes (self , store : RemoteStore ) -> None :
111148 assert True
0 commit comments