Skip to content

Commit a7c4371

Browse files
Aditya Sharmaadi2011
Aditya Sharma
authored andcommitted
Handle PeerStorage Message and its Persistence
This commit introduces the handling and persistence of PeerStorage messages on a per-peer basis. The peer storage is stored within the PeerState to simplify management, ensuring we do not need to remove it when there are no active channels with the peer. Key changes include: - Add PeerStorage to PeerState for persistent storage. - Implement internal_peer_storage to manage PeerStorage and its updates. - Add resend logic in peer_connected() to resend PeerStorage before sending the channel reestablish message upon reconnection. - Update PeerState's write() and read() methods to support PeerStorage persistence.
1 parent 7bc6943 commit a7c4371

File tree

1 file changed

+75
-3
lines changed

1 file changed

+75
-3
lines changed

lightning/src/ln/channelmanager.rs

+75-3
Original file line numberDiff line numberDiff line change
@@ -1380,6 +1380,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
13801380
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
13811381
/// [`ChannelMessageHandler::peer_disconnected`].
13821382
pub is_connected: bool,
1383+
/// Holds the peer storage data for the channel partner on a per-peer basis.
1384+
peer_storage: Vec<u8>,
13831385
}
13841386

13851387
impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -2848,6 +2850,13 @@ const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
28482850
/// this many peers we reject new (inbound) channels from peers with which we don't have a channel.
28492851
const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;
28502852

2853+
/// The maximum allowed size for peer storage, in bytes.
2854+
///
2855+
/// This constant defines the upper limit for the size of data
2856+
/// that can be stored for a peer. It is set to 1024 bytes (1 kilobyte)
2857+
/// to prevent excessive resource consumption.
2858+
const MAX_PEER_STORAGE_SIZE: usize = 1024;
2859+
28512860
/// The maximum number of peers which we do not have a (funded) channel with. Once we reach this
28522861
/// many peers we reject new (inbound) connections.
28532862
const MAX_NO_CHANNEL_PEERS: usize = 250;
@@ -8221,11 +8230,49 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
82218230
}
82228231
}
82238232

8224-
fn internal_peer_storage_retrieval(&self, _counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) -> Result<(), MsgHandleErrInternal> {
8225-
Ok(())
8233+
fn internal_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) -> Result<(), MsgHandleErrInternal> {
8234+
// TODO: Decrypt and check if have any stale or missing ChannelMonitor.
8235+
let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
8236+
8237+
log_debug!(logger, "Received unexpected peer_storage_retrieval from {}. This is unusual since we do not yet distribute peer storage. Sending a warning.", log_pubkey!(counterparty_node_id));
8238+
8239+
Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
8240+
format!("Invalid peer_storage_retrieval message received.")
8241+
), ChannelId([0; 32])))
82268242
}
82278243

8228-
fn internal_peer_storage(&self, _counterparty_node_id: PublicKey, _msg: msgs::PeerStorage) -> Result<(), MsgHandleErrInternal> {
8244+
fn internal_peer_storage(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorage) -> Result<(), MsgHandleErrInternal> {
8245+
let per_peer_state = self.per_peer_state.read().unwrap();
8246+
let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
8247+
.ok_or_else(|| {
8248+
debug_assert!(false);
8249+
MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), ChannelId([0; 32]))
8250+
})?;
8251+
8252+
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
8253+
let peer_state = &mut *peer_state_lock;
8254+
let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
8255+
8256+
// Check if we have any channels with the peer (Currently we only provide the service to peers we have a channel with).
8257+
if !peer_state.channel_by_id.values().any(|phase| phase.is_funded()) {
8258+
log_debug!(logger, "Ignoring peer storage request from {} as we don't have any funded channels with them.", log_pubkey!(counterparty_node_id));
8259+
return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
8260+
format!("Ignoring peer_storage message, as peer storage is currently supported only for peers with an active funded channel.")
8261+
), ChannelId([0; 32])));
8262+
}
8263+
8264+
#[cfg(not(test))]
8265+
if msg.data.len() > MAX_PEER_STORAGE_SIZE {
8266+
log_debug!(logger, "Sending warning to peer and ignoring peer storage request from {} as its over 1KiB", log_pubkey!(counterparty_node_id));
8267+
8268+
return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
8269+
format!("Supports only data up to {} bytes in peer storage.", MAX_PEER_STORAGE_SIZE)
8270+
), ChannelId([0; 32])));
8271+
}
8272+
8273+
log_trace!(logger, "Received peer_storage from {}", log_pubkey!(counterparty_node_id));
8274+
peer_state.peer_storage = msg.data;
8275+
82298276
Ok(())
82308277
}
82318278

