Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/cloudformation_compatibility.rst
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ CloudWatchEvent (superseded by EventBridgeRule, see below)
Pattern All
Input All
InputPath All
DeadLetterConfig All
RetryPolicy All
======================== ================================== ========================

EventBridgeRule
Expand All @@ -179,6 +181,8 @@ EventBridgeRule
Pattern All
Input All
InputPath All
DeadLetterConfig All
RetryPolicy All
======================== ================================== ========================

IotRule
Expand Down
11 changes: 11 additions & 0 deletions docs/internals/generated_resources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ Example:
Type: Schedule
Properties:
Input: rate(5 minutes)
DeadLetterConfig:
Type: SQS
...
Additional generated resources:
Expand All @@ -464,6 +466,8 @@ CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permission MyFunction\ **MyTimer**\ Permission
AWS::Events::Rule MyFunction\ **MyTimer**
AWS::SQS::Queue MyFunction\ **MyTimer**\ Queue
AWS::SQS::QueuePolicy MyFunction\ **MyTimer**\ QueuePolicy
================================== ================================

CloudWatchEvent (superseded by EventBridgeRule, see below)
Expand Down Expand Up @@ -523,6 +527,11 @@ Example:
detail:
state:
- terminated
DeadLetterConfig:
Type: SQS
RetryPolicy:
MaximumEventAgeInSeconds: 600
MaximumRetryAttempts:3
...
Additional generated resources:
Expand All @@ -532,6 +541,8 @@ CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permission MyFunction\ **OnTerminate**\ Permission
AWS::Events::Rule MyFunction\ **OnTerminate**
AWS::SQS::Queue MyFunction\ **OnTerminate**\ Queue
AWS::SQS::QueuePolicy MyFunction\ **OnTerminate**\ QueuePolicy
================================== ================================

AWS::Serverless::Api
Expand Down
54 changes: 54 additions & 0 deletions samtranslator/model/eventbridge_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from samtranslator.model.sqs import SQSQueue, SQSQueuePolicy, SQSQueuePolicies
from samtranslator.model.exceptions import InvalidEventException


class EventBridgeRuleUtils:
@staticmethod
def create_dead_letter_queue_with_policy(rule_logical_id, rule_arn, queue_logical_id=None):
resources = []

queue = SQSQueue(queue_logical_id or rule_logical_id + "Queue")
dlq_queue_arn = queue.get_runtime_attr("arn")
dlq_queue_url = queue.get_runtime_attr("queue_url")

# grant necessary permission to Eventbridge Rule resource for sending messages to dead-letter queue
policy = SQSQueuePolicy(rule_logical_id + "QueuePolicy")
policy.PolicyDocument = SQSQueuePolicies.eventbridge_dlq_send_message_resource_based_policy(
rule_arn, dlq_queue_arn
)
policy.Queues = [dlq_queue_url]

resources.append(queue)
resources.append(policy)

return resources

@staticmethod
def validate_dlq_config(source_logical_id, dead_letter_config):
supported_types = ["SQS"]
is_arn_defined = "Arn" in dead_letter_config
is_type_defined = "Type" in dead_letter_config
if is_arn_defined and is_type_defined:
raise InvalidEventException(
source_logical_id, "You can either define 'Arn' or 'Type' property of DeadLetterConfig"
)
if is_type_defined and dead_letter_config.get("Type") not in supported_types:
raise InvalidEventException(
source_logical_id,
"The only valid value for 'Type' property of DeadLetterConfig is 'SQS'",
)
if not is_arn_defined and not is_type_defined:
raise InvalidEventException(source_logical_id, "No 'Arn' or 'Type' property provided for DeadLetterConfig")

@staticmethod
def get_dlq_queue_arn_and_resources(cw_event_source, source_arn):
"""returns dlq queue arn and dlq_resources, assuming cw_event_source.DeadLetterConfig has been validated"""
dlq_queue_arn = cw_event_source.DeadLetterConfig.get("Arn")
if dlq_queue_arn is not None:
return dlq_queue_arn, []
queue_logical_id = cw_event_source.DeadLetterConfig.get("QueueLogicalId")
dlq_resources = EventBridgeRuleUtils.create_dead_letter_queue_with_policy(
cw_event_source.logical_id, source_arn, queue_logical_id
)
dlq_queue_arn = dlq_resources[0].get_runtime_attr("arn")
return dlq_queue_arn, dlq_resources
43 changes: 37 additions & 6 deletions samtranslator/model/eventsources/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from samtranslator.model.events import EventsRule
from samtranslator.model.eventsources.pull import SQS
from samtranslator.model.sqs import SQSQueue, SQSQueuePolicy, SQSQueuePolicies
from samtranslator.model.eventbridge_utils import EventBridgeRuleUtils
from samtranslator.model.iot import IotTopicRule
from samtranslator.model.cognito import CognitoUserPool
from samtranslator.translator import logical_id_generator
Expand Down Expand Up @@ -94,6 +95,8 @@ class Schedule(PushEventSource):
"Enabled": PropertyType(False, is_type(bool)),
"Name": PropertyType(False, is_str()),
"Description": PropertyType(False, is_str()),
"DeadLetterConfig": PropertyType(False, is_type(dict)),
"RetryPolicy": PropertyType(False, is_type(dict)),
}

