@@ -15,7 +15,6 @@ use std::{
1515 Arc ,
1616 atomic:: { AtomicBool , AtomicU64 , AtomicUsize , Ordering } ,
1717 } ,
18- thread:: available_parallelism,
1918} ;
2019
2120use anyhow:: { Result , bail} ;
@@ -67,7 +66,7 @@ use crate::{
6766 } ,
6867 utils:: {
6968 bi_map:: BiMap , chunked_vec:: ChunkedVec , dash_map_drop_contents:: drop_contents,
70- ptr_eq_arc:: PtrEqArc , sharded:: Sharded , swap_retain,
69+ ptr_eq_arc:: PtrEqArc , shard_amount :: compute_shard_amount , sharded:: Sharded , swap_retain,
7170 } ,
7271} ;
7372
@@ -134,6 +133,10 @@ pub struct BackendOptions {
134133 /// Enables the backing storage.
135134 pub storage_mode : Option < StorageMode > ,
136135
136+ /// Number of tokio worker threads. It will be used to compute the shard amount of parallel
137+ /// datastructures. If `None`, it will use the available parallelism.
138+ pub num_workers : Option < usize > ,
139+
137140 /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
138141 pub small_preallocation : bool ,
139142}
@@ -144,6 +147,7 @@ impl Default for BackendOptions {
144147 dependency_tracking : true ,
145148 active_tracking : true ,
146149 storage_mode : Some ( StorageMode :: ReadWrite ) ,
150+ num_workers : None ,
147151 small_preallocation : false ,
148152 }
149153 }
@@ -228,8 +232,7 @@ impl<B: BackingStorage> TurboTasksBackend<B> {
228232
229233impl < B : BackingStorage > TurboTasksBackendInner < B > {
230234 pub fn new ( mut options : BackendOptions , backing_storage : B ) -> Self {
231- let shard_amount =
232- ( available_parallelism ( ) . map_or ( 4 , |v| v. get ( ) ) * 64 ) . next_power_of_two ( ) ;
235+ let shard_amount = compute_shard_amount ( options. num_workers , options. small_preallocation ) ;
233236 let need_log = matches ! ( options. storage_mode, Some ( StorageMode :: ReadWrite ) ) ;
234237 if !options. dependency_tracking {
235238 options. active_tracking = false ;
@@ -256,7 +259,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
256259 task_cache : BiMap :: new ( ) ,
257260 transient_tasks : FxDashMap :: default ( ) ,
258261 local_is_partial : AtomicBool :: new ( next_task_id != TaskId :: MIN ) ,
259- storage : Storage :: new ( small_preallocation) ,
262+ storage : Storage :: new ( shard_amount , small_preallocation) ,
260263 in_progress_operations : AtomicUsize :: new ( 0 ) ,
261264 snapshot_request : Mutex :: new ( SnapshotRequest :: new ( ) ) ,
262265 operations_suspended : Condvar :: new ( ) ,
0 commit comments