Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions docs/internals/generated_resources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,12 @@ Example:
Type: SNS
Properties:
Topic: arn:aws:sns:us-east-1:123456789012:my_topic
SqsSubscription: true
SqsSubscription:
QueuePolicyLogicalId: CustomQueuePolicyLogicalId
QueueArn: !GetAtt MyCustomQueue.Arn
QueueUrl: !Ref MyCustomQueue
BatchSize: 5
Enabled: true
...

Additional generated resources:
Expand All @@ -275,6 +280,24 @@ AWS::SQS::QueuePolicy MyFunction\ **MyTrigger**\ QueuePolicy

NOTE: ``AWS::Lambda::Permission`` resources are only generated if SqsSubscription is ``false``. ``AWS::Lambda::EventSourceMapping``, ``AWS::SQS::Queue``, ``AWS::SQS::QueuePolicy`` resources are only generated if SqsSubscription is ``true``.

``AWS::SQS::Queue`` resources are only generated if SqsSubscription is ``true``.

Example:

.. code:: yaml

MyFunction:
Type: AWS::Serverless::Function
Properties:
...
Events:
MyTrigger:
Type: SNS
Properties:
Topic: arn:aws:sns:us-east-1:123456789012:my_topic
SqsSubscription: true
...

Kinesis
^^^^^^^

Expand Down Expand Up @@ -444,9 +467,9 @@ AWS::ApiGateway::Stage MyApi\ **dev**\ Stage
AWS::ApiGateway::Deployment MyApi\ Deployment\ *SHA* (10 Digits of SHA256 of DefinitionUri or DefinitionBody value)
================================== ================================

NOTE: By just specifying AWS::Serverless::Api resource, SAM will *not* add permission for API Gateway to invoke the
NOTE: By just specifying AWS::Serverless::Api resource, SAM will *not* add permission for API Gateway to invoke the
the Lambda Function backing the APIs. You should explicitly re-define all APIs under ``Events`` section of the
AWS::Serverless::Function resource but include a `RestApiId` property that references the AWS::Serverless::Api
AWS::Serverless::Function resource but include a `RestApiId` property that references the AWS::Serverless::Api
resource. SAM will add permission for these APIs to invoke the function.

Example:
Expand Down
57 changes: 43 additions & 14 deletions samtranslator/model/eventsources/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ class SNS(PushEventSource):
'Topic': PropertyType(True, is_str()),
'Region': PropertyType(False, is_str()),
'FilterPolicy': PropertyType(False, dict_of(is_str(), list_of(one_of(is_str(), is_type(dict))))),
'SqsSubscription': PropertyType(False, is_type(bool))
'SqsSubscription': PropertyType(False, one_of(is_type(bool), is_type(dict)))
}

def to_cloudformation(self, **kwargs):
Expand All @@ -393,17 +393,46 @@ def to_cloudformation(self, **kwargs):
)
return [self._construct_permission(function, source_arn=self.Topic), subscription]

# SNS -> SQS -> Lambda
# SNS -> SQS(Create New) -> Lambda
if isinstance(self.SqsSubscription, bool):
Copy link
Contributor Author

@53ningen 53ningen Nov 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if isinstance(self.SqsSubscription, bool) is True, then self.SqsSubscription is always True, because we checked if self.SqsSubscription is False on L388

resources = []
queue = self._inject_sqs_queue()
queue_arn = queue.get_runtime_attr('arn')
queue_url = queue.get_runtime_attr('queue_url')

queue_policy = self._inject_sqs_queue_policy(self.Topic, queue_arn, queue_url)
subscription = self._inject_subscription(
'sqs', queue_arn,
self.Topic, self.Region, self.FilterPolicy, function.resource_attributes
)
event_source = self._inject_sqs_event_source_mapping(function, role, queue_arn)

resources = resources + event_source
resources.append(queue)
resources.append(queue_policy)
resources.append(subscription)
return resources

# SNS -> SQS(Existing) -> Lambda
resources = []
queue = self._inject_sqs_queue()
queue_policy = self._inject_sqs_queue_policy(self.Topic, queue)
queue_arn = self.SqsSubscription.get('QueueArn', None)
queue_url = self.SqsSubscription.get('QueueUrl', None)
if not queue_arn or not queue_url:
raise InvalidEventException(
self.relative_id, "No QueueARN or QueueURL provided.")

queue_policy_logical_id = self.SqsSubscription.get('QueuePolicyLogicalId', None)
batch_size = self.SqsSubscription.get('BatchSize', None)
enabled = self.SqsSubscription.get('Enabled', None)

queue_policy = self._inject_sqs_queue_policy(self.Topic, queue_arn, queue_url, queue_policy_logical_id)
subscription = self._inject_subscription(
'sqs', queue.get_runtime_attr('arn'),
'sqs', queue_arn,
self.Topic, self.Region, self.FilterPolicy, function.resource_attributes
)
event_source = self._inject_sqs_event_source_mapping(function, role, queue_arn, batch_size, enabled)

