Skip to content

Commit 75dd992

Browse files
authored
Merge pull request #631 from rtibbles/output_path_required
Do more robust handling of download failures.
2 parents 5e8378b + 72ca9cc commit 75dd992

File tree

6 files changed

+977
-837
lines changed

6 files changed

+977
-837
lines changed

ricecooker/classes/files.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,22 +142,27 @@ def truncate_fields(self):
142142
)
143143
self.source_url = self.source_url[: config.MAX_SOURCE_URL_LENGTH]
144144

145+
def file_dict(self, filename=None):
146+
if not filename:
147+
filename = self.get_filename()
148+
return {
149+
"size": self.size,
150+
"preset": self.get_preset(),
151+
"filename": filename,
152+
"original_filename": self.original_filename,
153+
"language": self.language,
154+
"source_url": self.source_url,
155+
"duration": self.duration,
156+
}
157+
145158
def to_dict(self):
146159
filename = self.get_filename()
147160

148161
# If file was successfully downloaded, return dict
149162
# Otherwise return None
150163
if filename:
151164
if os.path.isfile(config.get_storage_path(filename)):
152-
return {
153-
"size": self.size,
154-
"preset": self.get_preset(),
155-
"filename": filename,
156-
"original_filename": self.original_filename,
157-
"language": self.language,
158-
"source_url": self.source_url,
159-
"duration": self.duration,
160-
}
165+
return self.file_dict(filename=filename)
161166
else:
162167
config.LOGGER.warning(
163168
"File not found: {}".format(config.get_storage_path(filename))
@@ -609,6 +614,7 @@ class StudioFile(File):
609614
"""
610615

611616
skip_upload = True
617+
size = None
612618

613619
def __init__(self, checksum, ext, preset, is_primary=False, **kwargs):
614620
kwargs["preset"] = preset
@@ -629,8 +635,12 @@ def validate(self):
629635
self.filename, e
630636
)
631637
)
638+
self.size = int(response.headers.get("Content-Length", 0))
632639
self._validated = True
633640

641+
def to_dict(self):
642+
return self.file_dict()
643+
634644
def __str__(self):
635645
return self.filename
636646

ricecooker/utils/pipeline/file_handler.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44
import os
55
import tempfile
6+
import threading
67
from abc import ABC
78
from abc import abstractmethod
89
from contextlib import contextmanager
@@ -104,6 +105,20 @@ class FileHandler(Handler):
104105
# Subclasses can define this list to specify which exceptions should be caught and reported
105106
HANDLED_EXCEPTIONS = []
106107

108+
def __init__(self):
109+
super().__init__()
110+
self._thread_local = threading.local()
111+
112+
@property
113+
def _output_path(self):
114+
"""Thread-safe output path property."""
115+
return getattr(self._thread_local, "output_path", None)
116+
117+
@_output_path.setter
118+
def _output_path(self, value):
119+
"""Thread-safe output path setter."""
120+
self._thread_local.output_path = value
121+
107122
def _get_context(self, context: Optional[Dict] = None):
108123
fields = set(get_type_hints(self.CONTEXT_CLASS).keys())
109124
context = {k: v for k, v in (context or {}).items() if k in fields}

ricecooker/utils/pipeline/transfer.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from requests.exceptions import HTTPError
1818
from requests.exceptions import InvalidSchema
1919
from requests.exceptions import InvalidURL
20+
from requests.exceptions import Timeout
2021

2122
from .context import ContextMetadata
2223
from .context import FileMetadata
@@ -139,10 +140,19 @@ class CatchAllWebResourceDownloadHandler(WebResourceHandler):
139140

140141
PATTERNS = [""]
141142

142-
HANDLED_EXCEPTIONS = [HTTPError, ConnectionError, InvalidURL, InvalidSchema]
143+
HANDLED_EXCEPTIONS = [
144+
HTTPError,
145+
ConnectionError,
146+
InvalidURL,
147+
InvalidSchema,
148+
Timeout,
149+
]
143150

144151
def handle_file(self, path, default_ext=None):
145-
r = config.DOWNLOAD_SESSION.get(path, stream=True)
152+
# Use explicit timeout to prevent hanging downloads
153+
# (connection_timeout, read_timeout) - connection timeout for establishing connection,
154+
# read timeout for time between receiving data chunks (prevents stuck downloads)
155+
r = config.DOWNLOAD_SESSION.get(path, stream=True, timeout=(30, 60))
146156
original_filename = extract_filename_from_request(path, r)
147157
default_ext = extract_path_ext(original_filename, default_ext=default_ext)
148158
r.raise_for_status()
@@ -434,7 +444,10 @@ def execute(
434444
# The download stage is special, as we expect it to always return a file
435445
# if it does not, we raise an exception to prevent further processing
436446
raise InvalidFileException(f"No file could be downloaded from {path}")
447+
448+
# Ensure all downloaded files are actually in storage
437449
for metadata in metadata_list:
438-
if metadata.path == path:
439-
raise InvalidFileException(f"{path} failed to transfer")
450+
if not metadata.path.startswith(os.path.abspath(config.STORAGE_DIRECTORY)):
451+
raise InvalidFileException(f"{path} failed to transfer to storage")
452+
440453
return metadata_list

tests/cassettes/test_youtubevideo_process_file.yaml

Lines changed: 833 additions & 824 deletions
Large diffs are not rendered by default.

tests/pipeline/test_file_handler.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import threading
2+
13
import pytest
24

5+
from ricecooker.utils.pipeline.context import FileMetadata
36
from ricecooker.utils.pipeline.exceptions import InvalidFileException
47
from ricecooker.utils.pipeline.file_handler import FileHandler
58

@@ -31,3 +34,56 @@ def test_write_file_with_exception_still_checks_file_not_empty():
3134
# Don't write anything to file (will make it empty)
3235
# Then raise an exception that would normally prevent cleanup
3336
raise RuntimeError("This exception should be caught by try/finally")
37+
38+
39+
class ThreadRaceTestHandler(FileHandler):
40+
def __init__(self):
41+
super().__init__()
42+
self.barrier = threading.Barrier(2) # Synchronize two threads
43+
44+
def should_handle(self, path: str) -> bool:
45+
return path.startswith("race-test://")
46+
47+
def handle_file(self, path, **kwargs):
48+
file_id = path.split("/")[-1]
49+
50+
# Set _output_path for this file
51+
self._output_path = f"/storage/{file_id}.txt"
52+
53+
# Wait for both threads to reach this point
54+
self.barrier.wait()
55+
56+
# Now both threads continue - without thread-local storage,
57+
# the second one would overwrite _output_path
58+
return FileMetadata(original_filename=f"{file_id}.txt")
59+
60+
61+
def test_output_path_thread_safety():
62+
"""Test that _output_path is thread-safe and doesn't have race conditions."""
63+
handler = ThreadRaceTestHandler()
64+
results = {}
65+
66+
def thread_a():
67+
handler._output_path = None
68+
path = "race-test://file_A"
69+
output = handler.execute(path)
70+
results["A"] = output[0].path
71+
72+
def thread_b():
73+
handler._output_path = None
74+
path = "race-test://file_B"
75+
output = handler.execute(path)
76+
results["B"] = output[0].path
77+
78+
thread1 = threading.Thread(target=thread_a)
79+
thread2 = threading.Thread(target=thread_b)
80+
81+
thread1.start()
82+
thread2.start()
83+
84+
thread1.join()
85+
thread2.join()
86+
87+
# Each thread should get its own correct path, not interfere with each other
88+
assert results["A"] == "/storage/file_A.txt"
89+
assert results["B"] == "/storage/file_B.txt"

tests/pipeline/test_transfer.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77
import pytest
88
from vcr_config import my_vcr
99

10+
from ricecooker.utils.pipeline.context import FileMetadata
11+
from ricecooker.utils.pipeline.exceptions import InvalidFileException
12+
from ricecooker.utils.pipeline.file_handler import FileHandler
1013
from ricecooker.utils.pipeline.transfer import DiskResourceHandler
14+
from ricecooker.utils.pipeline.transfer import DownloadStageHandler
1115
from ricecooker.utils.pipeline.transfer import (
1216
get_filename_from_content_disposition_header,
1317
)
@@ -291,3 +295,36 @@ def test_disk_transfer_non_file_protocol():
291295
"os.path.exists", return_value=False
292296
): # Ensure it doesn't try to check a web URL
293297
assert not handler.should_handle(path), "Handler should not handle HTTP URLs"
298+
299+
300+
class DummyPassthroughHandler(FileHandler):
301+
"""A dummy handler that passes through the original path without transferring to storage.
302+
303+
This simulates the bug where a download handler fails to actually download/transfer
304+
the file but returns the original URL as the path.
305+
"""
306+
307+
def should_handle(self, path: str) -> bool:
308+
return path.startswith("http://dummy-test-url.com")
309+
310+
def handle_file(self, path, **kwargs):
311+
# Intentionally don't use write_file context manager
312+
# This simulates a handler that fails to transfer the file to storage
313+
return FileMetadata(original_filename="test.txt")
314+
315+
316+
def test_download_stage_handler_catches_failed_transfer():
317+
"""Test that DownloadStageHandler catches when files aren't transferred to storage.
318+
319+
This is a regression test for the issue where download handlers would sometimes
320+
log "saved to [original URL]" instead of the actual storage path, indicating
321+
that the file wasn't actually transferred to storage.
322+
"""
323+
# Create a DownloadStageHandler with our dummy passthrough handler
324+
download_handler = DownloadStageHandler(children=[DummyPassthroughHandler()])
325+
326+
dummy_url = "http://dummy-test-url.com/test.txt"
327+
328+
# The handler should raise an InvalidFileException when the file isn't transferred to storage
329+
with pytest.raises(InvalidFileException, match="failed to transfer to storage"):
330+
download_handler.execute(dummy_url)

0 commit comments

Comments
 (0)