Skip to content

Commit c3c3658

Browse files
committed
Move pending_offers_message to flows.rs
1 parent 8e124d8 commit c3c3658

File tree

3 files changed

+73
-86
lines changed

3 files changed

+73
-86
lines changed

lightning/src/ln/channelmanager.rs

+8-61
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,10 @@ use crate::ln::outbound_payment;
6767
use crate::ln::outbound_payment::{OutboundPayments, PendingOutboundPayment, RetryableInvoiceRequest, SendAlongPathArgs, StaleExpiration};
6868
use crate::offers::invoice::Bolt12Invoice;
6969
use crate::offers::invoice::UnsignedBolt12Invoice;
70-
use crate::offers::invoice_request::InvoiceRequest;
7170
use crate::offers::nonce::Nonce;
72-
use crate::offers::parse::Bolt12SemanticError;
7371
use crate::offers::signer;
74-
#[cfg(async_payments)]
75-
use crate::offers::static_invoice::StaticInvoice;
7672
use crate::onion_message::async_payments::{AsyncPaymentsMessage, HeldHtlcAvailable, ReleaseHeldHtlc, AsyncPaymentsMessageHandler};
77-
use crate::onion_message::messenger::{DefaultMessageRouter, Destination, MessageRouter, MessageSendInstructions, Responder, ResponseInstruction};
78-
use crate::onion_message::offers::OffersMessage;
73+
use crate::onion_message::messenger::{MessageRouter, MessageSendInstructions, Responder, ResponseInstruction};
7974
use crate::sign::{EntropySource, NodeSigner, Recipient, SignerProvider};
8075
use crate::sign::ecdsa::EcdsaChannelSigner;
8176
use crate::util::config::{UserConfig, ChannelConfig, ChannelConfigUpdate};
@@ -90,8 +85,15 @@ use crate::util::errors::APIError;
9085
#[cfg(feature = "dnssec")]
9186
use crate::onion_message::dns_resolution::{DNSResolverMessage, OMNameResolver};
9287

88+
#[cfg(async_payments)]
89+
use {
90+
crate::offers::static_invoice::StaticInvoice,
91+
crate::onion_message::messenger::Destination,
92+
};
93+
9394
#[cfg(not(c_bindings))]
9495
use {
96+
crate::onion_message::messenger::DefaultMessageRouter,
9597
crate::routing::router::DefaultRouter,
9698
crate::routing::gossip::NetworkGraph,
9799
crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters},
@@ -2126,8 +2128,6 @@ where
21262128
//
21272129
// Lock order tree:
21282130
//
2129-
// `pending_offers_messages`
2130-
//
21312131
// `pending_async_payments_messages`
21322132
//
21332133
// `total_consistency_lock`
@@ -2378,10 +2378,6 @@ where
23782378
event_persist_notifier: Notifier,
23792379
needs_persist_flag: AtomicBool,
23802380

2381-
#[cfg(not(any(test, feature = "_test_utils")))]
2382-
pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
2383-
#[cfg(any(test, feature = "_test_utils"))]
2384-
pub(crate) pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
23852381
pending_async_payments_messages: Mutex<Vec<(AsyncPaymentsMessage, MessageSendInstructions)>>,
23862382

23872383
/// Tracks the message events that are to be broadcasted when we are connected to some peer.
@@ -3302,7 +3298,6 @@ where
33023298
needs_persist_flag: AtomicBool::new(false),
33033299
funding_batch_states: Mutex::new(BTreeMap::new()),
33043300

3305-
pending_offers_messages: Mutex::new(Vec::new()),
33063301
pending_async_payments_messages: Mutex::new(Vec::new()),
33073302
pending_broadcast_messages: Mutex::new(Vec::new()),
33083303

@@ -9502,10 +9497,6 @@ where
95029497
MR::Target: MessageRouter,
95039498
L::Target: Logger,
95049499
{
9505-
fn get_pending_offers_messages(&self) -> MutexGuard<'_, Vec<(OffersMessage, MessageSendInstructions)>> {
9506-
self.pending_offers_messages.lock().expect("Mutex is locked by other thread.")
9507-
}
9508-
95099500
#[cfg(feature = "dnssec")]
95109501
fn get_pending_dns_onion_messages(&self) -> MutexGuard<'_, Vec<(DNSResolverMessage, MessageSendInstructions)>> {
95119502
self.pending_dns_onion_messages.lock().expect("Mutex is locked by other thread.")
@@ -9624,42 +9615,6 @@ where
96249615
.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
96259616
}
96269617

