From 3a011e2582bcf46c72b63ec7a4f39e234351975f Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 6 Apr 2020 18:09:02 -0400 Subject: [PATCH 01/21] Handle HTTP requests. --- provider/Cargo.toml | 1 + provider/src/lib.rs | 49 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/provider/Cargo.toml b/provider/Cargo.toml index 1fe1a6e..9192e30 100644 --- a/provider/Cargo.toml +++ b/provider/Cargo.toml @@ -15,3 +15,4 @@ reqwest = { version = "0.10.4", features = ["blocking", "json"] } serde = "1.0.105" serde_json = "1.0.50" codec = { path = "../codec" } +aws_lambda_events = "0.2.7" diff --git a/provider/src/lib.rs b/provider/src/lib.rs index ffb3ace..7862a8e 100644 --- a/provider/src/lib.rs +++ b/provider/src/lib.rs @@ -23,7 +23,9 @@ extern crate log; #[macro_use] extern crate wascc_codec; +use aws_lambda_events::event::alb::AlbTargetGroupRequest; use env_logger; +use serde_json; use std::collections::HashMap; use std::env; use wascc_codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher}; @@ -188,18 +190,21 @@ impl Poller { // Get next event. debug!("Poller get next event"); let event = match self.client.next_invocation_event() { - Err(err) => { - error!("{}", err); + Err(e) => { + error!("{}", e); continue; } Ok(evt) => match evt { - None => continue, + None => { + warn!("No event"); + continue; + } Some(event) => event, }, }; let request_id = match event.request_id() { None => { - warn!("Missing request ID"); + warn!("No request ID"); continue; } Some(request_id) => request_id, @@ -210,17 +215,23 @@ impl Poller { env::set_var("_X_AMZN_TRACE_ID", trace_id); } + // Check for HTTP requests. + let msg = match self.actor_message(event.body()) { + Ok(msg) => msg, + Err(e) => { + // TODO Should POST to invocation error endpoint. + error!("{}", e); + continue; + } + }; + // Call handler. debug!("Poller call handler"); let handler_resp = { - let event = codec::Event { - body: event.body().to_vec(), - }; - let buf = serialize(event).unwrap(); let lock = self.dispatcher.read().unwrap(); lock.dispatch( &format!("{}!{}", &self.module_id, codec::OP_HANDLE_EVENT), - &buf, + &msg, ) }; // Handle response or error. @@ -251,4 +262,24 @@ impl Poller { fn shutdown(&self) -> bool { *self.shutdown.read().unwrap().get(&self.module_id).unwrap() } + + /// Returns the message to be dispatched to the target actor. + fn actor_message(&self, body: &Vec) -> anyhow::Result> { + match serde_json::from_slice(body) { + Ok(r) => return self.serialize_alb(r), + _ => {} + } + + let event = codec::Event { + body: body.to_vec(), + }; + let buf = + serialize(event).map_err(|e| anyhow!("Failed to serialize actor message: {}", e))?; + + Ok(buf) + } + + fn serialize_alb(&self, req: AlbTargetGroupRequest) -> anyhow::Result> { + Ok(vec![]) + } } From 0499d1e2949c8948a9feffa4d6a5b6988d8d9fed Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 7 Apr 2020 16:38:22 -0400 Subject: [PATCH 02/21] Handle HTTP requests. --- provider/src/lambda.rs | 5 +- provider/src/lib.rs | 115 +++++++++++++++++++++++++---------------- 2 files changed, 72 insertions(+), 48 deletions(-) diff --git a/provider/src/lambda.rs b/provider/src/lambda.rs index 3eb9c91..38f7f4c 100644 --- a/provider/src/lambda.rs +++ b/provider/src/lambda.rs @@ -18,7 +18,6 @@ use reqwest::header::USER_AGENT; use serde_json; -use std::error::Error; /// Represents an invocation event. pub(crate) struct InvocationEvent { @@ -35,7 +34,7 @@ pub(crate) struct InvocationResponse { /// Represents an invocation error. pub(crate) struct InvocationError { - error: Box, + error: anyhow::Error, request_id: String, } @@ -181,7 +180,7 @@ impl InvocationResponse { impl InvocationError { /// Creates a new `InvocationError` with the specified error and request ID. - pub fn new(error: Box, request_id: &str) -> Self { + pub fn new(error: anyhow::Error, request_id: &str) -> Self { InvocationError { error: error, request_id: request_id.into(), diff --git a/provider/src/lib.rs b/provider/src/lib.rs index e32b878..dd55b5e 100644 --- a/provider/src/lib.rs +++ b/provider/src/lib.rs @@ -211,68 +211,93 @@ impl Poller { env::set_var("_X_AMZN_TRACE_ID", trace_id); } - // Check for HTTP requests. - let msg = match self.actor_message(event.body()) { - Ok(msg) => msg, - Err(e) => { - // TODO Should POST to invocation error endpoint. - error!("{}", e); - continue; - } - }; - - // Call handler. - debug!("Poller call handler"); - let handler_resp = { - let lock = self.dispatcher.read().unwrap(); - lock.dispatch(&self.module_id, codec::OP_HANDLE_EVENT, &msg) - }; - // Handle response or error. - match handler_resp { - Ok(r) => { - let r = deserialize::(r.as_slice()).unwrap(); - let resp = lambda::InvocationResponse::new(r.body, request_id); - debug!("Poller send response"); - match self.client.send_invocation_response(resp) { - Ok(_) => {} - Err(e) => error!("Unable to send invocation response: {}", e), - } - } - Err(e) => { - error!("Guest failed to handle Lambda event: {}", e); - let err = lambda::InvocationError::new(e, request_id); - debug!("Poller send error"); - match self.client.send_invocation_error(err) { - Ok(_) => {} - Err(e) => error!("Unable to send invocation error: {}", e), - } - } + // Try first to dispatch as an HTTP request. + if self.try_dispatch_http_request(event.body(), request_id) { + continue; } + + self.dispatch_lambda_event(event.body(), request_id); + } + } + + /// Sends an invocation error. + fn send_invocation_error(&self, e: anyhow::Error, request_id: &str) { + let err = lambda::InvocationError::new(e, request_id); + debug!("Poller send error"); + match self.client.send_invocation_error(err) { + Ok(_) => {} + Err(e) => error!("Unable to send invocation error: {}", e), + } + } + + /// Sends an invocation response. + fn send_invocation_response(&self, body: Vec, request_id: &str) { + let resp = lambda::InvocationResponse::new(body, request_id); + debug!("Poller send response"); + match self.client.send_invocation_response(resp) { + Ok(_) => {} + Err(e) => error!("Unable to send invocation response: {}", e), } } - // Returns whether the shutdown flag is set. + /// Returns whether the shutdown flag is set. fn shutdown(&self) -> bool { *self.shutdown.read().unwrap().get(&self.module_id).unwrap() } - /// Returns the message to be dispatched to the target actor. - fn actor_message(&self, body: &Vec) -> anyhow::Result> { + /// Attempts to dispatch an HTTP request to an actor. + fn try_dispatch_http_request(&self, body: &Vec, request_id: &str) -> bool { match serde_json::from_slice(body) { - Ok(r) => return self.serialize_alb(r), + Ok(request) => return self.dispatch_alb_http_request(request, request_id), _ => {} } + false + } + + /// Dispatches a Lambda event to an actor. + fn dispatch_lambda_event(&self, body: &Vec, request_id: &str) { let event = codec::Event { body: body.to_vec(), }; - let buf = - serialize(event).map_err(|e| anyhow!("Failed to serialize actor message: {}", e))?; + let msg = match serialize(event) { + Ok(msg) => msg, + Err(e) => { + let err = anyhow!("Failed to serialize actor message: {}", e); + error!("{}", err); + self.send_invocation_error(err, request_id); + + return; + } + }; - Ok(buf) + // Call handler. + debug!("Poller call handler"); + let handler_resp = { + let lock = self.dispatcher.read().unwrap(); + lock.dispatch(&self.module_id, codec::OP_HANDLE_EVENT, &msg) + }; + // Handle response or error. + match handler_resp { + Ok(r) => { + let resp = deserialize::(r.as_slice()).unwrap(); + self.send_invocation_response(resp.body, request_id); + } + Err(e) => { + let err = anyhow!("Guest failed to handle Lambda event: {}", e); + error!("{}", err); + self.send_invocation_error(err, request_id); + } + } } - fn serialize_alb(&self, req: AlbTargetGroupRequest) -> anyhow::Result> { - Ok(vec![]) + /// Dispatches an ALB HTTP request to an actor. + fn dispatch_alb_http_request(&self, request: AlbTargetGroupRequest, request_id: &str) -> bool { + true + } + + /// Dispatches an HTTP request to an actor. + fn dispatch_http_request(&self, request: wascc_codec::http::Request, request_id: &str) -> bool { + true } } From 29f25e0a6483e5132356ade8d2e0e35775ba4efb Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 8 Apr 2020 17:19:47 -0400 Subject: [PATCH 03/21] Dispatch ALB target group request. --- provider/Cargo.toml | 2 + provider/src/lib.rs | 162 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 131 insertions(+), 33 deletions(-) diff --git a/provider/Cargo.toml b/provider/Cargo.toml index 0929d2f..4a887c5 100644 --- a/provider/Cargo.toml +++ b/provider/Cargo.toml @@ -16,3 +16,5 @@ serde = "1.0.105" serde_json = "1.0.50" codec = { path = "../codec" } aws_lambda_events = "0.2.7" +base64 = "0.12.0" +url = "2.1.1" diff --git a/provider/src/lib.rs b/provider/src/lib.rs index dd55b5e..5c1d910 100644 --- a/provider/src/lib.rs +++ b/provider/src/lib.rs @@ -21,10 +21,10 @@ extern crate anyhow; #[macro_use] extern crate log; -use aws_lambda_events::event::alb::AlbTargetGroupRequest; +use aws_lambda_events::event::alb::{AlbTargetGroupRequest, AlbTargetGroupResponse}; use env_logger; use serde_json; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::env; use wascc_codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher}; use wascc_codec::core::{CapabilityConfiguration, OP_BIND_ACTOR, OP_REMOVE_ACTOR}; @@ -50,6 +50,7 @@ struct Poller { dispatcher: Arc>>, module_id: String, shutdown: Arc>>, + dispatched: HashSet, } impl Default for AwsLambdaRuntimeProvider { @@ -93,7 +94,7 @@ impl AwsLambdaRuntimeProvider { // Initialize this poller's shutdown flag. shutdown.write().unwrap().insert(module_id.clone(), false); - let poller = Poller::new(&module_id, &endpoint, dispatcher, shutdown); + let mut poller = Poller::new(&module_id, &endpoint, dispatcher, shutdown); poller.run(); }); @@ -173,11 +174,12 @@ impl Poller { dispatcher, module_id: module_id.into(), shutdown, + dispatched: HashSet::new(), } } /// Runs the poller until shutdown. - fn run(&self) { + fn run(&mut self) { loop { if self.shutdown() { break; @@ -211,12 +213,42 @@ impl Poller { env::set_var("_X_AMZN_TRACE_ID", trace_id); } - // Try first to dispatch as an HTTP request. - if self.try_dispatch_http_request(event.body(), request_id) { - continue; + if self.dispatched.contains(request_id) { + self.send_invocation_error( + anyhow!("Already dispatched: {}", request_id), + request_id, + ); } - self.dispatch_lambda_event(event.body(), request_id); + // Try first to dispatch as an HTTP request. + match self.try_dispatch_http_request(event.body(), request_id) { + // The event couldn't be converted into an HTTP request. + // Dispatch as a Lambda raw event. + Ok(body) if body.is_empty() => {} + // The event could be converted to an HTTP request and was dispatched succesfully. + Ok(body) => { + self.send_invocation_response(body, request_id); + continue; + } + // The event could be converted to an HTTP request and was + // dispatched succesfully but there was an error after dispatch. + Err(e) if self.dispatched.contains(request_id) => { + error!("{}", e); + self.send_invocation_error(e, request_id); + continue; + } + // The event could be converted to an HTTP request but wasn't dispatched succesfully. + // Dispatch as a Lambda raw event. + Err(e) => warn!("{}", e), + }; + + match self.try_dispatch_lambda_event(event.body(), request_id) { + Ok(body) => self.send_invocation_response(body, request_id), + Err(e) => { + error!("{}", e); + self.send_invocation_error(e, request_id) + } + } } } @@ -246,33 +278,39 @@ impl Poller { } /// Attempts to dispatch an HTTP request to an actor. - fn try_dispatch_http_request(&self, body: &Vec, request_id: &str) -> bool { + fn try_dispatch_http_request( + &mut self, + body: &Vec, + request_id: &str, + ) -> anyhow::Result> { match serde_json::from_slice(body) { - Ok(request) => return self.dispatch_alb_http_request(request, request_id), + Ok(request) => { + let response = self.dispatch_alb_http_request(request, request_id)?; + return serde_json::to_vec(&response) + .map_err(|e| anyhow!("Failed to serialize ALB response: {}", e)); + } _ => {} - } + }; - false + Ok(vec![]) } - /// Dispatches a Lambda event to an actor. - fn dispatch_lambda_event(&self, body: &Vec, request_id: &str) { + /// Attempts to dispatch a Lambda raw event to an actor. + fn try_dispatch_lambda_event( + &mut self, + body: &Vec, + request_id: &str, + ) -> anyhow::Result> { let event = codec::Event { body: body.to_vec(), }; let msg = match serialize(event) { Ok(msg) => msg, - Err(e) => { - let err = anyhow!("Failed to serialize actor message: {}", e); - error!("{}", err); - self.send_invocation_error(err, request_id); - - return; - } + Err(e) => return Err(anyhow!("Failed to serialize Lambda raw event: {}", e)), }; // Call handler. - debug!("Poller call handler"); + info!("Poller dispatch Lambda raw event"); let handler_resp = { let lock = self.dispatcher.read().unwrap(); lock.dispatch(&self.module_id, codec::OP_HANDLE_EVENT, &msg) @@ -280,24 +318,82 @@ impl Poller { // Handle response or error. match handler_resp { Ok(r) => { - let resp = deserialize::(r.as_slice()).unwrap(); - self.send_invocation_response(resp.body, request_id); - } - Err(e) => { - let err = anyhow!("Guest failed to handle Lambda event: {}", e); - error!("{}", err); - self.send_invocation_error(err, request_id); + // Record that the request was dispatched. + self.dispatched.insert(request_id.into()); + + match deserialize::(r.as_slice()) { + Ok(resp) => Ok(resp.body), + Err(e) => Err(anyhow!("Failed to deserialize HTTP response: {}", e)), + } } + Err(e) => Err(anyhow!("Guest failed to handle Lambda event: {}", e)), } } /// Dispatches an ALB HTTP request to an actor. - fn dispatch_alb_http_request(&self, request: AlbTargetGroupRequest, request_id: &str) -> bool { - true + fn dispatch_alb_http_request( + &mut self, + request: AlbTargetGroupRequest, + request_id: &str, + ) -> anyhow::Result { + let query_string = url::form_urlencoded::Serializer::new(String::new()) + .extend_pairs(request.query_string_parameters.iter()) + .finish(); + let request = wascc_codec::http::Request { + method: request + .http_method + .ok_or(anyhow!("Missing method in ALB request"))?, + path: request.path.ok_or(anyhow!("Missing path in ALB request"))?, + query_string, + header: request.headers, + body: match request.body { + Some(s) if request.is_base64_encoded => base64::decode(s)?, + Some(s) => s.into_bytes(), + None => vec![], + }, + }; + + info!("Dispatching ALB target group request"); + let response = self.dispatch_http_request(request, request_id)?; + Ok(AlbTargetGroupResponse { + status_code: response.status_code as i64, + status_description: Some(response.status), + headers: response.header, + multi_value_headers: HashMap::new(), + body: Some(base64::encode(response.body)), + is_base64_encoded: true, + }) } /// Dispatches an HTTP request to an actor. - fn dispatch_http_request(&self, request: wascc_codec::http::Request, request_id: &str) -> bool { - true + fn dispatch_http_request( + &mut self, + request: wascc_codec::http::Request, + request_id: &str, + ) -> anyhow::Result { + let msg = match serialize(request) { + Ok(msg) => msg, + Err(e) => return Err(anyhow!("Failed to serialize HTTP request: {}", e)), + }; + + // Call handler. + info!("Poller dispatch HTTP request"); + let handler_resp = { + let lock = self.dispatcher.read().unwrap(); + lock.dispatch(&self.module_id, wascc_codec::http::OP_HANDLE_REQUEST, &msg) + }; + // Return response or error. + match handler_resp { + Ok(resp) => { + // Record that the request was dispatched. + self.dispatched.insert(request_id.into()); + + match deserialize::(resp.as_slice()) { + Ok(resp) => Ok(resp), + Err(e) => Err(anyhow!("Failed to deserialize HTTP response: {}", e)), + } + } + Err(e) => Err(anyhow!("Guest failed to handle HTTP request: {}", e)), + } } } From fa34d976988b9031b2a16b6f0164f0075659eba9 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 8 Apr 2020 17:52:06 -0400 Subject: [PATCH 04/21] Add API Gateway example. --- examples/README.md | 3 +- examples/apigw/.gitignore | 7 ++ examples/apigw/Makefile | 9 +++ examples/apigw/README.md | 35 ++++++++ examples/apigw/main.tf | 153 +++++++++++++++++++++++++++++++++++ examples/apigw/manifest.yaml | 5 ++ 6 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 examples/apigw/.gitignore create mode 100644 examples/apigw/Makefile create mode 100644 examples/apigw/README.md create mode 100644 examples/apigw/main.tf create mode 100644 examples/apigw/manifest.yaml diff --git a/examples/README.md b/examples/README.md index 2529666..44d9f51 100644 --- a/examples/README.md +++ b/examples/README.md @@ -9,7 +9,8 @@ Examples of using the [waSCC](https://wascc.dev/) runtime for AWS Lambda. ## Examples -* [`custom`](custom/README.md) A simple example that processes a custom lambda event +* [`custom`](custom/README.md) A simple example that processes a custom Lambda event +* [`apigw`](apigw/README.md) An example that receives an HTTP request via API Gateway * [`sqs`](sqs/README.md) Receive an SQS message and publish a reply ## Notes diff --git a/examples/apigw/.gitignore b/examples/apigw/.gitignore new file mode 100644 index 0000000..d7c2077 --- /dev/null +++ b/examples/apigw/.gitignore @@ -0,0 +1,7 @@ +.terraform/ + +terraform.tfstate +terraform.tfstate.backup + +app.zip +output.json diff --git a/examples/apigw/Makefile b/examples/apigw/Makefile new file mode 100644 index 0000000..bf927af --- /dev/null +++ b/examples/apigw/Makefile @@ -0,0 +1,9 @@ +.PHONY: all apply + +all: apply + +apply: app.zip + terraform12 apply + +app.zip: manifest.yaml actor/target/wasm32-unknown-unknown/release/wascc_actor_hello_lambda_signed.wasm + zip -j $@ $^ diff --git a/examples/apigw/README.md b/examples/apigw/README.md new file mode 100644 index 0000000..14a4a9a --- /dev/null +++ b/examples/apigw/README.md @@ -0,0 +1,35 @@ +# API Gateway HTTP Request Invocation + +### Build + +Build the [sample waSCC actor](actor/README.md). + +```console +$ cd actor +$ make release +$ cd .. +``` + +### Deploy + +This examples uses the `wascc-slim` Lambda layer. +See [`layers`](../../layers/README/md) for instructions on building the waSCC runtime Lambda layers. + +```console +$ terraform init +``` + +Set AWS environment variables for your authenticated session. + +```console +$ make +``` + +### Test + + +### Known Issues + +It works on my machine! + +The public key of the actor in `manifest.yaml` is the value I use and will have to be changed when you generate your own keys. diff --git a/examples/apigw/main.tf b/examples/apigw/main.tf new file mode 100644 index 0000000..301a84a --- /dev/null +++ b/examples/apigw/main.tf @@ -0,0 +1,153 @@ +// +// waSCC runtime for AWS Lambda example configuration. +// + +terraform { + required_version = ">= 0.12.19" +} + +provider "aws" { + version = ">= 2.50.0" +} + +// +// Data sources for current AWS account ID, partition and region. +// + +data "aws_caller_identity" "current" {} + +data "aws_partition" "current" {} + +data "aws_region" "current" {} + +// +// API Gateway resources. +// + +resource "aws_api_gateway_rest_api" "example" { + name = "waSCC-example-apigw" +} +resource "aws_api_gateway_resource" "example" { + path_part = "helloworld" + parent_id = aws_api_gateway_rest_api.example.root_resource_id + rest_api_id = aws_api_gateway_rest_api.example.id +} +resource "aws_api_gateway_method" "example" { + rest_api_id = aws_api_gateway_rest_api.example.id + resource_id = aws_api_gateway_resource.example.id + http_method = "GET" + authorization = "NONE" +} +resource "aws_api_gateway_integration" "example" { + rest_api_id = aws_api_gateway_rest_api.example.id + resource_id = aws_api_gateway_resource.example.id + http_method = aws_api_gateway_method.example.http_method + integration_http_method = "POST" + type = "AWS_PROXY" + uri = aws_lambda_function.example.invoke_arn +} +resource "aws_api_gateway_deployment" "example" { + depends_on = [aws_api_gateway_integration.example] + rest_api_id = aws_api_gateway_rest_api.example.id + stage_name = "test" +} + +// +// Lambda resources. +// + +data "aws_lambda_layer_version" "slim" { + layer_name = "wascc-slim" +} + +resource "aws_lambda_function" "example" { + filename = "${path.module}/app.zip" + source_code_hash = filebase64sha256("${path.module}/app.zip") + function_name = "waSCC-example-apigw" + role = aws_iam_role.example.arn + handler = "doesnt.matter" + runtime = "provided" + memory_size = 256 + timeout = 90 + + layers = [data.aws_lambda_layer_version.slim.arn] + + environment { + variables = { + RUST_BACKTRACE = "1" + RUST_LOG = "info,cranelift_wasm=warn" + } + } +} + +resource "aws_lambda_permission" "example" { + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.example.function_name + principal = "apigateway.amazonaws.com" + source_arn = "arn:${data.aws_partition.current.partition}:execute-api:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:${aws_api_gateway_rest_api.example.id}/*/${aws_api_gateway_method.example.http_method}${aws_api_gateway_resource.example.path}" +} + +// +// IAM resources. +// + +resource "aws_iam_role" "example" { + name = "waSCC-example-apigw-Lambda-role" + + assume_role_policy = < Date: Fri, 10 Apr 2020 14:44:26 -0400 Subject: [PATCH 05/21] Add 'http' module for Lambda events that correspond to standard actor HTTP requests. --- provider/src/http.rs | 88 ++++++++++++++++++++++++++++++++++++++++++++ provider/src/lib.rs | 1 + 2 files changed, 89 insertions(+) create mode 100644 provider/src/http.rs diff --git a/provider/src/http.rs b/provider/src/http.rs new file mode 100644 index 0000000..44af8d2 --- /dev/null +++ b/provider/src/http.rs @@ -0,0 +1,88 @@ +// Copyright 2015-2020 Capital One Services, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// waSCC AWS Lambda Runtime Provider +// + +use aws_lambda_events::event::alb; + +use std::collections::HashMap; +use std::convert::{TryFrom, TryInto}; + +struct AlbTargetGroupRequestWrapper(alb::AlbTargetGroupRequest); + +impl TryInto for AlbTargetGroupRequestWrapper { + type Error = anyhow::Error; + + /// Attempts conversion of an ALB request to an actor's HTTP request. + fn try_into(self) -> anyhow::Result { + let query_string = query_string(self.0.query_string_parameters); + + Ok(wascc_codec::http::Request { + method: self + .0 + .http_method + .ok_or(anyhow!("Missing method in ALB request"))?, + path: self.0.path.ok_or(anyhow!("Missing path in ALB request"))?, + query_string, + header: self.0.headers, + body: match self.0.body { + Some(s) if self.0.is_base64_encoded => base64::decode(s)?, + Some(s) => s.into_bytes(), + None => vec![], + }, + }) + } +} + +struct AlbTargetGroupResponseWrapper(alb::AlbTargetGroupResponse); + +impl TryFrom for AlbTargetGroupResponseWrapper { + type Error = anyhow::Error; + + /// Attempts conversion of an actor's HTTP response to an ALB response. + fn try_from(response: wascc_codec::http::Response) -> anyhow::Result { + let (body, is_base64_encoded) = body_string(response.body); + + Ok(AlbTargetGroupResponseWrapper(alb::AlbTargetGroupResponse { + status_code: response.status_code as i64, + status_description: Some(response.status), + headers: response.header, + multi_value_headers: HashMap::new(), + body, + is_base64_encoded, + })) + } +} + +/// Returns a string representation of the specified bytes and +/// a flag indicating whether or not the string is base64 encoded. +fn body_string(bytes: Vec) -> (Option, bool) { + if bytes.is_empty() { + return (None, false); + } + + match std::str::from_utf8(&bytes) { + Ok(s) => (Some(s.into()), false), + Err(_) => (Some(base64::encode(bytes)), true), + } +} + +/// Returns a string representation of the specified query string parameters. +fn query_string(qs: HashMap) -> String { + url::form_urlencoded::Serializer::new(String::new()) + .extend_pairs(qs.iter()) + .finish() +} diff --git a/provider/src/lib.rs b/provider/src/lib.rs index 35c0b93..cb7caec 100644 --- a/provider/src/lib.rs +++ b/provider/src/lib.rs @@ -33,6 +33,7 @@ use std::error::Error; use std::sync::{Arc, RwLock}; use std::thread; +mod http; mod lambda; const CAPABILITY_ID: &str = "awslambda:runtime"; From ffeb30793a46f2b40489d4ec68c167764883167d Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Fri, 10 Apr 2020 15:36:05 -0400 Subject: [PATCH 06/21] Add 'dispatch' module. --- provider/src/dispatch.rs | 115 +++++++++++++++++++++++++++++++++++++++ provider/src/lib.rs | 1 + 2 files changed, 116 insertions(+) create mode 100644 provider/src/dispatch.rs diff --git a/provider/src/dispatch.rs b/provider/src/dispatch.rs new file mode 100644 index 0000000..496d266 --- /dev/null +++ b/provider/src/dispatch.rs @@ -0,0 +1,115 @@ +// Copyright 2015-2020 Capital One Services, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// waSCC AWS Lambda Runtime Provider +// + +use serde::{Deserialize, Serialize}; +use wascc_codec::{deserialize, serialize}; + +use std::sync::{Arc, RwLock}; + +/// Represents dispatching a request to an actor and returning its response. +pub trait Dispatcher<'de> { + /// The request type. + type T: Serialize; + + /// The response type. + type U: Deserialize<'de>; + + /// The operation this dispatcher dispatches. + const OP: &'static str; + + /// Dispatches a request to the specified actor using the specified dispatcher. + fn dispatch(&self, actor: &str, request: Self::T) -> anyhow::Result { + let msg = match serialize(request) { + Ok(msg) => msg, + Err(e) => return Err(anyhow!("Failed to serialize actor's request: {}", e)), + }; + + let handler_resp = { + let dispatcher = self.dispatcher(); + let lock = dispatcher.read().unwrap(); + lock.dispatch(actor, Self::OP, &msg) + }; + let resp = handler_resp + .map_err(|e| anyhow!("Guest {} failed to handle {}: {}", actor, Self::OP, e))?; + + deserialize::(resp.as_slice()) + .map_err(|e| anyhow!("Failed to deserialize actor's response: {}", e)) + } + + /// Returns a dispatcher. + fn dispatcher(&self) -> Arc>>; + + /// Attempts to dispatch a Lambda invocation event, returning an invocation response. + fn try_dispatch(&self, actor: &str, body: &[u8]) -> anyhow::Result>; +} + +/// Dispatches HTTP requests. +pub struct HttpDispatcher { + dispatcher: Arc>>, +} + +impl HttpDispatcher { + fn new(dispatcher: Arc>>) -> Self { + HttpDispatcher { dispatcher } + } +} + +impl Dispatcher<'_> for HttpDispatcher { + type T = wascc_codec::http::Request; + type U = wascc_codec::http::Response; + + const OP: &'static str = wascc_codec::http::OP_HANDLE_REQUEST; + + fn dispatcher(&self) -> Arc>> { + Arc::clone(&self.dispatcher) + } + + fn try_dispatch(&self, actor: &str, body: &[u8]) -> anyhow::Result> { + Ok(vec![]) + } +} + +/// Dispatches Lambda raw events. +pub struct RawEventDispatcher { + dispatcher: Arc>>, +} + +impl RawEventDispatcher { + fn new(dispatcher: Arc>>) -> Self { + RawEventDispatcher { dispatcher } + } +} + +impl Dispatcher<'_> for RawEventDispatcher { + type T = codec::Event; + type U = codec::Response; + + const OP: &'static str = codec::OP_HANDLE_EVENT; + + fn dispatcher(&self) -> Arc>> { + Arc::clone(&self.dispatcher) + } + + fn try_dispatch(&self, actor: &str, body: &[u8]) -> anyhow::Result> { + let event = codec::Event { + body: body.to_vec(), + }; + + Ok(self.dispatch(actor, event)?.body) + } +} diff --git a/provider/src/lib.rs b/provider/src/lib.rs index cb7caec..1b00de5 100644 --- a/provider/src/lib.rs +++ b/provider/src/lib.rs @@ -33,6 +33,7 @@ use std::error::Error; use std::sync::{Arc, RwLock}; use std::thread; +mod dispatch; mod http; mod lambda; From 936c636b9d021403ad16de41c2a37d3ccfe89f7e Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 13 Apr 2020 10:37:02 -0400 Subject: [PATCH 07/21] Add DispatcherError enum. --- provider/Cargo.toml | 1 + provider/src/dispatch.rs | 69 ++++++++++++++++++++++++++++++---------- 2 files changed, 53 insertions(+), 17 deletions(-) diff --git a/provider/Cargo.toml b/provider/Cargo.toml index f0f6e0c..e1c46f9 100644 --- a/provider/Cargo.toml +++ b/provider/Cargo.toml @@ -17,3 +17,4 @@ codec = { path = "../codec" } aws_lambda_events = "0.2.7" base64 = "0.12.0" url = "2.1.1" +thiserror = "1.0.15" diff --git a/provider/src/dispatch.rs b/provider/src/dispatch.rs index 496d266..c400823 100644 --- a/provider/src/dispatch.rs +++ b/provider/src/dispatch.rs @@ -21,6 +21,33 @@ use wascc_codec::{deserialize, serialize}; use std::sync::{Arc, RwLock}; +/// A dispatcher error. +#[derive(thiserror::Error, Debug)] +pub enum DispatcherError { + /// Request was not dispatched. + #[error("guest {} failed to handle {}: {}", actor, op, source)] + NotDispatched { + actor: String, + op: String, + #[source] + source: anyhow::Error, + }, + + /// Request serialization error. + #[error("failed to serialize actor's request: {}", source)] + RequestSerialization { + #[source] + source: anyhow::Error, + }, + + /// Response deserialization error. + #[error("failed to deserialize actor's response: {}", source)] + ResponseDeserialization { + #[source] + source: anyhow::Error, + }, +} + /// Represents dispatching a request to an actor and returning its response. pub trait Dispatcher<'de> { /// The request type. @@ -32,30 +59,38 @@ pub trait Dispatcher<'de> { /// The operation this dispatcher dispatches. const OP: &'static str; - /// Dispatches a request to the specified actor using the specified dispatcher. - fn dispatch(&self, actor: &str, request: Self::T) -> anyhow::Result { - let msg = match serialize(request) { - Ok(msg) => msg, - Err(e) => return Err(anyhow!("Failed to serialize actor's request: {}", e)), - }; + /// Dispatches a request to the specified actor using our dispatcher. + fn dispatch_request(&self, actor: &str, request: Self::T) -> anyhow::Result { + let input = serialize(request).map_err(|e| DispatcherError::RequestSerialization { + source: anyhow!("{}", e), + })?; let handler_resp = { let dispatcher = self.dispatcher(); let lock = dispatcher.read().unwrap(); - lock.dispatch(actor, Self::OP, &msg) + lock.dispatch(actor, Self::OP, &input) }; - let resp = handler_resp - .map_err(|e| anyhow!("Guest {} failed to handle {}: {}", actor, Self::OP, e))?; - - deserialize::(resp.as_slice()) - .map_err(|e| anyhow!("Failed to deserialize actor's response: {}", e)) + let output = handler_resp.map_err(|e| DispatcherError::NotDispatched { + actor: actor.into(), + op: Self::OP.into(), + source: anyhow!("{}", e), + })?; + + let response = deserialize::(output.as_slice()).map_err(|e| { + DispatcherError::ResponseDeserialization { + source: anyhow!("{}", e), + } + })?; + + Ok(response) } /// Returns a dispatcher. fn dispatcher(&self) -> Arc>>; /// Attempts to dispatch a Lambda invocation event, returning an invocation response. - fn try_dispatch(&self, actor: &str, body: &[u8]) -> anyhow::Result>; + /// The bodies of the invocation event and response are passed and returned. + fn dispatch_invocation_event(&self, actor: &str, event: &[u8]) -> anyhow::Result>; } /// Dispatches HTTP requests. @@ -79,7 +114,7 @@ impl Dispatcher<'_> for HttpDispatcher { Arc::clone(&self.dispatcher) } - fn try_dispatch(&self, actor: &str, body: &[u8]) -> anyhow::Result> { + fn dispatch_invocation_event(&self, actor: &str, body: &[u8]) -> anyhow::Result> { Ok(vec![]) } } @@ -105,11 +140,11 @@ impl Dispatcher<'_> for RawEventDispatcher { Arc::clone(&self.dispatcher) } - fn try_dispatch(&self, actor: &str, body: &[u8]) -> anyhow::Result> { - let event = codec::Event { + fn dispatch_invocation_event(&self, actor: &str, body: &[u8]) -> anyhow::Result> { + let raw_event = codec::Event { body: body.to_vec(), }; - Ok(self.dispatch(actor, event)?.body) + Ok(self.dispatch_request(actor, raw_event)?.body) } } From 392751cf4d754f6da62482e8a459d34345c0bccf Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 13 Apr 2020 11:10:05 -0400 Subject: [PATCH 08/21] Use RawEventDispatcher. --- provider/src/dispatch.rs | 10 +++---- provider/src/lib.rs | 62 ++++++++++++---------------------------- 2 files changed, 23 insertions(+), 49 deletions(-) diff --git a/provider/src/dispatch.rs b/provider/src/dispatch.rs index c400823..8f5814d 100644 --- a/provider/src/dispatch.rs +++ b/provider/src/dispatch.rs @@ -23,7 +23,7 @@ use std::sync::{Arc, RwLock}; /// A dispatcher error. #[derive(thiserror::Error, Debug)] -pub enum DispatcherError { +pub(crate) enum DispatcherError { /// Request was not dispatched. #[error("guest {} failed to handle {}: {}", actor, op, source)] NotDispatched { @@ -49,7 +49,7 @@ pub enum DispatcherError { } /// Represents dispatching a request to an actor and returning its response. -pub trait Dispatcher<'de> { +pub(crate) trait Dispatcher<'de> { /// The request type. type T: Serialize; @@ -94,7 +94,7 @@ pub trait Dispatcher<'de> { } /// Dispatches HTTP requests. -pub struct HttpDispatcher { +pub(crate) struct HttpDispatcher { dispatcher: Arc>>, } @@ -120,12 +120,12 @@ impl Dispatcher<'_> for HttpDispatcher { } /// Dispatches Lambda raw events. -pub struct RawEventDispatcher { +pub(crate) struct RawEventDispatcher { dispatcher: Arc>>, } impl RawEventDispatcher { - fn new(dispatcher: Arc>>) -> Self { + pub fn new(dispatcher: Arc>>) -> Self { RawEventDispatcher { dispatcher } } } diff --git a/provider/src/lib.rs b/provider/src/lib.rs index 1b00de5..c94a488 100644 --- a/provider/src/lib.rs +++ b/provider/src/lib.rs @@ -23,16 +23,18 @@ extern crate log; use aws_lambda_events::event::alb::{AlbTargetGroupRequest, AlbTargetGroupResponse}; use serde_json; -use std::collections::{HashMap, HashSet}; -use std::env; -use wascc_codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher}; +use wascc_codec::capabilities::CapabilityProvider; use wascc_codec::core::{CapabilityConfiguration, OP_BIND_ACTOR, OP_REMOVE_ACTOR}; use wascc_codec::{deserialize, serialize}; +use std::collections::{HashMap, HashSet}; +use std::env; use std::error::Error; use std::sync::{Arc, RwLock}; use std::thread; +use crate::dispatch::{Dispatcher, RawEventDispatcher}; + mod dispatch; mod http; mod lambda; @@ -43,14 +45,14 @@ const CAPABILITY_ID: &str = "awslambda:runtime"; /// Represents a waSCC AWS Lambda runtime provider. pub struct AwsLambdaRuntimeProvider { - dispatcher: Arc>>, + dispatcher: Arc>>, shutdown: Arc>>, } /// Polls the Lambda event machinery. struct Poller { client: lambda::Client, - dispatcher: Arc>>, + dispatcher: Arc>>, module_id: String, shutdown: Arc>>, dispatched: HashSet, @@ -60,7 +62,9 @@ impl Default for AwsLambdaRuntimeProvider { // Returns the default value for `AwsLambdaRuntimeProvider`. fn default() -> Self { AwsLambdaRuntimeProvider { - dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))), + dispatcher: Arc::new(RwLock::new(Box::new( + wascc_codec::capabilities::NullDispatcher::new(), + ))), shutdown: Arc::new(RwLock::new(HashMap::new())), } } @@ -132,7 +136,10 @@ impl CapabilityProvider for AwsLambdaRuntimeProvider { } /// Called when the host runtime is ready and has configured a dispatcher. - fn configure_dispatch(&self, dispatcher: Box) -> Result<(), Box> { + fn configure_dispatch( + &self, + dispatcher: Box, + ) -> Result<(), Box> { debug!("awslambda:runtime configure_dispatch"); let mut lock = self.dispatcher.write().unwrap(); @@ -165,7 +172,7 @@ impl Poller { fn new( module_id: &str, endpoint: &str, - dispatcher: Arc>>, + dispatcher: Arc>>, shutdown: Arc>>, ) -> Self { Poller { @@ -179,6 +186,8 @@ impl Poller { /// Runs the poller until shutdown. fn run(&mut self) { + let raw_event_dispatcher = RawEventDispatcher::new(Arc::clone(&self.dispatcher)); + loop { if self.shutdown() { break; @@ -241,7 +250,7 @@ impl Poller { Err(e) => warn!("{}", e), }; - match self.try_dispatch_lambda_event(event.body(), request_id) { + match raw_event_dispatcher.dispatch_invocation_event(&self.module_id, event.body()) { Ok(body) => self.send_invocation_response(body, request_id), Err(e) => { error!("{}", e); @@ -294,41 +303,6 @@ impl Poller { Ok(vec![]) } - /// Attempts to dispatch a Lambda raw event to an actor. - fn try_dispatch_lambda_event( - &mut self, - body: &Vec, - request_id: &str, - ) -> anyhow::Result> { - let event = codec::Event { - body: body.to_vec(), - }; - let msg = match serialize(event) { - Ok(msg) => msg, - Err(e) => return Err(anyhow!("Failed to serialize Lambda raw event: {}", e)), - }; - - // Call handler. - info!("Poller dispatch Lambda raw event"); - let handler_resp = { - let lock = self.dispatcher.read().unwrap(); - lock.dispatch(&self.module_id, codec::OP_HANDLE_EVENT, &msg) - }; - // Handle response or error. - match handler_resp { - Ok(r) => { - // Record that the request was dispatched. - self.dispatched.insert(request_id.into()); - - match deserialize::(r.as_slice()) { - Ok(resp) => Ok(resp.body), - Err(e) => Err(anyhow!("Failed to deserialize HTTP response: {}", e)), - } - } - Err(e) => Err(anyhow!("Guest failed to handle Lambda event: {}", e)), - } - } - /// Dispatches an ALB HTTP request to an actor. fn dispatch_alb_http_request( &mut self, From 0d5871f8ea22ac6d9a3a31cd14ceec2e91a8f382 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 13 Apr 2020 12:57:03 -0400 Subject: [PATCH 09/21] Use HttpDispatcher. --- provider/src/dispatch.rs | 23 ++++-- provider/src/lib.rs | 146 +++++++++------------------------------ 2 files changed, 52 insertions(+), 117 deletions(-) diff --git a/provider/src/dispatch.rs b/provider/src/dispatch.rs index 8f5814d..7df5804 100644 --- a/provider/src/dispatch.rs +++ b/provider/src/dispatch.rs @@ -25,7 +25,7 @@ use std::sync::{Arc, RwLock}; #[derive(thiserror::Error, Debug)] pub(crate) enum DispatcherError { /// Request was not dispatched. - #[error("guest {} failed to handle {}: {}", actor, op, source)] + #[error("Guest {} failed to handle {}: {}", actor, op, source)] NotDispatched { actor: String, op: String, @@ -34,14 +34,14 @@ pub(crate) enum DispatcherError { }, /// Request serialization error. - #[error("failed to serialize actor's request: {}", source)] + #[error("Failed to serialize actor's request: {}", source)] RequestSerialization { #[source] source: anyhow::Error, }, /// Response deserialization error. - #[error("failed to deserialize actor's response: {}", source)] + #[error("Failed to deserialize actor's response: {}", source)] ResponseDeserialization { #[source] source: anyhow::Error, @@ -93,13 +93,18 @@ pub(crate) trait Dispatcher<'de> { fn dispatch_invocation_event(&self, actor: &str, event: &[u8]) -> anyhow::Result>; } +/// The invocation request is not an HTTP request. +#[derive(thiserror::Error, Debug)] +#[error("Not an HTTP request")] +pub(crate) struct NotHttpRequestError; + /// Dispatches HTTP requests. pub(crate) struct HttpDispatcher { dispatcher: Arc>>, } impl HttpDispatcher { - fn new(dispatcher: Arc>>) -> Self { + pub fn new(dispatcher: Arc>>) -> Self { HttpDispatcher { dispatcher } } } @@ -115,7 +120,15 @@ impl Dispatcher<'_> for HttpDispatcher { } fn dispatch_invocation_event(&self, actor: &str, body: &[u8]) -> anyhow::Result> { - Ok(vec![]) + // match serde_json::from_slice(body) { + // Ok(request) => { + // let response = self.dispatch_alb_http_request(request, request_id)?; + // return serde_json::to_vec(&response) + // .map_err(|e| anyhow!("Failed to serialize ALB response: {}", e)); + // } + // _ => {} + // }; + Err(NotHttpRequestError {}.into()) } } diff --git a/provider/src/lib.rs b/provider/src/lib.rs index c94a488..6c32fa5 100644 --- a/provider/src/lib.rs +++ b/provider/src/lib.rs @@ -21,19 +21,19 @@ extern crate anyhow; #[macro_use] extern crate log; -use aws_lambda_events::event::alb::{AlbTargetGroupRequest, AlbTargetGroupResponse}; -use serde_json; use wascc_codec::capabilities::CapabilityProvider; use wascc_codec::core::{CapabilityConfiguration, OP_BIND_ACTOR, OP_REMOVE_ACTOR}; -use wascc_codec::{deserialize, serialize}; +use wascc_codec::deserialize; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::env; use std::error::Error; use std::sync::{Arc, RwLock}; use std::thread; -use crate::dispatch::{Dispatcher, RawEventDispatcher}; +use crate::dispatch::{ + Dispatcher, DispatcherError, HttpDispatcher, NotHttpRequestError, RawEventDispatcher, +}; mod dispatch; mod http; @@ -55,7 +55,6 @@ struct Poller { dispatcher: Arc>>, module_id: String, shutdown: Arc>>, - dispatched: HashSet, } impl Default for AwsLambdaRuntimeProvider { @@ -97,7 +96,7 @@ impl AwsLambdaRuntimeProvider { // Initialize this poller's shutdown flag. shutdown.write().unwrap().insert(module_id.clone(), false); - let mut poller = Poller::new(&module_id, &endpoint, dispatcher, shutdown); + let poller = Poller::new(&module_id, &endpoint, dispatcher, shutdown); poller.run(); }); @@ -180,12 +179,12 @@ impl Poller { dispatcher, module_id: module_id.into(), shutdown, - dispatched: HashSet::new(), } } /// Runs the poller until shutdown. - fn run(&mut self) { + fn run(&self) { + let http_dispatcher = HttpDispatcher::new(Arc::clone(&self.dispatcher)); let raw_event_dispatcher = RawEventDispatcher::new(Arc::clone(&self.dispatcher)); loop { @@ -221,33 +220,41 @@ impl Poller { env::set_var("_X_AMZN_TRACE_ID", trace_id); } - if self.dispatched.contains(request_id) { - self.send_invocation_error( - anyhow!("Already dispatched: {}", request_id), - request_id, - ); - } - // Try first to dispatch as an HTTP request. - match self.try_dispatch_http_request(event.body(), request_id) { - // The event couldn't be converted into an HTTP request. - // Dispatch as a Lambda raw event. - Ok(body) if body.is_empty() => {} - // The event could be converted to an HTTP request and was dispatched succesfully. + match http_dispatcher.dispatch_invocation_event(&self.module_id, event.body()) { + // The invocation event could be converted to an HTTP request and was dispatched succesfully. Ok(body) => { self.send_invocation_response(body, request_id); continue; } - // The event could be converted to an HTTP request and was - // dispatched succesfully but there was an error after dispatch. - Err(e) if self.dispatched.contains(request_id) => { + // The event couldn't be converted to an HTTP request. + // Dispatch as a Lambda raw event. + Err(e) if e.is::() => debug!("{}", e), + Err(e) if e.is::() => { + match e.downcast_ref::().unwrap() { + // The event could be converted to an HTTP request but couldn't be serialized. + // Dispatch as a Lambda raw event. + e @ DispatcherError::RequestSerialization { .. } => warn!("{}", e), + // The event could be converted to an HTTP request but wasn't dispatched to the actor. + // Dispatch as a Lambda raw event. + e @ DispatcherError::NotDispatched { .. } => warn!("{}", e), + // The event could be converted to an HTTP request and was + // dispatched succesfully but there was an error after dispatch, + // Fail the invocation. + _ => { + error!("{}", e); + self.send_invocation_error(e, request_id); + continue; + } + } + } + // Some other error. + // Fail the invocation. + Err(e) => { error!("{}", e); self.send_invocation_error(e, request_id); continue; } - // The event could be converted to an HTTP request but wasn't dispatched succesfully. - // Dispatch as a Lambda raw event. - Err(e) => warn!("{}", e), }; match raw_event_dispatcher.dispatch_invocation_event(&self.module_id, event.body()) { @@ -284,89 +291,4 @@ impl Poller { fn shutdown(&self) -> bool { *self.shutdown.read().unwrap().get(&self.module_id).unwrap() } - - /// Attempts to dispatch an HTTP request to an actor. - fn try_dispatch_http_request( - &mut self, - body: &Vec, - request_id: &str, - ) -> anyhow::Result> { - match serde_json::from_slice(body) { - Ok(request) => { - let response = self.dispatch_alb_http_request(request, request_id)?; - return serde_json::to_vec(&response) - .map_err(|e| anyhow!("Failed to serialize ALB response: {}", e)); - } - _ => {} - }; - - Ok(vec![]) - } - - /// Dispatches an ALB HTTP request to an actor. - fn dispatch_alb_http_request( - &mut self, - request: AlbTargetGroupRequest, - request_id: &str, - ) -> anyhow::Result { - let query_string = url::form_urlencoded::Serializer::new(String::new()) - .extend_pairs(request.query_string_parameters.iter()) - .finish(); - let request = wascc_codec::http::Request { - method: request - .http_method - .ok_or(anyhow!("Missing method in ALB request"))?, - path: request.path.ok_or(anyhow!("Missing path in ALB request"))?, - query_string, - header: request.headers, - body: match request.body { - Some(s) if request.is_base64_encoded => base64::decode(s)?, - Some(s) => s.into_bytes(), - None => vec![], - }, - }; - - info!("Dispatching ALB target group request"); - let response = self.dispatch_http_request(request, request_id)?; - Ok(AlbTargetGroupResponse { - status_code: response.status_code as i64, - status_description: Some(response.status), - headers: response.header, - multi_value_headers: HashMap::new(), - body: Some(base64::encode(response.body)), - is_base64_encoded: true, - }) - } - - /// Dispatches an HTTP request to an actor. - fn dispatch_http_request( - &mut self, - request: wascc_codec::http::Request, - request_id: &str, - ) -> anyhow::Result { - let msg = match serialize(request) { - Ok(msg) => msg, - Err(e) => return Err(anyhow!("Failed to serialize HTTP request: {}", e)), - }; - - // Call handler. - info!("Poller dispatch HTTP request"); - let handler_resp = { - let lock = self.dispatcher.read().unwrap(); - lock.dispatch(&self.module_id, wascc_codec::http::OP_HANDLE_REQUEST, &msg) - }; - // Return response or error. - match handler_resp { - Ok(resp) => { - // Record that the request was dispatched. - self.dispatched.insert(request_id.into()); - - match deserialize::(resp.as_slice()) { - Ok(resp) => Ok(resp), - Err(e) => Err(anyhow!("Failed to deserialize HTTP response: {}", e)), - } - } - Err(e) => Err(anyhow!("Guest failed to handle HTTP request: {}", e)), - } - } } From 1ba23988cda927cd0aab7b8fa2ea212788a6c667 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 13 Apr 2020 13:04:44 -0400 Subject: [PATCH 10/21] Implement 'TryFrom', not 'TryInto'. --- provider/src/http.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/provider/src/http.rs b/provider/src/http.rs index 44af8d2..efed101 100644 --- a/provider/src/http.rs +++ b/provider/src/http.rs @@ -19,27 +19,30 @@ use aws_lambda_events::event::alb; use std::collections::HashMap; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; struct AlbTargetGroupRequestWrapper(alb::AlbTargetGroupRequest); -impl TryInto for AlbTargetGroupRequestWrapper { +impl TryFrom for wascc_codec::http::Request { type Error = anyhow::Error; /// Attempts conversion of an ALB request to an actor's HTTP request. - fn try_into(self) -> anyhow::Result { - let query_string = query_string(self.0.query_string_parameters); + fn try_from(request: AlbTargetGroupRequestWrapper) -> anyhow::Result { + let query_string = query_string(request.0.query_string_parameters); Ok(wascc_codec::http::Request { - method: self + method: request .0 .http_method .ok_or(anyhow!("Missing method in ALB request"))?, - path: self.0.path.ok_or(anyhow!("Missing path in ALB request"))?, + path: request + .0 + .path + .ok_or(anyhow!("Missing path in ALB request"))?, query_string, - header: self.0.headers, - body: match self.0.body { - Some(s) if self.0.is_base64_encoded => base64::decode(s)?, + header: request.0.headers, + body: match request.0.body { + Some(s) if request.0.is_base64_encoded => base64::decode(s)?, Some(s) => s.into_bytes(), None => vec![], }, From e5f2290fc496a9f6797f0487f2173d8bf99077d6 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 13 Apr 2020 14:45:10 -0400 Subject: [PATCH 11/21] Dispatch ALB requests. --- provider/src/dispatch.rs | 35 +++++++++++++++++++++++++++-------- provider/src/http.rs | 18 ++++++++++++++++-- provider/src/lib.rs | 2 +- 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/provider/src/dispatch.rs b/provider/src/dispatch.rs index 7df5804..1c5a6e8 100644 --- a/provider/src/dispatch.rs +++ b/provider/src/dispatch.rs @@ -16,11 +16,15 @@ // waSCC AWS Lambda Runtime Provider // +use aws_lambda_events::event::{alb, apigw}; use serde::{Deserialize, Serialize}; use wascc_codec::{deserialize, serialize}; +use std::convert::TryInto; use std::sync::{Arc, RwLock}; +use crate::http::{AlbTargetGroupRequestWrapper, AlbTargetGroupResponseWrapper}; + /// A dispatcher error. #[derive(thiserror::Error, Debug)] pub(crate) enum DispatcherError { @@ -107,6 +111,16 @@ impl HttpDispatcher { pub fn new(dispatcher: Arc>>) -> Self { HttpDispatcher { dispatcher } } + + fn dispatch_alb_request( + &self, + actor: &str, + request: AlbTargetGroupRequestWrapper, + ) -> anyhow::Result { + Ok(self + .dispatch_request(actor, request.try_into()?)? + .try_into()?) + } } impl Dispatcher<'_> for HttpDispatcher { @@ -120,14 +134,19 @@ impl Dispatcher<'_> for HttpDispatcher { } fn dispatch_invocation_event(&self, actor: &str, body: &[u8]) -> anyhow::Result> { - // match serde_json::from_slice(body) { - // Ok(request) => { - // let response = self.dispatch_alb_http_request(request, request_id)?; - // return serde_json::to_vec(&response) - // .map_err(|e| anyhow!("Failed to serialize ALB response: {}", e)); - // } - // _ => {} - // }; + match serde_json::from_slice(body) { + Ok(request @ alb::AlbTargetGroupRequest { .. }) => { + let response: alb::AlbTargetGroupResponse = + self.dispatch_alb_request(actor, request.into())?.into(); + return serde_json::to_vec(&response).map_err(|e| e.into()); + } + _ => debug!("Not an ALB request"), + }; + match serde_json::from_slice(body) { + Ok(request @ apigw::ApiGatewayProxyRequest { .. }) => {} + _ => {} + }; + Err(NotHttpRequestError {}.into()) } } diff --git a/provider/src/http.rs b/provider/src/http.rs index efed101..d5c65b9 100644 --- a/provider/src/http.rs +++ b/provider/src/http.rs @@ -21,7 +21,14 @@ use aws_lambda_events::event::alb; use std::collections::HashMap; use std::convert::TryFrom; -struct AlbTargetGroupRequestWrapper(alb::AlbTargetGroupRequest); +pub(crate) struct AlbTargetGroupRequestWrapper(alb::AlbTargetGroupRequest); + +impl From for AlbTargetGroupRequestWrapper { + /// Converts an ALB request to an instance of the wrapper type. + fn from(request: alb::AlbTargetGroupRequest) -> Self { + AlbTargetGroupRequestWrapper(request) + } +} impl TryFrom for wascc_codec::http::Request { type Error = anyhow::Error; @@ -50,7 +57,14 @@ impl TryFrom for wascc_codec::http::Request { } } -struct AlbTargetGroupResponseWrapper(alb::AlbTargetGroupResponse); +pub(crate) struct AlbTargetGroupResponseWrapper(alb::AlbTargetGroupResponse); + +impl From for alb::AlbTargetGroupResponse { + /// Converts instance of the wrapper type to an ALB response. + fn from(response: AlbTargetGroupResponseWrapper) -> Self { + response.0 + } +} impl TryFrom for AlbTargetGroupResponseWrapper { type Error = anyhow::Error; diff --git a/provider/src/lib.rs b/provider/src/lib.rs index 6c32fa5..f5ce502 100644 --- a/provider/src/lib.rs +++ b/provider/src/lib.rs @@ -229,7 +229,7 @@ impl Poller { } // The event couldn't be converted to an HTTP request. // Dispatch as a Lambda raw event. - Err(e) if e.is::() => debug!("{}", e), + Err(e) if e.is::() => info!("{}", e), Err(e) if e.is::() => { match e.downcast_ref::().unwrap() { // The event could be converted to an HTTP request but couldn't be serialized. From 77daa6675229366f211532c200c8bfa93e7c2613 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 13 Apr 2020 15:01:50 -0400 Subject: [PATCH 12/21] Handle API Gateway proxy HTTP requests. --- provider/src/dispatch.rs | 33 ++++++++++++-- provider/src/http.rs | 94 +++++++++++++++++++++++++++++++++++++--- provider/src/lib.rs | 1 + 3 files changed, 118 insertions(+), 10 deletions(-) diff --git a/provider/src/dispatch.rs b/provider/src/dispatch.rs index 1c5a6e8..d04e6ca 100644 --- a/provider/src/dispatch.rs +++ b/provider/src/dispatch.rs @@ -23,7 +23,10 @@ use wascc_codec::{deserialize, serialize}; use std::convert::TryInto; use std::sync::{Arc, RwLock}; -use crate::http::{AlbTargetGroupRequestWrapper, AlbTargetGroupResponseWrapper}; +use crate::http::{ + AlbTargetGroupRequestWrapper, AlbTargetGroupResponseWrapper, ApiGatewayProxyRequestWrapper, + ApiGatewayProxyResponseWrapper, +}; /// A dispatcher error. #[derive(thiserror::Error, Debug)] @@ -108,15 +111,30 @@ pub(crate) struct HttpDispatcher { } impl HttpDispatcher { + /// Returns a new `HttpDispatcher`. pub fn new(dispatcher: Arc>>) -> Self { HttpDispatcher { dispatcher } } + /// Dispatches an ALB target group request. fn dispatch_alb_request( &self, actor: &str, request: AlbTargetGroupRequestWrapper, ) -> anyhow::Result { + info!("HttpDispatcher dispatch ALB target group request"); + Ok(self + .dispatch_request(actor, request.try_into()?)? + .try_into()?) + } + + /// Dispatches an API Gateway proxy request. + fn dispatch_apigw_request( + &self, + actor: &str, + request: ApiGatewayProxyRequestWrapper, + ) -> anyhow::Result { + info!("HttpDispatcher dispatch API Gateway proxy request"); Ok(self .dispatch_request(actor, request.try_into()?)? .try_into()?) @@ -133,6 +151,8 @@ impl Dispatcher<'_> for HttpDispatcher { Arc::clone(&self.dispatcher) } + /// Attempts to dispatch a Lambda invocation event, returning an invocation response. + /// The bodies of the invocation event and response are passed and returned. fn dispatch_invocation_event(&self, actor: &str, body: &[u8]) -> anyhow::Result> { match serde_json::from_slice(body) { Ok(request @ alb::AlbTargetGroupRequest { .. }) => { @@ -143,8 +163,12 @@ impl Dispatcher<'_> for HttpDispatcher { _ => debug!("Not an ALB request"), }; match serde_json::from_slice(body) { - Ok(request @ apigw::ApiGatewayProxyRequest { .. }) => {} - _ => {} + Ok(request @ apigw::ApiGatewayProxyRequest { .. }) => { + let response: apigw::ApiGatewayProxyResponse = + self.dispatch_apigw_request(actor, request.into())?.into(); + return serde_json::to_vec(&response).map_err(|e| e.into()); + } + _ => debug!("Not an API Gateway proxy request"), }; Err(NotHttpRequestError {}.into()) @@ -157,6 +181,7 @@ pub(crate) struct RawEventDispatcher { } impl RawEventDispatcher { + /// Returns a new `RawEventDispatcher`. pub fn new(dispatcher: Arc>>) -> Self { RawEventDispatcher { dispatcher } } @@ -172,6 +197,8 @@ impl Dispatcher<'_> for RawEventDispatcher { Arc::clone(&self.dispatcher) } + /// Attempts to dispatch a Lambda invocation event, returning an invocation response. + /// The bodies of the invocation event and response are passed and returned. fn dispatch_invocation_event(&self, actor: &str, body: &[u8]) -> anyhow::Result> { let raw_event = codec::Event { body: body.to_vec(), diff --git a/provider/src/http.rs b/provider/src/http.rs index d5c65b9..5ea2a65 100644 --- a/provider/src/http.rs +++ b/provider/src/http.rs @@ -16,7 +16,7 @@ // waSCC AWS Lambda Runtime Provider // -use aws_lambda_events::event::alb; +use aws_lambda_events::event::{alb, apigw}; use std::collections::HashMap; use std::convert::TryFrom; @@ -24,7 +24,7 @@ use std::convert::TryFrom; pub(crate) struct AlbTargetGroupRequestWrapper(alb::AlbTargetGroupRequest); impl From for AlbTargetGroupRequestWrapper { - /// Converts an ALB request to an instance of the wrapper type. + /// Converts an ALB target group request to an instance of the wrapper type. fn from(request: alb::AlbTargetGroupRequest) -> Self { AlbTargetGroupRequestWrapper(request) } @@ -33,7 +33,7 @@ impl From for AlbTargetGroupRequestWrapper { impl TryFrom for wascc_codec::http::Request { type Error = anyhow::Error; - /// Attempts conversion of an ALB request to an actor's HTTP request. + /// Attempts conversion of an ALB target group request to an actor's HTTP request. fn try_from(request: AlbTargetGroupRequestWrapper) -> anyhow::Result { let query_string = query_string(request.0.query_string_parameters); @@ -41,11 +41,11 @@ impl TryFrom for wascc_codec::http::Request { method: request .0 .http_method - .ok_or(anyhow!("Missing method in ALB request"))?, + .ok_or(anyhow!("Missing method in ALB target group request"))?, path: request .0 .path - .ok_or(anyhow!("Missing path in ALB request"))?, + .ok_or(anyhow!("Missing path in ALB target group request"))?, query_string, header: request.0.headers, body: match request.0.body { @@ -66,6 +66,13 @@ impl From for alb::AlbTargetGroupResponse { } } +impl From for AlbTargetGroupResponseWrapper { + /// Converts instance of an ALB response to the wrapper type. + fn from(response: alb::AlbTargetGroupResponse) -> Self { + AlbTargetGroupResponseWrapper(response) + } +} + impl TryFrom for AlbTargetGroupResponseWrapper { type Error = anyhow::Error; @@ -73,14 +80,85 @@ impl TryFrom for AlbTargetGroupResponseWrapper { fn try_from(response: wascc_codec::http::Response) -> anyhow::Result { let (body, is_base64_encoded) = body_string(response.body); - Ok(AlbTargetGroupResponseWrapper(alb::AlbTargetGroupResponse { + Ok(alb::AlbTargetGroupResponse { status_code: response.status_code as i64, status_description: Some(response.status), headers: response.header, multi_value_headers: HashMap::new(), body, is_base64_encoded, - })) + } + .into()) + } +} + +pub(crate) struct ApiGatewayProxyRequestWrapper(apigw::ApiGatewayProxyRequest); + +impl From for ApiGatewayProxyRequestWrapper { + /// Converts an API Gateway proxy request to an instance of the wrapper type. + fn from(request: apigw::ApiGatewayProxyRequest) -> Self { + ApiGatewayProxyRequestWrapper(request) + } +} + +impl TryFrom for wascc_codec::http::Request { + type Error = anyhow::Error; + + /// Attempts conversion of an API Gateway proxy request to an actor's HTTP request. + fn try_from(request: ApiGatewayProxyRequestWrapper) -> anyhow::Result { + let query_string = query_string(request.0.query_string_parameters); + + Ok(wascc_codec::http::Request { + method: request + .0 + .http_method + .ok_or(anyhow!("Missing method in ALB target group request"))?, + path: request + .0 + .path + .ok_or(anyhow!("Missing path in ALB target group request"))?, + query_string, + header: request.0.headers, + body: match request.0.body { + Some(s) if request.0.is_base64_encoded.is_some() => base64::decode(s)?, + Some(s) => s.into_bytes(), + None => vec![], + }, + }) + } +} + +pub(crate) struct ApiGatewayProxyResponseWrapper(apigw::ApiGatewayProxyResponse); + +impl From for apigw::ApiGatewayProxyResponse { + /// Converts instance of the wrapper type to an API Gateway proxy response. + fn from(response: ApiGatewayProxyResponseWrapper) -> Self { + response.0 + } +} + +impl From for ApiGatewayProxyResponseWrapper { + /// Converts instance of an API Gateway proxy response to the wrapper type. + fn from(response: apigw::ApiGatewayProxyResponse) -> Self { + ApiGatewayProxyResponseWrapper(response) + } +} + +impl TryFrom for ApiGatewayProxyResponseWrapper { + type Error = anyhow::Error; + + /// Attempts conversion of an actor's HTTP response to an API Gateway proxy response. + fn try_from(response: wascc_codec::http::Response) -> anyhow::Result { + let (body, is_base64_encoded) = body_string(response.body); + + Ok(apigw::ApiGatewayProxyResponse { + status_code: response.status_code as i64, + headers: response.header, + multi_value_headers: HashMap::new(), + body, + is_base64_encoded: Some(is_base64_encoded), + } + .into()) } } @@ -103,3 +181,5 @@ fn query_string(qs: HashMap) -> String { .extend_pairs(qs.iter()) .finish() } + +// TODO Handle multi_value_query_string_parameters. diff --git a/provider/src/lib.rs b/provider/src/lib.rs index f5ce502..3b94f8e 100644 --- a/provider/src/lib.rs +++ b/provider/src/lib.rs @@ -257,6 +257,7 @@ impl Poller { } }; + // Dispatch as a Lambda raw event. match raw_event_dispatcher.dispatch_invocation_event(&self.module_id, event.body()) { Ok(body) => self.send_invocation_response(body, request_id), Err(e) => { From 74ff0fb8e18b1ecd50abfd67abf2b517282ffdcd Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 13 Apr 2020 17:25:11 -0400 Subject: [PATCH 13/21] Add Uppercase demo. --- examples/apigw/Makefile | 2 +- examples/apigw/README.md | 3 ++ examples/apigw/actor/.gitignore | 1 + examples/apigw/actor/Cargo.toml | 22 +++++++++++++++ examples/apigw/actor/Makefile | 49 +++++++++++++++++++++++++++++++++ examples/apigw/actor/README.md | 38 +++++++++++++++++++++++++ examples/apigw/actor/src/lib.rs | 35 +++++++++++++++++++++++ examples/apigw/main.tf | 2 +- examples/apigw/manifest.yaml | 2 +- 9 files changed, 151 insertions(+), 3 deletions(-) create mode 100644 examples/apigw/actor/.gitignore create mode 100644 examples/apigw/actor/Cargo.toml create mode 100644 examples/apigw/actor/Makefile create mode 100644 examples/apigw/actor/README.md create mode 100644 examples/apigw/actor/src/lib.rs diff --git a/examples/apigw/Makefile b/examples/apigw/Makefile index bf927af..03e0afe 100644 --- a/examples/apigw/Makefile +++ b/examples/apigw/Makefile @@ -5,5 +5,5 @@ all: apply apply: app.zip terraform12 apply -app.zip: manifest.yaml actor/target/wasm32-unknown-unknown/release/wascc_actor_hello_lambda_signed.wasm +app.zip: manifest.yaml actor/target/wasm32-unknown-unknown/release/uppercase.wasm zip -j $@ $^ diff --git a/examples/apigw/README.md b/examples/apigw/README.md index 14a4a9a..0d2f7b2 100644 --- a/examples/apigw/README.md +++ b/examples/apigw/README.md @@ -1,5 +1,8 @@ # API Gateway HTTP Request Invocation +This actor is identical to the [Krustlet Uppercase](https://github.com/deislabs/krustlet/tree/master/demos/wascc/uppercase) demo, +the only change being that the actor is signed with the `awslambda:runtime` capability instead of the standard HTTP server capability. + ### Build Build the [sample waSCC actor](actor/README.md). diff --git a/examples/apigw/actor/.gitignore b/examples/apigw/actor/.gitignore new file mode 100644 index 0000000..4a96773 --- /dev/null +++ b/examples/apigw/actor/.gitignore @@ -0,0 +1 @@ +.keys/ diff --git a/examples/apigw/actor/Cargo.toml b/examples/apigw/actor/Cargo.toml new file mode 100644 index 0000000..0302e6e --- /dev/null +++ b/examples/apigw/actor/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "uppercase" +version = "0.6.1" +authors = ["Kit Ewbank "] +edition = "2018" +license = "Apache-2.0" +readme = "README.md" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +wascc-actor = "0.6.0" +log = '0.4.8' +serde = { version = "1.0.104", features = ["derive"]} +wascc-codec = "0.6.0" + +[profile.release] +# Optimize for small code size +opt-level = "s" + +[workspace] diff --git a/examples/apigw/actor/Makefile b/examples/apigw/actor/Makefile new file mode 100644 index 0000000..45c97ad --- /dev/null +++ b/examples/apigw/actor/Makefile @@ -0,0 +1,49 @@ +COLOR ?= always # Valid COLOR options: {always, auto, never} +CARGO = cargo --color $(COLOR) +TARGET = target/wasm32-unknown-unknown +DEBUG = $(TARGET)/debug +RELEASE = $(TARGET)/release +KEYDIR ?= .keys + +.PHONY: all bench build check clean doc test update keys keys-account keys-module + +all: build + +bench: + @$(CARGO) bench + +build: + @$(CARGO) build --target wasm32-unknown-unknown + wascap sign $(DEBUG)/uppercase.wasm $(DEBUG)/uppercase_signed.wasm --issuer $(KEYDIR)/account.nk --subject $(KEYDIR)/module.nk --cap awslambda:runtime --cap wascc:logging --name uppercase + +check: + @$(CARGO) check + +clean: + @$(CARGO) clean + +doc: + @$(CARGO) doc + +test: build + @$(CARGO) test + +update: + @$(CARGO) update + +release: + @$(CARGO) build --release --target wasm32-unknown-unknown + wascap sign $(RELEASE)/uppercase.wasm $(RELEASE)/uppercase.wasm --issuer $(KEYDIR)/account.nk --subject $(KEYDIR)/module.nk --cap awslambda:runtime --cap wascc:logging --name uppercase + +keys: keys-account +keys: keys-module + +keys-account: + @mkdir -p $(KEYDIR) + nk gen account > $(KEYDIR)/account.txt + awk '/Seed/{ print $$2 }' $(KEYDIR)/account.txt > $(KEYDIR)/account.nk + +keys-module: + @mkdir -p $(KEYDIR) + nk gen module > $(KEYDIR)/module.txt + awk '/Seed/{ print $$2 }' $(KEYDIR)/module.txt > $(KEYDIR)/module.nk diff --git a/examples/apigw/actor/README.md b/examples/apigw/actor/README.md new file mode 100644 index 0000000..c82988e --- /dev/null +++ b/examples/apigw/actor/README.md @@ -0,0 +1,38 @@ +# Sample waSCC Actor + +A sample [waSCC](https://wascc.dev/) actor that uses the AWS Lambda runtime capability provider. + +This actor is identical to the [Krustlet Uppercase](https://github.com/deislabs/krustlet/tree/master/demos/wascc/uppercase) demo, +the only change being that the actor is signed with the `awslambda:runtime` capability instead of the standard HTTP server capability. + +## Build + +#### Install [NKeys](https://github.com/encabulators/nkeys) + +```console +cargo install nkeys --features "cli" +``` + +#### Generate Keys + +```console +make keys +``` + +#### Add `wasm32-unknown-unknown` Compilation Target + +```console +rustup target add wasm32-unknown-unknown +``` + +#### Install [WASCAP](https://github.com/wascc/wascap) + +```console +cargo install wascap --features "cli" +``` + +#### Build Actor + +```console +make release +``` diff --git a/examples/apigw/actor/src/lib.rs b/examples/apigw/actor/src/lib.rs new file mode 100644 index 0000000..4d8f29f --- /dev/null +++ b/examples/apigw/actor/src/lib.rs @@ -0,0 +1,35 @@ +extern crate wascc_actor as actor; + +#[macro_use] +extern crate log; +extern crate serde; +extern crate wascc_codec; + +use actor::prelude::*; +use serde::Serialize; +use wascc_codec::serialize; + +actor_handlers! { + codec::http::OP_HANDLE_REQUEST => uppercase, + codec::core::OP_HEALTH_REQUEST => health +} + +fn uppercase(r: codec::http::Request) -> CallResult { + info!("Query String: {}", r.query_string); + let upper = UppercaseResponse { + original: r.query_string.to_string(), + uppercased: r.query_string.to_ascii_uppercase(), + }; + + Ok(serialize(codec::http::Response::json(upper, 200, "OK"))?) +} + +fn health(_req: codec::core::HealthRequest) -> ReceiveResult { + Ok(vec![]) +} + +#[derive(Serialize)] +struct UppercaseResponse { + original: String, + uppercased: String, +} diff --git a/examples/apigw/main.tf b/examples/apigw/main.tf index 301a84a..cd0ddd8 100644 --- a/examples/apigw/main.tf +++ b/examples/apigw/main.tf @@ -28,7 +28,7 @@ resource "aws_api_gateway_rest_api" "example" { name = "waSCC-example-apigw" } resource "aws_api_gateway_resource" "example" { - path_part = "helloworld" + path_part = "uppercase" parent_id = aws_api_gateway_rest_api.example.root_resource_id rest_api_id = aws_api_gateway_rest_api.example.id } diff --git a/examples/apigw/manifest.yaml b/examples/apigw/manifest.yaml index 1c01bfe..fc773ca 100644 --- a/examples/apigw/manifest.yaml +++ b/examples/apigw/manifest.yaml @@ -1,5 +1,5 @@ --- actors: - - wascc_actor_hello_lambda_signed.wasm + - uppercase.wasm capabilities: [] bindings: [] From 3cf64febf76917adaecd4518e946584279c1141e Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 20 Apr 2020 14:24:32 -0400 Subject: [PATCH 14/21] Use custom version of aws_lambda_events, including https://github.com/aws/aws-lambda-go/pull/280. --- provider/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/provider/Cargo.toml b/provider/Cargo.toml index e1c46f9..64e6997 100644 --- a/provider/Cargo.toml +++ b/provider/Cargo.toml @@ -14,7 +14,7 @@ reqwest = { version = "0.10.4", features = ["blocking", "json"] } serde = "1.0.106" serde_json = "1.0.51" codec = { path = "../codec" } -aws_lambda_events = "0.2.7" +aws_lambda_events = { git = "https://github.com/ewbankkit/aws-lambda-events", branch = "include-aws-lambda-go.pr-280" } base64 = "0.12.0" url = "2.1.1" thiserror = "1.0.15" From 3d5975c9f83d2645a35de3267e96d52dede4a247 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 20 Apr 2020 14:47:12 -0400 Subject: [PATCH 15/21] Add support for APIGW v2 proxy request and response. --- provider/src/dispatch.rs | 23 +++++++++++- provider/src/http.rs | 79 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 99 insertions(+), 3 deletions(-) diff --git a/provider/src/dispatch.rs b/provider/src/dispatch.rs index d04e6ca..3ec83e5 100644 --- a/provider/src/dispatch.rs +++ b/provider/src/dispatch.rs @@ -25,7 +25,8 @@ use std::sync::{Arc, RwLock}; use crate::http::{ AlbTargetGroupRequestWrapper, AlbTargetGroupResponseWrapper, ApiGatewayProxyRequestWrapper, - ApiGatewayProxyResponseWrapper, + ApiGatewayProxyResponseWrapper, ApiGatewayV2ProxyRequestWrapper, + ApiGatewayV2ProxyResponseWrapper, }; /// A dispatcher error. @@ -139,6 +140,18 @@ impl HttpDispatcher { .dispatch_request(actor, request.try_into()?)? .try_into()?) } + + /// Dispatches an API Gateway v2 proxy request. + fn dispatch_apigwv2_request( + &self, + actor: &str, + request: ApiGatewayV2ProxyRequestWrapper, + ) -> anyhow::Result { + info!("HttpDispatcher dispatch API Gateway v2 proxy request"); + Ok(self + .dispatch_request(actor, request.try_into()?)? + .try_into()?) + } } impl Dispatcher<'_> for HttpDispatcher { @@ -170,6 +183,14 @@ impl Dispatcher<'_> for HttpDispatcher { } _ => debug!("Not an API Gateway proxy request"), }; + match serde_json::from_slice(body) { + Ok(request @ apigw::ApiGatewayV2httpRequest { .. }) => { + let response: apigw::ApiGatewayV2httpResponse = + self.dispatch_apigwv2_request(actor, request.into())?.into(); + return serde_json::to_vec(&response).map_err(|e| e.into()); + } + _ => debug!("Not an API Gateway v2 proxy request"), + }; Err(NotHttpRequestError {}.into()) } diff --git a/provider/src/http.rs b/provider/src/http.rs index 5ea2a65..1cc371a 100644 --- a/provider/src/http.rs +++ b/provider/src/http.rs @@ -112,11 +112,11 @@ impl TryFrom for wascc_codec::http::Request { method: request .0 .http_method - .ok_or(anyhow!("Missing method in ALB target group request"))?, + .ok_or(anyhow!("Missing method in API Gateway proxy request"))?, path: request .0 .path - .ok_or(anyhow!("Missing path in ALB target group request"))?, + .ok_or(anyhow!("Missing path in API Gateway proxy request"))?, query_string, header: request.0.headers, body: match request.0.body { @@ -162,6 +162,81 @@ impl TryFrom for ApiGatewayProxyResponseWrapper { } } +pub(crate) struct ApiGatewayV2ProxyRequestWrapper(apigw::ApiGatewayV2httpRequest); + +impl From for ApiGatewayV2ProxyRequestWrapper { + /// Converts an API Gateway v2 proxy request to an instance of the wrapper type. + fn from(request: apigw::ApiGatewayV2httpRequest) -> Self { + ApiGatewayV2ProxyRequestWrapper(request) + } +} + +impl TryFrom for wascc_codec::http::Request { + type Error = anyhow::Error; + + /// Attempts conversion of an API Gateway v2 proxy request to an actor's HTTP request. + fn try_from(request: ApiGatewayV2ProxyRequestWrapper) -> anyhow::Result { + let query_string = query_string(request.0.query_string_parameters); + + Ok(wascc_codec::http::Request { + method: request + .0 + .request_context + .http + .method + .ok_or(anyhow!("Missing method in API Gateway v2 proxy request"))?, + path: request + .0 + .request_context + .http + .path + .ok_or(anyhow!("Missing path in API Gateway v2 proxy request"))?, + query_string, + header: request.0.headers, + body: match request.0.body { + Some(s) if request.0.is_base64_encoded => base64::decode(s)?, + Some(s) => s.into_bytes(), + None => vec![], + }, + }) + } +} + +pub(crate) struct ApiGatewayV2ProxyResponseWrapper(apigw::ApiGatewayV2httpResponse); + +impl From for apigw::ApiGatewayV2httpResponse { + /// Converts instance of the wrapper type to an API Gateway v2 proxy response. + fn from(response: ApiGatewayV2ProxyResponseWrapper) -> Self { + response.0 + } +} + +impl From for ApiGatewayV2ProxyResponseWrapper { + /// Converts instance of an API Gateway v2 proxy response to the wrapper type. + fn from(response: apigw::ApiGatewayV2httpResponse) -> Self { + ApiGatewayV2ProxyResponseWrapper(response) + } +} + +impl TryFrom for ApiGatewayV2ProxyResponseWrapper { + type Error = anyhow::Error; + + /// Attempts conversion of an actor's HTTP response to an API Gateway v2 proxy response. + fn try_from(response: wascc_codec::http::Response) -> anyhow::Result { + let (body, is_base64_encoded) = body_string(response.body); + + Ok(apigw::ApiGatewayV2httpResponse { + status_code: response.status_code as i64, + headers: response.header, + multi_value_headers: HashMap::new(), + body, + is_base64_encoded: Some(is_base64_encoded), + cookies: Vec::new(), + } + .into()) + } +} + /// Returns a string representation of the specified bytes and /// a flag indicating whether or not the string is base64 encoded. fn body_string(bytes: Vec) -> (Option, bool) { From 1e4862998fa2921cdacaa3b07e97d9d8f6645d6d Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Mon, 20 Apr 2020 15:45:31 -0400 Subject: [PATCH 16/21] Use API Gateway v2 quick create. --- examples/apigw/main.tf | 40 +++++++++------------------------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/examples/apigw/main.tf b/examples/apigw/main.tf index cd0ddd8..d3f54b9 100644 --- a/examples/apigw/main.tf +++ b/examples/apigw/main.tf @@ -6,9 +6,9 @@ terraform { required_version = ">= 0.12.19" } -provider "aws" { - version = ">= 2.50.0" -} +# provider "aws" { +# version = ">= 2.58.0" +# } // // Data sources for current AWS account ID, partition and region. @@ -24,32 +24,10 @@ data "aws_region" "current" {} // API Gateway resources. // -resource "aws_api_gateway_rest_api" "example" { - name = "waSCC-example-apigw" -} -resource "aws_api_gateway_resource" "example" { - path_part = "uppercase" - parent_id = aws_api_gateway_rest_api.example.root_resource_id - rest_api_id = aws_api_gateway_rest_api.example.id -} -resource "aws_api_gateway_method" "example" { - rest_api_id = aws_api_gateway_rest_api.example.id - resource_id = aws_api_gateway_resource.example.id - http_method = "GET" - authorization = "NONE" -} -resource "aws_api_gateway_integration" "example" { - rest_api_id = aws_api_gateway_rest_api.example.id - resource_id = aws_api_gateway_resource.example.id - http_method = aws_api_gateway_method.example.http_method - integration_http_method = "POST" - type = "AWS_PROXY" - uri = aws_lambda_function.example.invoke_arn -} -resource "aws_api_gateway_deployment" "example" { - depends_on = [aws_api_gateway_integration.example] - rest_api_id = aws_api_gateway_rest_api.example.id - stage_name = "test" +resource "aws_apigatewayv2_api" "example" { + name = "waSCC-example-apigw" + protocol_type = "HTTP" + target = aws_lambda_function.example.arn } // @@ -84,7 +62,7 @@ resource "aws_lambda_permission" "example" { action = "lambda:InvokeFunction" function_name = aws_lambda_function.example.function_name principal = "apigateway.amazonaws.com" - source_arn = "arn:${data.aws_partition.current.partition}:execute-api:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:${aws_api_gateway_rest_api.example.id}/*/${aws_api_gateway_method.example.http_method}${aws_api_gateway_resource.example.path}" + source_arn = "arn:${data.aws_partition.current.partition}:execute-api:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.example.id}/" } // @@ -149,5 +127,5 @@ output "FunctionName" { } output "Url" { - value = "${aws_api_gateway_deployment.example.invoke_url}/${aws_api_gateway_resource.example.path_part}" + value = aws_apigatewayv2_api.example.api_endpoint } From 19da2c84854af9c70723b62a9669d70255ea7592 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 21 Apr 2020 09:19:39 -0400 Subject: [PATCH 17/21] Correct Lambda permission for APIGW quick create. --- examples/apigw/main.tf | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/apigw/main.tf b/examples/apigw/main.tf index d3f54b9..619996e 100644 --- a/examples/apigw/main.tf +++ b/examples/apigw/main.tf @@ -58,11 +58,12 @@ resource "aws_lambda_function" "example" { } } +// See https://docs.aws.amazon.com/lambda/latest/dg/services-apigateway.html#apigateway-permissions. resource "aws_lambda_permission" "example" { action = "lambda:InvokeFunction" function_name = aws_lambda_function.example.function_name principal = "apigateway.amazonaws.com" - source_arn = "arn:${data.aws_partition.current.partition}:execute-api:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.example.id}/" + source_arn = "arn:${data.aws_partition.current.partition}:execute-api:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.example.id}/*/$default" } // From 3d5f995593056df5e3aaaaa96280ab1174361a6a Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 21 Apr 2020 12:19:39 -0400 Subject: [PATCH 18/21] Log runtime version. --- runtime/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/main.rs b/runtime/src/main.rs index 66e8c50..0fd74bb 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -48,7 +48,7 @@ fn main() -> anyhow::Result<()> { info!("Logger already intialized"); } - info!("aws-lambda-wascc-runtime starting"); + info!("aws-lambda-wascc-runtime {} starting", env!("CARGO_PKG_VERSION")); let host = WasccHost::new(); From 5fbb6f2481e4e0b9c90ea7f42b59a8c534610bd6 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 21 Apr 2020 12:20:10 -0400 Subject: [PATCH 19/21] Log invocation body when attempting conversion to an HTTP request. --- provider/src/dispatch.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/provider/src/dispatch.rs b/provider/src/dispatch.rs index 3ec83e5..31bafd5 100644 --- a/provider/src/dispatch.rs +++ b/provider/src/dispatch.rs @@ -167,7 +167,14 @@ impl Dispatcher<'_> for HttpDispatcher { /// Attempts to dispatch a Lambda invocation event, returning an invocation response. /// The bodies of the invocation event and response are passed and returned. fn dispatch_invocation_event(&self, actor: &str, body: &[u8]) -> anyhow::Result> { - match serde_json::from_slice(body) { + let body = std::str::from_utf8(body).map_err(|e| { + debug!("{}", e); + NotHttpRequestError {} + })?; + + debug!("Lambda invocation event body:\n{}", body); + + match serde_json::from_str(body) { Ok(request @ alb::AlbTargetGroupRequest { .. }) => { let response: alb::AlbTargetGroupResponse = self.dispatch_alb_request(actor, request.into())?.into(); @@ -175,7 +182,7 @@ impl Dispatcher<'_> for HttpDispatcher { } _ => debug!("Not an ALB request"), }; - match serde_json::from_slice(body) { + match serde_json::from_str(body) { Ok(request @ apigw::ApiGatewayProxyRequest { .. }) => { let response: apigw::ApiGatewayProxyResponse = self.dispatch_apigw_request(actor, request.into())?.into(); @@ -183,7 +190,7 @@ impl Dispatcher<'_> for HttpDispatcher { } _ => debug!("Not an API Gateway proxy request"), }; - match serde_json::from_slice(body) { + match serde_json::from_str(body) { Ok(request @ apigw::ApiGatewayV2httpRequest { .. }) => { let response: apigw::ApiGatewayV2httpResponse = self.dispatch_apigwv2_request(actor, request.into())?.into(); From 9f13140f3d554f3f7f0dec2ef37b2f0aad2b2cb8 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 21 Apr 2020 12:20:43 -0400 Subject: [PATCH 20/21] Tweak RUST_LOG value. --- examples/apigw/main.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/apigw/main.tf b/examples/apigw/main.tf index 619996e..0b03c52 100644 --- a/examples/apigw/main.tf +++ b/examples/apigw/main.tf @@ -53,7 +53,7 @@ resource "aws_lambda_function" "example" { environment { variables = { RUST_BACKTRACE = "1" - RUST_LOG = "info,cranelift_wasm=warn" + RUST_LOG = "info,cranelift_wasm=warn,cranelift_codegen=info" } } } From 6d0f2a805c75882ec4f04da15dc1ae3b4118fe93 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 21 Apr 2020 13:18:00 -0400 Subject: [PATCH 21/21] README additions. --- examples/apigw/README.md | 4 ++++ examples/apigw/main.tf | 3 +++ 2 files changed, 7 insertions(+) diff --git a/examples/apigw/README.md b/examples/apigw/README.md index 0d2f7b2..e233e21 100644 --- a/examples/apigw/README.md +++ b/examples/apigw/README.md @@ -30,6 +30,10 @@ $ make ### Test +```console +$ curl https://v3390lt0j2.execute-api.us-west-2.amazonaws.com/?today=tuesday +{"original":"today=tuesday","uppercased":"TODAY=TUESDAY"} +``` ### Known Issues diff --git a/examples/apigw/main.tf b/examples/apigw/main.tf index 0b03c52..99f6377 100644 --- a/examples/apigw/main.tf +++ b/examples/apigw/main.tf @@ -6,6 +6,9 @@ terraform { required_version = ">= 0.12.19" } + +# Build from https://github.com/terraform-providers/terraform-provider-aws/commit/df71a4fd95c0e5a9afe5b08c43a951d3a7fda0ed. +# Will be released in v2.59.0. # provider "aws" { # version = ">= 2.58.0" # }