From abef8a411f4b758336a29eb9afc4aca8099d325d Mon Sep 17 00:00:00 2001 From: David Calavera Date: Tue, 30 Aug 2022 18:44:08 -0700 Subject: [PATCH] Report error when we cannot deserialize the payload. Instead of panic, capture the error, and report it to the Lambda api. This is more friendly to operate. Signed-off-by: David Calavera --- lambda-runtime/src/lib.rs | 89 ++++++++++++++++++---------------- lambda-runtime/src/requests.rs | 18 +++++-- lambda-runtime/src/types.rs | 37 +++++++------- 3 files changed, 80 insertions(+), 64 deletions(-) diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index b23c3cd2..4249bcee 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -7,10 +7,20 @@ //! Create a type that conforms to the [`tower::Service`] trait. This type can //! then be passed to the the `lambda_runtime::run` function, which launches //! and runs the Lambda runtime. -use hyper::client::{connect::Connection, HttpConnector}; +use hyper::{ + client::{connect::Connection, HttpConnector}, + http::Request, + Body, +}; use lambda_runtime_api_client::Client; use serde::{Deserialize, Serialize}; -use std::{convert::TryFrom, env, fmt, future::Future, panic}; +use std::{ + convert::TryFrom, + env, + fmt::{self, Debug, Display}, + future::Future, + panic, +}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::{Stream, StreamExt}; pub use tower::{self, service_fn, Service}; @@ -24,7 +34,6 @@ mod simulated; mod types; use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest}; -use types::Diagnostic; pub use types::{Context, LambdaEvent}; /// Error type that lambdas may result in @@ -121,12 +130,20 @@ where let ctx: Context = Context::try_from(parts.headers)?; let ctx: Context = ctx.with_config(config); - let body = serde_json::from_slice(&body)?; + let request_id = &ctx.request_id.clone(); let xray_trace_id = &ctx.xray_trace_id.clone(); env::set_var("_X_AMZN_TRACE_ID", xray_trace_id); - let request_id = &ctx.request_id.clone(); + let body = match serde_json::from_slice(&body) { + Ok(body) => body, + Err(err) => { + let req = build_event_error_request(request_id, err)?; + client.call(req).await.expect("Unable to send response to Runtime APIs"); + return Ok(()); + } + }; + let req = match handler.ready().await { Ok(handler) => { let task = @@ -141,48 +158,23 @@ where } .into_req() } - Err(err) => { - error!("{:?}", err); // logs the error in CloudWatch - EventErrorRequest { - request_id, - diagnostic: Diagnostic { - error_type: type_name_of_val(&err).to_owned(), - error_message: format!("{}", err), // returns the error to the caller via Lambda API - }, - } - .into_req() - } + Err(err) => build_event_error_request(request_id, err), }, Err(err) => { error!("{:?}", err); - EventErrorRequest { - request_id, - diagnostic: Diagnostic { - error_type: type_name_of_val(&err).to_owned(), - error_message: if let Some(msg) = err.downcast_ref::<&str>() { - format!("Lambda panicked: {}", msg) - } else { - "Lambda panicked".to_string() - }, - }, - } - .into_req() + let error_type = type_name_of_val(&err); + let msg = if let Some(msg) = err.downcast_ref::<&str>() { + format!("Lambda panicked: {}", msg) + } else { + "Lambda panicked".to_string() + }; + EventErrorRequest::new(request_id, error_type, &msg).into_req() } } } - Err(err) => { - error!("{:?}", err); // logs the error in CloudWatch - EventErrorRequest { - request_id, - diagnostic: Diagnostic { - error_type: type_name_of_val(&err).to_owned(), - error_message: format!("{}", err), // returns the error to the caller via Lambda API - }, - } - .into_req() - } - }; - let req = req?; + Err(err) => build_event_error_request(request_id, err), + }?; + client.call(req).await.expect("Unable to send response to Runtime APIs"); } Ok(()) @@ -247,6 +239,17 @@ fn type_name_of_val(_: T) -> &'static str { std::any::type_name::() } +fn build_event_error_request(request_id: &str, err: T) -> Result, Error> +where + T: Display + Debug, +{ + error!("{:?}", err); // logs the error in CloudWatch + let error_type = type_name_of_val(&err); + let msg = format!("{}", err); + + EventErrorRequest::new(request_id, error_type, &msg).into_req() +} + #[cfg(test)] mod endpoint_tests { use crate::{ @@ -431,8 +434,8 @@ mod endpoint_tests { let req = EventErrorRequest { request_id: "156cb537-e2d4-11e8-9b34-d36013741fb9", diagnostic: Diagnostic { - error_type: "InvalidEventDataError".to_string(), - error_message: "Error parsing event data".to_string(), + error_type: "InvalidEventDataError", + error_message: "Error parsing event data", }, }; let req = req.into_req()?; diff --git a/lambda-runtime/src/requests.rs b/lambda-runtime/src/requests.rs index 97fc7e4d..26257d20 100644 --- a/lambda-runtime/src/requests.rs +++ b/lambda-runtime/src/requests.rs @@ -104,7 +104,19 @@ fn test_event_completion_request() { // /runtime/invocation/{AwsRequestId}/error pub(crate) struct EventErrorRequest<'a> { pub(crate) request_id: &'a str, - pub(crate) diagnostic: Diagnostic, + pub(crate) diagnostic: Diagnostic<'a>, +} + +impl<'a> EventErrorRequest<'a> { + pub(crate) fn new(request_id: &'a str, error_type: &'a str, error_message: &'a str) -> EventErrorRequest<'a> { + EventErrorRequest { + request_id, + diagnostic: Diagnostic { + error_type, + error_message, + }, + } + } } impl<'a> IntoRequest for EventErrorRequest<'a> { @@ -128,8 +140,8 @@ fn test_event_error_request() { let req = EventErrorRequest { request_id: "id", diagnostic: Diagnostic { - error_type: "InvalidEventDataError".to_string(), - error_message: "Error parsing event data".to_string(), + error_type: "InvalidEventDataError", + error_message: "Error parsing event data", }, }; let req = req.into_req().unwrap(); diff --git a/lambda-runtime/src/types.rs b/lambda-runtime/src/types.rs index b9e36702..ddf5d00b 100644 --- a/lambda-runtime/src/types.rs +++ b/lambda-runtime/src/types.rs @@ -5,24 +5,9 @@ use std::{collections::HashMap, convert::TryFrom}; #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub(crate) struct Diagnostic { - pub(crate) error_type: String, - pub(crate) error_message: String, -} - -#[test] -fn round_trip_lambda_error() -> Result<(), Error> { - use serde_json::{json, Value}; - let expected = json!({ - "errorType": "InvalidEventDataError", - "errorMessage": "Error parsing event data.", - }); - - let actual: Diagnostic = serde_json::from_value(expected.clone())?; - let actual: Value = serde_json::to_value(actual)?; - assert_eq!(expected, actual); - - Ok(()) +pub(crate) struct Diagnostic<'a> { + pub(crate) error_type: &'a str, + pub(crate) error_message: &'a str, } /// The request ID, which identifies the request that triggered the function invocation. This header @@ -191,6 +176,22 @@ impl LambdaEvent { mod test { use super::*; + #[test] + fn round_trip_lambda_error() { + use serde_json::{json, Value}; + let expected = json!({ + "errorType": "InvalidEventDataError", + "errorMessage": "Error parsing event data.", + }); + + let actual = Diagnostic { + error_type: "InvalidEventDataError", + error_message: "Error parsing event data.", + }; + let actual: Value = serde_json::to_value(actual).expect("failed to serialize diagnostic"); + assert_eq!(expected, actual); + } + #[test] fn context_with_expected_values_and_types_resolves() { let mut headers = HeaderMap::new();