Skip to content

Commit 8f226c7

Browse files
committed
Merge branch 'mempool-multimap'
2 parents fceb53d + b45dceb commit 8f226c7

File tree

4 files changed

+48
-100
lines changed

4 files changed

+48
-100
lines changed

mempool/src/pool/mod.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -763,9 +763,8 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
763763
let expired_ids: Vec<_> = self
764764
.store
765765
.txs_by_creation_time
766-
.values()
767-
.flat_map(Deref::deref)
768-
.map(|entry_id| self.store.txs_by_id.get(entry_id).expect("entry should exist"))
766+
.iter()
767+
.map(|(_time, id)| self.store.txs_by_id.get(id).expect("entry should exist"))
769768
.filter(|entry| {
770769
let now = self.clock.get_time();
771770
let expired = now.saturating_sub(entry.creation_time()) > self.max_tx_age;
@@ -794,9 +793,8 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
794793
let removed_id = self
795794
.store
796795
.txs_by_descendant_score
797-
.values()
798-
.flat_map(Deref::deref)
799-
.copied()
796+
.iter()
797+
.map(|(_score, entry)| *entry.deref())
800798
.next()
801799
.expect("pool not empty");
802800
let removed = self.store.txs_by_id.get(&removed_id).expect("tx with id should exist");
@@ -930,9 +928,8 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
930928
pub fn get_all(&self) -> Vec<SignedTransaction> {
931929
self.store
932930
.txs_by_descendant_score
933-
.values()
934-
.flat_map(Deref::deref)
935-
.map(|id| self.store.get_entry(id).expect("entry").transaction().clone())
931+
.iter()
932+
.map(|(_score, id)| self.store.get_entry(id).expect("entry").transaction().clone())
936933
.collect()
937934
}
938935

@@ -972,7 +969,7 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
972969
let tx_source = TransactionSourceForConnect::for_mempool(&best_index);
973970

974971
let block_timestamp = tx_accumulator.block_timestamp();
975-
let tx_id_iter = self.store.txs_by_ancestor_score.values().flat_map(Deref::deref).rev();
972+
let tx_id_iter = self.store.txs_by_ancestor_score.iter().map(|(_, id)| id).rev();
976973
let mut tx_iter = tx_id_iter
977974
.filter_map(|tx_id| {
978975
let tx = self.store.txs_by_id.get(tx_id)?.deref();
@@ -1118,11 +1115,8 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
11181115
.txs_by_descendant_score
11191116
.iter()
11201117
.rev()
1121-
.find(|(_score, txs)| {
1122-
total_size += txs
1123-
.iter()
1124-
.map(|tx_id| self.store.txs_by_id.get(tx_id).map_or(0, |tx| tx.size()))
1125-
.sum::<usize>();
1118+
.find(|(_score, tx_id)| {
1119+
total_size += self.store.txs_by_id.get(tx_id).map_or(0, |tx| tx.size());
11261120
(total_size / 1_000_000) >= in_top_x_mb
11271121
})
11281122
.map_or_else(

mempool/src/pool/store/mod.rs

Lines changed: 27 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ mod mem_usage;
1717

1818
use std::{
1919
cmp::Ordering,
20-
collections::{btree_map::Entry::Occupied, BTreeMap, BTreeSet},
20+
collections::{BTreeMap, BTreeSet},
2121
ops::Deref,
2222
};
2323

@@ -74,7 +74,7 @@ type StrictDropPolicy = mem_usage::NoOpDropPolicy;
7474

7575
type TrackedMap<K, V> = Tracked<BTreeMap<K, V>>;
7676
type TrackedSet<K> = Tracked<BTreeSet<K>>;
77-
type TrackedTxIdMultiMap<K> = TrackedMap<K, TrackedSet<Id<Transaction>>>;
77+
type TrackedTxIdMultiMap<K> = TrackedSet<(K, Id<Transaction>)>;
7878

7979
#[derive(Debug)]
8080
pub struct MempoolStore {
@@ -182,9 +182,9 @@ impl MempoolStore {
182182
}
183183

184184
let expected_size = map_size_deep(&self.txs_by_id)
185-
+ map_size_deep(&self.txs_by_descendant_score)
186-
+ map_size_deep(&self.txs_by_ancestor_score)
187-
+ map_size_deep(&self.txs_by_creation_time)
185+
+ self.txs_by_descendant_score.indirect_memory_usage()
186+
+ self.txs_by_ancestor_score.indirect_memory_usage()
187+
+ self.txs_by_creation_time.indirect_memory_usage()
188188
+ self.spender_txs.indirect_memory_usage()
189189
+ self.txs_by_seq_no.indirect_memory_usage()
190190
+ self.seq_nos_by_tx.indirect_memory_usage();
@@ -194,11 +194,7 @@ impl MempoolStore {
194194
"Memory size tracker out of sync",
195195
);
196196

197-
let entries = self
198-
.txs_by_descendant_score
199-
.values()
200-
.flat_map(|ids| ids.deref())
201-
.collect::<Vec<_>>();
197+
let entries: Vec<_> = self.txs_by_descendant_score.iter().map(|(_, id)| id).collect();
202198

203199
for id in self.txs_by_id.keys() {
204200
assert_eq!(
@@ -337,11 +333,8 @@ impl MempoolStore {
337333
self.add_to_ancestor_score_index(&entry);
338334
self.mem_tracker.modify(
339335
&mut self.txs_by_creation_time,
340-
|txs_by_creation_time, tracker| {
341-
tracker.modify(
342-
txs_by_creation_time.entry(entry.creation_time()).or_default(),
343-
|entry, _| entry.insert(tx_id),
344-
);
336+
|txs_by_creation_time, _tracker| {
337+
txs_by_creation_time.insert((entry.creation_time(), tx_id));
345338
},
346339
);
347340

@@ -358,11 +351,8 @@ impl MempoolStore {
358351
self.refresh_ancestors(entry);
359352
self.mem_tracker.modify(
360353
&mut self.txs_by_descendant_score,
361-
|by_desc_score, tracker| {
362-
tracker.modify(
363-
by_desc_score.entry(entry.descendant_score()).or_default(),
364-
|map_entry, _| map_entry.insert(*entry.tx_id()),
365-
)
354+
|by_desc_score, _tracker| {
355+
by_desc_score.insert((entry.descendant_score(), *entry.tx_id()));
366356
},
367357
);
368358
}
@@ -372,55 +362,37 @@ impl MempoolStore {
372362
// because such children would be orphans.
373363
// When we implement disconnecting a block, we'll need to clean up the mess we're leaving
374364
// here.
375-
self.mem_tracker.modify(&mut self.txs_by_ancestor_score, |anc_score, tracker| {
376-
tracker.modify(
377-
anc_score.entry(entry.ancestor_score()).or_default(),
378-
|e, _| e.insert(*entry.tx_id()),
379-
)
380-
});
365+
self.mem_tracker
366+
.modify(&mut self.txs_by_ancestor_score, |by_anc_score, _tracker| {
367+
by_anc_score.insert((entry.ancestor_score(), *entry.tx_id()));
368+
});
381369
}
382370

383371
fn refresh_ancestors(&mut self, entry: &TxMempoolEntry) {
384372
// Since the ancestors of `entry` have had their descendant score modified, their ordering
385373
// in txs_by_descendant_score may no longer be correct. We thus remove all ancestors and
386374
// reinsert them, taking the new, updated fees into account
387375
let ancestors = entry.unconfirmed_ancestors(self);
388-
self.mem_tracker.modify(&mut self.txs_by_descendant_score, |by_ds, tracker| {
389-
for entries in by_ds.values_mut() {
390-
tracker.modify(entries, |e, _| e.retain(|id| !ancestors.contains(id)));
391-
}
376+
self.mem_tracker.modify(&mut self.txs_by_descendant_score, |by_ds, _tracker| {
377+
by_ds.retain(|(_score, e)| !ancestors.contains(e));
392378
for ancestor_id in ancestors.0 {
393379
let ancestor =
394380
self.txs_by_id.get(&ancestor_id).expect("Inconsistent mempool state");
395-
tracker.modify(
396-
by_ds.entry(ancestor.descendant_score()).or_default(),
397-
|e, _| e.insert(ancestor_id),
398-
);
381+
by_ds.insert((ancestor.descendant_score(), ancestor_id));
399382
}
400-
401-
by_ds.retain(|_score, txs| !txs.is_empty())
402383
});
403384
}
404385

405386
/// refresh descendants with new ancestor scores
406387
fn refresh_descendants(&mut self, entry: &TxMempoolEntry) {
407388
let descendants = entry.unconfirmed_descendants(self);
408-
self.mem_tracker.modify(&mut self.txs_by_ancestor_score, |by_as, tracker| {
409-
for entries in by_as.values_mut() {
410-
tracker.modify(entries, |e, _| e.retain(|id| !descendants.contains(id)));
411-
}
389+
self.mem_tracker.modify(&mut self.txs_by_ancestor_score, |by_as, _tracker| {
390+
by_as.retain(|(_score, e)| !descendants.contains(e));
412391
for descendant_id in descendants.0 {
413392
let descendant =
414393
self.txs_by_id.get(&descendant_id).expect("Inconsistent mempool state");
415-
tracker.modify(
416-
by_as.entry(descendant.ancestor_score()).or_default(),
417-
|e, _| e.insert(descendant_id),
418-
);
394+
by_as.insert((descendant.ancestor_score(), descendant_id));
419395
}
420-
421-
tracker.modify(&mut self.txs_by_descendant_score, |by_ds, _| {
422-
by_ds.retain(|_score, txs| !txs.is_empty())
423-
});
424396
})
425397
}
426398

@@ -458,7 +430,7 @@ impl MempoolStore {
458430
self.drop_tx(&entry);
459431
Some(entry)
460432
} else {
461-
assert!(!self.txs_by_descendant_score.values().any(|by_ds| by_ds.contains(tx_id)));
433+
assert!(!self.txs_by_descendant_score.iter().any(|(_, id)| id == tx_id));
462434
assert!(!self.spender_txs.iter().any(|(_, id)| *id == *tx_id));
463435
None
464436
}
@@ -480,41 +452,21 @@ impl MempoolStore {
480452

481453
fn remove_from_ancestor_score_index(&mut self, entry: &TxMempoolEntry) {
482454
self.refresh_descendants(entry);
483-
self.mem_tracker.modify(&mut self.txs_by_ancestor_score, |by_as, tracker| {
484-
let map_entry = by_as.entry(entry.ancestor_score()).and_modify(|entries| {
485-
tracker.modify(entries, |e, _| e.remove(entry.tx_id()));
486-
});
487-
488-
match map_entry {
489-
Occupied(entries) if entries.get().is_empty() => drop(entries.remove_entry()),
490-
_ => (),
491-
};
455+
self.mem_tracker.modify(&mut self.txs_by_ancestor_score, |by_as, _tracker| {
456+
by_as.remove(&(entry.ancestor_score(), *entry.tx_id()));
492457
})
493458
}
494459

495460
fn remove_from_descendant_score_index(&mut self, entry: &TxMempoolEntry) {
496461
self.refresh_ancestors(entry);
497-
self.mem_tracker.modify(&mut self.txs_by_descendant_score, |by_ds, tracker| {
498-
let map_entry = by_ds.entry(entry.descendant_score()).and_modify(|entries| {
499-
tracker.modify(entries, |e, _| e.remove(entry.tx_id()));
500-
});
501-
502-
match map_entry {
503-
Occupied(entries) if entries.get().is_empty() => drop(entries.remove_entry()),
504-
_ => {}
505-
};
462+
self.mem_tracker.modify(&mut self.txs_by_descendant_score, |by_ds, _tracker| {
463+
by_ds.remove(&(entry.descendant_score(), *entry.tx_id()));
506464
})
507465
}
508466

509467
fn remove_from_creation_time_index(&mut self, entry: &TxMempoolEntry) {
510-
self.mem_tracker.modify(&mut self.txs_by_creation_time, |by_ct, tracker| {
511-
by_ct.entry(entry.creation_time()).and_modify(|entries| {
512-
tracker.modify(entries, |e, _| e.remove(entry.tx_id()));
513-
});
514-
515-
if by_ct.get(&entry.creation_time()).expect("key must exist").is_empty() {
516-
by_ct.remove(&entry.creation_time());
517-
}
468+
self.mem_tracker.modify(&mut self.txs_by_creation_time, |by_ct, _tracker| {
469+
by_ct.remove(&(entry.creation_time(), *entry.tx_id()));
518470
})
519471
}
520472

mempool/src/pool/tests/mod.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,7 +1335,6 @@ async fn ancestor_score(#[case] seed: Seed) -> anyhow::Result<()> {
13351335
entry_b.fees_with_ancestors(),
13361336
(entry_b.fee() + entry_tx.fee()).unwrap()
13371337
);
1338-
assert!(!mempool.store.txs_by_ancestor_score.contains_key(&tx_b_fee.into()));
13391338
log::debug!(
13401339
"BEFORE REMOVAL raw txs_by_ancestor_score {:?}",
13411340
mempool.store.txs_by_ancestor_score
@@ -1364,8 +1363,6 @@ async fn ancestor_score(#[case] seed: Seed) -> anyhow::Result<()> {
13641363
entry_c.ancestor_score()
13651364
);
13661365

1367-
assert!(!mempool.store.txs_by_ancestor_score.contains_key(&tx_c_fee.into()));
1368-
13691366
check_txs_sorted_by_ancestor_score(&mempool);
13701367
mempool.store.assert_valid();
13711368

@@ -1376,8 +1373,8 @@ fn check_txs_sorted_by_ancestor_score<E>(mempool: &Mempool<E>) {
13761373
let txs_by_ancestor_score = mempool
13771374
.store
13781375
.txs_by_descendant_score
1379-
.values()
1380-
.flat_map(Deref::deref)
1376+
.iter()
1377+
.map(|(_score, id)| id)
13811378
.collect::<Vec<_>>();
13821379
for i in 0..(txs_by_ancestor_score.len() - 1) {
13831380
log::debug!("i = {}", i);
@@ -1477,15 +1474,13 @@ async fn descendant_score(#[case] seed: Seed) -> anyhow::Result<()> {
14771474
entry_b.fees_with_descendants(),
14781475
(entry_b.fee() + entry_c.fee()).unwrap()
14791476
);
1480-
assert!(!mempool.store.txs_by_descendant_score.contains_key(&tx_b_fee.into()));
14811477
log::debug!(
14821478
"raw_txs_by_descendant_score {:?}",
14831479
mempool.store.txs_by_descendant_score
14841480
);
14851481
check_txs_sorted_by_descendant_sore(&mempool);
14861482

14871483
mempool.store.remove_tx(entry_c.tx_id(), MempoolRemovalReason::Block);
1488-
assert!(!mempool.store.txs_by_descendant_score.contains_key(&tx_c_fee.into()));
14891484
let entry_b = mempool.store.txs_by_id.get(&tx_b_id).expect("tx_b");
14901485
assert_eq!(entry_b.fees_with_descendants(), entry_b.fee());
14911486

@@ -1499,8 +1494,8 @@ fn check_txs_sorted_by_descendant_sore<M>(mempool: &Mempool<M>) {
14991494
let txs_by_descendant_score = mempool
15001495
.store
15011496
.txs_by_descendant_score
1502-
.values()
1503-
.flat_map(Deref::deref)
1497+
.iter()
1498+
.map(|(_score, id)| id)
15041499
.collect::<Vec<_>>();
15051500
for i in 0..(txs_by_descendant_score.len() - 1) {
15061501
log::debug!("i = {}", i);

mempool/src/pool/tests/orphans.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ async fn transaction_graph_subset_permutation(#[case] seed: Seed) {
221221
let mut rng = make_seedable_rng(seed);
222222

223223
// Generate a valid graph of transactions
224-
let num_txs = rng.gen_range(15..40);
224+
let num_txs = rng.gen_range(15..90);
225225
let time = TimeGetter::default().get_time();
226226
let full_tx_sequence: Vec<_> =
227227
generate_transaction_graph(&mut rng, time).take(num_txs).collect();
@@ -251,6 +251,13 @@ async fn transaction_graph_subset_permutation(#[case] seed: Seed) {
251251
let _ = mempool.add_transaction(tx, TxOrigin::TEST).expect("tx add");
252252
});
253253

254+
log::info!(
255+
"Stats: count {}, memory {}, encoded size {}",
256+
mempool.store.txs_by_id.len(),
257+
mempool.memory_usage(),
258+
mempool.store.txs_by_id.values().map(|e| e.size()).sum::<usize>(),
259+
);
260+
254261
// Check the final state of each transaction in the original sequence
255262
results.push(all_tx_ids.iter().map(|id| TxStatus::fetch(&mempool, id)).collect());
256263
}

0 commit comments

Comments
 (0)