Skip to content

Commit ad153f6

Browse files
Aditya SharmaAditya Sharma
Aditya Sharma
authored and
Aditya Sharma
committed
Add Message Handlers for Peer Storage
Introduce message types and handlers to enable the exchange of peer storage data between nodes. PeerStorageMessage: Used to send encrypted backups to peers. PeerStorageRetrievalMessage: Used to return a peer's stored data upon reconnection. - Define two new message types: PeerStorageMessage and PeerStorageRetrievalMessage. - Implement handlers for these messages in ChannelMessageHandler. - Add SendPeerStorageMessage and SendPeerStorageRetrievalMessage to MessageSendEvent.
1 parent 5acf925 commit ad153f6

File tree

8 files changed

+156
-0
lines changed

8 files changed

+156
-0
lines changed

lightning-net-tokio/src/lib.rs

+5
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,11 @@ mod tests {
760760
fn handle_tx_init_rbf(&self, _their_node_id: PublicKey, _msg: &TxInitRbf) {}
761761
fn handle_tx_ack_rbf(&self, _their_node_id: PublicKey, _msg: &TxAckRbf) {}
762762
fn handle_tx_abort(&self, _their_node_id: PublicKey, _msg: &TxAbort) {}
763+
fn handle_peer_storage(&self, _their_node_id: PublicKey, _msg: &PeerStorageMessage) {}
764+
fn handle_peer_storage_retrieval(
765+
&self, _their_node_id: PublicKey, _msg: &PeerStorageRetrievalMessage,
766+
) {
767+
}
763768
fn peer_disconnected(&self, their_node_id: PublicKey) {
764769
if their_node_id == self.expected_pubkey {
765770
self.disconnected_flag.store(true, Ordering::SeqCst);

lightning/src/events/mod.rs

+17
Original file line numberDiff line numberDiff line change
@@ -2622,6 +2622,23 @@ pub enum MessageSendEvent {
26222622
/// The gossip_timestamp_filter which should be sent.
26232623
msg: msgs::GossipTimestampFilter,
26242624
},
2625+
/// Sends a channel partner Peer Storage of our backup which they should store.
2626+
/// This should be sent on each new connection to the channel partner or whenever we want
2627+
/// them to update the backup that they store.
2628+
SendPeerStorageMessage {
2629+
/// The node_id of this message recipient
2630+
node_id: PublicKey,
2631+
/// The PeerStorageMessage which should be sent.
2632+
msg: msgs::PeerStorageMessage,
2633+
},
2634+
/// Sends a channel partner their own peer storage which we store and update when they send
2635+
/// a [`msgs::PeerStorageMessage`].
2636+
SendPeerStorageRetrievalMessage {
2637+
/// The node_id of this message recipient
2638+
node_id: PublicKey,
2639+
/// The PeerStorageRetrievalMessage which should be sent.
2640+
msg: msgs::PeerStorageRetrievalMessage,
2641+
}
26252642
}
26262643

26272644
/// A trait indicating an object may generate message send events

lightning/src/ln/channelmanager.rs

+18
Original file line numberDiff line numberDiff line change
@@ -8170,6 +8170,10 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
81708170
}
81718171
}
81728172

8173+
fn internal_peer_storage_retrieval(&self, _counterparty_node_id: &PublicKey, _msg: &msgs::PeerStorageRetrievalMessage) {}
8174+
8175+
fn internal_peer_storage(&self, _counterparty_node_id: &PublicKey, _msg: &msgs::PeerStorageMessage) {}
8176+
81738177
fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
81748178
let best_block = *self.best_block.read().unwrap();
81758179
let per_peer_state = self.per_peer_state.read().unwrap();
@@ -11353,6 +11357,16 @@ where
1135311357
let _ = handle_error!(self, self.internal_funding_signed(&counterparty_node_id, msg), counterparty_node_id);
1135411358
}
1135511359

