Skip to content

Commit b06dfda

Browse files
authored
Merge pull request #520 from TheBlueMatt/2020-02-events-in-monitors
Move events into ChannelMonitor from ManyChannelMonitor
2 parents 8829d1b + 9ff6f29 commit b06dfda

File tree

4 files changed

+222
-41
lines changed

4 files changed

+222
-41
lines changed

lightning/src/chain/keysinterface.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,57 @@ pub enum SpendableOutputDescriptor {
8888
}
8989
}
9090

91+
impl Writeable for SpendableOutputDescriptor {
92+
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
93+
match self {
94+
&SpendableOutputDescriptor::StaticOutput { ref outpoint, ref output } => {
95+
0u8.write(writer)?;
96+
outpoint.write(writer)?;
97+
output.write(writer)?;
98+
},
99+
&SpendableOutputDescriptor::DynamicOutputP2WSH { ref outpoint, ref key, ref witness_script, ref to_self_delay, ref output } => {
100+
1u8.write(writer)?;
101+
outpoint.write(writer)?;
102+
key.write(writer)?;
103+
witness_script.write(writer)?;
104+
to_self_delay.write(writer)?;
105+
output.write(writer)?;
106+
},
107+
&SpendableOutputDescriptor::DynamicOutputP2WPKH { ref outpoint, ref key, ref output } => {
108+
2u8.write(writer)?;
109+
outpoint.write(writer)?;
110+
key.write(writer)?;
111+
output.write(writer)?;
112+
},
113+
}
114+
Ok(())
115+
}
116+
}
117+
118+
impl<R: ::std::io::Read> Readable<R> for SpendableOutputDescriptor {
119+
fn read(reader: &mut R) -> Result<Self, DecodeError> {
120+
match Readable::read(reader)? {
121+
0u8 => Ok(SpendableOutputDescriptor::StaticOutput {
122+
outpoint: Readable::read(reader)?,
123+
output: Readable::read(reader)?,
124+
}),
125+
1u8 => Ok(SpendableOutputDescriptor::DynamicOutputP2WSH {
126+
outpoint: Readable::read(reader)?,
127+
key: Readable::read(reader)?,
128+
witness_script: Readable::read(reader)?,
129+
to_self_delay: Readable::read(reader)?,
130+
output: Readable::read(reader)?,
131+
}),
132+
2u8 => Ok(SpendableOutputDescriptor::DynamicOutputP2WPKH {
133+
outpoint: Readable::read(reader)?,
134+
key: Readable::read(reader)?,
135+
output: Readable::read(reader)?,
136+
}),
137+
_ => Err(DecodeError::InvalidValue),
138+
}
139+
}
140+
}
141+
91142
/// A trait to describe an object which can get user secrets and key material.
92143
pub trait KeysInterface: Send + Sync {
93144
/// A type which implements ChannelKeys which will be returned by get_channel_keys.

lightning/src/ln/channelmonitor.rs

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInter
3737
use chain::transaction::OutPoint;
3838
use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
3939
use util::logger::Logger;
40-
use util::ser::{ReadableArgs, Readable, Writer, Writeable, U48};
40+
use util::ser::{ReadableArgs, Readable, MaybeReadable, Writer, Writeable, U48};
4141
use util::{byte_utils, events};
4242

4343
use std::collections::{HashMap, hash_map, HashSet};
@@ -222,7 +222,6 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys, T: Deref, F: D
222222
monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
223223
chain_monitor: Arc<ChainWatchInterface>,
224224
broadcaster: T,
225-
pending_events: Mutex<Vec<events::Event>>,
226225
logger: Arc<Logger>,
227226
fee_estimator: F
228227
}
@@ -234,16 +233,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref +
234233
{
235234
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
236235
let block_hash = header.bitcoin_hash();
237-
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
238236
{
239237
let mut monitors = self.monitors.lock().unwrap();
240238
for monitor in monitors.values_mut() {
241-
let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
242-
if spendable_outputs.len() > 0 {
243-
new_events.push(events::Event::SpendableOutputs {
244-
outputs: spendable_outputs,
245-
});
246-
}
239+
let txn_outputs = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
247240

248241
for (ref txid, ref outputs) in txn_outputs {
249242
for (idx, output) in outputs.iter().enumerate() {
@@ -252,8 +245,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref +
252245
}
253246
}
254247
}
255-
let mut pending_events = self.pending_events.lock().unwrap();
256-
pending_events.append(&mut new_events);
257248
}
258249

259250
fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
@@ -276,7 +267,6 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
276267
monitors: Mutex::new(HashMap::new()),
277268
chain_monitor,
278269
broadcaster,
279-
pending_events: Mutex::new(Vec::new()),
280270
logger,
281271
fee_estimator: feeest,
282272
};
@@ -362,10 +352,11 @@ impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: De
362352
F::Target: FeeEstimator
363353
{
364354
fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
365-
let mut pending_events = self.pending_events.lock().unwrap();
366-
let mut ret = Vec::new();
367-
mem::swap(&mut ret, &mut *pending_events);
368-
ret
355+
let mut pending_events = Vec::new();
356+
for chan in self.monitors.lock().unwrap().values_mut() {
357+
pending_events.append(&mut chan.get_and_clear_pending_events());
358+
}
359+
pending_events
369360
}
370361
}
371362

