Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
name: Run CI
on:
push:
branches: [main]
branches:
- "main"
- "feat/**"
paths-ignore:
- "**.md" # Do not need to run CI for markdown changes.
pull_request:
branches: [main]
branches:
- "main"
- "feat/**"
paths-ignore:
- "**.md"

Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/release-please.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ name: Run Release Please
on:
push:
branches:
- main
- "main"
- "feat/**"

jobs:
release-package:
Expand Down
5 changes: 3 additions & 2 deletions contract-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ eventsource-client = { path = "../eventsource-client" }
serde_json = { version = "1.0.39"}
actix = { version = "0.13.1"}
actix-web = { version = "4"}
reqwest = { version = "0.11.6", default-features = false, features = ["json", "rustls-tls"] }
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "stream"] }
env_logger = { version = "0.10.0" }
hyper = { version = "0.14.19", features = ["client", "http1", "tcp"] }
log = "0.4.6"
http = "1.0"
bytes = "1.5"

[[bin]]
name = "sse-test-api"
84 changes: 79 additions & 5 deletions contract-tests/src/bin/sse-test-api/stream_entity.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,84 @@
use actix_web::rt::task::JoinHandle;
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use log::error;
use std::{
sync::{Arc, Mutex},
time::Duration,
};

use eventsource_client as es;
use eventsource_client::{ByteStream, HttpTransport, ResponseFuture, TransportError};

use crate::{Config, EventType};

// Simple reqwest-based transport implementation
#[derive(Clone)]
struct ReqwestTransport {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is temporary until we implement the hyper based version. But it keeps the tests passing without adding new dependencies.

client: reqwest::Client,
}

impl ReqwestTransport {
fn new(timeout: Option<Duration>) -> Result<Self, reqwest::Error> {
let mut builder = reqwest::Client::builder();

if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}

let client = builder.build()?;
Ok(Self { client })
}
}

impl HttpTransport for ReqwestTransport {
fn request(&self, request: http::Request<Option<String>>) -> ResponseFuture {
let (parts, body_opt) = request.into_parts();

let mut req_builder = self
.client
.request(parts.method.clone(), parts.uri.to_string());

for (name, value) in parts.headers.iter() {
req_builder = req_builder.header(name, value);
}

if let Some(body) = body_opt {
req_builder = req_builder.body(body);
}

let req = match req_builder.build() {
Ok(r) => r,
Err(e) => return Box::pin(async move { Err(TransportError::new(e)) }),
};

let client = self.client.clone();

Box::pin(async move {
let resp = client.execute(req).await.map_err(TransportError::new)?;

let status = resp.status();
let headers = resp.headers().clone();

let byte_stream: ByteStream = Box::pin(
resp.bytes_stream()
.map(|result| result.map_err(TransportError::new)),
);

let mut response_builder = http::Response::builder().status(status);

for (name, value) in headers.iter() {
response_builder = response_builder.header(name, value);
}

let response = response_builder
.body(byte_stream)
.map_err(TransportError::new)?;

Ok(response)
})
}
}

