Skip to content

Commit bc9367b

Browse files
nmoutschencalavera
andauthored
feat(lambda-extension): Logs API processor (#416)
* Organize crate into modules. Signed-off-by: David Calavera <[email protected]> * Initial logs processor implementation - Reorganize builder to support more options - Use no-op processors to play nice with generic types Signed-off-by: David Calavera <[email protected]> * Fix value returned by the register function. Signed-off-by: David Calavera <[email protected]> * Add logs subcription call Send request to the Lambda API to subscribe the extension to the logs API. Use the 2021-03-18 schema version to receive all new log types. Signed-off-by: David Calavera <[email protected]> * Clean logs file. Signed-off-by: David Calavera <[email protected]> * feat(lambda-extension): log processor server (DOES NOT WORK) * feat(lambda-extension): use MakeService for log processor (DOES NOT WORK) * feat(lambda-extension): restrict trait bounds for Identity/MakeIdentity * feat(lambda-extension): deserialize Vec<LambdaLog> * test: add integration tests for Logs API handler * chore: cargo fmt * feat(lambda-extension): add tracing and customizable port number for Logs API * feat(lambda-extension): implement LambdaLogRecord enum * fix(lambda-extension): fix bounds for with_logs_processor() * feat(lambda-extension): use chrono::DateTime for LambdaLog time * feat(lambda-extension): add combined example * docs(lambda-extension): add example for logs processor * feat(lambda-integration-tests): update log processor * fix(lambda-integration-tests): add tracing config to logs-trait * chore: cargo fmt Co-authored-by: David Calavera <[email protected]>
1 parent 5f8eb34 commit bc9367b

16 files changed

+1018
-243
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ INTEG_STACK_NAME ?= rust-lambda-integration-tests
22
INTEG_FUNCTIONS_BUILD := runtime-fn runtime-trait http-fn http-trait
33
INTEG_FUNCTIONS_INVOKE := RuntimeFn RuntimeFnAl2 RuntimeTrait RuntimeTraitAl2 Python PythonAl2
44
INTEG_API_INVOKE := RestApiUrl HttpApiUrl
5-
INTEG_EXTENSIONS := extension-fn extension-trait
5+
INTEG_EXTENSIONS := extension-fn extension-trait logs-trait
66
# Using musl to run extensions on both AL1 and AL2
77
INTEG_ARCH := x86_64-unknown-linux-musl
88

lambda-extension/Cargo.toml

+7-6
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,18 @@ keywords = ["AWS", "Lambda", "API"]
1111
readme = "README.md"
1212

1313
[dependencies]
14-
tokio = { version = "1.0", features = ["macros", "io-util", "sync", "rt-multi-thread"] }
14+
async-stream = "0.3"
15+
bytes = "1.0"
16+
chrono = { version = "0.4", features = ["serde"] }
17+
http = "0.2"
1518
hyper = { version = "0.14", features = ["http1", "client", "server", "stream", "runtime"] }
19+
lambda_runtime_api_client = { version = "0.4", path = "../lambda-runtime-api-client" }
1620
serde = { version = "1", features = ["derive"] }
1721
serde_json = "^1"
18-
bytes = "1.0"
19-
http = "0.2"
20-
async-stream = "0.3"
2122
tracing = { version = "0.1", features = ["log"] }
22-
tower = { version = "0.4", features = ["util"] }
23+
tokio = { version = "1.0", features = ["macros", "io-util", "sync", "rt-multi-thread"] }
2324
tokio-stream = "0.1.2"
24-
lambda_runtime_api_client = { version = "0.4", path = "../lambda-runtime-api-client" }
25+
tower = { version = "0.4", features = ["make", "util"] }
2526

2627
[dev-dependencies]
2728
simple-error = "0.2"

lambda-extension/README.md

+38-2
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
[![Docs](https://docs.rs/lambda_extension/badge.svg)](https://docs.rs/lambda_extension)
44

5-
**`lambda-extension`** is a library that makes it easy to write [AWS Lambda Runtime Extensions](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html) in Rust.
5+
**`lambda-extension`** is a library that makes it easy to write [AWS Lambda Runtime Extensions](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html) in Rust. It also helps with using [Lambda Logs API](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html).
66

7-
## Example extension
7+
## Example extensions
8+
9+
### Simple extension
810

911
The code below creates a simple extension that's registered to every `INVOKE` and `SHUTDOWN` events, and logs them in CloudWatch.
1012

@@ -37,6 +39,40 @@ async fn main() -> Result<(), Error> {
3739
3840
```
3941

42+
### Log processor extension
43+
44+
```rust,no_run
45+
use lambda_extension::{service_fn, Error, Extension, LambdaLog, LambdaLogRecord, SharedService};
46+
use tracing::info;
47+
48+
async fn handler(logs: Vec<LambdaLog>) -> Result<(), Error> {
49+
for log in logs {
50+
match log.record {
51+
LambdaLogRecord::Function(_record) => {
52+
// do something with the function log record
53+
},
54+
LambdaLogRecord::Extension(_record) => {
55+
// do something with the extension log record
56+
},
57+
},
58+
_ => (),
59+
}
60+
}
61+
62+
Ok(())
63+
}
64+
65+
#[tokio::main]
66+
async fn main() -> Result<(), Error> {
67+
let logs_processor = SharedService::new(service_fn(handler));
68+
69+
Extension::new().with_logs_processor(logs_processor).run().await?;
70+
71+
Ok(())
72+
}
73+
74+
```
75+
4076
## Deployment
4177

4278
Lambda extensions can be added to your functions either using [Lambda layers](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#using-extensions-config), or adding them to [containers images](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#invocation-extensions-images).

lambda-extension/examples/combined.rs

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use lambda_extension::{
2+
service_fn, Error, Extension, LambdaEvent, LambdaLog, LambdaLogRecord, NextEvent, SharedService,
3+
};
4+
use tracing::info;
5+
6+
async fn my_extension(event: LambdaEvent) -> Result<(), Error> {
7+
match event.next {
8+
NextEvent::Shutdown(_e) => {
9+
// do something with the shutdown event
10+
}
11+
NextEvent::Invoke(_e) => {
12+
// do something with the invoke event
13+
}
14+
}
15+
Ok(())
16+
}
17+
18+
async fn my_log_processor(logs: Vec<LambdaLog>) -> Result<(), Error> {
19+
for log in logs {
20+
match log.record {
21+
LambdaLogRecord::Function(record) => info!("[logs] [function] {}", record),
22+
LambdaLogRecord::Extension(record) => info!("[logs] [extension] {}", record),
23+
_ => (),
24+
}
25+
}
26+
27+
Ok(())
28+
}
29+
30+
#[tokio::main]
31+
async fn main() -> Result<(), Error> {
32+
// The runtime logging can be enabled here by initializing `tracing` with `tracing-subscriber`
33+
// While `tracing` is used internally, `log` can be used as well if preferred.
34+
tracing_subscriber::fmt()
35+
.with_max_level(tracing::Level::INFO)
36+
// this needs to be set to false, otherwise ANSI color codes will
37+
// show up in a confusing manner in CloudWatch logs.
38+
.with_ansi(false)
39+
// disabling time is handy because CloudWatch will add the ingestion time.
40+
.without_time()
41+
.init();
42+
43+
let func = service_fn(my_extension);
44+
let logs_processor = SharedService::new(service_fn(my_log_processor));
45+
46+
Extension::new()
47+
.with_events_processor(func)
48+
.with_logs_processor(logs_processor)
49+
.run()
50+
.await
51+
}

lambda-extension/examples/custom_events.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent, Runtime};
1+
use lambda_extension::{service_fn, Error, Extension, LambdaEvent, NextEvent};
22

33
async fn my_extension(event: LambdaEvent) -> Result<(), Error> {
44
match event.next {
@@ -27,9 +27,9 @@ async fn main() -> Result<(), Error> {
2727
.without_time()
2828
.init();
2929

30-
let func = service_fn(my_extension);
31-
32-
let runtime = Runtime::builder().with_events(&["SHUTDOWN"]).register().await?;
33-
34-
runtime.run(func).await
30+
Extension::new()
31+
.with_events(&["SHUTDOWN"])
32+
.with_events_processor(service_fn(my_extension))
33+
.run()
34+
.await
3535
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use lambda_extension::{Error, Extension, LambdaLog, LambdaLogRecord, Service, SharedService};
2+
use std::{
3+
future::{ready, Future},
4+
pin::Pin,
5+
sync::{
6+
atomic::{AtomicUsize, Ordering::SeqCst},
7+
Arc,
8+
},
9+
task::Poll,
10+
};
11+
use tracing::info;
12+
13+
/// Custom log processor that increments a counter for each log record.
14+
///
15+
/// This is a simple example of a custom log processor that can be used to
16+
/// count the number of log records that are processed.
17+
///
18+
/// This needs to derive Clone (and store the counter in an Arc) as the runtime
19+
/// could need multiple `Service`s to process the logs.
20+
#[derive(Clone, Default)]
21+
struct MyLogsProcessor {
22+
counter: Arc<AtomicUsize>,
23+
}
24+
25+
impl MyLogsProcessor {
26+
pub fn new() -> Self {
27+
Self::default()
28+
}
29+
}
30+
31+
/// Implementation of the actual log processor
32+
///
33+
/// This receives a `Vec<LambdaLog>` whenever there are new log entries available.
34+
impl Service<Vec<LambdaLog>> for MyLogsProcessor {
35+
type Response = ();
36+
type Error = Error;
37+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
38+
39+
fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
40+
Poll::Ready(Ok(()))
41+
}
42+
43+
fn call(&mut self, logs: Vec<LambdaLog>) -> Self::Future {
44+
let counter = self.counter.fetch_add(1, SeqCst);
45+
for log in logs {
46+
match log.record {
47+
LambdaLogRecord::Function(record) => info!("[logs] [function] {}: {}", counter, record),
48+
LambdaLogRecord::Extension(record) => info!("[logs] [extension] {}: {}", counter, record),
49+
_ => (),
50+
}
51+
}
52+
53+
Box::pin(ready(Ok(())))
54+
}
55+
}
56+
57+
#[tokio::main]
58+
async fn main() -> Result<(), Error> {
59+
let logs_processor = SharedService::new(MyLogsProcessor::new());
60+
61+
Extension::new().with_logs_processor(logs_processor).run().await?;
62+
63+
Ok(())
64+
}

lambda-extension/examples/logs.rs

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use lambda_extension::{service_fn, Error, Extension, LambdaLog, LambdaLogRecord, SharedService};
2+
use tracing::info;
3+
4+
async fn handler(logs: Vec<LambdaLog>) -> Result<(), Error> {
5+
for log in logs {
6+
match log.record {
7+
LambdaLogRecord::Function(record) => info!("[logs] [function] {}", record),
8+
LambdaLogRecord::Extension(record) => info!("[logs] [extension] {}", record),
9+
_ => (),
10+
}
11+
}
12+
13+
Ok(())
14+
}
15+
16+
#[tokio::main]
17+
async fn main() -> Result<(), Error> {
18+
let logs_processor = SharedService::new(service_fn(handler));
19+
20+
Extension::new().with_logs_processor(logs_processor).run().await?;
21+
22+
Ok(())
23+
}

lambda-extension/src/error.rs

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/// Error type that extensions may result in
2+
pub type Error = lambda_runtime_api_client::Error;
3+
4+
/// Simple error that encapsulates human readable descriptions
5+
#[derive(Clone, Debug, PartialEq, Eq)]
6+
pub struct ExtensionError {
7+
err: String,
8+
}
9+
10+
impl ExtensionError {
11+
pub(crate) fn boxed<T: Into<String>>(str: T) -> Box<ExtensionError> {
12+
Box::new(ExtensionError { err: str.into() })
13+
}
14+
}
15+
16+
impl std::fmt::Display for ExtensionError {
17+
#[inline]
18+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19+
self.err.fmt(f)
20+
}
21+
}
22+
23+
impl std::error::Error for ExtensionError {}

lambda-extension/src/events.rs

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use serde::Deserialize;
2+
3+
/// Request tracing information
4+
#[derive(Debug, Deserialize)]
5+
#[serde(rename_all = "camelCase")]
6+
pub struct Tracing {
7+
/// The type of tracing exposed to the extension
8+
pub r#type: String,
9+
/// The span value
10+
pub value: String,
11+
}
12+
/// Event received when there is a new Lambda invocation.
13+
#[derive(Debug, Deserialize)]
14+
#[serde(rename_all = "camelCase")]
15+
pub struct InvokeEvent {
16+
/// The time that the function times out
17+
pub deadline_ms: u64,
18+
/// The ID assigned to the Lambda request
19+
pub request_id: String,
20+
/// The function's Amazon Resource Name
21+
pub invoked_function_arn: String,
22+
/// The request tracing information
23+
pub tracing: Tracing,
24+
}
25+
26+
/// Event received when a Lambda function shuts down.
27+
#[derive(Debug, Deserialize)]
28+
#[serde(rename_all = "camelCase")]
29+
pub struct ShutdownEvent {
30+
/// The reason why the function terminates
31+
/// It can be SPINDOWN, TIMEOUT, or FAILURE
32+
pub shutdown_reason: String,
33+
/// The time that the function times out
34+
pub deadline_ms: u64,
35+
}
36+
37+
/// Event that the extension receives in
38+
/// either the INVOKE or SHUTDOWN phase
39+
#[derive(Debug, Deserialize)]
40+
#[serde(rename_all = "UPPERCASE", tag = "eventType")]
41+
pub enum NextEvent {
42+
/// Payload when the event happens in the INVOKE phase
43+
Invoke(InvokeEvent),
44+
/// Payload when the event happens in the SHUTDOWN phase
45+
Shutdown(ShutdownEvent),
46+
}
47+
48+
impl NextEvent {
49+
/// Return whether the event is a [`NextEvent::Invoke`] event or not
50+
pub fn is_invoke(&self) -> bool {
51+
matches!(self, NextEvent::Invoke(_))
52+
}
53+
}
54+
55+
/// Wrapper with information about the next
56+
/// event that the Lambda Runtime is going to process
57+
pub struct LambdaEvent {
58+
/// ID assigned to this extension by the Lambda Runtime
59+
pub extension_id: String,
60+
/// Next incoming event
61+
pub next: NextEvent,
62+
}
63+
64+
impl LambdaEvent {
65+
pub(crate) fn new(ex_id: &str, next: NextEvent) -> LambdaEvent {
66+
LambdaEvent {
67+
extension_id: ex_id.into(),
68+
next,
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)