Skip to content

feat(lambda-http): accept http_body::Body in responses #466

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lambda-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ base64 = "0.13.0"
bytes = "1"
http = "0.2"
http-body = "0.4"
hyper = "0.14"
lambda_runtime = { path = "../lambda-runtime", version = "0.5" }
serde = { version = "^1", features = ["derive"] }
serde_json = "^1"
Expand All @@ -32,4 +33,4 @@ query_map = { version = "0.4", features = ["url-query"] }
log = "^0.4"
maplit = "1.0"
tokio = { version = "1.0", features = ["macros"] }
tower-http = { version = "0.2", features = ["cors"] }
tower-http = { version = "0.2", features = ["cors", "trace"] }
2 changes: 1 addition & 1 deletion lambda-http/examples/hello-cors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() -> Result<(), Error> {

async fn func(event: Request) -> Result<Response<Body>, Error> {
Ok(match event.query_string_parameters().first("first_name") {
Some(first_name) => format!("Hello, {}!", first_name).into_response(),
Some(first_name) => format!("Hello, {}!", first_name).into_response().await,
_ => Response::builder()
.status(400)
.body("Empty first name".into())
Expand Down
2 changes: 1 addition & 1 deletion lambda-http/examples/hello-http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ async fn main() -> Result<(), Error> {

async fn func(event: Request) -> Result<impl IntoResponse, Error> {
Ok(match event.query_string_parameters().first("first_name") {
Some(first_name) => format!("Hello, {}!", first_name).into_response(),
Some(first_name) => format!("Hello, {}!", first_name).into_response().await,
_ => Response::builder()
.status(400)
.body("Empty first name".into())
Expand Down
4 changes: 3 additions & 1 deletion lambda-http/examples/hello-raw-http-path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ async fn main() -> Result<(), Error> {
}

async fn func(event: Request) -> Result<impl IntoResponse, Error> {
let res = format!("The raw path for this request is: {}", event.raw_http_path()).into_response();
let res = format!("The raw path for this request is: {}", event.raw_http_path())
.into_response()
.await;

Ok(res)
}
15 changes: 15 additions & 0 deletions lambda-http/examples/hello-trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use lambda_http::{tower::ServiceBuilder, Body, Error, IntoResponse, Request, Response};
use tower_http::trace::TraceLayer;

#[tokio::main]
async fn main() -> Result<(), Error> {
let service = ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
.service_fn(handler);
lambda_http::run(service).await?;
Ok(())
}

async fn handler(_event: Request) -> Result<Response<Body>, Error> {
Ok("Success".into_response().await)
}
11 changes: 11 additions & 0 deletions lambda-http/examples/hello-tuple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use lambda_http::{service_fn, Error, IntoResponse, Request};

#[tokio::main]
async fn main() -> Result<(), Error> {
lambda_http::run(service_fn(func)).await?;
Ok(())
}

async fn func(_event: Request) -> Result<impl IntoResponse, Error> {
Ok((200, "Hello, world!"))
}
9 changes: 6 additions & 3 deletions lambda-http/examples/shared-resources-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ async fn main() -> Result<(), Error> {
// Define a closure here that makes use of the shared client.
let handler_func_closure = move |event: Request| async move {
Ok(match event.query_string_parameters().first("first_name") {
Some(first_name) => shared_client_ref
.response(event.lambda_context().request_id, first_name)
.into_response(),
Some(first_name) => {
shared_client_ref
.response(event.lambda_context().request_id, first_name)
.into_response()
.await
}
_ => Response::builder()
.status(400)
.body("Empty first name".into())
Expand Down
29 changes: 22 additions & 7 deletions lambda-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ extern crate maplit;
pub use http::{self, Response};
use lambda_runtime::LambdaEvent;
pub use lambda_runtime::{self, service_fn, tower, Context, Error, Service};
use response::ResponseFuture;

pub mod ext;
pub mod request;
Expand All @@ -91,7 +92,8 @@ pub type Request = http::Request<Body>;
#[doc(hidden)]
pub struct TransformResponse<'a, R, E> {
request_origin: RequestOrigin,
fut: Pin<Box<dyn Future<Output = Result<R, E>> + 'a>>,
fut_req: Pin<Box<dyn Future<Output = Result<R, E>> + 'a>>,
fut_res: Option<ResponseFuture>,
Comment on lines +95 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since only one of these will be polled at a time, it might be better to model it as an enum with two variants.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need some help here @calavera, @eduardomourar or @david-perez

The PR as it is now simply doesn't work. The lambdas using lambda_http get called (I can see my info logs with the event) but never return the response and time out.

I believe this poll function is at the core of the issue: https://github.com/awslabs/aws-lambda-rust-runtime/pull/466/files#diff-c62692e6b817476306b4d6966f541eec77a1768674e4dad5cf7f10c4faababa4R106-R119

I observed that the value of self.fut_res gets set then nothing else happens, the poll function never gets executed again with the value of self.fut_res set, it actually looks like it gets called once then that's it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok bear with me I think I know why this isn't working as-is: https://tokio.rs/tokio/tutorial/async#wakers

When a future returns Poll::Pending, it must ensure that the waker is signaled at some point. Forgetting to do this results in the task hanging indefinitely.

Forgetting to wake a task after returning Poll::Pending is a common source of bugs.

Looks like by returning Poll::Pending from Poll::Ready and doing nothing else we are breaking the contract we also need to take an additional step

Before returning Poll::Pending, we called cx.waker().wake_by_ref(). This is to satisfy the future contract. By returning Poll::Pending, we are responsible for signaling the waker.

The code should be:

Poll::Ready(Ok(resp)) => {
    self.fut_res = Some(resp.into_response());
    cx.waker().wake_by_ref()
    Poll::Pending
}

And I've confirmed now that this works correctly. Continuing on with testing.

Copy link
Contributor

@david-perez david-perez Jun 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hugobast Good catch! This is indeed a sharp edge of async Rust.

Typically only leaf futures are the ones that interact with wakers. Most futures uphold the poll contract by returning Poll::Pending only if some other future also returned Poll::Pending; in this way, it trivially fulfills the contract of poll since the inner future must follow that same contract.

In this particular case, we can do the same by polling the future that we just put in self.fut_res. That is, do here what is done in the first arm of the if statement too.

I think it's clearer if we model this as an enum, since we're only polling one of the futures at a time, and simply do self.poll() when the request future completes and we now want to poll the response future.

(Note though that your code is perfectly fine in terms of correctness)

Copy link
Contributor

@hugobast hugobast Jun 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@david-perez something like this? hugobast@d58a1bc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that looks good to me

}

impl<'a, R, E> Future for TransformResponse<'a, R, E>
Expand All @@ -101,11 +103,20 @@ where
type Output = Result<LambdaResponse, E>;

fn poll(mut self: Pin<&mut Self>, cx: &mut TaskContext) -> Poll<Self::Output> {
match self.fut.as_mut().poll(cx) {
Poll::Ready(result) => Poll::Ready(
result.map(|resp| LambdaResponse::from_response(&self.request_origin, resp.into_response())),
),
Poll::Pending => Poll::Pending,
if let Some(fut_res) = self.fut_res.as_mut() {
match fut_res.as_mut().poll(cx) {
Poll::Ready(resp) => Poll::Ready(Ok(LambdaResponse::from_response(&self.request_origin, resp))),
Poll::Pending => Poll::Pending,
}
} else {
match self.fut_req.as_mut().poll(cx) {
Poll::Ready(Ok(resp)) => {
self.fut_res = Some(resp.into_response());
Poll::Pending
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
}
}
}
}
Expand Down Expand Up @@ -151,7 +162,11 @@ where
let request_origin = req.payload.request_origin();
let event: Request = req.payload.into();
let fut = Box::pin(self.service.call(event.with_lambda_context(req.context)));
TransformResponse { request_origin, fut }
TransformResponse {
request_origin,
fut_req: fut,
fut_res: None,
}
}
}

Expand Down
130 changes: 85 additions & 45 deletions lambda-http/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,22 @@ use crate::request::RequestOrigin;
use aws_lambda_events::encodings::Body;
use aws_lambda_events::event::alb::AlbTargetGroupResponse;
use aws_lambda_events::event::apigw::{ApiGatewayProxyResponse, ApiGatewayV2httpResponse};
use http::StatusCode;
use http::{
header::{CONTENT_TYPE, SET_COOKIE},
Response,
};
use http_body::Body as HttpBody;
use hyper::body::to_bytes;
use serde::Serialize;
use std::convert::TryInto;
use std::future::ready;
use std::{
any::{Any, TypeId},
fmt,
future::Future,
pin::Pin,
};

/// Representation of Lambda response
#[doc(hidden)]
Expand All @@ -20,14 +31,11 @@ pub enum LambdaResponse {
Alb(AlbTargetGroupResponse),
}

/// tranformation from http type to internal type
/// Tranformation from http type to internal type
impl LambdaResponse {
pub(crate) fn from_response<T>(request_origin: &RequestOrigin, value: Response<T>) -> Self
where
T: Into<Body>,
{
pub(crate) fn from_response(request_origin: &RequestOrigin, value: Response<Body>) -> Self {
let (parts, bod) = value.into_parts();
let (is_base64_encoded, body) = match bod.into() {
let (is_base64_encoded, body) = match bod {
Body::Empty => (false, None),
b @ Body::Text(_) => (false, Some(b)),
b @ Body::Binary(_) => (true, Some(b)),
Expand Down Expand Up @@ -87,71 +95,103 @@ impl LambdaResponse {
}
}

/// A conversion of self into a `Response<Body>` for various types.
///
/// Implementations for `Response<B> where B: Into<Body>`,
/// `B where B: Into<Body>` and `serde_json::Value` are provided
/// by default.
///
/// # Example
///
/// ```rust
/// use lambda_http::{Body, IntoResponse, Response};
/// Trait for generating responses
///
/// assert_eq!(
/// "hello".into_response().body(),
/// Response::new(Body::from("hello")).body()
/// );
/// ```
/// Types that implement this trait can be used as return types for handler functions.
pub trait IntoResponse {
/// Return a translation of `self` into a `Response<Body>`
fn into_response(self) -> Response<Body>;
/// Transform into a Response<Body> Future
fn into_response(self) -> ResponseFuture;
}

impl<B> IntoResponse for Response<B>
where
B: Into<Body>,
B: IntoBody + 'static,
{
fn into_response(self) -> Response<Body> {
fn into_response(self) -> ResponseFuture {
let (parts, body) = self.into_parts();
Response::from_parts(parts, body.into())

let fut = async { Response::from_parts(parts, body.into_body().await) };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this have to be a future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, this is due to the .await on into_body(), which is needed to support http_body::Body implementations that stream the body back, as we need to collect the entire payload before we can return a Lambda response.

Copy link

@brainstorm brainstorm Apr 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to collect partial payloads? Thinking HTTP Ranges or terminate early if a piece of data has been seen in the response as it streams instead of waiting for the whole payload? Not sure if this use case should be available through this API or just limited to a custom hack, but it would serve my interests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brainstorm Not immediately for this purpose, since here we just use this to transform it into a aws_lambda_events::encoding::Body enum, which we needs to generate the right response for API Gateway/ALB.

That said, if you're calling an API that returns an http_body::Body implementation, or some form of asynchronous stream, you could always implement that yourself in your function handler.


Box::pin(fut)
}
}

impl IntoResponse for String {
fn into_response(self) -> Response<Body> {
Response::new(Body::from(self))
fn into_response(self) -> ResponseFuture {
Box::pin(ready(Response::new(Body::from(self))))
}
}

impl IntoResponse for &str {
fn into_response(self) -> Response<Body> {
Response::new(Body::from(self))
fn into_response(self) -> ResponseFuture {
Box::pin(ready(Response::new(Body::from(self))))
}
}

impl IntoResponse for serde_json::Value {
fn into_response(self) -> Response<Body> {
Response::builder()
.header(CONTENT_TYPE, "application/json")
.body(
serde_json::to_string(&self)
.expect("unable to serialize serde_json::Value")
.into(),
)
.expect("unable to build http::Response")
fn into_response(self) -> ResponseFuture {
Box::pin(async move {
Response::builder()
.header(CONTENT_TYPE, "application/json")
.body(
serde_json::to_string(&self)
.expect("unable to serialize serde_json::Value")
.into(),
)
.expect("unable to build http::Response")
})
}
}

impl<S, B> IntoResponse for (S, B)
where
S: TryInto<StatusCode> + 'static,
S::Error: fmt::Debug,
B: Into<Body> + 'static,
{
fn into_response(self) -> ResponseFuture {
Box::pin(async move {
Response::builder()
.status(self.0.try_into().expect("unable to transform status code"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will panic if the conversion fails. I'm not sure if this convenience blanket impl is worth panicking. At the very least it should be documented.

.body(self.1.into())
.expect("unable to build http::Response")
})
}
}

pub type ResponseFuture = Pin<Box<dyn Future<Output = Response<Body>>>>;

pub trait IntoBody {
fn into_body(self) -> BodyFuture;
}

impl<B> IntoBody for B
where
B: HttpBody + Unpin + 'static,
B::Error: fmt::Debug,
{
fn into_body(self) -> BodyFuture {
if TypeId::of::<Body>() == self.type_id() {
let any_self = Box::new(self) as Box<dyn Any + 'static>;
// Can safely unwrap here as we do type validation in the 'if' statement
Box::pin(ready(*any_self.downcast::<Body>().unwrap()))
} else {
Box::pin(async move { Body::from(to_bytes(self).await.expect("unable to read bytes from body").to_vec()) })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will always use the impl From<Vec<u8>> for Body, which always creates the Body::Binary variant. Is this correct? The body might just contain UTF-8-encoded text.

See https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-payload-encodings-workflow.html, which makes my head hurt.

Copy link
Contributor

@bnusunny bnusunny Jun 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ALB integration use content-encoding and content-type headers to determine if the body should be base64 encoded. See it here: https://docs.aws.amazon.com/elasticloadbalancing/latest/application/lambda-functions.html#receive-event-from-load-balancer

Perhaps we could add utility methods to convert hyper::Response or reqwest::Reponse to http::Response<lambda_http::Body>. These methods can convert the Body to correct lambda_http::Body variant based on the same rules used by ALB.

This means we should probably work on Response not Body.

}
}
}

pub type BodyFuture = Pin<Box<dyn Future<Output = Body>>>;

#[cfg(test)]
mod tests {
use super::{Body, IntoResponse, LambdaResponse, RequestOrigin};
use http::{header::CONTENT_TYPE, Response};
use serde_json::{self, json};

#[test]
fn json_into_response() {
let response = json!({ "hello": "lambda"}).into_response();
#[tokio::test]
async fn json_into_response() {
let response = json!({ "hello": "lambda"}).into_response().await;
match response.body() {
Body::Text(json) => assert_eq!(json, r#"{"hello":"lambda"}"#),
_ => panic!("invalid body"),
Expand All @@ -165,9 +205,9 @@ mod tests {
)
}

#[test]
fn text_into_response() {
let response = "text".into_response();
#[tokio::test]
async fn text_into_response() {
let response = "text".into_response().await;
match response.body() {
Body::Text(text) => assert_eq!(text, "text"),
_ => panic!("invalid body"),
Expand Down
7 changes: 5 additions & 2 deletions lambda-integration-tests/src/bin/http-fn.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use lambda_http::{service_fn, Error, IntoResponse, Request, RequestExt, Response};
use lambda_http::{service_fn, Body, Error, IntoResponse, Request, RequestExt, Response};
use tracing::info;

async fn handler(event: Request) -> Result<impl IntoResponse, Error> {
let _context = event.lambda_context();
info!("[http-fn] Received event {} {}", event.method(), event.uri().path());

Ok(Response::builder().status(200).body("Hello, world!").unwrap())
Ok(Response::builder()
.status(200)
.body(Body::from("Hello, world!"))
.unwrap())
}

#[tokio::main]
Expand Down
6 changes: 3 additions & 3 deletions lambda-integration-tests/src/bin/http-trait.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use lambda_http::{Error, Request, RequestExt, Response, Service};
use lambda_http::{Body, Error, Request, RequestExt, Response, Service};
use std::{
future::{ready, Future},
pin::Pin,
Expand All @@ -13,7 +13,7 @@ struct MyHandler {
impl Service<Request> for MyHandler {
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Error>> + Send>>;
type Response = Response<&'static str>;
type Response = Response<Body>;

fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
core::task::Poll::Ready(Ok(()))
Expand All @@ -25,7 +25,7 @@ impl Service<Request> for MyHandler {
info!("[http-trait] Lambda context: {:?}", request.lambda_context());
Box::pin(ready(Ok(Response::builder()
.status(200)
.body("Hello, World!")
.body(Body::from("Hello, World!"))
.unwrap())))
}
}
Expand Down