Skip to content

Commit b1abb5d

Browse files
committed
Add support for batches of other types
While the spec only includes commitment_signed messages in batches, there may be other types of batches in the future. Generalize the message batching code to allow for other types in the future.
1 parent e4af811 commit b1abb5d

File tree

1 file changed

+63
-14
lines changed

1 file changed

+63
-14
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,28 @@ enum InitSyncTracker{
537537
NodesSyncing(NodeId),
538538
}
539539

540+
/// A batch of messages initiated when receiving a `start_batch` message.
541+
struct MessageBatch {
542+
/// The channel associated with all the messages in the batch.
543+
channel_id: ChannelId,
544+
545+
/// The number of messages expected to be in the batch.
546+
batch_size: usize,
547+
548+
/// The batch of messages, which should all be of the same type.
549+
messages: MessageBatchImpl,
550+
}
551+
552+
/// The representation of the message batch, which may different for each message type.
553+
enum MessageBatchImpl {
554+
/// Used before the first message in the batch is received, since the type of messages in the
555+
/// batch is not yet known.
556+
Unknown,
557+
558+
/// A batch of `commitment_signed` messages, where each has a unique `funding_txid`.
559+
CommitmentSigned(BTreeMap<Txid, msgs::CommitmentSigned>),
560+
}
561+
540562
/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
541563
/// forwarding gossip messages to peers altogether.
542564
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
@@ -618,7 +640,7 @@ struct Peer {
618640

619641
inbound_connection: bool,
620642

621-
commitment_signed_batch: Option<(ChannelId, usize, BTreeMap<Txid, msgs::CommitmentSigned>)>,
643+
message_batch: Option<MessageBatch>,
622644
}
623645

624646
impl Peer {
@@ -1157,7 +1179,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11571179
received_channel_announce_since_backlogged: false,
11581180
inbound_connection: false,
11591181

1160-
commitment_signed_batch: None,
1182+
message_batch: None,
11611183
}));
11621184
Ok(res)
11631185
}
@@ -1215,7 +1237,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
12151237
received_channel_announce_since_backlogged: false,
12161238
inbound_connection: true,
12171239

1218-
commitment_signed_batch: None,
1240+
message_batch: None,
12191241
}));
12201242
Ok(())
12211243
}
@@ -1771,7 +1793,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
17711793
// During splicing, commitment_signed messages need to be collected into a single batch
17721794
// before they are handled.
17731795
if let wire::Message::StartBatch(msg) = message {
1774-
if peer_lock.commitment_signed_batch.is_some() {
1796+
if peer_lock.message_batch.is_some() {
17751797
let error = format!("Peer {} sent start_batch for channel {} before previous batch completed", log_pubkey!(their_node_id), &msg.channel_id);
17761798
log_debug!(logger, "{}", error);
17771799
return Err(LightningError {
@@ -1816,15 +1838,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18161838
}.into());
18171839
}
18181840

1819-
peer_lock.commitment_signed_batch = Some((msg.channel_id, batch_size, BTreeMap::new()));
1841+
let message_batch = MessageBatch {
1842+
channel_id: msg.channel_id,
1843+
batch_size: msg.batch_size as usize,
1844+
messages: MessageBatchImpl::Unknown,
1845+
};
1846+
peer_lock.message_batch = Some(message_batch);
18201847

18211848
return Ok(None);
18221849
}
18231850

18241851
if let wire::Message::CommitmentSigned(msg) = message {
1825-
if let Some((channel_id, batch_size, buffer)) = &mut peer_lock.commitment_signed_batch {
1826-
if msg.channel_id != *channel_id {
1827-
let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), channel_id, &msg.channel_id);
1852+
if let Some(message_batch) = &mut peer_lock.message_batch {
1853+
if let MessageBatchImpl::Unknown = message_batch.messages {
1854+
message_batch.messages = MessageBatchImpl::CommitmentSigned(BTreeMap::new());
1855+
}
1856+
1857+
let buffer = match &mut message_batch.messages {
1858+
MessageBatchImpl::Unknown => unreachable!(),
1859+
MessageBatchImpl::CommitmentSigned(ref mut messages) => messages,
1860+
};
1861+
1862+
if msg.channel_id != message_batch.channel_id {
1863+
let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), message_batch.channel_id, &msg.channel_id);
18281864
log_debug!(logger, "{}", error);
18291865
return Err(LightningError {
18301866
err: error.clone(),
@@ -1840,30 +1876,43 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18401876
let funding_txid = match msg.funding_txid {
18411877
Some(funding_txid) => funding_txid,
18421878
None => {
1843-
log_debug!(logger, "Peer {} sent batched commitment_signed without a funding_txid for channel {}", log_pubkey!(their_node_id), channel_id);
1879+
log_debug!(logger, "Peer {} sent batched commitment_signed without a funding_txid for channel {}", log_pubkey!(their_node_id), message_batch.channel_id);
18441880
return Err(PeerHandleError { }.into());
18451881
},
18461882
};
18471883

18481884
match buffer.entry(funding_txid) {
18491885
btree_map::Entry::Vacant(entry) => { entry.insert(msg); },
18501886
btree_map::Entry::Occupied(_) => {
1851-
log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), funding_txid, channel_id);
1887+
log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), funding_txid, message_batch.channel_id);
18521888
return Err(PeerHandleError { }.into());
18531889
}
18541890
}
18551891

1856-
if buffer.len() == *batch_size {
1857-
let (channel_id, _, batch) = peer_lock.commitment_signed_batch.take().expect("batch should have been inserted");
1892+
if buffer.len() == message_batch.batch_size {
1893+
let MessageBatch { channel_id, batch_size: _, messages } = peer_lock.message_batch.take().expect("batch should have been inserted");
1894+
let batch = match messages {
1895+
MessageBatchImpl::Unknown => unreachable!(),
1896+
MessageBatchImpl::CommitmentSigned(messages) => messages,
1897+
};
1898+
18581899
return Ok(Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch)));
18591900
} else {
18601901
return Ok(None);
18611902
}
18621903
} else {
18631904
return Ok(Some(LogicalMessage::FromWire(wire::Message::CommitmentSigned(msg))));
18641905
}
1865-
} else if peer_lock.commitment_signed_batch.is_some() {
1866-
log_debug!(logger, "Peer {} sent non-commitment_signed message when expecting batched commitment_signed", log_pubkey!(their_node_id));
1906+
} else if let Some(message_batch) = &peer_lock.message_batch {
1907+
match message_batch.messages {
1908+
MessageBatchImpl::Unknown => {
1909+
log_debug!(logger, "Peer {} sent an unexpected message for a batch", log_pubkey!(their_node_id));
1910+
},
1911+
MessageBatchImpl::CommitmentSigned(_) => {
1912+
log_debug!(logger, "Peer {} sent an unexpected message for a commitment_signed batch", log_pubkey!(their_node_id));
1913+
},
1914+
}
1915+
18671916
return Err(PeerHandleError { }.into());
18681917
}
18691918

0 commit comments

Comments
 (0)