Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions docs/internals/generated_resources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ Example:
Type: SNS
Properties:
Topic: arn:aws:sns:us-east-1:123456789012:my_topic
SqsSubscription: true
SqsSubscription:
QueuePolicyLogicalId: CustomQueuePolicyLogicalId
QueueArn: !GetAtt MyCustomQueue.Arn
QueueUrl: !Ref MyCustomQueue
BatchSize: 5
Enabled: true
...

Additional generated resources:
Expand All @@ -241,6 +246,24 @@ AWS::SQS::QueuePolicy MyFunction\ **MyTrigger**\ QueuePolicy

NOTE: ``AWS::Lambda::Permission`` resources are only generated if SqsSubscription is ``false``. ``AWS::Lambda::EventSourceMapping``, ``AWS::SQS::Queue``, ``AWS::SQS::QueuePolicy`` resources are only generated if SqsSubscription is ``true``.

``AWS::SQS::Queue`` resources are only generated if SqsSubscription is ``true``.

Example:

.. code:: yaml

MyFunction:
Type: AWS::Serverless::Function
Properties:
...
Events:
MyTrigger:
Type: SNS
Properties:
Topic: arn:aws:sns:us-east-1:123456789012:my_topic
SqsSubscription: true
...

Kinesis
^^^^^^^

Expand Down Expand Up @@ -410,9 +433,9 @@ AWS::ApiGateway::Stage MyApi\ **dev**\ Stage
AWS::ApiGateway::Deployment MyApi\ Deployment\ *SHA* (10 Digits of SHA256 of DefinitionUri or DefinitionBody value)
================================== ================================

NOTE: By just specifying AWS::Serverless::Api resource, SAM will *not* add permission for API Gateway to invoke the
NOTE: By just specifying AWS::Serverless::Api resource, SAM will *not* add permission for API Gateway to invoke the
the Lambda Function backing the APIs. You should explicitly re-define all APIs under ``Events`` section of the
AWS::Serverless::Function resource but include a `RestApiId` property that references the AWS::Serverless::Api
AWS::Serverless::Function resource but include a `RestApiId` property that references the AWS::Serverless::Api
resource. SAM will add permission for these APIs to invoke the function.

Example:
Expand Down
53 changes: 39 additions & 14 deletions samtranslator/model/eventsources/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ class SNS(PushEventSource):
'Topic': PropertyType(True, is_str()),
'Region': PropertyType(False, is_str()),
'FilterPolicy': PropertyType(False, dict_of(is_str(), list_of(one_of(is_str(), is_type(dict))))),
'SqsSubscription': PropertyType(False, is_type(bool))
'SqsSubscription': PropertyType(False, one_of(is_type(bool), is_type(dict)))
}

def to_cloudformation(self, **kwargs):
Expand All @@ -392,17 +392,42 @@ def to_cloudformation(self, **kwargs):
)
return [self._construct_permission(function, source_arn=self.Topic), subscription]

# SNS -> SQS -> Lambda
# SNS -> SQS(Create New) -> Lambda
if isinstance(self.SqsSubscription, bool):
Copy link
Contributor Author

@53ningen 53ningen Nov 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if isinstance(self.SqsSubscription, bool) is True, then self.SqsSubscription is always True, because we checked if self.SqsSubscription is False on L388

resources = []
queue = self._inject_sqs_queue()
queue_arn = queue.get_runtime_attr('arn')
queue_url = queue.get_runtime_attr('queue_url')

queue_policy = self._inject_sqs_queue_policy(self.Topic, queue_arn, queue_url)
subscription = self._inject_subscription(
'sqs', queue_arn,
self.Topic, self.Region, self.FilterPolicy, function.resource_attributes
)
event_source = self._inject_sqs_event_source_mapping(function, role, queue_arn)

resources = resources + event_source
resources.append(queue)
resources.append(queue_policy)
resources.append(subscription)
return resources

# SNS -> SQS(Existing) -> Lambda
resources = []
queue = self._inject_sqs_queue()
queue_policy = self._inject_sqs_queue_policy(self.Topic, queue)
queue_arn = self.SqsSubscription.get('QueueArn', None)
queue_url = self.SqsSubscription.get('QueueUrl', None)
queue_policy_logical_id = self.SqsSubscription.get('QueuePolicyLogicalId', None)
batch_size = self.SqsSubscription.get('BatchSize', None)
enabled = self.SqsSubscription.get('Enabled', None)

queue_policy = self._inject_sqs_queue_policy(self.Topic, queue_arn, queue_url, queue_policy_logical_id)
subscription = self._inject_subscription(
'sqs', queue.get_runtime_attr('arn'),
'sqs', queue_arn,
self.Topic, self.Region, self.FilterPolicy, function.resource_attributes
)
event_source = self._inject_sqs_event_source_mapping(function, role, queue_arn, batch_size, enabled)

