Skip to content

Commit f3ae8a2

Browse files
authored
Support multiple hyper threads. (#103)
1 parent 92693e6 commit f3ae8a2

File tree

6 files changed

+199
-80
lines changed

6 files changed

+199
-80
lines changed

http/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ documentation = "https://paritytech.github.io/jsonrpc/jsonrpc_http_server/index.
1111

1212
[dependencies]
1313
log = "0.3"
14+
net2 = "0.2"
1415
jsonrpc-core = { version = "7.0", path = "../core" }
1516
jsonrpc-server-utils = { version = "7.0", path = "../server-utils" }
1617
hyper = { git = "https://github.com/hyperium/hyper" }

http/examples/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ fn main() {
1111
});
1212

1313
let server = ServerBuilder::new(io)
14+
.threads(3)
1415
.cors(DomainsValidation::AllowOnly(vec![AccessControlAllowOrigin::Any]))
1516
.start_http(&"127.0.0.1:3030".parse().unwrap())
1617
.expect("Unable to start RPC server");

http/src/handler.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,9 @@ use unicase::UniCase;
1010
use jsonrpc::{Metadata, Middleware, NoopMiddleware};
1111
use jsonrpc::futures::{Future, Poll, Async, BoxFuture, Stream};
1212
use response::Response;
13-
use server_utils::{cors, hosts};
13+
use server_utils::cors;
1414

15-
use {utils, RequestMiddleware, RequestMiddlewareAction};
16-
17-
18-
type AllowedHosts = Option<Vec<hosts::Host>>;
19-
type CorsDomains = Option<Vec<cors::AccessControlAllowOrigin>>;
15+
use {utils, RequestMiddleware, RequestMiddlewareAction, CorsDomains, AllowedHosts};
2016

2117
/// jsonrpc http request handler.
2218
pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
@@ -119,6 +115,7 @@ impl<M> RpcPollState<M> {
119115
}
120116
}
121117
}
118+
122119
enum RpcHandlerState<M> {
123120
ReadingHeaders {
124121
request: server::Request,

http/src/lib.rs

Lines changed: 173 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
extern crate unicase;
2424
extern crate jsonrpc_core as jsonrpc;
2525
extern crate jsonrpc_server_utils as server_utils;
26+
extern crate net2;
2627

2728
pub extern crate hyper;
2829

@@ -39,6 +40,7 @@ use std::net::SocketAddr;
3940
use hyper::server;
4041
use jsonrpc::MetaIoHandler;
4142
use jsonrpc::futures::{self, Future, IntoFuture, BoxFuture, Stream};
43+
use jsonrpc::futures::sync::oneshot;
4244
use server_utils::reactor::{Remote, UninitializedRemote};
4345

4446
pub use server_utils::hosts::{Host, DomainsValidation};
@@ -198,15 +200,18 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
198200
}
199201
}
200202

203+
type AllowedHosts = Option<Vec<Host>>;
204+
type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
201205

202206
/// Convenient JSON-RPC HTTP Server builder.
203207
pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
204208
handler: Arc<MetaIoHandler<M, S>>,
205209
remote: UninitializedRemote,
206210
meta_extractor: Arc<MetaExtractor<M>>,
207211
request_middleware: Arc<RequestMiddleware>,
208-
cors_domains: Option<Vec<AccessControlAllowOrigin>>,
209-
allowed_hosts: Option<Vec<Host>>,
212+
cors_domains: CorsDomains,
213+
allowed_hosts: AllowedHosts,
214+
threads: usize,
210215
}
211216

212217
const SENDER_PROOF: &'static str = "Server initialization awaits local address.";
@@ -230,15 +235,33 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
230235
request_middleware: Arc::new(NoopRequestMiddleware::default()),
231236
cors_domains: None,
232237
allowed_hosts: None,
238+
threads: 1,
233239
}
234240
}
235241

236242
/// Utilize existing event loop remote to poll RPC results.
243+
/// Applies only to 1 of the threads. Other threads will spawn their own Event Loops.
237244
pub fn event_loop_remote(mut self, remote: tokio_core::reactor::Remote) -> Self {
238245
self.remote = UninitializedRemote::Shared(remote);
239246
self
240247
}
241248

249+
/// Sets number of threads of the server to run.
250+
/// Panics when set to `0`.
251+
#[cfg(not(unix))]
252+
pub fn threads(mut self, _threads: usize) -> Self {
253+
warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
254+
self
255+
}
256+
257+
/// Sets number of threads of the server to run.
258+
/// Panics when set to `0`.
259+
#[cfg(unix)]
260+
pub fn threads(mut self, threads: usize) -> Self {
261+
self.threads = threads;
262+
self
263+
}
264+
242265
/// Configures a list of allowed CORS origins.
243266
pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
244267
self.cors_domains = cors_domains.into();
@@ -274,88 +297,160 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
274297
let cors_domains = self.cors_domains;
275298
let request_middleware = self.request_middleware;
276299
let allowed_hosts = self.allowed_hosts;
277-
278-
let eloop = self.remote.initialize()?;
279300
let jsonrpc_handler = Rpc {
280301
handler: self.handler,
281302
extractor: self.meta_extractor,
282303
};
304+
let reuse_port = self.threads > 1;
283305