def to_cloudformation(self, **kwargs):
Expand All @@ -118,16 +121,23 @@ def to_cloudformation(self, **kwargs):
events_rule.State = "ENABLED" if self.Enabled else "DISABLED"
events_rule.Name = self.Name
events_rule.Description = self.Description
events_rule.Targets = [self._construct_target(function)]

source_arn = events_rule.get_runtime_attr("arn")
dlq_queue_arn = None
if self.DeadLetterConfig is not None:
EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)
dlq_queue_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(self, source_arn)
resources.extend(dlq_resources)

events_rule.Targets = [self._construct_target(function, dlq_queue_arn)]

if CONDITION in function.resource_attributes:
events_rule.set_resource_attribute(CONDITION, function.resource_attributes[CONDITION])
resources.append(self._construct_permission(function, source_arn=source_arn))

return resources

def _construct_target(self, function):
def _construct_target(self, function, dead_letter_queue_arn=None):
"""Constructs the Target property for the EventBridge Rule.

:returns: the Target property
Expand All @@ -137,6 +147,12 @@ def _construct_target(self, function):
if self.Input is not None:
target["Input"] = self.Input

if self.DeadLetterConfig is not None:
target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn}

if self.RetryPolicy is not None:
target["RetryPolicy"] = self.RetryPolicy

return target


Expand All @@ -148,6 +164,8 @@ class CloudWatchEvent(PushEventSource):
property_types = {
"EventBusName": PropertyType(False, is_str()),
"Pattern": PropertyType(False, is_type(dict)),
"DeadLetterConfig": PropertyType(False, is_type(dict)),
"RetryPolicy": PropertyType(False, is_type(dict)),
"Input": PropertyType(False, is_str()),
"InputPath": PropertyType(False, is_str()),
"Target": PropertyType(False, is_type(dict)),
Expand All @@ -171,18 +189,24 @@ def to_cloudformation(self, **kwargs):
events_rule = EventsRule(self.logical_id)
events_rule.EventBusName = self.EventBusName
events_rule.EventPattern = self.Pattern
events_rule.Targets = [self._construct_target(function)]
source_arn = events_rule.get_runtime_attr("arn")

dlq_queue_arn = None
if self.DeadLetterConfig is not None:
EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)
dlq_queue_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(self, source_arn)
resources.extend(dlq_resources)

events_rule.Targets = [self._construct_target(function, dlq_queue_arn)]
if CONDITION in function.resource_attributes:
events_rule.set_resource_attribute(CONDITION, function.resource_attributes[CONDITION])

resources.append(events_rule)

source_arn = events_rule.get_runtime_attr("arn")
resources.append(self._construct_permission(function, source_arn=source_arn))

return resources

def _construct_target(self, function):
def _construct_target(self, function, dead_letter_queue_arn=None):
"""Constructs the Target property for the CloudWatch Events/EventBridge Rule.

:returns: the Target property
Expand All @@ -195,6 +219,13 @@ def _construct_target(self, function):

if self.InputPath is not None:
target["InputPath"] = self.InputPath

if self.DeadLetterConfig is not None:
target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn}

if self.RetryPolicy is not None:
target["RetryPolicy"] = self.RetryPolicy

return target


Expand Down
20 changes: 18 additions & 2 deletions samtranslator/model/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class SQSQueuePolicy(Resource):


class SQSQueuePolicies:
@classmethod
def sns_topic_send_message_role_policy(cls, topic_arn, queue_arn):
@staticmethod
def sns_topic_send_message_role_policy(topic_arn, queue_arn):
document = {
"Version": "2012-10-17",
"Statement": [
Expand All @@ -34,3 +34,19 @@ def sns_topic_send_message_role_policy(cls, topic_arn, queue_arn):
],
}
return document

