Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion alibabacloud_oss_v2/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def __init__(
h.update(absfilepath.encode())
src_hash = h.hexdigest()

if len(basedir) == 0:
if basedir is None or len(basedir) == 0:
dirbase = gettempdir()
else:
dirbase = os.path.dirname(basedir)
Expand Down
21 changes: 21 additions & 0 deletions alibabacloud_oss_v2/encryption_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .crypto import MasterCipher, Envelope, ContentCipherBuilder, ContentCipher, CipherData
from .crypto.aes_ctr_cipher import AESCtrCipherBuilder
from .crypto.aes_ctr import _BLOCK_SIZE_LEN
from .uploader import Uploader

class EncryptionMultiPartContext:
"""EncryptionMultiPartContext save encryption or decryption information
Expand Down Expand Up @@ -180,6 +181,26 @@ def list_parts(self, request: models.ListPartsRequest, **kwargs

return self._client.list_parts(request, **kwargs)

def uploader(self, **kwargs) -> Uploader:
"""creates a Uploader instance to upload objects.
Args:
Returns:
Uploader: _description_
"""
return Uploader(self, **kwargs)

def get_content_cipher_from_list_parts(self, result: models.ListPartsResult) -> ContentCipher:
"""
Obtain encryption and decryption information based on the results of the list parts.
Args:
result (ListPartsResult): Result parameters for ListParts operation.
Returns:
ContentCipher: A data container for storing encrypted or decrypted objects.
"""
envelope = _get_envelope_from_list_parts(result)
cc = self._get_ccbuilder(envelope).content_cipher_from_env(envelope)
return cc

def _get_ccbuilder(self, envelope: Envelope ) -> ContentCipherBuilder:
return self._ccbuilders.get(envelope.mat_desc or '', self._defualt_ccbuilder)

Expand Down
59 changes: 45 additions & 14 deletions alibabacloud_oss_v2/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .checkpoint import UploadCheckpoint
from .crc import Crc64
from .paginator import ListPartsPaginator
import alibabacloud_oss_v2 as oss

class UploadAPIClient(abc.ABC):
"""Abstract base class for uploader client."""
Expand Down Expand Up @@ -180,6 +181,7 @@ def __init__(
is_eclient = True
self._feature_flags = feature_flags
self._is_eclient = is_eclient
self._cse_multipart_context = None


def upload_file(
Expand Down Expand Up @@ -247,7 +249,7 @@ def upload_from(

def _delegate(
self,
request: models.GetObjectRequest,
request: models.PutObjectRequest,
**kwargs: Any
) -> "_UploaderDelegate":

Expand Down Expand Up @@ -306,7 +308,7 @@ def __init__(
"""
self._base = base
self._client = client
self._reqeust = request
self._request = request
self._options = options

parallel = options.parallel_num > 1
Expand Down Expand Up @@ -337,6 +339,7 @@ def __init__(
# resumable upload
self._upload_id = None
self._part_number = None
self._list_parts_result = None


@property
Expand Down Expand Up @@ -391,7 +394,7 @@ def check_checkpoint(self):
return

checkpoint = UploadCheckpoint(
request=self._reqeust,
request=self._request,
filepath=self._filepath,
basedir=self._options.checkpoint_dir,
fileinfo=self._file_stat,
Expand Down Expand Up @@ -433,6 +436,15 @@ def adjust_source(self):
part_number = uploaded_parts[-1].part_number
next_offset = part_number * self._options.part_size

if self._base._is_eclient:
cc = self._client.get_content_cipher_from_list_parts(self._list_parts_result)

self._base._cse_multipart_context = oss.EncryptionMultiPartContext(
content_cipher=cc,
part_size=utils.safety_int(self._options.part_size),
data_size=utils.safety_int(self._total_size),
)

#print(f'last part number={part_number}, next offset={next_offset}')

self._uploaded_parts = uploaded_parts
Expand Down Expand Up @@ -466,7 +478,7 @@ def upload(self) -> UploadResult:

def _single_part(self) -> UploadResult:
request = models.PutObjectRequest()
copy_request(request, self._reqeust)
copy_request(request, self._request)
request.body = self._reader
if request.content_type is None:
request.content_type = self._get_content_type()
Expand Down Expand Up @@ -519,7 +531,7 @@ def _multipart_part(self) -> UploadResult:
cmresult: models.CompleteMultipartUploadResult = None
if len(self._upload_errors) == 0:
request = models.CompleteMultipartUploadRequest()
copy_request(request, self._reqeust)
copy_request(request, self._request)
parts = sorted(self._uploaded_parts, key=lambda p: p.part_number)
request.upload_id = upload_ctx.upload_id
request.complete_multipart_upload = models.CompleteMultipartUpload(parts=parts)
Expand All @@ -534,7 +546,7 @@ def _multipart_part(self) -> UploadResult:
try:
abort_request = models.AbortMultipartUploadRequest()
abort_request.upload_id = upload_ctx.upload_id
copy_request(request, self._reqeust)
copy_request(request, self._request)
self._client.abort_multipart_upload(abort_request)
except Exception as _:
pass
Expand Down Expand Up @@ -565,12 +577,18 @@ def _get_upload_context(self) -> _UploadContext:

#if not exist or fail, create a new upload id
request = models.InitiateMultipartUploadRequest()
copy_request(request, self._reqeust)
copy_request(request, self._request)
if request.content_type is None:
request.content_type = self._get_content_type()
if request.cse_part_size is None:
request.cse_part_size = self._options.part_size
if request.cse_data_size is None:
request.cse_data_size = self._total_size

result = self._client.initiate_multipart_upload(request)

self._base._cse_multipart_context = result.cse_multipart_context

return _UploadContext(
upload_id=result.upload_id,
start_num=0,
Expand Down Expand Up @@ -622,20 +640,32 @@ def _upload_part(self, part):
hash_crc64 = None
try:
result = self._client.upload_part(models.UploadPartRequest(
bucket=self._reqeust.bucket,
key=self._reqeust.key,
bucket=self._request.bucket,
key=self._request.key,
upload_id=upload_id,
part_number=part_number,
body=body,
request_payer=self._reqeust.request_payer
request_payer=self._request.request_payer,
cse_multipart_context=self._base._cse_multipart_context
))
etag = result.etag
hash_crc64 = result.hash_crc64
except Exception as err:
error = err
self._update_progress(size)

return part_number, etag, error, hash_crc64, size

def _update_progress(self, increment: int):
if self._progress_lock:
with self._progress_lock:
self._transferred += increment
if self._request.progress_fn is not None:
self._request.progress_fn(increment, self._transferred, self._total_size)
else:
self._transferred += increment
if self._request.progress_fn is not None:
self._request.progress_fn(increment, self._transferred, self._total_size)

def _save_error(self, error) -> None:
if self._upload_part_lock:
Expand All @@ -656,13 +686,14 @@ def _iter_uploaded_part(self):
try:
paginator = ListPartsPaginator(self._client)
iterator = paginator.iter_page(models.ListPartsRequest(
bucket=self._reqeust.bucket,
key=self._reqeust.key,
request_payer=self._reqeust.request_payer,
bucket=self._request.bucket,
key=self._request.key,
request_payer=self._request.request_payer,
upload_id=self._upload_id,
))
check_part_number = 1
for page in iterator:
self._list_parts_result = page
for part in page.parts:
if (part.part_number != check_part_number or
part.size != self._options.part_size):
Expand Down Expand Up @@ -707,6 +738,6 @@ def _assert_crc_same(self, headers: MutableMapping):
def _wrap_error(self, upload_id: str, error: Exception) -> Exception:
return UploadError(
upload_id=upload_id,
path=f'oss://{self._reqeust.bucket}/{self._reqeust.key}',
path=f'oss://{self._request.bucket}/{self._request.key}',
error=error
)
58 changes: 58 additions & 0 deletions sample/encryption_upload_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import argparse
import alibabacloud_oss_v2 as oss

parser = argparse.ArgumentParser(description="encryption upload file sample")
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--key', help='The name of the object.', required=True)
parser.add_argument('--file_path', help='The path of Upload file.', required=True)

RSA_PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MIGfMA0G6mse2QsIgz3******GBcom6kEF6MmR1EKixaQIDAQAB
-----END PUBLIC KEY-----"""

RSA_PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
MIICdQIBADANBgk******ItewfwXIL1Mqz53lO/gK+q6TR92gGc+4ajL
-----END PRIVATE KEY-----"""


def main():

args = parser.parse_args()

# Loading credentials values from the environment variables
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

# Using the SDK's default configuration
cfg = oss.config.load_default()
cfg.credentials_provider = credentials_provider
cfg.region = args.region
if args.endpoint is not None:
cfg.endpoint = args.endpoint

client = oss.Client(cfg)

mc = oss.crypto.MasterRsaCipher(
mat_desc={"desc": "your master encrypt key material describe information"},
public_key=RSA_PUBLIC_KEY,
private_key=RSA_PRIVATE_KEY
)
encryption_client = oss.EncryptionClient(client, mc)
up_loader = encryption_client.uploader(part_size=1000 * 1024,
parallel_num=5,
leave_parts_on_error=True,
enable_checkpoint=True)



result = up_loader.upload_file(oss.PutObjectRequest(
bucket=args.bucket,
key=args.key,
), filepath=args.file_path)

print(vars(result))

if __name__ == "__main__":
main()

58 changes: 58 additions & 0 deletions sample/encryption_upload_from.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import argparse
import alibabacloud_oss_v2 as oss

parser = argparse.ArgumentParser(description="encryption upload from sample")
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--key', help='The name of the object.', required=True)
parser.add_argument('--file_path', help='The path of Upload file.', required=True)

RSA_PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MIGfMA0G6mse2QsIgz3******GBcom6kEF6MmR1EKixaQIDAQAB
-----END PUBLIC KEY-----"""

RSA_PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
MIICdQIBADANBgk******ItewfwXIL1Mqz53lO/gK+q6TR92gGc+4ajL
-----END PRIVATE KEY-----"""


def main():

args = parser.parse_args()

# Loading credentials values from the environment variables
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

# Using the SDK's default configuration
cfg = oss.config.load_default()
cfg.credentials_provider = credentials_provider
cfg.region = args.region
if args.endpoint is not None:
cfg.endpoint = args.endpoint

client = oss.Client(cfg)

mc = oss.crypto.MasterRsaCipher(
mat_desc={"desc": "your master encrypt key material describe information"},
public_key=RSA_PUBLIC_KEY,
private_key=RSA_PRIVATE_KEY
)
encryption_client = oss.EncryptionClient(client, mc)
up_loader = encryption_client.uploader(part_size=1000 * 1024,
parallel_num=5,
leave_parts_on_error=True,
enable_checkpoint=True)


with open(file=args.file_path, mode='rb') as f:
result = up_loader.upload_from(oss.PutObjectRequest(
bucket=args.bucket,
key=args.key,
), reader=f)

print(vars(result))

if __name__ == "__main__":
main()

54 changes: 54 additions & 0 deletions sample/progress_upload_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import argparse
import alibabacloud_oss_v2 as oss

parser = argparse.ArgumentParser(description="progress upload file sample")
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--key', help='The name of the object.', required=True)
parser.add_argument('--file_path', help='The path of Upload file.', required=True)


def main():

args = parser.parse_args()

# Loading credentials values from the environment variables
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

# Using the SDK's default configuration
cfg = oss.config.load_default()
cfg.credentials_provider = credentials_provider
cfg.region = args.region
if args.endpoint is not None:
cfg.endpoint = args.endpoint

client = oss.Client(cfg)

up_loader = client.uploader(part_size=100*1024,
parallel_num=5,
leave_parts_on_error=True,
enable_checkpoint=True,
)

global progress_save_n
progress_save_n = 0
def _progress_fn(n, _written, total):
global progress_save_n
progress_save_n += n
rate = int(100 * (float(_written) / float(total)))
print('\r{0}% '.format(rate))

result = up_loader.upload_file(oss.PutObjectRequest(
bucket=args.bucket,
key=args.key,
progress_fn=_progress_fn,
), filepath=args.file_path)

print(vars(result))



if __name__ == "__main__":
main()

Loading