resources = resources + self._inject_sqs_event_source_mapping(function, role, queue.get_runtime_attr('arn'))
resources.append(queue)
resources = resources + event_source
resources.append(queue_policy)
resources.append(subscription)
return resources
Expand All @@ -426,19 +455,19 @@ def _inject_subscription(self, protocol, endpoint, topic, region, filterPolicy,
def _inject_sqs_queue(self):
return SQSQueue(self.logical_id + 'Queue')

def _inject_sqs_event_source_mapping(self, function, role, queue_arn):
def _inject_sqs_event_source_mapping(self, function, role, queue_arn, batch_size=None, enabled=None):
event_source = SQS(self.logical_id + 'EventSourceMapping')
event_source.Queue = queue_arn
event_source.BatchSize = 10
event_source.Enabled = True
event_source.BatchSize = batch_size or 10
event_source.Enabled = enabled or True
return event_source.to_cloudformation(function=function, role=role)

def _inject_sqs_queue_policy(self, topic_arn, queue):
policy = SQSQueuePolicy(self.logical_id + 'QueuePolicy')
def _inject_sqs_queue_policy(self, topic_arn, queue_arn, queue_url, logical_id=None):
policy = SQSQueuePolicy(logical_id or self.logical_id + 'QueuePolicy')
policy.PolicyDocument = SQSQueuePolicies.sns_topic_send_message_role_policy(
topic_arn, queue.get_runtime_attr('arn')
topic_arn, queue_arn
)
policy.Queues = [queue.get_runtime_attr('queue_url')]
policy.Queues = [queue_url]
return policy


Expand Down
26 changes: 26 additions & 0 deletions tests/model/eventsources/test_sns_event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,29 @@ def test_to_cloudformation_passes_the_filter_policy(self):

def test_to_cloudformation_throws_when_no_function(self):
self.assertRaises(TypeError, self.sns_event_source.to_cloudformation)

def test_to_cloudformation_throws_when_queue_url_or_queue_arn_not_given(self):
sqsSubscription = {
'BatchSize': 5
}
self.sns_event_source.SqsSubscription = sqsSubscription
self.assertRaises(TypeError, self.sns_event_source.to_cloudformation)

def test_to_cloudformation_when_sqs_subscription_disable(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add/update any of the existing unit tests for a condition where SqsSubscritption: False.

I've added on this line.

sqsSubscription = False
self.sns_event_source.SqsSubscription = sqsSubscription

resources = self.sns_event_source.to_cloudformation(
function=self.function)
self.assertEqual(len(resources), 2)
self.assertEqual(resources[0].resource_type,
'AWS::Lambda::Permission')
self.assertEqual(resources[1].resource_type,
'AWS::SNS::Subscription')

subscription = resources[1]
self.assertEqual(subscription.TopicArn, 'arn:aws:sns:MyTopic')
self.assertEqual(subscription.Protocol, 'lambda')
self.assertEqual(subscription.Endpoint, 'arn:aws:lambda:mock')
self.assertIsNone(subscription.Region)
self.assertIsNone(subscription.FilterPolicy)
31 changes: 31 additions & 0 deletions tests/translator/input/sns_existing_sqs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
Resources:
SaveNotificationFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/notifications.zip
Handler: index.save_notification
Runtime: nodejs8.10
Events:
NotificationTopic:
Type: SNS
Properties:
Topic: !Ref Notifications
SqsSubscription:
QueueUrl: !Ref Queue
QueueArn: !GetAtt Queue.Arn
QueuePolicyLogicalId: NotificationA
BatchSize: 8
Enabled: true
FilterPolicy:
store:
- example_corp
price_usd:
- numeric:
- ">="
- 100

Notifications:
Type: AWS::SNS::Topic

Queue:
Type: AWS::SQS::Queue
28 changes: 28 additions & 0 deletions tests/translator/input/sns_outside_sqs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Resources:
SaveNotificationFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/notifications.zip
Handler: index.save_notification
Runtime: nodejs8.10
Events:
NotificationTopic:
Type: SNS
Properties:
Topic: !Ref Notifications
SqsSubscription:
QueueUrl: https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue
QueueArn: arn:aws:sqs:us-east-1:123456789012:MyQueue
QueuePolicyLogicalId: NotificationB
BatchSize: 8
Enabled: true
FilterPolicy:
store:
- example_corp
price_usd:
- numeric:
- ">="
- 100

Notifications:
Type: AWS::SNS::Topic
141 changes: 141 additions & 0 deletions tests/translator/output/aws-cn/sns_existing_sqs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
{
"Resources": {
"Queue": {
"Type": "AWS::SQS::Queue"
},
"Notifications": {
"Type": "AWS::SNS::Topic"
},
"NotificationA": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"Queues": [
{
"Ref": "Queue"
}
],
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "sqs:SendMessage",
"Resource": {
"Fn::GetAtt": [
"Queue",
"Arn"
]
},
"Effect": "Allow",
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Ref": "Notifications"
}
}
},
"Principal": "*"
}
]
}
}
},
"SaveNotificationFunctionNotificationTopic": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"FilterPolicy": {
"price_usd": [
{
"numeric": [
">=",
100
]
}
],
"store": [
"example_corp"
]
},
"Endpoint": {
"Fn::GetAtt": [
"Queue",
"Arn"
]
},
"Protocol": "sqs",
"TopicArn": {
"Ref": "Notifications"
}
}
},
"SaveNotificationFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
},
"ManagedPolicyArns": [
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"
],
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
]
}
},
"SaveNotificationFunctionNotificationTopicEventSourceMapping": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"BatchSize": 8,
"Enabled": true,
"FunctionName": {
"Ref": "SaveNotificationFunction"
},
"EventSourceArn": {
"Fn::GetAtt": [
"Queue",
"Arn"
]
}
}
},
"SaveNotificationFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Handler": "index.save_notification",
"Code": {
"S3Bucket": "sam-demo-bucket",
"S3Key": "notifications.zip"
},
"Role": {
"Fn::GetAtt": [
"SaveNotificationFunctionRole",
"Arn"
]
},
"Runtime": "nodejs8.10",
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
]
}
}
}
}
Loading