Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 322 additions & 2 deletions qcloud_cos/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3464,11 +3464,54 @@ def delete_bucket_referer(self, Bucket, **kwargs):
params=params)
return None

def put_bucket_intelligenttiering_v2(self, Bucket, IntelligentTieringConfiguration=None, Id=None, **kwargs):
"""设置存储桶智能分层配置

:param Bucket(string): 存储桶名称.
:param IntelligentTieringConfiguration(dict): 智能分层配置
:param kwargs(dict): 设置请求headers.
:return: None.

.. code-block:: python

config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)

intelligent_tiering_conf = {
'Status': 'Enable',
'Transition': {
'Days': '30|60|90',
'RequestFrequent': '1'
}
}
client.put_bucket_intelligenttiering(Bucket="bucket", IntelligentTieringConfiguration=intelligent_tiering_conf)
"""

if IntelligentTieringConfiguration is None:
IntelligentTieringConfiguration = {}
xml_config = format_xml(data=IntelligentTieringConfiguration, root='IntelligentTieringConfiguration')
headers = mapped(kwargs)
headers['Content-Type'] = 'application/xml'
params = {'id': Id}
url = self._conf.uri(bucket=Bucket) + '?intelligent-tiering'
logger.info("put bucket intelligenttiering, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='PUT',
url=url,
bucket=Bucket,
data=xml_config,
auth=CosS3Auth(self._conf, params=params),
headers=headers,
params=params)
return None

def put_bucket_intelligenttiering(self, Bucket, IntelligentTieringConfiguration=None, **kwargs):
"""设置存储桶智能分层配置

:param Bucket(string): 存储桶名称.
:param IntelligentTieringConfiguration(dict): 只能分层配置
:param IntelligentTieringConfiguration(dict): 智能分层配置
:param kwargs(dict): 设置请求headers.
:return: None.

Expand Down Expand Up @@ -3510,7 +3553,6 @@ def put_bucket_intelligenttiering(self, Bucket, IntelligentTieringConfiguration=
def get_bucket_intelligenttiering(self, Bucket, **kwargs):
"""获取存储桶智能分层配置
:param Bucket(string): 存储桶名称.
:param IntelligentTieringConfiguration(dict): 智能分层配置
:param kwargs(dict): 设置请求headers.
:return(dict): 智能分层配置.

Expand All @@ -3536,6 +3578,284 @@ def get_bucket_intelligenttiering(self, Bucket, **kwargs):
data = xml_to_dict(rt.content)
return data

def get_bucket_intelligenttiering_v2(self, Bucket, Id, **kwargs):
"""获取存储桶智能分层配置

:param Bucket(string): 存储桶名称.
:param Id(string) 智能分层规则Id.
:param kwargs(dict): 设置请求headers.
:return(dict): 智能分层配置.

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
client.get_bucket_intelligenttiering_v2(Bucket='bucket', Id='id')
"""

headers = mapped(kwargs)
params = {'id': Id}
url = self._conf.uri(bucket=Bucket) + '?intelligent-tiering'
logger.info("get bucket intelligenttiering, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='GET',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params),
headers=headers,
params=params)
data = xml_to_dict(rt.content)
return data

def list_bucket_intelligenttiering_configurations(self, Bucket, **kwargs):
"""列举存储桶中的所有智能分层配置

:param Bucket(string): 存储桶名称.
:param kwargs(dict): 设置请求headers.
:return(dict): 所有的智能分层配置.

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
client.list_bucket_intelligenttiering_configurations(Bucket='bucket')
"""

headers = mapped(kwargs)
params = {}
url = self._conf.uri(bucket=Bucket) + "?intelligent-tiering"
logger.info("list bucket intelligenttiering configurations, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='GET',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params),
headers=headers,
params=params)
data = xml_to_dict(rt.content)
return data

def put_bucket_object_lock(self, Bucket, ObjectLockConfiguration={}, **kwargs):
"""设置存储桶对象锁定配置

:param Bucket(string): 存储桶名称.
:param ObjectLockConfiguration(dict): 对象锁定配置.
:param kwargs(dict): 设置请求headers.
:return: None.

.. code-block:: python

config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)

object_lock_conf = {
'ObjectLockEnabled': 'Enabled',
}
client.put_bucket_object_lock(Bucket="bucket", ObjectLockConfiguration=objeck_lock_conf)
"""

xml_config = format_xml(data=ObjectLockConfiguration, root='ObjectLockConfiguration')
headers = mapped(kwargs)
headers['Content-MD5'] = get_md5(xml_config)
headers['Content-Type'] = 'application/xml'
params = {'object-lock': ''}
url = self._conf.uri(bucket=Bucket)
logger.info("put bucket object-lock, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='PUT',
url=url,
bucket=Bucket,
data=xml_config,
auth=CosS3Auth(self._conf, params=params),
headers=headers,
params=params)
return rt.headers

def get_bucket_object_lock(self, Bucket, **kwargs):
"""获取存储桶对象锁定配置

:param Bucket(string): 存储桶名称.
:param kwargs(dict): 设置请求headers.
:return(dict): 对象锁定配置.

.. code-block:: python

config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
client.get_bucket_object_lock(Bucket="bucket")
"""
headers = mapped(kwargs)
params = {'object-lock': ''}
url = self._conf.uri(bucket=Bucket)
logger.info("get bucket object-lock, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='GET',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params),
headers=headers,
params=params)
data = xml_to_dict(rt.content)
return data

