Skip to content

[WIP] Logs api connection #407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions lambda-extension/examples/custom_events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent, Runtime};
use lambda_extension::{service_fn, Error, Extension, LambdaEvent, NextEvent};

async fn my_extension(event: LambdaEvent) -> Result<(), Error> {
match event.next {
Expand Down Expand Up @@ -27,9 +27,9 @@ async fn main() -> Result<(), Error> {
.without_time()
.init();

let func = service_fn(my_extension);

let runtime = Runtime::builder().with_events(&["SHUTDOWN"]).register().await?;

runtime.run(func).await
Extension::new()
.with_events(&["SHUTDOWN"])
.with_events_processor(service_fn(my_extension))
.run()
.await
}
23 changes: 23 additions & 0 deletions lambda-extension/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/// Error type that extensions may result in
pub type Error = lambda_runtime_api_client::Error;

/// Simple error that encapsulates human readable descriptions
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExtensionError {
err: String,
}

impl ExtensionError {
pub(crate) fn boxed<T: Into<String>>(str: T) -> Box<ExtensionError> {
Box::new(ExtensionError { err: str.into() })
}
}

impl std::fmt::Display for ExtensionError {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.err.fmt(f)
}
}

impl std::error::Error for ExtensionError {}
71 changes: 71 additions & 0 deletions lambda-extension/src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use serde::Deserialize;

/// Request tracing information
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Tracing {
/// The type of tracing exposed to the extension
pub r#type: String,
/// The span value
pub value: String,
}
/// Event received when there is a new Lambda invocation.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InvokeEvent {
/// The time that the function times out
pub deadline_ms: u64,
/// The ID assigned to the Lambda request
pub request_id: String,
/// The function's Amazon Resource Name
pub invoked_function_arn: String,
/// The request tracing information
pub tracing: Tracing,
}

/// Event received when a Lambda function shuts down.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ShutdownEvent {
/// The reason why the function terminates
/// It can be SPINDOWN, TIMEOUT, or FAILURE
pub shutdown_reason: String,
/// The time that the function times out
pub deadline_ms: u64,
}

/// Event that the extension receives in
/// either the INVOKE or SHUTDOWN phase
#[derive(Debug, Deserialize)]
#[serde(rename_all = "UPPERCASE", tag = "eventType")]
pub enum NextEvent {
/// Payload when the event happens in the INVOKE phase
Invoke(InvokeEvent),
/// Payload when the event happens in the SHUTDOWN phase
Shutdown(ShutdownEvent),
}

impl NextEvent {
/// Return whether the event is a [`NextEvent::Invoke`] event or not
pub fn is_invoke(&self) -> bool {
matches!(self, NextEvent::Invoke(_))
}
}

/// Wrapper with information about the next
/// event that the Lambda Runtime is going to process
pub struct LambdaEvent {
/// ID assigned to this extension by the Lambda Runtime
pub extension_id: String,
/// Next incoming event
pub next: NextEvent,
}

impl LambdaEvent {
pub(crate) fn new(ex_id: &str, next: NextEvent) -> LambdaEvent {
LambdaEvent {
extension_id: ex_id.into(),
next,
}
}
}
234 changes: 234 additions & 0 deletions lambda-extension/src/extension.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent};
use lambda_runtime_api_client::Client;
use std::{fmt, future::ready, future::Future, path::PathBuf, pin::Pin};
use tokio_stream::StreamExt;
use tower::Service;
use tracing::trace;

/// An Extension that runs event and log processors
pub struct Extension<'a, E, L> {
extension_name: Option<&'a str>,
events: Option<&'a [&'a str]>,
events_processor: E,
log_types: Option<&'a [&'a str]>,
logs_processor: Option<L>,
log_buffering: Option<LogBuffering>,
}

impl<'a> Extension<'a, Identity<LambdaEvent>, Identity<LambdaLog>> {
/// Create a new base [`Extension`] with a no-op events processor
pub fn new() -> Self {
Extension {
extension_name: None,
events: None,
events_processor: Identity::new(),
log_types: None,
log_buffering: None,
logs_processor: None,
}
}
}

impl<'a> Default for Extension<'a, Identity<LambdaEvent>, Identity<LambdaLog>> {
fn default() -> Self {
Self::new()
}
}

impl<'a, E, L> Extension<'a, E, L>
where
E: Service<LambdaEvent>,
E::Future: Future<Output = Result<(), E::Error>>,
E::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display,

L: Service<LambdaLog>,
L::Future: Future<Output = Result<(), L::Error>>,
L::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display,
{
/// Create a new [`Extension`] with a given extension name
pub fn with_extension_name(self, extension_name: &'a str) -> Self {
Extension {
extension_name: Some(extension_name),
..self
}
}

/// Create a new [`Extension`] with a list of given events.
/// The only accepted events are `INVOKE` and `SHUTDOWN`.
pub fn with_events(self, events: &'a [&'a str]) -> Self {
Extension {
events: Some(events),
..self
}
}

