Skip to content

Commit e3a5695

Browse files
authored
Merge pull request #65 from nearspacelabs/fix/retry
replace retry with tenacity, bump patch version
2 parents 7425600 + 8726972 commit e3a5695

File tree

7 files changed

+269
-137
lines changed

7 files changed

+269
-137
lines changed

nsl/stac/__init__.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from google.auth.exceptions import DefaultCredentialsError
3737
from google.cloud import storage as gcp_storage
3838
from google.oauth2 import service_account
39-
from retry import retry
39+
from tenacity import retry, stop_after_delay, wait_fixed
4040

4141
from epl.protobuf.v1 import stac_service_pb2_grpc
4242
from epl.protobuf.v1.geometry_pb2 import GeometryData, ProjectionData, EnvelopeData
@@ -45,13 +45,12 @@
4545
LandsatRequest, Mosaic, MosaicRequest, DatetimeRange, View, ViewRequest, Extent, Interval, Provider
4646

4747
__all__ = [
48-
'stac_service', 'url_to_channel', 'STAC_SERVICE',
49-
'Collection', 'CollectionRequest', 'EoRequest', 'StacRequest', 'LandsatRequest', 'MosaicRequest', 'ViewRequest',
50-
'GeometryData', 'ProjectionData', 'EnvelopeData',
51-
'FloatFilter', 'TimestampFilter', 'StringFilter', 'UInt32Filter',
52-
'StacItem', 'Asset', 'Eo', 'View', 'Mosaic', 'DatetimeRange', 'Extent', 'Interval', 'Provider',
53-
'gcs_storage_client',
54-
'AUTH0_TENANT', 'API_AUDIENCE', 'ISSUER', 'AuthInfo', 'bearer_auth'
48+
'bearer_auth', 'gcs_storage_client', 'stac_service', 'url_to_channel',
49+
'CollectionRequest', 'EoRequest', 'StacRequest', 'LandsatRequest', 'MosaicRequest', 'ViewRequest',
50+
'Collection', 'Eo', 'StacItem', 'Mosaic', 'View', 'Asset',
51+
'GeometryData', 'ProjectionData', 'EnvelopeData', 'FloatFilter', 'TimestampFilter', 'StringFilter', 'UInt32Filter',
52+
'DatetimeRange', 'Extent', 'Interval', 'Provider',
53+
'AUTH0_TENANT', 'API_AUDIENCE', 'ISSUER', 'STAC_SERVICE', 'AuthInfo',
5554
]
5655

5756
CLOUD_PROJECT = os.getenv("CLOUD_PROJECT")
@@ -79,10 +78,11 @@
7978
MAX_BACKOFF_MS = int(os.getenv('MAX_BACKOFF_MS', 4))
8079
MULTIPLIER = int(os.getenv('MULTIPLIER', 4))
8180

82-
STAC_SERVICE = os.getenv('STAC_SERVICE', 'api.nearspacelabs.net:9090')
81+
STAC_SERVICE_HOST = os.getenv('STAC_SERVICE_HOST', 'api.nearspacelabs.net')
82+
STAC_SERVICE = os.getenv('STAC_SERVICE', f'{STAC_SERVICE_HOST}:9090')
8383
BYTES_IN_MB = 1024 * 1024
84-
# at this point only allowing 4 MB or smaller messages
85-
MESSAGE_SIZE_MB = int(os.getenv('MESSAGE_SIZE_MB', 20))
84+
# at this point only allowing 10 MB or smaller messages
85+
MESSAGE_SIZE_MB = int(os.getenv('MESSAGE_SIZE_MB', 10))
8686
GRPC_CHANNEL_OPTIONS = [('grpc.max_message_length', MESSAGE_SIZE_MB * BYTES_IN_MB),
8787
('grpc.max_receive_message_length', MESSAGE_SIZE_MB * BYTES_IN_MB)]
8888

