Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions p2p/src/io/deadline.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::io;
use std::time::Duration;
use futures::{Future, Select, BoxFuture, Poll, Async};
use futures::{Future, Select, Poll, Async};
use tokio_core::reactor::{Handle, Timeout};

type DeadlineBox<F> = BoxFuture<DeadlineStatus<<F as Future>::Item>, <F as Future>::Error>;
type DeadlineBox<F> = Box<Future<Item = DeadlineStatus<<F as Future>::Item>, Error = <F as Future>::Error> + Send>;

pub fn deadline<F, T>(duration: Duration, handle: &Handle, future: F) -> Result<Deadline<F>, io::Error>
where F: Future<Item = T, Error = io::Error> + Send + 'static, T: 'static {
let timeout = try!(Timeout::new(duration, handle)).map(|_| DeadlineStatus::Timeout).boxed();
let future = future.map(DeadlineStatus::Meet).boxed();
let timeout: DeadlineBox<F> = Box::new(try!(Timeout::new(duration, handle)).map(|_| DeadlineStatus::Timeout));
let future: DeadlineBox<F> = Box::new(future.map(DeadlineStatus::Meet));
let deadline = Deadline {
future: timeout.select(future),
};
Expand All @@ -20,11 +20,11 @@ pub enum DeadlineStatus<T> {
Timeout,
}

pub struct Deadline<F> where F: Future {
pub struct Deadline<F> where F: Future + Send {
future: Select<DeadlineBox<F>, DeadlineBox<F>>,
}

impl<F, T> Future for Deadline<F> where F: Future<Item = T, Error = io::Error> {
impl<F, T> Future for Deadline<F> where F: Future<Item = T, Error = io::Error> + Send {
type Item = DeadlineStatus<T>;
type Error = io::Error;

Expand Down
60 changes: 28 additions & 32 deletions p2p/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{io, net, error, time};
use std::sync::Arc;
use std::net::SocketAddr;
use parking_lot::RwLock;
use futures::{Future, finished, failed, BoxFuture};
use futures::{Future, finished, failed};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_io::IoFuture;
Expand All @@ -20,7 +20,7 @@ use {Config, PeerId};
use protocol::{LocalSyncNodeRef, InboundSyncConnectionRef, OutboundSyncConnectionRef};
use io::DeadlineStatus;

pub type BoxedEmptyFuture = BoxFuture<(), ()>;
pub type BoxedEmptyFuture = Box<Future<Item=(), Error=()> + Send>;

/// Network context.
pub struct Context {
Expand Down Expand Up @@ -113,7 +113,7 @@ impl Context {
pub fn autoconnect(context: Arc<Context>, handle: &Handle) {
let c = context.clone();
// every 10 seconds connect to new peers (if needed)
let interval: BoxedEmptyFuture = Interval::new_at(time::Instant::now(), time::Duration::new(10, 0), handle).expect("Failed to create interval")
let interval: BoxedEmptyFuture = Box::new(Interval::new_at(time::Instant::now(), time::Duration::new(10, 0), handle).expect("Failed to create interval")
.and_then(move |_| {
// print traces
let ic = context.connection_counter.inbound_connections();
Expand Down Expand Up @@ -147,16 +147,15 @@ impl Context {
Ok(())
})
.for_each(|_| Ok(()))
.then(|_| finished(()))
.boxed();
.then(|_| finished(())));
c.spawn(interval);
}

/// Connect to socket using given context and handle.
fn connect_future<T>(context: Arc<Context>, socket: net::SocketAddr, handle: &Handle, config: &NetConfig) -> BoxedEmptyFuture where T: SessionFactory {
trace!("Trying to connect to: {}", socket);
let connection = connect(&socket, handle, config);
connection.then(move |result| {
Box::new(connection.then(move |result| {
match result {
Ok(DeadlineStatus::Meet(Ok(connection))) => {
// successfull hanshake
Expand All @@ -174,27 +173,26 @@ impl Context {
// TODO: close socket
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_outbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
},
Ok(DeadlineStatus::Timeout) => {
// connection time out
trace!("Handshake with {} timed out", socket);
// TODO: close socket
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_outbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
},
Err(_) => {
// network error
trace!("Unable to connect to {}", socket);
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_outbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
}
}
})
.then(|_| finished(()))
.boxed()
.then(|_| finished(())))
}

/// Connect to socket using given context.
Expand All @@ -211,7 +209,7 @@ impl Context {
}

pub fn accept_connection_future(context: Arc<Context>, stream: TcpStream, socket: net::SocketAddr, handle: &Handle, config: NetConfig) -> BoxedEmptyFuture {
accept_connection(stream, handle, &config, socket).then(move |result| {
Box::new(accept_connection(stream, handle, &config, socket).then(move |result| {
match result {
Ok(DeadlineStatus::Meet(Ok(connection))) => {
// successfull hanshake
Expand All @@ -229,27 +227,26 @@ impl Context {
// TODO: close socket
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_inbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
},
Ok(DeadlineStatus::Timeout) => {
// connection time out
trace!("Accepting handshake from {} timed out", socket);
// TODO: close socket
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_inbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
},
Err(_) => {
// network error
trace!("Accepting handshake from {} failed with network error", socket);
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_inbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
}
}
})
.then(|_| finished(()))
.boxed()
.then(|_| finished(())))
}

pub fn accept_connection(context: Arc<Context>, stream: TcpStream, socket: net::SocketAddr, config: NetConfig) {
Expand All @@ -263,7 +260,7 @@ impl Context {
pub fn listen(context: Arc<Context>, handle: &Handle, config: NetConfig) -> Result<BoxedEmptyFuture, io::Error> {
trace!("Starting tcp server");
let server = try!(TcpListener::bind(&config.local_address, handle));
let server = server.incoming()
let server = Box::new(server.incoming()
.and_then(move |(stream, socket)| {
// because we acquire atomic value twice,
// it may happen that accept slightly more connections than we need
Expand All @@ -277,14 +274,13 @@ impl Context {
Ok(())
})
.for_each(|_| Ok(()))
.then(|_| finished(()))
.boxed();
.then(|_| finished(())));
Ok(server)
}

/// Called on incomming mesage.
pub fn on_message(context: Arc<Context>, channel: Arc<Channel>) -> IoFuture<MessageResult<()>> {
channel.read_message().then(move |result| {
Box::new(channel.read_message().then(move |result| {
match result {
Ok(Ok((command, payload))) => {
// successful read
Expand All @@ -295,28 +291,28 @@ impl Context {
context.node_table.write().note_used(&channel.peer_info().address);
let on_message = Context::on_message(context.clone(), channel);
context.spawn(on_message);
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
},
Err(err) => {
// protocol error
context.close_channel_with_error(channel.peer_info().id, &err);
finished(Err(err)).boxed()
Box::new(finished(Err(err)))
}
}
},
Ok(Err(err)) => {
// protocol error
context.close_channel_with_error(channel.peer_info().id, &err);
finished(Err(err)).boxed()
Box::new(finished(Err(err)))
},
Err(err) => {
// network error
// TODO: remote node was just turned off. should we mark it as not reliable?
context.close_channel_with_error(channel.peer_info().id, &err);
failed(err).boxed()
Box::new(failed(err))
}
}
}).boxed()
}))
}

/// Send message to a channel with given peer id.
Expand All @@ -331,7 +327,7 @@ impl Context {
None => {
// peer no longer exists.
// TODO: should we return error here?
finished(()).boxed()
Box::new(finished(()))
}
}
}
Expand All @@ -342,28 +338,28 @@ impl Context {
None => {
// peer no longer exists.
// TODO: should we return error here?
finished(()).boxed()
Box::new(finished(()))
}
}
}

/// Send message using given channel.
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static {
//trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
channel.write_message(message).then(move |result| {
Box::new(channel.write_message(message).then(move |result| {
match result {
Ok(_) => {
// successful send
//trace!("Sent {} message to {}", T::command(), channel.peer_info().address);
finished(()).boxed()
Box::new(finished(()))
},
Err(err) => {
// network error
// closing connection is handled in on_message`
failed(err).boxed()
Box::new(failed(err))
},
}
}).boxed()
}))
}

/// Close channel with given peer info.
Expand Down
6 changes: 3 additions & 3 deletions sync/src/local_node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use parking_lot::{Mutex, Condvar};
use time;
use futures::{Future, lazy, finished};
use futures::{lazy, finished};
use chain::{Transaction, IndexedTransaction, IndexedBlock};
use message::types;
use miner::BlockAssembler;
Expand Down Expand Up @@ -163,8 +163,8 @@ impl<T, U, V> LocalNode<T, U, V> where T: TaskExecutor, U: Server, V: Client {
let lazy_server_task = lazy(move || {
server.upgrade().map(|s| s.execute(server_task));
finished::<(), ()>(())
}).boxed();
self.client.after_peer_nearly_blocks_verified(peer_index, lazy_server_task);
});
self.client.after_peer_nearly_blocks_verified(peer_index, Box::new(lazy_server_task));
}

/// When peer is requesting for memory pool contents
Expand Down
4 changes: 2 additions & 2 deletions sync/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::Arc;
use futures::BoxFuture;
use futures::Future;
use parking_lot::{Mutex, RwLock};
use db;
use local_node::LocalNode;
Expand All @@ -21,7 +21,7 @@ pub type RequestId = u32;
pub type PeerIndex = usize;

// No-error, no-result future
pub type EmptyBoxFuture = BoxFuture<(), ()>;
pub type EmptyBoxFuture = Box<Future<Item=(), Error=()> + Send>;

/// Reference to storage
pub type StorageRef = db::SharedStore;
Expand Down