Skip to content

Commit c37e0f8

Browse files
committed
Move pending_offers_message to flows.rs
1 parent 8ee9a71 commit c37e0f8

File tree

3 files changed

+73
-90
lines changed

3 files changed

+73
-90
lines changed

lightning/src/ln/channelmanager.rs

+8-65
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use bitcoin::{secp256k1, Sequence, Weight};
3535
use crate::events::FundingInfo;
3636
use crate::blinded_path::message::{AsyncPaymentsContext, MessageForwardNode};
3737
use crate::blinded_path::NodeIdLookUp;
38-
use crate::blinded_path::message::BlindedMessagePath;
3938
use crate::blinded_path::payment::{BlindedPaymentPath, PaymentConstraints, PaymentContext, UnauthenticatedReceiveTlvs};
4039
use crate::chain;
4140
use crate::chain::{Confirm, ChannelMonitorUpdateStatus, Watch, BestBlock};
@@ -67,15 +66,10 @@ use crate::ln::outbound_payment;
6766
use crate::ln::outbound_payment::{OutboundPayments, PendingOutboundPayment, RetryableInvoiceRequest, SendAlongPathArgs, StaleExpiration};
6867
use crate::offers::invoice::Bolt12Invoice;
6968
use crate::offers::invoice::UnsignedBolt12Invoice;
70-
use crate::offers::invoice_request::InvoiceRequest;
7169
use crate::offers::nonce::Nonce;
72-
use crate::offers::parse::Bolt12SemanticError;
7370
use crate::offers::signer;
74-
#[cfg(async_payments)]
75-
use crate::offers::static_invoice::StaticInvoice;
7671
use crate::onion_message::async_payments::{AsyncPaymentsMessage, HeldHtlcAvailable, ReleaseHeldHtlc, AsyncPaymentsMessageHandler};
77-
use crate::onion_message::messenger::{DefaultMessageRouter, Destination, MessageRouter, MessageSendInstructions, Responder, ResponseInstruction};
78-
use crate::onion_message::offers::OffersMessage;
72+
use crate::onion_message::messenger::{MessageRouter, MessageSendInstructions, Responder, ResponseInstruction};
7973
use crate::sign::{EntropySource, NodeSigner, Recipient, SignerProvider};
8074
use crate::sign::ecdsa::EcdsaChannelSigner;
8175
use crate::util::config::{UserConfig, ChannelConfig, ChannelConfigUpdate};
@@ -90,8 +84,15 @@ use crate::util::errors::APIError;
9084
#[cfg(feature = "dnssec")]
9185
use crate::onion_message::dns_resolution::{DNSResolverMessage, OMNameResolver};
9286

87+
#[cfg(async_payments)]
88+
use {
89+
crate::offers::static_invoice::StaticInvoice,
90+
crate::onion_message::messenger::Destination,
91+
};
92+
9393
#[cfg(not(c_bindings))]
9494
use {
95+
crate::onion_message::messenger::DefaultMessageRouter,
9596
crate::routing::router::DefaultRouter,
9697
crate::routing::gossip::NetworkGraph,
9798
crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters},
@@ -2145,8 +2146,6 @@ where
21452146
//
21462147
// Lock order tree:
21472148
//
2148-
// `pending_offers_messages`
2149-
//
21502149
// `pending_async_payments_messages`
21512150
//
21522151
// `total_consistency_lock`
@@ -2397,10 +2396,6 @@ where
23972396
event_persist_notifier: Notifier,
23982397
needs_persist_flag: AtomicBool,
23992398

2400-
#[cfg(not(any(test, feature = "_test_utils")))]
2401-
pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
2402-
#[cfg(any(test, feature = "_test_utils"))]
2403-
pub(crate) pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
24042399
pending_async_payments_messages: Mutex<Vec<(AsyncPaymentsMessage, MessageSendInstructions)>>,
24052400

24062401
/// Tracks the message events that are to be broadcasted when we are connected to some peer.
@@ -3320,7 +3315,6 @@ where
33203315
needs_persist_flag: AtomicBool::new(false),
33213316
funding_batch_states: Mutex::new(BTreeMap::new()),
33223317

3323-
pending_offers_messages: Mutex::new(Vec::new()),
33243318
pending_async_payments_messages: Mutex::new(Vec::new()),
33253319
pending_broadcast_messages: Mutex::new(Vec::new()),
33263320

