Skip to content

Move events into ChannelMonitor from ManyChannelMonitor #520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions lightning/src/chain/keysinterface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,57 @@ pub enum SpendableOutputDescriptor {
}
}

impl Writeable for SpendableOutputDescriptor {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
match self {
&SpendableOutputDescriptor::StaticOutput { ref outpoint, ref output } => {
0u8.write(writer)?;
outpoint.write(writer)?;
output.write(writer)?;
},
&SpendableOutputDescriptor::DynamicOutputP2WSH { ref outpoint, ref key, ref witness_script, ref to_self_delay, ref output } => {
1u8.write(writer)?;
outpoint.write(writer)?;
key.write(writer)?;
witness_script.write(writer)?;
to_self_delay.write(writer)?;
output.write(writer)?;
},
&SpendableOutputDescriptor::DynamicOutputP2WPKH { ref outpoint, ref key, ref output } => {
2u8.write(writer)?;
outpoint.write(writer)?;
key.write(writer)?;
output.write(writer)?;
},
}
Ok(())
}
}

impl<R: ::std::io::Read> Readable<R> for SpendableOutputDescriptor {
fn read(reader: &mut R) -> Result<Self, DecodeError> {
match Readable::read(reader)? {
0u8 => Ok(SpendableOutputDescriptor::StaticOutput {
outpoint: Readable::read(reader)?,
output: Readable::read(reader)?,
}),
1u8 => Ok(SpendableOutputDescriptor::DynamicOutputP2WSH {
outpoint: Readable::read(reader)?,
key: Readable::read(reader)?,
witness_script: Readable::read(reader)?,
to_self_delay: Readable::read(reader)?,
output: Readable::read(reader)?,
}),
2u8 => Ok(SpendableOutputDescriptor::DynamicOutputP2WPKH {
outpoint: Readable::read(reader)?,
key: Readable::read(reader)?,
output: Readable::read(reader)?,
}),
_ => Err(DecodeError::InvalidValue),
}
}
}

/// A trait to describe an object which can get user secrets and key material.
pub trait KeysInterface: Send + Sync {
/// A type which implements ChannelKeys which will be returned by get_channel_keys.
Expand Down
68 changes: 50 additions & 18 deletions lightning/src/ln/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInter
use chain::transaction::OutPoint;
use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
use util::logger::Logger;
use util::ser::{ReadableArgs, Readable, Writer, Writeable, U48};
use util::ser::{ReadableArgs, Readable, MaybeReadable, Writer, Writeable, U48};
use util::{byte_utils, events};

use std::collections::{HashMap, hash_map, HashSet};
Expand Down Expand Up @@ -222,7 +222,6 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys, T: Deref, F: D
monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
chain_monitor: Arc<ChainWatchInterface>,
broadcaster: T,
pending_events: Mutex<Vec<events::Event>>,
logger: Arc<Logger>,
fee_estimator: F
}
Expand All @@ -234,16 +233,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref +
{
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
let block_hash = header.bitcoin_hash();
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
{
let mut monitors = self.monitors.lock().unwrap();
for monitor in monitors.values_mut() {
let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
if spendable_outputs.len() > 0 {
new_events.push(events::Event::SpendableOutputs {
outputs: spendable_outputs,
});
}
let txn_outputs = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);

for (ref txid, ref outputs) in txn_outputs {
for (idx, output) in outputs.iter().enumerate() {
Expand All @@ -252,8 +245,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref +
}
}
}
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.append(&mut new_events);
}

fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
Expand All @@ -276,7 +267,6 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
monitors: Mutex::new(HashMap::new()),
chain_monitor,
broadcaster,
pending_events: Mutex::new(Vec::new()),
logger,
fee_estimator: feeest,
};
Expand Down Expand Up @@ -362,10 +352,11 @@ impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: De
F::Target: FeeEstimator
{
fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
let mut pending_events = self.pending_events.lock().unwrap();
let mut ret = Vec::new();
mem::swap(&mut ret, &mut *pending_events);
ret
let mut pending_events = Vec::new();
for chan in self.monitors.lock().unwrap().values_mut() {
pending_events.append(&mut chan.get_and_clear_pending_events());
}
pending_events
}
}

