Skip to content

Commit 6c93102

Browse files
committed
Implement GossipSource to allow RGS gossip updates
Here we implement a `GossipSource` object that allows us to configure P2P or RGS gossip sources without the need to propagate and leak LDK type parameters upwards. To this end, `GossipSource` wraps the corresponding variants and implements a `RoutingMessageHandler` that is delegating or ignoring the incoming messages.
1 parent f83a5c6 commit 6c93102

File tree

4 files changed

+364
-40
lines changed

4 files changed

+364
-40
lines changed

src/error.rs

+3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ pub enum Error {
3737
WalletSigningFailed,
3838
/// A transaction sync operation failed.
3939
TxSyncFailed,
40+
/// A gossip updating operation failed.
41+
GossipUpdateFailed,
4042
}
4143

4244
impl fmt::Display for Error {
@@ -63,6 +65,7 @@ impl fmt::Display for Error {
6365
Self::WalletOperationFailed => write!(f, "Failed to conduct wallet operation."),
6466
Self::WalletSigningFailed => write!(f, "Failed to sign given transaction."),
6567
Self::TxSyncFailed => write!(f, "Failed to sync transactions."),
68+
Self::GossipUpdateFailed => write!(f, "Failed to update gossip data."),
6669
}
6770
}
6871
}

src/gossip.rs

