From b08f9f777da6d87b594630d7b91275104fb4a328 Mon Sep 17 00:00:00 2001
From: zhuxiaolong37
Date: Thu, 27 Mar 2025 14:08:51 +0800
Subject: [PATCH] Add encryption client file like

---
NOTE(review): this copy of the patch was re-flowed from a flattened source, so
indentation and blank context lines are inferred; confirm against the original
commit before applying.
NOTE(review): `tes_eclient_open_file_resume_read` lacks the `test` prefix, so
default unittest/pytest collection will presumably skip it; confirm whether
that is intentional.
NOTE(review): several `try/except oss.PathError` blocks (calls on a closed
file, reads after the source changed) have no `self.fail(...)` after the call,
so they also pass when no exception is raised.

 alibabacloud_oss_v2/encryption_client.py    |  25 +
 sample/eclient_file_like_read_only.py       |  53 ++
 tests/integration/test_encryption_client.py | 769 ++++++++++++++++++++
 3 files changed, 847 insertions(+)
 create mode 100644 sample/eclient_file_like_read_only.py

diff --git a/alibabacloud_oss_v2/encryption_client.py b/alibabacloud_oss_v2/encryption_client.py
index 3675c3a..f084f5b 100644
--- a/alibabacloud_oss_v2/encryption_client.py
+++ b/alibabacloud_oss_v2/encryption_client.py
@@ -12,6 +12,7 @@
 from .crypto import MasterCipher, Envelope, ContentCipherBuilder, ContentCipher, CipherData
 from .crypto.aes_ctr_cipher import AESCtrCipherBuilder
 from .crypto.aes_ctr import _BLOCK_SIZE_LEN
