Skip to content

Add util to track justice transactions for watchtowers #2552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,16 @@ impl MonitorUpdateId {
///
/// Third-party watchtowers may be built as a part of an implementation of this trait, with the
/// advantage that you can control whether to resume channel operation depending on if an update
/// has been persisted to a watchtower. For this, you may find the following methods useful:
/// [`ChannelMonitor::initial_counterparty_commitment_tx`],
/// has been persisted to a watchtower. A utility for tracking and building signed justice
/// transactions is provided in the [`util::watchtower`] module. Otherwise, you may find the
/// following methods useful: [`ChannelMonitor::initial_counterparty_commitment_tx`],
/// [`ChannelMonitor::counterparty_commitment_txs_from_update`],
/// [`ChannelMonitor::sign_to_local_justice_tx`], [`TrustedCommitmentTransaction::revokeable_output_index`],
/// [`TrustedCommitmentTransaction::build_to_local_justice_tx`].
///
/// [`TrustedCommitmentTransaction::revokeable_output_index`]: crate::ln::chan_utils::TrustedCommitmentTransaction::revokeable_output_index
/// [`TrustedCommitmentTransaction::build_to_local_justice_tx`]: crate::ln::chan_utils::TrustedCommitmentTransaction::build_to_local_justice_tx
/// [`util::watchtower`]: crate::util::watchtower
pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
/// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is
/// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup.
Expand Down
41 changes: 0 additions & 41 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8404,47 +8404,6 @@ where
}
}

impl Writeable for VecDeque<(Event, Option<EventCompletionAction>)> {
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
(self.len() as u64).write(w)?;
for (event, action) in self.iter() {
event.write(w)?;
action.write(w)?;
#[cfg(debug_assertions)] {
// Events are MaybeReadable, in some cases indicating that they shouldn't actually
// be persisted and are regenerated on restart. However, if such an event has a
// post-event-handling action we'll write nothing for the event and would have to
// either forget the action or fail on deserialization (which we do below). Thus,
// check that the event is sane here.
let event_encoded = event.encode();
let event_read: Option<Event> =
MaybeReadable::read(&mut &event_encoded[..]).unwrap();
if action.is_some() { assert!(event_read.is_some()); }
}
}
Ok(())
}
}
impl Readable for VecDeque<(Event, Option<EventCompletionAction>)> {
fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
let len: u64 = Readable::read(reader)?;
const MAX_ALLOC_SIZE: u64 = 1024 * 16;
let mut events: Self = VecDeque::with_capacity(cmp::min(
MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option<EventCompletionAction>)>() as u64,
len) as usize);
for _ in 0..len {
let ev_opt = MaybeReadable::read(reader)?;
let action = Readable::read(reader)?;
if let Some(ev) = ev_opt {
events.push_back((ev, action));
} else if action.is_some() {
return Err(DecodeError::InvalidValue);
}
}
Ok(events)
}
}

impl_writeable_tlv_based_enum!(ChannelShutdownState,
(0, NotShuttingDown) => {},
(2, ShutdownInitiated) => {},
Expand Down
1 change: 1 addition & 0 deletions lightning/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod invoice;
pub mod persist;
pub mod string;
pub mod wakers;
pub mod watchtower;

pub(crate) mod atomic_counter;
pub(crate) mod byte_utils;
Expand Down
75 changes: 74 additions & 1 deletion lightning/src/util/ser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
//! [`ChannelMonitor`]: crate::chain::channelmonitor::ChannelMonitor

use crate::events::Event;
use crate::ln::channelmanager::EventCompletionAction;
use crate::prelude::*;
use crate::io::{self, Read, Seek, Write};
use crate::io_extras::{copy, sink};
use core::hash::Hash;
use crate::sync::Mutex;
use core::cmp;
use core::{cmp, mem};
use core::convert::TryFrom;
use core::ops::Deref;

Expand All @@ -45,6 +47,7 @@ use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret};

use crate::util::byte_utils::{be48_to_array, slice_to_be48};
use crate::util::string::UntrustedString;
use crate::util::watchtower::UnsignedJusticeData;

/// serialization buffer size
pub const MAX_BUF_SIZE: usize = 64 * 1024;
Expand Down Expand Up @@ -785,6 +788,75 @@ where T: Readable + Eq + Hash
}
}

// VecDeques
impl Writeable for VecDeque<UnsignedJusticeData> {
#[inline]
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
CollectionLength(self.len() as u64).write(w)?;
for elem in self.iter() {
elem.write(w)?;
}
Ok(())
}
}

