diff --git a/Cargo.toml b/Cargo.toml index 502b841..85804ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ default = ["http_server", "rand", "uuid", "tracing-span-filter"] hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"] http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"] tracing-span-filter = ["dep:tracing-subscriber"] +lambda = [ "dep:http-serde", "dep:lambda_runtime", "dep:aws_lambda_events"] [dependencies] bytes = "1.10" @@ -44,6 +45,9 @@ tokio = { version = "1.44", default-features = false, features = ["sync"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["registry"], optional = true } uuid = { version = "1.16.0", optional = true } +http-serde = { version = "2.1.1", optional = true } +aws_lambda_events = { version = "0.16.1", optional = true } +lambda_runtime = { version = "0.14.2", optional = true } [dev-dependencies] tokio = { version = "1", features = ["full"] } diff --git a/README.md b/README.md index 23d15c2..ba779cf 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,11 @@ ## Community -* 🤗️ [Join our online community](https://discord.gg/skW3AZ6uGd) for help, sharing feedback and talking to the community. -* 📖 [Check out our documentation](https://docs.restate.dev) to get quickly started! -* 📣 [Follow us on Twitter](https://twitter.com/restatedev) for staying up to date. -* 🙋 [Create a GitHub issue](https://github.com/restatedev/sdk-java/issues) for requesting a new feature or reporting a problem. -* 🏠 [Visit our GitHub org](https://github.com/restatedev) for exploring other repositories. +- 🤗️ [Join our online community](https://discord.gg/skW3AZ6uGd) for help, sharing feedback and talking to the community. +- 📖 [Check out our documentation](https://docs.restate.dev) to get quickly started! +- 📣 [Follow us on Twitter](https://twitter.com/restatedev) for staying up to date. +- 🙋 [Create a GitHub issue](https://github.com/restatedev/sdk-java/issues) for requesting a new feature or reporting a problem. +- 🏠 [Visit our GitHub org](https://github.com/restatedev) for exploring other repositories. ## Using the SDK @@ -58,6 +58,75 @@ async fn main() { } ``` +## Running on Lambda + +The Restate Rust SDK supports running services on AWS Lambda using Lambda Function URLs. This allows you to deploy your Restate services as serverless functions. + +### Setup + +First, enable the `lambda` feature in your `Cargo.toml`: + +```toml +[dependencies] +restate-sdk = { version = "0.1", features = ["lambda"] } +tokio = { version = "1", features = ["full"] } +``` + +### Basic Lambda Service + +Here's how to create a simple Lambda service: + +```rust +use restate_sdk::prelude::*; + +#[restate_sdk::service] +trait Greeter { + async fn greet(name: String) -> HandlerResult; +} + +struct GreeterImpl; + +impl Greeter for GreeterImpl { + async fn greet(&self, _: Context<'_>, name: String) -> HandlerResult { + Ok(format!("Greetings {name}")) + } +} + +#[tokio::main] +async fn main() { + // To enable logging/tracing + // check https://docs.aws.amazon.com/lambda/latest/dg/rust-logging.html#rust-logging-tracing + + // Build and run the Lambda endpoint + LambdaEndpoint::run( + Endpoint::builder() + .bind(GreeterImpl.serve()) + .build(), + ) + .await + .unwrap(); +} +``` + +### Deployment + +1. Install `cargo-lambda` + ``` + cargo install cargo-lambda + ``` +2. Build your Lambda function: + + ```bash + cargo lambda build --release --arm64 --output-format zip + ``` + +3. Create a Lambda function with the following configuration: + + - **Runtime**: Amazon Linux 2023 + - **Architecture**: arm64 + +4. Upload your `zip` file to the Lambda function. + ### Logging The SDK uses tokio's [`tracing`](https://docs.rs/tracing/latest/tracing/) crate to generate logs. @@ -121,15 +190,15 @@ The Rust SDK is currently in active development, and might break across releases The compatibility with Restate is described in the following table: -| Restate Server\sdk-rust | 0.0 - 0.2 | 0.3 | 0.4 - 0.5 | 0.6 | -|-------------------------|-----------|-----|-----------|------------------| -| 1.0 | ✅ | ❌ | ❌ | ❌ | -| 1.1 | ✅ | ✅ | ❌ | ❌ | -| 1.2 | ✅ | ✅ | ❌ | ❌ | -| 1.3 | ✅ | ✅ | ✅ | ✅ (1) | -| 1.4 | ✅ | ✅ | ✅ | ✅ | +| Restate Server\sdk-rust | 0.0 - 0.2 | 0.3 | 0.4 - 0.5 | 0.6 | +| ----------------------- | --------- | --- | --------- | ----------------- | +| 1.0 | ✅ | ❌ | ❌ | ❌ | +| 1.1 | ✅ | ✅ | ❌ | ❌ | +| 1.2 | ✅ | ✅ | ❌ | ❌ | +| 1.3 | ✅ | ✅ | ✅ | ✅ (1) | +| 1.4 | ✅ | ✅ | ✅ | ✅ | -(1) **Note** `bind_with_options` works only from Restate 1.4 onward. +(1) **Note** `bind_with_options` works only from Restate 1.4 onward. ## Contributing diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index 58a2c51..31d12b9 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -190,7 +190,7 @@ impl Endpoint { pub fn handle + Send> + Send + 'static>( &self, req: http::Request, - ) -> Result, Error> { + ) -> http::Response { self.handle_with_options(req, HandleOptions::default()) } @@ -201,13 +201,13 @@ impl Endpoint { &self, req: http::Request, options: HandleOptions, - ) -> Result, Error> { + ) -> http::Response { let (parts, body) = req.into_parts(); let path = parts.uri.path(); let headers = parts.headers; if let Err(e) = self.0.identity_verifier.verify_identity(&headers, path) { - return Err(ErrorInner::IdentityVerification(e).into()); + return error_response(ErrorInner::IdentityVerification(e)); } let parts: Vec<&str> = path.split('/').collect(); @@ -221,9 +221,9 @@ impl Endpoint { // Parse service name/handler name let (svc_name, handler_name) = match parts.get(parts.len() - 3..) { - None => return Ok(error_response(ErrorInner::BadPath(path.to_owned()))), + None => return error_response(ErrorInner::BadPath(path.to_owned())), Some(last_elements) if last_elements[0] != "invoke" => { - return Ok(error_response(ErrorInner::BadPath(path.to_owned()))) + return error_response(ErrorInner::BadPath(path.to_owned())) } Some(last_elements) => (last_elements[1].to_owned(), last_elements[2].to_owned()), }; @@ -231,7 +231,7 @@ impl Endpoint { // Prepare vm let vm = match CoreVM::new(headers, Default::default()) { Ok(vm) => vm, - Err(e) => return Ok(error_response(e)), + Err(e) => return error_response(e), }; let ResponseHead { status_code, @@ -241,9 +241,7 @@ impl Endpoint { // Resolve service if !self.0.svcs.contains_key(&svc_name) { - return Ok(error_response(ErrorInner::UnknownService( - svc_name.to_owned(), - ))); + return error_response(ErrorInner::UnknownService(svc_name.to_owned())); } // Prepare handle_invocation future @@ -269,7 +267,7 @@ impl Endpoint { invocation_response_builder = invocation_response_builder.header(key.deref(), value.deref()); } - Ok(invocation_response_builder + invocation_response_builder .body( Either::Right(InvocationRunnerBody { fut: Some(handle_invocation_fut), @@ -278,25 +276,25 @@ impl Endpoint { }) .into(), ) - .expect("Headers should be valid")) + .expect("Headers should be valid") } - fn handle_health(&self) -> Result, Error> { - Ok(simple_response(200, vec![], Bytes::default())) + fn handle_health(&self) -> http::Response { + simple_response(200, vec![], Bytes::default()) } fn handle_discovery( &self, headers: http::HeaderMap, protocol_mode: ProtocolMode, - ) -> Result, Error> { + ) -> http::Response { // Extract Accept header from request let accept_header = match headers .extract("accept") .map_err(|e| ErrorInner::Header("accept".to_owned(), Box::new(e))) { Ok(h) => h, - Err(e) => return Ok(error_response(e)), + Err(e) => return error_response(e), }; // Negotiate discovery protocol version @@ -310,17 +308,15 @@ impl Endpoint { version = 2; content_type = DISCOVERY_CONTENT_TYPE_V2; } else { - return Ok(error_response(ErrorInner::BadDiscoveryVersion( - accept.to_owned(), - ))); + return error_response(ErrorInner::BadDiscoveryVersion(accept.to_owned())); } } if let Err(e) = self.validate_discovery_request(version) { - return Ok(error_response(e)); + return error_response(e); } - Ok(simple_response( + simple_response( 200, vec![Header { key: "content-type".into(), @@ -340,7 +336,7 @@ impl Endpoint { }) .expect("Discovery should be serializable"), ), - )) + ) } fn validate_discovery_request(&self, version: usize) -> Result<(), ErrorInner> { diff --git a/src/hyper.rs b/src/hyper.rs index 3733615..f89bce8 100644 --- a/src/hyper.rs +++ b/src/hyper.rs @@ -6,6 +6,7 @@ use crate::endpoint::{Endpoint, HandleOptions, ProtocolMode}; use http::{Request, Response}; use hyper::body::Incoming; use hyper::service::Service; +use std::convert::Infallible; use std::future::{ready, Ready}; /// Wraps [`Endpoint`] to implement hyper [`Service`]. @@ -20,15 +21,15 @@ impl HyperEndpoint { impl Service> for HyperEndpoint { type Response = Response; - type Error = endpoint::Error; + type Error = Infallible; type Future = Ready>; fn call(&self, req: Request) -> Self::Future { - ready(self.0.handle_with_options( + ready(Ok(self.0.handle_with_options( req, HandleOptions { protocol_mode: ProtocolMode::BidiStream, }, - )) + ))) } } diff --git a/src/lambda.rs b/src/lambda.rs new file mode 100644 index 0000000..51cc1ea --- /dev/null +++ b/src/lambda.rs @@ -0,0 +1,232 @@ +//! ## Running on Lambda +//! +//! The Restate Rust SDK supports running services on AWS Lambda using Lambda Function URLs. This allows you to deploy your Restate services as serverless functions. +//! +//! ### Setup +//! +//! First, enable the `lambda` feature in your `Cargo.toml`: +//! +//! ```toml +//! [dependencies] +//! restate-sdk = { version = "0.1", features = ["lambda"] } +//! tokio = { version = "1", features = ["full"] } +//! ``` +//! +//! ### Basic Lambda Service +//! +//! Here's how to create a simple Lambda service: +//! +//! ```rust,no_run +//! #![cfg(feature = "lambda")] +//! +//! use restate_sdk::prelude::*; +//! +//! #[restate_sdk::service] +//! trait Greeter { +//! async fn greet(name: String) -> HandlerResult; +//! } +//! +//! struct GreeterImpl; +//! +//! impl Greeter for GreeterImpl { +//! async fn greet(&self, _: Context<'_>, name: String) -> HandlerResult { +//! Ok(format!("Greetings {name}")) +//! } +//! } +//! +//! #[tokio::main] +//! async fn main() { +//! // To enable logging/tracing +//! // check https://docs.aws.amazon.com/lambda/latest/dg/rust-logging.html#rust-logging-tracing +//! +//! // Build and run the Lambda endpoint +//! LambdaEndpoint::run( +//! Endpoint::builder() +//! .bind(GreeterImpl.serve()) +//! .build(), +//! ) +//! .await +//! .unwrap(); +//! } +//! ``` +//! +//! ### Deployment +//! +//! 1. Install `cargo-lambda` +//! ```bash +//! cargo install cargo-lambda +//! ``` +//! 2. Build your Lambda function: +//! +//! ```bash +//! cargo lambda build --release --arm64 --output-format zip +//! ``` +//! +//! 3. Create a Lambda function with the following configuration: +//! +//! - **Runtime**: Amazon Linux 2023 +//! - **Architecture**: arm64 +//! +//! 4. Upload your `zip` file to the Lambda function. + +use std::future::Future; + +use aws_lambda_events::encodings::Base64Data; +use bytes::Bytes; +use http::{HeaderMap, Method, Request, Uri}; +use http_body_util::{BodyExt, Full}; +use lambda_runtime::service_fn; +use lambda_runtime::tower::ServiceExt; +use lambda_runtime::LambdaEvent; +use serde::{Deserialize, Serialize}; + +use crate::endpoint::{Endpoint, HandleOptions, ProtocolMode}; + +/// Represents an incoming request from AWS Lambda when using Lambda Function URLs. +/// +/// This struct is used to deserialize the JSON payload from Lambda. +#[derive(Clone, Debug, Default, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct LambdaRequest { + /// The HTTP method of the request. + // #[serde(with = "http_method")] + #[serde(with = "http_serde::method")] + pub http_method: Method, + /// The path of the request. + #[serde(default)] + #[serde(with = "http_serde::uri")] + pub path: Uri, + /// The headers of the request. + #[serde(with = "http_serde::header_map", default)] + pub headers: HeaderMap, + /// Whether the request body is Base64 encoded. + pub is_base64_encoded: bool, + /// The request body, if any. + pub body: Option, +} + +/// Represents a response to be sent back to AWS Lambda. +/// +/// This struct is serialized to JSON to form the response payload for Lambda. +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +struct LambdaResponse { + /// The HTTP status code. + pub status_code: u16, + /// An optional status description. + #[serde(default)] + pub status_description: Option, + /// The response headers. + #[serde(with = "http_serde::header_map", default)] + pub headers: HeaderMap, + /// The optional response body, Base64 encoded. + #[serde(skip_serializing_if = "Option::is_none")] + pub body: Option, + /// Whether the response body is Base64 encoded. This should generally be `true` + /// when a body is present. + #[serde(default)] + pub is_base64_encoded: bool, +} + +impl LambdaResponse { + fn builder() -> LambdaResponseBuilder { + LambdaResponseBuilder { + status_code: 200, + status_description: None, + headers: HeaderMap::default(), + body: None, + } + } +} + +struct LambdaResponseBuilder { + status_code: u16, + status_description: Option, + headers: HeaderMap, + body: Option, +} + +impl LambdaResponseBuilder { + pub fn status_code(mut self, status_code: u16) -> Self { + self.status_code = status_code; + self.status_description = http::StatusCode::from_u16(status_code) + .map(|s| s.to_string()) + .ok(); + self + } + + pub fn body(mut self, body: Bytes) -> Self { + self.body = Some(Base64Data(body.into())); + self + } + + pub fn build(self) -> LambdaResponse { + LambdaResponse { + status_code: self.status_code, + status_description: self.status_description, + headers: self.headers, + body: self.body, + is_base64_encoded: true, + } + } +} + +/// Wraps an [`Endpoint`] to implement the `lambda_runtime::Service` trait for AWS Lambda. +/// +/// This adapter allows a Restate endpoint to be deployed as an AWS Lambda function. +/// It handles the conversion between Lambda's request/response format and the +/// internal representation used by the SDK. +#[derive(Clone)] +pub struct LambdaEndpoint; + +impl LambdaEndpoint { + /// Runs the Lambda service. + /// + /// This function starts the `lambda_runtime` and begins processing incoming + /// Lambda events, passing them to the wrapped [`Endpoint`]. + pub fn run(endpoint: Endpoint) -> impl Future> { + let svc = service_fn(handle); + let svc = svc.map_request(move |req| { + let endpoint = endpoint.clone(); + LambdaEventWithEndpoint { + inner: req, + endpoint, + } + }); + + lambda_runtime::run(svc) + } +} + +struct LambdaEventWithEndpoint { + inner: LambdaEvent, + endpoint: Endpoint, +} + +async fn handle(req: LambdaEventWithEndpoint) -> Result { + let (request, _) = req.inner.into_parts(); + + let mut http_request = Request::builder() + .method(request.http_method) + .uri(request.path) + .body(request.body.map(|b| Full::from(b.0)).unwrap_or_default()) + .expect("to build"); + + http_request.headers_mut().extend(request.headers); + + let response = req.endpoint.handle_with_options( + http_request, + HandleOptions { + protocol_mode: ProtocolMode::RequestResponse, + }, + ); + + let (parts, body) = response.into_parts(); + + let body = body.collect().await?.to_bytes(); + + let mut builder = LambdaResponse::builder().status_code(parts.status.as_u16()); + builder.headers.extend(parts.headers); + + Ok(builder.body(body).build()) +} diff --git a/src/lib.rs b/src/lib.rs index 195017a..bd1f018 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -226,6 +226,8 @@ pub mod filter; pub mod http_server; #[cfg(feature = "hyper")] pub mod hyper; +#[cfg(feature = "lambda")] +pub mod lambda; pub mod serde; /// Entry-point macro to define a Restate [Service](https://docs.restate.dev/concepts/services#services-1). @@ -509,6 +511,9 @@ pub mod prelude { #[cfg(feature = "http_server")] pub use crate::http_server::HttpServer; + #[cfg(feature = "lambda")] + pub use crate::lambda::LambdaEndpoint; + pub use crate::context::{ CallFuture, Context, ContextAwakeables, ContextClient, ContextPromises, ContextReadState, ContextSideEffects, ContextTimers, ContextWriteState, HeaderMap, InvocationHandle,