Commit ec18493

Aditya Sharma (adi2011) authored and committed
Handle PeerStorage Message and its Persistence
This commit introduces the handling and persistence of PeerStorage messages on a per-peer basis. The peer storage is stored within the PeerState to simplify management, ensuring we do not need to remove it when there are no active channels with the peer.

Key changes include:
- Add PeerStorage to PeerState for persistent storage.
- Implement internal_peer_storage to manage PeerStorage and its updates.
- Add resend logic in peer_connected() to resend PeerStorage before sending the channel reestablish message upon reconnection.
- Update PeerState's write() and read() methods to support PeerStorage persistence.
1 parent 81e89d8 commit ec18493
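For orientation, below is a minimal, standalone sketch of the store-and-echo flow this commit describes: a peer hands us an opaque blob, we persist it per-peer subject to a size cap, and on reconnection we send it back. The names here (Node, PeerState, the u64 peer key) are illustrative stand-ins, not LDK's actual API; the real handling lives in internal_peer_storage and peer_connected in the diff below.

use std::collections::HashMap;

/// Mirrors the 1024-byte cap introduced by this commit (illustrative constant).
const MAX_PEER_STORAGE_SIZE: usize = 1024;

/// Stand-in for the per-peer state; only the field relevant here.
#[derive(Default)]
struct PeerState {
    /// Opaque blob the peer asked us to hold on its behalf.
    peer_storage: Vec<u8>,
}

#[derive(Default)]
struct Node {
    /// Keyed by a stand-in peer id rather than a real public key.
    peers: HashMap<u64, PeerState>,
}

impl Node {
    /// Roughly what internal_peer_storage does: reject oversized blobs, then persist.
    fn handle_peer_storage(&mut self, peer: u64, data: Vec<u8>) -> Result<(), String> {
        if data.len() > MAX_PEER_STORAGE_SIZE {
            return Err(format!("supports only data up to {} bytes", MAX_PEER_STORAGE_SIZE));
        }
        self.peers.entry(peer).or_default().peer_storage = data;
        Ok(())
    }

    /// Roughly what peer_connected now adds: if we hold a blob for this peer,
    /// queue it to be sent back (as a peer_storage_retrieval) on reconnection.
    fn on_reconnect(&self, peer: u64) -> Option<Vec<u8>> {
        self.peers
            .get(&peer)
            .filter(|p| !p.peer_storage.is_empty())
            .map(|p| p.peer_storage.clone())
    }
}

fn main() {
    let mut node = Node::default();
    node.handle_peer_storage(42, b"encrypted backup blob".to_vec()).unwrap();
    // On reconnection, the stored blob is echoed back to the peer.
    assert_eq!(node.on_reconnect(42).as_deref(), Some(&b"encrypted backup blob"[..]));
}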

File tree

1 file changed: +75 -3 lines changed

lightning/src/ln/channelmanager.rs (+75 -3)
@@ -1404,6 +1404,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
 	pub is_connected: bool,
+	/// Holds the peer storage data for the channel partner on a per-peer basis.
+	peer_storage: Vec<u8>,
 }
 
 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -2872,6 +2874,13 @@ const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
 /// this many peers we reject new (inbound) channels from peers with which we don't have a channel.
 const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;
 
+/// The maximum allowed size for peer storage, in bytes.
+///
+/// This constant defines the upper limit for the size of data
+/// that can be stored for a peer. It is set to 1024 bytes (1 kilobyte)
+/// to prevent excessive resource consumption.
+const MAX_PEER_STORAGE_SIZE: usize = 1024;
+
 /// The maximum number of peers which we do not have a (funded) channel with. Once we reach this
 /// many peers we reject new (inbound) connections.
 const MAX_NO_CHANNEL_PEERS: usize = 250;
@@ -8245,11 +8254,49 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 		}
 	}
 
-	fn internal_peer_storage_retrieval(&self, _counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) -> Result<(), MsgHandleErrInternal> {
-		Ok(())
+	fn internal_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) -> Result<(), MsgHandleErrInternal> {
+		// TODO: Decrypt and check if have any stale or missing ChannelMonitor.
+		let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
+
+		log_debug!(logger, "Received unexpected peer_storage_retrieval from {}. This is unusual since we do not yet distribute peer storage. Sending a warning.", log_pubkey!(counterparty_node_id));
+
+		Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+			format!("Invalid peer_storage_retrieval message received.")
+		), ChannelId([0; 32])))
 	}
 
-	fn internal_peer_storage(&self, _counterparty_node_id: PublicKey, _msg: msgs::PeerStorage) -> Result<(), MsgHandleErrInternal> {
+	fn internal_peer_storage(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorage) -> Result<(), MsgHandleErrInternal> {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
+			.ok_or_else(|| {
+				debug_assert!(false);
+				MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), ChannelId([0; 32]))
+			})?;
+
+		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+		let peer_state = &mut *peer_state_lock;
+		let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
+
+		// Check if we have any channels with the peer (Currently we only provide the service to peers we have a channel with).
+		if !peer_state.channel_by_id.values().any(|phase| phase.is_funded()) {
+			log_debug!(logger, "Ignoring peer storage request from {} as we don't have any funded channels with them.", log_pubkey!(counterparty_node_id));
+			return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+				format!("Ignoring peer_storage message, as peer storage is currently supported only for peers with an active funded channel.")
+			), ChannelId([0; 32])));
+		}
+
+		#[cfg(not(test))]
+		if msg.data.len() > MAX_PEER_STORAGE_SIZE {
+			log_debug!(logger, "Sending warning to peer and ignoring peer storage request from {} as its over 1KiB", log_pubkey!(counterparty_node_id));
+
+			return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+				format!("Supports only data up to {} bytes in peer storage.", MAX_PEER_STORAGE_SIZE)
+			), ChannelId([0; 32])));
+		}
+
+		log_trace!(logger, "Received peer_storage from {}", log_pubkey!(counterparty_node_id));
+		peer_state.peer_storage = msg.data;
+
 		Ok(())
 	}
 
@@ -11758,6 +11805,7 @@ where
 					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					closed_channel_monitor_update_ids: BTreeMap::new(),
 					is_connected: true,
+					peer_storage: Vec::new(),
 				}));
 			},
 			hash_map::Entry::Occupied(e) => {
@@ -11787,6 +11835,15 @@ where
 			let peer_state = &mut *peer_state_lock;
 			let pending_msg_events = &mut peer_state.pending_msg_events;
 
+			if !peer_state.peer_storage.is_empty() {
+				pending_msg_events.push(events::MessageSendEvent::SendPeerStorageRetrieval {
+					node_id: counterparty_node_id.clone(),
+					msg: msgs::PeerStorageRetrieval {
+						data: peer_state.peer_storage.clone()
+					},
+				});
+			}
+
 			for (_, chan) in peer_state.channel_by_id.iter_mut() {
 				let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
 				match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
@@ -12995,6 +13052,8 @@ where
 			peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
 		}
 
+		let mut peer_storage_dir: Vec<(&PublicKey, &Vec<u8>)> = Vec::new();
+
 		(serializable_peer_count).write(writer)?;
 		for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
 			// Peers which we have no channels to should be dropped once disconnected. As we
@@ -13004,6 +13063,8 @@ where
 			if !peer_state.ok_to_remove(false) {
 				peer_pubkey.write(writer)?;
 				peer_state.latest_features.write(writer)?;
+				peer_storage_dir.push((peer_pubkey, &peer_state.peer_storage));
+
 				if !peer_state.monitor_update_blocked_actions.is_empty() {
 					monitor_update_blocked_actions_per_peer
 						.get_or_insert_with(Vec::new)
@@ -13125,6 +13186,7 @@ where
 			(14, decode_update_add_htlcs_opt, option),
 			(15, self.inbound_payment_id_secret, required),
 			(17, in_flight_monitor_updates, required),
+			(19, peer_storage_dir, optional_vec),
 		});
 
 		Ok(())
@@ -13357,6 +13419,7 @@ where
 				monitor_update_blocked_actions: BTreeMap::new(),
 				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				closed_channel_monitor_update_ids: BTreeMap::new(),
+				peer_storage: Vec::new(),
 				is_connected: false,
 			}
 		};
@@ -13652,6 +13715,7 @@ where
 		let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
 		let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
 		let mut inbound_payment_id_secret = None;
+		let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
 		read_tlv_fields!(reader, {
 			(1, pending_outbound_payments_no_retry, option),
 			(2, pending_intercepted_htlcs, option),
@@ -13668,8 +13732,10 @@ where
 			(14, decode_update_add_htlcs, option),
 			(15, inbound_payment_id_secret, option),
 			(17, in_flight_monitor_updates, required),
+			(19, peer_storage_dir, optional_vec),
 		});
 		let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
+		let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
 		if fake_scid_rand_bytes.is_none() {
 			fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
 		}
@@ -13701,6 +13767,12 @@ where
 		}
 		let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap());
 
+		for (peer_pubkey, peer_storage) in peer_storage_dir {
+			if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
+				peer_state.get_mut().unwrap().peer_storage = peer_storage;
+			}
+		}
+
 		// Handle transitioning from the legacy TLV to the new one on upgrades.
 		if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates {
 			// We should never serialize an empty map.