@@ -792,6 +783,11 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitorUpdateStep {
792783
///
793784
/// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date
794785
/// information and are actively monitoring the chain.
786+
///
787+
/// Pending Events or updated HTLCs which have not yet been read out by
788+
/// get_and_clear_pending_htlcs_updated or get_and_clear_pending_events are serialized to disk and
789+
/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
790+
/// gotten are fully handled before re-serializing the new state.
795791
pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
796792
latest_update_id: u64,
797793
commitment_transaction_number_obscure_factor: u64,
@@ -835,6 +831,7 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
835831
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
836832

837833
pending_htlcs_updated: Vec<HTLCUpdate>,
834+
pending_events: Vec<events::Event>,
838835

839836
destination_script: Script,
840837
// Thanks to data loss protection, we may be able to claim our non-htlc funds
@@ -948,6 +945,7 @@ impl<ChanSigner: ChannelKeys> PartialEq for ChannelMonitor<ChanSigner> {
948945
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
949946
self.payment_preimages != other.payment_preimages ||
950947
self.pending_htlcs_updated != other.pending_htlcs_updated ||
948+
self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly
951949
self.destination_script != other.destination_script ||
952950
self.to_remote_rescue != other.to_remote_rescue ||
953951
self.pending_claim_requests != other.pending_claim_requests ||
@@ -1135,6 +1133,11 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
11351133
data.write(writer)?;
11361134
}
11371135

1136+
writer.write_all(&byte_utils::be64_to_array(self.pending_events.len() as u64))?;
1137+
for event in self.pending_events.iter() {
1138+
event.write(writer)?;
1139+
}
1140+
11381141
self.last_block_hash.write(writer)?;
11391142
self.destination_script.write(writer)?;
11401143
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
@@ -1267,6 +1270,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
12671270

12681271
payment_preimages: HashMap::new(),
12691272
pending_htlcs_updated: Vec::new(),
1273+
pending_events: Vec::new(),
12701274

12711275
destination_script: destination_script.clone(),
12721276
to_remote_rescue: None,
@@ -1560,6 +1564,18 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
15601564
ret
15611565
}
15621566

