diff --git a/benches/blocking.rs b/benches/blocking.rs
new file mode 100644
index 000000000..f9e659748
--- /dev/null
+++ b/benches/blocking.rs
@@ -0,0 +1,38 @@
+#![feature(test)]
+
+extern crate test;
+
+use async_std::task;
+use async_std::task::blocking::JoinHandle;
+use futures::future::join_all;
+use std::thread;
+use std::time::Duration;
+use test::Bencher;
+
+// Benchmark for a burst of 10K blocking task spawns, joined together
+#[bench]
+fn blocking(b: &mut Bencher) {
+    b.iter(|| {
+        let handles = (0..10_000)
+            .map(|_| {
+                task::blocking::spawn(async {
+                    let duration = Duration::from_millis(1);
+                    thread::sleep(duration);
+                })
+            })
+            .collect::<Vec<JoinHandle<()>>>();
+
+        task::block_on(join_all(handles));
+    });
+}
+
+// Benchmark for a single blocking task spawn (the returned handle is not joined)
+#[bench]
+fn blocking_single(b: &mut Bencher) {
+    b.iter(|| {
+        task::blocking::spawn(async {
+            let duration = Duration::from_millis(1);
+            thread::sleep(duration);
+        })
+    });
+}
diff --git a/src/task/blocking.rs b/src/task/blocking.rs
index 41177bc93..a91a51188 100644
--- a/src/task/blocking.rs
+++ b/src/task/blocking.rs
@@ -1,5 +1,57 @@
 //! A thread pool for running blocking functions asynchronously.
+//!
+//! The blocking thread pool consists of four elements:
+//! * Frequency Detector
+//! * Trend Estimator
+//! * Predictive Upscaler
+//! * Time-based Downscaler
+//!
+//! ## Frequency Detector
+//! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
+//! The pool manager thread does this sampling every 200 milliseconds.
+//! This value is used in the trend estimation phase.
+//!
+//! ## Trend Estimator
+//! Holds up to the given number of frequencies to create an estimation.
+//! The trend estimator holds 10 frequencies at a time.
+//! This value is stored as a constant in [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html).
+//! Estimation and prediction use the Exponentially Weighted Moving Average algorithm.
+//!
+//! This algorithm is adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61)
+//! and altered to:
+//! * use thread redundancy, which is the sum of the differences between the predicted and observed values, instead of a heavy trend calculation.
+//! * use exponential trend estimation instead of linear trend estimation, where the formula is:
+//! ```text
+//! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK
+//! ```
+//! *NOTE:* To tweak this algorithm, increase [LOW_WATERMARK](constant.LOW_WATERMARK.html); the additional dynamic thread spawn count adapts automatically.
+//! * operate without watermarking by timestamps (which the paper uses to measure the algorithm's own performance during execution)
+//! * operate without extensive subsampling. Extensive subsampling congests the pool manager thread.
+//! * operate without keeping track of idle time of threads or job out queue like TEMA and FOPS implementations.
+//!
+//! ## Predictive Upscaler
+//! The upscaler has three cases (these can also be seen in the paper):
+//! * The rate slightly increases and there are many idle threads.
+//! * The number of worker threads tends to be reduced since the workload of the system is descending.
+//! * The system has no requests or is stalled. (Our case here is when the current tasks block further tasks from being processed – throughput hogs)
+//!
+//! For the first two, the EMA calculation and exponential trend estimation give good performance.
+//! For the last case, the upscaler selects the upscaling amount by the amount of tasks mapped when throughput hogs happen.
+//!
+//! **example scenario:** Let's say we have 10_000 tasks where every one of them is blocking for 1 second. The scheduler will map plenty of tasks but they will get rejected.
+//! This makes the estimation calculation nearly 0 for both entering and exiting parts. When this happens and we still see tasks mapped from the scheduler,
+//! we start to slowly increase threads by the amount of frequency linearly. A high increase of this value would either make us hit the thread threshold on
+//! some OSes or cause congestion on the other thread utilizations of the program, because of context switches.
+//!
+//! Throughput hogs are determined by a combination of job in / job out frequency and current scheduler task assignment frequency.
+//! The EMA difference is compared against machine epsilon to absorb floating point arithmetic errors.
+//!
+//! ## Time-based Downscaler
+//! When threads become idle, they will not shut down immediately.
+//! Instead, they wait a random amount between 1 and 11 seconds
+//! to even out the load.
 
