Skip to content

Commit bef0fa8

Browse files
authored
Merge branch 'master' into schema
2 parents 4e64e6c + 7e87247 commit bef0fa8

File tree

8 files changed

+274
-7
lines changed

8 files changed

+274
-7
lines changed

docs/book/content/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
- [Objects and generics](advanced/objects_and_generics.md)
3333
- [Multiple operations per request](advanced/multiple_ops_per_request.md)
3434
- [Dataloaders](advanced/dataloaders.md)
35+
- [Subscriptions](advanced/subscriptions.md)
3536

3637
# - [Context switching]
3738

docs/book/content/advanced/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ The chapters below cover some more advanced scenarios.
77
- [Objects and generics](objects_and_generics.md)
88
- [Multiple operations per request](multiple_ops_per_request.md)
99
- [Dataloaders](dataloaders.md)
10+
- [Subscriptions](subscriptions.md)
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# Subscriptions
2+
### How to achieve realtime data with GraphQL subscriptions
3+
4+
GraphQL subscriptions are a way to push data from the server to clients requesting real-time messages
5+
from the server. Subscriptions are similar to queries in that they specify a set of fields to be delivered to the client,
6+
but instead of immediately returning a single answer, a result is sent every time a particular event happens on the
7+
server.
8+
9+
In order to execute subscriptions you need a coordinator (that spawns connections)
10+
and a GraphQL object that can be resolved into a stream--elements of which will then
11+
be returned to the end user. The [juniper_subscriptions][juniper_subscriptions] crate
12+
provides a default connection implementation. Currently subscriptions are only supported on the `master` branch. Add the following to your `Cargo.toml`:
13+
```toml
14+
[dependencies]
15+
juniper = { git = "https://github.com/graphql-rust/juniper", branch = "master" }
16+
juniper_subscriptions = { git = "https://github.com/graphql-rust/juniper", branch = "master" }
17+
```
18+
19+
### Schema Definition
20+
21+
The Subscription is just a GraphQL object, similar to the Query root and Mutations object that you defined for the
22+
operations in your [Schema][Schema], the difference is that all the operations defined there should be async and the return of it
23+
should be a [Stream][Stream].
24+
25+
This example shows a subscription operation that returns two events, the strings `Hello` and `World!`
26+
sequentially:
27+
28+
```rust
29+
# use juniper::http::GraphQLRequest;
30+
# use juniper::{DefaultScalarValue, FieldError, SubscriptionCoordinator};
31+
# use juniper_subscriptions::Coordinator;
32+
# use futures::{Stream, StreamExt};
33+
# use std::pin::Pin;
34+
# #[derive(Clone)]
35+
# pub struct Database;
36+
# impl juniper::Context for Database {}
37+
# impl Database {
38+
# fn new() -> Self {
39+
# Self {}
40+
# }
41+
# }
42+
# pub struct Query;
43+
# #[juniper::graphql_object(Context = Database)]
44+
# impl Query {
45+
# fn hello_world() -> &str {
46+
# "Hello World!"
47+
# }
48+
# }
49+
pub struct Subscription;
50+
51+
type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;
52+
53+
#[juniper::graphql_subscription(Context = Database)]
54+
impl Subscription {
55+
async fn hello_world() -> StringStream {
56+
let stream = tokio::stream::iter(vec![
57+
Ok(String::from("Hello")),
58+
Ok(String::from("World!"))
59+
]);
60+
Box::pin(stream)
61+
}
62+
}
63+
# fn main () {}
64+
```
65+
66+
67+
68+
### Coordinator
69+
70+
Subscriptions require a bit more resources than regular queries, since they can provide a great vector
71+
for DOS attacks and can bring down a server easily if not handled right. [SubscriptionCoordinator][SubscriptionCoordinator] trait provides the coordination logic.
72+
It contains the schema and can keep track of opened connections, handle subscription
73+
start and maintains a global subscription id. Once connection is established, subscription
74+
coordinator spawns a [SubscriptionConnection][SubscriptionConnection], which handles a
75+
single connection, provides resolver logic for a client stream and can provide re-connection
76+
and shutdown logic.
77+
78+
79+
The [Coordinator][Coordinator] struct is a simple implementation of the trait [SubscriptionCoordinator][SubscriptionCoordinator]
80+
that is responsible for handling the execution of subscription operation into your schema. The execution of the `subscribe`
81+
operation returns a [Future][Future] with a Item value of a Result<[Connection][Connection], [GraphQLError][GraphQLError]>,
82+
where the connection is the Stream of values returned by the operation and the GraphQLError is the error that occurred in the
83+
resolution of this connection, which means that the subscription failed.
84+
85+
```rust
86+
# use juniper::http::GraphQLRequest;
87+
# use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode, SubscriptionCoordinator};
88+
# use juniper_subscriptions::Coordinator;
89+
# use futures::{Stream, StreamExt};
90+
# use std::pin::Pin;
91+
# use tokio::runtime::Runtime;
92+
# use tokio::task;
93+
#
94+
# #[derive(Clone)]
95+
# pub struct Database;
96+
#
97+
# impl juniper::Context for Database {}
98+
#
99+
# impl Database {
100+
# fn new() -> Self {
101+
# Self {}
102+
# }
103+
# }
104+
#
105+
# pub struct Query;
106+
#
107+
# #[juniper::graphql_object(Context = Database)]
108+
# impl Query {
109+
# fn hello_world() -> &str {
110+
# "Hello World!"
111+
# }
112+
# }
113+
#
114+
# pub struct Subscription;
115+
#
116+
# type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;
117+
#
118+
# #[juniper::graphql_subscription(Context = Database)]
119+
# impl Subscription {
120+
# async fn hello_world() -> StringStream {
121+
# let stream =
122+
# tokio::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
123+
# Box::pin(stream)
124+
# }
125+
# }
126+
type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;
127+
128+
fn schema() -> Schema {
129+
Schema::new(Query {}, EmptyMutation::new(), Subscription {})
130+
}
131+
132+
async fn run_subscription() {
133+
let schema = schema();
134+
let coordinator = Coordinator::new(schema);
135+
let req: GraphQLRequest<DefaultScalarValue> = serde_json::from_str(
136+
r#"
137+
{
138+
"query": "subscription { helloWorld }"
139+
}
140+
"#,
141+
)
142+
.unwrap();
143+
let ctx = Database::new();
144+
let mut conn = coordinator.subscribe(&req, &ctx).await.unwrap();
145+
while let Some(result) = conn.next().await {
146+
println!("{}", serde_json::to_string(&result).unwrap());
147+
}
148+
}
149+
150+
# fn main() { }
151+
```
152+
153+
### Web Integration and Examples
154+
155+
Currently there is an example of subscriptions with [warp][warp], but it still in an alpha state.
156+
GraphQL over [WS][WS] is not fully supported yet and is non-standard.
157+
158+
- [Warp Subscription Example](https://github.com/graphql-rust/juniper/tree/master/examples/warp_subscriptions)
159+
- [Small Example](https://github.com/graphql-rust/juniper/tree/master/examples/basic_subscriptions)
160+
161+
162+
163+
164+
[juniper_subscriptions]: https://github.com/graphql-rust/juniper/tree/master/juniper_subscriptions
165+
[Stream]: https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html
166+
<!-- TODO: Fix these links when the documentation for the `juniper_subscriptions` are defined in the docs. --->
167+
[Coordinator]: https://docs.rs/juniper_subscriptions/0.15.0/struct.Coordinator.html
168+
[SubscriptionCoordinator]: https://docs.rs/juniper_subscriptions/0.15.0/trait.SubscriptionCoordinator.html
169+
[Connection]: https://docs.rs/juniper_subscriptions/0.15.0/struct.Connection.html
170+
[SubscriptionConnection]: https://docs.rs/juniper_subscriptions/0.15.0/trait.SubscriptionConnection.html
171+
<!--- --->
172+
[Future]: https://docs.rs/futures/0.3.4/futures/future/trait.Future.html
173+
[warp]: https://github.com/graphql-rust/juniper/tree/master/juniper_warp
174+
[WS]: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
175+
[GraphQLError]: https://docs.rs/juniper/0.14.2/juniper/enum.GraphQLError.html
176+
[Schema]: ../schema/schemas_and_mutations.md

docs/book/content/schema/schemas_and_mutations.md

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
# Schemas
22

3-
A schema consists of two types: a query object and a mutation object (Juniper
4-
does not support subscriptions yet). These two define the root query fields
5-
and mutations of the schema, respectively.
3+
A schema consists of three types: a query object, a mutation object, and a subscription object.
4+
These three define the root query fields, mutations and subscriptions of the schema, respectively.
5+
6+
The usage of subscriptions is a little different from the mutation and query objects, so there is a specific [section][section] that discusses them.
67

78
Both query and mutation objects are regular GraphQL objects, defined like any
8-
other object in Juniper. The mutation object, however, is optional since schemas
9-
can be read-only.
9+
other object in Juniper. The mutation and subscription object, however, is optional since schemas
10+
can be read-only and without subscriptions as well. If mutations/subscriptions functionality is not needed, consider using [EmptyMutation][EmptyMutation]/[EmptySubscription][EmptySubscription].
1011

1112
In Juniper, the `RootNode` type represents a schema. You usually don't have to
1213
create this object yourself: see the framework integrations for [Iron](../servers/iron.md)
@@ -58,3 +59,8 @@ impl Mutations {
5859

5960
# fn main() { }
6061
```
62+
63+
[section]: ../advanced/subscriptions.md
64+
[EmptyMutation]: https://docs.rs/juniper/0.14.2/juniper/struct.EmptyMutation.html
65+
<!--TODO: Fix This URL when the EmptySubscription become available in the Documentation -->
66+
[EmptySubscription]: https://docs.rs/juniper/0.14.2/juniper/struct.EmptySubscription.html

docs/book/tests/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ build = "build.rs"
88
[dependencies]
99
juniper = { path = "../../../juniper" }
1010
juniper_iron = { path = "../../../juniper_iron" }
11-
futures = "0.3.1"
12-
11+
juniper_subscriptions = { path = "../../../juniper_subscriptions" }
12+
futures = "0.3"
13+
tokio = { version = "0.2", features = ["rt-core", "blocking", "stream", "rt-util"] }
1314
iron = "0.5.0"
1415
mount = "0.4.0"
1516

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
target
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "basic_subscriptions"
3+
version = "0.1.0"
4+
edition = "2018"
5+
authors = ["Jordao Rosario <[email protected]>"]
6+
7+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
8+
9+
[dependencies]
10+
futures = "0.3"
11+
serde = { version = "1.0", features = ["derive"] }
12+
serde_json = "1.0"
13+
tokio = { version = "0.2", features = ["rt-core", "macros", "stream"] }
14+
15+
juniper = { git = "https://github.com/graphql-rust/juniper" }
16+
juniper_subscriptions = { git = "https://github.com/graphql-rust/juniper" }
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#![deny(warnings)]
2+
3+
use futures::{Stream, StreamExt};
4+
use juniper::http::GraphQLRequest;
5+
use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode, SubscriptionCoordinator};
6+
use juniper_subscriptions::Coordinator;
7+
use std::pin::Pin;
8+
9+
#[derive(Clone)]
10+
pub struct Database;
11+
12+
impl juniper::Context for Database {}
13+
14+
impl Database {
15+
fn new() -> Self {
16+
Self {}
17+
}
18+
}
19+
20+
pub struct Query;
21+
22+
#[juniper::graphql_object(Context = Database)]
23+
impl Query {
24+
fn hello_world() -> &str {
25+
"Hello World!"
26+
}
27+
}
28+
29+
pub struct Subscription;
30+
31+
type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;
32+
33+
#[juniper::graphql_subscription(Context = Database)]
34+
impl Subscription {
35+
async fn hello_world() -> StringStream {
36+
let stream =
37+
tokio::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
38+
Box::pin(stream)
39+
}
40+
}
41+
42+
type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;
43+
44+
fn schema() -> Schema {
45+
Schema::new(Query {}, EmptyMutation::new(), Subscription {})
46+
}
47+
48+
#[tokio::main]
49+
async fn main() {
50+
let schema = schema();
51+
let coordinator = Coordinator::new(schema);
52+
let req: GraphQLRequest<DefaultScalarValue> = serde_json::from_str(
53+
r#"
54+
{
55+
"query": "subscription { helloWorld }"
56+
}
57+
"#,
58+
)
59+
.unwrap();
60+
let ctx = Database::new();
61+
let mut conn = coordinator.subscribe(&req, &ctx).await.unwrap();
62+
while let Some(result) = conn.next().await {
63+
println!("{}", serde_json::to_string(&result).unwrap());
64+
}
65+
}

0 commit comments

Comments
 (0)