Expand Down Expand Up @@ -792,6 +783,11 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitorUpdateStep {
///
/// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date
/// information and are actively monitoring the chain.
///
/// Pending Events or updated HTLCs which have not yet been read out by
/// get_and_clear_pending_htlcs_updated or get_and_clear_pending_events are serialized to disk and
/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
/// gotten are fully handled before re-serializing the new state.
pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
latest_update_id: u64,
commitment_transaction_number_obscure_factor: u64,
Expand Down Expand Up @@ -835,6 +831,7 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,

pending_htlcs_updated: Vec<HTLCUpdate>,
pending_events: Vec<events::Event>,

destination_script: Script,
// Thanks to data loss protection, we may be able to claim our non-htlc funds
Expand Down Expand Up @@ -948,6 +945,7 @@ impl<ChanSigner: ChannelKeys> PartialEq for ChannelMonitor<ChanSigner> {
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
self.payment_preimages != other.payment_preimages ||
self.pending_htlcs_updated != other.pending_htlcs_updated ||
self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be documented in ChannelMonitor struct than its Events are invariant between round-trip (contrary to ChannelManager ones and previous commit)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should just fix it in ChannelManager and serialize them out, instead of adding a comment. I did add a comment to ChannelMonitor.

self.destination_script != other.destination_script ||
self.to_remote_rescue != other.to_remote_rescue ||
self.pending_claim_requests != other.pending_claim_requests ||
Expand Down Expand Up @@ -1135,6 +1133,11 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
data.write(writer)?;
}

writer.write_all(&byte_utils::be64_to_array(self.pending_events.len() as u64))?;
for event in self.pending_events.iter() {
event.write(writer)?;
}

self.last_block_hash.write(writer)?;
self.destination_script.write(writer)?;
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
Expand Down Expand Up @@ -1267,6 +1270,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {

payment_preimages: HashMap::new(),
pending_htlcs_updated: Vec::new(),
pending_events: Vec::new(),

destination_script: destination_script.clone(),
to_remote_rescue: None,
Expand Down Expand Up @@ -1560,6 +1564,18 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
ret
}

/// Gets the list of pending events which were generated by previous actions, clearing the list
/// in the process.
///
/// This is called by ManyChannelMonitor::get_and_clear_pending_events() and is equivalent to
/// EventsProvider::get_and_clear_pending_events() except that it requires &mut self as we do
/// no internal locking in ChannelMonitors.
pub fn get_and_clear_pending_events(&mut self) -> Vec<events::Event> {
let mut ret = Vec::new();
mem::swap(&mut ret, &mut self.pending_events);
ret
}

/// Can only fail if idx is < get_min_seen_secret
pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
self.commitment_secrets.get_secret(idx)
Expand Down Expand Up @@ -2534,7 +2550,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
/// Eventually this should be pub and, roughly, implement ChainListener, however this requires
/// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
/// on-chain.
fn block_connected<B: Deref, F: Deref>(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>)
fn block_connected<B: Deref, F: Deref>(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> Vec<(Sha256dHash, Vec<TxOut>)>
where B::Target: BroadcasterInterface,
F::Target: FeeEstimator
{
Expand Down Expand Up @@ -2767,7 +2783,14 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
for &(ref txid, ref output_scripts) in watch_outputs.iter() {
self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
}
(watch_outputs, spendable_outputs)

if spendable_outputs.len() > 0 {
self.pending_events.push(events::Event::SpendableOutputs {
outputs: spendable_outputs,
});
}

watch_outputs
}

fn block_disconnected<B: Deref, F: Deref>(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)
Expand Down Expand Up @@ -3369,6 +3392,14 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
pending_htlcs_updated.push(Readable::read(reader)?);
}

let pending_events_len: u64 = Readable::read(reader)?;
let mut pending_events = Vec::with_capacity(cmp::min(pending_events_len as usize, MAX_ALLOC_SIZE / mem::size_of::<events::Event>()));
for _ in 0..pending_events_len {
if let Some(event) = MaybeReadable::read(reader)? {
pending_events.push(event);
}
}

let last_block_hash: Sha256dHash = Readable::read(reader)?;
let destination_script = Readable::read(reader)?;
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
Expand Down Expand Up @@ -3471,6 +3502,7 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,

payment_preimages,
pending_htlcs_updated,
pending_events,

destination_script,
to_remote_rescue,
Expand Down
97 changes: 90 additions & 7 deletions lightning/src/util/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,12 @@
//! Because we don't have a built-in runtime, it's up to the client to call events at a time in the
//! future, as well as generate and broadcast funding transactions handle payment preimages and a
//! few other things.
//!
//! Note that many events are handled for you by PeerHandler, so in the common design of having a
//! PeerManager which marshalls messages to ChannelManager and Router you only need to call
//! process_events on the PeerHandler and then get_and_clear_pending_events and handle the events
//! that bubble up to the surface. If, however, you do not have a PeerHandler managing a
//! ChannelManager you need to handle all of the events which may be generated.
//TODO: We need better separation of event types ^

use ln::msgs;
use ln::channelmanager::{PaymentPreimage, PaymentHash};
use chain::transaction::OutPoint;
use chain::keysinterface::SpendableOutputDescriptor;
use util::ser::{Writeable, Writer, MaybeReadable, Readable};

use bitcoin::blockdata::script::Script;

Expand All @@ -24,6 +18,10 @@ use secp256k1::key::PublicKey;
use std::time::Duration;

/// An Event which you should probably take some action in response to.
///
/// Note that while Writeable and Readable are implemented for Event, you probably shouldn't use
/// them directly as they don't round-trip exactly (for example FundingGenerationReady is never
/// written as it makes no sense to respond to it after reconnecting to peers).
pub enum Event {
/// Used to indicate that the client should generate a funding transaction with the given
/// parameters and then call ChannelManager::funding_transaction_generated.
Expand Down Expand Up @@ -108,6 +106,91 @@ pub enum Event {
},
}

impl Writeable for Event {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
match self {
&Event::FundingGenerationReady { .. } => {
0u8.write(writer)?;
// We never write out FundingGenerationReady events as, upon disconnection, peers
// drop any channels which have not yet exchanged funding_signed.
},
&Event::FundingBroadcastSafe { ref funding_txo, ref user_channel_id } => {
1u8.write(writer)?;
funding_txo.write(writer)?;
user_channel_id.write(writer)?;
},
&Event::PaymentReceived { ref payment_hash, ref amt } => {
2u8.write(writer)?;
payment_hash.write(writer)?;
amt.write(writer)?;
},
&Event::PaymentSent { ref payment_preimage } => {
3u8.write(writer)?;
payment_preimage.write(writer)?;
},
&Event::PaymentFailed { ref payment_hash, ref rejected_by_dest,
#[cfg(test)]
ref error_code,
} => {
4u8.write(writer)?;
payment_hash.write(writer)?;
rejected_by_dest.write(writer)?;
#[cfg(test)]
error_code.write(writer)?;
},
&Event::PendingHTLCsForwardable { time_forwardable: _ } => {
5u8.write(writer)?;
// We don't write the time_fordwardable out at all, as we presume when the user
// deserializes us at least that much time has elapsed.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About the assumption than some time has elapsed, I' m not sure, some people where interested by some HA scheme where you synchronize in real-time state between different ChannelManager. Dunno if it makes sense right now with current locking of ChannelHolder but still something to have in mind..

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, certainly isn't a thing you can do now, but also no big deal - if the user did some kind of crazy serialize-to-disk thing and then ends up relaying pending HTLCs a few seconds early, not really any harm done.

},
&Event::SpendableOutputs { ref outputs } => {
6u8.write(writer)?;
(outputs.len() as u64).write(writer)?;
for output in outputs.iter() {
output.write(writer)?;
}
},
}
Ok(())
}
}
impl<R: ::std::io::Read> MaybeReadable<R> for Event {
fn read(reader: &mut R) -> Result<Option<Self>, msgs::DecodeError> {
match Readable::read(reader)? {
0u8 => Ok(None),
1u8 => Ok(Some(Event::FundingBroadcastSafe {
funding_txo: Readable::read(reader)?,
user_channel_id: Readable::read(reader)?,
})),
2u8 => Ok(Some(Event::PaymentReceived {
payment_hash: Readable::read(reader)?,
amt: Readable::read(reader)?,
})),
3u8 => Ok(Some(Event::PaymentSent {
payment_preimage: Readable::read(reader)?,
})),
4u8 => Ok(Some(Event::PaymentFailed {
payment_hash: Readable::read(reader)?,
rejected_by_dest: Readable::read(reader)?,
#[cfg(test)]
error_code: Readable::read(reader)?,
})),
5u8 => Ok(Some(Event::PendingHTLCsForwardable {
time_forwardable: Duration::from_secs(0)
})),
6u8 => {
let outputs_len: u64 = Readable::read(reader)?;
let mut outputs = Vec::new();
for _ in 0..outputs_len {
outputs.push(Readable::read(reader)?);
}
Ok(Some(Event::SpendableOutputs { outputs }))
},
_ => Err(msgs::DecodeError::InvalidValue)
}
}
}

/// An event generated by ChannelManager which indicates a message should be sent to a peer (or
/// broadcast to most peers).
/// These events are handled by PeerManager::process_events if you are using a PeerManager.
Expand Down
Loading