@@ -9599,10 +9593,6 @@ where
95999593
MR::Target: MessageRouter,
96009594
L::Target: Logger,
96019595
{
9602-
fn get_pending_offers_messages(&self) -> MutexGuard<'_, Vec<(OffersMessage, MessageSendInstructions)>> {
9603-
self.pending_offers_messages.lock().expect("Mutex is locked by other thread.")
9604-
}
9605-
96069596
#[cfg(feature = "dnssec")]
96079597
fn get_pending_dns_onion_messages(&self) -> MutexGuard<'_, Vec<(DNSResolverMessage, MessageSendInstructions)>> {
96089598
self.pending_dns_onion_messages.lock().expect("Mutex is locked by other thread.")
@@ -9659,10 +9649,6 @@ where
96599649
self.pending_outbound_payments.release_invoice_requests_awaiting_invoice()
96609650
}
96619651

9662-
fn enqueue_invoice_request(&self, invoice_request: InvoiceRequest, reply_paths: Vec<BlindedMessagePath>) -> Result<(), Bolt12SemanticError> {
9663-
self.enqueue_invoice_request(invoice_request, reply_paths)
9664-
}
9665-
96669652
fn get_current_blocktime(&self) -> Duration {
96679653
Duration::from_secs(self.highest_seen_timestamp.load(Ordering::Acquire) as u64)
96689654
}
@@ -9709,13 +9695,6 @@ where
97099695
}
97109696
}
97119697

9712-
/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent
9713-
/// along different paths.
9714-
/// Sending multiple requests increases the chances of successful delivery in case some
9715-
/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
9716-
/// even if multiple invoices are received.
9717-
pub const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
9718-
97199698
impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, MR: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, MR, L>
97209699
where
97219700
M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
@@ -9728,41 +9707,6 @@ where
97289707
MR::Target: MessageRouter,
97299708
L::Target: Logger,
97309709
{
9731-
fn enqueue_invoice_request(
9732-
&self,
9733-
invoice_request: InvoiceRequest,
9734-
reply_paths: Vec<BlindedMessagePath>,
9735-
) -> Result<(), Bolt12SemanticError> {
9736-
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
9737-
if !invoice_request.paths().is_empty() {
9738-
reply_paths
9739-
.iter()
9740-
.flat_map(|reply_path| invoice_request.paths().iter().map(move |path| (path, reply_path)))
9741-
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
9742-
.for_each(|(path, reply_path)| {
9743-
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
9744-
destination: Destination::BlindedPath(path.clone()),
9745-
reply_path: reply_path.clone(),
9746-
};
9747-
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
9748-
pending_offers_messages.push((message, instructions));
9749-
});
9750-
} else if let Some(node_id) = invoice_request.issuer_signing_pubkey() {
9751-
for reply_path in reply_paths {
9752-
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
9753-
destination: Destination::Node(node_id),
9754-
reply_path,
9755-
};
9756-
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
9757-
pending_offers_messages.push((message, instructions));
9758-
}
9759-
} else {
9760-
debug_assert!(false);
9761-
return Err(Bolt12SemanticError::MissingIssuerSigningPubkey);
9762-
}
9763-
Ok(())
9764-
}
9765-
97669710
/// Gets a payment secret and payment hash for use in an invoice given to a third party wishing
97679711
/// to pay us.
97689712
///
@@ -13206,7 +13150,6 @@ where
1320613150

1320713151
funding_batch_states: Mutex::new(BTreeMap::new()),
1320813152

13209-
pending_offers_messages: Mutex::new(Vec::new()),
1321013153
pending_async_payments_messages: Mutex::new(Vec::new()),
1321113154

1321213155
pending_broadcast_messages: Mutex::new(Vec::new()),

