Skip to content

Commit f049b32

Browse files
committed
Refactor the waitlist to another module
1 parent 01efa4c commit f049b32

File tree

4 files changed

+177
-151
lines changed

4 files changed

+177
-151
lines changed

src/conn/pool/futures/disconnect_pool.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use futures_core::ready;
1616
use tokio::sync::mpsc::UnboundedSender;
1717

1818
use crate::{
19-
conn::pool::{Inner, Pool, QUEUE_END_ID},
19+
conn::pool::{waitlist::QUEUE_END_ID, Inner, Pool},
2020
error::Error,
2121
Conn,
2222
};

src/conn/pool/futures/get_conn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use {
2222

2323
use crate::{
2424
conn::{
25-
pool::{Pool, QueueId},
25+
pool::{waitlist::QueueId, Pool},
2626
Conn,
2727
},
2828
error::*,

src/conn/pool/mod.rs

Lines changed: 9 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,13 @@
77
// modified, or distributed except according to those terms.
88

99
use futures_util::FutureExt;
10-
use keyed_priority_queue::KeyedPriorityQueue;
1110
use tokio::sync::mpsc;
1211

1312
use std::{
14-
borrow::Borrow,
15-
cmp::Reverse,
1613
collections::VecDeque,
17-
hash::{Hash, Hasher},
1814
str::FromStr,
1915
sync::{atomic, Arc, Mutex},
20-
task::{Context, Poll, Waker},
16+
task::{Context, Poll},
2117
time::{Duration, Instant},
2218
};
2319

@@ -29,12 +25,14 @@ use crate::{
2925
};
3026

3127
pub use metrics::Metrics;
28+
use waitlist::{QueueId, Waitlist};
3229

3330
mod recycler;
3431
// this is a really unfortunate name for a module
3532
pub mod futures;
3633
mod metrics;
3734
mod ttl_check_inerval;
35+
mod waitlist;
3836

3937
/// Connection that is idling in the pool.
4038
#[derive(Debug)]
@@ -104,103 +102,6 @@ impl Exchange {
104102
}
105103
}
106104

107-
#[derive(Default, Debug)]
108-
struct Waitlist {
109-
queue: KeyedPriorityQueue<QueuedWaker, QueueId>,
110-
metrics: Arc<Metrics>,
111-
}
112-
113-
impl Waitlist {
114-
/// Returns `true` if pushed.
115-
fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
116-
// The documentation of Future::poll says:
117-
// Note that on multiple calls to poll, only the Waker from
118-
// the Context passed to the most recent call should be
119-
// scheduled to receive a wakeup.
120-
//
121-
// But the the documentation of KeyedPriorityQueue::push says:
122-
// Adds new element to queue if missing key or replace its
123-
// priority if key exists. In second case doesn’t replace key.
124-
//
125-
// This means we have to remove first to have the most recent
126-
// waker in the queue.
127-
let occupied = self.remove(queue_id);
128-
self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
129-
130-
self.metrics
131-
.active_wait_requests
132-
.fetch_add(1, atomic::Ordering::Relaxed);
133-
134-
!occupied
135-
}
136-
137-
fn pop(&mut self) -> Option<Waker> {
138-
match self.queue.pop() {
139-
Some((qw, _)) => {
140-
self.metrics
141-
.active_wait_requests
142-
.fetch_sub(1, atomic::Ordering::Relaxed);
143-
Some(qw.waker)
144-
}
145-
None => None,
146-
}
147-
}
148-
149-
/// Returns `true` if removed.
150-
fn remove(&mut self, id: QueueId) -> bool {
151-
let is_removed = self.queue.remove(&id).is_some();
152-
if is_removed {
153-
self.metrics
154-
.active_wait_requests
155-
.fetch_sub(1, atomic::Ordering::Relaxed);
156-
}
157-
158-
is_removed
159-
}
160-
161-
fn peek_id(&mut self) -> Option<QueueId> {
162-
self.queue.peek().map(|(qw, _)| qw.queue_id)
163-
}
164-
}
165-
166-
const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));
167-
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
168-
pub(crate) struct QueueId(Reverse<u64>);
169-
170-
impl QueueId {
171-
fn next() -> Self {
172-
static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0);
173-
let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst);
174-
QueueId(Reverse(id))
175-
}
176-
}
177-
178-
#[derive(Debug)]
179-
struct QueuedWaker {
180-
queue_id: QueueId,
181-
waker: Waker,
182-
}
183-
184-
impl Eq for QueuedWaker {}
185-
186-
impl Borrow<QueueId> for QueuedWaker {
187-
fn borrow(&self) -> &QueueId {
188-
&self.queue_id
189-
}
190-
}
191-
192-
impl PartialEq for QueuedWaker {
193-
fn eq(&self, other: &Self) -> bool {
194-
self.queue_id == other.queue_id
195-
}
196-
}
197-
198-
impl Hash for QueuedWaker {
199-
fn hash<H: Hasher>(&self, state: &mut H) {
200-
self.queue_id.hash(state)
201-
}
202-
}
203-
204105
/// Connection pool data.
205106
#[derive(Debug)]
206107
pub struct Inner {
@@ -248,10 +149,7 @@ impl Pool {
248149
metrics: metrics.clone(),
249150
exchange: Mutex::new(Exchange {
250151
available: VecDeque::with_capacity(pool_opts.constraints().max()),
251-
waiting: Waitlist {
252-
queue: KeyedPriorityQueue::default(),
253-
metrics,
254-
},
152+
waiting: Waitlist::new(metrics),
255153
exist: 0,
256154
recycler: Some((rx, pool_opts)),
257155
}),
@@ -480,20 +378,16 @@ mod test {
480378
use waker_fn::waker_fn;
481379

482380
use std::{
483-
cmp::Reverse,
484381
future::Future,
485382
pin::pin,
486383
sync::{Arc, OnceLock},
487-
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
384+
task::{Context, Poll},
488385
time::Duration,
489386
};
490387

491388
use crate::{
492-
conn::pool::{Pool, QueueId, Waitlist, QUEUE_END_ID},
493-
opts::PoolOpts,
494-
prelude::*,
495-
test_misc::get_opts,
496-
PoolConstraints, Row, TxOpts, Value,
389+
conn::pool::Pool, opts::PoolOpts, prelude::*, test_misc::get_opts, PoolConstraints, Row,
390+
TxOpts, Value,
497391
};
498392

499393
macro_rules! conn_ex_field {
@@ -1022,7 +916,7 @@ mod test {
1022916
}
1023917
drop(only_conn);
1024918

1025-
assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.queue.len());
919+
assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.len());
1026920
// metrics should catch up with waiting queue (see #335)
1027921
assert_eq!(
1028922
0,
@@ -1076,40 +970,6 @@ mod test {
1076970
Ok(())
1077971
}
1078972

1079-
#[test]
1080-
fn waitlist_integrity() {
1081-
const DATA: *const () = &();
1082-
const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE);
1083-
const NOOP_FN: unsafe fn(*const ()) = |_| {};
1084-
static RW_VTABLE: RawWakerVTable =
1085-
RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN);
1086-
let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) };
1087-
1088-
let mut waitlist = Waitlist::default();
1089-
assert_eq!(0, waitlist.queue.len());
1090-
1091-
waitlist.push(w.clone(), QueueId(Reverse(4)));
1092-
waitlist.push(w.clone(), QueueId(Reverse(2)));
1093-
waitlist.push(w.clone(), QueueId(Reverse(8)));
1094-
waitlist.push(w.clone(), QUEUE_END_ID);
1095-
waitlist.push(w.clone(), QueueId(Reverse(10)));
1096-
1097-
waitlist.remove(QueueId(Reverse(8)));
1098-
1099-
assert_eq!(4, waitlist.queue.len());
1100-
1101-
let (_, id) = waitlist.queue.pop().unwrap();
1102-
assert_eq!(2, id.0 .0);
1103-
let (_, id) = waitlist.queue.pop().unwrap();
1104-
assert_eq!(4, id.0 .0);
1105-
let (_, id) = waitlist.queue.pop().unwrap();
1106-
assert_eq!(10, id.0 .0);
1107-
let (_, id) = waitlist.queue.pop().unwrap();
1108-
assert_eq!(QUEUE_END_ID, id);
1109-
1110-
assert_eq!(0, waitlist.queue.len());
1111-
}
1112-
1113973
#[tokio::test]
1114974
async fn check_absolute_connection_ttl() -> super::Result<()> {
1115975
let constraints = PoolConstraints::new(1, 3).unwrap();
@@ -1191,7 +1051,7 @@ mod test {
11911051

11921052
let queue_len = || {
11931053
let exchange = pool.inner.exchange.lock().unwrap();
1194-
exchange.waiting.queue.len()
1054+
exchange.waiting.len()
11951055
};
11961056

11971057
// Get a connection, so we know the next futures will be

0 commit comments

Comments
 (0)