284306
let (local_addr_tx, local_addr_rx) = mpsc::channel();
285-
let (close, shutdown_signal) = futures::sync::oneshot::channel();
286-
let addr = addr.to_owned();
287-
// TODO [ToDr] Consider spawning many threads like in minihttp?
288-
eloop.remote().spawn(move |handle| {
289-
let handle1 = handle.clone();
290-
let bind = move || {
291-
let listener = tokio_core::net::TcpListener::bind(&addr, &handle1)?;
292-
// Add current host to allowed headers.
293-
// NOTE: we need to use `l.local_addr()` instead of `addr`
294-
// it might be different!
295-
let local_addr = listener.local_addr()?;
296-
297-
Ok((listener, local_addr))
298-
};
299-
300-
let bind_result = match bind() {
301-
Ok((listener, local_addr)) => {
302-
// Send local address
303-
local_addr_tx.send(Ok(local_addr)).expect(SENDER_PROOF);
304-
305-
futures::future::ok((listener, local_addr))
306-
},
307-
Err(err) => {
308-
// Send error
309-
local_addr_tx.send(Err(err)).expect(SENDER_PROOF);
310-
311-
futures::future::err(())
312-
}
313-
};
314-
315-
let handle = handle.clone();
316-
bind_result.and_then(move |(listener, local_addr)| {
317-
let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
318-
319-
let http = server::Http::new();
320-
listener.incoming()
321-
.for_each(move |(socket, addr)| {
322-
http.bind_connection(&handle, socket, addr, ServerHandler::new(
323-
jsonrpc_handler.clone(),
324-
cors_domains.clone(),
325-
allowed_hosts.clone(),
326-
request_middleware.clone(),
327-
));
328-
Ok(())
329-
})
330-
.map_err(|e| {
331-
warn!("Incoming streams error, closing sever: {:?}", e);
332-
})
333-
.select(shutdown_signal.map_err(|e| {
334-
warn!("Shutdown signaller dropped, closing server: {:?}", e);
335-
}))
336-
.map(|_| ())
337-
.map_err(|_| ())
338-
})
339-
});
307+
let (close, shutdown_signal) = oneshot::channel();
308+
let eloop = self.remote.init_with_name("http.worker0")?;
309+
serve(
310+
(shutdown_signal, local_addr_tx),
311+
eloop.remote(),
312+
addr.to_owned(),
313+
cors_domains.clone(),
314+
request_middleware.clone(),
315+
allowed_hosts.clone(),
316+
jsonrpc_handler.clone(),
317+
reuse_port,
318+
);
319+
let handles = (0..self.threads - 1).map(|i| {
320+
let (local_addr_tx, local_addr_rx) = mpsc::channel();
321+
let (close, shutdown_signal) = oneshot::channel();
322+
let eloop = UninitializedRemote::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
323+
serve(
324+
(shutdown_signal, local_addr_tx),
325+
eloop.remote(),
326+
addr.to_owned(),
327+
cors_domains.clone(),
328+
request_middleware.clone(),
329+
allowed_hosts.clone(),
330+
jsonrpc_handler.clone(),
331+
reuse_port,
332+
);
333+
Ok((eloop, close, local_addr_rx))
334+
}).collect::<io::Result<Vec<_>>>()?;
340335

341336
// Wait for server initialization
342-
let local_addr: io::Result<SocketAddr> = local_addr_rx.recv().map_err(|_| {
343-
Error::Io(io::Error::new(io::ErrorKind::Interrupted, ""))
344-
})?;
337+
let local_addr = recv_address(local_addr_rx);
338+
// Wait for other threads as well.
339+
let mut handles = handles.into_iter().map(|(eloop, close, local_addr_rx)| {
340+
let _ = recv_address(local_addr_rx)?;
341+
Ok((eloop, close))
342+
}).collect::<io::Result<(Vec<_>)>>()?;
343+
handles.push((eloop, close));
344+
let (remotes, close) = handles.into_iter().unzip();
345345

346346
Ok(Server {
347347
address: local_addr?,
348-
remote: Some(eloop),
348+
remote: Some(remotes),
349349
close: Some(close),
350350
})
351351
}
352352
}
353353

