Skip to content

Commit 249aa77

Browse files
committed
Send channel_reestablish out-of-band to ensure ordered deliver
1 parent e2de49d commit 249aa77

File tree

5 files changed

+63
-22
lines changed

5 files changed

+63
-22
lines changed

src/ln/channelmanager.rs

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2676,9 +2676,10 @@ impl ChannelMessageHandler for ChannelManager {
26762676
}
26772677
}
26782678

2679-
fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
2680-
let mut res = Vec::new();
2681-
let mut channel_state = self.channel_state.lock().unwrap();
2679+
fn peer_connected(&self, their_node_id: &PublicKey) {
2680+
let mut channel_state_lock = self.channel_state.lock().unwrap();
2681+
let channel_state = channel_state_lock.borrow_parts();
2682+
let pending_msg_events = channel_state.pending_msg_events;
26822683
channel_state.by_id.retain(|_, chan| {
26832684
if chan.get_their_node_id() == *their_node_id {
26842685
if !chan.have_received_message() {
@@ -2688,13 +2689,15 @@ impl ChannelMessageHandler for ChannelManager {
26882689
// drop it.
26892690
false
26902691
} else {
2691-
res.push(chan.get_channel_reestablish());
2692+
pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
2693+
node_id: chan.get_their_node_id(),
2694+
msg: chan.get_channel_reestablish(),
2695+
});
26922696
true
26932697
}
26942698
} else { true }
26952699
});
26962700
//TODO: Also re-broadcast announcement_signatures
2697-
res
26982701
}
26992702

27002703
fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@@ -5197,6 +5200,23 @@ mod tests {
51975200
assert_eq!(channel_state.short_to_id.len(), 0);
51985201
}
51995202

5203+
macro_rules! get_chan_reestablish_msgs {
5204+
($src_node: expr, $dst_node: expr) => {
5205+
{
5206+
let mut res = Vec::with_capacity(1);
5207+
for msg in $src_node.node.get_and_clear_pending_msg_events() {
5208+
if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
5209+
assert_eq!(*node_id, $dst_node.node.get_our_node_id());
5210+
res.push(msg.clone());
5211+
} else {
5212+
panic!("Unexpected event")
5213+
}
5214+
}
5215+
res
5216+
}
5217+
}
5218+
}
5219+
52005220
macro_rules! handle_chan_reestablish_msgs {
52015221
($src_node: expr, $dst_node: expr) => {
52025222
{
@@ -5255,8 +5275,10 @@ mod tests {
52555275
/// pending_htlc_adds includes both the holding cell and in-flight update_add_htlcs, whereas
52565276
/// for claims/fails they are separated out.
52575277
fn reconnect_nodes(node_a: &Node, node_b: &Node, pre_all_htlcs: bool, pending_htlc_adds: (i64, i64), pending_htlc_claims: (usize, usize), pending_cell_htlc_claims: (usize, usize), pending_cell_htlc_fails: (usize, usize), pending_raa: (bool, bool)) {
5258-
let reestablish_1 = node_a.node.peer_connected(&node_b.node.get_our_node_id());
5259-
let reestablish_2 = node_b.node.peer_connected(&node_a.node.get_our_node_id());
5278+
node_a.node.peer_connected(&node_b.node.get_our_node_id());
5279+
let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b);
5280+
node_b.node.peer_connected(&node_a.node.get_our_node_id());
5281+
let reestablish_2 = get_chan_reestablish_msgs!(node_b, node_a);
52605282

52615283
let mut resp_1 = Vec::new();
52625284
for msg in reestablish_1 {
@@ -5754,9 +5776,11 @@ mod tests {
57545776
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
57555777
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
57565778

5757-
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
5779+
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
5780+
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
57585781
assert_eq!(reestablish_1.len(), 1);
5759-
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
5782+
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
5783+
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
57605784
assert_eq!(reestablish_2.len(), 1);
57615785

57625786
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
@@ -6042,9 +6066,11 @@ mod tests {
60426066
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
60436067
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
60446068

6045-
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
6069+
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
6070+
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
60466071
assert_eq!(reestablish_1.len(), 1);
6047-
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
6072+
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
6073+
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
60486074
assert_eq!(reestablish_2.len(), 1);
60496075

60506076
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
@@ -6062,9 +6088,11 @@ mod tests {
60626088
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
60636089
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
60646090

6065-
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
6091+
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
6092+
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
60666093
assert_eq!(reestablish_1.len(), 1);
6067-
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
6094+
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
6095+
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
60686096
assert_eq!(reestablish_2.len(), 1);
60696097

60706098
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();

src/ln/msgs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,14 +308,14 @@ pub struct UpdateFee {
308308
pub(crate) feerate_per_kw: u32,
309309
}
310310

311-
#[derive(PartialEq)]
311+
#[derive(PartialEq, Clone)]
312312
pub(crate) struct DataLossProtect {
313313
pub(crate) your_last_per_commitment_secret: [u8; 32],
314314
pub(crate) my_current_per_commitment_point: PublicKey,
315315
}
316316

317317
/// A channel_reestablish message to be sent or received from a peer
318-
#[derive(PartialEq)]
318+
#[derive(PartialEq, Clone)]
319319
pub struct ChannelReestablish {
320320
pub(crate) channel_id: [u8; 32],
321321
pub(crate) next_local_commitment_number: u64,
@@ -563,7 +563,7 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
563563
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool);
564564

565565
/// Handle a peer reconnecting, possibly generating channel_reestablish message(s).
566-
fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<ChannelReestablish>;
566+
fn peer_connected(&self, their_node_id: &PublicKey);
567567
/// Handle an incoming channel_reestablish message from the given peer.
568568
fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &ChannelReestablish) -> Result<(), HandleError>;
569569

src/ln/peer_handler.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -520,9 +520,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
520520
}, 16);
521521
}
522522

523-
for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) {
524-
encode_and_send_msg!(msg, 136);
525-
}
523+
self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap());
526524
},
527525
17 => {
528526
let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
@@ -834,6 +832,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
834832
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
835833
Self::do_attempt_write_data(&mut descriptor, peer);
836834
},
835+
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
836+
log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
837+
log_pubkey!(node_id),
838+
log_bytes!(msg.channel_id));
839+
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
840+
//TODO: Do whatever we're gonna do for handling dropped messages
841+
});
842+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
843+
Self::do_attempt_write_data(&mut descriptor, peer);
844+
},
837845
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
838846
log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
839847
if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {

src/util/events.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,13 @@ pub enum MessageSendEvent {
175175
/// The message which should be sent.
176176
msg: msgs::Shutdown,
177177
},
178+
/// Used to indicate that a channel_reestablish message should be sent to the peer with the given node_id.
179+
SendChannelReestablish {
180+
/// The node_id of the node which should receive this message
181+
node_id: PublicKey,
182+
/// The message which should be sent.
183+
msg: msgs::ChannelReestablish,
184+
},
178185
/// Used to indicate that a channel_announcement and channel_update should be broadcast to all
179186
/// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2).
180187
BroadcastChannelAnnouncement {

src/util/test_utils.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
135135
Err(HandleError { err: "", action: None })
136136
}
137137
fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
138-
fn peer_connected(&self, _their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
139-
Vec::new()
140-
}
138+
fn peer_connected(&self, _their_node_id: &PublicKey) {}
141139
fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
142140
}
143141

0 commit comments

Comments
 (0)