@@ -299,7 +299,7 @@ def __init__(self, nsl_id: str, nsl_secret: str):
299299
self.nsl_secret = nsl_secret
300300

301301
# this only retries if there's a timeout error
302-
@retry(exceptions=requests.Timeout, delay=1, backoff=2, tries=4)
302+
@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
303303
def authorize(self):
304304
if self.skip_authorization:
305305
return
@@ -388,7 +388,7 @@ def __init__(self, init=False):
388388
def default_nsl_id(self):
389389
return self._default_nsl_id
390390

391-
def auth_header(self, nsl_id: str = None, profile_name: str = None):
391+
def auth_header(self, nsl_id: str = None, profile_name: str = None) -> str:
392392
auth_info = self._get_auth_info(nsl_id, profile_name)
393393
if not auth_info.skip_authorization and (auth_info.expiry - time.time()) < TOKEN_REFRESH_THRESHOLD:
394394
print(f'authorizing NSL_ID: `{auth_info.nsl_id}`')
@@ -398,15 +398,19 @@ def auth_header(self, nsl_id: str = None, profile_name: str = None):
398398
print(f"will attempt re-authorization in {ttl} minutes")
399399
return f"Bearer {auth_info.token}"
400400

401-
def get_credentials(self, nsl_id: str = None) -> Optional[AuthInfo]:
401+
def get_credentials(self, nsl_id: str = None, profile_name: str = None) -> Optional[AuthInfo]:
402+
if profile_name is not None:
403+
nsl_id = self._profile_map.get(profile_name, None)
402404
return self._auth_info_map.get(nsl_id if nsl_id is not None else self.default_nsl_id, None)
403405

404-
def set_credentials(self, nsl_id: str, nsl_secret: str):
406+
def set_credentials(self, nsl_id: str, nsl_secret: str, profile_name: str = None):
405407
if len(self._auth_info_map) == 0:
406408
self._default_nsl_id = nsl_id
407409

408410
self._auth_info_map[nsl_id] = AuthInfo(nsl_id=nsl_id, nsl_secret=nsl_secret)
409411
self._auth_info_map[nsl_id].authorize()
412+
if profile_name is not None:
413+
self._profile_map[profile_name] = nsl_id
410414

411415
def unset_credentials(self, profile_name: str):
412416
nsl_id = self._profile_map.pop(profile_name)
@@ -431,14 +435,14 @@ def loads(self) -> Dict[str, AuthInfo]:
431435
if not lines[i + 1].startswith('NSL_ID') or not lines[i + 2].startswith('NSL_SECRET'):
432436
raise ValueError("credentials should be of the format:\n[named profile]\nNSL_ID={your "
433437
"nsl id}\nNSL_SECRET={your nsl secret}")
434-
# for id like 'NSL_ID = all_the_id_text\n', first strip remove front whitespace and newline
438+
# for id like 'NSL_ID = all_the_id_text\n', first strip remove front whitespace and newline, and optionally the leading quote
435439
# .strip(), now we now [6:] starts after 'NSL_ID' .strip()[6:], strip potential whitespace
436440
# between NSL_ID and '=' with .strip()[6:].strip(), start one after equal
437441
# .strip()[6:].strip()[1:], strip potential whitespace
438442
# after equal .strip()[6:].strip()[1:].strip()
439443
profile_name = line.strip().lstrip('[').rstrip(']')
440-
nsl_id = lines[i + 1].strip()[6:].strip()[1:].strip()
441-
nsl_secret = lines[i + 2].strip()[10:].strip()[1:].strip()
444+
nsl_id = lines[i + 1].strip()[6:].strip().strip('"')[1:].strip().strip('"')
445+
nsl_secret = lines[i + 2].strip()[10:].strip().strip('"')[1:].strip().strip('"')
442446

