Skip to content

Commit b391b39

Browse files
committed
Add support for batches of other types
While the spec only includes commitment_signed messages in batches, there may be other types of batches in the future. Generalize the message batching code to allow for other types in the future.
1 parent 7467486 commit b391b39

File tree

2 files changed

+71
-15
lines changed

2 files changed

+71
-15
lines changed

lightning/src/ln/msgs.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,8 @@ pub struct StartBatch {
696696
pub channel_id: ChannelId,
697697
/// The number of messages to follow.
698698
pub batch_size: u16,
699+
/// The type of all messages expected in the batch.
700+
pub message_type: Option<u16>,
699701
}
700702

701703
/// An [`update_add_htlc`] message to be sent to or received from a peer.
@@ -3084,7 +3086,9 @@ impl_writeable_msg!(PeerStorageRetrieval, { data }, {});
30843086
impl_writeable_msg!(StartBatch, {
30853087
channel_id,
30863088
batch_size
3087-
}, {});
3089+
}, {
3090+
(1, message_type, option)
3091+
});
30883092

30893093
// Note that this is written as a part of ChannelManager objects, and thus cannot change its
30903094
// serialization format in a way which assumes we know the total serialized length/message end

lightning/src/ln/peer_handler.rs

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,24 @@ enum InitSyncTracker{
537537
NodesSyncing(NodeId),
538538
}
539539

540+
/// A batch of messages initiated when receiving a `start_batch` message.
541+
struct MessageBatch {
542+
/// The channel associated with all the messages in the batch.
543+
channel_id: ChannelId,
544+
545+
/// The number of messages expected to be in the batch.
546+
batch_size: usize,
547+
548+
/// The batch of messages, which should all be of the same type.
549+
messages: MessageBatchImpl,
550+
}
551+
552+
/// The representation of the message batch, which may different for each message type.
553+
enum MessageBatchImpl {
554+
/// A batch of `commitment_signed` messages, where each has a unique `funding_txid`.
555+
CommitmentSigned(BTreeMap<Txid, msgs::CommitmentSigned>),
556+
}
557+
540558
/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
541559
/// forwarding gossip messages to peers altogether.
542560
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
@@ -618,7 +636,7 @@ struct Peer {
618636

619637
inbound_connection: bool,
620638

621-
commitment_signed_batch: Option<(ChannelId, usize, BTreeMap<Txid, msgs::CommitmentSigned>)>,
639+
message_batch: Option<MessageBatch>,
622640
}
623641

624642
impl Peer {
@@ -1157,7 +1175,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11571175
received_channel_announce_since_backlogged: false,
11581176
inbound_connection: false,
11591177

1160-
commitment_signed_batch: None,
1178+
message_batch: None,
11611179
}));
11621180
Ok(res)
11631181
}
@@ -1215,7 +1233,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
12151233
received_channel_announce_since_backlogged: false,
12161234
inbound_connection: true,
12171235