impl Readable for VecDeque<UnsignedJusticeData> {
#[inline]
fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
let len: CollectionLength = Readable::read(r)?;
let mut ret = VecDeque::with_capacity(cmp::min(
len.0 as usize, MAX_BUF_SIZE / core::mem::size_of::<UnsignedJusticeData>()));
for _ in 0..len.0 {
if let Some(val) = MaybeReadable::read(r)? {
ret.push_back(val);
}
}
Ok(ret)
}
}

impl Writeable for VecDeque<(Event, Option<EventCompletionAction>)> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO its a bit weird to move stuff that's only used in one file into ser.rs, I'd kinda rather keep it with the struct definition.

fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
(self.len() as u64).write(w)?;
for (event, action) in self.iter() {
event.write(w)?;
action.write(w)?;
#[cfg(debug_assertions)] {
// Events are MaybeReadable, in some cases indicating that they shouldn't actually
// be persisted and are regenerated on restart. However, if such an event has a
// post-event-handling action we'll write nothing for the event and would have to
// either forget the action or fail on deserialization (which we do below). Thus,
// check that the event is sane here.
let event_encoded = event.encode();
let event_read: Option<Event> =
MaybeReadable::read(&mut &event_encoded[..]).unwrap();
if action.is_some() { assert!(event_read.is_some()); }
}
}
Ok(())
}
}

impl Readable for VecDeque<(Event, Option<EventCompletionAction>)> {
fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
let len: u64 = Readable::read(reader)?;
const MAX_ALLOC_SIZE: u64 = 1024 * 16;
let mut events: Self = VecDeque::with_capacity(cmp::min(
MAX_ALLOC_SIZE/mem::size_of::<(Event, Option<EventCompletionAction>)>() as u64,
len) as usize);
for _ in 0..len {
let ev_opt = MaybeReadable::read(reader)?;
let action = Readable::read(reader)?;
if let Some(ev) = ev_opt {
events.push_back((ev, action));
} else if action.is_some() {
return Err(DecodeError::InvalidValue);
}
}
Ok(events)
}
}

// Vectors
macro_rules! impl_writeable_for_vec {
($ty: ty $(, $name: ident)*) => {
Expand Down Expand Up @@ -848,6 +920,7 @@ impl Readable for Vec<u8> {
}
}

impl_for_vec!(u32);
impl_for_vec!(ecdsa::Signature);
impl_for_vec!(crate::chain::channelmonitor::ChannelMonitorUpdate);
impl_for_vec!(crate::ln::channelmanager::MonitorUpdateCompletionAction);
Expand Down
87 changes: 20 additions & 67 deletions lightning/src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::events;
use crate::events::bump_transaction::{WalletSource, Utxo};
use crate::ln::ChannelId;
use crate::ln::channelmanager;
use crate::ln::chan_utils::CommitmentTransaction;
use crate::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
use crate::ln::{msgs, wire};
use crate::ln::msgs::LightningError;
Expand All @@ -38,6 +37,7 @@ use crate::util::config::UserConfig;
use crate::util::test_channel_signer::{TestChannelSigner, EnforcementState};
use crate::util::logger::{Logger, Level, Record};
use crate::util::ser::{Readable, ReadableArgs, Writer, Writeable};
use crate::util::watchtower::JusticeTxTracker;

use bitcoin::EcdsaSighashType;
use bitcoin::blockdata::constants::ChainHash;
Expand All @@ -61,7 +61,6 @@ use regex;
use crate::io;
use crate::prelude::*;
use core::cell::RefCell;
use core::ops::Deref;
use core::time::Duration;
use crate::sync::{Mutex, Arc};
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
Expand Down Expand Up @@ -276,50 +275,30 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {
}
}

struct JusticeTxData {
justice_tx: Transaction,
value: u64,
commitment_number: u64,
}

pub(crate) struct WatchtowerPersister {
persister: TestPersister,
/// Upon a new commitment_signed, we'll get a
/// ChannelMonitorUpdateStep::LatestCounterpartyCommitmentTxInfo. We'll store the justice tx
/// amount, and commitment number so we can build the justice tx after our counterparty
/// revokes it.
unsigned_justice_tx_data: Mutex<HashMap<OutPoint, VecDeque<JusticeTxData>>>,
/// `ChannelMonitorUpdateStep::LatestCounterpartyCommitmentTxInfo`. We'll use our utility
/// object `JusticeTxTracker` to build and sign the justice tx.
justice_tx_tracker: RefCell<JusticeTxTracker>,
/// After receiving a revoke_and_ack for a commitment number, we'll form and store the justice
/// tx which would be used to provide a watchtower with the data it needs.
watchtower_state: Mutex<HashMap<OutPoint, HashMap<Txid, Transaction>>>,
destination_script: Script,
signed_justice_txs: RefCell<HashMap<OutPoint, HashMap<Txid, Transaction>>>,
}

