Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions docs/cloudformation_compatibility.rst
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,28 @@ SourceAccessConfigurations All

MSK
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
======================== ================================== ========================
Property Name Intrinsic(s) Supported Reasons
======================== ================================== ========================
Stream All
Topics All
StartingPosition All
======================== ================================== ========================
================================ ================================== ========================
Property Name Intrinsic(s) Supported Reasons
================================ ================================== ========================
MaximumBatchingWindowInSeconds All
Stream All
Topics All
StartingPosition All
ConsumerGroupId All
================================ ================================== ========================

SelfManagedKafka
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
================================ ================================== ========================
Property Name Intrinsic(s) Supported Reasons
================================ ================================== ========================
BatchSize All
Topics All
KafkaBootstrapServers All
SourceAccessConfigurations All
Enabled All
ConsumerGroupId All
================================ ================================= ========================

DynamoDB
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
9 changes: 3 additions & 6 deletions integration/combination/test_api_with_authorizer_apikey.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@ def test_authorizer_apikey(self):
apigw_client = self.client_provider.api_client

authorizers = apigw_client.get_authorizers(restApiId=rest_api_id)["items"]
lambda_authorizer_uri = (
"arn:aws:apigateway:"
+ self.my_region
+ ":lambda:path/2015-03-31/functions/"
+ stack_outputs["AuthorizerFunctionArn"]
+ "/invocations"

lambda_authorizer_uri = "arn:{}:apigateway:{}:lambda:path/2015-03-31/functions/{}/invocations".format(
self.partition, self.my_region, stack_outputs["AuthorizerFunctionArn"]
)

lambda_token_authorizer = get_authorizer_by_name(authorizers, "MyLambdaTokenAuth")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from unittest.case import skipIf
import pytest

from integration.helpers.base_test import BaseTest
Expand Down
1 change: 1 addition & 0 deletions integration/helpers/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def setUpClass(cls):
cls.code_dir = Path(cls.resources_dir, "code")
cls.session = boto3.session.Session()
cls.my_region = cls.session.region_name
cls.partition = cls.session.get_partition_for_region(cls.my_region)
cls.client_provider = ClientProvider()
cls.file_to_s3_uri_map = read_test_config_file("file_to_s3_map_modified.json")
cls.code_key_to_file = read_test_config_file("code_key_to_file_map.json")
Expand Down
2 changes: 1 addition & 1 deletion samtranslator/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.49.0"
__version__ = "1.50.0"
12 changes: 12 additions & 0 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class PullEventSource(ResourceMacro):
"FunctionResponseTypes": PropertyType(False, is_type(list)),
"KafkaBootstrapServers": PropertyType(False, is_type(list)),
"FilterCriteria": PropertyType(False, is_type(dict)),
"ConsumerGroupId": PropertyType(False, is_str()),
}

def get_policy_arn(self):
Expand Down Expand Up @@ -112,6 +113,17 @@ def to_cloudformation(self, **kwargs):
lambda_eventsourcemapping.SelfManagedEventSource = {
"Endpoints": {"KafkaBootstrapServers": self.KafkaBootstrapServers}
}
if self.ConsumerGroupId:
consumer_group_id_structure = {"ConsumerGroupId": self.ConsumerGroupId}
if self.resource_type == "MSK":
lambda_eventsourcemapping.AmazonManagedKafkaConfig = consumer_group_id_structure
elif self.resource_type == "SelfManagedKafka":
lambda_eventsourcemapping.SelfManagedKafkaConfig = consumer_group_id_structure
else:
raise InvalidEventException(
self.logical_id,
"Property ConsumerGroupId not defined for resource of type {}.".format(self.resource_type),
)

destination_config_policy = None
if self.DestinationConfig:
Expand Down
2 changes: 2 additions & 0 deletions samtranslator/model/lambda_.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class LambdaEventSourceMapping(Resource):
"FunctionResponseTypes": PropertyType(False, is_type(list)),
"SelfManagedEventSource": PropertyType(False, is_type(dict)),
"FilterCriteria": PropertyType(False, is_type(dict)),
"AmazonManagedKafkaConfig": PropertyType(False, is_type(dict)),
"SelfManagedKafkaConfig": PropertyType(False, is_type(dict)),
}

runtime_attrs = {"name": lambda self: ref(self.logical_id)}
Expand Down
28 changes: 28 additions & 0 deletions tests/translator/input/error_consumer_group_id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Resources:
NotSupportedPullTrigger:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/filtered_events.zip
Handler: index.handler
Runtime: nodejs16.x
Events:
DDBEvent:
Type: DynamoDB
Properties:
Stream: arn:aws:dynamodb:us-east-1:012345678901:table/TestTable/stream/2015-05-11T21:21:33.291
StartingPosition: TRIM_HORIZON
ConsumerGroupId: consumergroup1

