Skip to content

Commit 48918fa

Browse files
committed
auto merge of #11212 : alexcrichton/rust/local-task-count, r=brson
For libgreen, bookkeeping should not be global but rather on a per-pool basis. Inside libnative, it's known that there must be a global counter with a mutex/cvar. The benefit of taking this strategy is to remove this functionality from libstd to allow fine-grained control of it through libnative/libgreen. Notably, helper threads in libnative can manually decrement the global count so they don't count towards the global count of threads. Also, the shutdown process of *all* sched pools is now dependent on the number of tasks in the pool being 0 rather than this only being a hardcoded solution for the initial sched pool in libgreen. This involved adding a Local::try_take() method on the Local trait in order for the channel wakeup to work inside of libgreen. The channel send was happening from a SchedTask when there is no Task available in TLS, and now this is possible to work (remote wakeups are always possible, just a little slower).
2 parents c34ef5d + 3f11f87 commit 48918fa

File tree

9 files changed

+244
-71
lines changed

9 files changed

+244
-71
lines changed

src/libgreen/lib.rs

+77-15
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,20 @@
2525
// NB this does *not* include globs, please keep it that way.
2626
#[feature(macro_rules)];
2727

28+
// Allow check-stage0-green for now
29+
#[cfg(test, stage0)] extern mod green;
30+
2831
use std::os;
2932
use std::rt::crate_map;
30-
use std::rt::local::Local;
3133
use std::rt::rtio;
32-
use std::rt::task::Task;
3334
use std::rt::thread::Thread;
3435
use std::rt;
3536
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
3637
use std::sync::deque;
3738
use std::task::TaskOpts;
3839
use std::util;
3940
use std::vec;
41+
use std::sync::arc::UnsafeArc;
4042

4143
use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
4244
use sleeper_list::SleeperList;
@@ -118,14 +120,6 @@ pub fn run(main: proc()) -> int {
118120
os::set_exit_status(rt::DEFAULT_ERROR_CODE);
119121
}
120122

121-
// Once the main task has exited and we've set our exit code, wait for all
122-
// spawned sub-tasks to finish running. This is done to allow all schedulers
123-
// to remain active while there are still tasks possibly running.
124-
unsafe {
125-
let mut task = Local::borrow(None::<Task>);
126-
task.get().wait_for_other_tasks();
127-
}
128-
129123
// Now that we're sure all tasks are dead, shut down the pool of schedulers,
130124
// waiting for them all to return.
131125
pool.shutdown();
@@ -164,6 +158,17 @@ pub struct SchedPool {
164158
priv deque_pool: deque::BufferPool<~task::GreenTask>,
165159
priv sleepers: SleeperList,
166160
priv factory: fn() -> ~rtio::EventLoop,
161+
priv task_state: TaskState,
162+
priv tasks_done: Port<()>,
163+
}
164+
165+
/// This is an internal state shared among a pool of schedulers. This is used to
166+
/// keep track of how many tasks are currently running in the pool and then
167+
/// sending on a channel once the entire pool has been drained of all tasks.
168+
#[deriving(Clone)]
169+
struct TaskState {
170+
cnt: UnsafeArc<AtomicUint>,
171+
done: SharedChan<()>,
167172
}
168173

169174
impl SchedPool {
@@ -182,6 +187,7 @@ impl SchedPool {
182187
assert!(nscheds > 0);
183188

184189
// The pool of schedulers that will be returned from this function
190+
let (p, state) = TaskState::new();
185191
let mut pool = SchedPool {
186192
threads: ~[],
187193
handles: ~[],
@@ -192,6 +198,8 @@ impl SchedPool {
192198
deque_pool: deque::BufferPool::new(),
193199
next_friend: 0,
194200
factory: factory,
201+
task_state: state,
202+
tasks_done: p,
195203
};
196204

197205
// Create a work queue for each scheduler, ntimes. Create an extra
@@ -210,21 +218,30 @@ impl SchedPool {
210218
(pool.factory)(),
211219
worker,
212220
pool.stealers.clone(),
213-
pool.sleepers.clone());
221+
pool.sleepers.clone(),
222+
pool.task_state.clone());
214223
pool.handles.push(sched.make_handle());
215224
let sched = sched;
216-
pool.threads.push(do Thread::start {
217-
sched.bootstrap();
218-
});
225+
pool.threads.push(do Thread::start { sched.bootstrap(); });
219226
}
220227

221228
return pool;
222229
}
223230

231+
/// Creates a new task configured to run inside of this pool of schedulers.
232+
/// This is useful to create a task which can then be sent to a specific
233+
/// scheduler created by `spawn_sched` (and possibly pin it to that
234+
/// scheduler).
224235
pub fn task(&mut self, opts: TaskOpts, f: proc()) -> ~GreenTask {
225236
GreenTask::configure(&mut self.stack_pool, opts, f)
226237
}
227238

