Skip to content

[WIP] feat!: Replace Handler with tower::Service #375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions lambda-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ extern crate maplit;

pub use http::{self, Response};
pub use lambda_runtime::{self, Context};
use lambda_runtime::{Error, Handler as LambdaHandler};
use lambda_runtime::{tower::Service, Error, LambdaRequest as RuntimeRequest};

mod body;
pub mod ext;
Expand Down Expand Up @@ -164,13 +164,18 @@ impl<'a, H: Handler<'a>> Handler<'a> for Adapter<'a, H> {
}
}

impl<'a, 'b, H: Handler<'a>> LambdaHandler<LambdaRequest<'b>, LambdaResponse> for Adapter<'a, H> {
impl<'a, 'b, H: Handler<'a>> Service<RuntimeRequest<LambdaRequest<'b>>> for Adapter<'a, H> {
type Error = H::Error;
type Fut = TransformResponse<'a, H::Response, Self::Error>;
type Response = LambdaResponse;
type Future = TransformResponse<'a, H::Response, Self::Error>;

fn call(&mut self, event: LambdaRequest<'_>, context: Context) -> Self::Fut {
let request_origin = event.request_origin();
let fut = Box::pin(self.handler.call(event.into(), context));
fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
core::task::Poll::Ready(Ok(()))
}

fn call(&mut self, req: RuntimeRequest<LambdaRequest<'_>>) -> Self::Future {
let request_origin = req.event.request_origin();
let fut = Box::pin(self.handler.call(req.event.into(), req.context));
TransformResponse { request_origin, fut }
}
}
2 changes: 1 addition & 1 deletion lambda-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async-stream = "0.3"
futures = "0.3"
tracing-error = "0.2"
tracing = { version = "0.1", features = ["log"] }
tower-service = "0.3"
tower = { version = "0.4", features = ["util"] }
tokio-stream = "0.1.2"

[dev-dependencies]
Expand Down
86 changes: 35 additions & 51 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ use std::{
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::{Stream, StreamExt};
use tower_service::Service;
pub use tower;
use tracing::{error, trace};

use tower::{service_fn, util::ServiceFn, Service};

mod client;
mod requests;
#[cfg(test)]
Expand All @@ -30,6 +32,8 @@ mod types;
use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest};
use types::Diagnostic;

pub use types::LambdaRequest;

/// Error type that lambdas may result in
pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

Expand Down Expand Up @@ -68,42 +72,17 @@ impl Config {
}
}

/// A trait describing an asynchronous function `A` to `B`.
pub trait Handler<A, B> {
/// Errors returned by this handler.
type Error;
/// Response of this handler.
type Fut: Future<Output = Result<B, Self::Error>>;
/// Handle the incoming event.
fn call(&mut self, event: A, context: Context) -> Self::Fut;
}

