diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index ec80c30b..0421350e 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -63,7 +63,7 @@ extern crate maplit; pub use http::{self, Response}; pub use lambda_runtime::{self, Context}; -use lambda_runtime::{Error, Handler as LambdaHandler}; +use lambda_runtime::{tower::Service, Error, LambdaRequest as RuntimeRequest}; mod body; pub mod ext; @@ -164,13 +164,18 @@ impl<'a, H: Handler<'a>> Handler<'a> for Adapter<'a, H> { } } -impl<'a, 'b, H: Handler<'a>> LambdaHandler, LambdaResponse> for Adapter<'a, H> { +impl<'a, 'b, H: Handler<'a>> Service>> for Adapter<'a, H> { type Error = H::Error; - type Fut = TransformResponse<'a, H::Response, Self::Error>; + type Response = LambdaResponse; + type Future = TransformResponse<'a, H::Response, Self::Error>; - fn call(&mut self, event: LambdaRequest<'_>, context: Context) -> Self::Fut { - let request_origin = event.request_origin(); - let fut = Box::pin(self.handler.call(event.into(), context)); + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: RuntimeRequest>) -> Self::Future { + let request_origin = req.event.request_origin(); + let fut = Box::pin(self.handler.call(req.event.into(), req.context)); TransformResponse { request_origin, fut } } } diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index 4d0de675..c86278f3 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -25,7 +25,7 @@ async-stream = "0.3" futures = "0.3" tracing-error = "0.2" tracing = { version = "0.1", features = ["log"] } -tower-service = "0.3" +tower = { version = "0.4", features = ["util"] } tokio-stream = "0.1.2" [dev-dependencies] diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 30220e45..bb5cc800 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -17,9 +17,11 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::{Stream, StreamExt}; -use tower_service::Service; +pub use tower; use tracing::{error, trace}; +use tower::{service_fn, util::ServiceFn, Service}; + mod client; mod requests; #[cfg(test)] @@ -30,6 +32,8 @@ mod types; use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest}; use types::Diagnostic; +pub use types::LambdaRequest; + /// Error type that lambdas may result in pub type Error = Box; @@ -68,42 +72,17 @@ impl Config { } } -/// A trait describing an asynchronous function `A` to `B`. -pub trait Handler { - /// Errors returned by this handler. - type Error; - /// Response of this handler. - type Fut: Future>; - /// Handle the incoming event. - fn call(&mut self, event: A, context: Context) -> Self::Fut; -} - -/// Returns a new [`HandlerFn`] with the given closure. -/// -/// [`HandlerFn`]: struct.HandlerFn.html -pub fn handler_fn(f: F) -> HandlerFn { - HandlerFn { f } +/// Wraps a function that takes 2 arguments into one that only takes a [`LambdaRequest`]. +fn handler_wrapper(f: impl Fn(A, Context) -> Fut) -> impl Fn(LambdaRequest) -> Fut { + move |req| f(req.event, req.context) } -/// A [`Handler`] implemented by a closure. -/// -/// [`Handler`]: trait.Handler.html -#[derive(Clone, Debug)] -pub struct HandlerFn { - f: F, -} - -impl Handler for HandlerFn +/// Return a new [`ServiceFn`] with a closure that takes an event and context as separate arguments. +pub fn handler_fn(f: F) -> ServiceFn) -> Fut> where F: Fn(A, Context) -> Fut, - Fut: Future>, - Error: Into> + fmt::Display, { - type Error = Error; - type Fut = Fut; - fn call(&mut self, req: A, ctx: Context) -> Self::Fut { - (self.f)(req, ctx) - } + service_fn(handler_wrapper(f)) } #[non_exhaustive] @@ -128,9 +107,9 @@ impl Runtime { impl Runtime where C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + C::Future: Unpin + Send, + C::Error: Into>, + C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { pub async fn run( &self, @@ -139,9 +118,9 @@ where config: &Config, ) -> Result<(), Error> where - F: Handler, - >::Fut: Future>::Error>>, - >::Error: fmt::Display, + F: Service>, + F::Future: Future>, + F::Error: fmt::Display, A: for<'de> Deserialize<'de>, B: Serialize, { @@ -162,7 +141,12 @@ where env::set_var("_X_AMZN_TRACE_ID", xray_trace_id); let request_id = &ctx.request_id.clone(); - let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(body, ctx))); + let task = panic::catch_unwind(panic::AssertUnwindSafe(|| { + handler.call(LambdaRequest { + event: body, + context: ctx, + }) + })); let req = match task { Ok(response) => match response.await { @@ -217,16 +201,16 @@ struct RuntimeBuilder = hyper::client::HttpConnector> { impl RuntimeBuilder where C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + C::Future: Unpin + Send, + C::Error: Into>, + C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { pub fn with_connector(self, connector: C2) -> RuntimeBuilder where C2: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + C2::Future: Unpin + Send, + C2::Error: Into>, + C2::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { RuntimeBuilder { connector, @@ -262,9 +246,9 @@ fn test_builder() { fn incoming(client: &Client) -> impl Stream, Error>> + Send + '_ where C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + C::Future: Unpin + Send, + C::Error: Into>, + C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { async_stream::stream! { loop { @@ -297,9 +281,9 @@ where /// ``` pub async fn run(handler: F) -> Result<(), Error> where - F: Handler, - >::Fut: Future>::Error>>, - >::Error: fmt::Display, + F: Service>, + F::Future: Future>, + F::Error: fmt::Display, A: for<'de> Deserialize<'de>, B: Serialize, { diff --git a/lambda-runtime/src/types.rs b/lambda-runtime/src/types.rs index ad894971..4cfbd1aa 100644 --- a/lambda-runtime/src/types.rs +++ b/lambda-runtime/src/types.rs @@ -148,6 +148,15 @@ impl TryFrom for Context { } } +/// Incoming Lambda request containing the event payload and context. +#[derive(Clone, Debug)] +pub struct LambdaRequest { + /// Event payload. + pub event: T, + /// Invocation context. + pub context: Context, +} + #[cfg(test)] mod test { use super::*;