@@ -65,7 +65,7 @@ use crate::future::Future;
65
65
use crate :: io:: ErrorKind ;
66
66
use crate :: task:: { Context , Poll } ;
67
67
use crate :: utils:: abort_on_panic;
68
- use std:: sync:: { Arc , Mutex } ;
68
+ use std:: sync:: Mutex ;
69
69
70
70
/// Low watermark value, defines the bare minimum of the pool.
71
71
/// Spawns initial thread set.
@@ -135,12 +135,12 @@ lazy_static! {
135
135
} ;
136
136
137
137
/// Sliding window for pool task frequency calculation
138
- static ref FREQ_QUEUE : Arc < Mutex <VecDeque <u64 > >> = {
139
- Arc :: new ( Mutex :: new( VecDeque :: with_capacity( FREQUENCY_QUEUE_SIZE + 1 ) ) )
138
+ static ref FREQ_QUEUE : Mutex <VecDeque <u64 >> = {
139
+ Mutex :: new( VecDeque :: with_capacity( FREQUENCY_QUEUE_SIZE + 1 ) )
140
140
} ;
141
141
142
142
/// Dynamic pool thread count variable
143
- static ref POOL_SIZE : Arc < Mutex <u64 >> = Arc :: new ( Mutex :: new( LOW_WATERMARK ) ) ;
143
+ static ref POOL_SIZE : Mutex <u64 > = Mutex :: new( LOW_WATERMARK ) ;
144
144
}
145
145
146
146
/// Exponentially Weighted Moving Average calculation
@@ -180,9 +180,8 @@ fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
180
180
/// It uses frequency based calculation to define work. Utilizing average processing rate.
181
181
fn scale_pool ( ) {
182
182
// Fetch current frequency, it does matter that operations are ordered in this approach.
183
- let current_frequency = FREQUENCY . load ( Ordering :: SeqCst ) ;
184
- let freq_queue_arc = FREQ_QUEUE . clone ( ) ;
185
- let mut freq_queue = freq_queue_arc. lock ( ) . unwrap ( ) ;
183
+ let current_frequency = FREQUENCY . swap ( 0 , Ordering :: SeqCst ) ;
184
+ let mut freq_queue = FREQ_QUEUE . lock ( ) . unwrap ( ) ;
186
185
187
186
// Make it safe to start for calculations by adding initial frequency scale
188
187
if freq_queue. len ( ) == 0 {
@@ -227,15 +226,13 @@ fn scale_pool() {
227
226
// If we fall to this case, scheduler is congested by longhauling tasks.
228
227
// For unblock the flow we should add up some threads to the pool, but not that many to
229
228
// stagger the program's operation.
230
- let scale = LOW_WATERMARK * current_frequency + 1 ;
229
+ let scale = ( ( current_frequency as f64 ) . powf ( LOW_WATERMARK as f64 ) + 1_f64 ) as u64 ;
231
230
232
231
// Scale it up!
233
232
( 0 ..scale) . for_each ( |_| {
234
233
create_blocking_thread ( ) ;
235
234
} ) ;
236
235
}
237
-
238
- FREQUENCY . store ( 0 , Ordering :: Release ) ;
239
236
}
240
237
241
238
/// Creates blocking thread to receive tasks
@@ -245,8 +242,7 @@ fn create_blocking_thread() {
245
242
// Check that thread is spawnable.
246
243
// If it hits to the OS limits don't spawn it.
247
244
{
248
- let current_arc = POOL_SIZE . clone ( ) ;
249
- let pool_size = * current_arc. lock ( ) . unwrap ( ) ;
245
+ let pool_size = * POOL_SIZE . lock ( ) . unwrap ( ) ;
250
246
if pool_size >= MAX_THREADS . load ( Ordering :: SeqCst ) {
251
247
MAX_THREADS . store ( 10_000 , Ordering :: SeqCst ) ;
252
248
return ;
@@ -267,17 +263,11 @@ fn create_blocking_thread() {
267
263
let wait_limit = Duration :: from_millis ( 1000 + rand_sleep_ms) ;
268
264
269
265
// Adjust the pool size counter before and after spawn
270
- {
271
- let current_arc = POOL_SIZE . clone ( ) ;
272
- * current_arc. lock ( ) . unwrap ( ) += 1 ;
273
- }
266
+ * POOL_SIZE . lock ( ) . unwrap ( ) += 1 ;
274
267
while let Ok ( task) = POOL . receiver . recv_timeout ( wait_limit) {
275
268
abort_on_panic ( || task. run ( ) ) ;
276
269
}
277
- {
278
- let current_arc = POOL_SIZE . clone ( ) ;
279
- * current_arc. lock ( ) . unwrap ( ) -= 1 ;
280
- }
270
+ * POOL_SIZE . lock ( ) . unwrap ( ) -= 1 ;
281
271
} )
282
272
. map_err ( |err| {
283
273
match err. kind ( ) {
@@ -286,8 +276,7 @@ fn create_blocking_thread() {
286
276
// Also, some systems have it(like macOS), and some don't(Linux).
287
277
// This case expected not to happen.
288
278
// But when happened this shouldn't throw a panic.
289
- let current_arc = POOL_SIZE . clone ( ) ;
290
- MAX_THREADS . store ( * current_arc. lock ( ) . unwrap ( ) - 1 , Ordering :: SeqCst ) ;
279
+ MAX_THREADS . store ( * POOL_SIZE . lock ( ) . unwrap ( ) - 1 , Ordering :: SeqCst ) ;
291
280
}
292
281
_ => eprintln ! (
293
282
"cannot start a dynamic thread driving blocking tasks: {}" ,
0 commit comments