
Commit 2ab1772

Review comments
1 parent d5f1f89 commit 2ab1772

File tree

5 files changed: +127 -145 lines changed


lib/pbench/server/api/resources/intake_base.py

Lines changed: 41 additions & 68 deletions
@@ -59,17 +59,30 @@ class Access:
 
 
 class IntakeBase(ApiBase):
-    """Framework to assimilate a dataset into the Pbench Server"""
+    """Framework to assimilate a dataset into the Pbench Server.
+
+    This relies on subclasses to provides specific hook methods to identify and
+    stream the tarball data:
+
+    _identify: decodes the URI and query parameters to determine the target
+        dataset name, the appropriate MD5, the initial access type, and
+        optional metadata to be set.
+    _stream: decodes the intake data and provides the length and byte IO
+        stream to be read into a temporary file.
+    """
 
     CHUNK_SIZE = 65536
 
     def __init__(self, config: PbenchServerConfig, schema: ApiSchema):
         super().__init__(config, schema)
         self.temporary = config.ARCHIVE / CacheManager.TEMPORARY
         self.temporary.mkdir(mode=0o755, parents=True, exist_ok=True)
+        method = list(self.schemas.schemas.keys())[0]
+        self.name = self.schemas[method].audit_name
         current_app.logger.info("INTAKE temporary directory is {}", self.temporary)
 
-    def process_metadata(self, metas: list[str]) -> JSONOBJECT:
+    @staticmethod
+    def process_metadata(metas: list[str]) -> JSONOBJECT:
         """Process 'metadata' query parameter
 
         We allow the client to set metadata on the new dataset. We won't do
@@ -113,8 +126,8 @@ def process_metadata(self, metas: list[str]) -> JSONOBJECT:
             )
         return metadata
 
-    def _prepare(self, args: ApiParams, request: Request) -> Intake:
-        """Prepare to begin the intake operation
+    def _identify(self, args: ApiParams, request: Request) -> Intake:
+        """Identify the tarball to be streamed.
 
         Must be implemented by each subclass of this base class.
 
@@ -127,13 +140,13 @@ def _prepare(self, args: ApiParams, request: Request) -> Intake:
         """
         raise NotImplementedError()
 
-    def _access(self, intake: Intake, request: Request) -> Access:
+    def _stream(self, intake: Intake, request: Request) -> Access:
         """Determine how to access the tarball byte stream
 
         Must be implemented by each subclass of this base class.
 
         Args:
-            intake: The Intake parameters produced by _intake
+            intake: The Intake parameters produced by _identify
             request: The Flask request object
 
         Returns:
@@ -154,15 +167,6 @@ def _intake(
         1) PUT /api/v1/upload/<filename>
         2) POST /api/v1/relay/<uri>
 
-        The operational differences are encapsulated by two helper methods
-        provided by the subclasses:
-
-        _prepare: decodes the URI and query parameters to determine the target
-            dataset name, the appropriate MD5, the initial access type, and
-            optional metadata to be set.
-        _access: decodes the intake data and provides the length and byte IO
-            stream to be read into a temporary file.
-
         If the new dataset is created successfully, return 201 (CREATED).
 
         The tarball name must be unique on the Pbench Server. If the name
@@ -206,12 +210,7 @@ def _intake(
 
             # Ask our helper to determine the name and resource ID of the new
             # dataset, along with requested access and metadata.
-            try:
-                intake = self._prepare(args, request)
-            except APIAbort:
-                raise
-            except Exception as e:
-                raise APIInternalError(str(e)) from e
+            intake = self._identify(args, request)
 
             filename = intake.name
             metadata = self.process_metadata(intake.metadata)
@@ -249,14 +248,15 @@ def _intake(
             bytes_received = 0
             usage = shutil.disk_usage(tar_full_path.parent)
             current_app.logger.info(
-                "{} UPLOAD (pre): {:.3}% full, {} remaining",
+                "{} {} (pre): {:.3}% full, {} remaining",
+                self.name,
                 tar_full_path.name,
                 float(usage.used) / float(usage.total) * 100.0,
                 humanize.naturalsize(usage.free),
             )
 
             current_app.logger.info(
-                "PUT uploading {} for {} to {}", filename, username, tar_full_path
+                "{} {} for {} to {}", self.name, filename, username, tar_full_path
             )
 
             # Create a tracking dataset object; it'll begin in UPLOADING state
@@ -278,31 +278,24 @@ def _intake(
                 try:
                     Dataset.query(resource_id=intake.md5)
                 except DatasetNotFound:
-                    current_app.logger.error(
-                        "Duplicate dataset {} for user = (user_id: {}, username: {}) not found",
-                        dataset_name,
-                        user_id,
-                        username,
+                    raise APIInternalError(
+                        f"Duplicate dataset {dataset_name} for user {username!r} id {user_id} not found"
                     )
-                    raise APIAbort(HTTPStatus.INTERNAL_SERVER_ERROR, "INTERNAL ERROR")
                 else:
                     response = jsonify(dict(message="Dataset already exists"))
                     response.status_code = HTTPStatus.OK
                     return response
             except APIAbort:
                 raise  # Propagate an APIAbort exception to the outer block
             except Exception:
-                raise APIAbort(
-                    HTTPStatus.INTERNAL_SERVER_ERROR,
-                    message="Unable to create dataset",
-                )
+                raise APIInternalError("Unable to create dataset")
 
             recovery.add(dataset.delete)
 
             # AUDIT the operation start before we get any further
             audit = Audit.create(
                 operation=OperationCode.CREATE,
-                name="upload",
+                name=self.name,
                 user_id=user_id,
                 user_name=username,
                 dataset=dataset,
@@ -312,12 +305,7 @@ def _intake(
 
             # Now we're ready to pull the tarball, so ask our helper for the
             # length and data stream.
-            try:
-                access = self._access(intake, request)
-            except APIAbort:
-                raise
-            except Exception as e:
-                raise APIInternalError(str(e)) from e
+            access = self._stream(intake, request)
 
             if access.length <= 0:
                 raise APIAbort(
@@ -373,28 +361,20 @@ def _intake(
                     f"MD5 checksum {hash_md5.hexdigest()} does not match expected {intake.md5}",
                 )
 
-            # First write the .md5
-            current_app.logger.info(
-                "Creating MD5 file {}: {}", md5_full_path, intake.md5
-            )
-
             # From this point attempt to remove the MD5 file on error exit
             recovery.add(md5_full_path.unlink)
             try:
                 md5_full_path.write_text(f"{intake.md5} {filename}\n")
             except Exception:
-                raise APIAbort(
-                    HTTPStatus.INTERNAL_SERVER_ERROR,
+                raise APIInternalError(
                     f"Failed to write .md5 file '{md5_full_path}'",
                 )
 
             # Create a cache manager object
             try:
                 cache_m = CacheManager(self.config, current_app.logger)
             except Exception:
-                raise APIAbort(
-                    HTTPStatus.INTERNAL_SERVER_ERROR, "Unable to map the cache manager"
-                )
+                raise APIInternalError("Unable to map the cache manager")
 
             # Move the files to their final location
             try:
@@ -410,14 +390,14 @@ def _intake(
                     f"Tarball {dataset.name!r} is invalid or missing required metadata.log: {exc}",
                 )
             except Exception as exc:
-                raise APIAbort(
-                    HTTPStatus.INTERNAL_SERVER_ERROR,
-                    f"Unable to create dataset in file system for {tar_full_path}: {exc}",
+                raise APIInternalError(
+                    f"Unable to create dataset in file system for {tar_full_path}: {exc}"
                 )
 
             usage = shutil.disk_usage(tar_full_path.parent)
             current_app.logger.info(
-                "{} UPLOAD (post): {:.3}% full, {} remaining",
+                "{} {} (post): {:.3}% full, {} remaining",
+                self.name,
                 tar_full_path.name,
                 float(usage.used) / float(usage.total) * 100.0,
                 humanize.naturalsize(usage.free),
@@ -441,18 +421,14 @@ def _intake(
                     attributes["missing_metadata"] = True
                 Metadata.create(dataset=dataset, key=Metadata.METALOG, value=metalog)
             except Exception as exc:
-                raise APIAbort(
-                    HTTPStatus.INTERNAL_SERVER_ERROR,
-                    f"Unable tintakeo create metalog for Tarball {dataset.name!r}: {exc}",
+                raise APIInternalError(
+                    f"Unable to create metalog for Tarball {dataset.name!r}: {exc}"
                 )
 
             try:
                 retention_days = self.config.default_retention_period
             except Exception as e:
-                raise APIAbort(
-                    HTTPStatus.INTERNAL_SERVER_ERROR,
-                    f"Unable to get integer retention days: {e!s}",
-                )
+                raise APIInternalError(f"Unable to get integer retention days: {e!s}")
 
             # Calculate a default deletion time for the dataset, based on the
             # time it was uploaded rather than the time it was originally
474450
if f:
475451
attributes["failures"] = f
476452
except Exception as e:
477-
raise APIAbort(
478-
HTTPStatus.INTERNAL_SERVER_ERROR, f"Unable to set metadata: {e!s}"
479-
)
453+
raise APIInternalError(f"Unable to set metadata: {e!s}")
480454

481455
# Finally, update the operational state and Audit success.
482456
try:
@@ -490,9 +464,8 @@ def _intake(
                     root=audit, status=AuditStatus.SUCCESS, attributes=attributes
                 )
             except Exception as exc:
-                raise APIAbort(
-                    HTTPStatus.INTERNAL_SERVER_ERROR,
-                    f"Unable to finalize dataset {dataset}: {exc!s}",
+                raise APIInternalError(
+                    f"Unable to finalize dataset {dataset}: {exc!s}"
                 ) from exc
         except Exception as e:
             if isinstance(e, APIAbort):
@@ -518,7 +491,7 @@ def _intake(
                 attributes={"message": audit_msg},
             )
             recovery.cleanup()
-            raise exception
+            raise exception from e
         finally:
            if tmp_dir:
                try:

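Note on the new base-class contract: IntakeBase._intake keeps all of the common intake mechanics and asks each subclass only for the two hooks described in the docstring above. As a rough sketch of the pattern (not part of this commit; the class name, URI parameter, and request headers below are invented for illustration, and the import paths are assumed from the file locations in this diff), a minimal subclass could look like this:

from flask import Request

from pbench.server.api.resources import ApiParams
from pbench.server.api.resources.intake_base import Access, Intake, IntakeBase


class ExampleIntake(IntakeBase):
    """Hypothetical subclass: tarball named in the URI, streamed in the body."""

    def _identify(self, args: ApiParams, request: Request) -> Intake:
        # Decode URI/query parameters into the dataset name, MD5 checksum,
        # access type, and optional metadata list (all assumed parameters).
        filename = args.uri["filename"]
        md5 = request.headers.get("Content-MD5", "")
        access = args.query.get("access", "private")
        metadata = args.query.get("metadata", [])
        return Intake(filename, md5, access, metadata, None)

    def _stream(self, intake: Intake, request: Request) -> Access:
        # Report the payload length and hand back a byte IO stream; _intake
        # reads it in CHUNK_SIZE pieces into a temporary file.
        length = int(request.headers.get("Content-Length", 0))
        return Access(length, request.stream)

The two real subclasses are the existing PUT /api/v1/upload endpoint and the POST /api/v1/relay endpoint; the relay.py diff below shows how the relay variant fills in these hooks.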
lib/pbench/server/api/resources/relay.py

Lines changed: 22 additions & 16 deletions
@@ -22,9 +22,7 @@
 
 
 class Relay(IntakeBase):
-    """Download a dataset from a relay server"""
-
-    CHUNK_SIZE = 65536
+    """Retrieve a dataset from a relay server"""
 
     def __init__(self, config: PbenchServerConfig):
         super().__init__(
@@ -43,18 +41,18 @@ def __init__(self, config: PbenchServerConfig):
                     ),
                 ),
                 audit_type=AuditType.NONE,
-                audit_name="upload",
+                audit_name="relay",
                 authorization=ApiAuthorizationType.NONE,
             ),
         )
 
-    def _prepare(self, args: ApiParams, request: Request) -> Intake:
-        """Prepare to begin the intake operation
+    def _identify(self, args: ApiParams, request: Request) -> Intake:
+        """Identify the tarball to be streamed.
 
-        We get the Relay configuration file location from the "uri" API
+        We get the Relay inventory file location from the "uri" API
         parameter.
 
-        The Relay configuration file is an application/json file which
+        The Relay inventory file is an application/json file which
         contains the following required fields:
 
             uri: The Relay URI of the tarball file
@@ -68,17 +66,22 @@ def _prepare(self, args: ApiParams, request: Request) -> Intake:
 
         Args:
             args: API parameters
-                URI parameters
-                    uri: The full Relay configuration file URI
+                URI parameters: the Relay inventory file URI
             request: The original Request object containing query parameters
 
         Returns:
             An Intake object capturing the critical information
+
+        Raises:
+            APIAbort on failure
         """
         uri = args.uri["uri"]
-        response = requests.get(uri, headers={"accept": "application/json"})
+        response = requests.get(uri, headers={"Accept": "application/json"})
         if not response.ok:
-            raise APIAbort(HTTPStatus.BAD_GATEWAY, "Relay URI does not respond")
+            raise APIAbort(
+                HTTPStatus.BAD_GATEWAY,
+                f"Relay manifest URI problem: {response.reason!r}",
+            )
 
         try:
             information = response.json()
@@ -99,30 +102,33 @@ def _prepare(self, args: ApiParams, request: Request) -> Intake:
 
         return Intake(name, md5, access, metadata, uri)
 
-    def _access(self, intake: Intake, request: Request) -> Access:
+    def _stream(self, intake: Intake, request: Request) -> Access:
         """Determine how to access the tarball byte stream
 
         Using the _intake information captured in the Intake instance, perform
         a follow-up GET operation to the URI provided by the Relay config file,
         returning the length header and the IO stream.
 
         Args:
-            intake: The Intake parameters produced by _intake
+            intake: The Intake parameters produced by _identify
             request: The Flask request object
 
         Returns:
             An Access object with the data byte stream and length
+
+        Raises:
+            APIAbort on failure
         """
         response: requests.Response = requests.get(
-            url=intake.uri, stream=True, headers={"accept": "application/octet-stream"}
+            url=intake.uri, stream=True, headers={"Accept": "application/octet-stream"}
         )
         if not response.ok:
             raise APIAbort(
                 response.status_code,
                 f"Unable to retrieve relay tarball: {response.reason!r}",
             )
         try:
-            length = int(response.headers["content-length"])
+            length = int(response.headers["Content-length"])
             return Access(length, response.raw)
         except Exception as e:
             raise APIAbort(

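For orientation, the relay intake is a two-step fetch: _identify GETs the inventory (manifest) JSON named by the "uri" API parameter, and _stream then GETs the tarball URI supplied by that manifest, which also provides the name, MD5, access, and metadata values packed into the Intake object. A standalone client-side sketch of the same sequence, assuming only the behavior visible in this diff (the manifest URL is invented and failures are reduced to RuntimeError):

import requests

MANIFEST_URL = "https://relay.example.com/sha256/abc123"  # hypothetical

# Step 1: fetch the relay manifest (application/json), as _identify does.
manifest = requests.get(MANIFEST_URL, headers={"Accept": "application/json"})
if not manifest.ok:
    raise RuntimeError(f"Relay manifest URI problem: {manifest.reason!r}")
info = manifest.json()  # expected to contain at least the tarball "uri"

# Step 2: stream the tarball named by the manifest, as _stream does.
tarball = requests.get(
    info["uri"], stream=True, headers={"Accept": "application/octet-stream"}
)
if not tarball.ok:
    raise RuntimeError(f"Unable to retrieve relay tarball: {tarball.reason!r}")
length = int(tarball.headers["Content-length"])  # requests headers are case-insensitive
stream = tarball.raw  # unread byte stream handed to the intake copy loop
print(f"would read {length} bytes from the relay")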