@staticmethod
def eventbridge_dlq_send_message_resource_based_policy(rule_arn, queue_arn):
document = {
"Version": "2012-10-17",
"Statement": [
{
"Action": "sqs:SendMessage",
"Effect": "Allow",
"Principal": {"Service": "events.amazonaws.com"},
"Resource": queue_arn,
"Condition": {"ArnEquals": {"aws:SourceArn": rule_arn}},
}
],
}
return document
41 changes: 37 additions & 4 deletions samtranslator/model/stepfunctions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from samtranslator.model.intrinsics import fnSub
from samtranslator.translator import logical_id_generator
from samtranslator.model.exceptions import InvalidEventException, InvalidResourceException
from samtranslator.model.eventbridge_utils import EventBridgeRuleUtils
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.swagger.swagger import SwaggerEditor
from samtranslator.open_api.open_api import OpenApiEditor
Expand Down Expand Up @@ -81,6 +82,8 @@ class Schedule(EventSource):
"Enabled": PropertyType(False, is_type(bool)),
"Name": PropertyType(False, is_str()),
"Description": PropertyType(False, is_str()),
"DeadLetterConfig": PropertyType(False, is_type(dict)),
"RetryPolicy": PropertyType(False, is_type(dict)),
}

def to_cloudformation(self, resource, **kwargs):
Expand All @@ -107,11 +110,18 @@ def to_cloudformation(self, resource, **kwargs):

role = self._construct_role(resource, permissions_boundary)
resources.append(role)
events_rule.Targets = [self._construct_target(resource, role)]

source_arn = events_rule.get_runtime_attr("arn")
dlq_queue_arn = None
if self.DeadLetterConfig is not None:
EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)
dlq_queue_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(self, source_arn)
resources.extend(dlq_resources)
events_rule.Targets = [self._construct_target(resource, role, dlq_queue_arn)]

return resources

def _construct_target(self, resource, role):
def _construct_target(self, resource, role, dead_letter_queue_arn=None):
"""Constructs the Target property for the EventBridge Rule.

:returns: the Target property
Expand All @@ -125,6 +135,12 @@ def _construct_target(self, resource, role):
if self.Input is not None:
target["Input"] = self.Input

if self.DeadLetterConfig is not None:
target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn}

if self.RetryPolicy is not None:
target["RetryPolicy"] = self.RetryPolicy

return target


Expand All @@ -138,6 +154,8 @@ class CloudWatchEvent(EventSource):
"Pattern": PropertyType(False, is_type(dict)),
"Input": PropertyType(False, is_str()),
"InputPath": PropertyType(False, is_str()),
"DeadLetterConfig": PropertyType(False, is_type(dict)),
"RetryPolicy": PropertyType(False, is_type(dict)),
}

def to_cloudformation(self, resource, **kwargs):
Expand All @@ -162,11 +180,19 @@ def to_cloudformation(self, resource, **kwargs):

role = self._construct_role(resource, permissions_boundary)
resources.append(role)
events_rule.Targets = [self._construct_target(resource, role)]

source_arn = events_rule.get_runtime_attr("arn")
dlq_queue_arn = None
if self.DeadLetterConfig is not None:
EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)
dlq_queue_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(self, source_arn)
resources.extend(dlq_resources)

events_rule.Targets = [self._construct_target(resource, role, dlq_queue_arn)]

return resources

def _construct_target(self, resource, role):
def _construct_target(self, resource, role, dead_letter_queue_arn=None):
"""Constructs the Target property for the CloudWatch Events/EventBridge Rule.

:returns: the Target property
Expand All @@ -182,6 +208,13 @@ def _construct_target(self, resource, role):

if self.InputPath is not None:
target["InputPath"] = self.InputPath

if self.DeadLetterConfig is not None:
target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn}

if self.RetryPolicy is not None:
target["RetryPolicy"] = self.RetryPolicy

return target


Expand Down
28 changes: 28 additions & 0 deletions samtranslator/validator/sam_schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,34 @@
},
"Pattern": {
"type": "object"
},
"DeadLetterConfig": {
"additionalProperties": false,
"properties": {
"Arn": {
"type": "string"
},
"Type": {
"type": "string"
},
"QueueLogicalId": {
"type": "string"
}
},
"type": "object"
},
"RetryPolicy": {
"additionalProperties": false,
"minProperties": 1,
"properties": {
"MaximumEventAgeInSeconds": {
"type": "number"
},
"MaximumRetryAttempts": {
"type": "number"
}
},
"type": "object"
}
},
"required": [
Expand Down
Loading