lightning/src/ln/offers_tests.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -1325,7 +1325,7 @@ fn fails_authentication_when_handling_invoice_request() {
13251325
expect_recent_payment!(david, RecentPaymentDetails::AwaitingInvoice, payment_id);
13261326

13271327
connect_peers(david, alice);
1328-
match &mut david.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
1328+
match &mut david.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
13291329
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
13301330
*destination = Destination::Node(alice_id),
13311331
_ => panic!(),
@@ -1350,7 +1350,7 @@ fn fails_authentication_when_handling_invoice_request() {
13501350
.unwrap();
13511351
expect_recent_payment!(david, RecentPaymentDetails::AwaitingInvoice, payment_id);
13521352

1353-
match &mut david.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
1353+
match &mut david.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
13541354
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
13551355
*destination = Destination::BlindedPath(invalid_path),
13561356
_ => panic!(),
@@ -1430,7 +1430,7 @@ fn fails_authentication_when_handling_invoice_for_offer() {
14301430

14311431
// Don't send the invoice request, but grab its reply path to use with a different request.
14321432
let invalid_reply_path = {
1433-
let mut pending_offers_messages = david.node.pending_offers_messages.lock().unwrap();
1433+
let mut pending_offers_messages = david.offers_handler.pending_offers_messages.lock().unwrap();
14341434
let pending_invoice_request = pending_offers_messages.pop().unwrap();
14351435
pending_offers_messages.clear();
14361436
match pending_invoice_request.1 {
@@ -1447,7 +1447,7 @@ fn fails_authentication_when_handling_invoice_for_offer() {
14471447
// Swap out the reply path to force authentication to fail when handling the invoice since it
14481448
// will be sent over the wrong blinded path.
14491449
{
1450-
let mut pending_offers_messages = david.node.pending_offers_messages.lock().unwrap();
1450+
let mut pending_offers_messages = david.offers_handler.pending_offers_messages.lock().unwrap();
14511451
let mut pending_invoice_request = pending_offers_messages.first_mut().unwrap();
14521452
match &mut pending_invoice_request.1 {
14531453
MessageSendInstructions::WithSpecifiedReplyPath { reply_path, .. } =>
@@ -1534,7 +1534,7 @@ fn fails_authentication_when_handling_invoice_for_refund() {
15341534
let expected_invoice = alice.offers_handler.request_refund_payment(&refund).unwrap();
15351535

15361536
connect_peers(david, alice);
1537-
match &mut alice.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
1537+
match &mut alice.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
15381538
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
15391539
*destination = Destination::Node(david_id),
15401540
_ => panic!(),
@@ -1565,7 +1565,7 @@ fn fails_authentication_when_handling_invoice_for_refund() {
15651565

15661566
let expected_invoice = alice.offers_handler.request_refund_payment(&refund).unwrap();
15671567

1568-
match &mut alice.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
1568+
match &mut alice.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
15691569
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
15701570
*destination = Destination::BlindedPath(invalid_path),
15711571
_ => panic!(),
@@ -2156,7 +2156,7 @@ fn fails_paying_invoice_with_unknown_required_features() {
21562156
destination: Destination::BlindedPath(reply_path),
21572157
};
21582158
let message = OffersMessage::Invoice(invoice);
2159-
alice.node.pending_offers_messages.lock().unwrap().push((message, instructions));
2159+
alice.offers_handler.pending_offers_messages.lock().unwrap().push((message, instructions));
21602160

21612161
let onion_message = alice.onion_messenger.next_onion_message_for_peer(charlie_id).unwrap();
21622162
charlie.onion_messenger.handle_onion_message(alice_id, &onion_message);

lightning/src/offers/flow.rs

+58-18
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ use crate::blinded_path::payment::{
2626
BlindedPaymentPath, Bolt12OfferContext, Bolt12RefundContext, PaymentContext,
2727
};
2828
use crate::events::PaymentFailureReason;
29-
use crate::ln::channelmanager::{
30-
Bolt12PaymentError, PaymentId, Verification, OFFERS_MESSAGE_REQUEST_LIMIT,
31-
};
29+
use crate::ln::channelmanager::{Bolt12PaymentError, PaymentId, Verification};
3230
use crate::ln::inbound_payment;
3331
use crate::ln::outbound_payment::{Retry, RetryableInvoiceRequest, StaleExpiration};
3432
use crate::offers::invoice::{
@@ -77,11 +75,6 @@ use {
7775
///
7876
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
7977
pub trait OffersMessageCommons {
80-
/// Get pending offers messages
81-
fn get_pending_offers_messages(
82-
&self,
83-
) -> MutexGuard<'_, Vec<(OffersMessage, MessageSendInstructions)>>;
84-
8578
#[cfg(feature = "dnssec")]
8679
/// Get pending DNS onion messages
8780
fn get_pending_dns_onion_messages(
@@ -167,11 +160,6 @@ pub trait OffersMessageCommons {
167160
&self,
168161
) -> Vec<(PaymentId, RetryableInvoiceRequest)>;
169162

170-
/// Enqueue invoice request
171-
fn enqueue_invoice_request(
172-
&self, invoice_request: InvoiceRequest, reply_paths: Vec<BlindedMessagePath>,
173-
) -> Result<(), Bolt12SemanticError>;
174-
175163
/// Get the current time determined by highest seen timestamp
176164
fn get_current_blocktime(&self) -> Duration;
177165

@@ -572,6 +560,11 @@ where
572560

573561
message_router: MR,
574562

563+
#[cfg(not(any(test, feature = "_test_utils")))]
564+
pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
565+
#[cfg(any(test, feature = "_test_utils"))]
566+
pub(crate) pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
567+
575568
#[cfg(feature = "_test_utils")]
576569
/// In testing, it is useful be able to forge a name -> offer mapping so that we can pay an
577570
/// offer generated in the test.
@@ -604,9 +597,13 @@ where
604597
inbound_payment_key: expanded_inbound_key,
605598
our_network_pubkey,
606599
secp_ctx,
600+
entropy_source,
601+
607602
commons,
603+
608604
message_router,
609-
entropy_source,
605+
606+
pending_offers_messages: Mutex::new(Vec::new()),
610607
#[cfg(feature = "_test_utils")]
611608
testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()),
612609
logger,
@@ -656,6 +653,13 @@ where
656653
/// [`Refund`]: crate::offers::refund
657654
pub const MAX_SHORT_LIVED_RELATIVE_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24);
658655

656+
/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent
657+
/// along different paths.
658+
/// Sending multiple requests increases the chances of successful delivery in case some
659+
/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
660+
/// even if multiple invoices are received.
661+
pub const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
662+
659663
impl<ES: Deref, OMC: Deref, MR: Deref, L: Deref> OffersMessageFlow<ES, OMC, MR, L>
660664
where
661665
ES::Target: EntropySource,
@@ -734,6 +738,42 @@ where
734738
)
735739
.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
736740
}
741+
742+
fn enqueue_invoice_request(
743+
&self, invoice_request: InvoiceRequest, reply_paths: Vec<BlindedMessagePath>,
744+
) -> Result<(), Bolt12SemanticError> {
745+
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
746+
if !invoice_request.paths().is_empty() {
747+
reply_paths
748+
.iter()
749+
.flat_map(|reply_path| {
750+
invoice_request.paths().iter().map(move |path| (path, reply_path))
751+
})
752+
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
753+
.for_each(|(path, reply_path)| {
754+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
755+
destination: Destination::BlindedPath(path.clone()),
756+
reply_path: reply_path.clone(),
757+
};
758+
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
759+
pending_offers_messages.push((message, instructions));
760+
});
761+
} else if let Some(node_id) = invoice_request.issuer_signing_pubkey() {
762+
for reply_path in reply_paths {
763+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
764+
destination: Destination::Node(node_id),
765+
reply_path,
766+
};
767+
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
768+
pending_offers_messages.push((message, instructions));
769+
}
770+
} else {
771+
debug_assert!(false);
772+
return Err(Bolt12SemanticError::MissingIssuerSigningPubkey);
773+
}
774+
775+
Ok(())
776+
}
737777
}
738778

739779
impl<ES: Deref, OMC: Deref, MR: Deref, L: Deref> OffersMessageFlow<ES, OMC, MR, L>
@@ -788,7 +828,7 @@ where
788828

789829
create_pending_payment(&invoice_request, nonce)?;
790830

791-
self.commons.enqueue_invoice_request(invoice_request, reply_paths)
831+
self.enqueue_invoice_request(invoice_request, reply_paths)
792832
}
793833
}
794834

@@ -1055,7 +1095,7 @@ where
10551095
});
10561096
match self.create_blinded_paths(context) {
10571097
Ok(reply_paths) => {
1058-
match self.commons.enqueue_invoice_request(invoice_request, reply_paths) {
1098+
match self.enqueue_invoice_request(invoice_request, reply_paths) {
10591099
Ok(_) => {},
10601100
Err(_) => {
10611101
log_warn!(
@@ -1079,7 +1119,7 @@ where
10791119
}
10801120

10811121
fn release_pending_messages(&self) -> Vec<(OffersMessage, MessageSendInstructions)> {
1082-
core::mem::take(&mut self.commons.get_pending_offers_messages())
1122+
core::mem::take(&mut self.pending_offers_messages.lock().unwrap())
10831123
}
10841124
}
10851125

@@ -1409,7 +1449,7 @@ where
14091449
.create_blinded_paths(context)
14101450
.map_err(|_| Bolt12SemanticError::MissingPaths)?;
14111451

1412-
let mut pending_offers_messages = self.commons.get_pending_offers_messages();
1452+
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
14131453
if refund.paths().is_empty() {
14141454
for reply_path in reply_paths {
14151455
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {

0 commit comments

Comments
 (0)