9627-
fn enqueue_invoice_request(
9628-
&self,
9629-
invoice_request: InvoiceRequest,
9630-
reply_paths: Vec<BlindedMessagePath>,
9631-
) -> Result<(), Bolt12SemanticError> {
9632-
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
9633-
if !invoice_request.paths().is_empty() {
9634-
reply_paths
9635-
.iter()
9636-
.flat_map(|reply_path| invoice_request.paths().iter().map(move |path| (path, reply_path)))
9637-
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
9638-
.for_each(|(path, reply_path)| {
9639-
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
9640-
destination: Destination::BlindedPath(path.clone()),
9641-
reply_path: reply_path.clone(),
9642-
};
9643-
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
9644-
pending_offers_messages.push((message, instructions));
9645-
});
9646-
} else if let Some(node_id) = invoice_request.issuer_signing_pubkey() {
9647-
for reply_path in reply_paths {
9648-
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
9649-
destination: Destination::Node(node_id),
9650-
reply_path,
9651-
};
9652-
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
9653-
pending_offers_messages.push((message, instructions));
9654-
}
9655-
} else {
9656-
debug_assert!(false);
9657-
return Err(Bolt12SemanticError::MissingIssuerSigningPubkey);
9658-
}
9659-
9660-
Ok(())
9661-
}
9662-
96639618
fn get_current_blocktime(&self) -> Duration {
96649619
Duration::from_secs(self.highest_seen_timestamp.load(Ordering::Acquire) as u64)
96659620
}
@@ -9760,13 +9715,6 @@ where
97609715
}
97619716
}
97629717

9763-
/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent
9764-
/// along different paths.
9765-
/// Sending multiple requests increases the chances of successful delivery in case some
9766-
/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
9767-
/// even if multiple invoices are received.
9768-
pub const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
9769-
97709718
impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, MR: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, MR, L>
97719719
where
97729720
M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
@@ -13134,7 +13082,6 @@ where
1313413082

1313513083
funding_batch_states: Mutex::new(BTreeMap::new()),
1313613084

13137-
pending_offers_messages: Mutex::new(Vec::new()),
1313813085
pending_async_payments_messages: Mutex::new(Vec::new()),
1313913086

1314013087
pending_broadcast_messages: Mutex::new(Vec::new()),

