@@ -74,7 +74,11 @@ fn calculate_dispatch_frequency() {
         EXPECTED_POOL_SIZE.store(current_pool_size + 1, Ordering::Relaxed);
     } else {
         // There is no need for the extra threads; schedule them to be closed.
-        EXPECTED_POOL_SIZE.fetch_sub(2, Ordering::Relaxed);
+        let expected = EXPECTED_POOL_SIZE.load(Ordering::Relaxed);
+        if 1 + LOW_WATERMARK < expected {
+            // Subtract the low watermark amount.
+            EXPECTED_POOL_SIZE.fetch_sub(LOW_WATERMARK, Ordering::Relaxed);
+        }
     }
 }

@@ -120,10 +124,13 @@ fn schedule(t: async_task::Task<()>) {
     // case won't happen)
     let pool_size = EXPECTED_POOL_SIZE.load(Ordering::Relaxed);
     let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::Relaxed);
+    let reward = (AVR_FREQUENCY.load(Ordering::Relaxed) as f64 / 2.0_f64) as u64;
+
     if pool_size > current_pool_size && pool_size <= MAX_THREADS {
-        let needed = pool_size - current_pool_size;
+        let needed = pool_size.saturating_sub(current_pool_size);

-        // For safety, check boundaries before spawning threads
+        // For safety, check boundaries before spawning threads.
+        // This also isn't expected to happen, but better safe than sorry.
         if needed > 0 && (needed < pool_size || needed < current_pool_size) {
             (0..needed).for_each(|_| {
                 create_blocking_thread();
@@ -137,8 +144,9 @@ fn schedule(t: async_task::Task<()>) {
         POOL.sender.send(err.into_inner()).unwrap();
     } else {
         // Every successful dispatch is rewarded with a negative adjustment.
-        let reward = AVR_FREQUENCY.load(Ordering::Relaxed) as f64 / 2.0_f64;
-        EXPECTED_POOL_SIZE.fetch_sub(reward as u64, Ordering::Relaxed);
+        if reward + LOW_WATERMARK < pool_size {
+            EXPECTED_POOL_SIZE.fetch_sub(reward, Ordering::Relaxed);
+        }
     }
 }
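
Below is a small standalone sketch of the guard pattern these hunks introduce: the expected pool size is only decremented when the subtraction still leaves it above `LOW_WATERMARK`. The statics, the constant values, and the `shrink_expected` helper are illustrative stand-ins, not the crate's actual definitions.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative stand-ins for the pool's statics; the values are arbitrary.
static EXPECTED_POOL_SIZE: AtomicU64 = AtomicU64::new(4);
const LOW_WATERMARK: u64 = 2;

/// Decrement the expected pool size by `amount`, but only when the result
/// stays above the low watermark (mirrors the
/// `reward + LOW_WATERMARK < pool_size` check in the diff).
fn shrink_expected(amount: u64) {
    let expected = EXPECTED_POOL_SIZE.load(Ordering::Relaxed);
    if amount + LOW_WATERMARK < expected {
        EXPECTED_POOL_SIZE.fetch_sub(amount, Ordering::Relaxed);
    }
}

fn main() {
    shrink_expected(2); // 2 + 2 < 4 is false, so the expected size stays at 4
    assert_eq!(EXPECTED_POOL_SIZE.load(Ordering::Relaxed), 4);

    EXPECTED_POOL_SIZE.store(8, Ordering::Relaxed);
    shrink_expected(2); // 2 + 2 < 8, so the expected size drops to 6
    assert_eq!(EXPECTED_POOL_SIZE.load(Ordering::Relaxed), 6);
}
```

Note that the check and the subtraction are two separate atomic operations, so concurrent callers could race past the watermark; the diff appears to accept this, since `EXPECTED_POOL_SIZE` only serves as a sizing hint read with `Ordering::Relaxed`.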