From 97ed59d2a574fda1856f6a6436414a259f3e2cd4 Mon Sep 17 00:00:00 2001 From: Daniel Cormier Date: Thu, 6 Oct 2022 10:13:55 -0400 Subject: [PATCH 1/3] Check that `lambda_http`s inner service is ready before calling it This allows for service [backpressure](https://docs.rs/tower/0.4.13/tower/trait.Service.html#backpressure). Additionally: > Services are permitted to panic if `call` is invoked without obtaining `Poll::Ready(Ok(()))` from `poll_ready`. [Source](https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services) --- lambda-http/src/lib.rs | 4 +- .../src/bin/http-trait.rs | 45 ++++++++++++++++++- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index 3581002f..d8d1e942 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -155,8 +155,8 @@ where type Error = E; type Future = TransformResponse<'a, R, Self::Error>; - fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { - core::task::Poll::Ready(Ok(())) + fn poll_ready(&mut self, cx: &mut core::task::Context<'_>) -> core::task::Poll> { + self.service.poll_ready(cx) } fn call(&mut self, req: LambdaEvent) -> Self::Future { diff --git a/lambda-integration-tests/src/bin/http-trait.rs b/lambda-integration-tests/src/bin/http-trait.rs index 67cc9fc5..fffe0db0 100644 --- a/lambda-integration-tests/src/bin/http-trait.rs +++ b/lambda-integration-tests/src/bin/http-trait.rs @@ -1,13 +1,36 @@ -use lambda_http::{Body, Error, Request, RequestExt, Response, Service}; use std::{ future::{ready, Future}, pin::Pin, + sync::atomic::{AtomicBool, Ordering}, }; + +use lambda_http::{Body, Error, Request, RequestExt, Response, Service}; use tracing::info; -#[derive(Default)] struct MyHandler { invoke_count: usize, + ready: AtomicBool, +} + +impl Default for MyHandler { + fn default() -> Self { + Self { + invoke_count: usize::default(), + // New instances are not ready to be called until polled. + ready: false.into(), + } + } +} + +impl Clone for MyHandler { + fn clone(&self) -> Self { + Self { + invoke_count: self.invoke_count, + // Cloned instances may not be immediately ready to be called. + // https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services + ready: false.into(), + } + } } impl Service for MyHandler { @@ -16,6 +39,12 @@ impl Service for MyHandler { type Response = Response; fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + if self.ready.swap(true, Ordering::SeqCst) { + info!("[http-trait] Service was already ready"); + } else { + info!("[http-trait] Service is now ready"); + }; + core::task::Poll::Ready(Ok(())) } @@ -23,6 +52,18 @@ impl Service for MyHandler { self.invoke_count += 1; info!("[http-trait] Received event {}: {:?}", self.invoke_count, request); info!("[http-trait] Lambda context: {:?}", request.lambda_context()); + + // After being called once, the service is no longer ready until polled again. + if self.ready.swap(false, Ordering::SeqCst) { + info!("[http-trait] The service is ready"); + } else { + // https://docs.rs/tower/latest/tower/trait.Service.html#backpressure + // https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services + // > Services are permitted to panic if `call` is invoked without obtaining + // > `Poll::Ready(Ok(()))` from `poll_ready`. + panic!("[http-trait] The service is not ready; `.poll_ready()` must be called first"); + } + Box::pin(ready(Ok(Response::builder() .status(200) .body(Body::from("Hello, World!")) From 8db76be63de35316e991280b6872352ab5b99e2c Mon Sep 17 00:00:00 2001 From: Daniel Cormier Date: Thu, 6 Oct 2022 12:25:46 -0400 Subject: [PATCH 2/3] Check that `lambda_extension`s inner service is ready before calling it This allows for service [backpressure](https://docs.rs/tower/0.4.13/tower/trait.Service.html#backpressure). Additionally: > Services are permitted to panic if `call` is invoked without obtaining `Poll::Ready(Ok(()))` from `poll_ready`. [Source](https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services) --- lambda-extension/src/extension.rs | 25 +++++++++-- .../src/bin/extension-trait.rs | 44 ++++++++++++++++++- 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/lambda-extension/src/extension.rs b/lambda-extension/src/extension.rs index 81462c24..ec83ce71 100644 --- a/lambda-extension/src/extension.rs +++ b/lambda-extension/src/extension.rs @@ -1,14 +1,16 @@ -use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent}; -use hyper::{server::conn::AddrStream, Server}; -use lambda_runtime_api_client::Client; use std::{ convert::Infallible, fmt, future::ready, future::Future, net::SocketAddr, path::PathBuf, pin::Pin, sync::Arc, }; + +use hyper::{server::conn::AddrStream, Server}; +use lambda_runtime_api_client::Client; use tokio::sync::Mutex; use tokio_stream::StreamExt; -use tower::{service_fn, MakeService, Service}; +use tower::{service_fn, MakeService, Service, ServiceExt}; use tracing::{error, trace}; +use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent}; + const DEFAULT_LOG_PORT_NUMBER: u16 = 9002; /// An Extension that runs event and log processors @@ -199,6 +201,21 @@ where let event = LambdaEvent::new(extension_id, event); + let ep = match ep.ready().await { + Ok(ep) => ep, + Err(error) => { + println!("Inner service is not ready: {:?}", error); + let req = if is_invoke { + requests::init_error(extension_id, &error.to_string(), None)? + } else { + requests::exit_error(extension_id, &error.to_string(), None)? + }; + + client.call(req).await?; + return Err(error.into()); + } + }; + let res = ep.call(event).await; if let Err(error) = res { println!("{:?}", error); diff --git a/lambda-integration-tests/src/bin/extension-trait.rs b/lambda-integration-tests/src/bin/extension-trait.rs index 1dc73c75..ecf46c81 100644 --- a/lambda-integration-tests/src/bin/extension-trait.rs +++ b/lambda-integration-tests/src/bin/extension-trait.rs @@ -1,13 +1,36 @@ -use lambda_extension::{Error, LambdaEvent, NextEvent, Service}; use std::{ future::{ready, Future}, pin::Pin, + sync::atomic::{AtomicBool, Ordering}, }; + +use lambda_extension::{Error, LambdaEvent, NextEvent, Service}; use tracing::info; -#[derive(Default)] struct MyExtension { invoke_count: usize, + ready: AtomicBool, +} + +impl Default for MyExtension { + fn default() -> Self { + Self { + invoke_count: usize::default(), + // New instances are not ready to be called until polled. + ready: false.into(), + } + } +} + +impl Clone for MyExtension { + fn clone(&self) -> Self { + Self { + invoke_count: self.invoke_count, + // Cloned instances may not be immediately ready to be called. + // https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services + ready: false.into(), + } + } } impl Service for MyExtension { @@ -16,6 +39,12 @@ impl Service for MyExtension { type Response = (); fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + if self.ready.swap(true, Ordering::SeqCst) { + info!("[extension] Service was already ready"); + } else { + info!("[extension] Service is now ready"); + }; + core::task::Poll::Ready(Ok(())) } @@ -30,6 +59,17 @@ impl Service for MyExtension { } } + // After being called once, the service is no longer ready until polled again. + if self.ready.swap(false, Ordering::SeqCst) { + info!("[extension] The service is ready"); + } else { + // https://docs.rs/tower/latest/tower/trait.Service.html#backpressure + // https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services + // > Services are permitted to panic if `call` is invoked without obtaining + // > `Poll::Ready(Ok(()))` from `poll_ready`. + panic!("[extension] The service is not ready; `.poll_ready()` must be called first"); + } + Box::pin(ready(Ok(()))) } } From 5d3f3f9effb642b6e049b06a15cbafb244bf0d78 Mon Sep 17 00:00:00 2001 From: Daniel Cormier Date: Thu, 6 Oct 2022 14:55:49 -0400 Subject: [PATCH 3/3] Check that `lambda_runtime`s inner service is ready before calling it This allows for service [backpressure](https://docs.rs/tower/0.4.13/tower/trait.Service.html#backpressure). Additionally: > Services are permitted to panic if `call` is invoked without obtaining `Poll::Ready(Ok(()))` from `poll_ready`. [Source](https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services) --- .../src/bin/runtime-trait.rs | 49 +++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/lambda-integration-tests/src/bin/runtime-trait.rs b/lambda-integration-tests/src/bin/runtime-trait.rs index d86bafdc..b925e138 100644 --- a/lambda-integration-tests/src/bin/runtime-trait.rs +++ b/lambda-integration-tests/src/bin/runtime-trait.rs @@ -1,9 +1,11 @@ -use lambda_runtime::{Error, LambdaEvent, Service}; -use serde::{Deserialize, Serialize}; use std::{ future::{ready, Future}, pin::Pin, + sync::atomic::{AtomicBool, Ordering}, }; + +use lambda_runtime::{Error, LambdaEvent, Service}; +use serde::{Deserialize, Serialize}; use tracing::info; #[derive(Deserialize, Debug)] @@ -16,9 +18,30 @@ struct Response { message: String, } -#[derive(Default)] struct MyHandler { invoke_count: usize, + ready: AtomicBool, +} + +impl Default for MyHandler { + fn default() -> Self { + Self { + invoke_count: usize::default(), + // New instances are not ready to be called until polled. + ready: false.into(), + } + } +} + +impl Clone for MyHandler { + fn clone(&self) -> Self { + Self { + invoke_count: self.invoke_count, + // Cloned instances may not be immediately ready to be called. + // https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services + ready: false.into(), + } + } } impl Service> for MyHandler { @@ -27,12 +50,30 @@ impl Service> for MyHandler { type Response = Response; fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + if self.ready.swap(true, Ordering::SeqCst) { + info!("[runtime-trait] Service was already ready"); + } else { + info!("[runtime-trait] Service is now ready"); + }; + core::task::Poll::Ready(Ok(())) } fn call(&mut self, request: LambdaEvent) -> Self::Future { self.invoke_count += 1; - info!("[handler] Received event {}: {:?}", self.invoke_count, request); + info!("[runtime-trait] Received event {}: {:?}", self.invoke_count, request); + + // After being called once, the service is no longer ready until polled again. + if self.ready.swap(false, Ordering::SeqCst) { + info!("[runtime-trait] The service is ready"); + } else { + // https://docs.rs/tower/latest/tower/trait.Service.html#backpressure + // https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services + // > Services are permitted to panic if `call` is invoked without obtaining + // > `Poll::Ready(Ok(()))` from `poll_ready`. + panic!("[runtime-trait] The service is not ready; `.poll_ready()` must be called first"); + } + Box::pin(ready(Ok(Response { message: request.payload.command.to_uppercase(), })))