239+
/// Spawns a new task into this pool of schedulers, using the specified
240+
/// options to configure the new task which is spawned.
241+
///
242+
/// New tasks are spawned in a round-robin fashion to the schedulers in this
243+
/// pool, but tasks can certainly migrate among schedulers once they're in
244+
/// the pool.
228245
pub fn spawn(&mut self, opts: TaskOpts, f: proc()) {
229246
let task = self.task(opts, f);
230247

@@ -262,7 +279,8 @@ impl SchedPool {
262279
(self.factory)(),
263280
worker,
264281
self.stealers.clone(),
265-
self.sleepers.clone());
282+
self.sleepers.clone(),
283+
self.task_state.clone());
266284
let ret = sched.make_handle();
267285
self.handles.push(sched.make_handle());
268286
let sched = sched;
@@ -271,9 +289,28 @@ impl SchedPool {
271289
return ret;
272290
}
273291

292+
/// Consumes the pool of schedulers, waiting for all tasks to exit and all
293+
/// schedulers to shut down.
294+
///
295+
/// This function is required to be called in order to drop a pool of
296+
/// schedulers, it is considered an error to drop a pool without calling
297+
/// this method.
298+
///
299+
/// This only waits for all tasks in *this pool* of schedulers to exit, any
300+
/// native tasks or extern pools will not be waited on
274301
pub fn shutdown(mut self) {
275302
self.stealers = ~[];
276303

304+
// Wait for everyone to exit. We may have reached a 0-task count
305+
// multiple times in the past, meaning there could be several buffered
306+
// messages on the `tasks_done` port. We're guaranteed that after *some*
307+
// message the current task count will be 0, so we just receive in a
308+
// loop until everything is totally dead.
309+
while self.task_state.active() {
310+
self.tasks_done.recv();
311+
}
312+
313+
// Now that everyone's gone, tell everything to shut down.
277314
for mut handle in util::replace(&mut self.handles, ~[]).move_iter() {
278315
handle.send(Shutdown);
279316
}
@@ -283,6 +320,31 @@ impl SchedPool {
283320
}
284321
}
285322

