From c87d08ca100726b38a86c0b384217205be4e2186 Mon Sep 17 00:00:00 2001 From: "David A. Ramos" Date: Fri, 1 Dec 2023 01:05:50 -0800 Subject: [PATCH 1/4] Support internal Lambda extensions Internal Lambda extensions must be registered during the Lambda lifecycle Init phase, which ends when the runtime calls Next to await the first event. Since the `Extension::run` function both registers and executes the extension, there was previously no way for the runtime to determine that all internal extensions have been registered and that it's safe to proceed to the Invoke lifecycle phase. This change introduces an `Extension::register` method that registers the extension and begins any logs/telemetry handlers. It then returns a new `RegisteredExtension` abstraction that can be used to invoke the extension's run loop concurrently with the runtime's run loop. This change maintains backward compatibility by having the existing `Extension::run` method perform both steps. External Lambda extensions can use either API, and internal extensions should use the new API. Resolves #743. --- lambda-extension/src/extension.rs | 53 +++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/lambda-extension/src/extension.rs b/lambda-extension/src/extension.rs index 4747b041..d653e0dc 100644 --- a/lambda-extension/src/extension.rs +++ b/lambda-extension/src/extension.rs @@ -215,14 +215,21 @@ where } } - /// Execute the given extension - pub async fn run(self) -> Result<(), Error> { + /// Register the extension. + /// + /// Performs the + /// [init phase](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-ib) + /// Lambda lifecycle operations to register the extension. When implementing an internal Lambda + /// extension, it is safe to call `lambda_runtime::run` once the future returned by this + /// function resolves. 
+ pub async fn register(self) -> Result, Error> { let client = &Client::builder().build()?; let extension_id = register(client, self.extension_name, self.events).await?; let extension_id = extension_id.to_str()?; - let mut ep = self.events_processor; + // Logs API subscriptions must be requested during the Lambda init phase (see + // https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html#runtimes-logs-api-subscribing). if let Some(mut log_processor) = self.logs_processor { trace!("Log processor found"); @@ -262,6 +269,8 @@ where trace!("Registered extension with Logs API"); } + // Telemetry API subscriptions must be requested during the Lambda init phase (see + // https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html#telemetry-api-registration). if let Some(mut telemetry_processor) = self.telemetry_processor { trace!("Telemetry processor found"); @@ -301,6 +310,42 @@ where trace!("Registered extension with Telemetry API"); } + Ok(RegisteredExtension { + extension_id: extension_id.to_string(), + events_processor: self.events_processor, + }) + } + + /// Execute the given extension. + pub async fn run(self) -> Result<(), Error> { + self.register().await?.run().await + } +} + +/// An extension registered by calling [`Extension::register`]. +pub struct RegisteredExtension { + extension_id: String, + events_processor: E, +} + +impl RegisteredExtension +where + E: Service, + E::Future: Future>, + E::Error: Into> + fmt::Display + fmt::Debug, +{ + /// Execute the extension's run loop. + /// + /// Performs the + /// [invoke](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-invoke) + /// and, for external Lambda extensions registered to receive the `SHUTDOWN` event, the + /// [shutdown](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-shutdown) + /// Lambda lifecycle phases. 
+ pub async fn run(self) -> Result<(), Error> { + let client = &Client::builder().build()?; + let mut ep = self.events_processor; + let extension_id = &self.extension_id; + let incoming = async_stream::stream! { loop { trace!("Waiting for next event (incoming loop)"); @@ -351,6 +396,8 @@ where return Err(err.into()); } } + + // Unreachable. Ok(()) } } From 7a2d28a836c9ccc5fddc8af1f3d2daf789387eab Mon Sep 17 00:00:00 2001 From: "David A. Ramos" Date: Sat, 2 Dec 2023 16:00:04 -0800 Subject: [PATCH 2/4] Add example --- examples/extension-internal-flush/Cargo.toml | 12 ++ examples/extension-internal-flush/README.md | 30 +++++ examples/extension-internal-flush/src/main.rs | 112 ++++++++++++++++++ 3 files changed, 154 insertions(+) create mode 100644 examples/extension-internal-flush/Cargo.toml create mode 100644 examples/extension-internal-flush/README.md create mode 100644 examples/extension-internal-flush/src/main.rs diff --git a/examples/extension-internal-flush/Cargo.toml b/examples/extension-internal-flush/Cargo.toml new file mode 100644 index 00000000..daadd0eb --- /dev/null +++ b/examples/extension-internal-flush/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "extension-internal-flush" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1" +aws_lambda_events = { path = "../../lambda-events" } +lambda-extension = { path = "../../lambda-extension" } +lambda_runtime = { path = "../../lambda-runtime" } +serde = "1.0.136" +tokio = { version = "1", features = ["macros", "sync"] } diff --git a/examples/extension-internal-flush/README.md b/examples/extension-internal-flush/README.md new file mode 100644 index 00000000..553f7a3d --- /dev/null +++ b/examples/extension-internal-flush/README.md @@ -0,0 +1,30 @@ +# AWS Lambda runtime + internal extension example + +This example demonstrates how to build an AWS Lambda function that includes a +[Lambda internal extension](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html). 
+Unlike external extensions that run as separate processes, an internal extension runs within the +main runtime process. + +One use case for internal extensions is to flush logs or telemetry data after the Lambda runtime +handler has finished processing an event but before the execution environment is frozen awaiting the +arrival of the next event. Without an explicit flush, telemetry data may never be sent since the +execution environment will remain frozen and eventually be terminated if no additional events arrive. + +Note that for +[synchronous](https://docs.aws.amazon.com/lambda/latest/dg/invocation-sync.html) Lambda invocations +(e.g., via +[Amazon API Gateway](https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-integrations.html)), +the Lambda service returns the response to the caller as soon as the runtime handler completes, without waiting for extensions to finish. Extensions may therefore continue to run +without introducing an observable delay. + +## Build & Deploy + +1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) +2. Build the extension with `cargo lambda build --release` +3. Deploy the function to AWS Lambda with `cargo lambda deploy --iam-role YOUR_ROLE` + +The last command will print the ARN of the deployed function, which includes the internal extension. 
+ +## Build for ARM 64 + +Build the extension with `cargo lambda build --release --arm64` diff --git a/examples/extension-internal-flush/src/main.rs b/examples/extension-internal-flush/src/main.rs new file mode 100644 index 00000000..a64f2a6c --- /dev/null +++ b/examples/extension-internal-flush/src/main.rs @@ -0,0 +1,112 @@ +use anyhow::anyhow; +use aws_lambda_events::sqs::{SqsBatchResponse, SqsEventObj}; +use lambda_extension::{service_fn, Error, Extension, NextEvent}; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::Mutex; + +use std::sync::Arc; + +/// Implements an internal Lambda extension to flush logs/telemetry after each request. +struct FlushExtension { + request_done_receiver: Arc>>, +} + +impl FlushExtension { + pub fn new(request_done_receiver: UnboundedReceiver<()>) -> Self { + Self { + request_done_receiver: Arc::new(Mutex::new(request_done_receiver)), + } + } + + pub async fn invoke(&self, event: lambda_extension::LambdaEvent) -> Result<(), Error> { + match event.next { + // NB: Internal extensions only support the INVOKE event. + NextEvent::Shutdown(shutdown) => { + return Err(anyhow!("extension received unexpected SHUTDOWN event: {:?}", shutdown).into()); + } + NextEvent::Invoke(_e) => {} + } + + eprintln!("[extension] waiting for event to be processed"); + + // Wait for runtime to finish processing event. + self.request_done_receiver + .lock() + .await + .recv() + .await + .ok_or_else(|| anyhow!("channel is closed"))?; + + eprintln!("[extension] flushing logs and telemetry"); + + // + + Ok(()) + } +} + +/// Object that you send to SQS and plan to process with the function. +#[derive(Debug, Deserialize, Serialize)] +struct Data { + a: String, + b: i64, +} + +/// Implements the main event handler for processing events from an SQS queue. 
+struct EventHandler { + request_done_sender: Arc>>, +} + +impl EventHandler { + pub fn new(request_done_sender: UnboundedSender<()>) -> Self { + Self { + request_done_sender: Arc::new(Mutex::new(request_done_sender)), + } + } + + pub async fn invoke( + &self, + event: lambda_runtime::LambdaEvent>, + ) -> Result { + let data = &event.payload.records[0].body; + eprintln!("[runtime] received event {data:?}"); + + // + + // Notify the extension to flush traces. + self.request_done_sender.lock().await.send(()).map_err(Box::new)?; + + Ok(SqsBatchResponse::default()) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let (request_done_sender, request_done_receiver) = unbounded_channel::<()>(); + + let flush_extension = Arc::new(FlushExtension::new(request_done_receiver)); + let extension = Extension::new() + // Internal extensions only support INVOKE events. + .with_events(&["INVOKE"]) + .with_events_processor(service_fn(|event| { + let flush_extension = flush_extension.clone(); + async move { flush_extension.invoke(event).await } + })) + // Extensions MUST be registered before calling lambda_runtime::run(), which ends the Init + // phase and begins the Invoke phase. + .register() + .await?; + + let handler = Arc::new(EventHandler::new(request_done_sender)); + + tokio::try_join!( + lambda_runtime::run(service_fn(|event| { + let handler = handler.clone(); + async move { handler.invoke(event).await } + })), + extension.run(), + )?; + + Ok(()) +} From c0e6b36400a83d67a31bf454e506b665f6c3dda4 Mon Sep 17 00:00:00 2001 From: "David A. 
Ramos" Date: Sat, 2 Dec 2023 16:39:27 -0800 Subject: [PATCH 3/4] Set extension name in example --- examples/extension-internal-flush/src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/extension-internal-flush/src/main.rs b/examples/extension-internal-flush/src/main.rs index a64f2a6c..e095ddef 100644 --- a/examples/extension-internal-flush/src/main.rs +++ b/examples/extension-internal-flush/src/main.rs @@ -93,6 +93,8 @@ async fn main() -> Result<(), Error> { let flush_extension = flush_extension.clone(); async move { flush_extension.invoke(event).await } })) + // Internal extension names MUST be unique within a given Lambda function. + .with_extension_name("internal-flush") // Extensions MUST be registered before calling lambda_runtime::run(), which ends the Init // phase and begins the Invoke phase. .register() From df44db4ca6a8dc7cd6e1d0b033d8897cdb1e4abb Mon Sep 17 00:00:00 2001 From: "David A. Ramos" Date: Sat, 2 Dec 2023 16:43:16 -0800 Subject: [PATCH 4/4] Remove unnecessary Arc/Mutex --- examples/extension-internal-flush/src/main.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/examples/extension-internal-flush/src/main.rs b/examples/extension-internal-flush/src/main.rs index e095ddef..3706809d 100644 --- a/examples/extension-internal-flush/src/main.rs +++ b/examples/extension-internal-flush/src/main.rs @@ -9,13 +9,13 @@ use std::sync::Arc; /// Implements an internal Lambda extension to flush logs/telemetry after each request. struct FlushExtension { - request_done_receiver: Arc>>, + request_done_receiver: Mutex>, } impl FlushExtension { pub fn new(request_done_receiver: UnboundedReceiver<()>) -> Self { Self { - request_done_receiver: Arc::new(Mutex::new(request_done_receiver)), + request_done_receiver: Mutex::new(request_done_receiver), } } @@ -55,14 +55,12 @@ struct Data { /// Implements the main event handler for processing events from an SQS queue. 
struct EventHandler { - request_done_sender: Arc>>, + request_done_sender: UnboundedSender<()>, } impl EventHandler { pub fn new(request_done_sender: UnboundedSender<()>) -> Self { - Self { - request_done_sender: Arc::new(Mutex::new(request_done_sender)), - } + Self { request_done_sender } } pub async fn invoke( @@ -75,7 +73,7 @@ impl EventHandler { // // Notify the extension to flush traces. - self.request_done_sender.lock().await.send(()).map_err(Box::new)?; + self.request_done_sender.send(()).map_err(Box::new)?; Ok(SqsBatchResponse::default()) }