Skip to content

Commit fc92ad1

Browse files
authored
feat: add crc32c_checksum argument to download_chunks_concurrently (#1138)
1 parent a455195 commit fc92ad1

File tree

5 files changed

+262
-22
lines changed

5 files changed

+262
-22
lines changed

google/cloud/storage/transfer_manager.py

Lines changed: 142 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,8 @@
2222
import warnings
2323
import pickle
2424
import copyreg
25+
import struct
26+
import base64
2527
import functools
2628

2729
from google.api_core import exceptions
@@ -32,9 +34,11 @@
3234
from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry
3335
from google.cloud.storage.retry import DEFAULT_RETRY
3436

37+
import google_crc32c
38+
3539
from google.resumable_media.requests.upload import XMLMPUContainer
3640
from google.resumable_media.requests.upload import XMLMPUPart
37-
41+
from google.resumable_media.common import DataCorruption
3842

3943
warnings.warn(
4044
"The module `transfer_manager` is a preview feature. Functionality and API "
@@ -44,6 +48,7 @@
4448

4549
TM_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024
4650
DEFAULT_MAX_WORKERS = 8
51+
MAX_CRC32C_ZERO_ARRAY_SIZE = 4 * 1024 * 1024
4752
METADATA_HEADER_TRANSLATION = {
4853
"cacheControl": "Cache-Control",
4954
"contentDisposition": "Content-Disposition",
@@ -57,6 +62,20 @@
5762
PROCESS = "process"
5863
THREAD = "thread"
5964

65+
DOWNLOAD_CRC32C_MISMATCH_TEMPLATE = """\
66+
Checksum mismatch while downloading:
67+
68+
{}
69+
70+
The object metadata indicated a crc32c checksum of:
71+
72+
{}
73+
74+
but the actual crc32c checksum of the downloaded contents was:
75+
76+
{}
77+
"""
78+
6079

6180
_cached_clients = {}
6281

@@ -732,6 +751,8 @@ def download_chunks_concurrently(
732751
deadline=None,
733752
worker_type=PROCESS,
734753
max_workers=DEFAULT_MAX_WORKERS,
754+
*,
755+
crc32c_checksum=True,
735756
):
736757
"""Download a single file in chunks, concurrently.
737758
@@ -744,9 +765,6 @@ def download_chunks_concurrently(
744765
performance under normal circumstances due to Python interpreter threading
745766
behavior. The default is therefore to use processes instead of threads.
746767
747-
Checksumming (md5 or crc32c) is not supported for chunked operations. Any
748-
`checksum` parameter passed in to download_kwargs will be ignored.
749-
750768
:param bucket:
751769
The bucket which contains the blobs to be downloaded
752770
@@ -768,10 +786,13 @@ def download_chunks_concurrently(
768786
:param download_kwargs:
769787
A dictionary of keyword arguments to pass to the download method. Refer
770788
to the documentation for blob.download_to_file() or
771-
blob.download_to_filename() for more information. The dict is directly passed into the download methods and is not validated by this function.
789+
blob.download_to_filename() for more information. The dict is directly
790+
passed into the download methods and is not validated by this function.
772791
773792
Keyword arguments "start" and "end" which are not supported and will
774-
cause a ValueError if present.
793+
cause a ValueError if present. The key "checksum" is also not supported
794+
in download_kwargs, but see the argument "crc32c_checksum" (which does
795+
not go in download_kwargs) below.
775796
776797
:type deadline: int
777798
:param deadline:
@@ -811,15 +832,33 @@ def download_chunks_concurrently(
811832
and the default is a conservative number that should work okay in most
812833
cases without consuming excessive resources.
813834
814-
:raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded.
835+
:type crc32c_checksum: bool
836+
:param crc32c_checksum:
837+
Whether to compute a checksum for the resulting object, using the crc32c
838+
algorithm. As the checksums for each chunk must be combined using a
839+
feature of crc32c that is not available for md5, md5 is not supported.
840+
841+
:raises:
842+
:exc:`concurrent.futures.TimeoutError`
843+
if deadline is exceeded.
844+
:exc:`google.resumable_media.common.DataCorruption` if the download's
845+
checksum doesn't agree with server-computed checksum. The
846+
`google.resumable_media` exception is used here for consistency
847+
with other download methods despite the exception originating
848+
elsewhere.
815849
"""
850+
client = blob.client
816851

817852
if download_kwargs is None:
818853
download_kwargs = {}
819854
if "start" in download_kwargs or "end" in download_kwargs:
820855
raise ValueError(
821856
"Download arguments 'start' and 'end' are not supported by download_chunks_concurrently."
822857
)
858+
if "checksum" in download_kwargs:
859+
raise ValueError(
860+
"'checksum' is in download_kwargs, but is not supported because sliced downloads have a different checksum mechanism from regular downloads. Use the 'crc32c_checksum' argument on download_chunks_concurrently instead."
861+
)
823862

824863
download_kwargs["command"] = "tm.download_sharded"
825864

@@ -851,16 +890,42 @@ def download_chunks_concurrently(
851890
start=start,
852891
end=cursor - 1,
853892
download_kwargs=download_kwargs,
893+
crc32c_checksum=crc32c_checksum,
854894
)
855895
)
856896

857897
concurrent.futures.wait(
858898
futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED
859899
)
860900

861-
# Raise any exceptions. Successful results can be ignored.
901+
# Raise any exceptions; combine checksums.
902+
results = []
862903
for future in futures:
863-
future.result()
904+
results.append(future.result())
905+
906+
if crc32c_checksum and results:
907+
crc_digest = _digest_ordered_checksum_and_size_pairs(results)
908+
actual_checksum = base64.b64encode(crc_digest).decode("utf-8")
909+
expected_checksum = blob.crc32c
910+
if actual_checksum != expected_checksum:
911+
# For consistency with other download methods we will use
912+
# "google.resumable_media.common.DataCorruption" despite the error
913+
# not originating inside google.resumable_media.
914+
download_url = blob._get_download_url(
915+
client,
916+
if_generation_match=download_kwargs.get("if_generation_match"),
917+
if_generation_not_match=download_kwargs.get("if_generation_not_match"),
918+
if_metageneration_match=download_kwargs.get("if_metageneration_match"),
919+
if_metageneration_not_match=download_kwargs.get(
920+
"if_metageneration_not_match"
921+
),
922+
)
923+
raise DataCorruption(
924+
None,
925+
DOWNLOAD_CRC32C_MISMATCH_TEMPLATE.format(
926+
download_url, expected_checksum, actual_checksum
927+
),
928+
)
864929
return None
865930

866931

@@ -1118,23 +1183,58 @@ def _headers_from_metadata(metadata):
11181183

11191184

11201185
def _download_and_write_chunk_in_place(
    maybe_pickled_blob, filename, start, end, download_kwargs, crc32c_checksum
):
    """Download one byte range of a blob into its slice of a shared file.

    Runs inside a worker thread or subprocess. `maybe_pickled_blob` is either
    a Blob (thread workers) or a specially pickled Blob (process workers),
    because default pickling mangles the Client attached to a Blob.

    Returns a two-tuple of the chunk's crc32c (or None when checksumming is
    disabled) and the number of bytes written.
    """
    # Unpickle only when necessary; thread workers hand us the Blob directly.
    blob = (
        maybe_pickled_blob
        if isinstance(maybe_pickled_blob, Blob)
        else pickle.loads(maybe_pickled_blob)
    )

    with _ChecksummingSparseFileWrapper(filename, start, crc32c_checksum) as f:
        blob._prep_and_do_download(f, start=start, end=end, **download_kwargs)
        # `end` is inclusive, hence the +1 for the byte count.
        return (f.crc, (end - start) + 1)
1205+
1206+
1207+
class _ChecksummingSparseFileWrapper:
1208+
"""A file wrapper that writes to a sparse file and optionally checksums.
1209+
1210+
This wrapper only implements write() and does not inherit from `io` module
1211+
base classes.
1212+
"""
1213+
1214+
def __init__(self, filename, start_position, crc32c_enabled):
1215+
# Open in mixed read/write mode to avoid truncating or appending
1216+
self.f = open(filename, "rb+")
1217+
self.f.seek(start_position)
1218+
self._crc = None
1219+
self._crc32c_enabled = crc32c_enabled
1220+
1221+
def write(self, chunk):
1222+
if self._crc32c_enabled:
1223+
if self._crc is None:
1224+
self._crc = google_crc32c.value(chunk)
1225+
else:
1226+
self._crc = google_crc32c.extend(self._crc, chunk)
1227+
self.f.write(chunk)
1228+
1229+
@property
1230+
def crc(self):
1231+
return self._crc
1232+
1233+
def __enter__(self):
1234+
return self
1235+
1236+
def __exit__(self, exc_type, exc_value, tb):
1237+
self.f.close()
11381238

11391239

11401240
def _call_method_on_maybe_pickled_blob(
@@ -1208,6 +1308,32 @@ def _get_pool_class_and_requirements(worker_type):
12081308
)
12091309

12101310

1311+
def _digest_ordered_checksum_and_size_pairs(checksum_and_size_pairs):
    """Combine per-chunk crc32c values into one digest for the whole object.

    :type checksum_and_size_pairs: iterable of (int, int)
    :param checksum_and_size_pairs:
        Ordered (crc32c, size) pairs, one per downloaded chunk, in object
        byte order.

    :rtype: bytes
    :returns:
        The 4-byte big-endian crc32c digest of the concatenated chunks,
        suitable for base64-encoding and comparison with object metadata.
    """
    base_crc = None
    zeroes = bytes(MAX_CRC32C_ZERO_ARRAY_SIZE)
    for part_crc, size in checksum_and_size_pairs:
        # Must be an explicit None check: 0 is a legitimate crc32c value, and
        # a truthiness test (`if not base_crc`) would wrongly discard the
        # running crc whenever it happened to equal 0.
        if base_crc is None:
            base_crc = part_crc
        else:
            base_crc ^= 0xFFFFFFFF  # precondition

            # Zero pad base_crc32c. To conserve memory, do so with only
            # MAX_CRC32C_ZERO_ARRAY_SIZE at a time. Reuse the zeroes array
            # where possible.
            padded = 0
            while padded < size:
                desired_zeroes_size = min((size - padded), MAX_CRC32C_ZERO_ARRAY_SIZE)
                base_crc = google_crc32c.extend(base_crc, zeroes[:desired_zeroes_size])
                padded += desired_zeroes_size

            base_crc ^= 0xFFFFFFFF  # postcondition
            base_crc ^= part_crc
    crc_digest = struct.pack(
        ">L", base_crc
    )  # https://cloud.google.com/storage/docs/json_api/v1/objects#crc32c
    return crc_digest
1335+
1336+
12111337
class _LazyClient:
12121338
"""An object that will transform into either a cached or a new Client"""
12131339

samples/snippets/snippets_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ def test_list_blobs_with_prefix(test_blob, capsys):
213213
def test_upload_blob(test_bucket):
214214
with tempfile.NamedTemporaryFile() as source_file:
215215
source_file.write(b"test")
216+
source_file.flush()
216217

217218
storage_upload_file.upload_blob(
218219
test_bucket.name, source_file.name, "test_upload_blob"
@@ -243,6 +244,7 @@ def test_upload_blob_with_kms(test_bucket):
243244
blob_name = f"test_upload_with_kms_{uuid.uuid4().hex}"
244245
with tempfile.NamedTemporaryFile() as source_file:
245246
source_file.write(b"test")
247+
source_file.flush()
246248
storage_upload_with_kms_key.upload_blob_with_kms(
247249
test_bucket.name,
248250
source_file.name,
@@ -779,6 +781,7 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys):
779781

780782
with tempfile.NamedTemporaryFile() as file:
781783
file.write(b"test")
784+
file.flush()
782785

783786
storage_upload_file.upload_blob(test_bucket.name, file.name, BLOB_NAME)
784787

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"google-cloud-core >= 2.3.0, < 3.0dev",
3434
"google-resumable-media >= 2.6.0",
3535
"requests >= 2.18.0, < 3.0.0dev",
36+
"google-crc32c >= 1.0, < 2.0dev",
3637
]
3738
extras = {"protobuf": ["protobuf<5.0.0dev"]}
3839

tests/system/test_transfer_manager.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,19 @@ def test_download_chunks_concurrently(shared_bucket, file_data):
172172
with open(trailing_chunk_filename, "rb") as file_obj:
173173
assert _base64_md5hash(file_obj) == source_file["hash"]
174174

175+
# And for a case where there is only one chunk.
176+
trailing_chunk_filename = os.path.join(tempdir, "chunky_file_3")
177+
transfer_manager.download_chunks_concurrently(
178+
download_blob,
179+
trailing_chunk_filename,
180+
chunk_size=size,
181+
deadline=DEADLINE,
182+
)
183+
with open(trailing_chunk_filename, "rb") as file_obj:
184+
assert _base64_md5hash(file_obj) == source_file["hash"]
185+
175186
# Also test threaded mode.
176-
threaded_filename = os.path.join(tempdir, "chunky_file_3")
187+
threaded_filename = os.path.join(tempdir, "chunky_file_4")
177188
transfer_manager.download_chunks_concurrently(
178189
download_blob,
179190
threaded_filename,

0 commit comments

Comments
 (0)