Skip to content

Commit 904c0d7

Browse files
Aditya Sharmaadi2011
Aditya Sharma
authored andcommitted
Handle PeerStorage Message and its Persistence
This commit introduces the handling and persistence of PeerStorage messages on a per-peer basis. The peer storage is stored within the PeerState to simplify management, ensuring we do not need to remove it when there are no active channels with the peer. Key changes include: - Add PeerStorage to PeerState for persistent storage. - Implement internal_peer_storage to manage PeerStorage and its updates. - Add resend logic in peer_connected() to resend PeerStorage before sending the channel reestablish message upon reconnection. - Update PeerState's write() and read() methods to support PeerStorage persistence.
1 parent ddc8e80 commit 904c0d7

File tree

1 file changed

+85
-2
lines changed

1 file changed

+85
-2
lines changed

lightning/src/ln/channelmanager.rs

+85-2
Original file line numberDiff line numberDiff line change
@@ -1380,6 +1380,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
13801380
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
13811381
/// [`ChannelMessageHandler::peer_disconnected`].
13821382
pub is_connected: bool,
1383+
/// Holds the peer storage data for the channel partner on a per-peer basis.
1384+
peer_storage: Vec<u8>,
13831385
}
13841386

13851387
impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -8170,9 +8172,65 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
81708172
}
81718173
}
81728174

8173-
fn internal_peer_storage_retrieval(&self, _counterparty_node_id: &PublicKey, _msg: &msgs::PeerStorageRetrieval) {}
8175+
fn internal_peer_storage_retrieval(&self, counterparty_node_id: &PublicKey, _msg: &msgs::PeerStorageRetrieval) {
8176+
// TODO: Decrypt and check if have any stale or missing ChannelMonitor.
8177+
let per_peer_state = self.per_peer_state.read().unwrap();
8178+
let peer_state_mutex = match per_peer_state.get(counterparty_node_id) {
8179+
Some(peer_state_mutex) => peer_state_mutex,
8180+
None => return,
8181+
};
8182+
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
8183+
let peer_state = &mut *peer_state_lock;
8184+
let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None, None);
8185+
8186+
log_debug!(logger, "Received unexpected peer_storage_retrieval from {}. This is unusual since we do not yet distribute peer storage. Sending a warning.", log_pubkey!(counterparty_node_id));
8187+
peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
8188+
node_id: counterparty_node_id.clone(),
8189+
action: msgs::ErrorAction::SendWarningMessage {
8190+
msg: msgs::WarningMessage {
8191+
channel_id: ChannelId([0; 32]),
8192+
data: "Invalid peer_storage_retrieval message received.".to_owned()
8193+
},
8194+
log_level: Level::Trace,
8195+
}
8196+
});
8197+
}
81748198

8175-
fn internal_peer_storage(&self, _counterparty_node_id: &PublicKey, _msg: &msgs::PeerStorage) {}
8199+
fn internal_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::PeerStorage) {
8200+
let per_peer_state = self.per_peer_state.read().unwrap();
8201+
let peer_state_mutex = match per_peer_state.get(counterparty_node_id) {
8202+
Some(peer_state_mutex) => peer_state_mutex,
8203+
None => return,
8204+
};
8205+
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
8206+
let peer_state = &mut *peer_state_lock;
8207+
let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None, None);
8208+
8209+
// Check if we have any channels with the peer (Currently we only provide the servie to peers we have a channel with).
8210+
if !peer_state.channel_by_id.values().any(|phase| phase.is_funded()) {
8211+
log_debug!(logger, "Ignoring peer storage request from {} as we don't have any funded channels with them.", log_pubkey!(counterparty_node_id));
8212+
return;
8213+
}
8214+
8215+
#[cfg(not(test))]
8216+
if msg.data.len() > 1024 {
8217+
log_debug!(logger, "Sending warning to peer and ignoring peer storage request from {} as its over 1KiB", log_pubkey!(counterparty_node_id));
8218+
peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
8219+
node_id: counterparty_node_id.clone(),
8220+
action: msgs::ErrorAction::SendWarningMessage {
8221+
msg: msgs::WarningMessage {
8222+
channel_id: ChannelId([0; 32]),
8223+
data: "Supports only data up to 1 KiB in peer storage.".to_owned()
8224+
},
8225+
log_level: Level::Trace,
8226+
}
8227+
});
8228+
return;
8229+
}
8230+
8231+
log_trace!(logger, "Received Peer Storage from {}", log_pubkey!(counterparty_node_id));
8232+
peer_state.peer_storage = msg.data.clone();
8233+
}
81768234

