Skip to content

Commit 8db79f5

Browse files
authored
Allow deletion of uploaded relay datasets (#3512)
* Allow deletion of uploaded relay datasets PBENCH-1238 As a stopgap for "final" Relay support under the agent, we plan to establish a known project Relay Server (e.g., on our AWS host) for testing. That server has limited storage, and substantial use. To avoid requiring constant manual management of space, our plan is to implement a mechanism to cause the Pbench Server to delete files from the Relay Server after intake, and to change the Pbench Dashboard relay action to always supply that option. Note that this PR adds the `?delete=t` query parameter to the dashboard's relay request *unconditionally*; in the future we can consider adding a checkbox to allow users to control the behavior, especially when they own a private relay server.
1 parent b2e56ac commit 8db79f5

File tree

4 files changed

+219
-15
lines changed

4 files changed

+219
-15
lines changed

dashboard/src/actions/relayActions.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export const uploadFile = () => async (dispatch, getState) => {
1313
const endpoints = getState().apiEndpoint.endpoints;
1414
const fileURI = getState().overview.relayInput;
1515
const uri = uriTemplate(endpoints, "relay", { uri: fileURI });
16-
const response = await API.post(uri, null, null);
16+
const response = await API.post(uri, null, { params: { delete: "t" } });
1717
if (response.status >= 200 && response.status < 300) {
1818
dispatch(showToast(SUCCESS, response.data.message));
1919
dispatch(setRelayModalState(false));

lib/pbench/server/api/resources/intake_base.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,19 @@ def _stream(self, intake: Intake, request: Request) -> Access:
153153
"""
154154
raise NotImplementedError()
155155

156+
def _cleanup(self, args: ApiParams, intake: Intake, notes: list[str]):
    """Clean up after a completed upload.

    The base implementation does nothing: subclasses override this hook
    to perform transfer-specific cleanup once a tarball has been
    successfully received.

    Args:
        args: API parameters
        intake: The intake object
        notes: A list of strings used to report problems
    """
168+
156169
def _intake(
157170
self, args: ApiParams, request: Request, context: ApiContext
158171
) -> Response:
@@ -477,6 +490,7 @@ def _intake(
477490
enable_next = [OperationName.INDEX] if should_index else None
478491
if not should_index:
479492
notes.append("Indexing is disabled by 'archive only' setting.")
493+
self._cleanup(args, intake, notes)
480494
Sync(current_app.logger, OperationName.UPLOAD).update(
481495
dataset=dataset, state=OperationState.OK, enabled=enable_next
482496
)

lib/pbench/server/api/resources/relay.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from http import HTTPStatus
22

3-
from flask import Response
3+
from flask import current_app, Response
44
from flask.wrappers import Request
55
import requests
66

@@ -33,6 +33,7 @@ def __init__(self, config: PbenchServerConfig):
3333
uri_schema=Schema(Parameter("filename", ParamType.STRING)),
3434
query_schema=Schema(
3535
Parameter("access", ParamType.ACCESS),
36+
Parameter("delete", ParamType.BOOLEAN),
3637
Parameter(
3738
"metadata",
3839
ParamType.LIST,
@@ -107,6 +108,10 @@ def _identify(self, args: ApiParams, request: Request) -> Intake:
107108
HTTPStatus.BAD_GATEWAY, f"Relay info missing {str(e)!r}"
108109
) from e
109110

111+
# If the API client specified metadata, add it to the manifest
112+
# metadata list. When the common code processes the list into a dict,
113+
# any later duplicate keys will override the earlier values.
114+
metadata += args.query.get("metadata", [])
110115
return Intake(name, md5, access, metadata, uri)
111116

112117
def _stream(self, intake: Intake, request: Request) -> Access:
@@ -149,6 +154,40 @@ def _stream(self, intake: Intake, request: Request) -> Access:
149154
HTTPStatus.BAD_REQUEST, f"Unable to retrieve relay tarball: {str(e)!r}"
150155
) from e
151156

157+
def _cleanup(self, args: ApiParams, intake: Intake, notes: list[str]):
    """Clean up after a completed upload.

    When pulling datasets from a relay, the client can ask that the relay
    files be deleted on successful completion to avoid accumulating storage
    on the relay server.

    We capture all HTTP errors here, since there's not much we can do to
    clean up, and the dataset has already been successfully transferred.
    We just note the problems so they can be investigated.

    Args:
        args: API parameters
        intake: The intake object containing the tarball URI
        notes: A list of error strings to report problems.
    """
    errors = False
    if args.query.get("delete"):
        # Remove both the manifest URI (the API "uri" parameter) and the
        # tarball URI extracted from the manifest.
        for uri in (args.uri["uri"], intake.uri):
            reason = None
            try:
                # A bounded timeout keeps a stalled relay server from
                # hanging the intake worker indefinitely: the dataset is
                # already safely transferred, so we'd rather give up on
                # cleanup than block.
                response = requests.delete(uri, timeout=30.0)
                if not response.ok:
                    reason = response.reason
            except (ConnectionError, requests.RequestException) as e:
                # Catch all requests-level failures (including Timeout),
                # matching the "capture all HTTP errors" contract above.
                # NOTE(review): assumes the bare ConnectionError name here
                # resolves to requests' connection error via a module-level
                # import — verify, since the builtin ConnectionError does
                # NOT catch requests.exceptions.ConnectionError.
                reason = str(e)
            if reason:
                errors = True
                msg = f"Unable to remove relay file {uri}: {reason!r}"
                current_app.logger.warning("INTAKE relay {}: {}", intake.name, msg)
                notes.append(msg)
        if not errors:
            notes.append("Relay files were successfully removed.")
190+
152191
def _post(self, args: ApiParams, request: Request, context: ApiContext) -> Response:
153192
"""Launch the Relay operation from an HTTP POST"""
154193
return self._intake(args, request, context)

lib/pbench/test/unit/server/test_relay.py

Lines changed: 164 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from http import HTTPStatus
22
from logging import Logger
33
from pathlib import Path
4+
from typing import Union
45

56
from flask import Request
67
import pytest
@@ -87,13 +88,16 @@ def test_missing_authorization_header(self, client, server_config):
8788
assert response.status_code == HTTPStatus.UNAUTHORIZED
8889
assert not self.cachemanager_created
8990

90-
@responses.activate
9191
@pytest.mark.freeze_time("2023-07-01")
92-
def test_relay(self, client, server_config, pbench_drb_token, tarball):
92+
@pytest.mark.parametrize("delete", ("false", "true", None))
93+
@responses.activate
94+
def test_relay(self, client, server_config, pbench_drb_token, tarball, delete):
9395
"""Verify the success path
9496
9597
Ensure successful completion when the primary relay URI returns a valid
9698
relay manifest referencing a secondary relay URI containing a tarball.
99+
100+
Also check that the DELETE requests happen when `?delete` is specified.
97101
"""
98102
file, md5file, md5 = tarball
99103
name = Dataset.stem(file)
@@ -117,22 +121,34 @@ def test_relay(self, client, server_config, pbench_drb_token, tarball):
117121
headers={"content-length": f"{file.stat().st_size}"},
118122
content_type="application/octet-stream",
119123
)
124+
if delete == "true":
125+
responses.add(
126+
responses.DELETE, "https://relay.example.com/uri1", status=HTTPStatus.OK
127+
)
128+
responses.add(
129+
responses.DELETE, "https://relay.example.com/uri2", status=HTTPStatus.OK
130+
)
120131
response = client.post(
121132
self.gen_uri(server_config, "https://relay.example.com/uri1"),
133+
query_string={"delete": delete} if delete else None,
122134
headers=self.gen_headers(pbench_drb_token),
123135
)
124136
assert (
125137
response.status_code == HTTPStatus.CREATED
126138
), f"Unexpected result, {response.text}"
139+
expected_notes = [
140+
"Identified benchmark workload 'unknown'.",
141+
"Expected expiration date is 2025-06-30.",
142+
]
143+
if delete == "true":
144+
expected_notes.append("Relay files were successfully removed.")
127145
assert response.json == {
128146
"message": "File successfully uploaded",
129147
"name": name,
130148
"resource_id": md5,
131-
"notes": [
132-
"Identified benchmark workload 'unknown'.",
133-
"Expected expiration date is 2025-06-30.",
134-
],
149+
"notes": expected_notes,
135150
}
151+
assert len(responses.calls) == 4 if delete == "true" else 2
136152
assert (
137153
response.headers["location"]
138154
== f"https://localhost/api/v1/datasets/{md5}/inventory/"
@@ -162,23 +178,21 @@ def test_relay(self, client, server_config, pbench_drb_token, tarball):
162178
assert audit[1].name == "relay"
163179
assert audit[1].object_type == AuditType.DATASET
164180
assert audit[1].object_id == md5
165-
assert audit[1].object_name == Dataset.stem(file)
181+
assert audit[1].object_name == name
166182
assert audit[1].user_id == DRB_USER_ID
167183
assert audit[1].user_name == "drb"
168184
assert audit[1].reason is None
169185
assert audit[1].attributes == {
170186
"access": "private",
171187
"metadata": {"global.pbench.test": "data"},
172-
"notes": [
173-
"Identified benchmark workload 'unknown'.",
174-
"Expected expiration date is 2025-06-30.",
175-
],
188+
"notes": expected_notes,
176189
}
177190

178191
@responses.activate
179192
def test_relay_tar_fail(self, client, server_config, pbench_drb_token, tarball):
180193
"""Verify failure when secondary relay URI is not found"""
181194
file, md5file, md5 = tarball
195+
name = Dataset.stem(file)
182196
responses.add(
183197
responses.GET,
184198
"https://relay.example.com/uri1",
@@ -211,7 +225,7 @@ def test_relay_tar_fail(self, client, server_config, pbench_drb_token, tarball):
211225
assert audit[0].name == "relay"
212226
assert audit[0].object_type == AuditType.DATASET
213227
assert audit[0].object_id == md5
214-
assert audit[0].object_name == Dataset.stem(file)
228+
assert audit[0].object_name == name
215229
assert audit[0].user_id == DRB_USER_ID
216230
assert audit[0].user_name == "drb"
217231
assert audit[0].reason is None
@@ -226,7 +240,7 @@ def test_relay_tar_fail(self, client, server_config, pbench_drb_token, tarball):
226240
assert audit[1].name == "relay"
227241
assert audit[1].object_type == AuditType.DATASET
228242
assert audit[1].object_id == md5
229-
assert audit[1].object_name == Dataset.stem(file)
243+
assert audit[1].object_name == name
230244
assert audit[1].user_id == DRB_USER_ID
231245
assert audit[1].user_name == "drb"
232246
assert audit[1].reason == AuditReason.CONSISTENCY
@@ -392,3 +406,140 @@ def test_relay_tarball_connection(
392406
assert (
393407
response.json["message"] == "Unable to connect to results URI: 'leaky wire'"
394408
)
409+
410+
@pytest.mark.freeze_time("2023-07-01")
@pytest.mark.parametrize(
    "status1,status2",
    (
        ((HTTPStatus.OK, None), ((HTTPStatus.BAD_REQUEST, "Bad Request"))),
        ((HTTPStatus.BAD_REQUEST, "Bad Request"), (HTTPStatus.OK, None)),
        (
            (HTTPStatus.BAD_REQUEST, "Bad Request"),
            (HTTPStatus.BAD_REQUEST, "Bad Request"),
        ),
        ((ConnectionError("testing"), "testing"), (HTTPStatus.OK, None)),
        ((HTTPStatus.OK, None), (ConnectionError("testing"), "testing")),
        (
            (ConnectionError("testing1"), "testing1"),
            (ConnectionError("testing2"), "testing2"),
        ),
    ),
)
@responses.activate
def test_delete_failures(
    self,
    client,
    server_config,
    pbench_drb_token,
    tarball,
    status1: tuple[Union[HTTPStatus, Exception], str],
    status2: tuple[Union[HTTPStatus, Exception], str],
):
    """Verify reporting of delete failures

    Ensure successful completion with appropriate notes when deletion of
    the relay files fails.
    """
    file, md5file, md5 = tarball
    name = Dataset.stem(file)

    # Mock the relay manifest and the tarball payload it points at.
    responses.add(
        responses.GET,
        "https://relay.example.com/uri1",
        status=HTTPStatus.OK,
        json={
            "uri": "https://relay.example.com/uri2",
            "name": file.name,
            "md5": md5,
            "access": "private",
            "metadata": ["global.pbench.test:data"],
        },
    )
    responses.add(
        responses.GET,
        "https://relay.example.com/uri2",
        status=HTTPStatus.OK,
        body=file.open("rb"),
        headers={"content-length": f"{file.stat().st_size}"},
        content_type="application/octet-stream",
    )

    # Each parametrized outcome is either an HTTPStatus (returned as the
    # DELETE status) or an Exception (raised by the mocked DELETE).
    cases = (
        ("https://relay.example.com/uri1", status1),
        ("https://relay.example.com/uri2", status2),
    )
    for uri, (outcome, _) in cases:
        responses.add(
            responses.DELETE,
            uri,
            status=outcome
            if isinstance(outcome, int)
            else HTTPStatus.ALREADY_REPORTED,
            body=outcome if isinstance(outcome, Exception) else None,
        )

    response = client.post(
        self.gen_uri(server_config, "https://relay.example.com/uri1"),
        query_string={"delete": "true"},
        headers=self.gen_headers(pbench_drb_token),
    )
    assert (
        response.status_code == HTTPStatus.CREATED
    ), f"Unexpected result, {response.text}"

    expected_notes = [
        "Identified benchmark workload 'unknown'.",
        "Expected expiration date is 2025-06-30.",
    ]
    # Each failed DELETE should contribute a note, in URI order.
    for uri, (outcome, text) in cases:
        if outcome != HTTPStatus.OK:
            expected_notes.append(f"Unable to remove relay file {uri}: '{text}'")

    assert response.json == {
        "message": "File successfully uploaded",
        "name": name,
        "resource_id": md5,
        "notes": expected_notes,
    }
    # Two GETs (manifest + tarball) plus two DELETE attempts.
    assert len(responses.calls) == 4
    assert (
        response.headers["location"]
        == f"https://localhost/api/v1/datasets/{md5}/inventory/"
    )

    audit = Audit.query()
    assert len(audit) == 2
    assert audit[0].id == 1
    assert audit[0].root_id is None
    assert audit[0].operation == OperationCode.CREATE
    assert audit[0].status == AuditStatus.BEGIN
    assert audit[0].name == "relay"
    assert audit[0].object_type == AuditType.DATASET
    assert audit[0].object_id == md5
    assert audit[0].object_name == name
    assert audit[0].user_id == DRB_USER_ID
    assert audit[0].user_name == "drb"
    assert audit[0].reason is None
    assert audit[0].attributes == {
        "access": "private",
        "metadata": {"global.pbench.test": "data"},
    }
    assert audit[1].id == 2
    assert audit[1].root_id == 1
    assert audit[1].operation == OperationCode.CREATE
    assert audit[1].status == AuditStatus.SUCCESS
    assert audit[1].name == "relay"
    assert audit[1].object_type == AuditType.DATASET
    assert audit[1].object_id == md5
    assert audit[1].object_name == name
    assert audit[1].user_id == DRB_USER_ID
    assert audit[1].user_name == "drb"
    assert audit[1].reason is None
    assert audit[1].attributes == {
        "access": "private",
        "metadata": {"global.pbench.test": "data"},
        "notes": expected_notes,
    }

0 commit comments

Comments
 (0)