Skip to content

Commit f33fa05

Browse files
authored
Implement tower::Service trait (#401)
* feat(lambda_runtime): switch to tower::Service * feat(lambda-http): implement tower::Service * feat(lambda-runtime): rename LambdaRequest to LambdaEvent * feat(lambda-extension): implement tower::Service * feat(lambda-http): implement tower::Service (part 1) * feat(lambda-integration-tests): switch to tower::Service * chore: cargo fmt * feat(lambda-http): implement tower::Service (part 2) * feat(lambda-http): implement tower::Service (part 3) * feat(lambda-integration-tests): add lambda_http trait * fix(lambda-http): fix ext example * feat: cleanup + doc
1 parent 70adf9f commit f33fa05

24 files changed

+347
-239
lines changed

Makefile

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
INTEG_STACK_NAME ?= rust-lambda-integration-tests
2-
INTEG_FUNCTIONS_BUILD := runtime-fn runtime-trait http-fn
2+
INTEG_FUNCTIONS_BUILD := runtime-fn runtime-trait http-fn http-trait
33
INTEG_FUNCTIONS_INVOKE := RuntimeFn RuntimeFnAl2 RuntimeTrait RuntimeTraitAl2 Python PythonAl2
44
INTEG_API_INVOKE := RestApiUrl HttpApiUrl
55
INTEG_EXTENSIONS := extension-fn extension-trait
@@ -10,6 +10,8 @@ pr-check:
1010
cargo +1.54.0 check --all
1111
cargo +stable fmt --all -- --check
1212
cargo +stable clippy
13+
cargo +1.54.0 test
14+
cargo +stable test
1315

1416
integration-tests:
1517
# Build Integration functions
@@ -47,7 +49,11 @@ invoke-integration-api-%:
4749
--query 'Stacks[0].Outputs[?OutputKey==`$*`].OutputValue' \
4850
--output text))
4951
curl $(API_URL)/get
52+
curl $(API_URL)/trait/get
5053
curl $(API_URL)/al2/get
54+
curl $(API_URL)/al2-trait/get
5155
curl -X POST -d '{"command": "hello"}' $(API_URL)/post
56+
curl -X POST -d '{"command": "hello"}' $(API_URL)/trait/post
5257
curl -X POST -d '{"command": "hello"}' $(API_URL)/al2/post
58+
curl -X POST -d '{"command": "hello"}' $(API_URL)/al2-trait/post
5359

README.md

+6-8
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,18 @@ This package makes it easy to run AWS Lambda Functions written in Rust. This wor
1515
The code below creates a simple function that receives an event with a `firstName` field and returns a message to the caller. Notice: this crate is tested against latest stable Rust.
1616

