Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 83 additions & 3 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,17 @@ class PullEventSource(ResourceMacro):
"Broker": PropertyType(False, is_str()),
"Queues": PropertyType(False, is_type(list)),
"SourceAccessConfigurations": PropertyType(False, is_type(list)),
"SecretsManagerKmsKeyId": PropertyType(False, is_str()),
"TumblingWindowInSeconds": PropertyType(False, is_type(int)),
"FunctionResponseTypes": PropertyType(False, is_type(list)),
}

def get_policy_arn(self):
raise NotImplementedError("Subclass must implement this method")

def get_policy_statements(self):
raise NotImplementedError("Subclass must implement this method")

def to_cloudformation(self, **kwargs):
"""Returns the Lambda EventSourceMapping to which this pull event corresponds. Adds the appropriate managed
policy to the function's execution role, if such a role is provided.
Expand Down Expand Up @@ -133,8 +137,17 @@ def _link_policy(self, role, destination_config_policy=None):
:param model.iam.IAMRole role: the execution role generated for the function
"""
policy_arn = self.get_policy_arn()
if role is not None and policy_arn not in role.ManagedPolicyArns:
role.ManagedPolicyArns.append(policy_arn)
policy_statements = self.get_policy_statements()
if role is not None:
if policy_arn is not None and policy_arn not in role.ManagedPolicyArns:
role.ManagedPolicyArns.append(policy_arn)
if policy_statements is not None:
if role.Policies is None:
role.Policies = []
for policy in policy_statements:
if policy not in role.Policies:
if not policy.get("PolicyDocument") in [d["PolicyDocument"] for d in role.Policies]:
role.Policies.append(policy)
# add SQS or SNS policy only if role is present in kwargs
if role is not None and destination_config_policy is not None and destination_config_policy:
if role.Policies is None:
Expand All @@ -154,6 +167,9 @@ class Kinesis(PullEventSource):
def get_policy_arn(self):
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaKinesisExecutionRole")

def get_policy_statements(self):
return None


class DynamoDB(PullEventSource):
"""DynamoDB Streams event source."""
Expand All @@ -163,6 +179,9 @@ class DynamoDB(PullEventSource):
def get_policy_arn(self):
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaDynamoDBExecutionRole")

def get_policy_statements(self):
return None


class SQS(PullEventSource):
"""SQS Queue event source."""
Expand All @@ -172,6 +191,9 @@ class SQS(PullEventSource):
def get_policy_arn(self):
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaSQSQueueExecutionRole")

def get_policy_statements(self):
return None


class MSK(PullEventSource):
"""MSK event source."""
Expand All @@ -181,11 +203,69 @@ class MSK(PullEventSource):
def get_policy_arn(self):
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaMSKExecutionRole")

def get_policy_statements(self):
return None


class MQ(PullEventSource):
"""MQ event source."""

resource_type = "MQ"

def get_policy_arn(self):
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaAMQExecutionRole")
return None

def get_policy_statements(self):
if not self.SourceAccessConfigurations:
raise InvalidEventException(
self.relative_id,
"No SourceAccessConfigurations for ActiveMQ provided.",
)
if not type(self.SourceAccessConfigurations) is list:
raise InvalidEventException(
self.relative_id,
"Provided SourceAccessConfigurations cannot be parsed into a list.",
)
# MQ only supports SourceAccessConfigurations with list size of 1
if not (len(self.SourceAccessConfigurations) == 1):
raise InvalidEventException(
self.relative_id,
"SourceAccessConfigurations for ActiveMQ only supports single configuration entry.",
)
if not self.SourceAccessConfigurations[0].get("URI"):
raise InvalidEventException(
self.relative_id,
"No URI property specified in SourceAccessConfigurations for ActiveMQ.",
)
document = {
"PolicyName": "SamAutoGeneratedAMQPolicy",
"PolicyDocument": {
"Statement": [
{
"Action": [
"secretsmanager:GetSecretValue",
],
"Effect": "Allow",
"Resource": self.SourceAccessConfigurations[0].get("URI"),
},
{
"Action": [
"mq:DescribeBroker",
],
"Effect": "Allow",
"Resource": self.Broker,
},
]
},
}
if self.SecretsManagerKmsKeyId:
kms_policy = {
"Action": "kms:Decrypt",
"Effect": "Allow",
"Resource": {
"Fn::Sub": "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/"
+ self.SecretsManagerKmsKeyId
},
}
document["PolicyDocument"]["Statement"].append(kms_policy)
return [document]
18 changes: 18 additions & 0 deletions tests/translator/input/function_with_amq_kms.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Resources:
MQFunction:
Type: 'AWS::Serverless::Function'
Properties:
CodeUri: s3://sam-demo-bucket/queues.zip
Handler: queue.mq_handler
Runtime: python2.7
Events:
MyMQQueue:
Type: MQ
Properties:
Broker: arn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9
Queues:
- "Queue1"
SourceAccessConfigurations:
- Type: BASIC_AUTH
URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
SecretsManagerKmsKeyId: 1abc23d4-567f-8ab9-cde0-1fab234c5d67
70 changes: 0 additions & 70 deletions tests/translator/output/amq.json

This file was deleted.

70 changes: 0 additions & 70 deletions tests/translator/output/aws-cn/amq.json

This file was deleted.

Loading