diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 15fa105d0c0..8a026a12edd 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -10,12 +10,12 @@ use secp256k1::key::{SecretKey,PublicKey}; use ln::features::InitFeatures; use ln::msgs; -use ln::msgs::ChannelMessageHandler; +use ln::msgs::{ChannelMessageHandler, LightningError}; use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; -use util::ser::VecWriter; +use util::ser::{VecWriter, Writeable}; use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; use ln::wire; -use ln::wire::Encode; +use ln::wire::{Encode, Message}; use util::byte_utils; use util::events::{MessageSendEvent, MessageSendEventsProvider}; use util::logger::Logger; @@ -203,6 +203,12 @@ pub struct PeerManager where CM::Target logger: Arc, } +enum MessageProcessingDecision { + Continue, + Disconnect(PeerHandleError), + Panic +} + macro_rules! encode_msg { ($msg: expr) => {{ let mut buffer = VecWriter(Vec::new()); @@ -607,187 +613,243 @@ impl PeerManager where log_trace!(self, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap())); - // Need an Init as first message - if let wire::Message::Init(_) = message { - } else if peer.their_features.is_none() { - log_trace!(self, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); - return Err(PeerHandleError{ no_connection_possible: false }); + let output_buffer_size = peer.pending_outbound_buffer.len(); + let message_action = self.handle_message(peer, message); + let new_output_buffer_size = peer.pending_outbound_buffer.len(); + if new_output_buffer_size > output_buffer_size { + // we have enqueued some messages, let's make the peers aware of it + peers.peers_needing_send.insert(peer_descriptor.clone()); } - match message { - // Setup and Control messages: - wire::Message::Init(msg) => { - if msg.features.requires_unknown_bits() { - log_info!(self, "Peer global features required unknown version bits"); - return Err(PeerHandleError{ no_connection_possible: true }); - } - if msg.features.requires_unknown_bits() { - log_info!(self, "Peer local features required unknown version bits"); - return Err(PeerHandleError{ no_connection_possible: true }); - } - if peer.their_features.is_some() { - return Err(PeerHandleError{ no_connection_possible: false }); - } + if let MessageProcessingDecision::Disconnect(peer_error) = message_action { + return Err(peer_error); + } + + } + } + } + } + } + + self.do_attempt_write_data(peer_descriptor, peer); + peer.pending_outbound_buffer.len() > 10 // pause_read + } + }; - log_info!(self, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, unkown local flags: {}, unknown global flags: {}", + pause_read + }; + + Ok(pause_read) + } + + fn enqueue_message(&self, peer: &mut Peer, message: &M){ + let mut encoded_message: Vec = vec![]; + wire::write(message, &mut encoded_message); + + log_trace!(self, "Encoding and sending message of type {} to {}", message.type_id().to_string(), log_pubkey!(peer.their_node_id.unwrap())); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..])); + } + + fn process_lightning_error(&self, peer: &mut Peer, error: LightningError) -> MessageProcessingDecision{ + match error.action { + msgs::ErrorAction::DisconnectPeer { msg: _ } => { + //TODO: Try to push msg + log_trace!(self, "Got Err handling message, disconnecting peer because {}", error.err); + return MessageProcessingDecision::Disconnect(PeerHandleError{ no_connection_possible: false }); + }, + msgs::ErrorAction::IgnoreError => { + log_trace!(self, "Got Err handling message, ignoring because {}", error.err); + return MessageProcessingDecision::Continue; + }, + msgs::ErrorAction::SendErrorMessage { msg } => { + log_trace!(self, "Got Err handling message, sending Error message because {}", error.err); + self.enqueue_message(peer, &msg); + return MessageProcessingDecision::Continue; + }, + } + } + + fn handle_message(&self, peer: &mut Peer, message: Message) -> MessageProcessingDecision { + + // Need an Init as first message + if let wire::Message::Init(_) = message {} else if peer.their_features.is_none() { + log_trace!(self, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); + return MessageProcessingDecision::Disconnect(PeerHandleError { no_connection_possible: false }); + } + + match message { + // Setup and Control messages: + wire::Message::Init(msg) => { + if msg.features.requires_unknown_bits() { + log_info!(self, "Peer global features required unknown version bits"); + return MessageProcessingDecision::Disconnect(PeerHandleError { no_connection_possible: true }); + } + if msg.features.requires_unknown_bits() { + log_info!(self, "Peer local features required unknown version bits"); + return MessageProcessingDecision::Disconnect(PeerHandleError { no_connection_possible: true }); + } + if peer.their_features.is_some() { + return MessageProcessingDecision::Disconnect(PeerHandleError { no_connection_possible: false }); + } + + log_info!(self, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, unkown local flags: {}, unknown global flags: {}", if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"}, if msg.features.initial_routing_sync() { "requested" } else { "not requested" }, if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"}, if msg.features.supports_unknown_bits() { "present" } else { "none" }, if msg.features.supports_unknown_bits() { "present" } else { "none" }); - if msg.features.initial_routing_sync() { - peer.sync_status = InitSyncTracker::ChannelsSyncing(0); - peers.peers_needing_send.insert(peer_descriptor.clone()); - } - - if !peer.outbound { - let mut features = InitFeatures::supported(); - if self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) { - features.set_initial_routing_sync(); - } - - let resp = msgs::Init { features }; - encode_and_send_msg!(resp); - } + if msg.features.initial_routing_sync() { + peer.sync_status = InitSyncTracker::ChannelsSyncing(0); + } - self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg); - peer.their_features = Some(msg.features); - }, - wire::Message::Error(msg) => { - let mut data_is_printable = true; - for b in msg.data.bytes() { - if b < 32 || b > 126 { - data_is_printable = false; - break; - } - } + if !peer.outbound { + let mut features = InitFeatures::supported(); + if self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) { + features.set_initial_routing_sync(); + } - if data_is_printable { - log_debug!(self, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data); - } else { - log_debug!(self, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap())); - } - self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg); - if msg.channel_id == [0; 32] { - return Err(PeerHandleError{ no_connection_possible: true }); - } - }, + let resp = msgs::Init { features }; + self.enqueue_message(peer, &resp); + } - wire::Message::Ping(msg) => { - if msg.ponglen < 65532 { - let resp = msgs::Pong { byteslen: msg.ponglen }; - encode_and_send_msg!(resp); - } - }, - wire::Message::Pong(_msg) => { - peer.awaiting_pong = false; - }, - - // Channel messages: - wire::Message::OpenChannel(msg) => { - self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); - }, - wire::Message::AcceptChannel(msg) => { - self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); - }, - - wire::Message::FundingCreated(msg) => { - self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::FundingSigned(msg) => { - self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::FundingLocked(msg) => { - self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg); - }, - - wire::Message::Shutdown(msg) => { - self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::ClosingSigned(msg) => { - self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg); - }, - - // Commitment messages: - wire::Message::UpdateAddHTLC(msg) => { - self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFulfillHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFailHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFailMalformedHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg); - }, - - wire::Message::CommitmentSigned(msg) => { - self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::RevokeAndACK(msg) => { - self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFee(msg) => { - self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::ChannelReestablish(msg) => { - self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg); - }, - - // Routing messages: - wire::Message::AnnouncementSignatures(msg) => { - self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::ChannelAnnouncement(msg) => { - let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_announcement(&msg)); - - if should_forward { - // TODO: forward msg along to all our other peers! - } - }, - wire::Message::NodeAnnouncement(msg) => { - let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_node_announcement(&msg)); + self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg); + peer.their_features = Some(msg.features); + }, + wire::Message::Error(msg) => { + let mut data_is_printable = true; + for b in msg.data.bytes() { + if b < 32 || b > 126 { + data_is_printable = false; + break; + } + } - if should_forward { - // TODO: forward msg along to all our other peers! - } - }, - wire::Message::ChannelUpdate(msg) => { - let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_update(&msg)); + if data_is_printable { + log_debug!(self, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data); + } else { + log_debug!(self, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap())); + } + self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg); + if msg.channel_id == [0; 32] { + return MessageProcessingDecision::Disconnect(PeerHandleError { no_connection_possible: true }); + } + }, - if should_forward { - // TODO: forward msg along to all our other peers! - } - }, - - // Unknown messages: - wire::Message::Unknown(msg_type) if msg_type.is_even() => { - log_debug!(self, "Received unknown even message of type {}, disconnecting peer!", msg_type); - // Fail the channel if message is an even, unknown type as per BOLT #1. - return Err(PeerHandleError{ no_connection_possible: true }); - }, - wire::Message::Unknown(msg_type) => { - log_trace!(self, "Received unknown odd message of type {}, ignoring", msg_type); - }, - } - } - } - } - } + wire::Message::Ping(msg) => { + if msg.ponglen < 65532 { + let resp = msgs::Pong { byteslen: msg.ponglen }; + self.enqueue_message(peer, &resp); + } + }, + wire::Message::Pong(_msg) => { + peer.awaiting_pong = false; + }, + + // Channel messages: + wire::Message::OpenChannel(msg) => { + self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); + }, + wire::Message::AcceptChannel(msg) => { + self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); + }, + + wire::Message::FundingCreated(msg) => { + self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::FundingSigned(msg) => { + self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::FundingLocked(msg) => { + self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg); + }, + + wire::Message::Shutdown(msg) => { + self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::ClosingSigned(msg) => { + self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg); + }, + + // Commitment messages: + wire::Message::UpdateAddHTLC(msg) => { + self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFulfillHTLC(msg) => { + self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFailHTLC(msg) => { + self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFailMalformedHTLC(msg) => { + self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg); + }, + + wire::Message::CommitmentSigned(msg) => { + self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::RevokeAndACK(msg) => { + self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFee(msg) => { + self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::ChannelReestablish(msg) => { + self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg); + }, + + // Routing messages: + wire::Message::AnnouncementSignatures(msg) => { + self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::ChannelAnnouncement(msg) => { + let should_forward = match self.message_handler.route_handler.handle_channel_announcement(&msg) { + Ok(v) => v, + Err(e) => { + return self.process_lightning_error(peer, e); } + }; - self.do_attempt_write_data(peer_descriptor, peer); + if should_forward { + // TODO: forward msg along to all our other peers! + } + }, + wire::Message::NodeAnnouncement(msg) => { + let should_forward = match self.message_handler.route_handler.handle_node_announcement(&msg) { + Ok(v) => v, + Err(e) => { + return self.process_lightning_error(peer, e); + } + }; - peer.pending_outbound_buffer.len() > 10 // pause_read + if should_forward { + // TODO: forward msg along to all our other peers! } - }; + }, + wire::Message::ChannelUpdate(msg) => { + let should_forward = match self.message_handler.route_handler.handle_channel_update(&msg) { + Ok(v) => v, + Err(e) => { + return self.process_lightning_error(peer, e); + } + }; - pause_read + if should_forward { + // TODO: forward msg along to all our other peers! + } + }, + + // Unknown messages: + wire::Message::Unknown(msg_type) if msg_type.is_even() => { + log_debug!(self, "Received unknown even message of type {}, disconnecting peer!", msg_type); + // Fail the channel if message is an even, unknown type as per BOLT #1. + return MessageProcessingDecision::Disconnect(PeerHandleError { no_connection_possible: true }); + }, + wire::Message::Unknown(msg_type) => { + log_trace!(self, "Received unknown odd message of type {}, ignoring", msg_type); + }, }; - - Ok(pause_read) + MessageProcessingDecision::Continue } /// Checks for any events generated by our handlers and processes them. Includes sending most