diff --git a/Makefile b/Makefile index 209208f5..0890173a 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ INTEG_STACK_NAME ?= rust-lambda-integration-tests -INTEG_FUNCTIONS_BUILD := runtime-fn runtime-trait http-fn +INTEG_FUNCTIONS_BUILD := runtime-fn runtime-trait http-fn http-trait INTEG_FUNCTIONS_INVOKE := RuntimeFn RuntimeFnAl2 RuntimeTrait RuntimeTraitAl2 Python PythonAl2 INTEG_API_INVOKE := RestApiUrl HttpApiUrl INTEG_EXTENSIONS := extension-fn extension-trait @@ -10,6 +10,8 @@ pr-check: cargo +1.54.0 check --all cargo +stable fmt --all -- --check cargo +stable clippy + cargo +1.54.0 test + cargo +stable test integration-tests: # Build Integration functions @@ -47,7 +49,11 @@ invoke-integration-api-%: --query 'Stacks[0].Outputs[?OutputKey==`$*`].OutputValue' \ --output text)) curl $(API_URL)/get + curl $(API_URL)/trait/get curl $(API_URL)/al2/get + curl $(API_URL)/al2-trait/get curl -X POST -d '{"command": "hello"}' $(API_URL)/post + curl -X POST -d '{"command": "hello"}' $(API_URL)/trait/post curl -X POST -d '{"command": "hello"}' $(API_URL)/al2/post + curl -X POST -d '{"command": "hello"}' $(API_URL)/al2-trait/post \ No newline at end of file diff --git a/README.md b/README.md index ff72aac8..285a97b4 100644 --- a/README.md +++ b/README.md @@ -15,17 +15,18 @@ This package makes it easy to run AWS Lambda Functions written in Rust. This wor The code below creates a simple function that receives an event with a `firstName` field and returns a message to the caller. Notice: this crate is tested against latest stable Rust. ```rust,no_run -use lambda_runtime::{handler_fn, Context, Error}; +use lambda_runtime::{service_fn, LambdaEvent, Error}; use serde_json::{json, Value}; #[tokio::main] async fn main() -> Result<(), Error> { - let func = handler_fn(func); + let func = service_fn(func); lambda_runtime::run(func).await?; Ok(()) } -async fn func(event: Value, _: Context) -> Result { +async fn func(event: LambdaEvent) -> Result { + let (event, _context) = event.into_parts(); let first_name = event["firstName"].as_str().unwrap_or("world"); Ok(json!({ "message": format!("Hello, {}!", first_name) })) @@ -213,12 +214,9 @@ Lambdas can be run and debugged locally using a special [Lambda debug proxy](htt ## `lambda` -`lambda_runtime` is a library for authoring reliable and performant Rust-based AWS Lambda functions. At a high level, it provides a few major components: +`lambda_runtime` is a library for authoring reliable and performant Rust-based AWS Lambda functions. At a high level, it provides `lambda_runtime::run`, a function that runs a `tower::Service`. -- `Handler`, a trait that defines interactions between customer-authored code and this library. -- `lambda_runtime::run`, function that runs an `Handler`. - -The function `handler_fn` converts a rust function or closure to `Handler`, which can then be run by `lambda_runtime::run`. +To write a function that will handle request, you need to pass it through `service_fn`, which will convert your function into a `tower::Service`, which can then be run by `lambda_runtime::run`. ## AWS event objects diff --git a/lambda-extension/Cargo.toml b/lambda-extension/Cargo.toml index 34cd9cf2..7ad1b491 100644 --- a/lambda-extension/Cargo.toml +++ b/lambda-extension/Cargo.toml @@ -19,7 +19,7 @@ bytes = "1.0" http = "0.2" async-stream = "0.3" tracing = { version = "0.1", features = ["log"] } -tower-service = "0.3" +tower = { version = "0.4", features = ["util"] } tokio-stream = "0.1.2" lambda_runtime_api_client = { version = "0.4", path = "../lambda-runtime-api-client" } diff --git a/lambda-extension/examples/basic.rs b/lambda-extension/examples/basic.rs index aedc046b..7a722e72 100644 --- a/lambda-extension/examples/basic.rs +++ b/lambda-extension/examples/basic.rs @@ -1,4 +1,4 @@ -use lambda_extension::{extension_fn, Error, LambdaEvent, NextEvent}; +use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent}; async fn my_extension(event: LambdaEvent) -> Result<(), Error> { match event.next { @@ -25,6 +25,6 @@ async fn main() -> Result<(), Error> { .without_time() .init(); - let func = extension_fn(my_extension); + let func = service_fn(my_extension); lambda_extension::run(func).await } diff --git a/lambda-extension/examples/custom_events.rs b/lambda-extension/examples/custom_events.rs index 560fa295..d2756c23 100644 --- a/lambda-extension/examples/custom_events.rs +++ b/lambda-extension/examples/custom_events.rs @@ -1,4 +1,4 @@ -use lambda_extension::{extension_fn, Error, LambdaEvent, NextEvent, Runtime}; +use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent, Runtime}; async fn my_extension(event: LambdaEvent) -> Result<(), Error> { match event.next { @@ -27,7 +27,7 @@ async fn main() -> Result<(), Error> { .without_time() .init(); - let func = extension_fn(my_extension); + let func = service_fn(my_extension); let runtime = Runtime::builder().with_events(&["SHUTDOWN"]).register().await?; diff --git a/lambda-extension/examples/custom_trait_implementation.rs b/lambda-extension/examples/custom_trait_implementation.rs index 7f056957..c9dff5c3 100644 --- a/lambda-extension/examples/custom_trait_implementation.rs +++ b/lambda-extension/examples/custom_trait_implementation.rs @@ -1,4 +1,4 @@ -use lambda_extension::{run, Error, Extension, InvokeEvent, LambdaEvent, NextEvent}; +use lambda_extension::{run, Error, InvokeEvent, LambdaEvent, NextEvent, Service}; use std::{ future::{ready, Future}, pin::Pin, @@ -9,9 +9,16 @@ struct MyExtension { data: Vec, } -impl Extension for MyExtension { - type Fut = Pin>>>; - fn call(&mut self, event: LambdaEvent) -> Self::Fut { +impl Service for MyExtension { + type Error = Error; + type Future = Pin>>>; + type Response = (); + + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, event: LambdaEvent) -> Self::Future { match event.next { NextEvent::Shutdown(_e) => { self.data.clear(); diff --git a/lambda-extension/src/lib.rs b/lambda-extension/src/lib.rs index c38947e5..130aae50 100644 --- a/lambda-extension/src/lib.rs +++ b/lambda-extension/src/lib.rs @@ -9,10 +9,10 @@ use hyper::client::{connect::Connection, HttpConnector}; use lambda_runtime_api_client::Client; use serde::Deserialize; -use std::{future::Future, path::PathBuf}; +use std::{fmt, future::Future, path::PathBuf}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::StreamExt; -use tower_service::Service; +pub use tower::{self, service_fn, Service}; use tracing::trace; /// Include several request builders to interact with the Extension API. @@ -103,40 +103,6 @@ pub struct LambdaEvent { pub next: NextEvent, } -/// A trait describing an asynchronous extension. -pub trait Extension { - /// Response of this Extension. - type Fut: Future>; - /// Handle the incoming event. - fn call(&mut self, event: LambdaEvent) -> Self::Fut; -} - -/// Returns a new [`ExtensionFn`] with the given closure. -/// -/// [`ExtensionFn`]: struct.ExtensionFn.html -pub fn extension_fn(f: F) -> ExtensionFn { - ExtensionFn { f } -} - -/// An [`Extension`] implemented by a closure. -/// -/// [`Extension`]: trait.Extension.html -#[derive(Clone, Debug)] -pub struct ExtensionFn { - f: F, -} - -impl Extension for ExtensionFn -where - F: Fn(LambdaEvent) -> Fut, - Fut: Future>, -{ - type Fut = Fut; - fn call(&mut self, event: LambdaEvent) -> Self::Fut { - (self.f)(event) - } -} - /// The Runtime handles all the incoming extension requests pub struct Runtime = HttpConnector> { extension_id: String, @@ -153,13 +119,18 @@ impl Runtime { impl Runtime where C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + C::Future: Unpin + Send, + C::Error: Into>, + C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { /// Execute the given extension. /// Register the extension with the Extensions API and wait for incoming events. - pub async fn run(&self, mut extension: impl Extension) -> Result<(), Error> { + pub async fn run(&self, mut extension: E) -> Result<(), Error> + where + E: Service, + E::Future: Future>, + E::Error: Into> + fmt::Display, + { let client = &self.client; let incoming = async_stream::stream! { @@ -196,7 +167,7 @@ where }; self.client.call(req).await?; - return Err(error); + return Err(error.into()); } } @@ -263,9 +234,11 @@ impl<'a> RuntimeBuilder<'a> { } /// Execute the given extension -pub async fn run(extension: Ex) -> Result<(), Error> +pub async fn run(extension: E) -> Result<(), Error> where - Ex: Extension, + E: Service, + E::Future: Future>, + E::Error: Into> + fmt::Display, { Runtime::builder().register().await?.run(extension).await } diff --git a/lambda-http/examples/hello-http.rs b/lambda-http/examples/hello-http.rs index 0682d028..40352dab 100644 --- a/lambda-http/examples/hello-http.rs +++ b/lambda-http/examples/hello-http.rs @@ -1,16 +1,12 @@ -use lambda_http::{ - handler, - lambda_runtime::{self, Context, Error}, - IntoResponse, Request, RequestExt, Response, -}; +use lambda_http::{service_fn, Error, IntoResponse, Request, RequestExt, Response}; #[tokio::main] async fn main() -> Result<(), Error> { - lambda_runtime::run(handler(func)).await?; + lambda_http::run(service_fn(func)).await?; Ok(()) } -async fn func(event: Request, _: Context) -> Result { +async fn func(event: Request) -> Result { Ok(match event.query_string_parameters().get("first_name") { Some(first_name) => format!("Hello, {}!", first_name).into_response(), _ => Response::builder() diff --git a/lambda-http/examples/shared-resources-example.rs b/lambda-http/examples/shared-resources-example.rs index 61a30668..24e56f97 100644 --- a/lambda-http/examples/shared-resources-example.rs +++ b/lambda-http/examples/shared-resources-example.rs @@ -1,8 +1,4 @@ -use lambda_http::{ - handler, - lambda_runtime::{self, Context, Error}, - IntoResponse, Request, RequestExt, Response, -}; +use lambda_http::{service_fn, Error, IntoResponse, Request, RequestExt, Response}; struct SharedClient { name: &'static str, @@ -23,9 +19,11 @@ async fn main() -> Result<(), Error> { let shared_client_ref = &shared_client; // Define a closure here that makes use of the shared client. - let handler_func_closure = move |event: Request, ctx: Context| async move { + let handler_func_closure = move |event: Request| async move { Ok(match event.query_string_parameters().get("first_name") { - Some(first_name) => shared_client_ref.response(ctx.request_id, first_name).into_response(), + Some(first_name) => shared_client_ref + .response(event.lambda_context().request_id, first_name) + .into_response(), _ => Response::builder() .status(400) .body("Empty first name".into()) @@ -34,6 +32,6 @@ async fn main() -> Result<(), Error> { }; // Pass the closure to the runtime here. - lambda_runtime::run(handler(handler_func_closure)).await?; + lambda_http::run(service_fn(handler_func_closure)).await?; Ok(()) } diff --git a/lambda-http/src/ext.rs b/lambda-http/src/ext.rs index be094c5a..2f56d78c 100644 --- a/lambda-http/src/ext.rs +++ b/lambda-http/src/ext.rs @@ -1,6 +1,7 @@ //! Extension methods for `http::Request` types use crate::{request::RequestContext, strmap::StrMap, Body}; +use lambda_runtime::Context; use serde::{de::value::Error as SerdeError, Deserialize}; use std::{error::Error, fmt}; @@ -66,7 +67,7 @@ impl Error for PayloadError { /// as well as `{"x":1, "y":2}` respectively. /// /// ```rust,no_run -/// use lambda_http::{handler, lambda_runtime::{self, Error, Context}, Body, IntoResponse, Request, Response, RequestExt}; +/// use lambda_http::{service_fn, Error, Context, Body, IntoResponse, Request, Response, RequestExt}; /// use serde::Deserialize; /// /// #[derive(Debug,Deserialize,Default)] @@ -79,13 +80,12 @@ impl Error for PayloadError { /// /// #[tokio::main] /// async fn main() -> Result<(), Error> { -/// lambda_runtime::run(handler(add)).await?; +/// lambda_http::run(service_fn(add)).await?; /// Ok(()) /// } /// /// async fn add( -/// request: Request, -/// _: Context +/// request: Request /// ) -> Result, Error> { /// let args: Args = request.payload() /// .unwrap_or_else(|_parse_err| None) @@ -167,6 +167,12 @@ pub trait RequestExt { fn payload(&self) -> Result, PayloadError> where for<'de> D: Deserialize<'de>; + + /// Return the Lambda function context associated with the request + fn lambda_context(&self) -> Context; + + /// Configures instance with lambda context + fn with_lambda_context(self, context: Context) -> Self; } impl RequestExt for http::Request { @@ -226,6 +232,19 @@ impl RequestExt for http::Request { .expect("Request did not contain a request context") } + fn lambda_context(&self) -> Context { + self.extensions() + .get::() + .cloned() + .expect("Request did not contain a lambda context") + } + + fn with_lambda_context(self, context: Context) -> Self { + let mut s = self; + s.extensions_mut().insert(context); + s + } + fn payload(&self) -> Result, PayloadError> where for<'de> D: Deserialize<'de>, diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index 50e567fa..e1119dd0 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -12,18 +12,18 @@ //! ## Hello World //! //! The following example is how you would structure your Lambda such that you have a `main` function where you explicitly invoke -//! `lambda_runtime::run` in combination with the [`handler`](fn.handler.html) function. This pattern allows you to utilize global initialization +//! `lambda_http::run` in combination with the [`service_fn`](fn.service_fn.html) function. This pattern allows you to utilize global initialization //! of tools such as loggers, to use on warm invokes to the same Lambda function after the first request, helping to reduce the latency of //! your function's execution path. //! //! ```rust,no_run -//! use lambda_http::{handler, lambda_runtime::{self, Error}}; +//! use lambda_http::{service_fn, Error}; //! //! #[tokio::main] //! async fn main() -> Result<(), Error> { //! // initialize dependencies once here for the lifetime of your //! // lambda task -//! lambda_runtime::run(handler(|request, context| async { Ok("👋 world!") })).await?; +//! lambda_http::run(service_fn(|request| async { Ok("👋 world!") })).await?; //! Ok(()) //! } //! ``` @@ -31,21 +31,22 @@ //! ## Leveraging trigger provided data //! //! You can also access information provided directly from the underlying trigger events, like query string parameters, -//! with the [`RequestExt`](trait.RequestExt.html) trait. +//! or Lambda function context, with the [`RequestExt`](trait.RequestExt.html) trait. //! //! ```rust,no_run -//! use lambda_http::{handler, lambda_runtime::{self, Context, Error}, IntoResponse, Request, RequestExt}; +//! use lambda_http::{service_fn, Error, IntoResponse, Request, RequestExt}; //! //! #[tokio::main] //! async fn main() -> Result<(), Error> { -//! lambda_runtime::run(handler(hello)).await?; +//! lambda_http::run(service_fn(hello)).await?; //! Ok(()) //! } //! //! async fn hello( -//! request: Request, -//! _: Context +//! request: Request //! ) -> Result { +//! let _context = request.lambda_context(); +//! //! Ok(format!( //! "hello {}", //! request @@ -62,8 +63,8 @@ extern crate maplit; pub use http::{self, Response}; -pub use lambda_runtime::{self, Context}; -use lambda_runtime::{Error, Handler as LambdaHandler}; +use lambda_runtime::LambdaEvent; +pub use lambda_runtime::{self, service_fn, tower, Context, Error, Service}; mod body; pub mod ext; @@ -85,47 +86,13 @@ use std::{ /// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type pub type Request = http::Request; -/// Functions serving as ALB and API Gateway REST and HTTP API handlers must conform to this type. +/// Future that will convert an [`IntoResponse`] into an actual [`LambdaResponse`] /// -/// This can be viewed as a `lambda_runtime::Handler` constrained to `http` crate `Request` and `Response` types -pub trait Handler<'a>: Sized { - /// The type of Error that this Handler will return - type Error; - /// The type of Response this Handler will return - type Response: IntoResponse; - /// The type of Future this Handler will return - type Fut: Future> + 'a; - /// Function used to execute handler behavior - fn call(&mut self, event: Request, context: Context) -> Self::Fut; -} - -/// Adapts a [`Handler`](trait.Handler.html) to the `lambda_runtime::run` interface -pub fn handler<'a, H: Handler<'a>>(handler: H) -> Adapter<'a, H> { - Adapter { - handler, - _phantom_data: PhantomData, - } -} - -/// An implementation of `Handler` for a given closure return a `Future` representing the computed response -impl<'a, F, R, Fut> Handler<'a> for F -where - F: Fn(Request, Context) -> Fut, - R: IntoResponse, - Fut: Future> + 'a, -{ - type Response = R; - type Error = Error; - type Fut = Fut; - fn call(&mut self, event: Request, context: Context) -> Self::Fut { - (self)(event, context) - } -} - +/// This is used by the `Adapter` wrapper and is completely internal to the `lambda_http::run` function. #[doc(hidden)] pub struct TransformResponse<'a, R, E> { request_origin: RequestOrigin, - fut: Pin> + 'a>>, + fut: Pin> + Send + 'a>>, } impl<'a, R, E> Future for TransformResponse<'a, R, E> @@ -133,6 +100,7 @@ where R: IntoResponse, { type Output = Result; + fn poll(mut self: Pin<&mut Self>, cx: &mut TaskContext) -> Poll { match self.fut.as_mut().poll(cx) { Poll::Ready(result) => Poll::Ready( @@ -143,34 +111,61 @@ where } } -/// Exists only to satisfy the trait cover rule for `lambda_runtime::Handler` impl +/// Wraps a `Service` in a `Service>` /// -/// User code should never need to interact with this type directly. Since `Adapter` implements `Handler` -/// It serves as a opaque trait covering type. -/// -/// See [this article](http://smallcultfollowing.com/babysteps/blog/2015/01/14/little-orphan-impls/) -/// for a larger explanation of why this is necessary -pub struct Adapter<'a, H: Handler<'a>> { - handler: H, - _phantom_data: PhantomData<&'a H>, +/// This is completely internal to the `lambda_http::run` function. +#[doc(hidden)] +pub struct Adapter<'a, R, S> { + service: S, + _phantom_data: PhantomData<&'a R>, } -impl<'a, H: Handler<'a>> Handler<'a> for Adapter<'a, H> { - type Response = H::Response; - type Error = H::Error; - type Fut = H::Fut; - fn call(&mut self, event: Request, context: Context) -> Self::Fut { - self.handler.call(event, context) +impl<'a, R, S> From for Adapter<'a, R, S> +where + S: Service + Send, + S::Future: Send + 'a, + R: IntoResponse, +{ + fn from(service: S) -> Self { + Adapter { + service, + _phantom_data: PhantomData, + } } } -impl<'a, 'b, H: Handler<'a>> LambdaHandler, LambdaResponse> for Adapter<'a, H> { - type Error = H::Error; - type Fut = TransformResponse<'a, H::Response, Self::Error>; +impl<'a, R, S> Service>> for Adapter<'a, R, S> +where + S: Service + Send, + S::Future: Send + 'a, + R: IntoResponse, +{ + type Response = LambdaResponse; + type Error = Error; + type Future = TransformResponse<'a, R, Self::Error>; + + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) + } - fn call(&mut self, event: LambdaRequest<'_>, context: Context) -> Self::Fut { - let request_origin = event.request_origin(); - let fut = Box::pin(self.handler.call(event.into(), context)); + fn call(&mut self, req: LambdaEvent>) -> Self::Future { + let request_origin = req.payload.request_origin(); + let event: Request = req.payload.into(); + let fut = Box::pin(self.service.call(event.with_lambda_context(req.context))); TransformResponse { request_origin, fut } } } + +/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda +/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html). +/// +/// This takes care of transforming the LambdaEvent into a [`Request`] and then +/// converting the result into a [`LambdaResponse`]. +pub async fn run<'a, R, S>(handler: S) -> Result<(), Error> +where + S: Service + Send, + S::Future: Send + 'a, + R: IntoResponse, +{ + lambda_runtime::run(Adapter::from(handler)).await +} diff --git a/lambda-integration-tests/src/bin/extension-fn.rs b/lambda-integration-tests/src/bin/extension-fn.rs index 5835b0f7..ea5fc26c 100644 --- a/lambda-integration-tests/src/bin/extension-fn.rs +++ b/lambda-integration-tests/src/bin/extension-fn.rs @@ -1,4 +1,4 @@ -use lambda_extension::{extension_fn, Error, LambdaEvent, NextEvent}; +use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent}; use tracing::info; async fn my_extension(event: LambdaEvent) -> Result<(), Error> { @@ -27,5 +27,5 @@ async fn main() -> Result<(), Error> { .without_time() .init(); - lambda_extension::run(extension_fn(my_extension)).await + lambda_extension::run(service_fn(my_extension)).await } diff --git a/lambda-integration-tests/src/bin/extension-trait.rs b/lambda-integration-tests/src/bin/extension-trait.rs index bc4b3b32..1dc73c75 100644 --- a/lambda-integration-tests/src/bin/extension-trait.rs +++ b/lambda-integration-tests/src/bin/extension-trait.rs @@ -1,4 +1,4 @@ -use lambda_extension::{Error, Extension, LambdaEvent, NextEvent}; +use lambda_extension::{Error, LambdaEvent, NextEvent, Service}; use std::{ future::{ready, Future}, pin::Pin, @@ -10,10 +10,16 @@ struct MyExtension { invoke_count: usize, } -impl Extension for MyExtension { - type Fut = Pin>>>; +impl Service for MyExtension { + type Error = Error; + type Future = Pin>>>; + type Response = (); - fn call(&mut self, event: LambdaEvent) -> Self::Fut { + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, event: LambdaEvent) -> Self::Future { match event.next { NextEvent::Shutdown(e) => { info!("[extension] Shutdown event received: {:?}", e); diff --git a/lambda-integration-tests/src/bin/http-fn.rs b/lambda-integration-tests/src/bin/http-fn.rs index 23b4961c..b411b77f 100644 --- a/lambda-integration-tests/src/bin/http-fn.rs +++ b/lambda-integration-tests/src/bin/http-fn.rs @@ -1,10 +1,8 @@ -use lambda_http::{ - lambda_runtime::{self, Context, Error}, - IntoResponse, Request, Response, -}; +use lambda_http::{service_fn, Error, IntoResponse, Request, RequestExt, Response}; use tracing::info; -async fn handler(event: Request, _context: Context) -> Result { +async fn handler(event: Request) -> Result { + let _context = event.lambda_context(); info!("[http-fn] Received event {} {}", event.method(), event.uri().path()); Ok(Response::builder().status(200).body("Hello, world!").unwrap()) @@ -23,5 +21,6 @@ async fn main() -> Result<(), Error> { .without_time() .init(); - lambda_runtime::run(lambda_http::handler(handler)).await + let handler = service_fn(handler); + lambda_http::run(handler).await } diff --git a/lambda-integration-tests/src/bin/http-trait.rs b/lambda-integration-tests/src/bin/http-trait.rs new file mode 100644 index 00000000..091aec8e --- /dev/null +++ b/lambda-integration-tests/src/bin/http-trait.rs @@ -0,0 +1,45 @@ +use lambda_http::{Error, Request, RequestExt, Response, Service}; +use std::{ + future::{ready, Future}, + pin::Pin, +}; +use tracing::info; + +#[derive(Default)] +struct MyHandler { + invoke_count: usize, +} + +impl Service for MyHandler { + type Error = Error; + type Future = Pin> + Send>>; + type Response = Response<&'static str>; + + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, request: Request) -> Self::Future { + self.invoke_count += 1; + info!("[http-trait] Received event {}: {:?}", self.invoke_count, request); + info!("[http-trait] Lambda context: {:?}", request.lambda_context()); + Box::pin(ready(Ok(Response::builder() + .status(200) + .body("Hello, World!") + .unwrap()))) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // this needs to be set to false, otherwise ANSI color codes will + // show up in a confusing manner in CloudWatch logs. + .with_ansi(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + lambda_http::run(MyHandler::default()).await +} diff --git a/lambda-integration-tests/src/bin/runtime-fn.rs b/lambda-integration-tests/src/bin/runtime-fn.rs index 327316c3..1b3f3e0d 100644 --- a/lambda-integration-tests/src/bin/runtime-fn.rs +++ b/lambda-integration-tests/src/bin/runtime-fn.rs @@ -1,4 +1,4 @@ -use lambda_runtime::{handler_fn, Context, Error}; +use lambda_runtime::{service_fn, Error, LambdaEvent}; use serde::{Deserialize, Serialize}; use tracing::info; @@ -12,9 +12,11 @@ struct Response { message: String, } -async fn handler(event: Request, _context: Context) -> Result { +async fn handler(event: LambdaEvent) -> Result { info!("[handler-fn] Received event: {:?}", event); + let (event, _) = event.into_parts(); + Ok(Response { message: event.command.to_uppercase(), }) @@ -33,5 +35,5 @@ async fn main() -> Result<(), Error> { .without_time() .init(); - lambda_runtime::run(handler_fn(handler)).await + lambda_runtime::run(service_fn(handler)).await } diff --git a/lambda-integration-tests/src/bin/runtime-trait.rs b/lambda-integration-tests/src/bin/runtime-trait.rs index f7175d09..d86bafdc 100644 --- a/lambda-integration-tests/src/bin/runtime-trait.rs +++ b/lambda-integration-tests/src/bin/runtime-trait.rs @@ -1,4 +1,4 @@ -use lambda_runtime::{Context, Error, Handler}; +use lambda_runtime::{Error, LambdaEvent, Service}; use serde::{Deserialize, Serialize}; use std::{ future::{ready, Future}, @@ -21,15 +21,20 @@ struct MyHandler { invoke_count: usize, } -impl Handler for MyHandler { +impl Service> for MyHandler { type Error = Error; - type Fut = Pin>>>; + type Future = Pin>>>; + type Response = Response; - fn call(&mut self, event: Request, _context: Context) -> Self::Fut { + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, request: LambdaEvent) -> Self::Future { self.invoke_count += 1; - info!("[handler] Received event {}: {:?}", self.invoke_count, event); + info!("[handler] Received event {}: {:?}", self.invoke_count, request); Box::pin(ready(Ok(Response { - message: event.command.to_uppercase(), + message: request.payload.command.to_uppercase(), }))) } } diff --git a/lambda-integration-tests/template.yaml b/lambda-integration-tests/template.yaml index 7f408d2c..848f8be5 100644 --- a/lambda-integration-tests/template.yaml +++ b/lambda-integration-tests/template.yaml @@ -28,7 +28,7 @@ Resources: - !Ref ExtensionFn - !Ref ExtensionTrait - # Rust function using a Handler implementation running on AL2 + # Rust function using a Service implementation running on AL2 RuntimeTraitAl2: Type: AWS::Serverless::Function Properties: @@ -38,7 +38,7 @@ Resources: - !Ref ExtensionFn - !Ref ExtensionTrait - # Rust function using a Handler implementation running on AL1 + # Rust function using a Service implementation running on AL1 RuntimeTrait: Type: AWS::Serverless::Function Properties: @@ -48,7 +48,7 @@ Resources: - !Ref ExtensionFn - !Ref ExtensionTrait - # Rust function using lambda_http::runtime running on AL2 + # Rust function using lambda_http::service_fn running on AL2 HttpFnAl2: Type: AWS::Serverless::Function Properties: @@ -78,8 +78,39 @@ Resources: Layers: - !Ref ExtensionFn - !Ref ExtensionTrait + + # Rust function using lambda_http with Service running on AL2 + HttpTraitAl2: + Type: AWS::Serverless::Function + Properties: + CodeUri: ../build/http-trait/ + Runtime: provided.al2 + Events: + ApiGet: + Type: Api + Properties: + Method: GET + Path: /al2-trait/get + ApiPost: + Type: Api + Properties: + Method: POST + Path: /al2-trait/post + ApiV2Get: + Type: HttpApi + Properties: + Method: GET + Path: /al2-trait/get + ApiV2Post: + Type: HttpApi + Properties: + Method: POST + Path: /al2-trait/post + Layers: + - !Ref ExtensionFn + - !Ref ExtensionTrait - # Rust function using lambda_http::runtime running on AL1 + # Rust function using lambda_http::service_fn running on AL1 HttpFn: Type: AWS::Serverless::Function Properties: @@ -110,6 +141,37 @@ Resources: - !Ref ExtensionFn - !Ref ExtensionTrait + # Rust function using lambda_http with Service running on AL1 + HttpTrait: + Type: AWS::Serverless::Function + Properties: + CodeUri: ../build/http-trait/ + Runtime: provided + Events: + ApiGet: + Type: Api + Properties: + Method: GET + Path: /trait/get + ApiPost: + Type: Api + Properties: + Method: POST + Path: /trait/post + ApiV2Get: + Type: HttpApi + Properties: + Method: GET + Path: /trait/get + ApiV2Post: + Type: HttpApi + Properties: + Method: POST + Path: /trait/post + Layers: + - !Ref ExtensionFn + - !Ref ExtensionTrait + # Python function running on AL2 PythonAl2: Type: AWS::Serverless::Function diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index 5bf36f13..4bdb1c84 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -23,7 +23,7 @@ bytes = "1.0" http = "0.2" async-stream = "0.3" tracing = { version = "0.1", features = ["log"] } -tower-service = "0.3" +tower = { version = "0.4", features = ["util"] } tokio-stream = "0.1.2" lambda_runtime_api_client = { version = "0.4", path = "../lambda-runtime-api-client" } diff --git a/lambda-runtime/examples/basic.rs b/lambda-runtime/examples/basic.rs index d4c962a7..26589f0d 100644 --- a/lambda-runtime/examples/basic.rs +++ b/lambda-runtime/examples/basic.rs @@ -1,7 +1,7 @@ // This example requires the following input to succeed: // { "command": "do something" } -use lambda_runtime::{handler_fn, Context, Error}; +use lambda_runtime::{service_fn, Error, LambdaEvent}; use serde::{Deserialize, Serialize}; /// This is also a made-up example. Requests come into the runtime as unicode @@ -33,18 +33,18 @@ async fn main() -> Result<(), Error> { .without_time() .init(); - let func = handler_fn(my_handler); + let func = service_fn(my_handler); lambda_runtime::run(func).await?; Ok(()) } -pub(crate) async fn my_handler(event: Request, ctx: Context) -> Result { +pub(crate) async fn my_handler(event: LambdaEvent) -> Result { // extract some useful info from the request - let command = event.command; + let command = event.payload.command; // prepare the response let resp = Response { - req_id: ctx.request_id, + req_id: event.context.request_id, msg: format!("Command {} executed.", command), }; diff --git a/lambda-runtime/examples/error-handling.rs b/lambda-runtime/examples/error-handling.rs index 0957f4ce..6b7a3c96 100644 --- a/lambda-runtime/examples/error-handling.rs +++ b/lambda-runtime/examples/error-handling.rs @@ -1,5 +1,5 @@ /// See https://github.com/awslabs/aws-lambda-rust-runtime for more info on Rust runtime for AWS Lambda -use lambda_runtime::{handler_fn, Error}; +use lambda_runtime::{service_fn, Error, LambdaEvent}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::fs::File; @@ -61,13 +61,15 @@ async fn main() -> Result<(), Error> { .init(); // call the actual handler of the request - let func = handler_fn(func); + let func = service_fn(func); lambda_runtime::run(func).await?; Ok(()) } /// The actual handler of the Lambda request. -pub(crate) async fn func(event: Value, ctx: lambda_runtime::Context) -> Result { +pub(crate) async fn func(event: LambdaEvent) -> Result { + let (event, ctx) = event.into_parts(); + // check what action was requested match serde_json::from_value::(event)?.event_type { EventType::SimpleError => { diff --git a/lambda-runtime/examples/shared_resource.rs b/lambda-runtime/examples/shared_resource.rs index 8110c15c..cd9366dd 100644 --- a/lambda-runtime/examples/shared_resource.rs +++ b/lambda-runtime/examples/shared_resource.rs @@ -4,7 +4,7 @@ // Run it with the following input: // { "command": "do something" } -use lambda_runtime::{handler_fn, Context, Error}; +use lambda_runtime::{service_fn, Error, LambdaEvent}; use serde::{Deserialize, Serialize}; /// This is also a made-up example. Requests come into the runtime as unicode @@ -56,9 +56,9 @@ async fn main() -> Result<(), Error> { let client = SharedClient::new("Shared Client 1 (perhaps a database)"); let client_ref = &client; - lambda_runtime::run(handler_fn(move |event: Request, ctx: Context| async move { - let command = event.command; - Ok::(client_ref.response(ctx.request_id, command)) + lambda_runtime::run(service_fn(move |event: LambdaEvent| async move { + let command = event.payload.command; + Ok::(client_ref.response(event.context.request_id, command)) })) .await?; Ok(()) diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index ad6fecf1..c49cbfa3 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -4,16 +4,17 @@ //! The mechanism available for defining a Lambda function is as follows: //! -//! Create a type that conforms to the [`Handler`] trait. This type can then be passed -//! to the the `lambda_runtime::run` function, which launches and runs the Lambda runtime. -pub use crate::types::Context; +//! Create a type that conforms to the [`tower::Service`] trait. This type can +//! then be passed to the the `lambda_runtime::run` function, which launches +//! and runs the Lambda runtime. use hyper::client::{connect::Connection, HttpConnector}; use lambda_runtime_api_client::Client; use serde::{Deserialize, Serialize}; use std::{convert::TryFrom, env, fmt, future::Future, panic}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::{Stream, StreamExt}; -use tower_service::Service; +use tower::util::ServiceFn; +pub use tower::{self, service_fn, Service}; use tracing::{error, trace}; mod requests; @@ -24,6 +25,7 @@ mod types; use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest}; use types::Diagnostic; +pub use types::{Context, LambdaEvent}; /// Error type that lambdas may result in pub type Error = lambda_runtime_api_client::Error; @@ -60,42 +62,13 @@ impl Config { } } -/// A trait describing an asynchronous function `A` to `B`. -pub trait Handler { - /// Errors returned by this handler. - type Error; - /// Response of this handler. - type Fut: Future>; - /// Handle the incoming event. - fn call(&mut self, event: A, context: Context) -> Self::Fut; -} - -/// Returns a new [`HandlerFn`] with the given closure. -/// -/// [`HandlerFn`]: struct.HandlerFn.html -pub fn handler_fn(f: F) -> HandlerFn { - HandlerFn { f } -} - -/// A [`Handler`] implemented by a closure. -/// -/// [`Handler`]: trait.Handler.html -#[derive(Clone, Debug)] -pub struct HandlerFn { - f: F, -} - -impl Handler for HandlerFn +/// Return a new [`ServiceFn`] with a closure that takes an event and context as separate arguments. +#[deprecated(since = "0.5.0", note = "Use `service_fn` and `LambdaEvent` instead")] +pub fn handler_fn(f: F) -> ServiceFn) -> Fut> where F: Fn(A, Context) -> Fut, - Fut: Future>, - Error: Into> + fmt::Display, { - type Error = Error; - type Fut = Fut; - fn call(&mut self, req: A, ctx: Context) -> Self::Fut { - (self.f)(req, ctx) - } + service_fn(move |req: LambdaEvent| f(req.payload, req.context)) } struct Runtime = HttpConnector> { @@ -105,9 +78,9 @@ struct Runtime = HttpConnector> { impl Runtime where C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + C::Future: Unpin + Send, + C::Error: Into>, + C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { pub async fn run( &self, @@ -116,9 +89,9 @@ where config: &Config, ) -> Result<(), Error> where - F: Handler, - >::Fut: Future>::Error>>, - >::Error: fmt::Display, + F: Service>, + F::Future: Future>, + F::Error: fmt::Display, A: for<'de> Deserialize<'de>, B: Serialize, { @@ -139,7 +112,7 @@ where env::set_var("_X_AMZN_TRACE_ID", xray_trace_id); let request_id = &ctx.request_id.clone(); - let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(body, ctx))); + let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(LambdaEvent::new(body, ctx)))); let req = match task { Ok(response) => match response.await { @@ -208,25 +181,25 @@ where /// /// # Example /// ```no_run -/// use lambda_runtime::{Error, handler_fn, Context}; +/// use lambda_runtime::{Error, service_fn, LambdaEvent}; /// use serde_json::Value; /// /// #[tokio::main] /// async fn main() -> Result<(), Error> { -/// let func = handler_fn(func); +/// let func = service_fn(func); /// lambda_runtime::run(func).await?; /// Ok(()) /// } /// -/// async fn func(event: Value, _: Context) -> Result { -/// Ok(event) +/// async fn func(event: LambdaEvent) -> Result { +/// Ok(event.payload) /// } /// ``` pub async fn run(handler: F) -> Result<(), Error> where - F: Handler, - >::Fut: Future>::Error>>, - >::Error: fmt::Display, + F: Service>, + F::Future: Future>, + F::Error: fmt::Display, A: for<'de> Deserialize<'de>, B: Serialize, { @@ -462,10 +435,11 @@ mod endpoint_tests { .build() .expect("Unable to build client"); - async fn func(event: serde_json::Value, _: crate::Context) -> Result { + async fn func(event: crate::LambdaEvent) -> Result { + let (event, _) = event.into_parts(); Ok(event) } - let f = crate::handler_fn(func); + let f = crate::service_fn(func); // set env vars needed to init Config if they are not already set in the environment if env::var("AWS_LAMBDA_RUNTIME_API").is_err() { diff --git a/lambda-runtime/src/types.rs b/lambda-runtime/src/types.rs index 8c9e5321..ee71ba1c 100644 --- a/lambda-runtime/src/types.rs +++ b/lambda-runtime/src/types.rs @@ -166,6 +166,27 @@ impl TryFrom for Context { } } +/// Incoming Lambda request containing the event payload and context. +#[derive(Clone, Debug)] +pub struct LambdaEvent { + /// Event payload. + pub payload: T, + /// Invocation context. + pub context: Context, +} + +impl LambdaEvent { + /// Creates a new Lambda request + pub fn new(payload: T, context: Context) -> Self { + Self { payload, context } + } + + /// Split the Lambda event into its payload and context. + pub fn into_parts(self) -> (T, Context) { + (self.payload, self.context) + } +} + #[cfg(test)] mod test { use super::*;