diff --git a/examples/extension-logs-kinesis-firehose b/examples/extension-logs-kinesis-firehose deleted file mode 160000 index e2596af7..00000000 --- a/examples/extension-logs-kinesis-firehose +++ /dev/null @@ -1 +0,0 @@ -Subproject commit e2596af77d7abdf8d1c2b6e6f78cac0b6ac3e64f diff --git a/examples/extension-logs-kinesis-firehose/Cargo.toml b/examples/extension-logs-kinesis-firehose/Cargo.toml new file mode 100644 index 00000000..942d0a93 --- /dev/null +++ b/examples/extension-logs-kinesis-firehose/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "lambda-logs-firehose-extension" +version = "0.1.0" +edition = "2021" + +# Use cargo-edit(https://github.com/killercup/cargo-edit#installation) +# to manage dependencies. +# Running `cargo add DEPENDENCY_NAME` will +# add the latest version of a dependency to the list, +# and it will keep the alphabetic ordering for you. + +[dependencies] +lambda-extension = { path = "../../lambda-extension" } +tokio = { version = "1.17.0", features = ["full"] } +aws-config = "0.13.0" +aws-sdk-firehose = "0.13.0" + diff --git a/examples/extension-logs-kinesis-firehose/README.md b/examples/extension-logs-kinesis-firehose/README.md new file mode 100644 index 00000000..6f9636c9 --- /dev/null +++ b/examples/extension-logs-kinesis-firehose/README.md @@ -0,0 +1,14 @@ +# AWS Lambda Logs extension example + +## Build & Deploy + +1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) +2. Build the extension with `cargo lambda build --release --extension` +3. Deploy the extension as a layer with `cargo lambda deploy --extension` + +The last command will give you an ARN for the extension layer that you can use in your functions. + + +## Build for ARM 64 + +Build the extension with `cargo lambda build --release --extension --arm64` diff --git a/examples/extension-logs-kinesis-firehose/src/main.rs b/examples/extension-logs-kinesis-firehose/src/main.rs new file mode 100644 index 00000000..d771cf4b --- /dev/null +++ b/examples/extension-logs-kinesis-firehose/src/main.rs @@ -0,0 +1,66 @@ +use aws_sdk_firehose::{model::Record, types::Blob, Client}; +use lambda_extension::{Error, Extension, LambdaLog, LambdaLogRecord, Service, SharedService}; +use std::{future::Future, pin::Pin, task::Poll}; + +#[derive(Clone)] +struct FirehoseLogsProcessor { + client: Client, +} + +impl FirehoseLogsProcessor { + pub fn new(client: Client) -> Self { + FirehoseLogsProcessor { client } + } +} + +/// Implementation of the actual log processor +/// +/// This receives a `Vec` whenever there are new log entries available. +impl Service> for FirehoseLogsProcessor { + type Response = (); + type Error = Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, logs: Vec) -> Self::Future { + let mut records = Vec::with_capacity(logs.len()); + + for log in logs { + match log.record { + LambdaLogRecord::Function(record) => { + records.push(Record::builder().data(Blob::new(record.as_bytes())).build()) + } + _ => unreachable!(), + } + } + + let fut = self + .client + .put_record_batch() + .set_records(Some(records)) + .delivery_stream_name(std::env::var("KINESIS_DELIVERY_STREAM").unwrap()) + .send(); + + Box::pin(async move { + let _ = fut.await?; + Ok(()) + }) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let config = aws_config::load_from_env().await; + let logs_processor = SharedService::new(FirehoseLogsProcessor::new(Client::new(&config))); + + Extension::new() + .with_log_types(&["function"]) + .with_logs_processor(logs_processor) + .run() + .await?; + + Ok(()) +}