Skip to content

Implement tower::Service trait #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 21, 2022
Merged
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
INTEG_STACK_NAME ?= rust-lambda-integration-tests
INTEG_FUNCTIONS_BUILD := runtime-fn runtime-trait http-fn
INTEG_FUNCTIONS_BUILD := runtime-fn runtime-trait http-fn http-trait
INTEG_FUNCTIONS_INVOKE := RuntimeFn RuntimeFnAl2 RuntimeTrait RuntimeTraitAl2 Python PythonAl2
INTEG_API_INVOKE := RestApiUrl HttpApiUrl
INTEG_EXTENSIONS := extension-fn extension-trait
Expand All @@ -10,6 +10,8 @@ pr-check:
cargo +1.54.0 check --all
cargo +stable fmt --all -- --check
cargo +stable clippy
cargo +1.54.0 test
cargo +stable test

integration-tests:
# Build Integration functions
Expand Down Expand Up @@ -47,7 +49,11 @@ invoke-integration-api-%:
--query 'Stacks[0].Outputs[?OutputKey==`$*`].OutputValue' \
--output text))
curl $(API_URL)/get
curl $(API_URL)/trait/get
curl $(API_URL)/al2/get
curl $(API_URL)/al2-trait/get
curl -X POST -d '{"command": "hello"}' $(API_URL)/post
curl -X POST -d '{"command": "hello"}' $(API_URL)/trait/post
curl -X POST -d '{"command": "hello"}' $(API_URL)/al2/post
curl -X POST -d '{"command": "hello"}' $(API_URL)/al2-trait/post

14 changes: 6 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ This package makes it easy to run AWS Lambda Functions written in Rust. This wor
The code below creates a simple function that receives an event with a `firstName` field and returns a message to the caller. Notice: this crate is tested against latest stable Rust.