1717
```rust,no_run
18-
use lambda_runtime::{handler_fn, Context, Error};
18+
use lambda_runtime::{service_fn, LambdaEvent, Error};
1919
use serde_json::{json, Value};
2020
2121
#[tokio::main]
2222
async fn main() -> Result<(), Error> {
23-
let func = handler_fn(func);
23+
let func = service_fn(func);
2424
lambda_runtime::run(func).await?;
2525
Ok(())
2626
}
2727
28-
async fn func(event: Value, _: Context) -> Result<Value, Error> {
28+
async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
29+
let (event, _context) = event.into_parts();
2930
let first_name = event["firstName"].as_str().unwrap_or("world");
3031
3132
Ok(json!({ "message": format!("Hello, {}!", first_name) }))
@@ -213,12 +214,9 @@ Lambdas can be run and debugged locally using a special [Lambda debug proxy](htt
213214

214215
## `lambda`
215216

216-
`lambda_runtime` is a library for authoring reliable and performant Rust-based AWS Lambda functions. At a high level, it provides a few major components:
217+
`lambda_runtime` is a library for authoring reliable and performant Rust-based AWS Lambda functions. At a high level, it provides `lambda_runtime::run`, a function that runs a `tower::Service<LambdaEvent>`.
217218

218-
- `Handler`, a trait that defines interactions between customer-authored code and this library.
219-
- `lambda_runtime::run`, function that runs an `Handler`.
220-
221-
The function `handler_fn` converts a rust function or closure to `Handler`, which can then be run by `lambda_runtime::run`.
219+
To write a function that will handle request, you need to pass it through `service_fn`, which will convert your function into a `tower::Service<LambdaEvent>`, which can then be run by `lambda_runtime::run`.
222220

223221
## AWS event objects
224222

lambda-extension/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ bytes = "1.0"
1919
http = "0.2"
2020
async-stream = "0.3"
2121
tracing = { version = "0.1", features = ["log"] }
22-
tower-service = "0.3"
22+
tower = { version = "0.4", features = ["util"] }
2323
tokio-stream = "0.1.2"
2424
lambda_runtime_api_client = { version = "0.4", path = "../lambda-runtime-api-client" }
2525

lambda-extension/examples/basic.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use lambda_extension::{extension_fn, Error, LambdaEvent, NextEvent};
1+
use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent};
22

33
async fn my_extension(event: LambdaEvent) -> Result<(), Error> {
44
match event.next {
@@ -25,6 +25,6 @@ async fn main() -> Result<(), Error> {
2525
.without_time()
2626
.init();
2727

28-
let func = extension_fn(my_extension);
28+
let func = service_fn(my_extension);
2929
lambda_extension::run(func).await
3030
}

lambda-extension/examples/custom_events.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use lambda_extension::{extension_fn, Error, LambdaEvent, NextEvent, Runtime};
1+
use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent, Runtime};
22

33
async fn my_extension(event: LambdaEvent) -> Result<(), Error> {
44
match event.next {
@@ -27,7 +27,7 @@ async fn main() -> Result<(), Error> {
2727
.without_time()
2828
.init();
2929

30-
let func = extension_fn(my_extension);
30+
let func = service_fn(my_extension);
3131

3232
let runtime = Runtime::builder().with_events(&["SHUTDOWN"]).register().await?;
3333

lambda-extension/examples/custom_trait_implementation.rs

+11-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use lambda_extension::{run, Error, Extension, InvokeEvent, LambdaEvent, NextEvent};
1+
use lambda_extension::{run, Error, InvokeEvent, LambdaEvent, NextEvent, Service};
22
use std::{
33
future::{ready, Future},
44
pin::Pin,
@@ -9,9 +9,16 @@ struct MyExtension {
99
data: Vec<InvokeEvent>,
1010
}
1111

12-
impl Extension for MyExtension {
13-
type Fut = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
14-
fn call(&mut self, event: LambdaEvent) -> Self::Fut {
12+
impl Service<LambdaEvent> for MyExtension {
13+
type Error = Error;
14+
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
15+
type Response = ();
16+
17+
fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
18+
core::task::Poll::Ready(Ok(()))
19+
}
20+
21+
fn call(&mut self, event: LambdaEvent) -> Self::Future {
1522
match event.next {
1623
NextEvent::Shutdown(_e) => {
1724
self.data.clear();

lambda-extension/src/lib.rs

+16-43
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
use hyper::client::{connect::Connection, HttpConnector};
1010
use lambda_runtime_api_client::Client;
1111
use serde::Deserialize;
12-
use std::{future::Future, path::PathBuf};
12+
use std::{fmt, future::Future, path::PathBuf};
1313
use tokio::io::{AsyncRead, AsyncWrite};
1414
use tokio_stream::StreamExt;
15-
use tower_service::Service;
15+
pub use tower::{self, service_fn, Service};
1616
use tracing::trace;
1717

1818
/// Include several request builders to interact with the Extension API.
@@ -103,40 +103,6 @@ pub struct LambdaEvent {
103103
pub next: NextEvent,
104104
}
105105

106-
/// A trait describing an asynchronous extension.
107-
pub trait Extension {
108-
/// Response of this Extension.
109-
type Fut: Future<Output = Result<(), Error>>;
110-
/// Handle the incoming event.
111-
fn call(&mut self, event: LambdaEvent) -> Self::Fut;
112-
}
113-
114-
/// Returns a new [`ExtensionFn`] with the given closure.
115-
///
116-
/// [`ExtensionFn`]: struct.ExtensionFn.html
117-
pub fn extension_fn<F>(f: F) -> ExtensionFn<F> {
118-
ExtensionFn { f }
119-
}
120-
121-
/// An [`Extension`] implemented by a closure.
122-
///
123-
/// [`Extension`]: trait.Extension.html
124-
#[derive(Clone, Debug)]
125-
pub struct ExtensionFn<F> {
126-
f: F,
127-
}
128-
129-
impl<F, Fut> Extension for ExtensionFn<F>
130-
where
131-
F: Fn(LambdaEvent) -> Fut,
132-
Fut: Future<Output = Result<(), Error>>,
133-
{
134-
type Fut = Fut;
135-
fn call(&mut self, event: LambdaEvent) -> Self::Fut {
136-
(self.f)(event)
137-
}
138-
}
139-
140106
/// The Runtime handles all the incoming extension requests
141107
pub struct Runtime<C: Service<http::Uri> = HttpConnector> {
142108
extension_id: String,
@@ -153,13 +119,18 @@ impl Runtime {
153119
impl<C> Runtime<C>
154120
where
155121
C: Service<http::Uri> + Clone + Send + Sync + Unpin + 'static,
156-
<C as Service<http::Uri>>::Future: Unpin + Send,
157-
<C as Service<http::Uri>>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
158-
<C as Service<http::Uri>>::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
122+
C::Future: Unpin + Send,
123+
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
124+
C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
159125
{
160126
/// Execute the given extension.
161127
/// Register the extension with the Extensions API and wait for incoming events.
162-
pub async fn run(&self, mut extension: impl Extension) -> Result<(), Error> {
128+
pub async fn run<E>(&self, mut extension: E) -> Result<(), Error>
129+
where
130+
E: Service<LambdaEvent>,
131+
E::Future: Future<Output = Result<(), E::Error>>,
132+
E::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display,
133+
{
163134
let client = &self.client;
164135

165136
let incoming = async_stream::stream! {
@@ -196,7 +167,7 @@ where
196167
};
197168

198169
self.client.call(req).await?;
199-
return Err(error);
170+
return Err(error.into());
200171
}
201172
}
202173

@@ -263,9 +234,11 @@ impl<'a> RuntimeBuilder<'a> {
263234
}
264235

265236
/// Execute the given extension
266-
pub async fn run<Ex>(extension: Ex) -> Result<(), Error>
237+
pub async fn run<E>(extension: E) -> Result<(), Error>
267238
where
268-
Ex: Extension,
239+
E: Service<LambdaEvent>,
240+
E::Future: Future<Output = Result<(), E::Error>>,
241+
E::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display,
269242
{
270243
Runtime::builder().register().await?.run(extension).await
271244
}

lambda-http/examples/hello-http.rs

+3-7
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
1-
use lambda_http::{
2-
handler,
3-
lambda_runtime::{self, Context, Error},
4-
IntoResponse, Request, RequestExt, Response,
5-
};
1+
use lambda_http::{service_fn, Error, IntoResponse, Request, RequestExt, Response};
62

73
#[tokio::main]
84
async fn main() -> Result<(), Error> {
9-
lambda_runtime::run(handler(func)).await?;
5+
lambda_http::run(service_fn(func)).await?;
106
Ok(())
117
}
128

13-
async fn func(event: Request, _: Context) -> Result<impl IntoResponse, Error> {
9+
async fn func(event: Request) -> Result<impl IntoResponse, Error> {
1410
Ok(match event.query_string_parameters().get("first_name") {
1511
Some(first_name) => format!("Hello, {}!", first_name).into_response(),
1612
_ => Response::builder()

lambda-http/examples/shared-resources-example.rs

+6-8
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use lambda_http::{
2-
handler,
3-
lambda_runtime::{self, Context, Error},
4-
IntoResponse, Request, RequestExt, Response,
5-
};
1+
use lambda_http::{service_fn, Error, IntoResponse, Request, RequestExt, Response};
62

73
struct SharedClient {
84
name: &'static str,
@@ -23,9 +19,11 @@ async fn main() -> Result<(), Error> {
2319
let shared_client_ref = &shared_client;
2420

2521
// Define a closure here that makes use of the shared client.
26-
let handler_func_closure = move |event: Request, ctx: Context| async move {
22+
let handler_func_closure = move |event: Request| async move {
2723
Ok(match event.query_string_parameters().get("first_name") {
28-
Some(first_name) => shared_client_ref.response(ctx.request_id, first_name).into_response(),
24+
Some(first_name) => shared_client_ref
25+
.response(event.lambda_context().request_id, first_name)
26+
.into_response(),
2927
_ => Response::builder()
3028
.status(400)
3129
.body("Empty first name".into())
@@ -34,6 +32,6 @@ async fn main() -> Result<(), Error> {
3432
};
3533

3634
// Pass the closure to the runtime here.
37-
lambda_runtime::run(handler(handler_func_closure)).await?;
35+
lambda_http::run(service_fn(handler_func_closure)).await?;
3836
Ok(())
3937
}

lambda-http/src/ext.rs

+23-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Extension methods for `http::Request` types
22
33
use crate::{request::RequestContext, strmap::StrMap, Body};
4+
use lambda_runtime::Context;
45
use serde::{de::value::Error as SerdeError, Deserialize};
56
use std::{error::Error, fmt};
67

@@ -66,7 +67,7 @@ impl Error for PayloadError {
6667
/// as well as `{"x":1, "y":2}` respectively.
6768
///
6869
/// ```rust,no_run
69-
/// use lambda_http::{handler, lambda_runtime::{self, Error, Context}, Body, IntoResponse, Request, Response, RequestExt};
70+
/// use lambda_http::{service_fn, Error, Context, Body, IntoResponse, Request, Response, RequestExt};
7071
/// use serde::Deserialize;
7172
///
7273
/// #[derive(Debug,Deserialize,Default)]
@@ -79,13 +80,12 @@ impl Error for PayloadError {
7980
///
8081
/// #[tokio::main]
8182
/// async fn main() -> Result<(), Error> {
82-
/// lambda_runtime::run(handler(add)).await?;
83+
/// lambda_http::run(service_fn(add)).await?;
8384
/// Ok(())
8485
/// }
8586
///
8687
/// async fn add(
87-
/// request: Request,
88-
/// _: Context
88+
/// request: Request
8989
/// ) -> Result<Response<Body>, Error> {
9090
/// let args: Args = request.payload()
9191
/// .unwrap_or_else(|_parse_err| None)
@@ -167,6 +167,12 @@ pub trait RequestExt {
167167
fn payload<D>(&self) -> Result<Option<D>, PayloadError>
168168
where
169169
for<'de> D: Deserialize<'de>;
170+
171+
/// Return the Lambda function context associated with the request
172+
fn lambda_context(&self) -> Context;
173+
174+
/// Configures instance with lambda context
175+
fn with_lambda_context(self, context: Context) -> Self;
170176
}
171177

172178
impl RequestExt for http::Request<Body> {
@@ -226,6 +232,19 @@ impl RequestExt for http::Request<Body> {
226232
.expect("Request did not contain a request context")
227233
}
228234

235+
fn lambda_context(&self) -> Context {
236+
self.extensions()
237+
.get::<Context>()
238+
.cloned()
239+
.expect("Request did not contain a lambda context")
240+
}
241+
242+
fn with_lambda_context(self, context: Context) -> Self {
243+
let mut s = self;
244+
s.extensions_mut().insert(context);
245+
s
246+
}
247+
229248
fn payload<D>(&self) -> Result<Option<D>, PayloadError>
230249
where
231250
for<'de> D: Deserialize<'de>,

0 commit comments

Comments
 (0)