Skip to content

Commit 414e008

Browse files
marshallyalemarshall-yale
authored andcommitted
fix: fix batch ordering issue
Fixes issue #75 Raw client waiting map was using the same channel for every request/response. When items were put back into the channel inside of _reader_thread the waiting receiver in recv would just take the next response without validating it on request id request. This fixes this by using unique channels for each request response inside of the waiting map.
1 parent 805ea0a commit 414e008

File tree

1 file changed

+10
-9
lines changed

1 file changed

+10
-9
lines changed

src/raw_client.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! This module contains the definition of the raw client that wraps the transport method
44
55
use std::borrow::Borrow;
6-
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
6+
use std::collections::{BTreeMap, HashMap, VecDeque};
77
use std::io::{BufRead, BufReader, Read, Write};
88
use std::mem::drop;
99
use std::net::{TcpStream, ToSocketAddrs};
@@ -762,22 +762,23 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
762762
fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
763763
let mut raw = Vec::new();
764764

765-
let mut missing_responses = BTreeSet::new();
765+
let mut missing_responses = Vec::new();
766766
let mut answers = BTreeMap::new();
767767

768-
// Add our listener to the map before we send the request, Here we will clone the sender
769-
// for every request id, so that we only have to monitor one receiver.
770-
let (sender, receiver) = channel();
768+
// Add our listener to the map before we send the request
771769

772770
for (method, params) in batch.iter() {
773771
let req = Request::new_id(
774772
self.last_id.fetch_add(1, Ordering::SeqCst),
775773
method,
776774
params.to_vec(),
777775
);
778-
missing_responses.insert(req.id);
776+
// Add distinct channel to each request so when we remove our request id (and sender) from the waiting_map
777+
// we can be sure that the response gets sent to the correct channel in self.recv
778+
let (sender, receiver) = channel();
779+
missing_responses.push((req.id, receiver));
779780

780-
self.waiting_map.lock()?.insert(req.id, sender.clone());
781+
self.waiting_map.lock()?.insert(req.id, sender);
781782

782783
raw.append(&mut serde_json::to_vec(&req)?);
783784
raw.extend_from_slice(b"\n");
@@ -796,7 +797,7 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
796797

797798
self.increment_calls();
798799

799-
for req_id in missing_responses.iter() {
800+
for (req_id, receiver) in missing_responses.iter() {
800801
match self.recv(&receiver, *req_id) {
801802
Ok(mut resp) => answers.insert(req_id, resp["result"].take()),
802803
Err(e) => {
@@ -805,7 +806,7 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
805806
warn!("got error for req_id {}: {:?}", req_id, e);
806807
warn!("removing all waiting req of this batch");
807808
let mut guard = self.waiting_map.lock()?;
808-
for req_id in missing_responses.iter() {
809+
for (req_id, _) in missing_responses.iter() {
809810
guard.remove(req_id);
810811
}
811812
return Err(e);

0 commit comments

Comments
 (0)