Skip to content

Avoid blocking and panicking where possible #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ where
}
LdkEvent::SpendableOutputs { outputs } => {
// TODO: We should eventually remember the outputs and supply them to the wallet's coin selection, once BDK allows us to do so.
let destination_address = self.wallet.get_new_address().unwrap();
let destination_address =
self.wallet.get_new_address().expect("Failed to get destination address");
let output_descriptors = &outputs.iter().collect::<Vec<_>>();
let tx_feerate =
self.wallet.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
Expand Down
126 changes: 53 additions & 73 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,9 @@ impl Node {
///
/// **Note:** This **MUST** be called after each event has been handled.
pub fn event_handled(&self) {
self.event_queue.event_handled().unwrap();
self.event_queue
.event_handled()
.expect("Couldn't mark event handled due to persistence failure");
}

/// Returns our own node id
Expand Down Expand Up @@ -922,23 +924,12 @@ impl Node {

let con_peer_pubkey = peer_info.pubkey;
let con_peer_addr = peer_info.address;
let con_success = Arc::new(AtomicBool::new(false));
let con_success_cloned = Arc::clone(&con_success);
let con_logger = Arc::clone(&self.logger);
let con_pm = Arc::clone(&self.peer_manager);

tokio::task::block_in_place(move || {
runtime.block_on(async move {
let res =
connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger)
.await;
con_success_cloned.store(res.is_ok(), Ordering::Release);
})
});

if !con_success.load(Ordering::Acquire) {
return Err(Error::ConnectionFailed);
}
runtime.block_on(async move {
connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger).await
})?;

log_info!(self.logger, "Connected to peer {}@{}. ", peer_info.pubkey, peer_info.address,);

Expand Down Expand Up @@ -1001,23 +992,12 @@ impl Node {

let con_peer_pubkey = peer_info.pubkey;
let con_peer_addr = peer_info.address;
let con_success = Arc::new(AtomicBool::new(false));
let con_success_cloned = Arc::clone(&con_success);
let con_logger = Arc::clone(&self.logger);
let con_pm = Arc::clone(&self.peer_manager);

tokio::task::block_in_place(move || {
runtime.block_on(async move {
let res =
connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger)
.await;
con_success_cloned.store(res.is_ok(), Ordering::Release);
})
});

if !con_success.load(Ordering::Acquire) {
return Err(Error::ConnectionFailed);
}
runtime.block_on(async move {
connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger).await
})?;

let user_config = UserConfig {
channel_handshake_limits: ChannelHandshakeLimits {
Expand Down Expand Up @@ -1070,56 +1050,56 @@ impl Node {

let wallet = Arc::clone(&self.wallet);
let tx_sync = Arc::clone(&self.tx_sync);
let sync_cman = Arc::clone(&self.channel_manager);
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_logger = Arc::clone(&self.logger);
let confirmables = vec![
&*sync_cman as &(dyn Confirm + Sync + Send),
&*sync_cmon as &(dyn Confirm + Sync + Send),
];

tokio::task::block_in_place(move || {
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
async move {
let now = Instant::now();
match wallet.sync().await {
Ok(()) => {
log_info!(
sync_logger,
"Sync of on-chain wallet finished in {}ms.",
now.elapsed().as_millis()
);
Ok(())
}
Err(e) => {
log_error!(sync_logger, "Sync of on-chain wallet failed: {}", e);
Err(e)
}
}
},
)
let local_runtime =
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
local_runtime.block_on(async move {
let now = Instant::now();
match wallet.sync().await {
Ok(()) => {
log_info!(
sync_logger,
"Sync of on-chain wallet finished in {}ms.",
now.elapsed().as_millis()
);
Ok(())
}
Err(e) => {
log_error!(sync_logger, "Sync of on-chain wallet failed: {}", e);
Err(e)
}
}
})?;

let sync_cman = Arc::clone(&self.channel_manager);
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_logger = Arc::clone(&self.logger);
tokio::task::block_in_place(move || {
runtime.block_on(async move {
let now = Instant::now();
match tx_sync.sync(confirmables).await {
Ok(()) => {
log_info!(
sync_logger,
"Sync of Lightning wallet finished in {}ms.",
now.elapsed().as_millis()
);
Ok(())
}
Err(e) => {
log_error!(sync_logger, "Sync of Lightning wallet failed: {}", e);
Err(e)
}
let (sender, receiver) = std::sync::mpsc::sync_channel(1);

runtime.spawn(async move {
let confirmables = vec![
&*sync_cman as &(dyn Confirm + Sync + Send),
&*sync_cmon as &(dyn Confirm + Sync + Send),
];
let now = Instant::now();
match tx_sync.sync(confirmables).await {
Ok(()) => {
log_info!(
sync_logger,
"Sync of Lightning wallet finished in {}ms.",
now.elapsed().as_millis()
);
let _ = sender.send(Ok(()));
}
})
})?;
Err(e) => {
log_error!(sync_logger, "Sync of Lightning wallet failed: {}", e);
let _ = sender.send(Err(e));
}
}
});

receiver.recv().map_err(|_| Error::TxSyncFailed)??;

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) struct FilesystemLogger {
impl FilesystemLogger {
pub(crate) fn new(file_path: String) -> Self {
if let Some(parent_dir) = Path::new(&file_path).parent() {
fs::create_dir_all(parent_dir).unwrap();
fs::create_dir_all(parent_dir).expect("Failed to create log parent directory");
}
Self { file_path }
}
Expand All @@ -35,8 +35,8 @@ impl Logger for FilesystemLogger {
.create(true)
.append(true)
.open(self.file_path.clone())
.unwrap()
.expect("Failed to open log file")
.write_all(log.as_bytes())
.unwrap();
.expect("Failed to write to log file")
}
}
2 changes: 1 addition & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl UniffiCustomTypeConverter for Network {
impl UniffiCustomTypeConverter for Txid {
type Builtin = String;
fn into_custom(val: Self::Builtin) -> uniffi::Result<Self> {
Ok(Txid::from_str(&val).unwrap())
Ok(Txid::from_str(&val)?)
}

fn from_custom(obj: Self) -> Self::Builtin {
Expand Down