From 062018913c5881b55537a985b371beb3acd27da3 Mon Sep 17 00:00:00 2001 From: Nicolas Moutschen Date: Sat, 27 Nov 2021 10:05:12 +0100 Subject: [PATCH 1/4] feat(lambda-runtime): replace Handler with tower::Service --- lambda-runtime/src/lib.rs | 63 ++++++++++++++++++------------------- lambda-runtime/src/types.rs | 9 ++++++ 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 30220e45..c88170ed 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -17,7 +17,7 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::{Stream, StreamExt}; -use tower_service::Service; +pub use tower_service::Service; use tracing::{error, trace}; mod client; @@ -30,6 +30,8 @@ mod types; use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest}; use types::Diagnostic; +pub use types::LambdaRequest; + /// Error type that lambdas may result in pub type Error = Box; @@ -68,42 +70,34 @@ impl Config { } } -/// A trait describing an asynchronous function `A` to `B`. -pub trait Handler { - /// Errors returned by this handler. - type Error; - /// Response of this handler. - type Fut: Future>; - /// Handle the incoming event. - fn call(&mut self, event: A, context: Context) -> Self::Fut; -} - -/// Returns a new [`HandlerFn`] with the given closure. -/// -/// [`HandlerFn`]: struct.HandlerFn.html -pub fn handler_fn(f: F) -> HandlerFn { - HandlerFn { f } -} - -/// A [`Handler`] implemented by a closure. -/// -/// [`Handler`]: trait.Handler.html +/// A [`tower::Service`] implemented by a closure. #[derive(Clone, Debug)] pub struct HandlerFn { f: F, } -impl Handler for HandlerFn +impl Service> for HandlerFn where F: Fn(A, Context) -> Fut, Fut: Future>, Error: Into> + fmt::Display, { + type Response = B; type Error = Error; - type Fut = Fut; - fn call(&mut self, req: A, ctx: Context) -> Self::Fut { - (self.f)(req, ctx) + type Future = Fut; + + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) } + + fn call(&mut self, req: LambdaRequest) -> Self::Future { + (self.f)(req.event, req.context) + } +} + +/// Returns a new [`HandlerFn`] with the given closure. +pub fn handler_fn(f: F) -> HandlerFn { + HandlerFn { f } } #[non_exhaustive] @@ -139,9 +133,9 @@ where config: &Config, ) -> Result<(), Error> where - F: Handler, - >::Fut: Future>::Error>>, - >::Error: fmt::Display, + F: Service>, + >>::Future: Future>>::Error>>, + >>::Error: fmt::Display, A: for<'de> Deserialize<'de>, B: Serialize, { @@ -162,7 +156,12 @@ where env::set_var("_X_AMZN_TRACE_ID", xray_trace_id); let request_id = &ctx.request_id.clone(); - let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(body, ctx))); + let task = panic::catch_unwind(panic::AssertUnwindSafe(|| { + handler.call(LambdaRequest { + event: body, + context: ctx, + }) + })); let req = match task { Ok(response) => match response.await { @@ -297,9 +296,9 @@ where /// ``` pub async fn run(handler: F) -> Result<(), Error> where - F: Handler, - >::Fut: Future>::Error>>, - >::Error: fmt::Display, + F: Service>, + >>::Future: Future>>::Error>>, + >>::Error: fmt::Display, A: for<'de> Deserialize<'de>, B: Serialize, { diff --git a/lambda-runtime/src/types.rs b/lambda-runtime/src/types.rs index ad894971..4cfbd1aa 100644 --- a/lambda-runtime/src/types.rs +++ b/lambda-runtime/src/types.rs @@ -148,6 +148,15 @@ impl TryFrom for Context { } } +/// Incoming Lambda request containing the event payload and context. +#[derive(Clone, Debug)] +pub struct LambdaRequest { + /// Event payload. + pub event: T, + /// Invocation context. + pub context: Context, +} + #[cfg(test)] mod test { use super::*; From ddc9d482ac53cf62e4169cbf0ed3883d67ed91ba Mon Sep 17 00:00:00 2001 From: Nicolas Moutschen Date: Sat, 27 Nov 2021 10:31:19 +0100 Subject: [PATCH 2/4] feat(lambda-http)!: remplace runtime Handler with Service --- lambda-http/src/lib.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index ec80c30b..07916bdd 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -63,7 +63,7 @@ extern crate maplit; pub use http::{self, Response}; pub use lambda_runtime::{self, Context}; -use lambda_runtime::{Error, Handler as LambdaHandler}; +use lambda_runtime::{Error, LambdaRequest as RuntimeRequest, Service}; mod body; pub mod ext; @@ -164,13 +164,18 @@ impl<'a, H: Handler<'a>> Handler<'a> for Adapter<'a, H> { } } -impl<'a, 'b, H: Handler<'a>> LambdaHandler, LambdaResponse> for Adapter<'a, H> { +impl<'a, 'b, H: Handler<'a>> Service>> for Adapter<'a, H> { type Error = H::Error; - type Fut = TransformResponse<'a, H::Response, Self::Error>; + type Response = LambdaResponse; + type Future = TransformResponse<'a, H::Response, Self::Error>; - fn call(&mut self, event: LambdaRequest<'_>, context: Context) -> Self::Fut { - let request_origin = event.request_origin(); - let fut = Box::pin(self.handler.call(event.into(), context)); + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: RuntimeRequest>) -> Self::Future { + let request_origin = req.event.request_origin(); + let fut = Box::pin(self.handler.call(req.event.into(), req.context)); TransformResponse { request_origin, fut } } } From 8427987fd146ba56392f19876255c4bff1599bb4 Mon Sep 17 00:00:00 2001 From: Nicolas Moutschen Date: Sat, 27 Nov 2021 15:49:58 +0100 Subject: [PATCH 3/4] feat!(lambda-runtime): export tower and replace HandlerFn with ServiceFn --- lambda-http/src/lib.rs | 2 +- lambda-runtime/Cargo.toml | 2 +- lambda-runtime/src/lib.rs | 33 +++++++++------------------------ 3 files changed, 11 insertions(+), 26 deletions(-) diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index 07916bdd..0421350e 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -63,7 +63,7 @@ extern crate maplit; pub use http::{self, Response}; pub use lambda_runtime::{self, Context}; -use lambda_runtime::{Error, LambdaRequest as RuntimeRequest, Service}; +use lambda_runtime::{tower::Service, Error, LambdaRequest as RuntimeRequest}; mod body; pub mod ext; diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index 4d0de675..c86278f3 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -25,7 +25,7 @@ async-stream = "0.3" futures = "0.3" tracing-error = "0.2" tracing = { version = "0.1", features = ["log"] } -tower-service = "0.3" +tower = { version = "0.4", features = ["util"] } tokio-stream = "0.1.2" [dev-dependencies] diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index c88170ed..adb9ca6e 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -17,9 +17,11 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::{Stream, StreamExt}; -pub use tower_service::Service; +pub use tower; use tracing::{error, trace}; +use tower::{service_fn, util::ServiceFn, Service}; + mod client; mod requests; #[cfg(test)] @@ -70,34 +72,17 @@ impl Config { } } -/// A [`tower::Service`] implemented by a closure. -#[derive(Clone, Debug)] -pub struct HandlerFn { - f: F, +/// Wraps a function that takes 2 arguments into one that only takes a [`LambdaRequest`]. +fn handler_wrapper(f: impl Fn(A, Context) -> Fut) -> impl Fn(LambdaRequest) -> Fut { + move |req| f(req.event, req.context) } -impl Service> for HandlerFn +/// Return a new [`ServiceFn`] with a closure that takes an event and context as separate arguments. +pub fn handler_fn(f: F) -> ServiceFn) -> Fut> where F: Fn(A, Context) -> Fut, - Fut: Future>, - Error: Into> + fmt::Display, { - type Response = B; - type Error = Error; - type Future = Fut; - - fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { - core::task::Poll::Ready(Ok(())) - } - - fn call(&mut self, req: LambdaRequest) -> Self::Future { - (self.f)(req.event, req.context) - } -} - -/// Returns a new [`HandlerFn`] with the given closure. -pub fn handler_fn(f: F) -> HandlerFn { - HandlerFn { f } + service_fn(handler_wrapper(f)) } #[non_exhaustive] From 4a92c0c70a92fef18381891f9ef957dd560b55bb Mon Sep 17 00:00:00 2001 From: Nicolas Moutschen Date: Sat, 27 Nov 2021 16:17:22 +0100 Subject: [PATCH 4/4] chore: simplify trait bounds --- lambda-runtime/src/lib.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index adb9ca6e..bb5cc800 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -107,9 +107,9 @@ impl Runtime { impl Runtime where C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + C::Future: Unpin + Send, + C::Error: Into>, + C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { pub async fn run( &self, @@ -119,8 +119,8 @@ where ) -> Result<(), Error> where F: Service>, - >>::Future: Future>>::Error>>, - >>::Error: fmt::Display, + F::Future: Future>, + F::Error: fmt::Display, A: for<'de> Deserialize<'de>, B: Serialize, { @@ -201,16 +201,16 @@ struct RuntimeBuilder = hyper::client::HttpConnector> { impl RuntimeBuilder where C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + C::Future: Unpin + Send, + C::Error: Into>, + C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { pub fn with_connector(self, connector: C2) -> RuntimeBuilder where C2: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + C2::Future: Unpin + Send, + C2::Error: Into>, + C2::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { RuntimeBuilder { connector, @@ -246,9 +246,9 @@ fn test_builder() { fn incoming(client: &Client) -> impl Stream, Error>> + Send + '_ where C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + C::Future: Unpin + Send, + C::Error: Into>, + C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { async_stream::stream! { loop { @@ -282,8 +282,8 @@ where pub async fn run(handler: F) -> Result<(), Error> where F: Service>, - >>::Future: Future>>::Error>>, - >>::Error: fmt::Display, + F::Future: Future>, + F::Error: fmt::Display, A: for<'de> Deserialize<'de>, B: Serialize, {