+use std::collections::VecDeque;
 use std::fmt;
 use std::pin::Pin;
 use std::sync::atomic::{AtomicU64, Ordering};
@@ -10,21 +62,44 @@ use crossbeam_channel::{bounded, Receiver, Sender};
 use lazy_static::lazy_static;
 
 use crate::future::Future;
+use crate::io::ErrorKind;
 use crate::task::{Context, Poll};
 use crate::utils::abort_on_panic;
+use std::sync::Mutex;
 
-const MAX_THREADS: u64 = 10_000;
+/// Low watermark value, defines the bare minimum of the pool.
+/// This many threads are spawned as the initial thread set.
+const LOW_WATERMARK: u64 = 2;
 
-static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
+/// Pool manager's interval time (milliseconds).
+/// This is the interval at which the adaptation calculation runs.
+const MANAGER_POLL_INTERVAL: u64 = 200;
 
+/// Frequency histogram's sliding window size.
+/// Defines how many frequencies will be considered for adaptation.
+const FREQUENCY_QUEUE_SIZE: usize = 10;
+
+/// Exponential moving average smoothing coefficient for limited window.
+/// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
+const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);
+
+/// Pool task frequency variable.
+/// Counts the tasks scheduled onto the thread pool during the current calculation time window.
+static FREQUENCY: AtomicU64 = AtomicU64::new(0);
+
+/// Upper bound on pool threads; lowered at runtime when the OS refuses to spawn more.
+static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000);
+
+/// Pool interface between the scheduler and thread pool
 struct Pool {
     sender: Sender<async_task::Task<()>>,
     receiver: Receiver<async_task::Task<()>>,
 }
 
 lazy_static! {
+    /// Blocking pool with static starting thread count.
     static ref POOL: Pool = {
-        for _ in 0..2 {
+        for _ in 0..LOW_WATERMARK {
             thread::Builder::new()
                 .name("async-blocking-driver".to_string())
                 .spawn(|| abort_on_panic(|| {
@@ -35,6 +110,19 @@ lazy_static! {
                 .expect("cannot start a thread driving blocking tasks");
         }
 
+        // Pool manager to check frequency of task rates
+        // and take action by scaling the pool accordingly.
+        thread::Builder::new()
+            .name("async-pool-manager".to_string())
+            .spawn(|| abort_on_panic(|| {
+                let poll_interval = Duration::from_millis(MANAGER_POLL_INTERVAL);
+                loop {
+                    scale_pool();
+                    thread::sleep(poll_interval);
+                }
+            }))
+            .expect("thread pool manager cannot be started");
+
         // We want to use an unbuffered channel here to help
         // us drive our dynamic control. In effect, the
         // kernel's scheduler becomes the queue, reducing
@@ -45,20 +133,119 @@ lazy_static! {
         let (sender, receiver) = bounded(0);
         Pool { sender, receiver }
     };
+
+    /// Sliding window of task frequency samples (one extra slot for push-then-pop)
+    static ref FREQ_QUEUE: Mutex<VecDeque<u64>> = {
+        Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE.saturating_add(1)))
+    };
+
+    /// Dynamic pool thread count variable, starting at the static thread count
+    static ref POOL_SIZE: Mutex<u64> = Mutex::new(LOW_WATERMARK);
 }
 
-// Create up to MAX_THREADS dynamic blocking task worker threads.
-// Dynamic threads will terminate themselves if they don't
-// receive any work after between one and ten seconds.
-fn maybe_create_another_blocking_thread() {
-    // We use a `Relaxed` atomic operation because
-    // it's just a heuristic, and would not lose correctness
-    // even if it's random.
-    let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
-    if workers >= MAX_THREADS {
-        return;
+/// Exponentially Weighted Moving Average calculation
+///
+/// Computes the EMA value, which represents the trend of tasks mapped onto the thread pool.
+/// The newest sample sits at the back of the queue, so the window is walked in reverse to give it weight 1.
+/// Calculation is following:
+/// ```text
+/// +--------+-----------------+----------------------------------+
+/// | Symbol |   Identifier    |           Explanation            |
+/// +--------+-----------------+----------------------------------+
+/// |   α    | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
+/// |   Yt   |      freq       | frequency sample at time t       |
+/// |   St   |       acc       | weighted sum accumulated so far  |
+/// +--------+-----------------+----------------------------------+
+/// ```
+/// Under these definitions formula is following:
+/// ```text
+/// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ]
+/// ```
+/// # Arguments
+///
+/// * `freq_queue` - Sliding window of frequency samples, oldest at the front
+#[inline]
+fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
+    freq_queue.iter().rev().enumerate().fold(0_f64, |acc, (i, freq)| {
+        acc + ((*freq as f64) * (1_f64 - EMA_COEFFICIENT).powf(i as f64))
+    }) * EMA_COEFFICIENT
+}
+
+/// Adaptive pool scaling function
+///
+/// Spawns new threads to make room for incoming task pressure.
+/// Called periodically by the pool manager thread, detached from task submission, and scales up the pool based
+/// on the request rate.
+///
+/// It compares the EMA of the task frequency before and after the newest sample to decide how much to scale.
+fn scale_pool() {
+    // Fetch and reset the current frequency counter; the operation order matters in this approach.
+    let current_frequency = FREQUENCY.swap(0, Ordering::SeqCst);
+    let mut freq_queue = FREQ_QUEUE.lock().unwrap();
+
+    // Make it safe to start for calculations by adding initial frequency scale
+    if freq_queue.is_empty() {
+        freq_queue.push_back(0);
     }
 
+    // Calculate message rate (tasks per millisecond, truncated) for the given time window
+    let frequency = (current_frequency as f64 / MANAGER_POLL_INTERVAL as f64) as u64;
+
+    // Calculates previous time window's EMA value (before adding the newest sample)
+    let prev_ema_frequency = calculate_ema(&freq_queue);
+
+    // Add seen frequency data to the frequency histogram.
+    freq_queue.push_back(frequency);
+    if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
+        freq_queue.pop_front();
+    }
+
+    // Calculates current time window's EMA value (including last sample)
+    let curr_ema_frequency = calculate_ema(&freq_queue);
+
+    // Adapts the thread count of pool
+    //
+    // Sliding window of frequencies visited by the pool manager.
+    // Pool manager creates EMA value for previous window and current window.
+    // Compare them to determine scaling amount based on the trends.
+    // If current EMA value is bigger, we will scale up.
+    if curr_ema_frequency > prev_ema_frequency {
+        // "Scale by" amount can be seen as "how much load is coming".
+        // "Scale" amount is "how many threads we should spawn", capped at the CPU count.
+        let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
+        let scale = num_cpus::get()
+            .min(((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as usize);
+
+        // It is time to scale the pool!
+        (0..scale).for_each(|_| {
+            create_blocking_thread();
+        });
+    } else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON
+        && current_frequency != 0
+    {
+        // Throughput is low. Allocate more threads to unblock flow.
+        // If we fall to this case, scheduler is congested by longhauling tasks.
+        // To unblock the flow we should add some threads to the pool, but not so many that
+        // they stagger the program's operation.
+        (0..LOW_WATERMARK).for_each(|_| {
+            create_blocking_thread();
+        });
+    }
+}
+
+/// Creates blocking thread to receive tasks
+/// Dynamic threads will terminate themselves if they don't
+/// receive any work after between one and eleven seconds.
+fn create_blocking_thread() {
+    // Check that thread is spawnable.
+    // If the pool already reached the known limit, don't spawn (and reset the limit to its default).
+    {
+        let pool_size = *POOL_SIZE.lock().unwrap();
+        if pool_size >= MAX_THREADS.load(Ordering::SeqCst) {
+            MAX_THREADS.store(10_000, Ordering::SeqCst);
+            return;
+        }
+    }
     // We want to avoid having all threads terminate at
     // exactly the same time, causing thundering herd
     // effects. We want to stagger their destruction over
@@ -66,31 +253,55 @@ fn maybe_create_another_blocking_thread() {
     // background noise.
     //
     // Generate a simple random number of milliseconds
-    let rand_sleep_ms = u64::from(random(10_000));
+    let rand_sleep_ms = 1000_u64
+        .checked_add(u64::from(random(10_000)))
+        .expect("shouldn't overflow");
 
-    thread::Builder::new()
+    let _ = thread::Builder::new()
         .name("async-blocking-driver-dynamic".to_string())
         .spawn(move || {
-            let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
+            let wait_limit = Duration::from_millis(rand_sleep_ms);
 
-            DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
+            // Count this thread in the pool size while it runs; uncount it on exit
+            *POOL_SIZE.lock().unwrap() += 1;
             while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
                 abort_on_panic(|| task.run());
             }
-            DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
+            *POOL_SIZE.lock().unwrap() -= 1;
         })
-        .expect("cannot start a dynamic thread driving blocking tasks");
+        .map_err(|err| {
+            match err.kind() {
+                ErrorKind::WouldBlock => {
+                    // The maximum allowed threads per process varies from system to system.
+                    // Also, some systems have such a limit (like macOS), and some don't (Linux).
+                    // This case is not expected to happen.
+                    // But when it happens it shouldn't panic; remember the limit instead.
+                    let guarded_count = POOL_SIZE
+                        .lock()
+                        .unwrap()
+                        .checked_sub(1)
+                        .expect("shouldn't underflow");
+                    MAX_THREADS.store(guarded_count, Ordering::SeqCst);
+                }
+                _ => eprintln!(
+                    "cannot start a dynamic thread driving blocking tasks: {}",
+                    err
+                ),
+            }
+        });
 }
 
-// Enqueues work, attempting to send to the threadpool in a
-// nonblocking way and spinning up another worker thread if
-// there is not a thread ready to accept the work.
+/// Enqueues work, attempting to send to the thread pool in a
+/// nonblocking way. The pool manager spins up the needed amount of
+/// threads based on the previous statistics, regardless of whether
+/// a thread is currently ready to accept the work.
 fn schedule(t: async_task::Task<()>) {
+    // Add up for every incoming scheduled task
+    FREQUENCY.fetch_add(1, Ordering::Acquire);
+
     if let Err(err) = POOL.sender.try_send(t) {
         // We were not able to send to the channel without
-        // blocking. Try to spin up another thread and then
-        // retry sending while blocking.
-        maybe_create_another_blocking_thread();
+        // blocking, so fall back to a blocking send.
         POOL.sender.send(err.into_inner()).unwrap();
     }
 }
diff --git a/src/task/mod.rs b/src/task/mod.rs
index 42b7e0883..e654fdfbd 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -34,4 +34,4 @@ mod pool;
 mod sleep;
 mod task;
 
-pub(crate) mod blocking;
+pub mod blocking;
diff --git a/tests/thread_pool.rs b/tests/thread_pool.rs
new file mode 100644
index 000000000..29a85c9f3
--- /dev/null
+++ b/tests/thread_pool.rs
@@ -0,0 +1,135 @@
+use async_std::task;
+use async_std::task::blocking::JoinHandle;
+use futures::future::join_all;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+// Test for slow joins without task bursts during joins.
+#[test]
+#[ignore]
+fn slow_join() {
+    let thread_join_time_max = 11_000;
+    let start = Instant::now();
+
+    // Send an initial burst of a million tasks.
+    let handles = (0..1_000_000)
+        .map(|_| {
+            task::blocking::spawn(async {
+                let duration = Duration::from_millis(1);
+                thread::sleep(duration);
+            })
+        })
+        .collect::<Vec<JoinHandle<()>>>();
+
+    task::block_on(join_all(handles));
+
+    // Sleep past the longest idle timeout so dynamic threads can exit before the next batch.
+    let duration = Duration::from_millis(thread_join_time_max);
+    thread::sleep(duration);
+
+    // Spawn yet another batch of work on top of it
+    let handles = (0..10_000)
+        .map(|_| {
+            task::blocking::spawn(async {
+                let duration = Duration::from_millis(100);
+                thread::sleep(duration);
+            })
+        })
+        .collect::<Vec<JoinHandle<()>>>();
+
+    task::block_on(join_all(handles));
+
+    // Slow joins shouldn't cause internal slow down (elapsed is in milliseconds, minus the sleep)
+    let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
+    println!("Slow task join. Monotonic exec time: {:?} ms", elapsed);
+
+    // Previous implementation is around this threshold.
+}
+
+// Test for slow joins with task burst.
+#[test]
+#[ignore]
+fn slow_join_interrupted() {
+    let thread_join_time_max = 2_000;
+    let start = Instant::now();
+
+    // Send an initial burst of a million tasks.
+    let handles = (0..1_000_000)
+        .map(|_| {
+            task::blocking::spawn(async {
+                let duration = Duration::from_millis(1);
+                thread::sleep(duration);
+            })
+        })
+        .collect::<Vec<JoinHandle<()>>>();
+
+    task::block_on(join_all(handles));
+
+    // Let them join to see how it behaves under different workloads.
+    // This time the pause is shorter than the idle timeout window of the dynamic threads.
+    let duration = Duration::from_millis(thread_join_time_max);
+    thread::sleep(duration);
+
+    // Spawn yet another batch of work on top of it
+    let handles = (0..10_000)
+        .map(|_| {
+            task::blocking::spawn(async {
+                let duration = Duration::from_millis(100);
+                thread::sleep(duration);
+            })
+        })
+        .collect::<Vec<JoinHandle<()>>>();
+
+    task::block_on(join_all(handles));
+
+    // Slow joins shouldn't cause internal slow down (elapsed is in milliseconds, minus the sleep)
+    let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
+    println!("Slow task join. Monotonic exec time: {:?} ms", elapsed);
+
+    // Previous implementation is around this threshold.
+}
+
+// This test is expensive but it proves that longhauling tasks are working in adaptive thread pool.
+// Thread pool which spawns on-demand will panic with this test.
+#[test]
+#[ignore]
+fn longhauling_task_join() {
+    let thread_join_time_max = 11_000;
+    let start = Instant::now();
+
+    // First batch of long-hauling tasks (their handles are dropped, not joined)
+    let _ = (0..100_000)
+        .map(|_| {
+            task::blocking::spawn(async {
+                let duration = Duration::from_millis(1000);
+                thread::sleep(duration);
+            })
+        })
+        .collect::<Vec<JoinHandle<()>>>();
+
+    // Let them join to see how it behaves under different workloads.
+    let duration = Duration::from_millis(thread_join_time_max);
+    thread::sleep(duration);
+
+    // Send yet another medium sized batch to see how it scales.
+    let handles = (0..10_000)
+        .map(|_| {
+            task::blocking::spawn(async {
+                let duration = Duration::from_millis(100);
+                thread::sleep(duration);
+            })
+        })
+        .collect::<Vec<JoinHandle<()>>>();
+
+    task::block_on(join_all(handles));
+
+    // Slow joins shouldn't cause internal slow down (elapsed is in milliseconds, minus the sleep)
+    let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
+    println!(
+        "Long-hauling task join. Monotonic exec time: {:?} ms",
+        elapsed
+    );
+
+    // Previous implementation will panic when this test is running.
+}