323+
impl TaskState {
324+
fn new() -> (Port<()>, TaskState) {
325+
let (p, c) = SharedChan::new();
326+
(p, TaskState {
327+
cnt: UnsafeArc::new(AtomicUint::new(0)),
328+
done: c,
329+
})
330+
}
331+
332+
fn increment(&mut self) {
333+
unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); }
334+
}
335+
336+
fn active(&self) -> bool {
337+
unsafe { (*self.cnt.get()).load(SeqCst) != 0 }
338+
}
339+
340+
fn decrement(&mut self) {
341+
let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) };
342+
if prev == 1 {
343+
self.done.send(());
344+
}
345+
}
346+
}
347+
286348
impl Drop for SchedPool {
287349
fn drop(&mut self) {
288350
if self.threads.len() > 0 {

src/libgreen/sched.rs

+18-8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::unstable::mutex::Mutex;
1919
use std::unstable::raw;
2020
use mpsc = std::sync::mpsc_queue;
2121

22+
use TaskState;
2223
use context::Context;
2324
use coroutine::Coroutine;
2425
use sleeper_list::SleeperList;
@@ -85,6 +86,9 @@ pub struct Scheduler {
8586
/// A flag to tell the scheduler loop it needs to do some stealing
8687
/// in order to introduce randomness as part of a yield
8788
steal_for_yield: bool,
89+
/// Bookkeeping for the number of tasks which are currently running around
90+
/// inside this pool of schedulers
91+
task_state: TaskState,
8892

8993
// n.b. currently destructors of an object are run in top-to-bottom in order
9094
// of field declaration. Due to its nature, the pausable idle callback
@@ -120,11 +124,12 @@ impl Scheduler {
120124
event_loop: ~EventLoop,
121125
work_queue: deque::Worker<~GreenTask>,
122126
work_queues: ~[deque::Stealer<~GreenTask>],
123-
sleeper_list: SleeperList)
127+
sleeper_list: SleeperList,
128+
state: TaskState)
124129
-> Scheduler {
125130

126131
Scheduler::new_special(pool_id, event_loop, work_queue, work_queues,
127-
sleeper_list, true, None)
132+
sleeper_list, true, None, state)
128133

129134
}
130135

@@ -134,7 +139,8 @@ impl Scheduler {
134139
work_queues: ~[deque::Stealer<~GreenTask>],
135140
sleeper_list: SleeperList,
136141
run_anything: bool,
137-
friend: Option<SchedHandle>)
142+
friend: Option<SchedHandle>,
143+
state: TaskState)
138144
-> Scheduler {
139145

140146
let (consumer, producer) = mpsc::queue(());
@@ -156,7 +162,8 @@ impl Scheduler {
156162
rng: new_sched_rng(),
157163
idle_callback: None,
158164
yield_check_count: 0,
159-
steal_for_yield: false
165+
steal_for_yield: false,
166+
task_state: state,
160167
};
161168

162169
sched.yield_check_count = reset_yield_check(&mut sched.rng);
@@ -756,6 +763,7 @@ impl Scheduler {
756763
let _cur = self.change_task_context(cur, stask, |sched, mut dead_task| {
757764
let coroutine = dead_task.coroutine.take_unwrap();
758765
coroutine.recycle(&mut sched.stack_pool);
766+
sched.task_state.decrement();
759767
});
760768
fail!("should never return!");
761769
}
@@ -955,11 +963,10 @@ mod test {
955963
use std::rt::task::Task;
956964
use std::rt::local::Local;
957965

966+
use {TaskState, PoolConfig, SchedPool};
958967
use basic;
959968
use sched::{TaskFromFriend, PinnedTask};
960969
use task::{GreenTask, HomeSched};
961-
use PoolConfig;
962-
use SchedPool;
963970

964971
fn pool() -> SchedPool {
965972
SchedPool::new(PoolConfig {
@@ -1078,14 +1085,16 @@ mod test {
10781085
let (normal_worker, normal_stealer) = pool.deque();
10791086
let (special_worker, special_stealer) = pool.deque();
10801087
let queues = ~[normal_stealer, special_stealer];
1088+
let (_p, state) = TaskState::new();
10811089

10821090
// Our normal scheduler
10831091
let mut normal_sched = ~Scheduler::new(
10841092
1,
10851093
basic::event_loop(),
10861094
normal_worker,
10871095
queues.clone(),
1088-
sleepers.clone());
1096+
sleepers.clone(),
1097+
state.clone());
10891098

10901099
let normal_handle = normal_sched.make_handle();
10911100
let friend_handle = normal_sched.make_handle();
@@ -1098,7 +1107,8 @@ mod test {
10981107
queues.clone(),
10991108
sleepers.clone(),
11001109
false,
1101-
Some(friend_handle));
1110+
Some(friend_handle),
1111+
state);
11021112

11031113
let special_handle = special_sched.make_handle();
11041114

src/libgreen/task.rs

+36-3
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,32 @@ use stack::StackPool;
3333
/// The necessary fields needed to keep track of a green task (as opposed to a
3434
/// 1:1 task).
3535
pub struct GreenTask {
36+
/// Coroutine that this task is running on, otherwise known as the register
37+
/// context and the stack that this task owns. This field is optional to
38+
/// relinquish ownership back to a scheduler to recycle stacks at a later
39+
/// date.
3640
coroutine: Option<Coroutine>,
41+
42+
/// Optional handle back into the home sched pool of this task. This field
43+
/// is lazily initialized.
3744
handle: Option<SchedHandle>,
45+
46+
/// Slot for maintaining ownership of a scheduler. If a task is running,
47+
/// this value will be Some(sched) where the task is running on "sched".
3848
sched: Option<~Scheduler>,
49+
50+
/// Temporary ownership slot of a std::rt::task::Task object. This is used
51+
/// to squirrel that libstd task away while we're performing green task
52+
/// operations.
3953
task: Option<~Task>,
54+
55+
/// Dictates whether this is a sched task or a normal green task
4056
task_type: TaskType,
57+
58+
/// Home pool that this task was spawned into. This field is lazily
59+
/// initialized until when the task is initially scheduled, and is used to
60+
/// make sure that tasks are always woken up in the correct pool of
61+
/// schedulers.
4162
pool_id: uint,
4263

4364
// See the comments in the scheduler about why this is necessary
@@ -147,10 +168,15 @@ impl GreenTask {
147168
// cleanup job after we have re-acquired ownership of the green
148169
// task.
149170
let mut task: ~GreenTask = unsafe { GreenTask::from_uint(ops) };
150-
task.sched.get_mut_ref().run_cleanup_job();
171+
task.pool_id = {
172+
let sched = task.sched.get_mut_ref();
173+
sched.run_cleanup_job();
174+
sched.task_state.increment();
175+
sched.pool_id
176+
};
151177

152178
// Convert our green task to a libstd task and then execute the code
153-
// requeted. This is the "try/catch" block for this green task and
179+
// requested. This is the "try/catch" block for this green task and
154180
// is the wrapper for *all* code run in the task.
155181
let mut start = Some(start);
156182
let task = task.swap().run(|| start.take_unwrap()());
@@ -350,6 +376,14 @@ impl Runtime for GreenTask {
350376
self.put_task(to_wake);
351377
assert!(self.sched.is_none());
352378

379+
// Optimistically look for a local task, but if one's not available to
380+
// inspect (in order to see if it's in the same sched pool as we are),
381+
// then just use our remote wakeup routine and carry on!
382+
let mut running_task: ~Task = match Local::try_take() {
383+
Some(task) => task,
384+
None => return self.reawaken_remotely()
385+
};
386+
353387
// Waking up a green thread is a bit of a tricky situation. We have no
354388
// guarantee about where the current task is running. The options we
355389
// have for where this current task is running are:
@@ -368,7 +402,6 @@ impl Runtime for GreenTask {
368402
//
369403
// In case 2 and 3, we need to remotely reawaken ourself in order to be
370404
// transplanted back to the correct scheduler pool.
371-
let mut running_task: ~Task = Local::take();
372405
match running_task.maybe_take_runtime::<GreenTask>() {
373406
Some(mut running_green_task) => {
374407
running_green_task.put_task(running_task);

0 commit comments

Comments
 (0)