Skip to content

Commit 0753799

Browse files
committed
Additional improvements to integrate pubsub with Parity (#110)
* Cloneable types * Adding Origin to RequestContext * Sink implementations. * Add comment regarding Task wakeup.
1 parent 84ef222 commit 0753799

File tree

10 files changed

+114
-21
lines changed

10 files changed

+114
-21
lines changed

core/src/types/params.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use serde_json::value::from_value;
1010
use super::{Value, Error};
1111

1212
/// Request parameters
13-
#[derive(Debug, PartialEq)]
13+
#[derive(Debug, PartialEq, Clone)]
1414
pub enum Params {
1515
/// Array of values
1616
Array(Vec<Value>),

core/src/types/response.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use serde_json::value::from_value;
55
use super::{Id, Value, Error, ErrorCode, Version};
66

77
/// Successful response
8-
#[derive(Debug, PartialEq, Serialize, Deserialize)]
8+
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
99
pub struct Success {
1010
/// Protocol version
1111
#[serde(skip_serializing_if = "Option::is_none")]
@@ -17,7 +17,7 @@ pub struct Success {
1717
}
1818

1919
/// Unsuccessful response
20-
#[derive(Debug, PartialEq, Serialize, Deserialize)]
20+
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
2121
pub struct Failure {
2222
/// Protocol Version
2323
#[serde(skip_serializing_if = "Option::is_none")]
@@ -29,7 +29,7 @@ pub struct Failure {
2929
}
3030

3131
/// Represents output - failure or success
32-
#[derive(Debug, PartialEq)]
32+
#[derive(Debug, PartialEq, Clone)]
3333
pub enum Output {
3434
/// Success
3535
Success(Success),

macros/examples/pubsub-macros.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ fn main() {
9999
{
100100
let subscribers = active_subscriptions.read().unwrap();
101101
for sink in subscribers.values() {
102-
let _ = sink.send("Hello World!".into()).wait();
102+
let _ = sink.notify("Hello World!".into()).wait();
103103
}
104104
}
105105
thread::sleep(::std::time::Duration::from_secs(1));

macros/src/pubsub.rs

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use jsonrpc_pubsub as pubsub;
55
use serde;
66
use util::to_value;
77

8+
use self::core::futures::{self, Sink as FuturesSink};
9+
810
pub use self::pubsub::SubscriptionId;
911

1012
pub struct Subscriber<T> {
@@ -29,6 +31,7 @@ impl<T> Subscriber<T> {
2931
Ok(Sink {
3032
id: id,
3133
sink: sink,
34+
buffered: None,
3235
_data: PhantomData,
3336
})
3437
}
@@ -37,16 +40,67 @@ impl<T> Subscriber<T> {
3740
pub struct Sink<T> {
3841
sink: pubsub::Sink,
3942
id: SubscriptionId,
43+
buffered: Option<core::Params>,
4044
_data: PhantomData<T>,
4145
}
4246

4347
impl<T: serde::Serialize> Sink<T> {
44-
pub fn send(&self, val: T) -> pubsub::SinkResult {
48+
pub fn notify(&self, val: T) -> pubsub::SinkResult {
49+
self.sink.notify(self.val_to_params(val))
50+
}
51+
52+
fn val_to_params(&self, val: T) -> core::Params {
4553
let id = self.id.clone().into();
4654
let val = to_value(val);
47-
self.sink.send(core::Params::Map(vec![
55+
56+
core::Params::Map(vec![
4857
("subscription".to_owned(), id),
4958
("result".to_owned(), val),
50-
].into_iter().collect()))
59+
].into_iter().collect())
60+
}
61+
62+
fn poll(&mut self) -> futures::Poll<(), pubsub::TransportError> {
63+
if let Some(item) = self.buffered.take() {
64+
let result = self.sink.start_send(item)?;
65+
if let futures::AsyncSink::NotReady(item) = result {
66+
self.buffered = Some(item);
67+
}
68+
}
69+
70+
if self.buffered.is_some() {
71+
Ok(futures::Async::NotReady)
72+
} else {
73+
Ok(futures::Async::Ready(()))
74+
}
75+
}
76+
}
77+
78+
impl<T: serde::Serialize> futures::sink::Sink for Sink<T> {
79+
type SinkItem = T;
80+
type SinkError = pubsub::TransportError;
81+
82+
fn start_send(&mut self, item: Self::SinkItem) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
83+
// Make sure to always try to process the buffered entry.
84+
// Since we're just a proxy to real `Sink` we don't need
85+
// to schedule a `Task` wakeup. It will be done downstream.
86+
if self.poll()?.is_not_ready() {
87+
return Ok(futures::AsyncSink::NotReady(item));
88+
}
89+
90+
let val = self.val_to_params(item);
91+
self.buffered = Some(val);
92+
self.poll()?;
93+
94+
Ok(futures::AsyncSink::Ready)
95+
}
96+
97+
fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
98+
self.poll()?;
99+
self.sink.poll_complete()
100+
}
101+
102+
fn close(&mut self) -> futures::Poll<(), Self::SinkError> {
103+
self.poll()?;
104+
self.sink.close()
51105
}
52106
}

pubsub/examples/pubsub.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ fn main() {
5555
thread::spawn(move || {
5656
loop {
5757
thread::sleep(time::Duration::from_millis(100));
58-
match sink.send(Params::Array(vec![Value::Number(10.into())])).wait() {
58+
match sink.notify(Params::Array(vec![Value::Number(10.into())])).wait() {
5959
Ok(_) => {},
6060
Err(_) => {
6161
println!("Subscription has ended, finishing.");

pubsub/src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,4 @@ mod types;
1515

1616
pub use self::handler::{PubSubHandler, SubscribeRpcMethod, UnsubscribeRpcMethod};
1717
pub use self::subscription::{Session, Sink, Subscriber, new_subscription};
18-
pub use self::types::{PubSubMetadata, SubscriptionId};
19-
20-
/// Subscription send result.
21-
pub type SinkResult = core::futures::sink::Send<types::TransportSender>;
18+
pub use self::types::{PubSubMetadata, SubscriptionId, TransportError, SinkResult};

pubsub/src/subscription.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ use std::sync::Arc;
66
use parking_lot::Mutex;
77

88
use core;
9-
use core::futures::{self, sink, future, Sink as FuturesSink, Future, BoxFuture};
9+
use core::futures::{self, future, Sink as FuturesSink, Future, BoxFuture};
1010
use core::futures::sync::oneshot;
1111

1212
use handler::{SubscribeRpcMethod, UnsubscribeRpcMethod};
13-
use types::{PubSubMetadata, SubscriptionId, TransportSender};
13+
use types::{PubSubMetadata, SubscriptionId, TransportSender, TransportError, SinkResult};
1414

1515
/// RPC client session
1616
/// Keeps track of active subscriptions and unsubscribes from them upon dropping.
@@ -80,21 +80,50 @@ impl Drop for Session {
8080
}
8181

8282
/// A handle to send notifications directly to subscribed client.
83+
#[derive(Debug, Clone)]
8384
pub struct Sink {
8485
notification: String,
8586
transport: TransportSender
8687
}
8788

8889
impl Sink {
8990
/// Sends a notification to a client.
90-
pub fn send(&self, val: core::Params) -> sink::Send<TransportSender> {
91+
pub fn notify(&self, val: core::Params) -> SinkResult {
92+
let val = self.params_to_string(val);
93+
self.transport.clone().send(val.0)
94+
}
95+
96+
fn params_to_string(&self, val: core::Params) -> (String, core::Params) {
9197
let notification = core::Notification {
9298
jsonrpc: Some(core::Version::V2),
9399
method: self.notification.clone(),
94100
params: Some(val),
95101
};
102+
(
103+
core::to_string(&notification).expect("Notification serialization never fails."),
104+
notification.params.expect("Always Some"),
105+
)
106+
}
107+
}
108+
109+
impl FuturesSink for Sink {
110+
type SinkItem = core::Params;
111+
type SinkError = TransportError;
112+
113+
fn start_send(&mut self, item: Self::SinkItem) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
114+
let (val, params) = self.params_to_string(item);
115+
self.transport.start_send(val).map(|result| match result {
116+
futures::AsyncSink::Ready => futures::AsyncSink::Ready,
117+
futures::AsyncSink::NotReady(_) => futures::AsyncSink::NotReady(params),
118+
})
119+
}
120+
121+
fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
122+
self.transport.poll_complete()
123+
}
96124

97-
self.transport.clone().send(core::to_string(&notification).expect("Notification serialization never fails."))
125+
fn close(&mut self) -> futures::Poll<(), Self::SinkError> {
126+
self.transport.close()
98127
}
99128
}
100129

@@ -324,7 +353,7 @@ mod tests {
324353
};
325354

326355
// when
327-
sink.send(core::Params::Array(vec![core::Value::Number(10.into())])).wait().unwrap();
356+
sink.notify(core::Params::Array(vec![core::Value::Number(10.into())])).wait().unwrap();
328357

329358
// then
330359
assert_eq!(

pubsub/src/types.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ use subscription::Session;
66

77
/// Raw transport sink for specific client.
88
pub type TransportSender = mpsc::Sender<String>;
9+
/// Raw transport error.
10+
pub type TransportError = mpsc::SendError<String>;
11+
/// Subscription send result.
12+
pub type SinkResult = core::futures::sink::Send<TransportSender>;
913

1014
/// Metadata extension for pub-sub method handling.
1115
pub trait PubSubMetadata: core::Metadata {

ws/src/metadata.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ use core;
22
use ws;
33

44
use session;
5+
use Origin;
56

67
/// Request context
78
pub struct RequestContext {
89
/// Session id
910
pub session_id: session::SessionId,
11+
/// Request Origin
12+
pub origin: Option<Origin>,
1013
/// Direct channel to send messages to a client.
1114
pub out: ws::Sender,
1215
}

ws/src/session.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,11 @@ impl<M: core::Metadata, S: core::Middleware<M>> Drop for Session<M, S> {
9595
}
9696

9797
impl<M: core::Metadata, S: core::Middleware<M>> Session<M, S> {
98-
fn verify_origin(&self, req: &ws::Request) -> Option<ws::Response> {
99-
let origin = req.header("origin").map(|x| &x[..]);
98+
fn read_origin<'a>(&self, req: &'a ws::Request) -> Option<&'a [u8]> {
99+
req.header("origin").map(|x| &x[..])
100+
}
101+
102+
fn verify_origin(&self, origin: Option<&[u8]>) -> Option<ws::Response> {
100103
if !header_is_allowed(&self.allowed_origins, origin) {
101104
warn!(target: "signer", "Blocked connection to Signer API from untrusted origin: {:?}", origin);
102105
Some(forbidden(
@@ -131,9 +134,10 @@ impl<M: core::Metadata, S: core::Middleware<M>> ws::Handler for Session<M, S> {
131134
MiddlewareAction::Proceed
132135
};
133136

137+
let origin = self.read_origin(req);
134138
if action.should_verify_origin() {
135139
// Verify request origin.
136-
if let Some(response) = self.verify_origin(req) {
140+
if let Some(response) = self.verify_origin(origin) {
137141
return Ok(response);
138142
}
139143
}
@@ -145,6 +149,7 @@ impl<M: core::Metadata, S: core::Middleware<M>> ws::Handler for Session<M, S> {
145149
}
146150
}
147151

152+
self.context.origin = origin.and_then(|origin| ::std::str::from_utf8(origin).ok()).map(Into::into);
148153
self.metadata = self.meta_extractor.extract(&self.context);
149154

150155
match action {
@@ -217,6 +222,7 @@ impl<M: core::Metadata, S: core::Middleware<M>> ws::Factory for Factory<M, S> {
217222
Session {
218223
context: metadata::RequestContext {
219224
session_id: self.session_id,
225+
origin: None,
220226
out: sender,
221227
},
222228
handler: self.handler.clone(),

0 commit comments

Comments
 (0)