diff --git a/docs/cloudformation_compatibility.rst b/docs/cloudformation_compatibility.rst index e5d2d3473..ed1319a08 100644 --- a/docs/cloudformation_compatibility.rst +++ b/docs/cloudformation_compatibility.rst @@ -120,13 +120,28 @@ SourceAccessConfigurations All MSK ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -======================== ================================== ======================== - Property Name Intrinsic(s) Supported Reasons -======================== ================================== ======================== -Stream All -Topics All -StartingPosition All -======================== ================================== ======================== +================================ ================================== ======================== + Property Name Intrinsic(s) Supported Reasons +================================ ================================== ======================== +MaximumBatchingWindowInSeconds All +Stream All +Topics All +StartingPosition All +ConsumerGroupId All +================================ ================================== ======================== + +SelfManagedKafka +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +================================ ================================== ======================== + Property Name Intrinsic(s) Supported Reasons +================================ ================================== ======================== +BatchSize All +Topics All +KafkaBootstrapServers All +SourceAccessConfigurations All +Enabled All +ConsumerGroupId All +================================ ================================= ======================== DynamoDB ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/integration/combination/test_api_with_authorizer_apikey.py b/integration/combination/test_api_with_authorizer_apikey.py index df47f264c..f7d58237b 100644 --- a/integration/combination/test_api_with_authorizer_apikey.py +++ b/integration/combination/test_api_with_authorizer_apikey.py @@ -19,12 +19,9 @@ def test_authorizer_apikey(self): apigw_client = self.client_provider.api_client authorizers = apigw_client.get_authorizers(restApiId=rest_api_id)["items"] - lambda_authorizer_uri = ( - "arn:aws:apigateway:" - + self.my_region - + ":lambda:path/2015-03-31/functions/" - + stack_outputs["AuthorizerFunctionArn"] - + "/invocations" + + lambda_authorizer_uri = "arn:{}:apigateway:{}:lambda:path/2015-03-31/functions/{}/invocations".format( + self.partition, self.my_region, stack_outputs["AuthorizerFunctionArn"] ) lambda_token_authorizer = get_authorizer_by_name(authorizers, "MyLambdaTokenAuth") diff --git a/integration/combination/test_function_with_self_managed_kafka.py b/integration/combination/test_function_with_self_managed_kafka.py index baf8048f8..9a8ad13a4 100644 --- a/integration/combination/test_function_with_self_managed_kafka.py +++ b/integration/combination/test_function_with_self_managed_kafka.py @@ -1,3 +1,4 @@ +from unittest.case import skipIf import pytest from integration.helpers.base_test import BaseTest diff --git a/integration/helpers/base_test.py b/integration/helpers/base_test.py index 3b0e52aaf..2ba79e1ad 100644 --- a/integration/helpers/base_test.py +++ b/integration/helpers/base_test.py @@ -72,6 +72,7 @@ def setUpClass(cls): cls.code_dir = Path(cls.resources_dir, "code") cls.session = boto3.session.Session() cls.my_region = cls.session.region_name + cls.partition = cls.session.get_partition_for_region(cls.my_region) cls.client_provider = ClientProvider() cls.file_to_s3_uri_map = read_test_config_file("file_to_s3_map_modified.json") cls.code_key_to_file = read_test_config_file("code_key_to_file_map.json") diff --git a/samtranslator/__init__.py b/samtranslator/__init__.py index 94b9a2003..d20d165f6 100644 --- a/samtranslator/__init__.py +++ b/samtranslator/__init__.py @@ -1 +1 @@ -__version__ = "1.49.0" +__version__ = "1.50.0" diff --git a/samtranslator/model/eventsources/pull.py b/samtranslator/model/eventsources/pull.py index 4ed685582..3df724c9a 100644 --- a/samtranslator/model/eventsources/pull.py +++ b/samtranslator/model/eventsources/pull.py @@ -46,6 +46,7 @@ class PullEventSource(ResourceMacro): "FunctionResponseTypes": PropertyType(False, is_type(list)), "KafkaBootstrapServers": PropertyType(False, is_type(list)), "FilterCriteria": PropertyType(False, is_type(dict)), + "ConsumerGroupId": PropertyType(False, is_str()), } def get_policy_arn(self): @@ -112,6 +113,17 @@ def to_cloudformation(self, **kwargs): lambda_eventsourcemapping.SelfManagedEventSource = { "Endpoints": {"KafkaBootstrapServers": self.KafkaBootstrapServers} } + if self.ConsumerGroupId: + consumer_group_id_structure = {"ConsumerGroupId": self.ConsumerGroupId} + if self.resource_type == "MSK": + lambda_eventsourcemapping.AmazonManagedKafkaConfig = consumer_group_id_structure + elif self.resource_type == "SelfManagedKafka": + lambda_eventsourcemapping.SelfManagedKafkaConfig = consumer_group_id_structure + else: + raise InvalidEventException( + self.logical_id, + "Property ConsumerGroupId not defined for resource of type {}.".format(self.resource_type), + ) destination_config_policy = None if self.DestinationConfig: diff --git a/samtranslator/model/lambda_.py b/samtranslator/model/lambda_.py index efc162218..61645f7d1 100644 --- a/samtranslator/model/lambda_.py +++ b/samtranslator/model/lambda_.py @@ -81,6 +81,8 @@ class LambdaEventSourceMapping(Resource): "FunctionResponseTypes": PropertyType(False, is_type(list)), "SelfManagedEventSource": PropertyType(False, is_type(dict)), "FilterCriteria": PropertyType(False, is_type(dict)), + "AmazonManagedKafkaConfig": PropertyType(False, is_type(dict)), + "SelfManagedKafkaConfig": PropertyType(False, is_type(dict)), } runtime_attrs = {"name": lambda self: ref(self.logical_id)} diff --git a/tests/translator/input/error_consumer_group_id.yaml b/tests/translator/input/error_consumer_group_id.yaml new file mode 100644 index 000000000..0e3f33566 --- /dev/null +++ b/tests/translator/input/error_consumer_group_id.yaml @@ -0,0 +1,28 @@ +Resources: + NotSupportedPullTrigger: + Type: AWS::Serverless::Function + Properties: + CodeUri: s3://sam-demo-bucket/filtered_events.zip + Handler: index.handler + Runtime: nodejs16.x + Events: + DDBEvent: + Type: DynamoDB + Properties: + Stream: arn:aws:dynamodb:us-east-1:012345678901:table/TestTable/stream/2015-05-11T21:21:33.291 + StartingPosition: TRIM_HORIZON + ConsumerGroupId: consumergroup1 + + NotSupportedPushTrigger: + Type: AWS::Serverless::Function + Properties: + CodeUri: s3://sam-demo-bucket/filtered_events.zip + Handler: index.handler + Runtime: nodejs16.x + Events: + SNSEvent: + Type: SNS + Properties: + Topic: arn:aws:sns:us-east-1:123456789012:my_topic + ConsumerGroupId: consumergroup1 + diff --git a/tests/translator/input/function_with_msk.yaml b/tests/translator/input/function_with_msk.yaml new file mode 100644 index 000000000..6ac5e8ddc --- /dev/null +++ b/tests/translator/input/function_with_msk.yaml @@ -0,0 +1,20 @@ +AWSTemplateFormatVersion: '2010-09-09' +Parameters: {} + +Resources: + MyMskStreamProcessor: + Type: AWS::Serverless::Function + Properties: + Runtime: nodejs12.x + Handler: index.handler + CodeUri: s3://sam-demo-bucket/kafka.zip + Events: + MyMskEvent: + Type: MSK + Properties: + StartingPosition: LATEST + Stream: !Sub arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2 + Topics: + - "MyDummyTestTopic" + ConsumerGroupId: consumergroup1 + diff --git a/tests/translator/input/function_with_msk_with_intrinsics.yaml b/tests/translator/input/function_with_msk_with_intrinsics.yaml new file mode 100644 index 000000000..de380a72c --- /dev/null +++ b/tests/translator/input/function_with_msk_with_intrinsics.yaml @@ -0,0 +1,35 @@ +AWSTemplateFormatVersion: '2010-09-09' +Parameters: + StartingPositionValue: + Type: String + Default: LATEST + + StreamValue: + Type: String + Default: arn:aws:kafka:us-east-1:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2 + + TopicsValue: + Type: CommaDelimitedList + Default: Topic + + ConsumerGroupValue: + Type: String + Default: consumergroup1 + + +Resources: + MyMskStreamProcessor: + Type: AWS::Serverless::Function + Properties: + Runtime: nodejs12.x + Handler: index.handler + CodeUri: s3://sam-demo-bucket/kafka.zip + Events: + MyMskEvent: + Type: MSK + Properties: + StartingPosition: !Ref StartingPositionValue + Stream: !Ref StreamValue + Topics: !Ref TopicsValue + ConsumerGroupId: !Ref ConsumerGroupValue + diff --git a/tests/translator/input/function_with_self_managed_kafka.yaml b/tests/translator/input/function_with_self_managed_kafka.yaml index a5ed1dfaf..ef4b7d5d4 100644 --- a/tests/translator/input/function_with_self_managed_kafka.yaml +++ b/tests/translator/input/function_with_self_managed_kafka.yaml @@ -23,4 +23,5 @@ Resources: URI: subnet:subnet-12345 - Type: VPC_SECURITY_GROUP URI: security_group:sg-67890 + ConsumerGroupId: consumergroup1 diff --git a/tests/translator/input/self_managed_kafka_with_intrinsics.yaml b/tests/translator/input/self_managed_kafka_with_intrinsics.yaml index 5a0f3b380..8c13ff6ed 100644 --- a/tests/translator/input/self_managed_kafka_with_intrinsics.yaml +++ b/tests/translator/input/self_managed_kafka_with_intrinsics.yaml @@ -14,8 +14,12 @@ Parameters: KafkaBootstrapServersValue: Type: CommaDelimitedList Default: abc.xyz.com:9092,123.45.67.89:9096 - - + + ConsumerGroupValue: + Type: String + Default: consumergroup1 + + Resources: KafkaFunction: Type: 'AWS::Serverless::Function' @@ -43,6 +47,8 @@ Resources: - Type: BASIC_AUTH URI: Ref: KafkaUserSecret + ConsumerGroupId: + Ref: ConsumerGroupValue KafkaUserSecret: Type: AWS::SecretsManager::Secret diff --git a/tests/translator/output/aws-cn/function_with_msk.json b/tests/translator/output/aws-cn/function_with_msk.json new file mode 100644 index 000000000..362225770 --- /dev/null +++ b/tests/translator/output/aws-cn/function_with_msk.json @@ -0,0 +1,78 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": {}, + "Resources": { + "MyMskStreamProcessor": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorRole": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ] + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorMyMskEvent": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "EventSourceArn": { + "Fn::Sub": "arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessor" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ], + "AmazonManagedKafkaConfig": { + "ConsumerGroupId": "consumergroup1" + } + } + } + } +} \ No newline at end of file diff --git a/tests/translator/output/aws-cn/function_with_msk_with_intrinsics.json b/tests/translator/output/aws-cn/function_with_msk_with_intrinsics.json new file mode 100644 index 000000000..3ad206270 --- /dev/null +++ b/tests/translator/output/aws-cn/function_with_msk_with_intrinsics.json @@ -0,0 +1,99 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": { + "StartingPositionValue": { + "Type": "String", + "Default": "LATEST" + }, + "StreamValue": { + "Type": "String", + "Default": "arn:aws:kafka:us-east-1:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "TopicsValue": { + "Type": "CommaDelimitedList", + "Default": "Topic" + }, + "ConsumerGroupValue": { + "Type": "String", + "Default": "consumergroup1" + } + }, + "Resources": { + "MyMskStreamProcessor": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorRole": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ] + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorMyMskEvent": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "EventSourceArn": { + "Ref": "StreamValue" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessor" + }, + "StartingPosition": { + "Ref": "StartingPositionValue" + }, + "Topics": { + "Ref": "TopicsValue" + }, + "AmazonManagedKafkaConfig": { + "ConsumerGroupId": { + "Ref": "ConsumerGroupValue" + } + } + } + } + } +} \ No newline at end of file diff --git a/tests/translator/output/aws-cn/function_with_self_managed_kafka.json b/tests/translator/output/aws-cn/function_with_self_managed_kafka.json index 9c0ff439a..ab448acbc 100644 --- a/tests/translator/output/aws-cn/function_with_self_managed_kafka.json +++ b/tests/translator/output/aws-cn/function_with_self_managed_kafka.json @@ -114,6 +114,9 @@ "123.45.67.89:9096" ] } + }, + "SelfManagedKafkaConfig": { + "ConsumerGroupId": "consumergroup1" } } } diff --git a/tests/translator/output/aws-cn/self_managed_kafka_with_intrinsics.json b/tests/translator/output/aws-cn/self_managed_kafka_with_intrinsics.json index 75743f1bd..a18a25257 100644 --- a/tests/translator/output/aws-cn/self_managed_kafka_with_intrinsics.json +++ b/tests/translator/output/aws-cn/self_managed_kafka_with_intrinsics.json @@ -15,6 +15,10 @@ "KafkaBootstrapServersValue": { "Type": "CommaDelimitedList", "Default": "abc.xyz.com:9092,123.45.67.89:9096" + }, + "ConsumerGroupValue": { + "Type": "String", + "Default": "consumergroup1" } }, "Resources": { @@ -148,6 +152,11 @@ "Ref": "KafkaBootstrapServersValue" } } + }, + "SelfManagedKafkaConfig": { + "ConsumerGroupId": { + "Ref": "ConsumerGroupValue" + } } } } diff --git a/tests/translator/output/aws-us-gov/function_with_msk.json b/tests/translator/output/aws-us-gov/function_with_msk.json new file mode 100644 index 000000000..a60fd546c --- /dev/null +++ b/tests/translator/output/aws-us-gov/function_with_msk.json @@ -0,0 +1,78 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": {}, + "Resources": { + "MyMskStreamProcessor": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorRole": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ] + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorMyMskEvent": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "EventSourceArn": { + "Fn::Sub": "arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessor" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ], + "AmazonManagedKafkaConfig": { + "ConsumerGroupId": "consumergroup1" + } + } + } + } +} \ No newline at end of file diff --git a/tests/translator/output/aws-us-gov/function_with_msk_with_intrinsics.json b/tests/translator/output/aws-us-gov/function_with_msk_with_intrinsics.json new file mode 100644 index 000000000..889547cde --- /dev/null +++ b/tests/translator/output/aws-us-gov/function_with_msk_with_intrinsics.json @@ -0,0 +1,99 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": { + "StartingPositionValue": { + "Type": "String", + "Default": "LATEST" + }, + "StreamValue": { + "Type": "String", + "Default": "arn:aws:kafka:us-east-1:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "TopicsValue": { + "Type": "CommaDelimitedList", + "Default": "Topic" + }, + "ConsumerGroupValue": { + "Type": "String", + "Default": "consumergroup1" + } + }, + "Resources": { + "MyMskStreamProcessor": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorRole": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ] + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorMyMskEvent": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "EventSourceArn": { + "Ref": "StreamValue" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessor" + }, + "StartingPosition": { + "Ref": "StartingPositionValue" + }, + "Topics": { + "Ref": "TopicsValue" + }, + "AmazonManagedKafkaConfig": { + "ConsumerGroupId": { + "Ref": "ConsumerGroupValue" + } + } + } + } + } +} \ No newline at end of file diff --git a/tests/translator/output/aws-us-gov/function_with_self_managed_kafka.json b/tests/translator/output/aws-us-gov/function_with_self_managed_kafka.json index 8b62e5d04..cb55d01f3 100644 --- a/tests/translator/output/aws-us-gov/function_with_self_managed_kafka.json +++ b/tests/translator/output/aws-us-gov/function_with_self_managed_kafka.json @@ -114,6 +114,9 @@ "123.45.67.89:9096" ] } + }, + "SelfManagedKafkaConfig": { + "ConsumerGroupId": "consumergroup1" } } } diff --git a/tests/translator/output/aws-us-gov/self_managed_kafka_with_intrinsics.json b/tests/translator/output/aws-us-gov/self_managed_kafka_with_intrinsics.json index e9725ebe6..36407a7a8 100644 --- a/tests/translator/output/aws-us-gov/self_managed_kafka_with_intrinsics.json +++ b/tests/translator/output/aws-us-gov/self_managed_kafka_with_intrinsics.json @@ -15,6 +15,10 @@ "KafkaBootstrapServersValue": { "Type": "CommaDelimitedList", "Default": "abc.xyz.com:9092,123.45.67.89:9096" + }, + "ConsumerGroupValue": { + "Type": "String", + "Default": "consumergroup1" } }, "Resources": { @@ -148,6 +152,11 @@ "Ref": "KafkaBootstrapServersValue" } } + }, + "SelfManagedKafkaConfig": { + "ConsumerGroupId": { + "Ref": "ConsumerGroupValue" + } } } } diff --git a/tests/translator/output/error_consumer_group_id.json b/tests/translator/output/error_consumer_group_id.json new file mode 100644 index 000000000..4e3665a66 --- /dev/null +++ b/tests/translator/output/error_consumer_group_id.json @@ -0,0 +1,3 @@ +{ + "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 2. Resource with id [NotSupportedPullTrigger] is invalid. Event with id [NotSupportedPullTriggerDDBEvent] is invalid. Property ConsumerGroupId not defined for resource of type DynamoDB. Resource with id [NotSupportedPushTriggerSNSEvent] is invalid. property ConsumerGroupId not defined for resource of type SNS" +} \ No newline at end of file diff --git a/tests/translator/output/function_with_msk.json b/tests/translator/output/function_with_msk.json new file mode 100644 index 000000000..3ae7058e3 --- /dev/null +++ b/tests/translator/output/function_with_msk.json @@ -0,0 +1,78 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": {}, + "Resources": { + "MyMskStreamProcessor": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorRole": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ] + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorMyMskEvent": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "EventSourceArn": { + "Fn::Sub": "arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessor" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ], + "AmazonManagedKafkaConfig": { + "ConsumerGroupId": "consumergroup1" + } + } + } + } +} \ No newline at end of file diff --git a/tests/translator/output/function_with_msk_with_intrinsics.json b/tests/translator/output/function_with_msk_with_intrinsics.json new file mode 100644 index 000000000..97143bf56 --- /dev/null +++ b/tests/translator/output/function_with_msk_with_intrinsics.json @@ -0,0 +1,99 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": { + "StartingPositionValue": { + "Type": "String", + "Default": "LATEST" + }, + "StreamValue": { + "Type": "String", + "Default": "arn:aws:kafka:us-east-1:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "TopicsValue": { + "Type": "CommaDelimitedList", + "Default": "Topic" + }, + "ConsumerGroupValue": { + "Type": "String", + "Default": "consumergroup1" + } + }, + "Resources": { + "MyMskStreamProcessor": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorRole": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ] + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + } + }, + "MyMskStreamProcessorMyMskEvent": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "EventSourceArn": { + "Ref": "StreamValue" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessor" + }, + "StartingPosition": { + "Ref": "StartingPositionValue" + }, + "Topics": { + "Ref": "TopicsValue" + }, + "AmazonManagedKafkaConfig": { + "ConsumerGroupId": { + "Ref": "ConsumerGroupValue" + } + } + } + } + } +} \ No newline at end of file diff --git a/tests/translator/output/function_with_self_managed_kafka.json b/tests/translator/output/function_with_self_managed_kafka.json index 6cb7ae49d..ccbfdde0d 100644 --- a/tests/translator/output/function_with_self_managed_kafka.json +++ b/tests/translator/output/function_with_self_managed_kafka.json @@ -114,6 +114,9 @@ "123.45.67.89:9096" ] } + }, + "SelfManagedKafkaConfig": { + "ConsumerGroupId": "consumergroup1" } } } diff --git a/tests/translator/output/self_managed_kafka_with_intrinsics.json b/tests/translator/output/self_managed_kafka_with_intrinsics.json index 535c0f3d9..04cf40aad 100644 --- a/tests/translator/output/self_managed_kafka_with_intrinsics.json +++ b/tests/translator/output/self_managed_kafka_with_intrinsics.json @@ -15,6 +15,10 @@ "KafkaBootstrapServersValue": { "Type": "CommaDelimitedList", "Default": "abc.xyz.com:9092,123.45.67.89:9096" + }, + "ConsumerGroupValue": { + "Type": "String", + "Default": "consumergroup1" } }, "Resources": { @@ -148,6 +152,11 @@ "Ref": "KafkaBootstrapServersValue" } } + }, + "SelfManagedKafkaConfig": { + "ConsumerGroupId": { + "Ref": "ConsumerGroupValue" + } } } }