NotSupportedPushTrigger:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/filtered_events.zip
Handler: index.handler
Runtime: nodejs16.x
Events:
SNSEvent:
Type: SNS
Properties:
Topic: arn:aws:sns:us-east-1:123456789012:my_topic
ConsumerGroupId: consumergroup1

20 changes: 20 additions & 0 deletions tests/translator/input/function_with_msk.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
AWSTemplateFormatVersion: '2010-09-09'
Parameters: {}

Resources:
MyMskStreamProcessor:
Type: AWS::Serverless::Function
Properties:
Runtime: nodejs12.x
Handler: index.handler
CodeUri: s3://sam-demo-bucket/kafka.zip
Events:
MyMskEvent:
Type: MSK
Properties:
StartingPosition: LATEST
Stream: !Sub arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2
Topics:
- "MyDummyTestTopic"
ConsumerGroupId: consumergroup1

35 changes: 35 additions & 0 deletions tests/translator/input/function_with_msk_with_intrinsics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
AWSTemplateFormatVersion: '2010-09-09'
Parameters:
StartingPositionValue:
Type: String
Default: LATEST

StreamValue:
Type: String
Default: arn:aws:kafka:us-east-1:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2

TopicsValue:
Type: CommaDelimitedList
Default: Topic

ConsumerGroupValue:
Type: String
Default: consumergroup1


Resources:
MyMskStreamProcessor:
Type: AWS::Serverless::Function
Properties:
Runtime: nodejs12.x
Handler: index.handler
CodeUri: s3://sam-demo-bucket/kafka.zip
Events:
MyMskEvent:
Type: MSK
Properties:
StartingPosition: !Ref StartingPositionValue
Stream: !Ref StreamValue
Topics: !Ref TopicsValue
ConsumerGroupId: !Ref ConsumerGroupValue

Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ Resources:
URI: subnet:subnet-12345
- Type: VPC_SECURITY_GROUP
URI: security_group:sg-67890
ConsumerGroupId: consumergroup1

10 changes: 8 additions & 2 deletions tests/translator/input/self_managed_kafka_with_intrinsics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ Parameters:
KafkaBootstrapServersValue:
Type: CommaDelimitedList
Default: abc.xyz.com:9092,123.45.67.89:9096



ConsumerGroupValue:
Type: String
Default: consumergroup1


Resources:
KafkaFunction:
Type: 'AWS::Serverless::Function'
Expand Down Expand Up @@ -43,6 +47,8 @@ Resources:
- Type: BASIC_AUTH
URI:
Ref: KafkaUserSecret
ConsumerGroupId:
Ref: ConsumerGroupValue

KafkaUserSecret:
Type: AWS::SecretsManager::Secret
Expand Down
78 changes: 78 additions & 0 deletions tests/translator/output/aws-cn/function_with_msk.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
{
"AWSTemplateFormatVersion": "2010-09-09",
"Parameters": {},
"Resources": {
"MyMskStreamProcessor": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"S3Bucket": "sam-demo-bucket",
"S3Key": "kafka.zip"
},
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"MyMskStreamProcessorRole",
"Arn"
]
},
"Runtime": "nodejs12.x",
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
}
},
"MyMskStreamProcessorRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
},
"ManagedPolicyArns": [
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole"
],
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
}
},
"MyMskStreamProcessorMyMskEvent": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"EventSourceArn": {
"Fn::Sub": "arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2"
},
"FunctionName": {
"Ref": "MyMskStreamProcessor"
},
"StartingPosition": "LATEST",
"Topics": [
"MyDummyTestTopic"
],
"AmazonManagedKafkaConfig": {
"ConsumerGroupId": "consumergroup1"
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{
"AWSTemplateFormatVersion": "2010-09-09",
"Parameters": {
"StartingPositionValue": {
"Type": "String",
"Default": "LATEST"
},
"StreamValue": {
"Type": "String",
"Default": "arn:aws:kafka:us-east-1:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2"
},
"TopicsValue": {
"Type": "CommaDelimitedList",
"Default": "Topic"
},
"ConsumerGroupValue": {
"Type": "String",
"Default": "consumergroup1"
}
},
"Resources": {
"MyMskStreamProcessor": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"S3Bucket": "sam-demo-bucket",
"S3Key": "kafka.zip"
},
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"MyMskStreamProcessorRole",
"Arn"
]
},
"Runtime": "nodejs12.x",
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
}
},
"MyMskStreamProcessorRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
},
"ManagedPolicyArns": [
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole"
],
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
}
},
"MyMskStreamProcessorMyMskEvent": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"EventSourceArn": {
"Ref": "StreamValue"
},
"FunctionName": {
"Ref": "MyMskStreamProcessor"
},
"StartingPosition": {
"Ref": "StartingPositionValue"
},
"Topics": {
"Ref": "TopicsValue"
},
"AmazonManagedKafkaConfig": {
"ConsumerGroupId": {
"Ref": "ConsumerGroupValue"
}
}
}
}
}
}
Loading