diff --git a/lambda-extension/examples/custom_events.rs b/lambda-extension/examples/custom_events.rs index d2756c23..f796ca31 100644 --- a/lambda-extension/examples/custom_events.rs +++ b/lambda-extension/examples/custom_events.rs @@ -1,4 +1,4 @@ -use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent, Runtime}; +use lambda_extension::{service_fn, Error, Extension, LambdaEvent, NextEvent}; async fn my_extension(event: LambdaEvent) -> Result<(), Error> { match event.next { @@ -27,9 +27,9 @@ async fn main() -> Result<(), Error> { .without_time() .init(); - let func = service_fn(my_extension); - - let runtime = Runtime::builder().with_events(&["SHUTDOWN"]).register().await?; - - runtime.run(func).await + Extension::new() + .with_events(&["SHUTDOWN"]) + .with_events_processor(service_fn(my_extension)) + .run() + .await } diff --git a/lambda-extension/src/error.rs b/lambda-extension/src/error.rs new file mode 100644 index 00000000..2c3e23b3 --- /dev/null +++ b/lambda-extension/src/error.rs @@ -0,0 +1,23 @@ +/// Error type that extensions may result in +pub type Error = lambda_runtime_api_client::Error; + +/// Simple error that encapsulates human readable descriptions +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ExtensionError { + err: String, +} + +impl ExtensionError { + pub(crate) fn boxed>(str: T) -> Box { + Box::new(ExtensionError { err: str.into() }) + } +} + +impl std::fmt::Display for ExtensionError { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.err.fmt(f) + } +} + +impl std::error::Error for ExtensionError {} diff --git a/lambda-extension/src/events.rs b/lambda-extension/src/events.rs new file mode 100644 index 00000000..87fd62a4 --- /dev/null +++ b/lambda-extension/src/events.rs @@ -0,0 +1,71 @@ +use serde::Deserialize; + +/// Request tracing information +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Tracing { + /// The type of tracing exposed to the extension + pub r#type: String, + /// The span value + pub value: String, +} +/// Event received when there is a new Lambda invocation. +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct InvokeEvent { + /// The time that the function times out + pub deadline_ms: u64, + /// The ID assigned to the Lambda request + pub request_id: String, + /// The function's Amazon Resource Name + pub invoked_function_arn: String, + /// The request tracing information + pub tracing: Tracing, +} + +/// Event received when a Lambda function shuts down. +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ShutdownEvent { + /// The reason why the function terminates + /// It can be SPINDOWN, TIMEOUT, or FAILURE + pub shutdown_reason: String, + /// The time that the function times out + pub deadline_ms: u64, +} + +/// Event that the extension receives in +/// either the INVOKE or SHUTDOWN phase +#[derive(Debug, Deserialize)] +#[serde(rename_all = "UPPERCASE", tag = "eventType")] +pub enum NextEvent { + /// Payload when the event happens in the INVOKE phase + Invoke(InvokeEvent), + /// Payload when the event happens in the SHUTDOWN phase + Shutdown(ShutdownEvent), +} + +impl NextEvent { + /// Return whether the event is a [`NextEvent::Invoke`] event or not + pub fn is_invoke(&self) -> bool { + matches!(self, NextEvent::Invoke(_)) + } +} + +/// Wrapper with information about the next +/// event that the Lambda Runtime is going to process +pub struct LambdaEvent { + /// ID assigned to this extension by the Lambda Runtime + pub extension_id: String, + /// Next incoming event + pub next: NextEvent, +} + +impl LambdaEvent { + pub(crate) fn new(ex_id: &str, next: NextEvent) -> LambdaEvent { + LambdaEvent { + extension_id: ex_id.into(), + next, + } + } +} diff --git a/lambda-extension/src/extension.rs b/lambda-extension/src/extension.rs new file mode 100644 index 00000000..40aa340a --- /dev/null +++ b/lambda-extension/src/extension.rs @@ -0,0 +1,234 @@ +use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent}; +use lambda_runtime_api_client::Client; +use std::{fmt, future::ready, future::Future, path::PathBuf, pin::Pin}; +use tokio_stream::StreamExt; +use tower::Service; +use tracing::trace; + +/// An Extension that runs event and log processors +pub struct Extension<'a, E, L> { + extension_name: Option<&'a str>, + events: Option<&'a [&'a str]>, + events_processor: E, + log_types: Option<&'a [&'a str]>, + logs_processor: Option, + log_buffering: Option, +} + +impl<'a> Extension<'a, Identity, Identity> { + /// Create a new base [`Extension`] with a no-op events processor + pub fn new() -> Self { + Extension { + extension_name: None, + events: None, + events_processor: Identity::new(), + log_types: None, + log_buffering: None, + logs_processor: None, + } + } +} + +impl<'a> Default for Extension<'a, Identity, Identity> { + fn default() -> Self { + Self::new() + } +} + +impl<'a, E, L> Extension<'a, E, L> +where + E: Service, + E::Future: Future>, + E::Error: Into> + fmt::Display, + + L: Service, + L::Future: Future>, + L::Error: Into> + fmt::Display, +{ + /// Create a new [`Extension`] with a given extension name + pub fn with_extension_name(self, extension_name: &'a str) -> Self { + Extension { + extension_name: Some(extension_name), + ..self + } + } + + /// Create a new [`Extension`] with a list of given events. + /// The only accepted events are `INVOKE` and `SHUTDOWN`. + pub fn with_events(self, events: &'a [&'a str]) -> Self { + Extension { + events: Some(events), + ..self + } + } + + /// Create a new [`Extension`] with a service that receives Lambda events. + pub fn with_events_processor(self, ep: N) -> Extension<'a, N, L> + where + N: Service, + N::Future: Future>, + N::Error: Into> + fmt::Display, + { + Extension { + events_processor: ep, + extension_name: self.extension_name, + events: self.events, + log_types: self.log_types, + log_buffering: self.log_buffering, + logs_processor: self.logs_processor, + } + } + + /// Create a new [`Extension`] with a service that receives Lambda logs. + pub fn with_logs_processor(self, lp: N) -> Extension<'a, E, N> + where + N: Service, + N::Future: Future>, + N::Error: Into> + fmt::Display, + { + Extension { + logs_processor: Some(lp), + events_processor: self.events_processor, + extension_name: self.extension_name, + events: self.events, + log_types: self.log_types, + log_buffering: self.log_buffering, + } + } + + /// Create a new [`Extension`] with a list of logs types to subscribe. + /// The only accepted log types are `function`, `platform`, and `extension`. + pub fn with_log_types(self, log_types: &'a [&'a str]) -> Self { + Extension { + log_types: Some(log_types), + ..self + } + } + + /// Create a new [`Extension`] with specific configuration to buffer logs. + pub fn with_log_buffering(self, lb: LogBuffering) -> Self { + Extension { + log_buffering: Some(lb), + ..self + } + } + + /// Execute the given extension + pub async fn run(self) -> Result<(), Error> { + let client = &Client::builder().build()?; + + let extension_id = register(client, self.extension_name, self.events).await?; + let extension_id = extension_id.to_str()?; + let mut ep = self.events_processor; + + if let Some(mut _lp) = self.logs_processor { + // fixme(david): + // - Spawn task to run processor + + // - Call Logs API to start receiving events + let req = requests::subscribe_logs_request(extension_id, self.log_types, self.log_buffering)?; + let res = client.call(req).await?; + if res.status() != http::StatusCode::OK { + return Err(ExtensionError::boxed("unable to initialize the logs api")); + } + } + + let incoming = async_stream::stream! { + loop { + trace!("Waiting for next event (incoming loop)"); + let req = requests::next_event_request(extension_id)?; + let res = client.call(req).await; + yield res; + } + }; + + tokio::pin!(incoming); + while let Some(event) = incoming.next().await { + trace!("New event arrived (run loop)"); + let event = event?; + let (_parts, body) = event.into_parts(); + + let body = hyper::body::to_bytes(body).await?; + trace!("{}", std::str::from_utf8(&body)?); // this may be very verbose + let event: NextEvent = serde_json::from_slice(&body)?; + let is_invoke = event.is_invoke(); + + let event = LambdaEvent::new(extension_id, event); + + let res = ep.call(event).await; + if let Err(error) = res { + let req = if is_invoke { + requests::init_error(extension_id, &error.to_string(), None)? + } else { + requests::exit_error(extension_id, &error.to_string(), None)? + }; + + client.call(req).await?; + return Err(error.into()); + } + } + Ok(()) + } +} + +/// A no-op generic processor +pub struct Identity { + _pd: std::marker::PhantomData, +} + +impl Identity { + fn new() -> Identity { + Identity { + _pd: std::marker::PhantomData, + } + } +} + +impl Service for Identity { + type Error = Error; + type Future = Pin>>>; + type Response = (); + + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, _event: T) -> Self::Future { + Box::pin(ready(Ok(()))) + } +} + +/// Initialize and register the extension in the Extensions API +async fn register<'a>( + client: &'a Client, + extension_name: Option<&'a str>, + events: Option<&'a [&'a str]>, +) -> Result { + let name = match extension_name { + Some(name) => name.into(), + None => { + let args: Vec = std::env::args().collect(); + PathBuf::from(args[0].clone()) + .file_name() + .expect("unexpected executable name") + .to_str() + .expect("unexpect executable name") + .to_string() + } + }; + + let events = events.unwrap_or(&["INVOKE", "SHUTDOWN"]); + + let req = requests::register_request(&name, events)?; + let res = client.call(req).await?; + if res.status() != http::StatusCode::OK { + return Err(ExtensionError::boxed("unable to register the extension")); + } + + let header = res + .headers() + .get(requests::EXTENSION_ID_HEADER) + .ok_or_else(|| ExtensionError::boxed("missing extension id header")) + .map_err(|e| ExtensionError::boxed(e.to_string()))?; + Ok(header.clone()) +} diff --git a/lambda-extension/src/lib.rs b/lambda-extension/src/lib.rs index 130aae50..18224a08 100644 --- a/lambda-extension/src/lib.rs +++ b/lambda-extension/src/lib.rs @@ -6,239 +6,28 @@ //! //! Create a type that conforms to the [`Extension`] trait. This type can then be passed //! to the the `lambda_extension::run` function, which launches and runs the Lambda runtime extension. -use hyper::client::{connect::Connection, HttpConnector}; -use lambda_runtime_api_client::Client; -use serde::Deserialize; -use std::{fmt, future::Future, path::PathBuf}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_stream::StreamExt; +use std::{fmt, future::Future}; pub use tower::{self, service_fn, Service}; -use tracing::trace; + +mod error; +pub use error::*; +mod extension; +pub use extension::*; +mod events; +pub use events::*; +mod logs; +pub use logs::*; /// Include several request builders to interact with the Extension API. pub mod requests; -/// Error type that extensions may result in -pub type Error = lambda_runtime_api_client::Error; - -/// Simple error that encapsulates human readable descriptions -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct ExtensionError { - err: String, -} - -impl ExtensionError { - fn boxed>(str: T) -> Box { - Box::new(ExtensionError { err: str.into() }) - } -} - -impl std::fmt::Display for ExtensionError { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.err.fmt(f) - } -} - -impl std::error::Error for ExtensionError {} - -/// Request tracing information -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Tracing { - /// The type of tracing exposed to the extension - pub r#type: String, - /// The span value - pub value: String, -} - -/// Event received when there is a new Lambda invocation. -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct InvokeEvent { - /// The time that the function times out - pub deadline_ms: u64, - /// The ID assigned to the Lambda request - pub request_id: String, - /// The function's Amazon Resource Name - pub invoked_function_arn: String, - /// The request tracing information - pub tracing: Tracing, -} - -/// Event received when a Lambda function shuts down. -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ShutdownEvent { - /// The reason why the function terminates - /// It can be SPINDOWN, TIMEOUT, or FAILURE - pub shutdown_reason: String, - /// The time that the function times out - pub deadline_ms: u64, -} - -/// Event that the extension receives in -/// either the INVOKE or SHUTDOWN phase -#[derive(Debug, Deserialize)] -#[serde(rename_all = "UPPERCASE", tag = "eventType")] -pub enum NextEvent { - /// Payload when the event happens in the INVOKE phase - Invoke(InvokeEvent), - /// Payload when the event happens in the SHUTDOWN phase - Shutdown(ShutdownEvent), -} - -impl NextEvent { - fn is_invoke(&self) -> bool { - matches!(self, NextEvent::Invoke(_)) - } -} - -/// Wrapper with information about the next -/// event that the Lambda Runtime is going to process -pub struct LambdaEvent { - /// ID assigned to this extension by the Lambda Runtime - pub extension_id: String, - /// Next incoming event - pub next: NextEvent, -} - -/// The Runtime handles all the incoming extension requests -pub struct Runtime = HttpConnector> { - extension_id: String, - client: Client, -} - -impl Runtime { - /// Create a [`RuntimeBuilder`] to initialize the [`Runtime`] - pub fn builder<'a>() -> RuntimeBuilder<'a> { - RuntimeBuilder::default() - } -} - -impl Runtime -where - C: Service + Clone + Send + Sync + Unpin + 'static, - C::Future: Unpin + Send, - C::Error: Into>, - C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, -{ - /// Execute the given extension. - /// Register the extension with the Extensions API and wait for incoming events. - pub async fn run(&self, mut extension: E) -> Result<(), Error> - where - E: Service, - E::Future: Future>, - E::Error: Into> + fmt::Display, - { - let client = &self.client; - - let incoming = async_stream::stream! { - loop { - trace!("Waiting for next event (incoming loop)"); - let req = requests::next_event_request(&self.extension_id)?; - let res = client.call(req).await; - yield res; - } - }; - - tokio::pin!(incoming); - while let Some(event) = incoming.next().await { - trace!("New event arrived (run loop)"); - let event = event?; - let (_parts, body) = event.into_parts(); - - let body = hyper::body::to_bytes(body).await?; - trace!("{}", std::str::from_utf8(&body)?); // this may be very verbose - let event: NextEvent = serde_json::from_slice(&body)?; - let is_invoke = event.is_invoke(); - - let event = LambdaEvent { - extension_id: self.extension_id.clone(), - next: event, - }; - - let res = extension.call(event).await; - if let Err(error) = res { - let req = if is_invoke { - requests::init_error(&self.extension_id, &error.to_string(), None)? - } else { - requests::exit_error(&self.extension_id, &error.to_string(), None)? - }; - - self.client.call(req).await?; - return Err(error.into()); - } - } - - Ok(()) - } -} - -/// Builder to construct a new extension [`Runtime`] -#[derive(Default)] -pub struct RuntimeBuilder<'a> { - extension_name: Option<&'a str>, - events: Option<&'a [&'a str]>, -} - -impl<'a> RuntimeBuilder<'a> { - /// Create a new [`RuntimeBuilder`] with a given extension name - pub fn with_extension_name(self, extension_name: &'a str) -> Self { - RuntimeBuilder { - extension_name: Some(extension_name), - ..self - } - } - - /// Create a new [`RuntimeBuilder`] with a list of given events. - /// The only accepted events are `INVOKE` and `SHUTDOWN`. - pub fn with_events(self, events: &'a [&'a str]) -> Self { - RuntimeBuilder { - events: Some(events), - ..self - } - } - - /// Initialize and register the extension in the Extensions API - pub async fn register(&self) -> Result { - let name = match self.extension_name { - Some(name) => name.into(), - None => { - let args: Vec = std::env::args().collect(); - PathBuf::from(args[0].clone()) - .file_name() - .expect("unexpected executable name") - .to_str() - .expect("unexpect executable name") - .to_string() - } - }; - - let events = self.events.unwrap_or(&["INVOKE", "SHUTDOWN"]); - - let client = Client::builder().build()?; - - let req = requests::register_request(&name, events)?; - let res = client.call(req).await?; - if res.status() != http::StatusCode::OK { - return Err(ExtensionError::boxed("unable to register the extension")); - } - - let extension_id = res.headers().get(requests::EXTENSION_ID_HEADER).unwrap().to_str()?; - Ok(Runtime { - extension_id: extension_id.into(), - client, - }) - } -} - -/// Execute the given extension -pub async fn run(extension: E) -> Result<(), Error> +/// Execute the given events processor +pub async fn run(events_processor: E) -> Result<(), Error> where E: Service, E::Future: Future>, E::Error: Into> + fmt::Display, { - Runtime::builder().register().await?.run(extension).await + let ext = Extension::new().with_events_processor(events_processor); + ext.run().await } diff --git a/lambda-extension/src/logs.rs b/lambda-extension/src/logs.rs new file mode 100644 index 00000000..7355a7dd --- /dev/null +++ b/lambda-extension/src/logs.rs @@ -0,0 +1,40 @@ +use serde::{Deserialize, Serialize}; + +/// Payload received from the Lambda Logs API +/// See: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html#runtimes-logs-api-msg +#[derive(Debug, Deserialize)] +pub struct LambdaLog { + /// Time when the log was generated + pub time: String, + /// Log type, either function, extension, or platform types + pub r#type: String, + // Fixme(david): the record can be a struct with more information, implement custom deserializer + /// Log data + pub record: String, +} + +/// Log buffering configuration. +/// Allows Lambda to buffer logs before deliverying them to a subscriber. +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LogBuffering { + /// The maximum time (in milliseconds) to buffer a batch. + /// Default: 1,000. Minimum: 25. Maximum: 30,000 + pub timeout_ms: usize, + /// The maximum size (in bytes) of the logs to buffer in memory. + /// Default: 262,144. Minimum: 262,144. Maximum: 1,048,576 + pub max_bytes: usize, + /// The maximum number of events to buffer in memory. + /// Default: 10,000. Minimum: 1,000. Maximum: 10,000 + pub max_items: usize, +} + +impl Default for LogBuffering { + fn default() -> Self { + LogBuffering { + timeout_ms: 1_000, + max_bytes: 262_144, + max_items: 10_000, + } + } +} diff --git a/lambda-extension/src/requests.rs b/lambda-extension/src/requests.rs index 2fdbf2a6..4ca3157c 100644 --- a/lambda-extension/src/requests.rs +++ b/lambda-extension/src/requests.rs @@ -1,4 +1,4 @@ -use crate::Error; +use crate::{Error, LogBuffering}; use http::{Method, Request}; use hyper::Body; use lambda_runtime_api_client::build_request; @@ -29,6 +29,32 @@ pub(crate) fn register_request(extension_name: &str, events: &[&str]) -> Result< Ok(req) } +pub(crate) fn subscribe_logs_request( + extension_id: &str, + types: Option<&[&str]>, + buffering: Option, +) -> Result, Error> { + let types = types.unwrap_or(&["platform", "function"]); + + let data = serde_json::json!({ + "schemaVersion": "2021-03-18", + "types": types, + "buffering": buffering.unwrap_or_default(), + "destination": { + "protocol": "HTTP", + "URI": "http://sandbox.localdomain:8080" + } + }); + + let req = build_request() + .method(Method::PUT) + .uri("/2020-08-15/logs") + .header(EXTENSION_ID_HEADER, extension_id) + .body(Body::from(serde_json::to_string(&data)?))?; + + Ok(req) +} + /// Payload to send error information to the Extensions API. #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")]