/// Returns a new [`HandlerFn`] with the given closure.
///
/// [`HandlerFn`]: struct.HandlerFn.html
pub fn handler_fn<F>(f: F) -> HandlerFn<F> {
HandlerFn { f }
/// Wraps a function that takes 2 arguments into one that only takes a [`LambdaRequest`].
fn handler_wrapper<A, Fut>(f: impl Fn(A, Context) -> Fut) -> impl Fn(LambdaRequest<A>) -> Fut {
move |req| f(req.event, req.context)
}
Comment on lines +75 to 78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentionally not public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! My thinking here was to only use this function as part of handler_fn to help wrap a function that takes two arguments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha. for the public interface, i think i'd personally try to use (and-reexport) tower::service_fn.


/// A [`Handler`] implemented by a closure.
///
/// [`Handler`]: trait.Handler.html
#[derive(Clone, Debug)]
pub struct HandlerFn<F> {
f: F,
}

impl<F, A, B, Error, Fut> Handler<A, B> for HandlerFn<F>
/// Return a new [`ServiceFn`] with a closure that takes an event and context as separate arguments.
pub fn handler_fn<A, F, Fut>(f: F) -> ServiceFn<impl Fn(LambdaRequest<A>) -> Fut>
where
F: Fn(A, Context) -> Fut,
Fut: Future<Output = Result<B, Error>>,
Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>> + fmt::Display,
{
type Error = Error;
type Fut = Fut;
fn call(&mut self, req: A, ctx: Context) -> Self::Fut {
(self.f)(req, ctx)
}
service_fn(handler_wrapper(f))
}

#[non_exhaustive]
Expand All @@ -128,9 +107,9 @@ impl Runtime {
impl<C> Runtime<C>
where
C: Service<http::Uri> + Clone + Send + Sync + Unpin + 'static,
<C as Service<http::Uri>>::Future: Unpin + Send,
<C as Service<http::Uri>>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
<C as Service<http::Uri>>::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
C::Future: Unpin + Send,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
pub async fn run<F, A, B>(
&self,
Expand All @@ -139,9 +118,9 @@ where
config: &Config,
) -> Result<(), Error>
where
F: Handler<A, B>,
<F as Handler<A, B>>::Fut: Future<Output = Result<B, <F as Handler<A, B>>::Error>>,
<F as Handler<A, B>>::Error: fmt::Display,
F: Service<LambdaRequest<A>>,
F::Future: Future<Output = Result<B, F::Error>>,
F::Error: fmt::Display,
A: for<'de> Deserialize<'de>,
B: Serialize,
{
Expand All @@ -162,7 +141,12 @@ where
env::set_var("_X_AMZN_TRACE_ID", xray_trace_id);

let request_id = &ctx.request_id.clone();
let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(body, ctx)));
let task = panic::catch_unwind(panic::AssertUnwindSafe(|| {
handler.call(LambdaRequest {
event: body,
context: ctx,
})
}));

let req = match task {
Ok(response) => match response.await {
Expand Down Expand Up @@ -217,16 +201,16 @@ struct RuntimeBuilder<C: Service<http::Uri> = hyper::client::HttpConnector> {
impl<C> RuntimeBuilder<C>
where
C: Service<http::Uri> + Clone + Send + Sync + Unpin + 'static,
<C as Service<http::Uri>>::Future: Unpin + Send,
<C as Service<http::Uri>>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
<C as Service<http::Uri>>::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
C::Future: Unpin + Send,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
pub fn with_connector<C2>(self, connector: C2) -> RuntimeBuilder<C2>
where
C2: Service<http::Uri> + Clone + Send + Sync + Unpin + 'static,
<C2 as Service<http::Uri>>::Future: Unpin + Send,
<C2 as Service<http::Uri>>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
<C2 as Service<http::Uri>>::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
C2::Future: Unpin + Send,
C2::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
C2::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
RuntimeBuilder {
connector,
Expand Down Expand Up @@ -262,9 +246,9 @@ fn test_builder() {
fn incoming<C>(client: &Client<C>) -> impl Stream<Item = Result<http::Response<hyper::Body>, Error>> + Send + '_
where
C: Service<http::Uri> + Clone + Send + Sync + Unpin + 'static,
<C as Service<http::Uri>>::Future: Unpin + Send,
<C as Service<http::Uri>>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
<C as Service<http::Uri>>::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
C::Future: Unpin + Send,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
async_stream::stream! {
loop {
Expand Down Expand Up @@ -297,9 +281,9 @@ where
/// ```
pub async fn run<A, B, F>(handler: F) -> Result<(), Error>
where
F: Handler<A, B>,
<F as Handler<A, B>>::Fut: Future<Output = Result<B, <F as Handler<A, B>>::Error>>,
<F as Handler<A, B>>::Error: fmt::Display,
F: Service<LambdaRequest<A>>,
F::Future: Future<Output = Result<B, F::Error>>,
F::Error: fmt::Display,
A: for<'de> Deserialize<'de>,
B: Serialize,
{
Expand Down
9 changes: 9 additions & 0 deletions lambda-runtime/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ impl TryFrom<HeaderMap> for Context {
}
}

/// Incoming Lambda request containing the event payload and context.
#[derive(Clone, Debug)]
pub struct LambdaRequest<T> {
/// Event payload.
pub event: T,
/// Invocation context.
pub context: Context,
Comment on lines +154 to +157
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I'd make these fields private and only expose them via getters to not make adding fields a semver-breaking change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

For reference if anyone else reads this, I wanted to dig into why getters would be a good idea here and found this message on the Rust forum.

That said, given that the only benefit of getters here would be preventing breaking changes, isn't it better to use #[non_exhaustive] instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, given that the only benefit of getters here would be preventing breaking changes, isn't it better to use #[non_exhaustive] instead?

#[non_exhaustive] also makes it impossible for the Context struct to be constructed in e.g., test code. i'd consider creating a builder for this, but i need to remember the standard behavior.

Copy link
Contributor

@rimutaka rimutaka Nov 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be confusing what is being discussed. Feel free to ignore ...
I've been trying to add a cache (#372) that can be managed by the handler and preserved by the runtime between invocations. I wonder how it may work with this change.

My understanding is that &mut is not possible due to move, so we need to pass the ownership of the cache to the handler, get it back with the response, preserve until the next invocation and then pass onto the handler again. The type of the cache is unknown. It will be up to the handler_fn implementers to define it.

Would it be a good idea to have something like this?

pub struct LambdaRequest<T,Q> {
    /// Event payload.
    pub event: T,
    /// Invocation context.
    pub context: Context,
    /// User-defined cache
    pub cache: Q,
}

Q would probably need Send bound, but otherwise keep it as unconstrained as possible.

It probably doesn't matter if the example given by @softprops in #372 (comment) would still work after this change.

Copy link
Contributor Author

@nmoutschen nmoutschen Nov 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @rimutaka ! Thanks for taking the time to look at this. 😄

With this change, the example from @softprops would not directly, but you could make your own tower::Service or tower::Layer. Basically, there would be 2 interfaces for creating Lambda functions:

  • A high-level interface that works in the same way as other languages on Lambda (with handler_fn and functions that take 2 arguments (event and context);
  • A low-level interface that takes a Service<LambdaRequest<A>>, where you can adopt the same strategy as the comment you linked, but substitute Handler<A, B> with Service<LambdaRequest<A>>. See this example using the proposed changes.

That said, for your specific use-case, you could also use the high-level interface if you're OK with using a RefCell.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, taking a step back, I think the LambdaRequest could work slightly differently. I'm not too keen on the context key, so we could have an extensions pattern similar to http::Request.

Then from there, you could provide tower::Layers that would inject values into the request that you could retrieve later. We'd inject the Context by default, but you could use that to inject a cache for example, and retrieve it with request::extensions_mut::<MyCache>().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extensions would be a nice addition to the current option of implementing Handler trait. I am concerned we may overcomplicate the user interface, though. Just a thought.

}

#[cfg(test)]
mod test {
use super::*;
Expand Down