443447
output[profile_name] = AuthInfo(nsl_id=nsl_id, nsl_secret=nsl_secret)
444448
return output
@@ -448,8 +452,8 @@ def dumps(self):
448452
for profile_name, nsl_id in self._profile_map.items():
449453
creds = self.get_credentials(nsl_id)
450454
file_obj.write(f'[{profile_name}]\n')
451-
file_obj.write(f'NSL_ID={creds.nsl_id}\n')
452-
file_obj.write(f'NSL_SECRET={creds.nsl_secret}\n')
455+
file_obj.write(f'NSL_ID="{creds.nsl_id}"\n')
456+
file_obj.write(f'NSL_SECRET="{creds.nsl_secret}"\n')
453457
file_obj.write('\n')
454458
file_obj.close()
455459

nsl/stac/client.py

Lines changed: 65 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@
1414
#
1515
# for additional information, contact:
1616
17+
import uuid
1718

1819
import requests
1920

20-
from typing import Iterator, List
21+
from typing import Iterator, List, Optional, Tuple
2122
from warnings import warn
2223

2324
from epl.protobuf.v1 import stac_pb2
@@ -29,14 +30,18 @@
2930

3031

3132
class NSLClient:
32-
def __init__(self, nsl_only=True):
33+
def __init__(self, nsl_only=True, nsl_id=None, profile_name=None):
3334
"""
3435
Create a client connection to a gRPC STAC service. nsl_only limits all queries to only return data from Near
3536
Space Labs.
3637
:param nsl_only:
3738
"""
3839
self._stac_service = stac_singleton
3940
self._nsl_only = nsl_only
41+
if profile_name:
42+
nsl_id = bearer_auth._get_auth_info(profile_name=profile_name).nsl_id
43+
if nsl_id:
44+
bearer_auth._default_nsl_id = nsl_id
4045

4146
@property
4247
def default_nsl_id(self):
@@ -67,7 +72,8 @@ def search_one(self,
6772
stac_request: stac_pb2.StacRequest,
6873
timeout=15,
6974
nsl_id: str = None,
70-
profile_name: str = None) -> stac_pb2.StacItem:
75+
profile_name: str = None,
76+
correlation_id: str = None) -> stac_pb2.StacItem:
7177
"""
7278
search for one item from the db that matches the stac request
7379
:param timeout: timeout for request
@@ -77,20 +83,23 @@ def search_one(self,
7783
NSLClient object's set_credentials to set credentials
7884
:param profile_name: if a ~/.nsl/credentials file exists, you can override the [default] credential usage, by
7985
using a different profile name
86+
:param correlation_id: is a unique identifier that is added to the very first interaction (incoming request)
87+
to identify the context and is passed to all components that are involved in the transaction flow
8088
:return: StacItem
8189
"""
8290
# limit to only search Near Space Labs SWIFT data
8391
if self._nsl_only:
8492
stac_request.mission_enum = stac_pb2.SWIFT
8593

86-
metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
94+
metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
8795
return self._stac_service.stub.SearchOneItem(stac_request, timeout=timeout, metadata=metadata)
8896

8997
def count(self,
9098
stac_request: stac_pb2.StacRequest,
9199
timeout=15,
92100
nsl_id: str = None,
93-
profile_name: str = None) -> int:
101+
profile_name: str = None,
102+
correlation_id: str = None) -> int:
94103
"""
95104
count all the items in the database that match the stac request
96105
:param timeout: timeout for request
@@ -100,13 +109,15 @@ def count(self,
100109
NSLClient object's set_credentials to set credentials
101110
:param profile_name: if a ~/.nsl/credentials file exists, you can override the [default] credential usage, by
102111
using a different profile name
112+
:param correlation_id: is a unique identifier that is added to the very first interaction (incoming request)
113+
to identify the context and is passed to all components that are involved in the transaction flow
103114
:return: int
104115
"""
105116
# limit to only search Near Space Labs SWIFT data
106117
if self._nsl_only:
107118
stac_request.mission_enum = stac_pb2.SWIFT
108119