1218-
commitment_signed_batch: None,
1236+
message_batch: None,
12191237
}));
12201238
Ok(())
12211239
}
@@ -1771,7 +1789,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
17711789
// During splicing, commitment_signed messages need to be collected into a single batch
17721790
// before they are handled.
17731791
if let wire::Message::StartBatch(msg) = message {
1774-
if peer_lock.commitment_signed_batch.is_some() {
1792+
if peer_lock.message_batch.is_some() {
17751793
let error = format!("Peer {} sent start_batch for channel {} before previous batch completed", log_pubkey!(their_node_id), &msg.channel_id);
17761794
log_debug!(logger, "{}", error);
17771795
return Err(LightningError {
@@ -1816,15 +1834,41 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18161834
}.into());
18171835
}
18181836

1819-
peer_lock.commitment_signed_batch = Some((msg.channel_id, batch_size, BTreeMap::new()));
1837+
let messages = match msg.message_type {
1838+
Some(message_type) if message_type == msgs::CommitmentSigned::TYPE => {
1839+
MessageBatchImpl::CommitmentSigned(BTreeMap::new())
1840+
},
1841+
_ => {
1842+
let error = format!("Peer {} sent start_batch for channel {} without a known message type", log_pubkey!(their_node_id), &msg.channel_id);
1843+
log_debug!(logger, "{}", error);
1844+
return Err(LightningError {
1845+
err: error.clone(),
1846+
action: msgs::ErrorAction::DisconnectPeerWithWarning {
1847+
msg: msgs::WarningMessage {
1848+
channel_id: msg.channel_id,
1849+
data: error,
1850+
},
1851+
},
1852+
}.into());
1853+
},
1854+
};
1855+
1856+
let message_batch = MessageBatch {
1857+
channel_id: msg.channel_id,
1858+
batch_size,
1859+
messages,
1860+
};
1861+
peer_lock.message_batch = Some(message_batch);
18201862

18211863
return Ok(None);
18221864
}
18231865

18241866
if let wire::Message::CommitmentSigned(msg) = message {
1825-
if let Some((channel_id, batch_size, buffer)) = &mut peer_lock.commitment_signed_batch {
1826-
if msg.channel_id != *channel_id {
1827-
let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), channel_id, &msg.channel_id);
1867+
if let Some(message_batch) = &mut peer_lock.message_batch {
1868+
let MessageBatchImpl::CommitmentSigned(ref mut buffer) = &mut message_batch.messages;
1869+
1870+
if msg.channel_id != message_batch.channel_id {
1871+
let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), message_batch.channel_id, &msg.channel_id);
18281872
log_debug!(logger, "{}", error);
18291873
return Err(LightningError {
18301874
err: error.clone(),
@@ -1840,30 +1884,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18401884
let funding_txid = match msg.funding_txid {
18411885
Some(funding_txid) => funding_txid,
18421886
None => {
1843-
log_debug!(logger, "Peer {} sent batched commitment_signed without a funding_txid for channel {}", log_pubkey!(their_node_id), channel_id);
1887+
log_debug!(logger, "Peer {} sent batched commitment_signed without a funding_txid for channel {}", log_pubkey!(their_node_id), message_batch.channel_id);
18441888
return Err(PeerHandleError { }.into());
18451889
},
18461890
};
18471891

18481892
match buffer.entry(funding_txid) {
18491893
btree_map::Entry::Vacant(entry) => { entry.insert(msg); },
18501894
btree_map::Entry::Occupied(_) => {
1851-
log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), funding_txid, channel_id);
1895+
log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), funding_txid, message_batch.channel_id);
18521896
return Err(PeerHandleError { }.into());
18531897
}
18541898
}
18551899

1856-
if buffer.len() == *batch_size {
1857-
let (channel_id, _, batch) = peer_lock.commitment_signed_batch.take().expect("batch should have been inserted");
1900+
if buffer.len() == message_batch.batch_size {
1901+
let MessageBatch { channel_id, batch_size: _, messages } = peer_lock.message_batch.take().expect("batch should have been inserted");
1902+
let MessageBatchImpl::CommitmentSigned(batch) = messages;
1903+
18581904
return Ok(Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch)));
18591905
} else {
18601906
return Ok(None);
18611907
}
18621908
} else {
18631909
return Ok(Some(LogicalMessage::FromWire(wire::Message::CommitmentSigned(msg))));
18641910
}
1865-
} else if peer_lock.commitment_signed_batch.is_some() {
1866-
log_debug!(logger, "Peer {} sent non-commitment_signed message when expecting batched commitment_signed", log_pubkey!(their_node_id));
1911+
} else if let Some(message_batch) = &peer_lock.message_batch {
1912+
match message_batch.messages {
1913+
MessageBatchImpl::CommitmentSigned(_) => {
1914+
log_debug!(logger, "Peer {} sent an unexpected message for a commitment_signed batch", log_pubkey!(their_node_id));
1915+
},
1916+
}
1917+
18671918
return Err(PeerHandleError { }.into());
18681919
}
18691920

@@ -2463,6 +2514,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
24632514
let msg = msgs::StartBatch {
24642515
channel_id: *channel_id,
24652516
batch_size: commitment_signed.len() as u16,
2517+
message_type: Some(msgs::CommitmentSigned::TYPE),
24662518
};
24672519
self.enqueue_message(&mut *peer, &msg);
24682520
}

0 commit comments

Comments
 (0)