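"""Regression tests for gcloud.storage: buckets, keys, and signed URLs."""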
import base64
import tempfile
import time

from Crypto.Hash import MD5
import httplib2
import unittest2

from gcloud import storage
# This assumes the command is being run via tox, hence the
# repository root is the current directory.
from regression import regression_utils


HTTP = httplib2.Http()
SHARED_BUCKETS = {}


def setUpModule():
    if 'test_bucket' not in SHARED_BUCKETS:
        connection = regression_utils.get_storage_connection()
        # %d truncates the timestamp to integer milliseconds.
        bucket_name = 'new%d' % (1000 * time.time(),)
        # In the **very** rare case the bucket name is reserved, this
        # fails with a ConnectionError.
        SHARED_BUCKETS['test_bucket'] = connection.create_bucket(bucket_name)


def tearDownModule():
    for bucket in SHARED_BUCKETS.values():
        # Passing force=True also deletes all files.
        bucket.delete(force=True)


class TestStorage(unittest2.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.connection = regression_utils.get_storage_connection()


class TestStorageBuckets(TestStorage):

    def setUp(self):
        self.case_buckets_to_delete = []

    def tearDown(self):
        for bucket in self.case_buckets_to_delete:
            bucket.delete()

    def test_create_bucket(self):
        new_bucket_name = 'a-new-bucket'
        self.assertRaises(storage.exceptions.NotFoundError,
                          self.connection.get_bucket, new_bucket_name)
        created = self.connection.create_bucket(new_bucket_name)
        self.case_buckets_to_delete.append(created)
        self.assertEqual(created.name, new_bucket_name)

    def test_get_buckets(self):
        buckets_to_create = [
            'new%d' % (1000 * time.time(),),
            'newer%d' % (1000 * time.time(),),
            'newest%d' % (1000 * time.time(),),
        ]
        for bucket_name in buckets_to_create:
            bucket = self.connection.create_bucket(bucket_name)
            self.case_buckets_to_delete.append(bucket)

        # Retrieve the buckets.
        all_buckets = self.connection.get_all_buckets()
        created_buckets = [bucket for bucket in all_buckets
                           if bucket.name in buckets_to_create]
        self.assertEqual(len(created_buckets), len(buckets_to_create))


class TestStorageFiles(TestStorage):

    FILES = {
        'logo': {
            'path': 'regression/data/CloudPlatform_128px_Retina.png',
        },
        'big': {
            'path': 'regression/data/five-mb-file.zip',
        },
    }

    @staticmethod
    def _get_base64_md5hash(filename):
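        # GCS reports an object's MD5 checksum base64-encoded in its
        # 'md5Hash' metadata; compute the same value locally so the
        # tests can compare against it.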
        with open(filename, 'rb') as file_obj:
            md5_hash = MD5.new(data=file_obj.read())
        digest_bytes = md5_hash.digest()
        return base64.b64encode(digest_bytes)

    @classmethod
    def setUpClass(cls):
        super(TestStorageFiles, cls).setUpClass()
        for file_data in cls.FILES.values():
            file_data['hash'] = cls._get_base64_md5hash(file_data['path'])
        cls.bucket = SHARED_BUCKETS['test_bucket']

    def setUp(self):
        self.case_keys_to_delete = []

    def tearDown(self):
        for key in self.case_keys_to_delete:
            key.delete()


class TestStorageWriteFiles(TestStorageFiles):

    def test_large_file_write_from_stream(self):
        key = self.bucket.new_key('LargeFile')
        self.assertEqual(key.metadata, {})

        file_data = self.FILES['big']
        with open(file_data['path'], 'rb') as file_obj:
            self.bucket.upload_file_object(file_obj, key=key)
        self.case_keys_to_delete.append(key)

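        # Refresh the key's metadata from the server and check the
        # stored MD5 hash against the local file's hash.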
        key.reload_metadata()
        self.assertEqual(key.metadata['md5Hash'], file_data['hash'])

    def test_write_metadata(self):
        my_metadata = {'contentType': 'image/png'}
        key = self.bucket.upload_file(self.FILES['logo']['path'])
        self.case_keys_to_delete.append(key)

        # NOTE: This should not be necessary. We should be able to pass
        # it in to upload_file and also to upload_from_string.
        key.patch_metadata(my_metadata)
        self.assertEqual(key.metadata['contentType'],
                         my_metadata['contentType'])

    def test_direct_write_and_read_into_file(self):
        key = self.bucket.new_key('MyBuffer')
        file_contents = 'Hello World'
        key.upload_from_string(file_contents)
        self.case_keys_to_delete.append(key)

        same_key = self.bucket.new_key('MyBuffer')
        # tempfile.mktemp() is insecure; create the temporary file
        # up-front with NamedTemporaryFile instead.
        with tempfile.NamedTemporaryFile(delete=False) as file_obj:
            same_key.get_contents_to_file(file_obj)
            temp_filename = file_obj.name

        with open(temp_filename, 'rb') as file_obj:
            stored_contents = file_obj.read()

        self.assertEqual(file_contents, stored_contents)

    def test_copy_existing_file(self):
        key = self.bucket.upload_file(self.FILES['logo']['path'],
                                      key='CloudLogo')
        self.case_keys_to_delete.append(key)

        new_key = self.bucket.copy_key(key, self.bucket, 'CloudLogoCopy')
        self.case_keys_to_delete.append(new_key)

        base_contents = key.get_contents_as_string()
        copied_contents = new_key.get_contents_as_string()
        self.assertEqual(base_contents, copied_contents)


class TestStorageListFiles(TestStorageFiles):

    FILENAMES = ['CloudLogo1', 'CloudLogo2', 'CloudLogo3']

    @classmethod
    def setUpClass(cls):
        super(TestStorageListFiles, cls).setUpClass()
        # Make sure the bucket is empty before beginning.
        for key in cls.bucket:
            key.delete()

        logo_path = cls.FILES['logo']['path']
        key = cls.bucket.upload_file(logo_path, key=cls.FILENAMES[0])
        cls.suite_keys_to_delete = [key]

        # Copy the main key onto the remaining names in FILENAMES.
        for filename in cls.FILENAMES[1:]:
            new_key = cls.bucket.copy_key(key, cls.bucket, filename)
            cls.suite_keys_to_delete.append(new_key)

    @classmethod
    def tearDownClass(cls):
        for key in cls.suite_keys_to_delete:
            key.delete()

    def test_list_files(self):
        all_keys = self.bucket.get_all_keys()
        self.assertEqual(len(all_keys), len(self.FILENAMES))

    def test_paginate_files(self):
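        # Request one fewer result than the number of files, so the
        # first page is truncated and a second page is required.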
        truncation_size = 1
        extra_params = {'maxResults': len(self.FILENAMES) - truncation_size}
        iterator = storage.key._KeyIterator(bucket=self.bucket,
                                            extra_params=extra_params)
        response = iterator.get_next_page_response()
        keys = list(iterator.get_items_from_response(response))
        self.assertEqual(len(keys), extra_params['maxResults'])
        self.assertEqual(iterator.page_number, 1)
        self.assertIsNotNone(iterator.next_page_token)

        response = iterator.get_next_page_response()
        last_keys = list(iterator.get_items_from_response(response))
        self.assertEqual(len(last_keys), truncation_size)


class TestStorageSignURLs(TestStorageFiles):

    def setUp(self):
        super(TestStorageSignURLs, self).setUp()

        logo_path = self.FILES['logo']['path']
        # Read the image as bytes so the comparison with the signed
        # GET response is exact.
        with open(logo_path, 'rb') as file_obj:
            self.LOCAL_FILE = file_obj.read()

        key = self.bucket.new_key('LogoToSign.jpg')
        key.upload_from_string(self.LOCAL_FILE)
        self.case_keys_to_delete.append(key)

    def tearDown(self):
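        # Unlike the parent class, tolerate keys that have already
        # been removed via a signed DELETE URL.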
        for key in self.case_keys_to_delete:
            if key.exists():
                key.delete()

    def test_create_signed_read_url(self):
        key = self.bucket.new_key('LogoToSign.jpg')
        expiration = int(time.time() + 5)
        signed_url = key.generate_signed_url(expiration, method='GET')

        response, content = HTTP.request(signed_url, method='GET')
        self.assertEqual(response.status, 200)
        self.assertEqual(content, self.LOCAL_FILE)

    def test_create_signed_delete_url(self):
        key = self.bucket.new_key('LogoToSign.jpg')
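        # Use a far-future expiration so the URL is still valid when
        # the request is made.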
        expiration = int(time.time() + 283473274)
        signed_delete_url = key.generate_signed_url(expiration,
                                                    method='DELETE')

        response, content = HTTP.request(signed_delete_url, method='DELETE')
        self.assertEqual(response.status, 204)
        self.assertEqual(content, '')

        # Check that the key has actually been deleted.
        self.assertRaises(storage.exceptions.NotFoundError,
                          key.reload_metadata)