81778235
fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
81788236
let best_block = *self.best_block.read().unwrap();
@@ -11664,6 +11722,7 @@ where
1166411722
actions_blocking_raa_monitor_updates: BTreeMap::new(),
1166511723
closed_channel_monitor_update_ids: BTreeMap::new(),
1166611724
is_connected: true,
11725+
peer_storage: Vec::new(),
1166711726
}));
1166811727
},
1166911728
hash_map::Entry::Occupied(e) => {
@@ -11693,6 +11752,15 @@ where
1169311752
let peer_state = &mut *peer_state_lock;
1169411753
let pending_msg_events = &mut peer_state.pending_msg_events;
1169511754

11755+
if !peer_state.peer_storage.is_empty() {
11756+
pending_msg_events.push(events::MessageSendEvent::SendPeerStorageRetrieval {
11757+
node_id: counterparty_node_id.clone(),
11758+
msg: msgs::PeerStorageRetrieval {
11759+
data: peer_state.peer_storage.clone()
11760+
},
11761+
});
11762+
}
11763+
1169611764
for (_, chan) in peer_state.channel_by_id.iter_mut() {
1169711765
let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
1169811766
match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
@@ -12850,6 +12918,8 @@ where
1285012918
peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
1285112919
}
1285212920

12921+
let mut peer_storage_dir: Vec<(&PublicKey, &Vec<u8>)> = Vec::new();
12922+
1285312923
(serializable_peer_count).write(writer)?;
1285412924
for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
1285512925
// Peers which we have no channels to should be dropped once disconnected. As we
@@ -12859,6 +12929,8 @@ where
1285912929
if !peer_state.ok_to_remove(false) {
1286012930
peer_pubkey.write(writer)?;
1286112931
peer_state.latest_features.write(writer)?;
12932+
peer_storage_dir.push((peer_pubkey, &peer_state.peer_storage));
12933+
1286212934
if !peer_state.monitor_update_blocked_actions.is_empty() {
1286312935
monitor_update_blocked_actions_per_peer
1286412936
.get_or_insert_with(Vec::new)
@@ -12980,6 +13052,7 @@ where
1298013052
(14, decode_update_add_htlcs_opt, option),
1298113053
(15, self.inbound_payment_id_secret, required),
1298213054
(17, in_flight_monitor_updates, required),
13055+
(19, peer_storage_dir, optional_vec),
1298313056
});
1298413057

1298513058
Ok(())
@@ -13212,6 +13285,7 @@ where
1321213285
monitor_update_blocked_actions: BTreeMap::new(),
1321313286
actions_blocking_raa_monitor_updates: BTreeMap::new(),
1321413287
closed_channel_monitor_update_ids: BTreeMap::new(),
13288+
peer_storage: Vec::new(),
1321513289
is_connected: false,
1321613290
}
1321713291
};
@@ -13507,6 +13581,7 @@ where
1350713581
let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
1350813582
let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
1350913583
let mut inbound_payment_id_secret = None;
13584+
let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
1351013585
read_tlv_fields!(reader, {
1351113586
(1, pending_outbound_payments_no_retry, option),
1351213587
(2, pending_intercepted_htlcs, option),
@@ -13523,8 +13598,10 @@ where
1352313598
(14, decode_update_add_htlcs, option),
1352413599
(15, inbound_payment_id_secret, option),
1352513600
(17, in_flight_monitor_updates, required),
13601+
(19, peer_storage_dir, optional_vec),
1352613602
});
1352713603
let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
13604+
let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
1352813605
if fake_scid_rand_bytes.is_none() {
1352913606
fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
1353013607
}
@@ -13556,6 +13633,12 @@ where
1355613633
}
1355713634
let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap());
1355813635

13636+
for (peer_pubkey, peer_storage) in peer_storage_dir {
13637+
if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
13638+
peer_state.get_mut().unwrap().peer_storage = peer_storage;
13639+
}
13640+
}
13641+
1355913642
// Handle transitioning from the legacy TLV to the new one on upgrades.
1356013643
if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates {
1356113644
// We should never serialize an empty map.

0 commit comments

Comments
 (0)