/// Create a new [`Extension`] with a service that receives Lambda events.
pub fn with_events_processor<N>(self, ep: N) -> Extension<'a, N, L>
where
N: Service<LambdaEvent>,
N::Future: Future<Output = Result<(), N::Error>>,
N::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display,
{
Extension {
events_processor: ep,
extension_name: self.extension_name,
events: self.events,
log_types: self.log_types,
log_buffering: self.log_buffering,
logs_processor: self.logs_processor,
}
}

/// Create a new [`Extension`] with a service that receives Lambda logs.
pub fn with_logs_processor<N>(self, lp: N) -> Extension<'a, E, N>
where
N: Service<LambdaLog>,
N::Future: Future<Output = Result<(), N::Error>>,
N::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display,
{
Extension {
logs_processor: Some(lp),
events_processor: self.events_processor,
extension_name: self.extension_name,
events: self.events,
log_types: self.log_types,
log_buffering: self.log_buffering,
}
}

/// Create a new [`Extension`] with a list of logs types to subscribe.
/// The only accepted log types are `function`, `platform`, and `extension`.
pub fn with_log_types(self, log_types: &'a [&'a str]) -> Self {
Extension {
log_types: Some(log_types),
..self
}
}

/// Create a new [`Extension`] with specific configuration to buffer logs.
pub fn with_log_buffering(self, lb: LogBuffering) -> Self {
Extension {
log_buffering: Some(lb),
..self
}
}

/// Execute the given extension
pub async fn run(self) -> Result<(), Error> {
let client = &Client::builder().build()?;

let extension_id = register(client, self.extension_name, self.events).await?;
let extension_id = extension_id.to_str()?;
let mut ep = self.events_processor;

if let Some(mut _lp) = self.logs_processor {
// fixme(david):
// - Spawn task to run processor

// - Call Logs API to start receiving events
let req = requests::subscribe_logs_request(extension_id, self.log_types, self.log_buffering)?;
let res = client.call(req).await?;
if res.status() != http::StatusCode::OK {
return Err(ExtensionError::boxed("unable to initialize the logs api"));
}
}

let incoming = async_stream::stream! {
loop {
trace!("Waiting for next event (incoming loop)");
let req = requests::next_event_request(extension_id)?;
let res = client.call(req).await;
yield res;
}
};

tokio::pin!(incoming);
while let Some(event) = incoming.next().await {
trace!("New event arrived (run loop)");
let event = event?;
let (_parts, body) = event.into_parts();

let body = hyper::body::to_bytes(body).await?;
trace!("{}", std::str::from_utf8(&body)?); // this may be very verbose
let event: NextEvent = serde_json::from_slice(&body)?;
let is_invoke = event.is_invoke();

let event = LambdaEvent::new(extension_id, event);

let res = ep.call(event).await;
if let Err(error) = res {
let req = if is_invoke {
requests::init_error(extension_id, &error.to_string(), None)?
} else {
requests::exit_error(extension_id, &error.to_string(), None)?
};

client.call(req).await?;
return Err(error.into());
}
}
Ok(())
}
}

/// A no-op generic processor
pub struct Identity<T> {
_pd: std::marker::PhantomData<T>,
}

impl<T> Identity<T> {
fn new() -> Identity<T> {
Identity {
_pd: std::marker::PhantomData,
}
}
}

impl<T> Service<T> for Identity<T> {
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
type Response = ();

fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
core::task::Poll::Ready(Ok(()))
}

fn call(&mut self, _event: T) -> Self::Future {
Box::pin(ready(Ok(())))
}
}

/// Initialize and register the extension in the Extensions API
async fn register<'a>(
client: &'a Client,
extension_name: Option<&'a str>,
events: Option<&'a [&'a str]>,
) -> Result<http::HeaderValue, Error> {
let name = match extension_name {
Some(name) => name.into(),
None => {
let args: Vec<String> = std::env::args().collect();
PathBuf::from(args[0].clone())
.file_name()
.expect("unexpected executable name")
.to_str()
.expect("unexpect executable name")
.to_string()
}
};

let events = events.unwrap_or(&["INVOKE", "SHUTDOWN"]);

let req = requests::register_request(&name, events)?;
let res = client.call(req).await?;
if res.status() != http::StatusCode::OK {
return Err(ExtensionError::boxed("unable to register the extension"));
}

let header = res
.headers()
.get(requests::EXTENSION_ID_HEADER)
.ok_or_else(|| ExtensionError::boxed("missing extension id header"))
.map_err(|e| ExtensionError::boxed(e.to_string()))?;
Ok(header.clone())
}
Loading