impl WatchtowerPersister {
pub(crate) fn new(destination_script: Script) -> Self {
WatchtowerPersister {
persister: TestPersister::new(),
unsigned_justice_tx_data: Mutex::new(HashMap::new()),
watchtower_state: Mutex::new(HashMap::new()),
destination_script,
justice_tx_tracker: RefCell::new(JusticeTxTracker::new(vec![FEERATE_FLOOR_SATS_PER_KW],
destination_script)),
signed_justice_txs: RefCell::new(HashMap::new()),
}
}

pub(crate) fn justice_tx(&self, funding_txo: OutPoint, commitment_txid: &Txid)
-> Option<Transaction> {
self.watchtower_state.lock().unwrap().get(&funding_txo).unwrap().get(commitment_txid).cloned()
}

fn form_justice_data_from_commitment(&self, counterparty_commitment_tx: &CommitmentTransaction)
-> Option<JusticeTxData> {
let trusted_tx = counterparty_commitment_tx.trust();
let output_idx = trusted_tx.revokeable_output_index()?;
let built_tx = trusted_tx.built_transaction();
let value = built_tx.transaction.output[output_idx as usize].value;
let justice_tx = trusted_tx.build_to_local_justice_tx(
FEERATE_FLOOR_SATS_PER_KW as u64, self.destination_script.clone()).ok()?;
let commitment_number = counterparty_commitment_tx.commitment_number();
Some(JusticeTxData { justice_tx, value, commitment_number })
self.signed_justice_txs.borrow().get(&funding_txo).unwrap().get(commitment_txid).cloned()
}
}

Expand All @@ -328,20 +307,8 @@ impl<Signer: sign::WriteableEcdsaChannelSigner> chainmonitor::Persist<Signer> fo
data: &channelmonitor::ChannelMonitor<Signer>, id: MonitorUpdateId
) -> chain::ChannelMonitorUpdateStatus {
let res = self.persister.persist_new_channel(funding_txo, data, id);

assert!(self.unsigned_justice_tx_data.lock().unwrap()
.insert(funding_txo, VecDeque::new()).is_none());
assert!(self.watchtower_state.lock().unwrap()
.insert(funding_txo, HashMap::new()).is_none());

let initial_counterparty_commitment_tx = data.initial_counterparty_commitment_tx()
.expect("First and only call expects Some");
if let Some(justice_data)
= self.form_justice_data_from_commitment(&initial_counterparty_commitment_tx) {
self.unsigned_justice_tx_data.lock().unwrap()
.get_mut(&funding_txo).unwrap()
.push_back(justice_data);
}
self.justice_tx_tracker.borrow_mut().add_new_channel(funding_txo, data);
assert!(self.signed_justice_txs.borrow_mut().insert(funding_txo, HashMap::new()).is_none());
res
}

Expand All @@ -350,29 +317,15 @@ impl<Signer: sign::WriteableEcdsaChannelSigner> chainmonitor::Persist<Signer> fo
data: &channelmonitor::ChannelMonitor<Signer>, update_id: MonitorUpdateId
) -> chain::ChannelMonitorUpdateStatus {
let res = self.persister.update_persisted_channel(funding_txo, update, data, update_id);

if let Some(update) = update {
let commitment_txs = data.counterparty_commitment_txs_from_update(update);
let justice_datas = commitment_txs.into_iter()
.filter_map(|commitment_tx| self.form_justice_data_from_commitment(&commitment_tx));
let mut channels_justice_txs = self.unsigned_justice_tx_data.lock().unwrap();
let channel_state = channels_justice_txs.get_mut(&funding_txo).unwrap();
channel_state.extend(justice_datas);

while let Some(JusticeTxData { justice_tx, value, commitment_number }) = channel_state.front() {
let input_idx = 0;
let commitment_txid = justice_tx.input[input_idx].previous_output.txid;
match data.sign_to_local_justice_tx(justice_tx.clone(), input_idx, *value, *commitment_number) {
Ok(signed_justice_tx) => {
let dup = self.watchtower_state.lock().unwrap()
.get_mut(&funding_txo).unwrap()
.insert(commitment_txid, signed_justice_tx);
assert!(dup.is_none());
channel_state.pop_front();
},
Err(_) => break,
}
}
let signed_justice_txs = match update {
Some(update) =>
self.justice_tx_tracker.borrow_mut().process_update(funding_txo, data, update),
None => vec![],
};
for tx in signed_justice_txs {
let commitment_txid = tx.input[0].previous_output.txid;
self.signed_justice_txs.borrow_mut()
.get_mut(&funding_txo).unwrap().insert(commitment_txid, tx);
}
res
}
Expand Down
Loading