1- use bevy_tasks:: { ComputeTaskPool , Scope , TaskPool } ;
1+ use bevy_tasks:: { ComputeTaskPool , Scope , TaskPool , ThreadExecutor } ;
22use bevy_utils:: default;
33use bevy_utils:: syncunsafecell:: SyncUnsafeCell ;
44#[ cfg( feature = "trace" ) ]
55use bevy_utils:: tracing:: { info_span, Instrument } ;
6+ use std:: sync:: Arc ;
67
78use async_channel:: { Receiver , Sender } ;
89use fixedbitset:: FixedBitSet ;
910
1011use crate :: {
1112 archetype:: ArchetypeComponentId ,
13+ prelude:: Resource ,
1214 query:: Access ,
1315 schedule_v3:: {
1416 is_apply_system_buffers, BoxedCondition , ExecutorKind , SystemExecutor , SystemSchedule ,
@@ -17,6 +19,8 @@ use crate::{
1719 world:: World ,
1820} ;
1921
22+ use crate as bevy_ecs;
23+
2024/// A funky borrow split of [`SystemSchedule`] required by the [`MultiThreadedExecutor`].
2125struct SyncUnsafeSchedule < ' a > {
2226 systems : & ' a [ SyncUnsafeCell < BoxedSystem > ] ,
@@ -145,47 +149,56 @@ impl SystemExecutor for MultiThreadedExecutor {
145149 }
146150 }
147151
152+ let thread_executor = world
153+ . get_resource :: < MainThreadExecutor > ( )
154+ . map ( |e| e. 0 . clone ( ) ) ;
155+ let thread_executor = thread_executor. as_deref ( ) ;
156+
148157 let world = SyncUnsafeCell :: from_mut ( world) ;
149158 let SyncUnsafeSchedule {
150159 systems,
151160 mut conditions,
152161 } = SyncUnsafeSchedule :: new ( schedule) ;
153162
154- ComputeTaskPool :: init ( TaskPool :: default) . scope ( |scope| {
155- // the executor itself is a `Send` future so that it can run
156- // alongside systems that claim the local thread
157- let executor = async {
158- while self . num_completed_systems < num_systems {
159- // SAFETY: self.ready_systems does not contain running systems
160- unsafe {
161- self . spawn_system_tasks ( scope, systems, & mut conditions, world) ;
162- }
163-
164- if self . num_running_systems > 0 {
165- // wait for systems to complete
166- let index = self
167- . receiver
168- . recv ( )
169- . await
170- . unwrap_or_else ( |error| unreachable ! ( "{}" , error) ) ;
163+ ComputeTaskPool :: init ( TaskPool :: default) . scope_with_executor (
164+ false ,
165+ thread_executor,
166+ |scope| {
167+ // the executor itself is a `Send` future so that it can run
168+ // alongside systems that claim the local thread
169+ let executor = async {
170+ while self . num_completed_systems < num_systems {
171+ // SAFETY: self.ready_systems does not contain running systems
172+ unsafe {
173+ self . spawn_system_tasks ( scope, systems, & mut conditions, world) ;
174+ }
171175
172- self . finish_system_and_signal_dependents ( index) ;
176+ if self . num_running_systems > 0 {
177+ // wait for systems to complete
178+ let index = self
179+ . receiver
180+ . recv ( )
181+ . await
182+ . unwrap_or_else ( |error| unreachable ! ( "{}" , error) ) ;
173183
174- while let Ok ( index) = self . receiver . try_recv ( ) {
175184 self . finish_system_and_signal_dependents ( index) ;
176- }
177185
178- self . rebuild_active_access ( ) ;
186+ while let Ok ( index) = self . receiver . try_recv ( ) {
187+ self . finish_system_and_signal_dependents ( index) ;
188+ }
189+
190+ self . rebuild_active_access ( ) ;
191+ }
179192 }
180- }
181- } ;
193+ } ;
182194
183- #[ cfg( feature = "trace" ) ]
184- let executor_span = info_span ! ( "schedule_task" ) ;
185- #[ cfg( feature = "trace" ) ]
186- let executor = executor. instrument ( executor_span) ;
187- scope. spawn ( executor) ;
188- } ) ;
195+ #[ cfg( feature = "trace" ) ]
196+ let executor_span = info_span ! ( "schedule_task" ) ;
197+ #[ cfg( feature = "trace" ) ]
198+ let executor = executor. instrument ( executor_span) ;
199+ scope. spawn ( executor) ;
200+ } ,
201+ ) ;
189202
190203 // Do one final apply buffers after all systems have completed
191204 // SAFETY: all systems have completed, and so no outstanding accesses remain
@@ -574,3 +587,13 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World
574587 } )
575588 . fold ( true , |acc, res| acc && res)
576589}
590+
591+ /// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
592+ #[ derive( Resource , Default , Clone ) ]
593+ pub struct MainThreadExecutor ( pub Arc < ThreadExecutor < ' static > > ) ;
594+
595+ impl MainThreadExecutor {
596+ pub fn new ( ) -> Self {
597+ MainThreadExecutor ( Arc :: new ( ThreadExecutor :: new ( ) ) )
598+ }
599+ }
0 commit comments