Skip to content

Commit c737920

Browse files
authored
Fix kinesis example (#493)
* Remove kinesis example The repository doesn't recognize this directory properly. Signed-off-by: David Calavera <[email protected]> * Add example back Signed-off-by: David Calavera <[email protected]>
1 parent 2dedba8 commit c737920

File tree

4 files changed

+97
-1
lines changed

4 files changed

+97
-1
lines changed
Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[package]
2+
name = "lambda-logs-firehose-extension"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# Use cargo-edit(https://github.com/killercup/cargo-edit#installation)
7+
# to manage dependencies.
8+
# Running `cargo add DEPENDENCY_NAME` will
9+
# add the latest version of a dependency to the list,
10+
# and it will keep the alphabetic ordering for you.
11+
12+
[dependencies]
13+
lambda-extension = { path = "../../lambda-extension" }
14+
tokio = { version = "1.17.0", features = ["full"] }
15+
aws-config = "0.13.0"
16+
aws-sdk-firehose = "0.13.0"
17+
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# AWS Lambda Logs extension example
2+
3+
## Build & Deploy
4+
5+
1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
6+
2. Build the extension with `cargo lambda build --release --extension`
7+
3. Deploy the extension as a layer with `cargo lambda deploy --extension`
8+
9+
The last command will give you an ARN for the extension layer that you can use in your functions.
10+
11+
12+
## Build for ARM 64
13+
14+
Build the extension with `cargo lambda build --release --extension --arm64`
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use aws_sdk_firehose::{model::Record, types::Blob, Client};
2+
use lambda_extension::{Error, Extension, LambdaLog, LambdaLogRecord, Service, SharedService};
3+
use std::{future::Future, pin::Pin, task::Poll};
4+
5+
#[derive(Clone)]
6+
struct FirehoseLogsProcessor {
7+
client: Client,
8+
}
9+
10+
impl FirehoseLogsProcessor {
11+
pub fn new(client: Client) -> Self {
12+
FirehoseLogsProcessor { client }
13+
}
14+
}
15+
16+
/// Implementation of the actual log processor
17+
///
18+
/// This receives a `Vec<LambdaLog>` whenever there are new log entries available.
19+
impl Service<Vec<LambdaLog>> for FirehoseLogsProcessor {
20+
type Response = ();
21+
type Error = Error;
22+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
23+
24+
fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
25+
Poll::Ready(Ok(()))
26+
}
27+
28+
fn call(&mut self, logs: Vec<LambdaLog>) -> Self::Future {
29+
let mut records = Vec::with_capacity(logs.len());
30+
31+
for log in logs {
32+
match log.record {
33+
LambdaLogRecord::Function(record) => {
34+
records.push(Record::builder().data(Blob::new(record.as_bytes())).build())
35+
}
36+
_ => unreachable!(),
37+
}
38+
}
39+
40+
let fut = self
41+
.client
42+
.put_record_batch()
43+
.set_records(Some(records))
44+
.delivery_stream_name(std::env::var("KINESIS_DELIVERY_STREAM").unwrap())
45+
.send();
46+
47+
Box::pin(async move {
48+
let _ = fut.await?;
49+
Ok(())
50+
})
51+
}
52+
}
53+
54+
#[tokio::main]
55+
async fn main() -> Result<(), Error> {
56+
let config = aws_config::load_from_env().await;
57+
let logs_processor = SharedService::new(FirehoseLogsProcessor::new(Client::new(&config)));
58+
59+
Extension::new()
60+
.with_log_types(&["function"])
61+
.with_logs_processor(logs_processor)
62+
.run()
63+
.await?;
64+
65+
Ok(())
66+
}

0 commit comments

Comments
 (0)