diff --git a/README.md b/README.md index ae54457d..bd8a6d4a 100644 --- a/README.md +++ b/README.md @@ -190,7 +190,11 @@ pub trait Handler { `Handler` provides a default implementation that enables you to provide a Rust closure or function pointer to the `lambda!()` macro. -Optionally, you can pass your own instance of Tokio runtime to the `lambda!()` macro. See our [`with_custom_runtime.rs` example](https://github.com/awslabs/aws-lambda-rust-runtime/tree/master/lambda-runtime/examples/with_custom_runtime.rs) +Optionally, you can pass your own instance of Tokio runtime to the `lambda!()` macro: +``` +let rt = tokio::runtime::Runtime::new()?; +lambda!(my_handler, rt); +``` ## AWS event objects diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index cf7e3eff..5d6fb32e 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -122,7 +122,7 @@ pub trait Handler: Sized { /// The type of Response this Handler will return type Response: IntoResponse; /// The type of Future this Handler will return - type Fut: Future> + 'static; + type Fut: Future> + Send + Sync + 'static; /// Function used to execute handler behavior fn call(&self, event: Request, context: Context) -> Self::Fut; } @@ -137,7 +137,7 @@ impl Handler for F where F: Fn(Request, Context) -> Fut, R: IntoResponse, - Fut: Future> + Send + 'static, + Fut: Future> + Send + Sync + 'static, { type Response = R; type Error = Error; @@ -150,7 +150,7 @@ where #[doc(hidden)] pub struct TransformResponse { is_alb: bool, - fut: Pin>>>, + fut: Pin> + Send + Sync>>, } impl Future for TransformResponse diff --git a/lambda-http/src/request.rs b/lambda-http/src/request.rs index fd4e52d9..287a789a 100644 --- a/lambda-http/src/request.rs +++ b/lambda-http/src/request.rs @@ -102,42 +102,65 @@ impl LambdaRequest<'_> { } } +/// See [context-variable-reference](https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-mapping-template-reference.html) for more detail. #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct ApiGatewayV2RequestContext { + /// The API owner's AWS account ID. pub account_id: String, + /// The identifier API Gateway assigns to your API. pub api_id: String, + /// The stringified value of the specified key-value pair of the context map returned from an API Gateway Lambda authorizer function. #[serde(default)] pub authorizer: HashMap, + /// The full domain name used to invoke the API. This should be the same as the incoming Host header. pub domain_name: String, + /// The first label of the $context.domainName. This is often used as a caller/customer identifier. pub domain_prefix: String, + /// The HTTP method used. pub http: Http, + /// The ID that API Gateway assigns to the API request. pub request_id: String, + /// Undocumented, could be resourcePath pub route_key: String, + /// The deployment stage of the API request (for example, Beta or Prod). pub stage: String, + /// Undocumented, could be requestTime pub time: String, + /// Undocumented, could be requestTimeEpoch pub time_epoch: usize, } +/// See [context-variable-reference](https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-mapping-template-reference.html) for more detail. #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct ApiGatewayRequestContext { - //pub path: String, + /// The API owner's AWS account ID. pub account_id: String, + /// The identifier that API Gateway assigns to your resource. pub resource_id: String, + /// The deployment stage of the API request (for example, Beta or Prod). pub stage: String, + /// The ID that API Gateway assigns to the API request. pub request_id: String, + /// The path to your resource. For example, for the non-proxy request URI of `https://{rest-api-id.execute-api.{region}.amazonaws.com/{stage}/root/child`, The $context.resourcePath value is /root/child. pub resource_path: String, + /// The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT. pub http_method: String, + /// The stringified value of the specified key-value pair of the context map returned from an API Gateway Lambda authorizer function. #[serde(default)] pub authorizer: HashMap, + /// The identifier API Gateway assigns to your API. pub api_id: String, + /// Cofnito identity information pub identity: Identity, } +/// Elastic load balancer context information #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct AlbRequestContext { + /// Elastic load balancer context information pub elb: Elb, } @@ -167,10 +190,17 @@ pub struct Elb { #[serde(rename_all = "camelCase")] pub struct Http { #[serde(deserialize_with = "deserialize_method")] + /// The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT. pub method: http::Method, + /// The request path. For example, for a non-proxy request URL of + /// `https://{rest-api-id.execute-api.{region}.amazonaws.com/{stage}/root/child`, + /// the $context.path value is `/{stage}/root/child`. pub path: String, + /// The request protocol, for example, HTTP/1.1. pub protocol: String, + /// The source IP address of the TCP connection making the request to API Gateway. pub source_ip: String, + /// The User-Agent header of the API caller. pub user_agent: String, } @@ -178,17 +208,35 @@ pub struct Http { #[derive(Deserialize, Debug, Default, Clone)] #[serde(rename_all = "camelCase")] pub struct Identity { + /// The source IP address of the TCP connection making the request to API Gateway. pub source_ip: String, + /// The Amazon Cognito identity ID of the caller making the request. + /// Available only if the request was signed with Amazon Cognito credentials. pub cognito_identity_id: Option, + /// The Amazon Cognito identity pool ID of the caller making the request. + /// Available only if the request was signed with Amazon Cognito credentials. pub cognito_identity_pool_id: Option, + /// A comma-separated list of the Amazon Cognito authentication providers used by the caller making the request. + /// Available only if the request was signed with Amazon Cognito credentials. pub cognito_authentication_provider: Option, + /// The Amazon Cognito authentication type of the caller making the request. + /// Available only if the request was signed with Amazon Cognito credentials. pub cognito_authentication_type: Option, + /// The AWS account ID associated with the request. pub account_id: Option, + /// The principal identifier of the caller making the request. pub caller: Option, + /// For API methods that require an API key, this variable is the API key associated with the method request. + /// For methods that don't require an API key, this variable is null. pub api_key: Option, + /// Undocumented. Can be the API key ID associated with an API request that requires an API key. + /// The description of `api_key` and `access_key` may actually be reversed. pub access_key: Option, + /// The principal identifier of the user making the request. Used in Lambda authorizers. pub user: Option, + /// The User-Agent header of the API caller. pub user_agent: Option, + /// The Amazon Resource Name (ARN) of the effective user identified after authentication. pub user_arn: Option, } @@ -352,7 +400,7 @@ impl<'a> From> for http::Request { .expect("failed to build request"); // no builder method that sets headers in batch - mem::replace(req.headers_mut(), headers); + let _ = mem::replace(req.headers_mut(), headers); req } @@ -416,7 +464,7 @@ impl<'a> From> for http::Request { } // no builder method that sets headers in batch - mem::replace(req.headers_mut(), multi_value_headers); + let _ = mem::replace(req.headers_mut(), multi_value_headers); req } @@ -477,7 +525,7 @@ impl<'a> From> for http::Request { } // no builder method that sets headers in batch - mem::replace(req.headers_mut(), multi_value_headers); + let _ = mem::replace(req.headers_mut(), multi_value_headers); req } diff --git a/lambda/Cargo.toml b/lambda/Cargo.toml index 796efec8..f8322171 100644 --- a/lambda/Cargo.toml +++ b/lambda/Cargo.toml @@ -31,3 +31,5 @@ async-stream = "0.2" tracing-subscriber = "0.2" once_cell = "1.4.0" simple_logger = "1.6.0" +log = "0.4" +simple-error = "0.2" diff --git a/lambda/examples/README.md b/lambda/examples/README.md new file mode 100644 index 00000000..51a5a10b --- /dev/null +++ b/lambda/examples/README.md @@ -0,0 +1,254 @@ + +## How to compile and run the examples + +1. Create a Lambda function called _RuntimeTest_ in AWS with a custom runtime and no code. + +2. Compile all examples + +``` +cargo build --release --target x86_64-unknown-linux-musl --examples +``` +3. Prepare the package for the example you want to test, e.g. +``` +cp ./target/x86_64-unknown-linux-musl/release/examples/hello ./bootstrap && zip lambda.zip bootstrap && rm bootstrap +``` +4. Upload the package to AWS Lambda +``` +aws lambda update-function-code --region us-east-1 --function-name RuntimeTest --zip-file fileb://lambda.zip +``` +_Feel free to replace the names and IDs with your own values._ + +## basic.rs + +**Deployment**: +```bash +cp ./target/x86_64-unknown-linux-musl/release/examples/basic ./bootstrap && zip lambda.zip bootstrap && rm bootstrap +aws lambda update-function-code --region us-east-1 --function-name RuntimeTest --zip-file fileb://lambda.zip +``` + +**Test event JSON (success)**: +```json +{ "command": "do something" } +``` + +Sample response: +```json +{ + "msg": "Command do something executed.", + "req_id": "67a038e4-dc19-4adf-aa32-5ba09312c6ca" +} +``` + +**Test event JSON (error)**: +```json +{ "foo": "do something" } +``` + +Sample response: +```json +{ + "errorType": "Runtime.ExitError", + "errorMessage": "RequestId: 586366df-f271-4e6e-9c30-b3dcab30f4e8 Error: Runtime exited with error: exit status 1" +} +``` +The runtime could not deserialize our invalid input, but it did not give a detailed explanation why the error occurred in the response. More details appear in the CloudWatch log: +``` +START RequestId: 6e667f61-c5d4-4b07-a60f-cd1ab339c35f Version: $LATEST +Error: Error("missing field `command`", line: 1, column: 22) +END RequestId: 6e667f61-c5d4-4b07-a60f-cd1ab339c35f +REPORT RequestId: 6e667f61-c5d4-4b07-a60f-cd1ab339c35f Duration: 36.34 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 10 MB +RequestId: 6e667f61-c5d4-4b07-a60f-cd1ab339c35f Error: Runtime exited with error: exit status 1 +Runtime.ExitError +``` + + See _error-handling.rs_ example for more error handling options. + +## macro.rs + +The most basic example using `#[lambda]` macro to reduce the amount of boilerplate code. + +**Deployment**: +```bash +cp ./target/x86_64-unknown-linux-musl/release/examples/macro ./bootstrap && zip lambda.zip bootstrap && rm bootstrap +aws lambda update-function-code --region us-east-1 --function-name RuntimeTest --zip-file fileb://lambda.zip +``` + +**Test event JSON**: +```json +{ "foo": "bar" } +``` + +Sample response: +```json +{ + "foo": "bar" +} +``` + +## error-handling.rs + +Errors are logged by the runtime only if `log` is initialized by the handler. +These examples use `simple_logger`, but you can use any other provider that works with `log`. +``` +simple_logger::init_with_level(log::Level::Debug)?; +``` + +**Deployment**: +```bash +cp ./target/x86_64-unknown-linux-musl/release/examples/error-handling ./bootstrap && zip lambda.zip bootstrap && rm bootstrap +aws lambda update-function-code --region us-east-1 --function-name RuntimeTest --zip-file fileb://lambda.zip +``` + +The following input/output examples correspond to different `match` arms in the handler of `error-handling.rs`. + +#### Invalid event JSON + +Test input: +```json +{ + "event_type": "WrongType" +} +``` + +Lambda output: +``` +{ + "errorType": "alloc::boxed::Box", + "errorMessage": "unknown variant `WrongType`, expected one of `Response`, `ExternalError`, `SimpleError`, `CustomError`" +} +``` + +CloudWatch records: +``` +START RequestId: b98e07c6-e2ba-4ca6-9968-d0b94729ddba Version: $LATEST +2020-07-21 04:28:52,630 ERROR [lambda] unknown variant `WrongType`, expected one of `Response`, `ExternalError`, `SimpleError`, `CustomError` +END RequestId: b98e07c6-e2ba-4ca6-9968-d0b94729ddba +REPORT RequestId: b98e07c6-e2ba-4ca6-9968-d0b94729ddba Duration: 2.06 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 28 MB Init Duration: 33.67 ms +``` + +#### A simple text-only error + +Test event JSON: +```json +{ + "event_type": "SimpleError" +} +``` + +Lambda output: +``` +{ + "errorType": "alloc::boxed::Box", + "errorMessage": "A simple error as requested!" +} +``` + +CloudWatch records: +``` +START RequestId: 77c66dbf-bd60-4f77-8453-682d0bceba91 Version: $LATEST +2020-07-21 04:35:28,044 ERROR [lambda] A simple error as requested! +END RequestId: 77c66dbf-bd60-4f77-8453-682d0bceba91 +REPORT RequestId: 77c66dbf-bd60-4f77-8453-682d0bceba91 Duration: 0.98 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 28 MB +``` + +#### A custom error with JSON output for Display trait + +Test event JSON: +```json +{ + "event_type": "CustomError" +} +``` + +Lambda output: +``` +{ + "errorType": "alloc::boxed::Box", + "errorMessage": "{\"is_authenticated\":false,\"msg\":\"A custom error as requested!\",\"req_id\":\"b46b0588-1383-4224-bc7a-42b0d61930c1\"}" +} +``` + +CloudWatch records: +``` +START RequestId: b46b0588-1383-4224-bc7a-42b0d61930c1 Version: $LATEST +2020-07-21 04:39:00,133 ERROR [lambda] {"is_authenticated":false,"msg":"A custom error as requested!","req_id":"b46b0588-1383-4224-bc7a-42b0d61930c1"} +END RequestId: b46b0588-1383-4224-bc7a-42b0d61930c1 +REPORT RequestId: b46b0588-1383-4224-bc7a-42b0d61930c1 Duration: 0.91 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 29 MB +``` + +#### A 3rd party error from _std::fs::File::open_ + +Test event JSON: +```json +{ + "event_type": "ExternalError" +} +``` + +Lambda output: +``` +{ + "errorType": "alloc::boxed::Box", + "errorMessage": "No such file or directory (os error 2)" +} +``` + +CloudWatch records: +``` +START RequestId: 893d24e5-cb79-4f6f-bae0-36304c62e9da Version: $LATEST +2020-07-21 04:43:56,254 ERROR [lambda] No such file or directory (os error 2) +END RequestId: 893d24e5-cb79-4f6f-bae0-36304c62e9da +REPORT RequestId: 893d24e5-cb79-4f6f-bae0-36304c62e9da Duration: 1.15 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 29 MB +``` + +#### Handler panic + +Test event JSON: +```json +{ + "event_type": "Panic" +} +``` + +Lambda output: +``` +{ + "errorType": "Runtime.ExitError", + "errorMessage": "RequestId: 2d579019-07f7-409a-a6e6-af7725253307 Error: Runtime exited with error: exit status 101" +} +``` + +CloudWatch records: +``` +START RequestId: 2d579019-07f7-409a-a6e6-af7725253307 Version: $LATEST +thread 'main' panicked at 'explicit panic', lambda/examples/error-handling.rs:87:13 +note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace +END RequestId: 2d579019-07f7-409a-a6e6-af7725253307 +REPORT RequestId: 2d579019-07f7-409a-a6e6-af7725253307 Duration: 43.40 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 27 MB Init Duration: 23.15 ms +RequestId: 2d579019-07f7-409a-a6e6-af7725253307 Error: Runtime exited with error: exit status 101 +Runtime.ExitError +``` + +#### A response to a successful Lambda execution + +Test event JSON: +```json +{ + "event_type": "Response" +} +``` + +Lambda output: +``` +{ + "msg": "OK", + "req_id": "9752a3ad-6566-44e4-aafd-74db1fd4f361" +} +``` + +CloudWatch records: +``` +START RequestId: 9752a3ad-6566-44e4-aafd-74db1fd4f361 Version: $LATEST +END RequestId: 9752a3ad-6566-44e4-aafd-74db1fd4f361 +REPORT RequestId: 9752a3ad-6566-44e4-aafd-74db1fd4f361 Duration: 0.89 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 29 MB +``` \ No newline at end of file diff --git a/lambda/examples/basic.rs b/lambda/examples/basic.rs new file mode 100644 index 00000000..2d2ad02d --- /dev/null +++ b/lambda/examples/basic.rs @@ -0,0 +1,53 @@ +// This example requires the following input to succeed: +// { "command": "do something" } + +use lambda::{handler_fn, Context}; +use serde::{Deserialize, Serialize}; +use simple_logger; + +/// A shorthand for `Box` +/// type required by aws-lambda-rust-runtime. +pub type Error = Box; + +/// This is also a made-up example. Requests come into the runtime as unicode +/// strings in json format, which can map to any structure that implements `serde::Deserialize` +/// The runtime pays no attention to the contents of the request payload. +#[derive(Deserialize)] +struct Request { + command: String, +} + +/// This is a made-up example of what a response structure may look like. +/// There is no restriction on what it can be. The runtime requires responses +/// to be serialized into json. The runtime pays no attention +/// to the contents of the response payload. +#[derive(Serialize)] +struct Response { + req_id: String, + msg: String, +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // required to enable CloudWatch error logging by the runtime + // can be replaced with any other method of initializing `log` + simple_logger::init_with_level(log::Level::Info)?; + + let func = handler_fn(my_handler); + lambda::run(func).await?; + Ok(()) +} + +pub(crate) async fn my_handler(event: Request, ctx: Context) -> Result { + // extract some useful info from the request + let command = event.command; + + // prepare the response + let resp = Response { + req_id: ctx.request_id, + msg: format!("Command {} executed.", command), + }; + + // return `Response` (it will be serialized to JSON automatically by the runtime) + Ok(resp) +} diff --git a/lambda/examples/error-handling.rs b/lambda/examples/error-handling.rs new file mode 100644 index 00000000..a94579a0 --- /dev/null +++ b/lambda/examples/error-handling.rs @@ -0,0 +1,115 @@ +/// See https://github.com/awslabs/aws-lambda-rust-runtime for more info on Rust runtime for AWS Lambda +use lambda::handler_fn; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::fs::File; + +/// A shorthand for `Box` type required by aws-lambda-rust-runtime. +pub type Error = Box; + +/// A simple Lambda request structure with just one field +/// that tells the Lambda what is expected of it. +#[derive(Deserialize)] +struct Request { + event_type: EventType, +} + +/// Event types that tell our Lambda what to do do. +#[derive(Deserialize, PartialEq)] +enum EventType { + Response, + ExternalError, + SimpleError, + CustomError, + Panic, +} + +/// A simple Lambda response structure. +#[derive(Serialize)] +struct Response { + req_id: String, + msg: String, +} + +#[derive(Debug, Serialize)] +struct CustomError { + is_authenticated: bool, + req_id: String, + msg: String, +} + +impl std::error::Error for CustomError { + // this implementation required `Debug` and `Display` traits +} + +impl std::fmt::Display for CustomError { + /// Display the error struct as a JSON string + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let err_as_json = json!(self).to_string(); + write!(f, "{}", err_as_json) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // The runtime logging can be enabled here by initializing `log` with `simple_logger` + // or another compatible crate. The runtime is using `tracing` internally. + // You can comment out the `simple_logger` init line and uncomment the following block to + // use `tracing` in the handler function. + // + simple_logger::init_with_level(log::Level::Info)?; + /* + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // this needs to be set to false, otherwise ANSI color codes will + // show up in a confusing manner in CloudWatch logs. + .with_ansi(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + */ + + // call the actual handler of the request + let func = handler_fn(func); + lambda::run(func).await?; + Ok(()) +} + +/// The actual handler of the Lambda request. +pub(crate) async fn func(event: Value, ctx: lambda::Context) -> Result { + // check what action was requested + match serde_json::from_value::(event)?.event_type { + EventType::SimpleError => { + // generate a simple text message error using `simple_error` crate + return Err(Box::new(simple_error::SimpleError::new("A simple error as requested!"))); + } + EventType::CustomError => { + // generate a custom error using our own structure + let cust_err = CustomError { + is_authenticated: ctx.identity.is_some(), + req_id: ctx.request_id, + msg: "A custom error as requested!".into(), + }; + return Err(Box::new(cust_err)); + } + EventType::ExternalError => { + // try to open a non-existent file to get an error and propagate it with `?` + let _file = File::open("non-existent-file.txt")?; + + // it should never execute past the above line + unreachable!(); + } + EventType::Panic => { + panic!(); + } + EventType::Response => { + // generate and return an OK response in JSON format + let resp = Response { + req_id: ctx.request_id, + msg: "OK".into(), + }; + + return Ok(json!(resp)); + } + } +} diff --git a/lambda/examples/hello-without-macro-log.rs b/lambda/examples/hello-without-macro-log.rs deleted file mode 100644 index a2059b8c..00000000 --- a/lambda/examples/hello-without-macro-log.rs +++ /dev/null @@ -1,19 +0,0 @@ -use lambda::{handler_fn, Context}; -use serde_json::Value; -use tracing::info; - -type Error = Box; - -#[tokio::main] -async fn main() -> Result<(), Error> { - let func = handler_fn(func); - simple_logger::init().unwrap(); - - info!("Hello!"); - lambda::run(func).await?; - Ok(()) -} - -async fn func(event: Value, _: Context) -> Result { - Ok(event) -} diff --git a/lambda/examples/hello-without-macro-tracing.rs b/lambda/examples/hello-without-macro-tracing.rs deleted file mode 100644 index 8d46a853..00000000 --- a/lambda/examples/hello-without-macro-tracing.rs +++ /dev/null @@ -1,26 +0,0 @@ -use lambda::{handler_fn, Context}; -use serde_json::Value; -use tracing::info; - -type Error = Box; - -#[tokio::main] -async fn main() -> Result<(), Error> { - let func = handler_fn(func); - tracing_subscriber::fmt() - .with_max_level(tracing::Level::TRACE) - // this needs to be set to false, otherwise ANSI color codes will - // show up in a confusing manner in CloudWatch logs. - .with_ansi(false) - // disabling time is handy because CloudWatch will add the ingestion time. - .without_time() - .init(); - - info!("Hello!"); - lambda::run(func).await?; - Ok(()) -} - -async fn func(event: Value, _: Context) -> Result { - Ok(event) -} diff --git a/lambda/examples/hello.rs b/lambda/examples/macro.rs similarity index 61% rename from lambda/examples/hello.rs rename to lambda/examples/macro.rs index 99472874..fff7de45 100644 --- a/lambda/examples/hello.rs +++ b/lambda/examples/macro.rs @@ -3,6 +3,10 @@ use serde_json::Value; type Error = Box; +// #[lambda] attribute removes the need for boilerplate code +// required by `lambda::run(func).await?` as demonstrated in other +// examples. + #[lambda] #[tokio::main] async fn main(event: Value, _: Context) -> Result { diff --git a/lambda/src/client.rs b/lambda/src/client.rs index 88de0037..82b6e64c 100644 --- a/lambda/src/client.rs +++ b/lambda/src/client.rs @@ -63,10 +63,10 @@ mod endpoint_tests { use hyper::{server::conn::Http, service::service_fn, Body}; use serde_json::json; use std::convert::TryFrom; - use tokio::stream::StreamExt; use tokio::{ io::{AsyncRead, AsyncWrite}, select, + stream::StreamExt, sync::{self, oneshot}, }; diff --git a/lambda/src/lib.rs b/lambda/src/lib.rs index 0dadbf74..9025717f 100644 --- a/lambda/src/lib.rs +++ b/lambda/src/lib.rs @@ -55,7 +55,7 @@ use tokio::{ stream::{Stream, StreamExt}, }; use tower_service::Service; -use tracing::trace; +use tracing::{error, trace}; mod client; mod requests; @@ -130,8 +130,8 @@ pub struct HandlerFn { impl Handler for HandlerFn where F: Fn(A, Context) -> Fut, - Fut: Future> + Send + Sync, - Error: Into> + fmt::Debug, + Fut: Future> + Send, + Error: Into> + fmt::Display, { type Error = Error; type Fut = Fut; @@ -173,8 +173,8 @@ where ) -> Result<(), Error> where F: Handler + Send + Sync + 'static, - >::Fut: Future>::Error>> + Send + Sync + 'static, - >::Error: fmt::Debug + Send + Sync + 'static, + >::Fut: Future>::Error>> + Send + 'static, + >::Error: fmt::Display + Send + Sync + 'static, A: for<'de> Deserialize<'de> + Send + Sync + 'static, B: Serialize + Send + Sync + 'static, { @@ -182,11 +182,13 @@ where let handler = Arc::new(handler); tokio::pin!(incoming); while let Some(event) = incoming.next().await { + trace!("New event arrived (run loop)"); let event = event?; let (parts, body) = event.into_parts(); let ctx: Context = Context::try_from(parts.headers)?; let body = hyper::body::to_bytes(body).await?; + trace!("{}", std::str::from_utf8(&body)?); // this may be very verbose let body = serde_json::from_slice(&body)?; let handler = Arc::clone(&handler); @@ -195,28 +197,37 @@ where let req = match task.await { Ok(response) => match response.await { - Ok(response) => EventCompletionRequest { - request_id, - body: response, + Ok(response) => { + trace!("Ok response from handler (run loop)"); + EventCompletionRequest { + request_id, + body: response, + } + .into_req() + } + Err(err) => { + error!("{}", err); // logs the error in CloudWatch + EventErrorRequest { + request_id, + diagnostic: Diagnostic { + error_type: type_name_of_val(&err).to_owned(), + error_message: format!("{}", err), // returns the error to the caller via Lambda API + }, + } + .into_req() } - .into_req(), - Err(err) => EventErrorRequest { + }, + Err(err) if err.is_panic() => { + error!("{:?}", err); // inconsistent with other log record formats - to be reviewed + EventErrorRequest { request_id, diagnostic: Diagnostic { error_type: type_name_of_val(&err).to_owned(), - error_message: format!("{:?}", err), + error_message: format!("Lambda panicked: {}", err), }, } - .into_req(), - }, - Err(err) if err.is_panic() => EventErrorRequest { - request_id, - diagnostic: Diagnostic { - error_type: type_name_of_val(&err).to_owned(), - error_message: "Lambda panicked!".to_owned(), - }, + .into_req() } - .into_req(), Err(_) => unreachable!("tokio::task should not be canceled"), }; let req = req?; @@ -285,6 +296,7 @@ where { async_stream::stream! { loop { + trace!("Waiting for next event (incoming loop)"); let req = NextEventRequest.into_req().expect("Unable to construct request"); let res = client.call(req).await; yield res; @@ -316,8 +328,8 @@ where pub async fn run(handler: F) -> Result<(), Error> where F: Handler + Send + Sync + 'static, - >::Fut: Future>::Error>> + Send + Sync + 'static, - >::Error: fmt::Debug + Send + Sync + 'static, + >::Fut: Future>::Error>> + Send + 'static, + >::Error: fmt::Display + Send + Sync + 'static, A: for<'de> Deserialize<'de> + Send + Sync + 'static, B: Serialize + Send + Sync + 'static, { @@ -331,7 +343,7 @@ where .expect("Unable create runtime"); let client = &runtime.client; - let incoming = incoming(client).take(1); + let incoming = incoming(client); runtime.run(incoming, handler).await }