lightning/src/ln/offers_tests.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -1326,7 +1326,7 @@ fn fails_authentication_when_handling_invoice_request() {
13261326
expect_recent_payment!(david, RecentPaymentDetails::AwaitingInvoice, payment_id);
13271327

13281328
connect_peers(david, alice);
1329-
match &mut david.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
1329+
match &mut david.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
13301330
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
13311331
*destination = Destination::Node(alice_id),
13321332
_ => panic!(),
@@ -1351,7 +1351,7 @@ fn fails_authentication_when_handling_invoice_request() {
13511351
.unwrap();
13521352
expect_recent_payment!(david, RecentPaymentDetails::AwaitingInvoice, payment_id);
13531353

1354-
match &mut david.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
1354+
match &mut david.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
13551355
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
13561356
*destination = Destination::BlindedPath(invalid_path),
13571357
_ => panic!(),
@@ -1431,7 +1431,7 @@ fn fails_authentication_when_handling_invoice_for_offer() {
14311431

14321432
// Don't send the invoice request, but grab its reply path to use with a different request.
14331433
let invalid_reply_path = {
1434-
let mut pending_offers_messages = david.node.pending_offers_messages.lock().unwrap();
1434+
let mut pending_offers_messages = david.offers_handler.pending_offers_messages.lock().unwrap();
14351435
let pending_invoice_request = pending_offers_messages.pop().unwrap();
14361436
pending_offers_messages.clear();
14371437
match pending_invoice_request.1 {
@@ -1448,7 +1448,7 @@ fn fails_authentication_when_handling_invoice_for_offer() {
14481448
// Swap out the reply path to force authentication to fail when handling the invoice since it
14491449
// will be sent over the wrong blinded path.
14501450
{
1451-
let mut pending_offers_messages = david.node.pending_offers_messages.lock().unwrap();
1451+
let mut pending_offers_messages = david.offers_handler.pending_offers_messages.lock().unwrap();
14521452
let mut pending_invoice_request = pending_offers_messages.first_mut().unwrap();
14531453
match &mut pending_invoice_request.1 {
14541454
MessageSendInstructions::WithSpecifiedReplyPath { reply_path, .. } =>
@@ -1535,7 +1535,7 @@ fn fails_authentication_when_handling_invoice_for_refund() {
15351535
let expected_invoice = alice.offers_handler.request_refund_payment(&refund).unwrap();
15361536

15371537
connect_peers(david, alice);
1538-
match &mut alice.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
1538+
match &mut alice.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
15391539
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
15401540
*destination = Destination::Node(david_id),
15411541
_ => panic!(),
@@ -1566,7 +1566,7 @@ fn fails_authentication_when_handling_invoice_for_refund() {
15661566

15671567
let expected_invoice = alice.offers_handler.request_refund_payment(&refund).unwrap();
15681568

1569-
match &mut alice.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
1569+
match &mut alice.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
15701570
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
15711571
*destination = Destination::BlindedPath(invalid_path),
15721572
_ => panic!(),
@@ -2157,7 +2157,7 @@ fn fails_paying_invoice_with_unknown_required_features() {
21572157
destination: Destination::BlindedPath(reply_path),
21582158
};
21592159
let message = OffersMessage::Invoice(invoice);
2160-
alice.node.pending_offers_messages.lock().unwrap().push((message, instructions));
2160+
alice.offers_handler.pending_offers_messages.lock().unwrap().push((message, instructions));
21612161

21622162
let onion_message = alice.onion_messenger.next_onion_message_for_peer(charlie_id).unwrap();
21632163
charlie.onion_messenger.handle_onion_message(alice_id, &onion_message);

lightning/src/offers/flow.rs

+58-18
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ use crate::blinded_path::payment::{
2626
BlindedPaymentPath, Bolt12OfferContext, Bolt12RefundContext, PaymentContext,
2727
};
2828
use crate::events::PaymentFailureReason;
29-
use crate::ln::channelmanager::{
30-
Bolt12PaymentError, PaymentId, Verification, OFFERS_MESSAGE_REQUEST_LIMIT,
31-
};
29+
use crate::ln::channelmanager::{Bolt12PaymentError, PaymentId, Verification};
3230
use crate::ln::inbound_payment;
3331
use crate::ln::outbound_payment::{Retry, RetryableInvoiceRequest, StaleExpiration};
3432
use crate::offers::invoice::{
@@ -77,11 +75,6 @@ use {
7775
///
7876
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
7977
pub trait OffersMessageCommons {
80-
/// Get pending offers messages
81-
fn get_pending_offers_messages(
82-
&self,
83-
) -> MutexGuard<'_, Vec<(OffersMessage, MessageSendInstructions)>>;
84-
8578
#[cfg(feature = "dnssec")]
8679
/// Get pending DNS onion messages
8780
fn get_pending_dns_onion_messages(
@@ -180,11 +173,6 @@ pub trait OffersMessageCommons {
180173
/// [`MessageRouter::create_blinded_paths`]: crate::onion_message::messenger::MessageRouter::create_blinded_paths
181174
fn create_blinded_paths(&self, context: MessageContext) -> Result<Vec<BlindedMessagePath>, ()>;
182175

183-
/// Enqueue invoice request
184-
fn enqueue_invoice_request(
185-
&self, invoice_request: InvoiceRequest, reply_paths: Vec<BlindedMessagePath>,
186-
) -> Result<(), Bolt12SemanticError>;
187-
188176
/// Get the current time determined by highest seen timestamp
189177
fn get_current_blocktime(&self) -> Duration;
190178

@@ -585,6 +573,11 @@ where
585573

586574
message_router: MR,
587575

576+
#[cfg(not(any(test, feature = "_test_utils")))]
577+
pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
578+
#[cfg(any(test, feature = "_test_utils"))]
579+
pub(crate) pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
580+
588581
#[cfg(feature = "_test_utils")]
589582
/// In testing, it is useful be able to forge a name -> offer mapping so that we can pay an
590583
/// offer generated in the test.
@@ -617,9 +610,13 @@ where
617610
inbound_payment_key: expanded_inbound_key,
618611
our_network_pubkey,
619612
secp_ctx,
613+
entropy_source,
614+
620615
commons,
616+
621617
message_router,
622-
entropy_source,
618+
619+
pending_offers_messages: Mutex::new(Vec::new()),
623620
#[cfg(feature = "_test_utils")]
624621
testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()),
625622
logger,
@@ -652,6 +649,13 @@ where
652649
/// [`Refund`]: crate::offers::refund
653650
pub const MAX_SHORT_LIVED_RELATIVE_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24);
654651

652+
/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent
653+
/// along different paths.
654+
/// Sending multiple requests increases the chances of successful delivery in case some
655+
/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
656+
/// even if multiple invoices are received.
657+
pub const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
658+
655659
impl<ES: Deref, OMC: Deref, MR: Deref, L: Deref> OffersMessageFlow<ES, OMC, MR, L>
656660
where
657661
ES::Target: EntropySource,
@@ -710,6 +714,42 @@ where
710714
)
711715
.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
712716
}
717+
718+
fn enqueue_invoice_request(
719+
&self, invoice_request: InvoiceRequest, reply_paths: Vec<BlindedMessagePath>,
720+
) -> Result<(), Bolt12SemanticError> {
721+
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
722+
if !invoice_request.paths().is_empty() {
723+
reply_paths
724+
.iter()
725+
.flat_map(|reply_path| {
726+
invoice_request.paths().iter().map(move |path| (path, reply_path))
727+
})
728+
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
729+
.for_each(|(path, reply_path)| {
730+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
731+
destination: Destination::BlindedPath(path.clone()),
732+
reply_path: reply_path.clone(),
733+
};
734+
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
735+
pending_offers_messages.push((message, instructions));
736+
});
737+
} else if let Some(node_id) = invoice_request.issuer_signing_pubkey() {
738+
for reply_path in reply_paths {
739+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
740+
destination: Destination::Node(node_id),
741+
reply_path,
742+
};
743+
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
744+
pending_offers_messages.push((message, instructions));
745+
}
746+
} else {
747+
debug_assert!(false);
748+
return Err(Bolt12SemanticError::MissingIssuerSigningPubkey);
749+
}
750+
751+
Ok(())
752+
}
713753
}
714754

715755
impl<ES: Deref, OMC: Deref, MR: Deref, L: Deref> OffersMessageFlow<ES, OMC, MR, L>
@@ -766,7 +806,7 @@ where
766806

767807
create_pending_payment(&invoice_request, nonce)?;
768808

769-
self.commons.enqueue_invoice_request(invoice_request, reply_paths)
809+
self.enqueue_invoice_request(invoice_request, reply_paths)
770810
}
771811
}
772812

@@ -1034,7 +1074,7 @@ where
10341074
});
10351075
match self.commons.create_blinded_paths(context) {
10361076
Ok(reply_paths) => {
1037-
match self.commons.enqueue_invoice_request(invoice_request, reply_paths) {
1077+
match self.enqueue_invoice_request(invoice_request, reply_paths) {
10381078
Ok(_) => {},
10391079
Err(_) => {
10401080
log_warn!(
@@ -1058,7 +1098,7 @@ where
10581098
}
10591099

10601100
fn release_pending_messages(&self) -> Vec<(OffersMessage, MessageSendInstructions)> {
1061-
core::mem::take(&mut self.commons.get_pending_offers_messages())
1101+
core::mem::take(&mut self.pending_offers_messages.lock().unwrap())
10621102
}
10631103
}
10641104

@@ -1389,7 +1429,7 @@ where
13891429
.create_blinded_paths(context)
13901430
.map_err(|_| Bolt12SemanticError::MissingPaths)?;
13911431

1392-
let mut pending_offers_messages = self.commons.get_pending_offers_messages();
1432+
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
13931433
if refund.paths().is_empty() {
13941434
for reply_path in reply_paths {
13951435
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {

0 commit comments

Comments
 (0)