1567+
/// Gets the list of pending events which were generated by previous actions, clearing the list
1568+
/// in the process.
1569+
///
1570+
/// This is called by ManyChannelMonitor::get_and_clear_pending_events() and is equivalent to
1571+
/// EventsProvider::get_and_clear_pending_events() except that it requires &mut self as we do
1572+
/// no internal locking in ChannelMonitors.
1573+
pub fn get_and_clear_pending_events(&mut self) -> Vec<events::Event> {
1574+
let mut ret = Vec::new();
1575+
mem::swap(&mut ret, &mut self.pending_events);
1576+
ret
1577+
}
1578+
15631579
/// Can only fail if idx is < get_min_seen_secret
15641580
pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
15651581
self.commitment_secrets.get_secret(idx)
@@ -2534,7 +2550,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
25342550
/// Eventually this should be pub and, roughly, implement ChainListener, however this requires
25352551
/// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
25362552
/// on-chain.
2537-
fn block_connected<B: Deref, F: Deref>(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>)
2553+
fn block_connected<B: Deref, F: Deref>(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> Vec<(Sha256dHash, Vec<TxOut>)>
25382554
where B::Target: BroadcasterInterface,
25392555
F::Target: FeeEstimator
25402556
{
@@ -2767,7 +2783,14 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
27672783
for &(ref txid, ref output_scripts) in watch_outputs.iter() {
27682784
self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
27692785
}
2770-
(watch_outputs, spendable_outputs)
2786+
2787+
if spendable_outputs.len() > 0 {
2788+
self.pending_events.push(events::Event::SpendableOutputs {
2789+
outputs: spendable_outputs,
2790+
});
2791+
}
2792+
2793+
watch_outputs
27712794
}
27722795

27732796
fn block_disconnected<B: Deref, F: Deref>(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)
@@ -3369,6 +3392,14 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
33693392
pending_htlcs_updated.push(Readable::read(reader)?);
33703393
}
33713394

3395+
let pending_events_len: u64 = Readable::read(reader)?;
3396+
let mut pending_events = Vec::with_capacity(cmp::min(pending_events_len as usize, MAX_ALLOC_SIZE / mem::size_of::<events::Event>()));
3397+
for _ in 0..pending_events_len {
3398+
if let Some(event) = MaybeReadable::read(reader)? {
3399+
pending_events.push(event);
3400+
}
3401+
}
3402+
33723403
let last_block_hash: Sha256dHash = Readable::read(reader)?;
33733404
let destination_script = Readable::read(reader)?;
33743405
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
@@ -3471,6 +3502,7 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
34713502

34723503
payment_preimages,
34733504
pending_htlcs_updated,
3505+
pending_events,
34743506

34753507
destination_script,
34763508
to_remote_rescue,

lightning/src/util/events.rs

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,12 @@
44
//! Because we don't have a built-in runtime, it's up to the client to call events at a time in the
55
//! future, as well as generate and broadcast funding transactions handle payment preimages and a
66
//! few other things.
7-
//!
8-
//! Note that many events are handled for you by PeerHandler, so in the common design of having a
9-
//! PeerManager which marshalls messages to ChannelManager and Router you only need to call
10-
//! process_events on the PeerHandler and then get_and_clear_pending_events and handle the events
11-
//! that bubble up to the surface. If, however, you do not have a PeerHandler managing a
12-
//! ChannelManager you need to handle all of the events which may be generated.
13-
//TODO: We need better separation of event types ^
147
158
use ln::msgs;
169
use ln::channelmanager::{PaymentPreimage, PaymentHash};
1710
use chain::transaction::OutPoint;
1811
use chain::keysinterface::SpendableOutputDescriptor;
12+
use util::ser::{Writeable, Writer, MaybeReadable, Readable};
1913

2014
use bitcoin::blockdata::script::Script;
2115

@@ -24,6 +18,10 @@ use secp256k1::key::PublicKey;
2418
use std::time::Duration;
2519

2620
/// An Event which you should probably take some action in response to.
21+
///
22+
/// Note that while Writeable and Readable are implemented for Event, you probably shouldn't use
23+
/// them directly as they don't round-trip exactly (for example FundingGenerationReady is never
24+
/// written as it makes no sense to respond to it after reconnecting to peers).
2725
pub enum Event {
2826
/// Used to indicate that the client should generate a funding transaction with the given
2927
/// parameters and then call ChannelManager::funding_transaction_generated.
@@ -108,6 +106,91 @@ pub enum Event {
108106
},
109107
}
110108

