Commit d95d7e1

Aditya Sharma authored and committed
Handle PeerStorage Message and its Persistence
This commit introduces handling and persistence of PeerStorage messages on a per-peer basis. The peer storage is kept within the PeerState to simplify management, so it does not need to be removed when there are no active channels with the peer. Key changes include:

- Add a peer_storage field to PeerState for persistent storage.
- Implement internal_peer_storage to handle PeerStorage messages and their updates.
- Add resend logic in peer_connected() to send the stored peer data back (as YourPeerStorage) before the channel reestablish messages upon reconnection.
- Update PeerState's write() and read() methods to support peer_storage persistence.
1 parent 726bbac

File tree: 2 files changed (+58 -2 lines)
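Before diving into the diff, a minimal, self-contained sketch of the behavior this commit implements may help: the node keeps one opaque blob per peer, rejects blobs over 1 KiB, and echoes the stored blob back when the peer reconnects. Everything here (PeerStore, PeerId, MAX_PEER_STORAGE_BYTES) is an illustrative stand-in for the real PeerState plumbing, not LDK's API.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for a peer's public key.
type PeerId = [u8; 33];

/// Hypothetical constant mirroring the 1 KiB cap enforced in this commit.
const MAX_PEER_STORAGE_BYTES: usize = 1024;

/// Simplified per-peer store; in the diff the blob lives inside `PeerState`.
#[derive(Default)]
struct PeerStore {
    storage: HashMap<PeerId, Vec<u8>>,
}

impl PeerStore {
    /// Handle an incoming peer-storage blob: refuse oversized data,
    /// otherwise overwrite whatever we previously held for this peer.
    fn handle_peer_storage(&mut self, peer: PeerId, data: Vec<u8>) -> Result<(), &'static str> {
        if data.len() > MAX_PEER_STORAGE_BYTES {
            return Err("Supports only data up to 1 KiB in peer storage.");
        }
        self.storage.insert(peer, data);
        Ok(())
    }

    /// On reconnection, return the blob to echo back to the peer
    /// (the diff queues a SendYourPeerStorageMessage event for this).
    fn on_peer_connected(&self, peer: &PeerId) -> Option<Vec<u8>> {
        self.storage.get(peer).filter(|d| !d.is_empty()).cloned()
    }
}

fn main() {
    let mut store = PeerStore::default();
    let peer = [2u8; 33];
    store.handle_peer_storage(peer, vec![0xAB; 100]).unwrap();
    assert_eq!(store.on_peer_connected(&peer), Some(vec![0xAB; 100]));
    // Oversized blobs are rejected, matching the warning path in the diff.
    assert!(store.handle_peer_storage(peer, vec![0; 2048]).is_err());
}
```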

lightning/src/ln/channelmanager.rs (+56)
@@ -1379,6 +1379,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
     /// [`ChannelMessageHandler::peer_connected`] and no corresponding
     /// [`ChannelMessageHandler::peer_disconnected`].
     pub is_connected: bool,
+    /// Holds the peer storage data for the channel partner on a per-peer basis.
+    peer_storage: Vec<u8>,
 }

 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -8169,6 +8171,44 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
         }
     }

+    fn internal_your_peer_storage(&self, _counterparty_node_id: &PublicKey, _msg: &msgs::YourPeerStorageMessage) {}
+
+    fn internal_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::PeerStorageMessage) {
+        let per_peer_state = self.per_peer_state.read().unwrap();
+        let peer_state_mutex = match per_peer_state.get(counterparty_node_id) {
+            Some(peer_state_mutex) => peer_state_mutex,
+            None => return,
+        };
+        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+        let peer_state = &mut *peer_state_lock;
+        let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None, None);
+
+        // Check if we have any channels with the peer (currently we only provide the service to peers we have a channel with).
+        if !peer_state.channel_by_id.values().any(|phase| phase.is_funded()) {
+            log_debug!(logger, "We do not have any channel with {}", log_pubkey!(counterparty_node_id));
+            return;
+        }
+
+        #[cfg(not(test))]
+        if msg.data.len() > 1024 {
+            log_debug!(logger, "We do not allow more than 1 KiB of data for each peer in peer storage. Sending warning to peer {}", log_pubkey!(counterparty_node_id));
+            peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
+                node_id: counterparty_node_id.clone(),
+                action: msgs::ErrorAction::SendWarningMessage {
+                    msg: msgs::WarningMessage {
+                        channel_id: ChannelId([0; 32]),
+                        data: "Supports only data up to 1 KiB in peer storage.".to_owned()
+                    },
+                    log_level: Level::Trace,
+                }
+            });
+            return;
+        }
+
+        log_trace!(logger, "Received Peer Storage from {}", log_pubkey!(counterparty_node_id));
+        peer_state.peer_storage = msg.data.clone();
+    }
+
     fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
         let best_block = *self.best_block.read().unwrap();
         let per_peer_state = self.per_peer_state.read().unwrap();
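internal_peer_storage above follows the two-level locking shape used throughout this file: take the per_peer_state read lock, look up the peer's Mutex, then lock that for mutation, silently ignoring unknown peers. A minimal sketch of the pattern with std primitives (the Node type and u8 peer ids are illustrative):

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

/// Illustrative stand-in for the per-peer state map.
struct Node {
    per_peer_state: RwLock<HashMap<u8, Mutex<Vec<u8>>>>,
}

impl Node {
    /// Mirrors internal_peer_storage's shape: holding only the outer read
    /// lock lets unrelated peers be served concurrently, while the inner
    /// Mutex serializes updates to a single peer's storage.
    fn store_for_peer(&self, peer: u8, data: Vec<u8>) {
        let per_peer_state = self.per_peer_state.read().unwrap();
        let peer_state_mutex = match per_peer_state.get(&peer) {
            Some(m) => m,
            None => return, // unknown peer: ignore, as the handler does
        };
        let mut peer_state = peer_state_mutex.lock().unwrap();
        *peer_state = data;
    }
}

fn main() {
    let node = Node {
        per_peer_state: RwLock::new(HashMap::from([(7u8, Mutex::new(Vec::new()))])),
    };
    node.store_for_peer(7, vec![1, 2, 3]);
    node.store_for_peer(9, vec![4]); // no state for peer 9: dropped silently
    let map = node.per_peer_state.read().unwrap();
    assert_eq!(*map.get(&7).unwrap().lock().unwrap(), vec![1, 2, 3]);
}
```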
@@ -11659,6 +11699,7 @@ where
                 actions_blocking_raa_monitor_updates: BTreeMap::new(),
                 closed_channel_monitor_update_ids: BTreeMap::new(),
                 is_connected: true,
+                peer_storage: Vec::new(),
             }));
         },
         hash_map::Entry::Occupied(e) => {
@@ -11688,6 +11729,15 @@ where
             let peer_state = &mut *peer_state_lock;
             let pending_msg_events = &mut peer_state.pending_msg_events;

+            if !peer_state.peer_storage.is_empty() {
+                pending_msg_events.push(events::MessageSendEvent::SendYourPeerStorageMessage {
+                    node_id: counterparty_node_id.clone(),
+                    msg: msgs::YourPeerStorageMessage {
+                        data: peer_state.peer_storage.clone()
+                    },
+                });
+            }
+
             for (_, chan) in peer_state.channel_by_id.iter_mut() {
                 let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
                 match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
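Note the ordering this hunk establishes: the SendYourPeerStorageMessage event is pushed onto pending_msg_events before the loop that queues each channel's reestablish handshake, so the stored blob reaches the peer ahead of any channel_reestablish. A small sketch of that invariant (the Event enum is an illustrative stand-in for MessageSendEvent):

```rust
/// Illustrative event type; the real MessageSendEvent has many variants.
#[derive(Debug, PartialEq)]
enum Event {
    YourPeerStorage(Vec<u8>),
    ChannelReestablish { channel: u32 },
}

/// Mirrors peer_connected(): queue the stored blob first (if any),
/// then the per-channel reestablish messages.
fn on_reconnect(peer_storage: &[u8], channels: &[u32], queue: &mut Vec<Event>) {
    if !peer_storage.is_empty() {
        queue.push(Event::YourPeerStorage(peer_storage.to_vec()));
    }
    for &channel in channels {
        queue.push(Event::ChannelReestablish { channel });
    }
}

fn main() {
    let mut queue = Vec::new();
    on_reconnect(&[0xAA], &[1, 2], &mut queue);
    // The stored blob is echoed back before any channel reestablish.
    assert_eq!(queue[0], Event::YourPeerStorage(vec![0xAA]));
    assert_eq!(queue.len(), 3);
}
```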
@@ -12854,6 +12904,9 @@ where
             if !peer_state.ok_to_remove(false) {
                 peer_pubkey.write(writer)?;
                 peer_state.latest_features.write(writer)?;
+
+                peer_state.peer_storage.write(writer)?;
+
                 if !peer_state.monitor_update_blocked_actions.is_empty() {
                     monitor_update_blocked_actions_per_peer
                         .get_or_insert_with(Vec::new)
@@ -13204,6 +13257,7 @@ where
                 monitor_update_blocked_actions: BTreeMap::new(),
                 actions_blocking_raa_monitor_updates: BTreeMap::new(),
                 closed_channel_monitor_update_ids: BTreeMap::new(),
+                peer_storage: Vec::new(),
                 is_connected: false,
             }
         };
@@ -13429,8 +13483,10 @@ where
         for _ in 0..peer_count {
             let peer_pubkey: PublicKey = Readable::read(reader)?;
             let latest_features = Readable::read(reader)?;
+            let peer_storage = Readable::read(reader)?;
             if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
                 peer_state.get_mut().unwrap().latest_features = latest_features;
+                peer_state.get_mut().unwrap().peer_storage = peer_storage;
             }
         }

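The write() and read() changes above mirror each other: peer_storage is written immediately after latest_features, and the reader consumes the fields in exactly that order, so both sides of the serialization stay in lockstep. A sketch of that round-trip discipline using a hypothetical length-prefixed encoding (LDK's actual Writeable/Readable framing differs):

```rust
use std::io::{self, Cursor, Read, Write};

/// Hypothetical length-prefixed blob encoding, for illustration only.
fn write_blob<W: Write>(w: &mut W, blob: &[u8]) -> io::Result<()> {
    w.write_all(&(blob.len() as u16).to_be_bytes())?;
    w.write_all(blob)
}

fn read_blob<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_bytes = [0u8; 2];
    r.read_exact(&mut len_bytes)?;
    let mut blob = vec![0u8; u16::from_be_bytes(len_bytes) as usize];
    r.read_exact(&mut blob)?;
    Ok(blob)
}

fn main() -> io::Result<()> {
    // The invariant the diff maintains: fields are read back in exactly
    // the order they were written (features first, then peer storage).
    let mut buf = Vec::new();
    write_blob(&mut buf, b"features")?;
    write_blob(&mut buf, b"peer storage blob")?;

    let mut cursor = Cursor::new(buf);
    assert_eq!(read_blob(&mut cursor)?, b"features");
    assert_eq!(read_blob(&mut cursor)?, b"peer storage blob");
    Ok(())
}
```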
lightning/src/ln/peer_handler.rs (+2 -2)
@@ -2164,11 +2164,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
             match event {
                 MessageSendEvent::SendPeerStorageMessage { ref node_id, ref msg } => {
                     log_debug!(self.logger, "Handling SendPeerStorageMessage event in peer_handler for {}", log_pubkey!(node_id));
-                    self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                    self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                 },
                 MessageSendEvent::SendYourPeerStorageMessage { ref node_id, ref msg } => {
                     log_debug!(self.logger, "Handling SendYourPeerStorageMessage event in peer_handler for {}", log_pubkey!(node_id));
-                    self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                    self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                 },
                 MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
                     log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
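The only change in this file is the trailing ? appended to get_peer_for_forwarding!, which suggests the macro now evaluates to an Option (or Result) that the enclosing scope propagates, so an event addressed to an unknown or disconnected peer is dropped rather than panicking. A hedged sketch of the general pattern, with a plain function and u8 node ids standing in for the macro:

```rust
use std::collections::HashMap;

struct Peer {
    outbound: Vec<String>,
}

/// Illustrative lookup: yields None when there is no live peer, letting
/// the caller's `?` short-circuit instead of unwrapping.
fn get_peer_for_forwarding<'a>(
    peers: &'a mut HashMap<u8, Peer>,
    node_id: &u8,
) -> Option<&'a mut Peer> {
    peers.get_mut(node_id)
}

/// Mirrors the diff's `get_peer_for_forwarding!(node_id)?` usage.
fn handle_send_peer_storage(
    peers: &mut HashMap<u8, Peer>,
    node_id: &u8,
    msg: String,
) -> Option<()> {
    get_peer_for_forwarding(peers, node_id)?.outbound.push(msg);
    Some(())
}

fn main() {
    let mut peers = HashMap::from([(1u8, Peer { outbound: Vec::new() })]);
    assert!(handle_send_peer_storage(&mut peers, &1, "peer_storage".into()).is_some());
    // Unknown peer: the event is simply dropped, not a panic.
    assert!(handle_send_peer_storage(&mut peers, &9, "peer_storage".into()).is_none());
}
```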