```rust,no_run
use lambda_runtime::{handler_fn, Context, Error};
use lambda_runtime::{service_fn, LambdaEvent, Error};
use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), Error> {
let func = handler_fn(func);
let func = service_fn(func);
lambda_runtime::run(func).await?;
Ok(())
}

async fn func(event: Value, _: Context) -> Result<Value, Error> {
async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
let (event, _context) = event.into_parts();
let first_name = event["firstName"].as_str().unwrap_or("world");

Ok(json!({ "message": format!("Hello, {}!", first_name) }))
Expand Down Expand Up @@ -213,12 +214,9 @@ Lambdas can be run and debugged locally using a special [Lambda debug proxy](htt

## `lambda`

`lambda_runtime` is a library for authoring reliable and performant Rust-based AWS Lambda functions. At a high level, it provides a few major components:
`lambda_runtime` is a library for authoring reliable and performant Rust-based AWS Lambda functions. At a high level, it provides `lambda_runtime::run`, a function that runs a `tower::Service<LambdaEvent>`.

- `Handler`, a trait that defines interactions between customer-authored code and this library.
- `lambda_runtime::run`, function that runs an `Handler`.

The function `handler_fn` converts a rust function or closure to `Handler`, which can then be run by `lambda_runtime::run`.
To write a function that will handle request, you need to pass it through `service_fn`, which will convert your function into a `tower::Service<LambdaEvent>`, which can then be run by `lambda_runtime::run`.

## AWS event objects

Expand Down
2 changes: 1 addition & 1 deletion lambda-extension/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ bytes = "1.0"
http = "0.2"
async-stream = "0.3"
tracing = { version = "0.1", features = ["log"] }
tower-service = "0.3"
tower = { version = "0.4", features = ["util"] }
tokio-stream = "0.1.2"
lambda_runtime_api_client = { version = "0.4", path = "../lambda-runtime-api-client" }

Expand Down
4 changes: 2 additions & 2 deletions lambda-extension/examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use lambda_extension::{extension_fn, Error, LambdaEvent, NextEvent};
use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent};

async fn my_extension(event: LambdaEvent) -> Result<(), Error> {
match event.next {
Expand All @@ -25,6 +25,6 @@ async fn main() -> Result<(), Error> {
.without_time()
.init();

let func = extension_fn(my_extension);
let func = service_fn(my_extension);
lambda_extension::run(func).await
}
4 changes: 2 additions & 2 deletions lambda-extension/examples/custom_events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use lambda_extension::{extension_fn, Error, LambdaEvent, NextEvent, Runtime};
use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent, Runtime};

async fn my_extension(event: LambdaEvent) -> Result<(), Error> {
match event.next {
Expand Down Expand Up @@ -27,7 +27,7 @@ async fn main() -> Result<(), Error> {
.without_time()
.init();

let func = extension_fn(my_extension);
let func = service_fn(my_extension);

let runtime = Runtime::builder().with_events(&["SHUTDOWN"]).register().await?;

Expand Down
15 changes: 11 additions & 4 deletions lambda-extension/examples/custom_trait_implementation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use lambda_extension::{run, Error, Extension, InvokeEvent, LambdaEvent, NextEvent};
use lambda_extension::{run, Error, InvokeEvent, LambdaEvent, NextEvent, Service};
use std::{
future::{ready, Future},
pin::Pin,
Expand All @@ -9,9 +9,16 @@ struct MyExtension {
data: Vec<InvokeEvent>,
}

impl Extension for MyExtension {
type Fut = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
fn call(&mut self, event: LambdaEvent) -> Self::Fut {
impl Service<LambdaEvent> for MyExtension {
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
type Response = ();

fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
core::task::Poll::Ready(Ok(()))
}

fn call(&mut self, event: LambdaEvent) -> Self::Future {
match event.next {
NextEvent::Shutdown(_e) => {
self.data.clear();
Expand Down
59 changes: 16 additions & 43 deletions lambda-extension/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
use hyper::client::{connect::Connection, HttpConnector};
use lambda_runtime_api_client::Client;
use serde::Deserialize;
use std::{future::Future, path::PathBuf};
use std::{fmt, future::Future, path::PathBuf};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::StreamExt;
use tower_service::Service;
pub use tower::{self, service_fn, Service};
use tracing::trace;

/// Include several request builders to interact with the Extension API.
Expand Down Expand Up @@ -103,40 +103,6 @@ pub struct LambdaEvent {
pub next: NextEvent,
}

/// A trait describing an asynchronous extension.
pub trait Extension {
/// Response of this Extension.
type Fut: Future<Output = Result<(), Error>>;
/// Handle the incoming event.
fn call(&mut self, event: LambdaEvent) -> Self::Fut;
}

/// Returns a new [`ExtensionFn`] with the given closure.
///
/// [`ExtensionFn`]: struct.ExtensionFn.html
pub fn extension_fn<F>(f: F) -> ExtensionFn<F> {
ExtensionFn { f }
}

/// An [`Extension`] implemented by a closure.
///
/// [`Extension`]: trait.Extension.html
#[derive(Clone, Debug)]
pub struct ExtensionFn<F> {
f: F,
}

impl<F, Fut> Extension for ExtensionFn<F>
where
F: Fn(LambdaEvent) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
type Fut = Fut;
fn call(&mut self, event: LambdaEvent) -> Self::Fut {
(self.f)(event)
}
}

/// The Runtime handles all the incoming extension requests
pub struct Runtime<C: Service<http::Uri> = HttpConnector> {
extension_id: String,
Expand All @@ -153,13 +119,18 @@ impl Runtime {
impl<C> Runtime<C>
where
C: Service<http::Uri> + Clone + Send + Sync + Unpin + 'static,
<C as Service<http::Uri>>::Future: Unpin + Send,
<C as Service<http::Uri>>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
<C as Service<http::Uri>>::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
C::Future: Unpin + Send,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
/// Execute the given extension.
/// Register the extension with the Extensions API and wait for incoming events.
pub async fn run(&self, mut extension: impl Extension) -> Result<(), Error> {
pub async fn run<E>(&self, mut extension: E) -> Result<(), Error>
where
E: Service<LambdaEvent>,
E::Future: Future<Output = Result<(), E::Error>>,
E::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display,
{
let client = &self.client;

let incoming = async_stream::stream! {
Expand Down Expand Up @@ -196,7 +167,7 @@ where
};

self.client.call(req).await?;
return Err(error);
return Err(error.into());
}
}

Expand Down Expand Up @@ -263,9 +234,11 @@ impl<'a> RuntimeBuilder<'a> {
}

/// Execute the given extension
pub async fn run<Ex>(extension: Ex) -> Result<(), Error>
pub async fn run<E>(extension: E) -> Result<(), Error>
where
Ex: Extension,
E: Service<LambdaEvent>,
E::Future: Future<Output = Result<(), E::Error>>,
E::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display,
{
Runtime::builder().register().await?.run(extension).await
}
10 changes: 3 additions & 7 deletions lambda-http/examples/hello-http.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use lambda_http::{
handler,
lambda_runtime::{self, Context, Error},
IntoResponse, Request, RequestExt, Response,
};
use lambda_http::{service_fn, Error, IntoResponse, Request, RequestExt, Response};

#[tokio::main]
async fn main() -> Result<(), Error> {
lambda_runtime::run(handler(func)).await?;
lambda_http::run(service_fn(func)).await?;
Ok(())
}

async fn func(event: Request, _: Context) -> Result<impl IntoResponse, Error> {
async fn func(event: Request) -> Result<impl IntoResponse, Error> {
Ok(match event.query_string_parameters().get("first_name") {
Some(first_name) => format!("Hello, {}!", first_name).into_response(),
_ => Response::builder()
Expand Down
14 changes: 6 additions & 8 deletions lambda-http/examples/shared-resources-example.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use lambda_http::{
handler,
lambda_runtime::{self, Context, Error},
IntoResponse, Request, RequestExt, Response,
};
use lambda_http::{service_fn, Error, IntoResponse, Request, RequestExt, Response};

struct SharedClient {
name: &'static str,
Expand All @@ -23,9 +19,11 @@ async fn main() -> Result<(), Error> {
let shared_client_ref = &shared_client;

// Define a closure here that makes use of the shared client.
let handler_func_closure = move |event: Request, ctx: Context| async move {
let handler_func_closure = move |event: Request| async move {
Ok(match event.query_string_parameters().get("first_name") {
Some(first_name) => shared_client_ref.response(ctx.request_id, first_name).into_response(),
Some(first_name) => shared_client_ref
.response(event.lambda_context().request_id, first_name)
.into_response(),
_ => Response::builder()
.status(400)
.body("Empty first name".into())
Expand All @@ -34,6 +32,6 @@ async fn main() -> Result<(), Error> {
};

// Pass the closure to the runtime here.
lambda_runtime::run(handler(handler_func_closure)).await?;
lambda_http::run(service_fn(handler_func_closure)).await?;
Ok(())
}
27 changes: 23 additions & 4 deletions lambda-http/src/ext.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Extension methods for `http::Request` types

use crate::{request::RequestContext, strmap::StrMap, Body};
use lambda_runtime::Context;
use serde::{de::value::Error as SerdeError, Deserialize};
use std::{error::Error, fmt};

Expand Down Expand Up @@ -66,7 +67,7 @@ impl Error for PayloadError {
/// as well as `{"x":1, "y":2}` respectively.
///
/// ```rust,no_run
/// use lambda_http::{handler, lambda_runtime::{self, Error, Context}, Body, IntoResponse, Request, Response, RequestExt};
/// use lambda_http::{service_fn, Error, Context, Body, IntoResponse, Request, Response, RequestExt};
/// use serde::Deserialize;
///
/// #[derive(Debug,Deserialize,Default)]
Expand All @@ -79,13 +80,12 @@ impl Error for PayloadError {
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
/// lambda_runtime::run(handler(add)).await?;
/// lambda_http::run(service_fn(add)).await?;
/// Ok(())
/// }
///
/// async fn add(
/// request: Request,
/// _: Context
/// request: Request
/// ) -> Result<Response<Body>, Error> {
/// let args: Args = request.payload()
/// .unwrap_or_else(|_parse_err| None)
Expand Down Expand Up @@ -167,6 +167,12 @@ pub trait RequestExt {
fn payload<D>(&self) -> Result<Option<D>, PayloadError>
where
for<'de> D: Deserialize<'de>;

/// Return the Lambda function context associated with the request
fn lambda_context(&self) -> Context;

/// Configures instance with lambda context
fn with_lambda_context(self, context: Context) -> Self;
}

impl RequestExt for http::Request<Body> {
Expand Down Expand Up @@ -226,6 +232,19 @@ impl RequestExt for http::Request<Body> {
.expect("Request did not contain a request context")
}

fn lambda_context(&self) -> Context {
self.extensions()
.get::<Context>()
.cloned()
.expect("Request did not contain a lambda context")
}

fn with_lambda_context(self, context: Context) -> Self {
let mut s = self;
s.extensions_mut().insert(context);
s
}

fn payload<D>(&self) -> Result<Option<D>, PayloadError>
where
for<'de> D: Deserialize<'de>,
Expand Down
Loading