109-
metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
120+
metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
110121
db_result = self._stac_service.stub.CountItems(stac_request, timeout=timeout, metadata=metadata)
111122
if db_result.status:
112123
# print db_result
@@ -119,7 +130,9 @@ def search(self,
119130
nsl_id: str = None,
120131
profile_name: str = None,
121132
auto_paginate: bool = False,
122-
only_accessible: bool = False) -> Iterator[stac_pb2.StacItem]:
133+
only_accessible: bool = False,
134+
page_size: int = 50,
135+
correlation_id: str = None) -> Iterator[stac_pb2.StacItem]:
123136
"""
124137
search for stac items by using StacRequest. return a stream of StacItems
125138
:param timeout: timeout for request
@@ -136,13 +149,16 @@ def search(self,
136149
- If set to `False` (the default), `stac_request.limit` and `stac_request.offset` can be used to manually
137150
page through StacItems.
138151
:param only_accessible: limits results to only StacItems downloadable by your level of sample/paid access
152+
:param page_size: how many results to page at a time
139153
:return: stream of StacItems
140154
"""
141155
for item in self._search_all(stac_request,
142156
timeout,
143157
nsl_id=nsl_id,
144158
profile_name=profile_name,
145-
auto_paginate=auto_paginate):
159+
auto_paginate=auto_paginate,
160+
page_size=page_size,
161+
correlation_id=correlation_id):
146162
if not only_accessible or \
147163
bearer_auth.is_valid_for(item_region(item), nsl_id=nsl_id, profile_name=profile_name):
148164
yield item
@@ -151,8 +167,10 @@ def search_collections(self,
151167
collection_request: stac_pb2.CollectionRequest,
152168
timeout=15,
153169
nsl_id: str = None,
154-
profile_name: str = None) -> Iterator[stac_pb2.Collection]:
155-
metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
170+
profile_name: str = None,
171+
correlation_id: str = None) -> Iterator[stac_pb2.Collection]:
172+
173+
metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
156174
for item in self._stac_service.stub.SearchCollections(collection_request, timeout=timeout, metadata=metadata):
157175
yield item
158176

@@ -173,7 +191,7 @@ def subscribe(self,
173191
if self._nsl_only:
174192
stac_request.mission_enum = stac_pb2.SWIFT
175193
res = requests.post(f'{AUTH0_TENANT}/subscription',
176-
headers=NSLClient._json_headers(nsl_id, profile_name),
194+
headers=self._json_headers(nsl_id, profile_name),
177195
json=dict(stac_request=utils.stac_request_to_b64(stac_request),
178196
destination=destination.to_json_str(),
179197
is_active=is_active))
@@ -186,7 +204,7 @@ def subscribe(self,
186204
def resubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None):
187205
"""Reactivates a subscription with the given `sub_id`."""
188206
res = requests.put(f'{AUTH0_TENANT}/subscription/{sub_id}',
189-
headers=NSLClient._json_headers(nsl_id, profile_name))
207+
headers=self._json_headers(nsl_id, profile_name))
190208

191209
NSLClient._handle_json_response(res, 200)
192210
print(f'reactivated subscription with id: {sub_id}')
@@ -195,7 +213,7 @@ def resubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None)
195213
def unsubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None):
196214
"""Deactivates a subscription with the given `sub_id`."""
197215
res = requests.delete(f'{AUTH0_TENANT}/subscription/{sub_id}',
198-
headers=NSLClient._json_headers(nsl_id, profile_name))
216+
headers=self._json_headers(nsl_id, profile_name))
199217

200218
NSLClient._handle_json_response(res, 202)
201219
print(f'deactivated subscription with id: {sub_id}')
@@ -204,7 +222,7 @@ def unsubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None)
204222
def subscriptions(self, nsl_id: str = None, profile_name: str = None) -> List[Subscription]:
205223
"""Fetches all subscriptions."""
206224
res = requests.get(f'{AUTH0_TENANT}/subscription',
207-
headers=NSLClient._json_headers(nsl_id, profile_name))
225+
headers=self._json_headers(nsl_id, profile_name))
208226