def get_bucket_meta(self, Bucket, **kwargs):
"""获取存储桶各项配置

:param Bucket(string): 存储桶名称.
:param kwargs(dict): 设置请求headers.
:return(dict): 存储桶各项配置.

.. code-block:: python

config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
client.get_bucket_meta(Bucket="bucket")
"""
data = {
'BucketUrl': None,
'OFS': False,
'MAZ': False,
'Encryption': None,
'ACL': None,
'Website': None,
'Logging': None,
'CORS': None,
'Versioning': None,
'IntelligentTiering': None,
'Lifecycle': None,
'Tagging': None,
'ObjectLock': None,
'Replication': None,
}
pool = SimpleThreadPool(num_threads=10)

# HeadBucket
def _head_bucket_wrapper(Bucket, **kwargs):
resp = self.head_bucket(Bucket, **kwargs)
# x-cos-bucket-arch: 'OFS'
# x-cos-bucket-az-type: 'MAZ'
# x-cos-bucket-region: 'ap-beijing'
if 'x-cos-bucket-arch' in resp and resp['x-cos-bucket-arch'] == 'OFS':
data.update({'OFS': True})
else:
data.update({'OFS': False})
if 'x-cos-bucket-az-type' in resp and resp['x-cos-bucket-az-type'] == 'MAZ':
data.update({'MAZ': True})
else:
data.update({'MAZ': False})
data.update({"Location": resp['x-cos-bucket-region']})
url = self._conf.uri(bucket=Bucket).strip('/')
data.update({'BucketUrl': url})
pool.add_task(_head_bucket_wrapper, Bucket, **kwargs)

# Website
def _get_bucket_website_wrapper(Bucket, **kwargs):
try:
resp = self.get_bucket_website(Bucket, **kwargs)
data.update({'Website': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get website conf:{}".format(e))
pool.add_task(_get_bucket_website_wrapper, Bucket, **kwargs)

# ObjectLock
def _get_bucket_object_lock_wrapper(Bucket, **kwargs):
try:
resp = self.get_bucket_object_lock(Bucket, **kwargs)
data.update({'ObjectLock': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get object_lock conf:{}".format(e))
pool.add_task(_get_bucket_object_lock_wrapper, Bucket, **kwargs)

# ACL
def _get_bucket_acl_wrapper(Bucket, **kwargs):
try:
resp = self.get_bucket_acl(Bucket, **kwargs)
data.update({'ACL': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get acl conf:{}".format(e))
pool.add_task(_get_bucket_acl_wrapper, Bucket, **kwargs)

# Logging
def _get_bucket_logging_wrapper(Bucket, **kwargs):
try:
resp = self.get_bucket_logging(Bucket, **kwargs)
data.update({'Logging': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get logging conf:{}".format(e))
pool.add_task(_get_bucket_logging_wrapper, Bucket, **kwargs)

# Lifecycle
def _get_bucket_lifecycle_wrapper(Bucket, **kwargs):
try:
resp = self.get_bucket_lifecycle(Bucket, **kwargs)
data.update({'Lifecycle': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get lifecycle conf:{}".format(e))
pool.add_task(_get_bucket_lifecycle_wrapper, Bucket, **kwargs)

# Replication
def _get_bucket_replication_wrapper(Bucket, **kwargs):
try:
resp = self.get_bucket_replication(Bucket, **kwargs)
data.update({'Replication': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get replication conf:{}".format(e))
pool.add_task(_get_bucket_replication_wrapper, Bucket, **kwargs)

# Encryption
def _get_bucket_encryption_wrapper(Bucket, **kwargs):
try:
resp = self.get_bucket_encryption(Bucket, **kwargs)
data.update({'Encryption': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get encryption conf:{}".format(e))
pool.add_task(_get_bucket_encryption_wrapper, Bucket, **kwargs)

# CORS
def _get_bucket_cors_wrapper(Bucket, **kwargs):
try:
resp = self.get_bucket_cors(Bucket, **kwargs)
data.update({'CORS': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get cors conf:{}".format(e))
pool.add_task(_get_bucket_cors_wrapper, Bucket, **kwargs)

# Versioning
def _get_bucket_versioning_wrapper(Bucket, **kwargs):
try:
resp = self.get_bucket_versioning(Bucket, **kwargs)
data.update({'Versioning': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get versioning conf:{}".format(e))
pool.add_task(_get_bucket_versioning_wrapper, Bucket, **kwargs)

# IntelligentTiering
def _list_bucket_intelligenttiering_conf_wrapper(Bucket, **kwargs):
try:
resp = self.list_bucket_intelligenttiering_configurations(Bucket, **kwargs)
data.update({'IntelligentTiering': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get intelligenttiering conf:{}".format(e))
pool.add_task(_list_bucket_intelligenttiering_conf_wrapper, Bucket, **kwargs)

# Tagging
def _get_bucket_tagging_wrapper(Bucket, **kwargs):
try:
resp = self.get_bucket_tagging(Bucket, **kwargs)
data.update({'Tagging': resp})
except CosServiceError as e:
logger.debug("get_bucket_meta failed to get tagging conf:{}".format(e))
pool.add_task(_get_bucket_tagging_wrapper, Bucket, **kwargs)

pool.wait_completion()
return data

# service interface begin
def list_buckets(self, TagKey=None, TagValue=None, Region=None, CreateTime=None, Range=None, Marker="", MaxKeys=2000, **kwargs):
"""列出符合条件的bucket
Expand Down
Loading