@@ -11732,6 +11779,7 @@ where
1173211779
actions_blocking_raa_monitor_updates: BTreeMap::new(),
1173311780
closed_channel_monitor_update_ids: BTreeMap::new(),
1173411781
is_connected: true,
11782+
peer_storage: Vec::new(),
1173511783
}));
1173611784
},
1173711785
hash_map::Entry::Occupied(e) => {
@@ -11761,6 +11809,15 @@ where
1176111809
let peer_state = &mut *peer_state_lock;
1176211810
let pending_msg_events = &mut peer_state.pending_msg_events;
1176311811

11812+
if !peer_state.peer_storage.is_empty() {
11813+
pending_msg_events.push(events::MessageSendEvent::SendPeerStorageRetrieval {
11814+
node_id: counterparty_node_id.clone(),
11815+
msg: msgs::PeerStorageRetrieval {
11816+
data: peer_state.peer_storage.clone()
11817+
},
11818+
});
11819+
}
11820+
1176411821
for (_, chan) in peer_state.channel_by_id.iter_mut() {
1176511822
let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
1176611823
match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
@@ -12932,6 +12989,8 @@ where
1293212989
peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
1293312990
}
1293412991

12992+
let mut peer_storage_dir: Vec<(&PublicKey, &Vec<u8>)> = Vec::new();
12993+
1293512994
(serializable_peer_count).write(writer)?;
1293612995
for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
1293712996
// Peers which we have no channels to should be dropped once disconnected. As we
@@ -12941,6 +13000,8 @@ where
1294113000
if !peer_state.ok_to_remove(false) {
1294213001
peer_pubkey.write(writer)?;
1294313002
peer_state.latest_features.write(writer)?;
13003+
peer_storage_dir.push((peer_pubkey, &peer_state.peer_storage));
13004+
1294413005
if !peer_state.monitor_update_blocked_actions.is_empty() {
1294513006
monitor_update_blocked_actions_per_peer
1294613007
.get_or_insert_with(Vec::new)
@@ -13062,6 +13123,7 @@ where
1306213123
(14, decode_update_add_htlcs_opt, option),
1306313124
(15, self.inbound_payment_id_secret, required),
1306413125
(17, in_flight_monitor_updates, required),
13126+
(19, peer_storage_dir, optional_vec),
1306513127
});
1306613128

1306713129
Ok(())
@@ -13294,6 +13356,7 @@ where
1329413356
monitor_update_blocked_actions: BTreeMap::new(),
1329513357
actions_blocking_raa_monitor_updates: BTreeMap::new(),
1329613358
closed_channel_monitor_update_ids: BTreeMap::new(),
13359+
peer_storage: Vec::new(),
1329713360
is_connected: false,
1329813361
}
1329913362
};
@@ -13589,6 +13652,7 @@ where
1358913652
let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
1359013653
let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
1359113654
let mut inbound_payment_id_secret = None;
13655+
let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
1359213656
read_tlv_fields!(reader, {
1359313657
(1, pending_outbound_payments_no_retry, option),
1359413658
(2, pending_intercepted_htlcs, option),
@@ -13605,8 +13669,10 @@ where
1360513669
(14, decode_update_add_htlcs, option),
1360613670
(15, inbound_payment_id_secret, option),
1360713671
(17, in_flight_monitor_updates, required),
13672+
(19, peer_storage_dir, optional_vec),
1360813673
});
1360913674
let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
13675+
let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
1361013676
if fake_scid_rand_bytes.is_none() {
1361113677
fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
1361213678
}
@@ -13638,6 +13704,12 @@ where
1363813704
}
1363913705
let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap());
1364013706

13707+
for (peer_pubkey, peer_storage) in peer_storage_dir {
13708+
if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
13709+
peer_state.get_mut().unwrap().peer_storage = peer_storage;
13710+
}
13711+
}
13712+
1364113713
// Handle transitioning from the legacy TLV to the new one on upgrades.
1364213714
if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates {
1364313715
// We should never serialize an empty map.

0 commit comments

Comments
 (0)