209227
NSLClient._handle_json_response(res, 200)
210228
return list(Subscription(response_dict) for response_dict in res.json()['results'])
@@ -214,48 +232,63 @@ def _search_all(self,
214232
timeout=15,
215233
nsl_id: str = None,
216234
profile_name: str = None,
217-
auto_paginate: bool = False) -> Iterator[stac_pb2.StacItem]:
235+
auto_paginate: bool = False,
236+
page_size: int = 50,
237+
correlation_id: str = None) -> Iterator[stac_pb2.StacItem]:
218238
# limit to only search Near Space Labs SWIFT data
219239
if self._nsl_only:
220240
stac_request.mission_enum = stac_pb2.SWIFT
221241

222242
if not auto_paginate:
223-
metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
243+
metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
224244
for item in self._stac_service.stub.SearchItems(stac_request, timeout=timeout, metadata=metadata):
225245
if not item.id:
226-
warn("STAC item missing STAC id; ending search")
246+
warn(f"STAC item missing STAC id: \n{item};\n ending search")
227247
return
228248
else:
229249
yield item
230250
else:
231-
limit = stac_request.limit if stac_request.limit > 0 else None
251+
original_limit = stac_request.limit if stac_request.limit > 0 else None
232252
offset = stac_request.offset
233-
page_size = 500
234253
count = 0
235254

236-
stac_request.limit = page_size
237-
items = list(self.search(stac_request, timeout=timeout, nsl_id=nsl_id, profile_name=profile_name))
255+
stac_request.limit = page_size if original_limit is None else max(original_limit, page_size)
256+
items = list(self._search_all(stac_request, timeout=timeout,
257+
nsl_id=nsl_id, profile_name=profile_name,
258+
page_size=page_size, correlation_id=correlation_id))
238259
while len(items) > 0:
239260
for item in items:
240-
if limit is None or (limit is not None and count < limit):
261+
if original_limit is None or (original_limit is not None and count < original_limit):
241262
yield item
242263
count += 1
243-
if limit is not None and count >= limit:
264+
if original_limit is not None and count >= original_limit:
244265
break
245266

246-
if limit is not None and count >= limit:
267+
if original_limit is not None and count >= original_limit:
247268
break
248269

249-
stac_request.offset += page_size
250-
items = list(self.search(stac_request, timeout=timeout, nsl_id=nsl_id, profile_name=profile_name))
270+
stac_request.offset += len(items)
271+
items = list(self._search_all(stac_request, timeout=timeout,
272+
nsl_id=nsl_id, profile_name=profile_name,
273+
page_size=page_size, correlation_id=correlation_id))
251274

252275
stac_request.offset = offset
253-
stac_request.limit = limit if limit is not None else 0
254-
255-
@staticmethod
256-
def _json_headers(nsl_id: str = None, profile_name: str = None) -> dict:
257-
return {'content-type': 'application/json',
258-
'Authorization': bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)}
276+
stac_request.limit = original_limit if original_limit is not None else 0
277+
278+
def _json_headers(self,
279+
nsl_id: str = None,
280+
profile_name: str = None,
281+
correlation_id: str = None) -> dict:
282+
headers = {k: v for (k, v) in self._grpc_headers(nsl_id, profile_name, correlation_id)}
283+
return {'content-type': 'application/json', **headers}
284+
285+
def _grpc_headers(self,
286+
nsl_id: str = None,
287+
profile_name: str = None,
288+
correlation_id: str = None) -> Tuple[Tuple[str, str], ...]:
289+
correlation_id = str(uuid.uuid4()) if correlation_id is None else correlation_id
290+
return (('x-correlation-id', correlation_id),
291+
('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)))
259292

260293
@staticmethod
261294
def _handle_json_response(res, status_code: int):

0 commit comments

Comments
 (0)