Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from samtranslator.model.lambda_ import LambdaEventSourceMapping
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.model.exceptions import InvalidEventException
from samtranslator.model.iam import IAMRolePolicies


class PullEventSource(ResourceMacro):
Expand Down Expand Up @@ -74,26 +75,58 @@ def to_cloudformation(self, **kwargs):
lambda_eventsourcemapping.MaximumRetryAttempts = self.MaximumRetryAttempts
lambda_eventsourcemapping.BisectBatchOnFunctionError = self.BisectBatchOnFunctionError
lambda_eventsourcemapping.MaximumRecordAgeInSeconds = self.MaximumRecordAgeInSeconds
lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig
lambda_eventsourcemapping.ParallelizationFactor = self.ParallelizationFactor

destination_config_policy = None
if self.DestinationConfig:
# `Type` property is for sam to attach the right policies
destination_type = self.DestinationConfig.get('OnFailure').get('Type')

# SAM attaches the policies for SQS or SNS only if 'Type' is given
if destination_type:
# the values 'SQS' and 'SNS' are allowed. No intrinsics are allowed
if destination_type not in ['SQS', 'SNS']:
raise InvalidEventException(self.logical_id, "The only valid values for 'Type' are 'SQS' and 'SNS'")
if self.DestinationConfig.get('OnFailure') is None:
raise InvalidEventException(self.logical_id, "'OnFailure' is a required field for "
"'DestinationConfig'")
if destination_type == 'SQS':
queue_arn = self.DestinationConfig.get('OnFailure').get('Destination')
destination_config_policy = IAMRolePolicies().sqs_send_message_role_policy(queue_arn,
self.logical_id)
elif destination_type == "SNS":
sns_topic_arn = self.DestinationConfig.get('OnFailure').get('Destination')
destination_config_policy = IAMRolePolicies(). sns_publish_role_policy(sns_topic_arn,
self.logical_id)

lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig

if 'Condition' in function.resource_attributes:
lambda_eventsourcemapping.set_resource_attribute('Condition', function.resource_attributes['Condition'])

if 'role' in kwargs:
self._link_policy(kwargs['role'])
self._link_policy(kwargs['role'], destination_config_policy)

return resources

def _link_policy(self, role):
def _link_policy(self, role, destination_config_policy=None):
"""If this source triggers a Lambda function whose execution role is auto-generated by SAM, add the
appropriate managed policy to this Role.

:param model.iam.IAMROle role: the execution role generated for the function
:param model.iam.IAMRole role: the execution role generated for the function
"""
policy_arn = self.get_policy_arn()
if role is not None and policy_arn not in role.ManagedPolicyArns:
role.ManagedPolicyArns.append(policy_arn)
# add SQS or SNS policy only if role is present in kwargs
if role is not None and destination_config_policy is not None and destination_config_policy:
if role.Policies is None:
role.Policies = []
role.Policies.append(destination_config_policy)
if role.Policies and destination_config_policy not in role.Policies:
# do not add the policy if the same policy document is already present
if not destination_config_policy.get('PolicyDocument') in [d['PolicyDocument'] for d in role.Policies]:
role.Policies.append(destination_config_policy)


class Kinesis(PullEventSource):
Expand Down
28 changes: 28 additions & 0 deletions samtranslator/model/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,31 @@ def dead_letter_queue_policy(cls, action, resource):
}]
}
}

@classmethod
def sqs_send_message_role_policy(cls, queue_arn, logical_id):
document = {
'PolicyName': 'SQSPublishPolicy' + logical_id,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I change the naming convention to logical_id + SQSPublishPolicy?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the existing convention?

Copy link
Author

@ShreyaGangishetty ShreyaGangishetty Dec 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

existing convention is FunctioRole logical_id+ 'Policy'+'integer number'. But I cannot follow this convention as it might conflict with the explicit Policies section.
logical_id is Functioname+EventName

Copy link
Author

@ShreyaGangishetty ShreyaGangishetty Dec 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the policy name to <eventlogicalid> + <sqspolicy/snspolicy>

'PolicyDocument': {
'Statement': [{
'Action': 'sqs:SendMessage',
'Effect': 'Allow',
'Resource': queue_arn
}]
}
}
return document