109+
impl Writeable for Event {
110+
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
111+
match self {
112+
&Event::FundingGenerationReady { .. } => {
113+
0u8.write(writer)?;
114+
// We never write out FundingGenerationReady events as, upon disconnection, peers
115+
// drop any channels which have not yet exchanged funding_signed.
116+
},
117+
&Event::FundingBroadcastSafe { ref funding_txo, ref user_channel_id } => {
118+
1u8.write(writer)?;
119+
funding_txo.write(writer)?;
120+
user_channel_id.write(writer)?;
121+
},
122+
&Event::PaymentReceived { ref payment_hash, ref amt } => {
123+
2u8.write(writer)?;
124+
payment_hash.write(writer)?;
125+
amt.write(writer)?;
126+
},
127+
&Event::PaymentSent { ref payment_preimage } => {
128+
3u8.write(writer)?;
129+
payment_preimage.write(writer)?;
130+
},
131+
&Event::PaymentFailed { ref payment_hash, ref rejected_by_dest,
132+
#[cfg(test)]
133+
ref error_code,
134+
} => {
135+
4u8.write(writer)?;
136+
payment_hash.write(writer)?;
137+
rejected_by_dest.write(writer)?;
138+
#[cfg(test)]
139+
error_code.write(writer)?;
140+
},
141+
&Event::PendingHTLCsForwardable { time_forwardable: _ } => {
142+
5u8.write(writer)?;
143+
// We don't write the time_fordwardable out at all, as we presume when the user
144+
// deserializes us at least that much time has elapsed.
145+
},
146+
&Event::SpendableOutputs { ref outputs } => {
147+
6u8.write(writer)?;
148+
(outputs.len() as u64).write(writer)?;
149+
for output in outputs.iter() {
150+
output.write(writer)?;
151+
}
152+
},
153+
}
154+
Ok(())
155+
}
156+
}
157+
impl<R: ::std::io::Read> MaybeReadable<R> for Event {
158+
fn read(reader: &mut R) -> Result<Option<Self>, msgs::DecodeError> {
159+
match Readable::read(reader)? {
160+
0u8 => Ok(None),
161+
1u8 => Ok(Some(Event::FundingBroadcastSafe {
162+
funding_txo: Readable::read(reader)?,
163+
user_channel_id: Readable::read(reader)?,
164+
})),
165+
2u8 => Ok(Some(Event::PaymentReceived {
166+
payment_hash: Readable::read(reader)?,
167+
amt: Readable::read(reader)?,
168+
})),
169+
3u8 => Ok(Some(Event::PaymentSent {
170+
payment_preimage: Readable::read(reader)?,
171+
})),
172+
4u8 => Ok(Some(Event::PaymentFailed {
173+
payment_hash: Readable::read(reader)?,
174+
rejected_by_dest: Readable::read(reader)?,
175+
#[cfg(test)]
176+
error_code: Readable::read(reader)?,
177+
})),
178+
5u8 => Ok(Some(Event::PendingHTLCsForwardable {
179+
time_forwardable: Duration::from_secs(0)
180+
})),
181+
6u8 => {
182+
let outputs_len: u64 = Readable::read(reader)?;
183+
let mut outputs = Vec::new();
184+
for _ in 0..outputs_len {
185+
outputs.push(Readable::read(reader)?);
186+
}
187+
Ok(Some(Event::SpendableOutputs { outputs }))
188+
},
189+
_ => Err(msgs::DecodeError::InvalidValue)
190+
}
191+
}
192+
}
193+
111194
/// An event generated by ChannelManager which indicates a message should be sent to a peer (or
112195
/// broadcast to most peers).
113196
/// These events are handled by PeerManager::process_events if you are using a PeerManager.

0 commit comments

Comments
 (0)