Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ default = ["http_server", "rand", "uuid", "tracing-span-filter"]
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"]
tracing-span-filter = ["dep:tracing-subscriber"]
lambda = [ "dep:http-serde", "dep:lambda_runtime", "dep:aws_lambda_events"]

[dependencies]
bytes = "1.10"
Expand All @@ -44,6 +45,9 @@ tokio = { version = "1.44", default-features = false, features = ["sync"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["registry"], optional = true }
uuid = { version = "1.16.0", optional = true }
http-serde = { version = "2.1.1", optional = true }
aws_lambda_events = { version = "0.16.1", optional = true }
lambda_runtime = { version = "0.14.2", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
Expand Down
95 changes: 82 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

## Community

* 🤗️ [Join our online community](https://discord.gg/skW3AZ6uGd) for help, sharing feedback and talking to the community.
* 📖 [Check out our documentation](https://docs.restate.dev) to get quickly started!
* 📣 [Follow us on Twitter](https://twitter.com/restatedev) for staying up to date.
* 🙋 [Create a GitHub issue](https://github.com/restatedev/sdk-java/issues) for requesting a new feature or reporting a problem.
* 🏠 [Visit our GitHub org](https://github.com/restatedev) for exploring other repositories.
- 🤗️ [Join our online community](https://discord.gg/skW3AZ6uGd) for help, sharing feedback and talking to the community.
- 📖 [Check out our documentation](https://docs.restate.dev) to get quickly started!
- 📣 [Follow us on Twitter](https://twitter.com/restatedev) for staying up to date.
- 🙋 [Create a GitHub issue](https://github.com/restatedev/sdk-java/issues) for requesting a new feature or reporting a problem.
- 🏠 [Visit our GitHub org](https://github.com/restatedev) for exploring other repositories.

## Using the SDK

Expand Down Expand Up @@ -58,6 +58,75 @@ async fn main() {
}
```

## Running on Lambda

The Restate Rust SDK supports running services on AWS Lambda using Lambda Function URLs. This allows you to deploy your Restate services as serverless functions.

### Setup

First, enable the `lambda` feature in your `Cargo.toml`:

```toml
[dependencies]
restate-sdk = { version = "0.1", features = ["lambda"] }
tokio = { version = "1", features = ["full"] }
```

### Basic Lambda Service

Here's how to create a simple Lambda service:

```rust
use restate_sdk::prelude::*;

#[restate_sdk::service]
trait Greeter {
async fn greet(name: String) -> HandlerResult<String>;
}

struct GreeterImpl;

impl Greeter for GreeterImpl {
async fn greet(&self, _: Context<'_>, name: String) -> HandlerResult<String> {
Ok(format!("Greetings {name}"))
}
}

#[tokio::main]
async fn main() {
// To enable logging/tracing
// check https://docs.aws.amazon.com/lambda/latest/dg/rust-logging.html#rust-logging-tracing

// Build and run the Lambda endpoint
LambdaEndpoint::run(
Endpoint::builder()
.bind(GreeterImpl.serve())
.build(),
)
.await
.unwrap();
}
```

### Deployment

1. Install `cargo-lambda`
```
cargo install cargo-lambda
```
2. Build your Lambda function:

```bash
cargo lambda build --release --arm64 --output-format zip
```

3. Create a Lambda function with the following configuration:

- **Runtime**: Amazon Linux 2023
- **Architecture**: arm64

4. Upload your `zip` file to the Lambda function.

### Logging

The SDK uses tokio's [`tracing`](https://docs.rs/tracing/latest/tracing/) crate to generate logs.
Expand Down Expand Up @@ -121,15 +190,15 @@ The Rust SDK is currently in active development, and might break across releases

The compatibility with Restate is described in the following table:

| Restate Server\sdk-rust | 0.0 - 0.2 | 0.3 | 0.4 - 0.5 | 0.6 |
|-------------------------|-----------|-----|-----------|------------------|
| 1.0 | ✅ | ❌ | ❌ | ❌ |
| 1.1 | ✅ | ✅ | ❌ | ❌ |
| 1.2 | ✅ | ✅ | ❌ | ❌ |
| 1.3 | ✅ | ✅ | ✅ | ✅ <sup>(1)</sup> |
| 1.4 | ✅ | ✅ | ✅ | ✅ |
| Restate Server\sdk-rust | 0.0 - 0.2 | 0.3 | 0.4 - 0.5 | 0.6 |
| ----------------------- | --------- | --- | --------- | ----------------- |
| 1.0 | ✅ | ❌ | ❌ | ❌ |
| 1.1 | ✅ | ✅ | ❌ | ❌ |
| 1.2 | ✅ | ✅ | ❌ | ❌ |
| 1.3 | ✅ | ✅ | ✅ | ✅ <sup>(1)</sup> |
| 1.4 | ✅ | ✅ | ✅ | ✅ |

<sup>(1)</sup> **Note** `bind_with_options` works only from Restate 1.4 onward.
<sup>(1)</sup> **Note** `bind_with_options` works only from Restate 1.4 onward.

## Contributing

Expand Down
38 changes: 17 additions & 21 deletions src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl Endpoint {
pub fn handle<B: Body<Data = Bytes, Error: Into<BoxError> + Send> + Send + 'static>(
&self,
req: http::Request<B>,
) -> Result<http::Response<ResponseBody>, Error> {
) -> http::Response<ResponseBody> {
self.handle_with_options(req, HandleOptions::default())
}

Expand All @@ -201,13 +201,13 @@ impl Endpoint {
&self,
req: http::Request<B>,
options: HandleOptions,
) -> Result<http::Response<ResponseBody>, Error> {
) -> http::Response<ResponseBody> {
let (parts, body) = req.into_parts();
let path = parts.uri.path();
let headers = parts.headers;

if let Err(e) = self.0.identity_verifier.verify_identity(&headers, path) {
return Err(ErrorInner::IdentityVerification(e).into());
return error_response(ErrorInner::IdentityVerification(e));
}

let parts: Vec<&str> = path.split('/').collect();
Expand All @@ -221,17 +221,17 @@ impl Endpoint {

// Parse service name/handler name
let (svc_name, handler_name) = match parts.get(parts.len() - 3..) {
None => return Ok(error_response(ErrorInner::BadPath(path.to_owned()))),
None => return error_response(ErrorInner::BadPath(path.to_owned())),
Some(last_elements) if last_elements[0] != "invoke" => {
return Ok(error_response(ErrorInner::BadPath(path.to_owned())))
return error_response(ErrorInner::BadPath(path.to_owned()))
}
Some(last_elements) => (last_elements[1].to_owned(), last_elements[2].to_owned()),
};

// Prepare vm
let vm = match CoreVM::new(headers, Default::default()) {
Ok(vm) => vm,
Err(e) => return Ok(error_response(e)),
Err(e) => return error_response(e),
};
let ResponseHead {
status_code,
Expand All @@ -241,9 +241,7 @@ impl Endpoint {

// Resolve service
if !self.0.svcs.contains_key(&svc_name) {
return Ok(error_response(ErrorInner::UnknownService(
svc_name.to_owned(),
)));
return error_response(ErrorInner::UnknownService(svc_name.to_owned()));
}

// Prepare handle_invocation future
Expand All @@ -269,7 +267,7 @@ impl Endpoint {
invocation_response_builder =
invocation_response_builder.header(key.deref(), value.deref());
}
Ok(invocation_response_builder
invocation_response_builder
.body(
Either::Right(InvocationRunnerBody {
fut: Some(handle_invocation_fut),
Expand All @@ -278,25 +276,25 @@ impl Endpoint {
})
.into(),
)
.expect("Headers should be valid"))
.expect("Headers should be valid")
}

fn handle_health(&self) -> Result<http::Response<ResponseBody>, Error> {
Ok(simple_response(200, vec![], Bytes::default()))
fn handle_health(&self) -> http::Response<ResponseBody> {
simple_response(200, vec![], Bytes::default())
}

fn handle_discovery(
&self,
headers: http::HeaderMap,
protocol_mode: ProtocolMode,
) -> Result<http::Response<ResponseBody>, Error> {
) -> http::Response<ResponseBody> {
// Extract Accept header from request
let accept_header = match headers
.extract("accept")
.map_err(|e| ErrorInner::Header("accept".to_owned(), Box::new(e)))
{
Ok(h) => h,
Err(e) => return Ok(error_response(e)),
Err(e) => return error_response(e),
};

// Negotiate discovery protocol version
Expand All @@ -310,17 +308,15 @@ impl Endpoint {
version = 2;
content_type = DISCOVERY_CONTENT_TYPE_V2;
} else {
return Ok(error_response(ErrorInner::BadDiscoveryVersion(
accept.to_owned(),
)));
return error_response(ErrorInner::BadDiscoveryVersion(accept.to_owned()));
}
}

if let Err(e) = self.validate_discovery_request(version) {
return Ok(error_response(e));
return error_response(e);
}

Ok(simple_response(
simple_response(
200,
vec![Header {
key: "content-type".into(),
Expand All @@ -340,7 +336,7 @@ impl Endpoint {
})
.expect("Discovery should be serializable"),
),
))
)
}

fn validate_discovery_request(&self, version: usize) -> Result<(), ErrorInner> {
Expand Down
7 changes: 4 additions & 3 deletions src/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::endpoint::{Endpoint, HandleOptions, ProtocolMode};
use http::{Request, Response};
use hyper::body::Incoming;
use hyper::service::Service;
use std::convert::Infallible;
use std::future::{ready, Ready};

/// Wraps [`Endpoint`] to implement hyper [`Service`].
Expand All @@ -20,15 +21,15 @@ impl HyperEndpoint {

impl Service<Request<Incoming>> for HyperEndpoint {
type Response = Response<endpoint::ResponseBody>;
type Error = endpoint::Error;
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;

fn call(&self, req: Request<Incoming>) -> Self::Future {
ready(self.0.handle_with_options(
ready(Ok(self.0.handle_with_options(
req,
HandleOptions {
protocol_mode: ProtocolMode::BidiStream,
},
))
)))
}
}
Loading