diff --git a/alibabacloud_oss_v2/uploader.py b/alibabacloud_oss_v2/uploader.py index a4cea31..ea406e4 100644 --- a/alibabacloud_oss_v2/uploader.py +++ b/alibabacloud_oss_v2/uploader.py @@ -274,6 +274,9 @@ def _delegate( if options.parallel_num <= 0: options.parallel_num = defaults.DEFAULT_UPLOAD_PARALLEL + if hasattr(request, "parameters") and request.parameters.get('sequential') is not None: + options.parallel_num = 1 + delegate = _UploaderDelegate( base=self, client=self._client, @@ -471,6 +474,9 @@ def _single_part(self) -> UploadResult: if request.content_type is None: request.content_type = self._get_content_type() + if hasattr(self._reqeust, "parameters"): + request.parameters = self._reqeust.parameters + try: result = self._client.put_object(request) except Exception as err: @@ -520,6 +526,10 @@ def _multipart_part(self) -> UploadResult: if len(self._upload_errors) == 0: request = models.CompleteMultipartUploadRequest() copy_request(request, self._reqeust) + + if hasattr(self._reqeust, "parameters"): + request.parameters = self._reqeust.parameters + parts = sorted(self._uploaded_parts, key=lambda p: p.part_number) request.upload_id = upload_ctx.upload_id request.complete_multipart_upload = models.CompleteMultipartUpload(parts=parts) @@ -535,6 +545,10 @@ def _multipart_part(self) -> UploadResult: abort_request = models.AbortMultipartUploadRequest() abort_request.upload_id = upload_ctx.upload_id copy_request(request, self._reqeust) + + if hasattr(self._reqeust, "parameters"): + request.parameters = self._reqeust.parameters + self._client.abort_multipart_upload(abort_request) except Exception as _: pass @@ -566,6 +580,10 @@ def _get_upload_context(self) -> _UploadContext: #if not exist or fail, create a new upload id request = models.InitiateMultipartUploadRequest() copy_request(request, self._reqeust) + + if hasattr(self._reqeust, "parameters"): + request.parameters = self._reqeust.parameters + if request.content_type is None: request.content_type = self._get_content_type() diff --git a/tests/integration/test_uploader.py b/tests/integration/test_uploader.py new file mode 100644 index 0000000..b29d6ae --- /dev/null +++ b/tests/integration/test_uploader.py @@ -0,0 +1,68 @@ +# pylint: skip-file + +import tempfile +import alibabacloud_oss_v2 as oss +from typing import Dict, Any +from . import TestIntegration, random_bucket_name, random_str, OBJECTNAME_PREFIX + + +class TestUploader(TestIntegration): + def test_uploader_with_sequential(self): + length = 100 * 1024 + 123 + data = random_str(length) + key = OBJECTNAME_PREFIX + random_str(16) + key_2 = OBJECTNAME_PREFIX + random_str(16) + + uploader = self.client.uploader() + + # case 1: usually uploader + with tempfile.TemporaryFile() as fp: + fp.write(data.encode()) + fp.seek(0) + + result = uploader.upload_from(oss.PutObjectRequest( + bucket=self.bucket_name, + key=key, + ), + reader=fp, + part_size=100*1024, + ) + self.assertIsNotNone(result) + self.assertEqual(200, result.status_code) + + head_result = self.client.head_object(oss.HeadObjectRequest( + bucket=self.bucket_name, + key=key, + )) + etag = head_result.etag + self.assertEqual(200, head_result.status_code) + self.assertEqual("Multipart", head_result.object_type) + self.assertIsNone(head_result.content_md5) + + # case 2: sequential uploader + kwargs: Dict[str, Any] = {} + kwargs['parameters'] = {'sequential': ''} + + with tempfile.TemporaryFile() as fp: + fp.write(data.encode()) + fp.seek(0) + + result = uploader.upload_from(oss.PutObjectRequest( + bucket=self.bucket_name, + key=key_2, + **kwargs, + ), + reader=fp, + part_size=100*1024, + ) + self.assertIsNotNone(result) + self.assertEqual(200, result.status_code) + + head_result = self.client.head_object(oss.HeadObjectRequest( + bucket=self.bucket_name, + key=key_2, + )) + self.assertEqual(200, head_result.status_code) + self.assertEqual("Multipart", head_result.object_type) + self.assertEqual(etag, head_result.etag) + self.assertIsNotNone(head_result.content_md5)