resources = resources + self._inject_sqs_event_source_mapping(function, role, queue.get_runtime_attr('arn'))
resources.append(queue)
resources = resources + event_source
resources.append(queue_policy)
resources.append(subscription)
return resources
Expand All @@ -425,19 +450,19 @@ def _inject_subscription(self, protocol, endpoint, topic, region, filterPolicy,
def _inject_sqs_queue(self):
return SQSQueue(self.logical_id + 'Queue')

def _inject_sqs_event_source_mapping(self, function, role, queue_arn):
def _inject_sqs_event_source_mapping(self, function, role, queue_arn, batch_size=None, enabled=None):
event_source = SQS(self.logical_id + 'EventSourceMapping')
event_source.Queue = queue_arn
event_source.BatchSize = 10
event_source.Enabled = True
event_source.BatchSize = batch_size or 10
event_source.Enabled = enabled or True
return event_source.to_cloudformation(function=function, role=role)

def _inject_sqs_queue_policy(self, topic_arn, queue):
policy = SQSQueuePolicy(self.logical_id + 'QueuePolicy')
def _inject_sqs_queue_policy(self, topic_arn, queue_arn, queue_url, logical_id=None):
policy = SQSQueuePolicy(logical_id or self.logical_id + 'QueuePolicy')
policy.PolicyDocument = SQSQueuePolicies.sns_topic_send_message_role_policy(
topic_arn, queue.get_runtime_attr('arn')
topic_arn, queue_arn
)
policy.Queues = [queue.get_runtime_attr('queue_url')]
policy.Queues = [queue_url]
return policy


Expand Down
31 changes: 31 additions & 0 deletions tests/translator/input/sns_existing_sqs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
Resources:
SaveNotificationFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/notifications.zip
Handler: index.save_notification
Runtime: nodejs8.10
Events:
NotificationTopic:
Type: SNS
Properties:
Topic: !Ref Notifications
SqsSubscription:
QueueUrl: !Ref Queue
QueueArn: !GetAtt Queue.Arn
QueuePolicyLogicalId: NotificationA
BatchSize: 8
Enabled: true
FilterPolicy:
store:
- example_corp
price_usd:
- numeric:
- ">="
- 100

Notifications:
Type: AWS::SNS::Topic

Queue:
Type: AWS::SQS::Queue
28 changes: 28 additions & 0 deletions tests/translator/input/sns_outside_sqs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Resources:
SaveNotificationFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/notifications.zip
Handler: index.save_notification
Runtime: nodejs8.10
Events:
NotificationTopic:
Type: SNS
Properties:
Topic: !Ref Notifications
SqsSubscription:
QueueUrl: https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue
QueueArn: arn:aws:sqs:us-east-1:123456789012:MyQueue
QueuePolicyLogicalId: NotificationB
BatchSize: 8
Enabled: true
FilterPolicy:
store:
- example_corp
price_usd:
- numeric:
- ">="
- 100

Notifications:
Type: AWS::SNS::Topic
141 changes: 141 additions & 0 deletions tests/translator/output/aws-cn/sns_existing_sqs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
{
"Resources": {
"Queue": {
"Type": "AWS::SQS::Queue"
},
"Notifications": {
"Type": "AWS::SNS::Topic"
},
"NotificationA": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"Queues": [
{
"Ref": "Queue"
}
],
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "sqs:SendMessage",
"Resource": {
"Fn::GetAtt": [
"Queue",
"Arn"
]
},
"Effect": "Allow",
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Ref": "Notifications"
}
}
},
"Principal": "*"
}
]
}
}
},
"SaveNotificationFunctionNotificationTopic": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"FilterPolicy": {
"price_usd": [
{
"numeric": [
">=",
100
]
}
],
"store": [
"example_corp"
]
},
"Endpoint": {
"Fn::GetAtt": [
"Queue",
"Arn"
]
},
"Protocol": "sqs",
"TopicArn": {
"Ref": "Notifications"
}
}
},
"SaveNotificationFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
},
"ManagedPolicyArns": [
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"
],
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
]
}
},
"SaveNotificationFunctionNotificationTopicEventSourceMapping": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"BatchSize": 8,
"Enabled": true,
"FunctionName": {
"Ref": "SaveNotificationFunction"
},
"EventSourceArn": {
"Fn::GetAtt": [
"Queue",
"Arn"
]
}
}
},
"SaveNotificationFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Handler": "index.save_notification",
"Code": {
"S3Bucket": "sam-demo-bucket",
"S3Key": "notifications.zip"
},
"Role": {
"Fn::GetAtt": [
"SaveNotificationFunctionRole",
"Arn"
]
},
"Runtime": "nodejs8.10",
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
]
}
}
}
}
Loading