Skip to content

Replay events on event handling failures due to persistence failures. #374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 16, 2024
Merged
234 changes: 133 additions & 101 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,13 @@ where
status: Some(PaymentStatus::Failed),
..PaymentDetailsUpdate::new(payment_id)
};
self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
return Ok(());
match self.payment_store.update(&update) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};
}

if info.status == PaymentStatus::Succeeded
Expand All @@ -520,11 +522,13 @@ where
status: Some(PaymentStatus::Failed),
..PaymentDetailsUpdate::new(payment_id)
};
self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
return Ok(());
match self.payment_store.update(&update) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};
}

let max_total_opening_fee_msat = match info.kind {
Expand Down Expand Up @@ -559,11 +563,13 @@ where
status: Some(PaymentStatus::Failed),
..PaymentDetailsUpdate::new(payment_id)
};
self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
return Ok(());
match self.payment_store.update(&update) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};
}

// If this is known by the store but ChannelManager doesn't know the preimage,
Expand All @@ -577,22 +583,23 @@ where
"We would have registered the preimage if we knew"
);

self.event_queue
.add_event(Event::PaymentClaimable {
payment_id,
payment_hash,
claimable_amount_msat: amount_msat,
claim_deadline,
})
.unwrap_or_else(|e| {
let event = Event::PaymentClaimable {
payment_id,
payment_hash,
claimable_amount_msat: amount_msat,
claim_deadline,
};
match self.event_queue.add_event(event) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(
self.logger,
"Failed to push to event queue: {}",
e
);
panic!("Failed to push to event queue");
});
return Ok(());
return Err(ReplayEvent());
},
};
}
},
_ => {},
Expand Down Expand Up @@ -715,10 +722,13 @@ where
status: Some(PaymentStatus::Failed),
..PaymentDetailsUpdate::new(payment_id)
};
self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
match self.payment_store.update(&update) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};
}
},
LdkEvent::PaymentClaimed {
Expand Down Expand Up @@ -796,20 +806,22 @@ where
payment_id,
e
);
panic!("Failed to access payment store");
return Err(ReplayEvent());
},
}

self.event_queue
.add_event(Event::PaymentReceived {
payment_id: Some(payment_id),
payment_hash,
amount_msat,
})
.unwrap_or_else(|e| {
let event = Event::PaymentReceived {
payment_id: Some(payment_id),
payment_hash,
amount_msat,
};
match self.event_queue.add_event(event) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};
},
LdkEvent::PaymentSent {
payment_id,
Expand All @@ -832,10 +844,13 @@ where
..PaymentDetailsUpdate::new(payment_id)
};

self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
match self.payment_store.update(&update) {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};

self.payment_store.get(&payment_id).map(|payment| {
log_info!(
Expand All @@ -852,17 +867,19 @@ where
hex_utils::to_string(&payment_preimage.0)
);
});
let event = Event::PaymentSuccessful {
payment_id: Some(payment_id),
payment_hash,
fee_paid_msat,
};

self.event_queue
.add_event(Event::PaymentSuccessful {
payment_id: Some(payment_id),
payment_hash,
fee_paid_msat,
})
.unwrap_or_else(|e| {
match self.event_queue.add_event(event) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};
},
LdkEvent::PaymentFailed { payment_id, payment_hash, reason, .. } => {
log_info!(
Expand All @@ -877,20 +894,23 @@ where
status: Some(PaymentStatus::Failed),
..PaymentDetailsUpdate::new(payment_id)
};
self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
self.event_queue
.add_event(Event::PaymentFailed {
payment_id: Some(payment_id),
payment_hash,
reason,
})
.unwrap_or_else(|e| {
match self.payment_store.update(&update) {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};

let event =
Event::PaymentFailed { payment_id: Some(payment_id), payment_hash, reason };
match self.event_queue.add_event(event) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};
},

LdkEvent::PaymentPathSuccessful { .. } => {},
Expand All @@ -915,12 +935,13 @@ where
}
},
LdkEvent::SpendableOutputs { outputs, channel_id } => {
self.output_sweeper
.track_spendable_outputs(outputs, channel_id, true, None)
.unwrap_or_else(|_| {
match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) {
Ok(_) => return Ok(()),
Err(_) => {
log_error!(self.logger, "Failed to track spendable outputs");
panic!("Failed to track spendable outputs");
});
return Err(ReplayEvent());
},
};
},
LdkEvent::OpenChannelRequest {
temporary_channel_id,
Expand Down Expand Up @@ -1111,18 +1132,22 @@ where
channel_id,
counterparty_node_id,
);
self.event_queue
.add_event(Event::ChannelPending {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
former_temporary_channel_id: former_temporary_channel_id.unwrap(),
counterparty_node_id,
funding_txo,
})
.unwrap_or_else(|e| {

let event = Event::ChannelPending {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
former_temporary_channel_id: former_temporary_channel_id.unwrap(),
counterparty_node_id,
funding_txo,
};
match self.event_queue.add_event(event) {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};

let network_graph = self.network_graph.read_only();
let channels =
self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);
Expand Down Expand Up @@ -1164,16 +1189,19 @@ where
channel_id,
counterparty_node_id,
);
self.event_queue
.add_event(Event::ChannelReady {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
counterparty_node_id: Some(counterparty_node_id),
})
.unwrap_or_else(|e| {

let event = Event::ChannelReady {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
counterparty_node_id: Some(counterparty_node_id),
};
match self.event_queue.add_event(event) {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};
},
LdkEvent::ChannelClosed {
channel_id,
Expand All @@ -1183,17 +1211,21 @@ where
..
} => {
log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason);
self.event_queue
.add_event(Event::ChannelClosed {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
counterparty_node_id,
reason: Some(reason),
})
.unwrap_or_else(|e| {

let event = Event::ChannelClosed {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
counterparty_node_id,
reason: Some(reason),
};

match self.event_queue.add_event(event) {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};
},
LdkEvent::DiscardFunding { .. } => {},
LdkEvent::HTLCIntercepted { .. } => {},
Expand Down
Loading