+244
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
use crate::logger::{log_error, log_info, FilesystemLogger, Logger};
2+
use crate::types::{GossipSync, NetworkGraph, P2PGossipSync, RapidGossipSync};
3+
use crate::Error;
4+
5+
use lightning::events::MessageSendEventsProvider;
6+
use lightning::ln::features::{InitFeatures, NodeFeatures};
7+
use lightning::ln::msgs as ldk_msgs;
8+
use lightning::ln::msgs::RoutingMessageHandler;
9+
use lightning::routing::gossip::NodeId;
10+
use lightning::routing::utxo::UtxoLookup;
11+
12+
use bitcoin::secp256k1::PublicKey;
13+
14+
use std::sync::atomic::{AtomicU32, Ordering};
15+
use std::sync::Arc;
16+
17+
pub(crate) enum GossipSource {
18+
P2PNetwork {
19+
gossip_sync: Arc<P2PGossipSync>,
20+
},
21+
RapidGossipSync {
22+
gossip_sync: Arc<RapidGossipSync>,
23+
server_url: String,
24+
latest_sync_timestamp: AtomicU32,
25+
logger: Arc<FilesystemLogger>,
26+
},
27+
}
28+
29+
impl GossipSource {
30+
pub fn new_p2p(network_graph: Arc<NetworkGraph>, logger: Arc<FilesystemLogger>) -> Self {
31+
let gossip_sync = Arc::new(P2PGossipSync::new(
32+
network_graph,
33+
None::<Arc<dyn UtxoLookup + Send + Sync>>,
34+
logger,
35+
));
36+
Self::P2PNetwork { gossip_sync }
37+
}
38+
39+
pub fn new_rgs(
40+
server_url: String, network_graph: Arc<NetworkGraph>, logger: Arc<FilesystemLogger>,
41+
) -> Self {
42+
let gossip_sync = Arc::new(RapidGossipSync::new(network_graph, Arc::clone(&logger)));
43+
let latest_sync_timestamp = AtomicU32::new(0);
44+
Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger }
45+
}
46+
47+
pub fn is_p2p(&self) -> bool {
48+
if let Self::P2PNetwork { .. } = self {
49+
true
50+
} else {
51+
false
52+
}
53+
}
54+
55+
pub fn is_rgs(&self) -> bool {
56+
!self.is_p2p()
57+
}
58+
59+
pub fn as_gossip_sync(&self) -> GossipSync {
60+
match self {
61+
Self::RapidGossipSync { gossip_sync, .. } => {
62+
GossipSync::Rapid(Arc::clone(&gossip_sync))
63+
}
64+
Self::P2PNetwork { gossip_sync, .. } => GossipSync::P2P(Arc::clone(&gossip_sync)),
65+
}
66+
}
67+
68+
pub async fn update_rgs_snapshot(&self) -> Result<(), Error> {
69+
match self {
70+
Self::P2PNetwork { gossip_sync: _ } => Ok(()),
71+
Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } => {
72+
let query_timestamp = latest_sync_timestamp.load(Ordering::Acquire);
73+
let query_url = format!("{}/{}", server_url, query_timestamp);
74+
let response =
75+
reqwest::get(query_url).await.map_err(|_| Error::GossipUpdateFailed)?;
76+
77+
match response.error_for_status() {
78+
Ok(res) => {
79+
let update_data =
80+
res.bytes().await.map_err(|_| Error::GossipUpdateFailed)?;
81+
82+
let new_latest_sync_timestamp = gossip_sync
83+
.update_network_graph(&update_data)
84+
.map_err(|_| Error::GossipUpdateFailed)?;
85+
latest_sync_timestamp.store(new_latest_sync_timestamp, Ordering::Release);
86+
log_info!(logger, "Successfully retrieved latest gossip update.");
87+
Ok(())
88+
}
89+
Err(e) => {
90+
log_error!(logger, "Failed to retrieve RGS gossip update: {}", e);
91+
Err(Error::GossipUpdateFailed)
92+
}
93+
}
94+
}
95+
}
96+
}
97+
}
98+
99+
impl MessageSendEventsProvider for GossipSource {
100+
fn get_and_clear_pending_msg_events(&self) -> Vec<lightning::events::MessageSendEvent> {
101+
match self {
102+
GossipSource::P2PNetwork { gossip_sync, .. } => {
103+
gossip_sync.get_and_clear_pending_msg_events()
104+
}
105+
GossipSource::RapidGossipSync { .. } => Vec::new(),
106+
}
107+
}
108+
}
109+
110+
impl RoutingMessageHandler for GossipSource {
111+
fn handle_node_announcement(
112+
&self, msg: &ldk_msgs::NodeAnnouncement,
113+
) -> Result<bool, ldk_msgs::LightningError> {
114+
match self {
115+
GossipSource::P2PNetwork { gossip_sync, .. } => {
116+
gossip_sync.handle_node_announcement(msg)
117+
}
118+
GossipSource::RapidGossipSync { .. } => Ok(false),
119+
}
120+
}
121+
122+
fn handle_channel_announcement(
123+
&self, msg: &ldk_msgs::ChannelAnnouncement,
124+
) -> Result<bool, ldk_msgs::LightningError> {
125+
match self {
126+
GossipSource::P2PNetwork { gossip_sync, .. } => {
127+
gossip_sync.handle_channel_announcement(msg)
128+
}
129+
GossipSource::RapidGossipSync { .. } => Ok(false),
130+
}
131+
}
132+
133+
fn handle_channel_update(
134+
&self, msg: &ldk_msgs::ChannelUpdate,
135+
) -> Result<bool, ldk_msgs::LightningError> {
136+
match self {
137+
GossipSource::P2PNetwork { gossip_sync, .. } => gossip_sync.handle_channel_update(msg),
138+
GossipSource::RapidGossipSync { .. } => Ok(false),
139+
}
140+
}
141+
142+
fn get_next_channel_announcement(
143+
&self, starting_point: u64,
144+
) -> Option<(
145+
ldk_msgs::ChannelAnnouncement,
146+
Option<ldk_msgs::ChannelUpdate>,
147+
Option<ldk_msgs::ChannelUpdate>,
148+
)> {
149+
match self {
150+
GossipSource::P2PNetwork { gossip_sync, .. } => {
151+
gossip_sync.get_next_channel_announcement(starting_point)
152+
}
153+
GossipSource::RapidGossipSync { .. } => None,
154+
}
155+
}
156+
157+
fn get_next_node_announcement(
158+
&self, starting_point: Option<&NodeId>,
159+
) -> Option<ldk_msgs::NodeAnnouncement> {
160+
match self {
161+
GossipSource::P2PNetwork { gossip_sync, .. } => {
162+
gossip_sync.get_next_node_announcement(starting_point)
163+
}
164+
GossipSource::RapidGossipSync { .. } => None,
165+
}
166+
}
167+
168+
fn peer_connected(
169+
&self, their_node_id: &PublicKey, init: &ldk_msgs::Init, inbound: bool,
170+
) -> Result<(), ()> {
171+
match self {
172+
GossipSource::P2PNetwork { gossip_sync, .. } => {
173+
gossip_sync.peer_connected(their_node_id, init, inbound)
174+
}
175+
GossipSource::RapidGossipSync { .. } => Ok(()),
176+
}
177+
}
178+
179+
fn handle_reply_channel_range(
180+
&self, their_node_id: &PublicKey, msg: ldk_msgs::ReplyChannelRange,
181+
) -> Result<(), ldk_msgs::LightningError> {
182+
match self {
183+
GossipSource::P2PNetwork { gossip_sync, .. } => {
184+
gossip_sync.handle_reply_channel_range(their_node_id, msg)
185+
}
186+
GossipSource::RapidGossipSync { .. } => Ok(()),
187+
}
188+
}
189+
190+
fn handle_reply_short_channel_ids_end(
191+
&self, their_node_id: &PublicKey, msg: ldk_msgs::ReplyShortChannelIdsEnd,
192+
) -> Result<(), ldk_msgs::LightningError> {
193+
match self {
194+
GossipSource::P2PNetwork { gossip_sync, .. } => {
195+
gossip_sync.handle_reply_short_channel_ids_end(their_node_id, msg)
196+
}
197+
GossipSource::RapidGossipSync { .. } => Ok(()),
198+
}
199+
}
200+
201+
fn handle_query_channel_range(
202+
&self, their_node_id: &PublicKey, msg: ldk_msgs::QueryChannelRange,
203+
) -> Result<(), ldk_msgs::LightningError> {
204+
match self {
205+
GossipSource::P2PNetwork { gossip_sync, .. } => {
206+
gossip_sync.handle_query_channel_range(their_node_id, msg)
207+
}
208+
GossipSource::RapidGossipSync { .. } => Ok(()),
209+
}
210+
}
211+
212+
fn handle_query_short_channel_ids(
213+
&self, their_node_id: &PublicKey, msg: ldk_msgs::QueryShortChannelIds,
214+
) -> Result<(), ldk_msgs::LightningError> {
215+
match self {
216+
GossipSource::P2PNetwork { gossip_sync, .. } => {
217+
gossip_sync.handle_query_short_channel_ids(their_node_id, msg)
218+
}
219+
GossipSource::RapidGossipSync { .. } => Ok(()),
220+
}
221+
}
222+
223+
fn provided_node_features(&self) -> NodeFeatures {
224+
match self {
225+
GossipSource::P2PNetwork { gossip_sync, .. } => gossip_sync.provided_node_features(),
226+
GossipSource::RapidGossipSync { .. } => NodeFeatures::empty(),
227+
}
228+
}
229+
fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures {
230+
match self {
231+
GossipSource::P2PNetwork { gossip_sync, .. } => {
232+
gossip_sync.provided_init_features(their_node_id)
233+
}
234+
GossipSource::RapidGossipSync { .. } => InitFeatures::empty(),
235+
}
236+
}
237+
238+
fn processing_queue_high(&self) -> bool {
239+
match self {
240+
GossipSource::P2PNetwork { gossip_sync, .. } => gossip_sync.processing_queue_high(),
241+
GossipSource::RapidGossipSync { .. } => false,
242+
}
243+
}
244+
}

0 commit comments

Comments
 (0)