25
25
// NB this does *not* include globs, please keep it that way.
26
26
#[ feature( macro_rules) ] ;
27
27
28
+ // Allow check-stage0-green for now
29
+ #[ cfg( test, stage0) ] extern mod green;
30
+
28
31
use std:: os;
29
32
use std:: rt:: crate_map;
30
- use std:: rt:: local:: Local ;
31
33
use std:: rt:: rtio;
32
- use std:: rt:: task:: Task ;
33
34
use std:: rt:: thread:: Thread ;
34
35
use std:: rt;
35
36
use std:: sync:: atomics:: { SeqCst , AtomicUint , INIT_ATOMIC_UINT } ;
36
37
use std:: sync:: deque;
37
38
use std:: task:: TaskOpts ;
38
39
use std:: util;
39
40
use std:: vec;
41
+ use std:: sync:: arc:: UnsafeArc ;
40
42
41
43
use sched:: { Shutdown , Scheduler , SchedHandle , TaskFromFriend , NewNeighbor } ;
42
44
use sleeper_list:: SleeperList ;
@@ -118,14 +120,6 @@ pub fn run(main: proc()) -> int {
118
120
os:: set_exit_status ( rt:: DEFAULT_ERROR_CODE ) ;
119
121
}
120
122
121
- // Once the main task has exited and we've set our exit code, wait for all
122
- // spawned sub-tasks to finish running. This is done to allow all schedulers
123
- // to remain active while there are still tasks possibly running.
124
- unsafe {
125
- let mut task = Local :: borrow ( None :: < Task > ) ;
126
- task. get ( ) . wait_for_other_tasks ( ) ;
127
- }
128
-
129
123
// Now that we're sure all tasks are dead, shut down the pool of schedulers,
130
124
// waiting for them all to return.
131
125
pool. shutdown ( ) ;
@@ -164,6 +158,17 @@ pub struct SchedPool {
164
158
priv deque_pool : deque:: BufferPool < ~task:: GreenTask > ,
165
159
priv sleepers : SleeperList ,
166
160
priv factory : fn ( ) -> ~rtio:: EventLoop ,
161
+ priv task_state : TaskState ,
162
+ priv tasks_done : Port < ( ) > ,
163
+ }
164
+
165
+ /// This is an internal state shared among a pool of schedulers. This is used to
166
+ /// keep track of how many tasks are currently running in the pool and then
167
+ /// sending on a channel once the entire pool has been drained of all tasks.
168
+ #[ deriving( Clone ) ]
169
+ struct TaskState {
170
+ cnt : UnsafeArc < AtomicUint > ,
171
+ done : SharedChan < ( ) > ,
167
172
}
168
173
169
174
impl SchedPool {
@@ -182,6 +187,7 @@ impl SchedPool {
182
187
assert ! ( nscheds > 0 ) ;
183
188
184
189
// The pool of schedulers that will be returned from this function
190
+ let ( p, state) = TaskState :: new ( ) ;
185
191
let mut pool = SchedPool {
186
192
threads : ~[ ] ,
187
193
handles : ~[ ] ,
@@ -192,6 +198,8 @@ impl SchedPool {
192
198
deque_pool : deque:: BufferPool :: new ( ) ,
193
199
next_friend : 0 ,
194
200
factory : factory,
201
+ task_state : state,
202
+ tasks_done : p,
195
203
} ;
196
204
197
205
// Create a work queue for each scheduler, ntimes. Create an extra
@@ -210,21 +218,30 @@ impl SchedPool {
210
218
( pool. factory ) ( ) ,
211
219
worker,
212
220
pool. stealers . clone ( ) ,
213
- pool. sleepers . clone ( ) ) ;
221
+ pool. sleepers . clone ( ) ,
222
+ pool. task_state . clone ( ) ) ;
214
223
pool. handles . push ( sched. make_handle ( ) ) ;
215
224
let sched = sched;
216
- pool. threads . push ( do Thread :: start {
217
- sched. bootstrap ( ) ;
218
- } ) ;
225
+ pool. threads . push ( do Thread :: start { sched. bootstrap ( ) ; } ) ;
219
226
}
220
227
221
228
return pool;
222
229
}
223
230
231
+ /// Creates a new task configured to run inside of this pool of schedulers.
232
+ /// This is useful to create a task which can then be sent to a specific
233
+ /// scheduler created by `spawn_sched` (and possibly pin it to that
234
+ /// scheduler).
224
235
pub fn task ( & mut self , opts : TaskOpts , f : proc ( ) ) -> ~GreenTask {
225
236
GreenTask :: configure ( & mut self . stack_pool , opts, f)
226
237
}
227
238
239
+ /// Spawns a new task into this pool of schedulers, using the specified
240
+ /// options to configure the new task which is spawned.
241
+ ///
242
+ /// New tasks are spawned in a round-robin fashion to the schedulers in this
243
+ /// pool, but tasks can certainly migrate among schedulers once they're in
244
+ /// the pool.
228
245
pub fn spawn ( & mut self , opts : TaskOpts , f : proc ( ) ) {
229
246
let task = self . task ( opts, f) ;
230
247
@@ -262,7 +279,8 @@ impl SchedPool {
262
279
( self . factory ) ( ) ,
263
280
worker,
264
281
self . stealers . clone ( ) ,
265
- self . sleepers . clone ( ) ) ;
282
+ self . sleepers . clone ( ) ,
283
+ self . task_state . clone ( ) ) ;
266
284
let ret = sched. make_handle ( ) ;
267
285
self . handles . push ( sched. make_handle ( ) ) ;
268
286
let sched = sched;
@@ -271,9 +289,28 @@ impl SchedPool {
271
289
return ret;
272
290
}
273
291
292
+ /// Consumes the pool of schedulers, waiting for all tasks to exit and all
293
+ /// schedulers to shut down.
294
+ ///
295
+ /// This function is required to be called in order to drop a pool of
296
+ /// schedulers, it is considered an error to drop a pool without calling
297
+ /// this method.
298
+ ///
299
+ /// This only waits for all tasks in *this pool* of schedulers to exit, any
300
+ /// native tasks or extern pools will not be waited on
274
301
pub fn shutdown ( mut self ) {
275
302
self . stealers = ~[ ] ;
276
303
304
+ // Wait for everyone to exit. We may have reached a 0-task count
305
+ // multiple times in the past, meaning there could be several buffered
306
+ // messages on the `tasks_done` port. We're guaranteed that after *some*
307
+ // message the current task count will be 0, so we just receive in a
308
+ // loop until everything is totally dead.
309
+ while self . task_state . active ( ) {
310
+ self . tasks_done . recv ( ) ;
311
+ }
312
+
313
+ // Now that everyone's gone, tell everything to shut down.
277
314
for mut handle in util:: replace ( & mut self . handles , ~[ ] ) . move_iter ( ) {
278
315
handle. send ( Shutdown ) ;
279
316
}
@@ -283,6 +320,31 @@ impl SchedPool {
283
320
}
284
321
}
285
322
323
+ impl TaskState {
324
+ fn new ( ) -> ( Port < ( ) > , TaskState ) {
325
+ let ( p, c) = SharedChan :: new ( ) ;
326
+ ( p, TaskState {
327
+ cnt : UnsafeArc :: new ( AtomicUint :: new ( 0 ) ) ,
328
+ done : c,
329
+ } )
330
+ }
331
+
332
+ fn increment ( & mut self ) {
333
+ unsafe { ( * self . cnt . get ( ) ) . fetch_add ( 1 , SeqCst ) ; }
334
+ }
335
+
336
+ fn active ( & self ) -> bool {
337
+ unsafe { ( * self . cnt . get ( ) ) . load ( SeqCst ) != 0 }
338
+ }
339
+
340
+ fn decrement ( & mut self ) {
341
+ let prev = unsafe { ( * self . cnt . get ( ) ) . fetch_sub ( 1 , SeqCst ) } ;
342
+ if prev == 1 {
343
+ self . done . send ( ( ) ) ;
344
+ }
345
+ }
346
+ }
347
+
286
348
impl Drop for SchedPool {
287
349
fn drop ( & mut self ) {
288
350
if self . threads . len ( ) > 0 {
0 commit comments