@classmethod
def sns_publish_role_policy(cls, topic_arn, logical_id):
document = {
'PolicyName': 'SNSPublishPolicy' + logical_id,
'PolicyDocument': {
'Statement': [{
'Action': 'sns:publish',
'Effect': 'Allow',
'Resource': topic_arn
}]
}
}
return document
12 changes: 8 additions & 4 deletions tests/translator/input/function_with_event_source_mapping.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ Resources:
}
}
Runtime: nodejs8.10
Policies:
- SQSSendMessagePolicy:
QueueName: !GetAtt MySqsQueue.QueueName
Events:
Stream:
Type: Kinesis
Expand All @@ -38,6 +35,10 @@ Resources:
Stream: !GetAtt KinesisStream1.Arn
MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam
StartingPosition: LATEST
DestinationConfig:
OnFailure:
Type: SNS
Destination: !Ref MySnsTopic
DynamoDBStreamEvent:
Type: DynamoDB
Properties:
Expand All @@ -51,6 +52,7 @@ Resources:
StartingPosition: TRIM_HORIZON
DestinationConfig:
OnFailure:
Type: SQS
Destination: !GetAtt MySqsQueue.Arn

KinesisStream:
Expand Down Expand Up @@ -78,4 +80,6 @@ Resources:
StreamViewType: NEW_IMAGE

MySqsQueue:
Type: AWS::SQS::Queue
Type: AWS::SQS::Queue
MySnsTopic:
Type: AWS::SNS::Topic
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"MySqsQueue": {
"Type": "AWS::SQS::Queue"
},
"MySnsTopic": {
"Type": "AWS::SNS::Topic"
},
"MyFunctionForBatchingExampleStream": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
Expand Down Expand Up @@ -61,30 +64,35 @@
],
"Policies": [
{
"PolicyName": "MyFunctionForBatchingExampleRolePolicy0",
"PolicyName": "SQSPublishPolicyMyFunctionForBatchingExampleDynamoDBStreamEvent",
"PolicyDocument": {
"Statement": [
{
"Action": [
"sqs:SendMessage*"
],
"Action": "sqs:SendMessage",
"Effect": "Allow",
"Resource": {
"Fn::Sub": [
"arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${queueName}",
{
"queueName": {
"Fn::GetAtt": [
"MySqsQueue",
"QueueName"
]
}
}
"Fn::GetAtt": [
"MySqsQueue",
"Arn"
]
},
"Effect": "Allow"
}
}
]
}
},
{
"PolicyDocument": {
"Statement": [
{
"Action": "sns:publish",
"Effect": "Allow",
"Resource": {
"Ref": "MySnsTopic"
}
}
]
},
"PolicyName": "SNSPublishPolicyMyFunctionForBatchingExampleStreamEvent"
}
],
"Tags": [
Expand Down Expand Up @@ -135,7 +143,8 @@
"MySqsQueue",
"Arn"
]
}
},
"Type": "SQS"
}
},
"EventSourceArn": {
Expand Down Expand Up @@ -187,7 +196,15 @@
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST"
"StartingPosition": "LATEST",
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Ref": "MySnsTopic"
},
"Type": "SNS"
}
}
}
},
"KinesisStream": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"MySqsQueue": {
"Type": "AWS::SQS::Queue"
},
"MySnsTopic": {
"Type": "AWS::SNS::Topic"
},
"MyFunctionForBatchingExampleStream": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
Expand Down Expand Up @@ -61,30 +64,35 @@
],
"Policies": [
{
"PolicyName": "MyFunctionForBatchingExampleRolePolicy0",
"PolicyName": "SQSPublishPolicyMyFunctionForBatchingExampleDynamoDBStreamEvent",
"PolicyDocument": {
"Statement": [
{
"Action": [
"sqs:SendMessage*"
],
"Action": "sqs:SendMessage",
"Effect": "Allow",
"Resource": {
"Fn::Sub": [
"arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${queueName}",
{
"queueName": {
"Fn::GetAtt": [
"MySqsQueue",
"QueueName"
]
}
}
"Fn::GetAtt": [
"MySqsQueue",
"Arn"
]
},
"Effect": "Allow"
}
}
]
}
},
{
"PolicyDocument": {
"Statement": [
{
"Action": "sns:publish",
"Effect": "Allow",
"Resource": {
"Ref": "MySnsTopic"
}
}
]
},
"PolicyName": "SNSPublishPolicyMyFunctionForBatchingExampleStreamEvent"
}
],
"Tags": [
Expand Down Expand Up @@ -135,7 +143,8 @@
"MySqsQueue",
"Arn"
]
}
},
"Type": "SQS"
}
},
"EventSourceArn": {
Expand Down Expand Up @@ -187,7 +196,15 @@
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST"
"StartingPosition": "LATEST",
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Ref": "MySnsTopic"
},
"Type": "SNS"
}
}
}
},
"KinesisStream": {
Expand Down
Loading