diff --git a/lambda-extension/src/extension.rs b/lambda-extension/src/extension.rs index 81462c24..ec83ce71 100644 --- a/lambda-extension/src/extension.rs +++ b/lambda-extension/src/extension.rs @@ -1,14 +1,16 @@ -use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent}; -use hyper::{server::conn::AddrStream, Server}; -use lambda_runtime_api_client::Client; use std::{ convert::Infallible, fmt, future::ready, future::Future, net::SocketAddr, path::PathBuf, pin::Pin, sync::Arc, }; + +use hyper::{server::conn::AddrStream, Server}; +use lambda_runtime_api_client::Client; use tokio::sync::Mutex; use tokio_stream::StreamExt; -use tower::{service_fn, MakeService, Service}; +use tower::{service_fn, MakeService, Service, ServiceExt}; use tracing::{error, trace}; +use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent}; + const DEFAULT_LOG_PORT_NUMBER: u16 = 9002; /// An Extension that runs event and log processors @@ -199,6 +201,21 @@ where let event = LambdaEvent::new(extension_id, event); + let ep = match ep.ready().await { + Ok(ep) => ep, + Err(error) => { + println!("Inner service is not ready: {:?}", error); + let req = if is_invoke { + requests::init_error(extension_id, &error.to_string(), None)? + } else { + requests::exit_error(extension_id, &error.to_string(), None)? + }; + + client.call(req).await?; + return Err(error.into()); + } + }; + let res = ep.call(event).await; if let Err(error) = res { println!("{:?}", error); diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index 3581002f..d8d1e942 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -155,8 +155,8 @@ where type Error = E; type Future = TransformResponse<'a, R, Self::Error>; - fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { - core::task::Poll::Ready(Ok(())) + fn poll_ready(&mut self, cx: &mut core::task::Context<'_>) -> core::task::Poll> { + self.service.poll_ready(cx) } fn call(&mut self, req: LambdaEvent) -> Self::Future { diff --git a/lambda-integration-tests/src/bin/extension-trait.rs b/lambda-integration-tests/src/bin/extension-trait.rs index 1dc73c75..ecf46c81 100644 --- a/lambda-integration-tests/src/bin/extension-trait.rs +++ b/lambda-integration-tests/src/bin/extension-trait.rs @@ -1,13 +1,36 @@ -use lambda_extension::{Error, LambdaEvent, NextEvent, Service}; use std::{ future::{ready, Future}, pin::Pin, + sync::atomic::{AtomicBool, Ordering}, }; + +use lambda_extension::{Error, LambdaEvent, NextEvent, Service}; use tracing::info; -#[derive(Default)] struct MyExtension { invoke_count: usize, + ready: AtomicBool, +} + +impl Default for MyExtension { + fn default() -> Self { + Self { + invoke_count: usize::default(), + // New instances are not ready to be called until polled. + ready: false.into(), + } + } +} + +impl Clone for MyExtension { + fn clone(&self) -> Self { + Self { + invoke_count: self.invoke_count, + // Cloned instances may not be immediately ready to be called. + // https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services + ready: false.into(), + } + } } impl Service for MyExtension { @@ -16,6 +39,12 @@ impl Service for MyExtension { type Response = (); fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + if self.ready.swap(true, Ordering::SeqCst) { + info!("[extension] Service was already ready"); + } else { + info!("[extension] Service is now ready"); + }; + core::task::Poll::Ready(Ok(())) } @@ -30,6 +59,17 @@ impl Service for MyExtension { } } + // After being called once, the service is no longer ready until polled again. + if self.ready.swap(false, Ordering::SeqCst) { + info!("[extension] The service is ready"); + } else { + // https://docs.rs/tower/latest/tower/trait.Service.html#backpressure + // https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services + // > Services are permitted to panic if `call` is invoked without obtaining + // > `Poll::Ready(Ok(()))` from `poll_ready`. + panic!("[extension] The service is not ready; `.poll_ready()` must be called first"); + } + Box::pin(ready(Ok(()))) } } diff --git a/lambda-integration-tests/src/bin/http-trait.rs b/lambda-integration-tests/src/bin/http-trait.rs index 67cc9fc5..fffe0db0 100644 --- a/lambda-integration-tests/src/bin/http-trait.rs +++ b/lambda-integration-tests/src/bin/http-trait.rs @@ -1,13 +1,36 @@ -use lambda_http::{Body, Error, Request, RequestExt, Response, Service}; use std::{ future::{ready, Future}, pin::Pin, + sync::atomic::{AtomicBool, Ordering}, }; + +use lambda_http::{Body, Error, Request, RequestExt, Response, Service}; use tracing::info; -#[derive(Default)] struct MyHandler { invoke_count: usize, + ready: AtomicBool, +} + +impl Default for MyHandler { + fn default() -> Self { + Self { + invoke_count: usize::default(), + // New instances are not ready to be called until polled. + ready: false.into(), + } + } +} + +impl Clone for MyHandler { + fn clone(&self) -> Self { + Self { + invoke_count: self.invoke_count, + // Cloned instances may not be immediately ready to be called. + // https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services + ready: false.into(), + } + } } impl Service for MyHandler { @@ -16,6 +39,12 @@ impl Service for MyHandler { type Response = Response; fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + if self.ready.swap(true, Ordering::SeqCst) { + info!("[http-trait] Service was already ready"); + } else { + info!("[http-trait] Service is now ready"); + }; + core::task::Poll::Ready(Ok(())) } @@ -23,6 +52,18 @@ impl Service for MyHandler { self.invoke_count += 1; info!("[http-trait] Received event {}: {:?}", self.invoke_count, request); info!("[http-trait] Lambda context: {:?}", request.lambda_context()); + + // After being called once, the service is no longer ready until polled again. + if self.ready.swap(false, Ordering::SeqCst) { + info!("[http-trait] The service is ready"); + } else { + // https://docs.rs/tower/latest/tower/trait.Service.html#backpressure + // https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services + // > Services are permitted to panic if `call` is invoked without obtaining + // > `Poll::Ready(Ok(()))` from `poll_ready`. + panic!("[http-trait] The service is not ready; `.poll_ready()` must be called first"); + } + Box::pin(ready(Ok(Response::builder() .status(200) .body(Body::from("Hello, World!")) diff --git a/lambda-integration-tests/src/bin/runtime-trait.rs b/lambda-integration-tests/src/bin/runtime-trait.rs index d86bafdc..b925e138 100644 --- a/lambda-integration-tests/src/bin/runtime-trait.rs +++ b/lambda-integration-tests/src/bin/runtime-trait.rs @@ -1,9 +1,11 @@ -use lambda_runtime::{Error, LambdaEvent, Service}; -use serde::{Deserialize, Serialize}; use std::{ future::{ready, Future}, pin::Pin, + sync::atomic::{AtomicBool, Ordering}, }; + +use lambda_runtime::{Error, LambdaEvent, Service}; +use serde::{Deserialize, Serialize}; use tracing::info; #[derive(Deserialize, Debug)] @@ -16,9 +18,30 @@ struct Response { message: String, } -#[derive(Default)] struct MyHandler { invoke_count: usize, + ready: AtomicBool, +} + +impl Default for MyHandler { + fn default() -> Self { + Self { + invoke_count: usize::default(), + // New instances are not ready to be called until polled. + ready: false.into(), + } + } +} + +impl Clone for MyHandler { + fn clone(&self) -> Self { + Self { + invoke_count: self.invoke_count, + // Cloned instances may not be immediately ready to be called. + // https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services + ready: false.into(), + } + } } impl Service> for MyHandler { @@ -27,12 +50,30 @@ impl Service> for MyHandler { type Response = Response; fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + if self.ready.swap(true, Ordering::SeqCst) { + info!("[runtime-trait] Service was already ready"); + } else { + info!("[runtime-trait] Service is now ready"); + }; + core::task::Poll::Ready(Ok(())) } fn call(&mut self, request: LambdaEvent) -> Self::Future { self.invoke_count += 1; - info!("[handler] Received event {}: {:?}", self.invoke_count, request); + info!("[runtime-trait] Received event {}: {:?}", self.invoke_count, request); + + // After being called once, the service is no longer ready until polled again. + if self.ready.swap(false, Ordering::SeqCst) { + info!("[runtime-trait] The service is ready"); + } else { + // https://docs.rs/tower/latest/tower/trait.Service.html#backpressure + // https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services + // > Services are permitted to panic if `call` is invoked without obtaining + // > `Poll::Ready(Ok(()))` from `poll_ready`. + panic!("[runtime-trait] The service is not ready; `.poll_ready()` must be called first"); + } + Box::pin(ready(Ok(Response { message: request.payload.command.to_uppercase(), })))