+from .filelike import ReadOnlyFile
 
 class EncryptionMultiPartContext:
     """EncryptionMultiPartContext save encryption or decryption information
@@ -180,6 +181,30 @@ def list_parts(self, request: models.ListPartsRequest, **kwargs
 
         return self._client.list_parts(request, **kwargs)
 
+    def open_file(self, bucket: str, key: str,
+                  version_id: Optional[str] = None,
+                  request_payer: Optional[str] = None,
+                  **kwargs) -> ReadOnlyFile:
+        """Open the named object for reading and return it as a ReadOnlyFile.
+
+        Args:
+            bucket (str, required): The name of the bucket.
+            key (str, required): The name of the object.
+            version_id (str, optional): The version ID of the object.
+            request_payer (str, optional): Indicates that the requester is aware that the request and data download will incur costs.
+
+        Returns:
+            ReadOnlyFile: Built from this client, the bucket/key/version_id/request_payer and any extra keyword arguments, which are forwarded unchanged.
+        """
+        return ReadOnlyFile(
+            self,
+            bucket=bucket,
+            key=key,
+            version_id=version_id,
+            request_payer=request_payer,
+            **kwargs
+        )
+
     def _get_ccbuilder(self, envelope: Envelope
                        ) -> ContentCipherBuilder:
         return self._ccbuilders.get(envelope.mat_desc or '', self._defualt_ccbuilder)
diff --git a/sample/eclient_file_like_read_only.py b/sample/eclient_file_like_read_only.py
new file mode 100644
index 0000000..c6133a9
--- /dev/null
+++ b/sample/eclient_file_like_read_only.py
@@ -0,0 +1,53 @@
+import argparse
+import alibabacloud_oss_v2 as oss
+
+
+parser = argparse.ArgumentParser(description="file like read only file sample")
+parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
+parser.add_argument('--bucket', help='The name of the bucket.', required=True)
+parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
+parser.add_argument('--key', help='The name of the object.', required=True)
+
+RSA_PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
+MIGfMA0G6mse2QsIgz3******GBcom6kEF6MmR1EKixaQIDAQAB
+-----END PUBLIC KEY-----"""
+
+RSA_PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
+MIICdQIBADANBgk******ItewfwXIL1Mqz53lO/gK+q6TR92gGc+4ajL
+-----END PRIVATE KEY-----"""
+
+
+def main():
+
+    args = parser.parse_args()
+
+    # Load credential values from the environment variables
+    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
+
+    # Start from the SDK's default configuration
+    cfg = oss.config.load_default()
+    cfg.credentials_provider = credentials_provider
+    cfg.region = args.region
+
+
+    if args.endpoint is not None:
+        cfg.endpoint = args.endpoint
+
+    client = oss.Client(cfg)
+
+    mc = oss.crypto.MasterRsaCipher(
+        mat_desc={"desc": "your master encrypt key material describe information"},
+        public_key=RSA_PUBLIC_KEY,
+        private_key=RSA_PRIVATE_KEY
+    )
+    encryption_client = oss.EncryptionClient(client, mc)
+
+    rf: oss.ReadOnlyFile = None
+    with encryption_client.open_file(args.bucket, args.key) as f:
+        rf = f
+        print(rf.read().decode())
+
+
+if __name__ == "__main__":
+    main()
+
diff --git a/tests/integration/test_encryption_client.py b/tests/integration/test_encryption_client.py
index 2d04f1c..987f078 100644
--- a/tests/integration/test_encryption_client.py
+++ b/tests/integration/test_encryption_client.py
@@ -1,5 +1,6 @@
 # pylint: skip-file
 import os
+import random
 import tempfile
 from typing import cast
 import alibabacloud_oss_v2 as oss
@@ -468,3 +469,771 @@ def test_compatibility(self):
             self.assertEqual(example_data[i:12346], data)
             self.assertEqual(size, len(data))
 
+    def test_eclient_open_file_baisc(self):
+
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        data_size = 1234
+        key = 'check_member-' + random_str(6)
+        data = random_str(data_size).encode()
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        rf: oss.ReadOnlyFile = None
+        with eclient.open_file(self.bucket_name, key) as f:
+            self.assertIsNotNone(f)
+            rf = f
+            self.assertEqual(0, f.tell())
+            self.assertEqual(True, f.seekable())
+            self.assertEqual(True, f.readable())
+            self.assertEqual(False, f.closed)
+            self.assertEqual(f'oss://{self.bucket_name}/{key}', f.name)
+            self.assertEqual('rb', f.mode)
+
+            # seek, tell
+            f.seek(0, os.SEEK_SET)
+            self.assertEqual(0, f.tell())
+            offset = f.tell()
+            b = f.read(2)
+            self.assertEqual(data[offset:offset + 2], b)
+
+            f.seek(1, os.SEEK_SET)
+            self.assertEqual(1, f.tell())
+            offset = f.tell()
+            b = f.read(2)
+            self.assertEqual(data[offset:offset + 2], b)
+
+            f.seek(data_size, os.SEEK_SET)
+            self.assertEqual(data_size, f.tell())
+            offset = f.tell()
+            b = f.read(2)
+            self.assertEqual(data[offset:offset + 2], b)
+
+            f.seek(-data_size, os.SEEK_END)
+            self.assertEqual(0, f.tell())
+            offset = f.tell()
+            b = f.read(2)
+            self.assertEqual(data[offset:offset + 2], b)
+
+            f.seek(-1, os.SEEK_END)
+            self.assertEqual(data_size - 1, f.tell())
+            offset = f.tell()
+            b = f.read(2)
+            self.assertEqual(data[data_size - 1:], b)
+
+            f.seek(0, os.SEEK_END)
+            self.assertEqual(data_size, f.tell())
+            offset = f.tell()
+            b = f.read(2)
+            self.assertEqual(b'', b)
+
+            f.seek(123, os.SEEK_SET)
+            self.assertEqual(123, f.tell())
+            offset = f.tell()
+            b = f.read(2)
+            self.assertEqual(data[offset:offset + 2], b)
+
+            f.seek(123, os.SEEK_CUR)
+            self.assertEqual(248, f.tell())
+            offset = f.tell()
+            b = f.read(2)
+            self.assertEqual(data[offset:offset + 2], b)
+
+        self.assertEqual(True, rf.closed)
+        self.assertEqual(None, rf._read_buf)
+        self.assertEqual(None, rf._stream_reader)
+        self.assertEqual(None, rf._stream_iter)
+
+        # calling close repeatedly must not raise
+        rf.close()
+        rf.close()
+
+        rf = eclient.open_file(self.bucket_name, key)
+        self.assertIsNotNone(rf)
+        self.assertEqual(False, rf.closed)
+        with rf as f:
+            self.assertEqual(False, f.closed)
+        self.assertEqual(True, rf.closed)
+
+    def test_eclient_open_file_read_size(self):
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        key = 'read_size-' + random_str(6)
+        data = random_str(100 * 1024 * 5 + 1234).encode()
+
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        # read with an explicit size from successive start offsets
+        rf: oss.ReadOnlyFile = None
+        with eclient.open_file(self.bucket_name, key) as f:
+            self.assertIsNotNone(f)
+            rf = f
+            end = 129
+            for i in range(0, end):
+                size = 200 * 1024 + 12345 - i
+                f.seek(i, 0)
+                self.assertEqual(i, f.tell())
+                got = f.read(size)
+                self.assertEqual(size, len(got))
+                self.assertEqual(data[i:i + size], got)
+                self.assertEqual(i + size, f.tell())
+
+        self.assertEqual(True, rf.closed)
+
+    def test_eclient_open_file_readall(self):
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        key = 'readall-' + random_str(6)
+        data = random_str(100 * 1024 * 5 + 1234).encode()
+
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        # read all
+        with eclient.open_file(self.bucket_name, key) as f:
+            self.assertIsNotNone(f)
+            f.seek(123)
+            self.assertEqual(123, f.tell())
+            got = f.readall()
+            self.assertEqual(data[123:], got)
+            self.assertEqual(len(data), f.tell())
+
+            f.seek(1234)
+            got1 = f.read(17)
+            self.assertEqual(1234 + 17, f.tell())
+            self.assertEqual(data[1234:1234 + 17], got1)
+            got2 = f.read()
+            self.assertEqual(len(data), f.tell())
+            self.assertEqual(data[1234 + 17:], got2)
+            self.assertEqual(data[1234:], got1 + got2)
+
+            f.seek(12345)
+            got1 = f.read(172)
+            self.assertEqual(12345 + 172, f.tell())
+            self.assertEqual(data[12345:12345 + 172], got1)
+            got2 = f.readall()
+            self.assertEqual(len(data), f.tell())
+            self.assertEqual(data[12345 + 172:], got2)
+            self.assertEqual(data[12345:], got1 + got2)
+
+    def test_eclient_open_file_readinto(self):
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        key = 'readinto-' + random_str(6)
+        data = random_str(100 * 1024 * 5 + 1234).encode()
+
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        # read into bytearray
+        with eclient.open_file(self.bucket_name, key) as f:
+            self.assertIsNotNone(f)
+            b = bytearray(11)
+            self.assertEqual(0, f.tell())
+            n = f.readinto(b)
+            self.assertEqual(11, n)
+            self.assertEqual(data[0:11], b)
+            self.assertEqual(11, f.tell())
+
+            b = bytearray(9)
+            n = f.readinto(b)
+            self.assertEqual(9, n)
+            self.assertEqual(data[11:20], b)
+            self.assertEqual(20, f.tell())
+
+            b = bytearray(len(data))
+            f.seek(12345)
+            n = f.readinto(b)
+            self.assertEqual(len(data) - 12345, n)
+            self.assertEqual(len(data), f.tell())
+
+            b = bytearray(len(data) * 2)
+            f.seek(1234)
+            n = f.readinto(b)
+            self.assertEqual(len(data) - 1234, n)
+            self.assertEqual(len(data), f.tell())
+            self.assertEqual(data[1234:], b[:len(data) - 1234])
+
+        # read into blob = memoryview(bytearray(size))
+        with eclient.open_file(self.bucket_name, key) as f:
+            self.assertIsNotNone(f)
+            blob = memoryview(bytearray(len(data)))
+            self.assertEqual(0, f.tell())
+            n = f.readinto(blob[0:11])
+            self.assertEqual(11, n)
+            self.assertEqual(data[0:11], blob[0:11])
+            self.assertEqual(11, f.tell())
+
+            n = f.readinto(blob[11:20])
+            self.assertEqual(9, n)
+            self.assertEqual(data[11:20], blob[11:20])
+            self.assertEqual(20, f.tell())
+
+            # read the remainder
+            n = f.readinto(blob)
+            self.assertEqual(len(data) - 20, n)
+            self.assertEqual(data[20:], blob[0:n])
+            self.assertEqual(len(data), f.tell())
+
+    def test_eclient_open_file_fail(self):
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        key = 'fail-test-' + random_str(6)
+        nokey = key + 'no-key'
+        data = random_str(1234).encode()
+
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        # opening a missing object fails
+        try:
+            with eclient.open_file(self.bucket_name, nokey) as f:
+                self.assertIsNotNone(f)
+            self.fail('should not here')
+        except oss.PathError as err:
+            self.assertIn('stat_object', str(err))
+            self.assertIn(f'oss://{self.bucket_name}/{nokey}', str(err))
+
+        # invalid seek arguments fail
+        try:
+            with eclient.open_file(self.bucket_name, key) as f:
+                f.seek(len(data) + 1, os.SEEK_SET)
+            self.fail('should not here')
+        except oss.PathError as err:
+            self.assertIn('seek', str(err))
+            self.assertIn('offset is unavailable', str(err))
+
+        try:
+            with eclient.open_file(self.bucket_name, key) as f:
+                f.seek(-1, os.SEEK_SET)
+            self.fail('should not here')
+        except oss.PathError as err:
+            self.assertIn('seek', str(err))
+            self.assertIn('negative seek position', str(err))
+
+        try:
+            with eclient.open_file(self.bucket_name, key) as f:
+                f.seek(0, 3)
+            self.fail('should not here')
+        except oss.PathError as err:
+            self.assertIn('seek', str(err))
+            self.assertIn('unsupported whence value', str(err))
+
+        try:
+            with eclient.open_file(self.bucket_name, key) as f:
+                f.seek('123', 3)
+            self.fail('should not here')
+        except oss.PathError as err:
+            self.assertIn('seek', str(err))
+            self.assertIn('is not an integer', str(err))
+
+        # calls made after the file has been closed
+        rf: oss.ReadOnlyFile = None
+        with eclient.open_file(self.bucket_name, key) as f:
+            self.assertIsNotNone(f)
+            rf = f
+
+        try:
+            rf.read()
+        except oss.PathError as err:
+            self.assertIn('read', str(err))
+            self.assertIn('I/O operation on closed file.', str(err))
+
+        try:
+            rf.readall()
+        except oss.PathError as err:
+            self.assertIn('read', str(err))
+            self.assertIn('I/O operation on closed file.', str(err))
+
+        try:
+            rf.readinto(bytearray(123))
+        except oss.PathError as err:
+            self.assertIn('read', str(err))
+            self.assertIn('I/O operation on closed file.', str(err))
+
+        try:
+            rf.seek(0, os.SEEK_CUR)
+        except oss.PathError as err:
+            self.assertIn('seek', str(err))
+            self.assertIn('I/O operation on closed file.', str(err))
+
+        try:
+            rf.tell()
+        except oss.PathError as err:
+            self.assertIn('tell', str(err))
+            self.assertIn('I/O operation on closed file.', str(err))
+
+        try:
+            rf.readable()
+        except oss.PathError as err:
+            self.assertIn('readable', str(err))
+            self.assertIn('I/O operation on closed file.', str(err))
+
+        try:
+            rf.seekable()
+        except oss.PathError as err:
+            self.assertIn('seekable', str(err))
+            self.assertIn('I/O operation on closed file.', str(err))
+
+        try:
+            with rf as f:
+                pass
+        except oss.PathError as err:
+            self.assertIn('enter', str(err))
+            self.assertIn('I/O operation on closed file.', str(err))
+
+    def tes_eclient_open_file_resume_read(self):
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        key = 'resume_read-' + random_str(6)
+        data = random_str(200 * 1024 + 1234).encode()
+
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        with eclient.open_file(self.bucket_name, key) as f:
+            b1 = f.read(1234)
+            # drop the stream iterator before the next read (stands in for waiting on a stream close)
+            f._stream_iter = None
+            # time.sleep(120)
+            b2 = f.read()
+            self.assertEqual(data, b1 + b2)
+
+    def test_eclient_open_file_source_changed(self):
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        key = 'source_changed-' + random_str(6)
+        data1 = random_str(200 * 1024 + 1234).encode()
+        data2 = random_str(201 * 1024 + 1234).encode()
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data1
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        with eclient.open_file(self.bucket_name, key) as f:
+            b1 = f.read(1234)
+            self.assertEqual(data1[0:len(b1)], b1)
+
+            # overwrite the object while the file is still open
+            result = eclient.put_object(oss.PutObjectRequest(
+                bucket=self.bucket_name,
+                key=key,
+                body=data2
+            ))
+            self.assertEqual(200, result.status_code)
+            self.assertEqual('OK', result.status)
+
+            try:
+                f.seek(0)
+                f.readall()
+            except oss.PathError as err:
+                self.assertIn('get_object', str(err))
+                self.assertIn('Source file is changed, origin info', str(err))
+
+    def test_eclient_open_file_prefetch_read(self):
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        key = 'prefetch-' + random_str(6)
+        data_len = 11 * 200 * 1024 + 1234
+        data = random_str(data_len).encode()
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        rf: oss.ReadOnlyFile = None
+        with eclient.open_file(
+            self.bucket_name, key,
+            enable_prefetch=True,
+            prefetch_num=3,
+            chunk_size=2 * 200 * 1024,
+            prefetch_threshold=0) as f:
+
+            rf = f
+            self.assertEqual(True, f._enable_prefetch)
+            self.assertEqual(3, f._prefetch_num)
+            self.assertEqual(2 * 200 * 1024, f._chunk_size)
+            self.assertEqual(0, f._prefetch_threshold)
+
+            # size
+            start = f.seek(0, os.SEEK_SET)
+            end = f.seek(0, os.SEEK_END)
+            self.assertEqual(data_len, end - start)
+
+            # print('\nreadall')
+            # readall
+            f.seek(0, os.SEEK_SET)
+            b = f.readall()
+            self.assertEqual(data, b)
+            self.assertIsNone(f._stream_reader)
+            self.assertIsNone(f._stream_iter)
+            self.assertIsNotNone(f._generator)
+            self.assertIsNotNone(f._executor)
+            self.assertEqual(3, f._executor._max_workers)
+
+            b = f.readall()
+            self.assertEqual(b'', b)
+
+            # print('seek readN')
+            # seek, then read N bytes
+            for _ in range(0, 64):
+                offset = random.randint(0, data_len // 5)
+                n = random.randint(0, data_len // 4) + 3 * 200 * 1024
+                begin = f.seek(offset, os.SEEK_SET)
+                f._num_ooo_read = 0
+                self.assertEqual(offset, begin)
+                # print(f'seek readN {offset} {n}')
+                b = f.read(n)
+                self.assertEqual(n, len(b))
+                self.assertEqual(data[offset:offset + n], b)
+                if n % f._chunk_size > 0:
+                    self.assertGreater(len(f._prefetch_readers), 1)
+                self.assertIsNone(f._stream_reader)
+                self.assertIsNone(f._stream_iter)
+                self.assertIsNotNone(f._generator)
+
+            # print('seek read from offset to end')
+            # seek, then read from offset to end
+            for _ in range(0, 64):
+                offset = random.randint(0, data_len // 5)
+                begin = f.seek(offset, os.SEEK_SET)
+                f._num_ooo_read = 0
+                self.assertEqual(offset, begin)
+                b = f.read()
+                self.assertEqual(data_len - offset, len(b))
+                self.assertEqual(data[offset:], b)
+                self.assertEqual(0, len(f._prefetch_readers))
+                self.assertIsNone(f._stream_reader)
+                self.assertIsNone(f._stream_iter)
+                self.assertIsNotNone(f._generator)
+
+            # print('seek readInto N')
+            # seek, then readinto a buffer of N bytes
+            for _ in range(0, 64):
+                offset = random.randint(0, data_len // 5)
+                n = random.randint(0, data_len // 4) + 3 * 200 * 1024
+                begin = f.seek(offset, os.SEEK_SET)
+                f._num_ooo_read = 0
+                self.assertEqual(offset, begin)
+                blob = memoryview(bytearray(n))
+                got = f.readinto(blob)
+                self.assertEqual(n, got)
+                self.assertEqual(data[offset:offset + n], blob[0:])
+                if n % f._chunk_size > 0:
+                    self.assertGreater(len(f._prefetch_readers), 1)
+                self.assertIsNotNone(f._generator)
+                self.assertIsNone(f._stream_reader)
+                self.assertIsNone(f._stream_iter)
+
+            # print('seek readInto from offset to end')
+            # seek, then readinto from offset to end
+            bloball = memoryview(bytearray(data_len))
+            for _ in range(0, 64):
+                offset = random.randint(0, data_len // 5)
+                begin = f.seek(offset, os.SEEK_SET)
+                f._num_ooo_read = 0
+                self.assertEqual(offset, begin)
+                got = f.readinto(bloball)
+                self.assertEqual(data_len - offset, got)
+                self.assertEqual(data[offset:], bloball[0:got])
+                self.assertEqual(0, len(f._prefetch_readers))
+                self.assertIsNone(f._stream_reader)
+                self.assertIsNone(f._stream_iter)
+                self.assertIsNotNone(f._generator)
+
+        self.assertEqual(None, rf._read_buf)
+        self.assertEqual(None, rf._stream_reader)
+        self.assertEqual(None, rf._prefetch_readers)
+        self.assertEqual(None, rf._generator)
+        self.assertEqual(None, rf._executor)
+
+        # calling close repeatedly must not raise
+        rf.close()
+        rf.close()
+
+    def test_eclient_open_file_mix_read(self):
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        key = 'mix-' + random_str(6)
+        data_len = 11 * 200 * 1024 + 12345
+        data = random_str(data_len).encode()
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        rf: oss.ReadOnlyFile = None
+        with eclient.open_file(
+            self.bucket_name, key,
+            enable_prefetch=True,
+            prefetch_num=3,
+            chunk_size=1 * 200 * 1024,
+            prefetch_threshold=5 * 200 * 1024) as f:
+            rf = f
+            self.assertEqual(True, f._enable_prefetch)
+            self.assertEqual(3, f._prefetch_num)
+            self.assertEqual(1 * 200 * 1024, f._chunk_size)
+            self.assertEqual(5 * 200 * 1024, f._prefetch_threshold)
+
+            # first read stays below the prefetch threshold
+            some1 = 3 * 100 * 1024 + 123
+            f.seek(0, os.SEEK_SET)
+            b1 = f.read(some1)
+            self.assertEqual(data[0:some1], b1)
+            self.assertIsNotNone(f._stream_reader)
+            self.assertIsNotNone(f._stream_iter)
+            self.assertIsNone(f._generator)
+            self.assertIsNone(f._executor)
+
+            # second read pushes the total past the prefetch threshold
+            some2 = 8 * 100 * 1024 + 123
+            self.assertGreater(some1 + some2, f._prefetch_threshold)
+            b2 = f.read(some2)
+            self.assertEqual(data[some1:some1 + some2], b2)
+            self.assertIsNone(f._stream_reader)
+            self.assertIsNone(f._stream_iter)
+            self.assertIsNotNone(f._generator)
+            self.assertIsNotNone(f._executor)
+            self.assertEqual(3, f._executor._max_workers)
+
+            # read the rest
+            b3 = f.readall()
+            self.assertEqual(data, b1 + b2 + b3)
+
+        self.assertEqual(None, rf._read_buf)
+        self.assertEqual(None, rf._stream_reader)
+        self.assertEqual(None, rf._prefetch_readers)
+        self.assertEqual(None, rf._generator)
+        self.assertEqual(None, rf._executor)
+
+        # sequential read, then seek, then a large read
+        with eclient.open_file(
+            self.bucket_name, key,
+            enable_prefetch=True,
+            prefetch_num=3,
+            chunk_size=1 * 200 * 1024,
+            prefetch_threshold=5 * 200 * 1024) as f:
+            self.assertEqual(True, f._enable_prefetch)
+            self.assertEqual(3, f._prefetch_num)
+            self.assertEqual(1 * 200 * 1024, f._chunk_size)
+            self.assertEqual(5 * 200 * 1024, f._prefetch_threshold)
+
+            # first read stays below the prefetch threshold
+            off1 = 1
+            some1 = 3 * 100 * 1024 + 123
+            f.seek(off1, os.SEEK_SET)
+            b1 = f.read(some1)
+            self.assertEqual(data[off1:off1 + some1], b1)
+            self.assertIsNotNone(f._stream_reader)
+            self.assertIsNotNone(f._stream_iter)
+            self.assertIsNone(f._generator)
+            self.assertIsNone(f._executor)
+
+            # a single read larger than the prefetch threshold
+            off2 = 100
+            some2 = 15 * 100 * 1024 + 123
+            self.assertGreater(some2, f._prefetch_threshold)
+            f.seek(off2, os.SEEK_SET)
+            b2 = f.read(some2)
+            self.assertEqual(data[off2:off2 + some2], b2)
+            self.assertIsNone(f._stream_reader)
+            self.assertIsNone(f._stream_iter)
+            self.assertIsNotNone(f._generator)
+            self.assertIsNotNone(f._executor)
+            self.assertEqual(3, f._executor._max_workers)
+
+    def test_eclient_open_file_prefetch_source_changed(self):
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        key = 'prefetch_source_changed-' + random_str(6)
+        data1 = random_str(11 * 200 * 1024 + 12345).encode()
+        data2 = random_str(11 * 200 * 1024 + 12345).encode()
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data1
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        with eclient.open_file(
+            self.bucket_name, key,
+            enable_prefetch=True,
+            prefetch_num=3,
+            chunk_size=2 * 200 * 1024,
+            prefetch_threshold=0) as f:
+
+            len1 = 3 * 200 * 1024 + 12
+            b1 = f.read(len1)
+            self.assertIsNone(f._stream_reader)
+            self.assertIsNone(f._stream_iter)
+            self.assertIsNotNone(f._generator)
+            self.assertIsNotNone(f._executor)
+            self.assertEqual(data1[0:len1], b1)
+
+            # overwrite the object while the file is still open
+            result = eclient.put_object(oss.PutObjectRequest(
+                bucket=self.bucket_name,
+                key=key,
+                body=data2
+            ))
+            self.assertEqual(200, result.status_code)
+            self.assertEqual('OK', result.status)
+
+            # this read is expected to return the original data (already buffered)
+            len2 = 1 * 200 * 1024
+            b2 = f.read(len2)
+            self.assertEqual(data1[len1:len1 + len2], b2)
+
+            # read the remainder
+            try:
+                f.readall()
+            except oss.PathError as err:
+                self.assertIn('get_object', str(err))
+                self.assertIn('Source file is changed, origin info', str(err))
+
+    def test_eclient_open_file_mix_read2(self):
+        mc = oss.crypto.MasterRsaCipher(
+            mat_desc={"tag": "value"},
+            public_key=RSA_PUBLIC_KEY,
+            private_key=RSA_PRIVATE_KEY
+        )
+        eclient = oss.EncryptionClient(self.client, mc)
+
+        key = 'prefetch-' + random_str(6)
+        data_len = 6 * 100 * 1024 + 1234
+        data = random_str(data_len).encode()
+        result = eclient.put_object(oss.PutObjectRequest(
+            bucket=self.bucket_name,
+            key=key,
+            body=data
+        ))
+        self.assertEqual(200, result.status_code)
+        self.assertEqual('OK', result.status)
+
+        with eclient.open_file(
+            self.bucket_name, key,
+            enable_prefetch=True,
+            prefetch_num=2,
+            chunk_size=1 * 200 * 1024,
+            prefetch_threshold=1 * 200 * 1024) as f:
+            self.assertEqual(True, f._enable_prefetch)
+            self.assertEqual(2, f._prefetch_num)
+            self.assertEqual(1 * 200 * 1024, f._chunk_size)
+            self.assertEqual(1 * 200 * 1024, f._prefetch_threshold)
+
+            len1 = 12345
+            b1 = f.read(len1)
+            self.assertEqual(data[0:len1], b1)
+            self.assertIsNotNone(f._stream_reader)
+            self.assertIsNotNone(f._stream_iter)
+            self.assertIsNone(f._generator)
+            self.assertIsNone(f._executor)
+
+            len2 = 1 * 200 * 1024
+            b2 = f.read(len2)
+            self.assertEqual(data[len1:len1 + len2], b2)
+            self.assertIsNone(f._stream_reader)
+            self.assertIsNone(f._stream_iter)
+            self.assertIsNotNone(f._generator)
+            self.assertIsNotNone(f._executor)
+
+            # mark the first prefetch reader as failed before the next read
+            f._prefetch_readers[0]._failed = True
+            len3 = 1 * 100 * 1024
+            b3 = f.read(len3)
+            self.assertEqual(data[:len1 + len2 + len3], b1 + b2 + b3)
+            self.assertIsNotNone(f._stream_reader)
+            self.assertIsNotNone(f._stream_iter)
+            self.assertIsNone(f._generator)
+            self.assertIsNotNone(f._executor)
\ No newline at end of file