11360+
fn handle_peer_storage(&self, counterparty_node_id: PublicKey, msg: &msgs::PeerStorageMessage) {
11361+
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents);
11362+
self.internal_peer_storage(&counterparty_node_id, msg);
11363+
}
11364+
11365+
fn handle_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, msg: &msgs::PeerStorageRetrievalMessage) {
11366+
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents);
11367+
self.internal_peer_storage_retrieval(&counterparty_node_id, msg);
11368+
}
11369+
1135611370
fn handle_channel_ready(&self, counterparty_node_id: PublicKey, msg: &msgs::ChannelReady) {
1135711371
// Note that we never need to persist the updated ChannelManager for an inbound
1135811372
// channel_ready message - while the channel's state will change, any channel_ready message
@@ -11594,6 +11608,10 @@ where
1159411608
&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
1159511609
&events::MessageSendEvent::SendReplyChannelRange { .. } => false,
1159611610
&events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
11611+
11612+
// Peer Storage
11613+
&events::MessageSendEvent::SendPeerStorageMessage { .. } => false,
11614+
&events::MessageSendEvent::SendPeerStorageRetrievalMessage { .. } => false,
1159711615
}
1159811616
});
1159911617
debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect");

lightning/src/ln/functional_test_utils.rs

+6
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,12 @@ macro_rules! get_htlc_update_msgs {
880880
/// such messages are intended to all peers.
881881
pub fn remove_first_msg_event_to_node(msg_node_id: &PublicKey, msg_events: &mut Vec<MessageSendEvent>) -> MessageSendEvent {
882882
let ev_index = msg_events.iter().position(|e| { match e {
883+
MessageSendEvent::SendPeerStorageMessage { node_id, .. } => {
884+
node_id == msg_node_id
885+
},
886+
MessageSendEvent::SendPeerStorageRetrievalMessage { node_id, .. } => {
887+
node_id == msg_node_id
888+
},
883889
MessageSendEvent::SendAcceptChannel { node_id, .. } => {
884890
node_id == msg_node_id
885891
},

lightning/src/ln/msgs.rs

+64
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,36 @@ pub struct UpdateFulfillHTLC {
725725
pub payment_preimage: PaymentPreimage,
726726
}
727727

728+
/// A [`PeerStorage`] message that can be sent to or received from a peer.
729+
///
730+
/// This message is used to distribute backup data to peers.
731+
/// If data is lost or corrupted, users can retrieve it through [`PeerStorageRetrievalMessage`]
732+
/// to recover critical information, such as channel states, for fund recovery.
733+
///
734+
/// [`PeerStorageMessage`] is used to send our own encrypted backup data to a peer.
735+
///
736+
/// [`PeerStorage`]: https://github.com/lightning/bolts/pull/1110
737+
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
738+
pub struct PeerStorageMessage {
739+
/// Our encrypted backup data included in the msg.
740+
pub data: Vec<u8>,
741+
}
742+
743+
/// A [`PeerStorageRetrievalMessage`] message that can be sent to or received from a peer.
744+
///
745+
/// This message is sent to peers for whom we store backup data.
746+
/// If we receive this message, it indicates that the peer had stored our backup data.
747+
/// This data can be used for fund recovery in case of data loss.
748+
///
749+
/// [`PeerStorageRetrievalMessage`] is used to send the most recent backup of the peer.
750+
///
751+
/// [`PeerStorageRetrievalMessage`]: https://github.com/lightning/bolts/pull/1110
752+
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
753+
pub struct PeerStorageRetrievalMessage {
754+
/// Most recent peer's data included in the msg.
755+
pub data: Vec<u8>,
756+
}
757+
728758
/// An [`update_fail_htlc`] message to be sent to or received from a peer.
729759
///
730760
/// [`update_fail_htlc`]: https://github.com/lightning/bolts/blob/master/02-peer-protocol.md#removing-an-htlc-update_fulfill_htlc-update_fail_htlc-and-update_fail_malformed_htlc
@@ -1507,6 +1537,12 @@ pub trait ChannelMessageHandler : MessageSendEventsProvider {
15071537
/// Handle an incoming `channel_ready` message from the given peer.
15081538
fn handle_channel_ready(&self, their_node_id: PublicKey, msg: &ChannelReady);
15091539

1540+
// Peer Storage
1541+
/// Handle an incoming `peer_storage` message from the given peer.
1542+
fn handle_peer_storage(&self, their_node_id: PublicKey, msg: &PeerStorageMessage);
1543+
/// Handle an incoming `peer_storage_retrieval` message from the given peer.
1544+
fn handle_peer_storage_retrieval(&self, their_node_id: PublicKey, msg: &PeerStorageRetrievalMessage);
1545+
15101546
// Channel close:
15111547
/// Handle an incoming `shutdown` message from the given peer.
15121548
fn handle_shutdown(&self, their_node_id: PublicKey, msg: &Shutdown);
@@ -2628,6 +2664,14 @@ impl_writeable_msg!(UpdateFulfillHTLC, {
26282664
payment_preimage
26292665
}, {});
26302666

2667+
impl_writeable_msg!(PeerStorageMessage, {
2668+
data
2669+
}, {});
2670+
2671+
impl_writeable_msg!(PeerStorageRetrievalMessage, {
2672+
data
2673+
}, {});
2674+
26312675
// Note that this is written as a part of ChannelManager objects, and thus cannot change its
26322676
// serialization format in a way which assumes we know the total serialized length/message end
26332677
// position.
@@ -4527,6 +4571,26 @@ mod tests {
45274571
assert_eq!(encoded_value, target_value);
45284572
}
45294573

4574+
#[test]
4575+
fn encoding_peer_storage() {
4576+
let peer_storage = msgs::PeerStorageMessage {
4577+
data: <Vec<u8>>::from_hex("01020304050607080910").unwrap()
4578+
};
4579+
let encoded_value = peer_storage.encode();
4580+
let target_value = <Vec<u8>>::from_hex("000a01020304050607080910").unwrap();
4581+
assert_eq!(encoded_value, target_value);
4582+
}
4583+
4584+
#[test]
4585+
fn encoding_peer_storage_retrieval() {
4586+
let peer_storage_retrieval = msgs::PeerStorageRetrievalMessage {
4587+
data: <Vec<u8>>::from_hex("01020304050607080910").unwrap()
4588+
};
4589+
let encoded_value = peer_storage_retrieval.encode();
4590+
let target_value = <Vec<u8>>::from_hex("000a01020304050607080910").unwrap();
4591+
assert_eq!(encoded_value, target_value);
4592+
}
4593+
45304594
#[test]
45314595
fn encoding_pong() {
45324596
let pong = msgs::Pong {

lightning/src/ln/peer_handler.rs

+16
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,8 @@ impl ChannelMessageHandler for ErroringMessageHandler {
324324
}
325325
// msgs::ChannelUpdate does not contain the channel_id field, so we just drop them.
326326
fn handle_channel_update(&self, _their_node_id: PublicKey, _msg: &msgs::ChannelUpdate) {}
327+
fn handle_peer_storage(&self, _their_node_id: PublicKey, _msg: &msgs::PeerStorageMessage) {}
328+
fn handle_peer_storage_retrieval(&self, _their_node_id: PublicKey, _msg: &msgs::PeerStorageRetrievalMessage) {}
327329
fn peer_disconnected(&self, _their_node_id: PublicKey) {}
328330
fn peer_connected(&self, _their_node_id: PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
329331
fn handle_error(&self, _their_node_id: PublicKey, _msg: &msgs::ErrorMessage) {}
@@ -1841,6 +1843,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18411843
wire::Message::ChannelReady(msg) => {
18421844
self.message_handler.chan_handler.handle_channel_ready(their_node_id, &msg);
18431845
},
1846+
wire::Message::PeerStorageMessage(msg) => {
1847+
self.message_handler.chan_handler.handle_peer_storage(their_node_id, &msg);
1848+
},
1849+
wire::Message::PeerStorageRetrievalMessage(msg) => {
1850+
self.message_handler.chan_handler.handle_peer_storage_retrieval(their_node_id, &msg);
1851+
},
18441852

18451853
// Quiescence messages:
18461854
wire::Message::Stfu(msg) => {
@@ -2154,6 +2162,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
21542162
// robustly gossip broadcast events even if a peer's message buffer is full.
21552163
let mut handle_event = |event, from_chan_handler| {
21562164
match event {
2165+
MessageSendEvent::SendPeerStorageMessage { ref node_id, ref msg } => {
2166+
log_debug!(self.logger, "Handling SendPeerStorageMessage event in peer_handler for {}", log_pubkey!(node_id));
2167+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2168+
},
2169+
MessageSendEvent::SendPeerStorageRetrievalMessage { ref node_id, ref msg } => {
2170+
log_debug!(self.logger, "Handling SendPeerStorageRetrievalMessage event in peer_handler for {}", log_pubkey!(node_id));
2171+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2172+
},
21572173
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
21582174
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
21592175
log_pubkey!(node_id),

lightning/src/ln/wire.rs

+20
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ pub(crate) enum Message<T> where T: core::fmt::Debug + Type + TestEq {
5353
Warning(msgs::WarningMessage),
5454
Ping(msgs::Ping),
5555
Pong(msgs::Pong),
56+
PeerStorageMessage(msgs::PeerStorageMessage),
57+
PeerStorageRetrievalMessage(msgs::PeerStorageRetrievalMessage),
5658
OpenChannel(msgs::OpenChannel),
5759
OpenChannelV2(msgs::OpenChannelV2),
5860
AcceptChannel(msgs::AcceptChannel),
@@ -111,6 +113,8 @@ impl<T> Writeable for Message<T> where T: core::fmt::Debug + Type + TestEq {
111113
&Message::Warning(ref msg) => msg.write(writer),
112114
&Message::Ping(ref msg) => msg.write(writer),
113115
&Message::Pong(ref msg) => msg.write(writer),
116+
&Message::PeerStorageMessage(ref msg) => msg.write(writer),
117+
&Message::PeerStorageRetrievalMessage(ref msg) => msg.write(writer),
114118
&Message::OpenChannel(ref msg) => msg.write(writer),
115119
&Message::OpenChannelV2(ref msg) => msg.write(writer),
116120
&Message::AcceptChannel(ref msg) => msg.write(writer),
@@ -169,6 +173,8 @@ impl<T> Type for Message<T> where T: core::fmt::Debug + Type + TestEq {
169173
&Message::Warning(ref msg) => msg.type_id(),
170174
&Message::Ping(ref msg) => msg.type_id(),
171175
&Message::Pong(ref msg) => msg.type_id(),
176+
&Message::PeerStorageMessage(ref msg) => msg.type_id(),
177+
&Message::PeerStorageRetrievalMessage(ref msg) => msg.type_id(),
172178
&Message::OpenChannel(ref msg) => msg.type_id(),
173179
&Message::OpenChannelV2(ref msg) => msg.type_id(),
174180
&Message::AcceptChannel(ref msg) => msg.type_id(),
@@ -261,6 +267,12 @@ fn do_read<R: io::Read, T, H: core::ops::Deref>(buffer: &mut R, message_type: u1
261267
msgs::Pong::TYPE => {
262268
Ok(Message::Pong(Readable::read(buffer)?))
263269
},
270+
msgs::PeerStorageMessage::TYPE => {
271+
Ok(Message::PeerStorageMessage(Readable::read(buffer)?))
272+
},
273+
msgs::PeerStorageRetrievalMessage::TYPE => {
274+
Ok(Message::PeerStorageRetrievalMessage(Readable::read(buffer)?))
275+
},
264276
msgs::OpenChannel::TYPE => {
265277
Ok(Message::OpenChannel(Readable::read(buffer)?))
266278
},
@@ -625,6 +637,14 @@ impl Encode for msgs::GossipTimestampFilter {
625637
const TYPE: u16 = 265;
626638
}
627639

640+
impl Encode for msgs::PeerStorageMessage {
641+
const TYPE: u16 = 7;
642+
}
643+
644+
impl Encode for msgs::PeerStorageRetrievalMessage {
645+
const TYPE: u16 = 9;
646+
}
647+
628648
#[cfg(test)]
629649
mod tests {
630650
use super::*;

lightning/src/util/test_utils.rs

+10
Original file line numberDiff line numberDiff line change
@@ -1085,6 +1085,16 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
10851085
self.received_msg(wire::Message::TxAbort(msg.clone()));
10861086
}
10871087

1088+
fn handle_peer_storage(&self, _their_node_id: PublicKey, msg: &msgs::PeerStorageMessage) {
1089+
self.received_msg(wire::Message::PeerStorageMessage(msg.clone()));
1090+
}
1091+
1092+
fn handle_peer_storage_retrieval(
1093+
&self, _their_node_id: PublicKey, msg: &msgs::PeerStorageRetrievalMessage,
1094+
) {
1095+
self.received_msg(wire::Message::PeerStorageRetrievalMessage(msg.clone()));
1096+
}
1097+
10881098
fn message_received(&self) {}
10891099
}
10901100

0 commit comments

Comments
 (0)