Skip to content

Commit 27586b7

Browse files
authored
Backporting recent fixes for Parity (#111)
* Improving the external API. (#108) * Improving the external API. * Fix output format for subscriptions. * Additional improvements to integrate pubsub with Parity (#110) * Cloneable types * Adding Origin to RequestContext * Sink implementations. * Add comment regarding Task wakeup.
1 parent ad06827 commit 27586b7

File tree

14 files changed

+146
-23
lines changed

14 files changed

+146
-23
lines changed

core/src/io.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,8 @@ impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
192192
}).boxed()
193193
}
194194

195-
fn handle_call(&self, call: Call, meta: T) -> BoxFuture<Option<Output>, ()> {
195+
/// Handle single call asynchronously.
196+
pub fn handle_call(&self, call: Call, meta: T) -> BoxFuture<Option<Output>, ()> {
196197
match call {
197198
Call::MethodCall(method) => {
198199
let params = method.params.unwrap_or(Params::None);

core/src/types/params.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use serde_json::value::from_value;
1010
use super::{Value, Error};
1111

1212
/// Request parameters
13-
#[derive(Debug, PartialEq)]
13+
#[derive(Debug, PartialEq, Clone)]
1414
pub enum Params {
1515
/// Array of values
1616
Array(Vec<Value>),

core/src/types/request.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,18 @@ pub enum Call {
4747

4848
}
4949

50+
impl From<MethodCall> for Call {
51+
fn from(mc: MethodCall) -> Self {
52+
Call::MethodCall(mc)
53+
}
54+
}
55+
56+
impl From<Notification> for Call {
57+
fn from(n: Notification) -> Self {
58+
Call::Notification(n)
59+
}
60+
}
61+
5062
impl Serialize for Call {
5163
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
5264
where S: Serializer {

core/src/types/response.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use serde_json::value::from_value;
55
use super::{Id, Value, Error, ErrorCode, Version};
66

77
/// Successful response
8-
#[derive(Debug, PartialEq, Serialize, Deserialize)]
8+
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
99
pub struct Success {
1010
/// Protocol version
1111
#[serde(skip_serializing_if = "Option::is_none")]
@@ -17,7 +17,7 @@ pub struct Success {
1717
}
1818

1919
/// Unsuccessful response
20-
#[derive(Debug, PartialEq, Serialize, Deserialize)]
20+
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
2121
pub struct Failure {
2222
/// Protocol Version
2323
#[serde(skip_serializing_if = "Option::is_none")]
@@ -29,7 +29,7 @@ pub struct Failure {
2929
}
3030

3131
/// Represents output - failure or success
32-
#[derive(Debug, PartialEq)]
32+
#[derive(Debug, PartialEq, Clone)]
3333
pub enum Output {
3434
/// Success
3535
Success(Success),

macros/examples/pubsub-macros.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ fn main() {
9999
{
100100
let subscribers = active_subscriptions.read().unwrap();
101101
for sink in subscribers.values() {
102-
let _ = sink.send("Hello World!".into()).wait();
102+
let _ = sink.notify("Hello World!".into()).wait();
103103
}
104104
}
105105
thread::sleep(::std::time::Duration::from_secs(1));

macros/src/auto_args.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ macro_rules! build_rpc_trait {
233233
)
234234
}),
235235
($unsubscribe, move |base, id| {
236+
use $crate::jsonrpc_core::futures::Future;
236237
Self::$unsub_method(base, id).map($crate::to_value).boxed()
237238
}),
238239
);

macros/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
extern crate jsonrpc_core;
1+
pub extern crate jsonrpc_core;
22
extern crate jsonrpc_pubsub;
33
extern crate serde;
44

macros/src/pubsub.rs

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use jsonrpc_pubsub as pubsub;
55
use serde;
66
use util::to_value;
77

8+
use self::core::futures::{self, Sink as FuturesSink};
9+
810
pub use self::pubsub::SubscriptionId;
911

1012
pub struct Subscriber<T> {
@@ -25,22 +27,80 @@ impl<T> Subscriber<T> {
2527
}
2628

2729
pub fn assign_id(self, id: SubscriptionId) -> Result<Sink<T>, ()> {
28-
let sink = self.subscriber.assign_id(id)?;
30+
let sink = self.subscriber.assign_id(id.clone())?;
2931
Ok(Sink {
32+
id: id,
3033
sink: sink,
34+
buffered: None,
3135
_data: PhantomData,
3236
})
3337
}
3438
}
3539

3640
pub struct Sink<T> {
3741
sink: pubsub::Sink,
42+
id: SubscriptionId,
43+
buffered: Option<core::Params>,
3844
_data: PhantomData<T>,
3945
}
4046

4147
impl<T: serde::Serialize> Sink<T> {
42-
pub fn send(&self, val: T) -> pubsub::SinkResult {
48+
pub fn notify(&self, val: T) -> pubsub::SinkResult {
49+
self.sink.notify(self.val_to_params(val))
50+
}
51+
52+
fn val_to_params(&self, val: T) -> core::Params {
53+
let id = self.id.clone().into();
4354
let val = to_value(val);
44-
self.sink.send(core::Params::Array(vec![val]))
55+
56+
core::Params::Map(vec![
57+
("subscription".to_owned(), id),
58+
("result".to_owned(), val),
59+
].into_iter().collect())
60+
}
61+
62+
fn poll(&mut self) -> futures::Poll<(), pubsub::TransportError> {
63+
if let Some(item) = self.buffered.take() {
64+
let result = self.sink.start_send(item)?;
65+
if let futures::AsyncSink::NotReady(item) = result {
66+
self.buffered = Some(item);
67+
}
68+
}
69+
70+
if self.buffered.is_some() {
71+
Ok(futures::Async::NotReady)
72+
} else {
73+
Ok(futures::Async::Ready(()))
74+
}
75+
}
76+
}
77+
78+
impl<T: serde::Serialize> futures::sink::Sink for Sink<T> {
79+
type SinkItem = T;
80+
type SinkError = pubsub::TransportError;
81+
82+
fn start_send(&mut self, item: Self::SinkItem) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
83+
// Make sure to always try to process the buffered entry.
84+
// Since we're just a proxy to real `Sink` we don't need
85+
// to schedule a `Task` wakeup. It will be done downstream.
86+
if self.poll()?.is_not_ready() {
87+
return Ok(futures::AsyncSink::NotReady(item));
88+
}
89+
90+
let val = self.val_to_params(item);
91+
self.buffered = Some(val);
92+
self.poll()?;
93+
94+
Ok(futures::AsyncSink::Ready)
95+
}
96+
97+
fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
98+
self.poll()?;
99+
self.sink.poll_complete()
100+
}
101+
102+
fn close(&mut self) -> futures::Poll<(), Self::SinkError> {
103+
self.poll()?;
104+
self.sink.close()
45105
}
46106
}

pubsub/examples/pubsub.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ fn main() {
5555
thread::spawn(move || {
5656
loop {
5757
thread::sleep(time::Duration::from_millis(100));
58-
match sink.send(Params::Array(vec![Value::Number(10.into())])).wait() {
58+
match sink.notify(Params::Array(vec![Value::Number(10.into())])).wait() {
5959
Ok(_) => {},
6060
Err(_) => {
6161
println!("Subscription has ended, finishing.");

pubsub/src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,4 @@ mod types;
1515

1616
pub use self::handler::{PubSubHandler, SubscribeRpcMethod, UnsubscribeRpcMethod};
1717
pub use self::subscription::{Session, Sink, Subscriber, new_subscription};
18-
pub use self::types::{PubSubMetadata, SubscriptionId};
19-
20-
/// Subscription send result.
21-
pub type SinkResult = core::futures::sink::Send<types::TransportSender>;
18+
pub use self::types::{PubSubMetadata, SubscriptionId, TransportError, SinkResult};

pubsub/src/subscription.rs

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
//! Subscription primitives.
22
3+
use std::fmt;
34
use std::collections::HashMap;
45
use std::sync::Arc;
56
use parking_lot::Mutex;
67

78
use core;
8-
use core::futures::{self, sink, future, Sink as FuturesSink, Future, BoxFuture};
9+
use core::futures::{self, future, Sink as FuturesSink, Future, BoxFuture};
910
use core::futures::sync::oneshot;
1011

1112
use handler::{SubscribeRpcMethod, UnsubscribeRpcMethod};
12-
use types::{PubSubMetadata, SubscriptionId, TransportSender};
13+
use types::{PubSubMetadata, SubscriptionId, TransportSender, TransportError, SinkResult};
1314

1415
/// RPC client session
1516
/// Keeps track of active subscriptions and unsubscribes from them upon dropping.
@@ -19,6 +20,15 @@ pub struct Session {
1920
on_drop: Mutex<Vec<Box<Fn() + Send>>>,
2021
}
2122

23+
impl fmt::Debug for Session {
24+
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
25+
fmt.debug_struct("pubsub::Session")
26+
.field("active_subscriptions", &self.active_subscriptions.lock().len())
27+
.field("transport", &self.transport)
28+
.finish()
29+
}
30+
}
31+
2232
impl Session {
2333
/// Creates new session given transport raw send capabilities.
2434
/// Session should be created as part of metadata, `sender` should be returned by transport.
@@ -70,21 +80,50 @@ impl Drop for Session {
7080
}
7181

7282
/// A handle to send notifications directly to subscribed client.
83+
#[derive(Debug, Clone)]
7384
pub struct Sink {
7485
notification: String,
7586
transport: TransportSender
7687
}
7788

7889
impl Sink {
7990
/// Sends a notification to a client.
80-
pub fn send(&self, val: core::Params) -> sink::Send<TransportSender> {
91+
pub fn notify(&self, val: core::Params) -> SinkResult {
92+
let val = self.params_to_string(val);
93+
self.transport.clone().send(val.0)
94+
}
95+
96+
fn params_to_string(&self, val: core::Params) -> (String, core::Params) {
8197
let notification = core::Notification {
8298
jsonrpc: Some(core::Version::V2),
8399
method: self.notification.clone(),
84100
params: Some(val),
85101
};
102+
(
103+
core::to_string(&notification).expect("Notification serialization never fails."),
104+
notification.params.expect("Always Some"),
105+
)
106+
}
107+
}
108+
109+
impl FuturesSink for Sink {
110+
type SinkItem = core::Params;
111+
type SinkError = TransportError;
112+
113+
fn start_send(&mut self, item: Self::SinkItem) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
114+
let (val, params) = self.params_to_string(item);
115+
self.transport.start_send(val).map(|result| match result {
116+
futures::AsyncSink::Ready => futures::AsyncSink::Ready,
117+
futures::AsyncSink::NotReady(_) => futures::AsyncSink::NotReady(params),
118+
})
119+
}
120+
121+
fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
122+
self.transport.poll_complete()
123+
}
86124

87-
self.transport.clone().send(core::to_string(&notification).expect("Notification serialization never fails."))
125+
fn close(&mut self) -> futures::Poll<(), Self::SinkError> {
126+
self.transport.close()
88127
}
89128
}
90129

@@ -314,7 +353,7 @@ mod tests {
314353
};
315354

316355
// when
317-
sink.send(core::Params::Array(vec![core::Value::Number(10.into())])).wait().unwrap();
356+
sink.notify(core::Params::Array(vec![core::Value::Number(10.into())])).wait().unwrap();
318357

319358
// then
320359
assert_eq!(

pubsub/src/types.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ use subscription::Session;
66

77
/// Raw transport sink for specific client.
88
pub type TransportSender = mpsc::Sender<String>;
9+
/// Raw transport error.
10+
pub type TransportError = mpsc::SendError<String>;
11+
/// Subscription send result.
12+
pub type SinkResult = core::futures::sink::Send<TransportSender>;
913

1014
/// Metadata extension for pub-sub method handling.
1115
pub trait PubSubMetadata: core::Metadata {

ws/src/metadata.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ use core;
22
use ws;
33

44
use session;
5+
use Origin;
56

67
/// Request context
78
pub struct RequestContext {
89
/// Session id
910
pub session_id: session::SessionId,
11+
/// Request Origin
12+
pub origin: Option<Origin>,
1013
/// Direct channel to send messages to a client.
1114
pub out: ws::Sender,
1215
}

ws/src/session.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,11 @@ impl<M: core::Metadata, S: core::Middleware<M>> Drop for Session<M, S> {
9595
}
9696

9797
impl<M: core::Metadata, S: core::Middleware<M>> Session<M, S> {
98-
fn verify_origin(&self, req: &ws::Request) -> Option<ws::Response> {
99-
let origin = req.header("origin").map(|x| &x[..]);
98+
fn read_origin<'a>(&self, req: &'a ws::Request) -> Option<&'a [u8]> {
99+
req.header("origin").map(|x| &x[..])
100+
}
101+
102+
fn verify_origin(&self, origin: Option<&[u8]>) -> Option<ws::Response> {
100103
if !header_is_allowed(&self.allowed_origins, origin) {
101104
warn!(target: "signer", "Blocked connection to Signer API from untrusted origin: {:?}", origin);
102105
Some(forbidden(
@@ -131,9 +134,10 @@ impl<M: core::Metadata, S: core::Middleware<M>> ws::Handler for Session<M, S> {
131134
MiddlewareAction::Proceed
132135
};
133136

137+
let origin = self.read_origin(req);
134138
if action.should_verify_origin() {
135139
// Verify request origin.
136-
if let Some(response) = self.verify_origin(req) {
140+
if let Some(response) = self.verify_origin(origin) {
137141
return Ok(response);
138142
}
139143
}
@@ -145,6 +149,7 @@ impl<M: core::Metadata, S: core::Middleware<M>> ws::Handler for Session<M, S> {
145149
}
146150
}
147151

152+
self.context.origin = origin.and_then(|origin| ::std::str::from_utf8(origin).ok()).map(Into::into);
148153
self.metadata = self.meta_extractor.extract(&self.context);
149154

150155
match action {
@@ -217,6 +222,7 @@ impl<M: core::Metadata, S: core::Middleware<M>> ws::Factory for Factory<M, S> {
217222
Session {
218223
context: metadata::RequestContext {
219224
session_id: self.session_id,
225+
origin: None,
220226
out: sender,
221227
},
222228
handler: self.handler.clone(),

0 commit comments

Comments
 (0)