354+
fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
355+
local_addr_rx.recv().map_err(|_| {
356+
io::Error::new(io::ErrorKind::Interrupted, "")
357+
})?
358+
}
359+
360+
fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
361+
signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
362+
remote: tokio_core::reactor::Remote,
363+
addr: SocketAddr,
364+
cors_domains: CorsDomains,
365+
request_middleware: Arc<RequestMiddleware>,
366+
allowed_hosts: AllowedHosts,
367+
jsonrpc_handler: Rpc<M, S>,
368+
reuse_port: bool,
369+
) {
370+
let (shutdown_signal, local_addr_tx) = signals;
371+
remote.spawn(move |handle| {
372+
let handle1 = handle.clone();
373+
let bind = move || {
374+
let listener = match addr {
375+
SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
376+
SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
377+
};
378+
configure_port(reuse_port, &listener)?;
379+
listener.reuse_address(true)?;
380+
listener.bind(&addr)?;
381+
let listener = listener.listen(1024)?;
382+
let listener = tokio_core::net::TcpListener::from_listener(listener, &addr, &handle1)?;
383+
// Add current host to allowed headers.
384+
// NOTE: we need to use `l.local_addr()` instead of `addr`
385+
// it might be different!
386+
let local_addr = listener.local_addr()?;
387+
388+
Ok((listener, local_addr))
389+
};
390+
391+
let bind_result = match bind() {
392+
Ok((listener, local_addr)) => {
393+
// Send local address
394+
local_addr_tx.send(Ok(local_addr)).expect(SENDER_PROOF);
395+
396+
futures::future::ok((listener, local_addr))
397+
},
398+
Err(err) => {
399+
// Send error
400+
local_addr_tx.send(Err(err)).expect(SENDER_PROOF);
401+
402+
futures::future::err(())
403+
}
404+
};
405+
406+
let handle = handle.clone();
407+
bind_result.and_then(move |(listener, local_addr)| {
408+
let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
409+
410+
let http = server::Http::new();
411+
listener.incoming()
412+
.for_each(move |(socket, addr)| {
413+
http.bind_connection(&handle, socket, addr, ServerHandler::new(
414+
jsonrpc_handler.clone(),
415+
cors_domains.clone(),
416+
allowed_hosts.clone(),
417+
request_middleware.clone(),
418+
));
419+
Ok(())
420+
})
421+
.map_err(|e| {
422+
warn!("Incoming streams error, closing sever: {:?}", e);
423+
})
424+
.select(shutdown_signal.map_err(|e| {
425+
warn!("Shutdown signaller dropped, closing server: {:?}", e);
426+
}))
427+
.map(|_| ())
428+
.map_err(|_| ())
429+
})
430+
});
431+
}
432+
433+
#[cfg(unix)]
434+
fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
435+
use net2::unix::*;
436+
437+
if reuse {
438+
try!(tcp.reuse_port(true));
439+
}
440+
441+
Ok(())
442+
}
443+
444+
#[cfg(not(unix))]
445+
fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
446+
Ok(())
447+
}
448+
354449
/// jsonrpc http server instance
355450
pub struct Server {
356451
address: SocketAddr,
357-
remote: Option<Remote>,
358-
close: Option<futures::sync::oneshot::Sender<()>>,
452+
remote: Option<Vec<Remote>>,
453+
close: Option<Vec<oneshot::Sender<()>>>,
359454
}
360455

361456
const PROOF: &'static str = "Server is always Some until self is consumed.";
@@ -367,19 +462,28 @@ impl Server {
367462

368463
/// Closes the server.
369464
pub fn close(mut self) {
370-
let _ = self.close.take().expect(PROOF).send(());
371-
self.remote.take().expect(PROOF).close();
465+
for close in self.close.take().expect(PROOF) {
466+
let _ = close.send(());
467+
}
468+
469+
for remote in self.remote.take().expect(PROOF) {
470+
remote.close();
471+
}
372472
}
373473

374474
/// Will block, waiting for the server to finish.
375475
pub fn wait(mut self) {
376-
self.remote.take().expect(PROOF).wait();
476+
for remote in self.remote.take().expect(PROOF) {
477+
remote.wait();
478+
}
377479
}
378480
}
379481

380482
impl Drop for Server {
381483
fn drop(&mut self) {
382-
self.remote.take().map(|remote| remote.close());
484+
self.remote.take().map(|remotes| {
485+
for remote in remotes { remote.close(); }
486+
});
383487
}
384488
}
385489

minihttp/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
133133
}
134134
}
135135

136-
/// Sets number of threads of the server to run.
136+
/// Sets number of threads of the server to run. (not available for windows)
137137
/// Panics when set to `0`.
138138
pub fn threads(mut self, threads: usize) -> Self {
139139
assert!(threads > 0);

0 commit comments

Comments
 (0)