Skip to content

Commit d4bbd42

Browse files
committed
Add support for batches of other types
While the spec only includes commitment_signed messages in batches, there may be other types of batches in the future. Generalize the message batching code to allow for other types in the future.
1 parent c8672e2 commit d4bbd42

File tree

2 files changed

+71
-15
lines changed

2 files changed

+71
-15
lines changed

lightning/src/ln/msgs.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,8 @@ pub struct StartBatch {
696696
pub channel_id: ChannelId,
697697
/// The number of messages to follow.
698698
pub batch_size: u16,
699+
/// The type of all messages expected in the batch.
700+
pub message_type: Option<u16>,
699701
}
700702

701703
/// An [`update_add_htlc`] message to be sent to or received from a peer.
@@ -3099,7 +3101,9 @@ impl_writeable_msg!(PeerStorageRetrieval, { data }, {});
30993101
impl_writeable_msg!(StartBatch, {
31003102
channel_id,
31013103
batch_size
3102-
}, {});
3104+
}, {
3105+
(1, message_type, option)
3106+
});
31033107

31043108
// Note that this is written as a part of ChannelManager objects, and thus cannot change its
31053109
// serialization format in a way which assumes we know the total serialized length/message end

lightning/src/ln/peer_handler.rs

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,24 @@ enum InitSyncTracker{
539539
NodesSyncing(NodeId),
540540
}
541541

542+
/// A batch of messages initiated when receiving a `start_batch` message.
543+
struct MessageBatch {
544+
/// The channel associated with all the messages in the batch.
545+
channel_id: ChannelId,
546+
547+
/// The number of messages expected to be in the batch.
548+
batch_size: usize,
549+
550+
/// The batch of messages, which should all be of the same type.
551+
messages: MessageBatchImpl,
552+
}
553+
554+
/// The representation of the message batch, which may different for each message type.
555+
enum MessageBatchImpl {
556+
/// A batch of `commitment_signed` messages, where each has a unique `funding_txid`.
557+
CommitmentSigned(BTreeMap<Txid, msgs::CommitmentSigned>),
558+
}
559+
542560
/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
543561
/// forwarding gossip messages to peers altogether.
544562
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
@@ -620,7 +638,7 @@ struct Peer {
620638

621639
inbound_connection: bool,
622640

623-
commitment_signed_batch: Option<(ChannelId, usize, BTreeMap<Txid, msgs::CommitmentSigned>)>,
641+
message_batch: Option<MessageBatch>,
624642
}
625643

626644
impl Peer {
@@ -1159,7 +1177,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11591177
received_channel_announce_since_backlogged: false,
11601178
inbound_connection: false,
11611179

1162-
commitment_signed_batch: None,
1180+
message_batch: None,
11631181
}));
11641182
Ok(res)
11651183
}
@@ -1217,7 +1235,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
12171235
received_channel_announce_since_backlogged: false,
12181236
inbound_connection: true,
12191237