pub(crate) struct Inner {
callback_counter: Mutex<i32>,
callback_url: String,
Expand Down Expand Up @@ -102,9 +171,12 @@ impl Inner {
reconnect_options = reconnect_options.delay(Duration::from_millis(delay_ms));
}

if let Some(read_timeout_ms) = config.read_timeout_ms {
client_builder = client_builder.read_timeout(Duration::from_millis(read_timeout_ms));
}
// Create transport with timeout configuration
let timeout = config.read_timeout_ms.map(Duration::from_millis);
let transport = match ReqwestTransport::new(timeout) {
Ok(t) => t,
Err(e) => return Err(format!("Failed to create transport {:?}", e)),
};

if let Some(last_event_id) = &config.last_event_id {
client_builder = client_builder.last_event_id(last_event_id.clone());
Expand All @@ -128,7 +200,9 @@ impl Inner {
}

Ok(Box::new(
client_builder.reconnect(reconnect_options.build()).build(),
client_builder
.reconnect(reconnect_options.build())
.build_with_transport(transport),
))
}
}
Expand Down
13 changes: 4 additions & 9 deletions eventsource-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ keywords = ["launchdarkly", "feature-flags", "feature-toggles", "eventsource", "
exclude = ["CHANGELOG.md"]

[dependencies]
bytes = "1.5"
futures = "0.3.21"
hyper = { version = "0.14.19", features = ["client", "http1", "tcp"] }
hyper-rustls = { version = "0.24.1", optional = true }
http = "1.0"
log = "0.4.6"
pin-project = "1.0.10"
tokio = { version = "1.17.0", features = ["time"] }
hyper-timeout = "0.4.1"
rand = "0.8.5"
base64 = "0.22.1"

Expand All @@ -31,10 +30,6 @@ test-case = "3.2.1"
proptest = "1.0.0"


[features]
default = ["rustls"]
rustls = ["hyper-rustls", "hyper-rustls/http2"]

[[example]]
name = "tail"
required-features = ["rustls"]
[features]
default = []
122 changes: 103 additions & 19 deletions eventsource-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,125 @@

Client for the [Server-Sent Events] protocol (aka [EventSource]).

This library focuses on the SSE protocol implementation. You provide the HTTP transport layer (hyper, reqwest, etc.), giving you full control over HTTP configuration like timeouts, TLS, and connection pooling.

[Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html
[EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource

## Requirements

Requires tokio.
* Tokio async runtime
* An HTTP client library (hyper, reqwest, or custom)

## Quick Start

### 1. Add dependencies

## Usage
```toml
[dependencies]
eventsource-client = "0.17"
reqwest = { version = "0.12", features = ["stream"] } # or hyper v1
futures = "0.3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```

### 2. Implement HttpTransport

Example that just prints the type of each event received:
Use one of our example implementations:

```rust
use eventsource_client as es;
// See examples/reqwest_transport.rs for complete implementation
use eventsource_client::{HttpTransport, ResponseFuture};

let mut client = es::ClientBuilder::for_url("https://example.com/stream")?
.header("Authorization", "Basic username:password")?
.build();
struct ReqwestTransport {
client: reqwest::Client,
}

client
.stream()
.map_ok(|event| println!("got event: {:?}", event))
.map_err(|err| eprintln!("error streaming events: {:?}", err));
impl HttpTransport for ReqwestTransport {
fn request(&self, request: http::Request<()>) -> ResponseFuture {
// Convert request and call HTTP client
// See examples/ for full implementation
}
}
```

(Some boilerplate omitted for clarity; see [examples directory] for complete,
working code.)
### 3. Use the client

```rust
use eventsource_client::{ClientBuilder, SSE};
use futures::TryStreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create HTTP transport
let transport = ReqwestTransport::new()?;

// Build SSE client
let client = ClientBuilder::for_url("https://example.com/stream")?
.header("Authorization", "Bearer token")?
.build_with_transport(transport);

// Stream events
let mut stream = client.stream();

while let Some(event) = stream.try_next().await? {
match event {
SSE::Event(evt) => println!("Event: {}", evt.event_type),
SSE::Comment(c) => println!("Comment: {}", c),
SSE::Connected(_) => println!("Connected!"),
}
}

Ok(())
}
```

[examples directory]: https://github.com/launchdarkly/rust-eventsource-client/tree/main/eventsource-client/examples
## Features

* tokio-based streaming client.
* Supports setting custom headers on the HTTP request (e.g. for endpoints
requiring authorization).
* Retry for failed connections.
* Reconnection if connection is interrupted, with exponential backoff.
* **Pluggable HTTP transport** - Use any HTTP client (hyper, reqwest, or custom)
* **Tokio-based streaming** - Efficient async/await support
* **Custom headers** - Full control over HTTP requests
* **Automatic reconnection** - Configurable exponential backoff
* **Retry logic** - Handle transient failures gracefully
* **Redirect following** - Automatic handling of HTTP redirects
* **Last-Event-ID** - Resume streams from last received event

## Migration from v0.16
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I change this later from migration guide to just examples.


If you're upgrading from v0.16 (which used hyper 0.14 internally), see [MIGRATION.md](MIGRATION.md) for a detailed migration guide.

Key changes:
- You must now provide an HTTP transport implementation
- Removed `build()`, `build_http()`, and other hyper-specific methods
- Use `build_with_transport(transport)` instead
- Timeout configuration moved to your HTTP transport

## Why Pluggable Transport?

1. **Use latest HTTP clients** - Not locked to a specific HTTP library version
2. **Full control** - Configure timeouts, TLS, proxies, etc. exactly as needed
3. **Smaller library** - Focused on SSE protocol, not HTTP implementation
4. **Flexibility** - Swap HTTP clients without changing SSE code

## Architecture

```
┌─────────────────────────────────────┐
│ Your Application │
└─────────────┬───────────────────────┘
┌─────────────────────────────────────┐
│ eventsource-client │
│ (SSE Protocol Implementation) │
└─────────────┬───────────────────────┘
│ HttpTransport trait
┌─────────────────────────────────────┐
│ Your HTTP Client │
│ (hyper, reqwest, custom, etc.) │
└─────────────────────────────────────┘
```

## Stability

Expand Down
54 changes: 0 additions & 54 deletions eventsource-client/examples/tail.rs

This file was deleted.

Loading