diff --git a/.pylintrc b/.pylintrc index cb8d8cb..cb369db 100644 --- a/.pylintrc +++ b/.pylintrc @@ -13,6 +13,7 @@ disable= too-few-public-methods, # triggers when inheriting ungrouped-imports, # clashes with isort duplicate-code, # broken, setup.py + wrong-import-order, # isort has precedence [BASIC] diff --git a/tests/lib/boto3_proxy_test.py b/tests/lib/boto3_proxy_test.py index bd3e564..4995d6e 100644 --- a/tests/lib/boto3_proxy_test.py +++ b/tests/lib/boto3_proxy_test.py @@ -2,8 +2,6 @@ from cloudformation_cli_python_lib.boto3_proxy import SessionProxy, _get_boto_session from cloudformation_cli_python_lib.utils import Credentials -import botocore # pylint: disable=C0411 - def test_get_boto_session_returns_proxy(): proxy = _get_boto_session(Credentials("", "", "")) @@ -15,14 +13,6 @@ def test_get_boto_session_returns_none(): assert proxy is None -def test_can_create_boto_client(): - proxy = _get_boto_session(Credentials("", "", "")) - client = proxy.client( - "s3", region_name="us-west-2" - ) # just in case AWS_REGION not set in test environment - assert isinstance(client, botocore.client.BaseClient) - - def test_can_retrieve_boto_session(): proxy = _get_boto_session(Credentials("", "", "")) session = proxy.session diff --git a/tests/lib/cipher_test.py b/tests/lib/cipher_test.py index 926f416..2920150 100644 --- a/tests/lib/cipher_test.py +++ b/tests/lib/cipher_test.py @@ -17,7 +17,7 @@ def test_create_kms_cipher(): with patch( "cloudformation_cli_python_lib.cipher.aws_encryption_sdk.StrictAwsKmsMasterKeyProvider", autospec=True, - ): + ), patch("boto3.client", autospec=True): cipher = KmsCipher("encryptionKeyArn", "encryptionKeyRole") assert cipher @@ -35,7 +35,9 @@ def test_decrypt_credentials_success(): autospec=True, ), patch( "cloudformation_cli_python_lib.cipher.aws_encryption_sdk.EncryptionSDKClient.decrypt" - ) as mock_decrypt: + ) as mock_decrypt, patch( + "boto3.client", autospec=True + ): mock_decrypt.return_value = ( b'{"accessKeyId": "IASAYK835GAIFHAHEI23", "secretAccessKey": "66iOGPN5LnpZorcLr8Kh25u8AbjHVllv5poh2O0", "sessionToken": "lameHS2vQOknSHWhdFYTxm2eJc1JMn9YBNI4nV4mXue945KPL6DHfW8EsUQT5zwssYEC1NvYP9yD6Y5s5lKR3chflOHPFsIe6eqg"}', # noqa: B950 Mock(), @@ -56,7 +58,9 @@ def test_decrypt_credentials_fail(): "cloudformation_cli_python_lib.cipher.aws_encryption_sdk.EncryptionSDKClient.decrypt" ) as mock_decrypt, pytest.raises( _EncryptionError - ) as excinfo: + ) as excinfo, patch( + "boto3.client", autospec=True + ): mock_decrypt.side_effect = AWSEncryptionSDKClientError() cipher = KmsCipher("encryptionKeyArn", "encryptionKeyRole") cipher.decrypt_credentials( @@ -73,7 +77,9 @@ def test_decrypt_credentials_returns_null_fail(): "cloudformation_cli_python_lib.cipher.aws_encryption_sdk.EncryptionSDKClient.decrypt" ) as mock_decrypt, pytest.raises( _EncryptionError - ) as excinfo: + ) as excinfo, patch( + "boto3.client", autospec=True + ): mock_decrypt.return_value = ( b"null", Mock(), diff --git a/tests/lib/log_delivery_test.py b/tests/lib/log_delivery_test.py index 002437f..91ab8aa 100644 --- a/tests/lib/log_delivery_test.py +++ b/tests/lib/log_delivery_test.py @@ -4,8 +4,6 @@ from uuid import uuid4 import pytest -from boto3.session import Session -from cloudformation_cli_python_lib.boto3_proxy import SessionProxy from cloudformation_cli_python_lib.log_delivery import ( HookProviderLogHandler, ProviderFilter, @@ -18,6 +16,13 @@ RequestData, ) +import botocore.errorfactory +import botocore.session + +logs_model = botocore.session.get_session().get_service_model("logs") +factory = botocore.errorfactory.ClientExceptionsFactory() +logs_exceptions = factory.create_client_exceptions(logs_model) + @pytest.fixture def mock_logger(): @@ -100,38 +105,36 @@ def mock_handler_set_formatter(): @pytest.fixture -def mock_provider_handler(): +def mock_provider_handler(mock_session): plh = ProviderLogHandler( group="test-group", stream="test-stream", - session=SessionProxy( - Session( - aws_access_key_id="", aws_secret_access_key="", aws_session_token="" - ) - ), + session=mock_session, ) # not mocking the whole client because that replaces generated exception classes to # be replaced with mocks for method in ["create_log_group", "create_log_stream", "put_log_events"]: setattr(plh.client, method, Mock(auto_spec=True)) + + # set exceptions instead of using Mock + plh.client.exceptions = logs_exceptions return plh @pytest.fixture -def mock_hook_provider_handler(): +def mock_hook_provider_handler(mock_session): plh = HookProviderLogHandler( group="test-hook-group", stream="test-hook-stream", - session=SessionProxy( - Session( - aws_access_key_id="", aws_secret_access_key="", aws_session_token="" - ) - ), + session=mock_session, ) # not mocking the whole client because that replaces generated exception classes to # be replaced with mocks for method in ["create_log_group", "create_log_stream", "put_log_events"]: setattr(plh.client, method, Mock(auto_spec=True)) + + # set exceptions instead of using Mock + plh.client.exceptions = logs_exceptions return plh @@ -286,12 +289,11 @@ def test__put_log_event_success(mock_provider_handler, sequence_token): def test__put_log_event_invalid_token(mock_provider_handler): - exc = mock_provider_handler.client.exceptions mock_put = mock_provider_handler.client.put_log_events mock_put.return_value = {"nextSequenceToken": "some-other-seq"} mock_put.side_effect = [ - exc.InvalidSequenceTokenException({}, operation_name="Test"), - exc.DataAlreadyAcceptedException({}, operation_name="Test"), + logs_exceptions.InvalidSequenceTokenException({}, operation_name="Test"), + logs_exceptions.DataAlreadyAcceptedException({}, operation_name="Test"), DEFAULT, ] mock_provider_handler._put_log_event( @@ -309,8 +311,7 @@ def test_emit_existing_cwl_group_stream(mock_provider_handler): def test_emit_no_group_stream(mock_provider_handler): - exc = mock_provider_handler.client.exceptions.ResourceNotFoundException - group_exc = exc( + group_exc = logs_exceptions.ResourceNotFoundException( {"Error": {"Message": "log group does not exist"}}, operation_name="PutLogRecords", ) @@ -326,7 +327,7 @@ def test_emit_no_group_stream(mock_provider_handler): mock_provider_handler._create_log_stream.assert_called_once() # create_group should not be called again if the group already exists - stream_exc = exc( + stream_exc = logs_exceptions.ResourceNotFoundException( {"Error": {"Message": "log stream does not exist"}}, operation_name="PutLogRecords", ) @@ -474,8 +475,9 @@ def test_hook_log_stream_create_success(mock_hook_provider_handler): @pytest.mark.parametrize("create_method", ["_create_log_group", "_create_log_stream"]) def test__hook_create_already_exists(mock_hook_provider_handler, create_method): mock_logs_method = getattr(mock_hook_provider_handler.client, create_method[1:]) - exc = mock_hook_provider_handler.client.exceptions.ResourceAlreadyExistsException - mock_logs_method.side_effect = exc({}, operation_name="Test") + mock_logs_method.side_effect = logs_exceptions.ResourceAlreadyExistsException( + {}, operation_name="Test" + ) # should not raise an exception if the log group already exists getattr(mock_hook_provider_handler, create_method)() mock_logs_method.assert_called_once() @@ -493,12 +495,11 @@ def test__hook_put_log_event_success(mock_hook_provider_handler, sequence_token) def test__hook_put_log_event_invalid_token(mock_hook_provider_handler): - exc = mock_hook_provider_handler.client.exceptions mock_put = mock_hook_provider_handler.client.put_log_events mock_put.return_value = {"nextSequenceToken": "some-other-seq"} mock_put.side_effect = [ - exc.InvalidSequenceTokenException({}, operation_name="Test"), - exc.DataAlreadyAcceptedException({}, operation_name="Test"), + logs_exceptions.InvalidSequenceTokenException({}, operation_name="Test"), + logs_exceptions.DataAlreadyAcceptedException({}, operation_name="Test"), DEFAULT, ] mock_hook_provider_handler._put_log_event( @@ -516,8 +517,7 @@ def test_hook_emit_existing_cwl_group_stream(mock_hook_provider_handler): def test_hook_emit_no_group_stream(mock_hook_provider_handler): - exc = mock_hook_provider_handler.client.exceptions.ResourceNotFoundException - group_exc = exc( + group_exc = logs_exceptions.ResourceNotFoundException( {"Error": {"Message": "log group does not exist"}}, operation_name="PutLogRecords", ) @@ -533,7 +533,7 @@ def test_hook_emit_no_group_stream(mock_hook_provider_handler): mock_hook_provider_handler._create_log_stream.assert_called_once() # create_group should not be called again if the group already exists - stream_exc = exc( + stream_exc = logs_exceptions.ResourceNotFoundException( {"Error": {"Message": "log stream does not exist"}}, operation_name="PutLogRecords", ) diff --git a/tests/lib/metrics_test.py b/tests/lib/metrics_test.py index ec3dbda..c5dbf57 100644 --- a/tests/lib/metrics_test.py +++ b/tests/lib/metrics_test.py @@ -3,7 +3,6 @@ from datetime import datetime from unittest.mock import Mock, call, patch -import boto3 import pytest from cloudformation_cli_python_lib.interface import ( Action, @@ -18,7 +17,13 @@ format_dimensions, ) -from botocore.stub import Stubber # pylint: disable=C0411 +import botocore.errorfactory +import botocore.session + +cloudwatch_model = botocore.session.get_session().get_service_model("cloudwatch") +factory = botocore.errorfactory.ClientExceptionsFactory() +cloudwatch_exceptions = factory.create_client_exceptions(cloudwatch_model) + ACCOUNT_ID = "123456789012" RESOURCE_TYPE = "Aa::Bb::Cc" @@ -42,11 +47,16 @@ def test_format_dimensions(): def test_put_metric_catches_error(mock_session): - client = boto3.client("cloudwatch") - stubber = Stubber(client) - - stubber.add_client_error("put_metric_data", "InternalServiceError") - stubber.activate() + client = mock_session.client("cloudwatch") + client.exceptions = cloudwatch_exceptions + mock_put = client.put_metric_data + mock_put.return_value = {} + mock_put.side_effect = [ + cloudwatch_exceptions.InternalServiceFault( + {"Error": {"Code": "InternalServiceError", "Message": ""}}, + operation_name="PutMetricData", + ), + ] mock_session.client.return_value = client @@ -67,7 +77,6 @@ def test_put_metric_catches_error(mock_session): datetime.now(), ) - stubber.deactivate() expected_calls = [ call.error( "An error occurred while publishing metrics: %s", @@ -197,11 +206,16 @@ def test_publish_log_delivery_exception_metric(mock_session): def test_put_hook_metric_catches_error(mock_session): - client = boto3.client("cloudwatch") - stubber = Stubber(client) - - stubber.add_client_error("put_metric_data", "InternalServiceError") - stubber.activate() + client = mock_session.client("cloudwatch") + client.exceptions = cloudwatch_exceptions + mock_put = client.put_metric_data + mock_put.return_value = {} + mock_put.side_effect = [ + cloudwatch_exceptions.InternalServiceFault( + {"Error": {"Code": "InternalServiceError", "Message": ""}}, + operation_name="PutMetricData", + ), + ] mock_session.client.return_value = client @@ -222,7 +236,6 @@ def test_put_hook_metric_catches_error(mock_session): datetime.now(), ) - stubber.deactivate() expected_calls = [ call.error( "An error occurred while publishing metrics: %s",