1220-
commitment_signed_batch: None,
1238+
message_batch: None,
12211239
}));
12221240
Ok(())
12231241
}
@@ -1773,7 +1791,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
17731791
// During splicing, commitment_signed messages need to be collected into a single batch
17741792
// before they are handled.
17751793
if let wire::Message::StartBatch(msg) = message {
1776-
if peer_lock.commitment_signed_batch.is_some() {
1794+
if peer_lock.message_batch.is_some() {
17771795
let error = format!("Peer {} sent start_batch for channel {} before previous batch completed", log_pubkey!(their_node_id), &msg.channel_id);
17781796
log_debug!(logger, "{}", error);
17791797
return Err(LightningError {
@@ -1818,15 +1836,41 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18181836
}.into());
18191837
}
18201838

1821-
peer_lock.commitment_signed_batch = Some((msg.channel_id, batch_size, BTreeMap::new()));
1839+
let messages = match msg.message_type {
1840+
Some(message_type) if message_type == msgs::CommitmentSigned::TYPE => {
1841+
MessageBatchImpl::CommitmentSigned(BTreeMap::new())
1842+
},
1843+
_ => {
1844+
let error = format!("Peer {} sent start_batch for channel {} without a known message type", log_pubkey!(their_node_id), &msg.channel_id);
1845+
log_debug!(logger, "{}", error);
1846+
return Err(LightningError {
1847+
err: error.clone(),
1848+
action: msgs::ErrorAction::DisconnectPeerWithWarning {
1849+
msg: msgs::WarningMessage {
1850+
channel_id: msg.channel_id,
1851+
data: error,
1852+
},
1853+
},
1854+
}.into());
1855+
},
1856+
};
1857+
1858+
let message_batch = MessageBatch {
1859+
channel_id: msg.channel_id,
1860+
batch_size,
1861+
messages,
1862+
};
1863+
peer_lock.message_batch = Some(message_batch);
18221864

18231865
return Ok(None);
18241866
}
18251867

18261868
if let wire::Message::CommitmentSigned(msg) = message {
1827-
if let Some((channel_id, batch_size, buffer)) = &mut peer_lock.commitment_signed_batch {
1828-
if msg.channel_id != *channel_id {
1829-
let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), channel_id, &msg.channel_id);
1869+
if let Some(message_batch) = &mut peer_lock.message_batch {
1870+
let MessageBatchImpl::CommitmentSigned(ref mut buffer) = &mut message_batch.messages;
1871+
1872+
if msg.channel_id != message_batch.channel_id {
1873+
let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), message_batch.channel_id, &msg.channel_id);
18301874
log_debug!(logger, "{}", error);
18311875
return Err(LightningError {
18321876
err: error.clone(),
@@ -1842,30 +1886,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18421886
let funding_txid = match msg.funding_txid {
18431887
Some(funding_txid) => funding_txid,
18441888
None => {
1845-
log_debug!(logger, "Peer {} sent batched commitment_signed without a funding_txid for channel {}", log_pubkey!(their_node_id), channel_id);
1889+
log_debug!(logger, "Peer {} sent batched commitment_signed without a funding_txid for channel {}", log_pubkey!(their_node_id), message_batch.channel_id);
18461890
return Err(PeerHandleError { }.into());
18471891
},
18481892
};
18491893

18501894
match buffer.entry(funding_txid) {
18511895
btree_map::Entry::Vacant(entry) => { entry.insert(msg); },
18521896
btree_map::Entry::Occupied(_) => {
1853-
log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), funding_txid, channel_id);
1897+
log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), funding_txid, message_batch.channel_id);
18541898
return Err(PeerHandleError { }.into());
18551899
}
18561900
}
18571901

1858-
if buffer.len() == *batch_size {
1859-
let (channel_id, _, batch) = peer_lock.commitment_signed_batch.take().expect("batch should have been inserted");
1902+
if buffer.len() == message_batch.batch_size {
1903+
let MessageBatch { channel_id, batch_size: _, messages } = peer_lock.message_batch.take().expect("batch should have been inserted");
1904+
let MessageBatchImpl::CommitmentSigned(batch) = messages;
1905+
18601906
return Ok(Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch)));
18611907
} else {
18621908
return Ok(None);
18631909
}
18641910
} else {
18651911
return Ok(Some(LogicalMessage::FromWire(wire::Message::CommitmentSigned(msg))));
18661912
}
1867-
} else if peer_lock.commitment_signed_batch.is_some() {
1868-
log_debug!(logger, "Peer {} sent non-commitment_signed message when expecting batched commitment_signed", log_pubkey!(their_node_id));
1913+
} else if let Some(message_batch) = &peer_lock.message_batch {
1914+
match message_batch.messages {
1915+
MessageBatchImpl::CommitmentSigned(_) => {
1916+
log_debug!(logger, "Peer {} sent an unexpected message for a commitment_signed batch", log_pubkey!(their_node_id));
1917+
},
1918+
}
1919+
18691920
return Err(PeerHandleError { }.into());
18701921
}
18711922

@@ -2465,6 +2516,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
24652516
let msg = msgs::StartBatch {
24662517
channel_id: *channel_id,
24672518
batch_size: commitment_signed.len() as u16,
2519+
message_type: Some(msgs::CommitmentSigned::TYPE),
24682520
};
24692521
self.enqueue_message(&mut *peer, &msg);
24702522
}

0 commit comments

Comments
 (0)