Skip to content

Commit 11bc14d

Browse files
committed
auto merge of #11578 : alexcrichton/rust/chan-changes, r=brson
The user-facing API-level change of this commit is that `SharedChan` is gone and `Chan` now has `clone`. The major parts of this patch are the internals which have changed. Channels are now internally upgraded from oneshots to streams to shared channels depending on the use case. I've noticed a 3x improvement in the oneshot case and very little slowdown (if any) in the stream/shared case. This patch is mostly a reorganization of the `std::comm` module, and the large increase in code is from either dispatching to one of 3 impls or the duplication between the stream/shared impl (because they're not entirely separate). The `comm` module is now divided into `oneshot`, `stream`, `shared`, and `select` modules. Each module contains the implementation for that flavor of channel (or the select implementation for select). Some notable parts of this patch * Upgrades are done through a semi-ad-hoc scheme for oneshots and messages for streams * Upgrades are processed ASAP and have some interesting interactions with select * send_deferred is gone because I expect the mutex to land before this * Some of stream/shared is straight-up duplicated, but I like having the distinction between the two modules * Select got a little worse, but it's still "basically limping along" * This lumps in the patch of deallocating the queue backlog on packet drop * I'll rebase this on top of the "more errors from try_recv" patch once it lands (all the infrastructure is here already) All in all, this shouldn't be merged until the new mutexes are merged (because send_deferred wasn't implemented). Closes #11351
2 parents db8a580 + e633249 commit 11bc14d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2164
-997
lines changed

src/doc/guide-tasks.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ Instead we can use a `SharedChan`, a type that allows a single
232232
~~~
233233
# use std::task::spawn;
234234
235-
let (port, chan) = SharedChan::new();
235+
let (port, chan) = Chan::new();
236236
237237
for init_val in range(0u, 3) {
238238
// Create a new channel handle to distribute to the child task

src/libextra/test.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,7 @@ fn run_tests(opts: &TestOpts,
767767
remaining.reverse();
768768
let mut pending = 0;
769769

770-
let (p, ch) = SharedChan::new();
770+
let (p, ch) = Chan::new();
771771

772772
while pending > 0 || !remaining.is_empty() {
773773
while pending < concurrency && !remaining.is_empty() {
@@ -878,7 +878,7 @@ pub fn filter_tests(
878878

879879
pub fn run_test(force_ignore: bool,
880880
test: TestDescAndFn,
881-
monitor_ch: SharedChan<MonitorMsg>) {
881+
monitor_ch: Chan<MonitorMsg>) {
882882

883883
let TestDescAndFn {desc, testfn} = test;
884884

@@ -888,7 +888,7 @@ pub fn run_test(force_ignore: bool,
888888
}
889889

890890
fn run_test_inner(desc: TestDesc,
891-
monitor_ch: SharedChan<MonitorMsg>,
891+
monitor_ch: Chan<MonitorMsg>,
892892
testfn: proc()) {
893893
spawn(proc() {
894894
let mut task = task::task();
@@ -1260,7 +1260,7 @@ mod tests {
12601260
},
12611261
testfn: DynTestFn(proc() f()),
12621262
};
1263-
let (p, ch) = SharedChan::new();
1263+
let (p, ch) = Chan::new();
12641264
run_test(false, desc, ch);
12651265
let (_, res) = p.recv();
12661266
assert!(res != TrOk);
@@ -1277,7 +1277,7 @@ mod tests {
12771277
},
12781278
testfn: DynTestFn(proc() f()),
12791279
};
1280-
let (p, ch) = SharedChan::new();
1280+
let (p, ch) = Chan::new();
12811281
run_test(false, desc, ch);
12821282
let (_, res) = p.recv();
12831283
assert_eq!(res, TrIgnored);
@@ -1294,7 +1294,7 @@ mod tests {
12941294
},
12951295
testfn: DynTestFn(proc() f()),
12961296
};
1297-
let (p, ch) = SharedChan::new();
1297+
let (p, ch) = Chan::new();
12981298
run_test(false, desc, ch);
12991299
let (_, res) = p.recv();
13001300
assert_eq!(res, TrOk);
@@ -1311,7 +1311,7 @@ mod tests {
13111311
},
13121312
testfn: DynTestFn(proc() f()),
13131313
};
1314-
let (p, ch) = SharedChan::new();
1314+
let (p, ch) = Chan::new();
13151315
run_test(false, desc, ch);
13161316
let (_, res) = p.recv();
13171317
assert_eq!(res, TrFailed);

src/libgreen/lib.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ use task::GreenTask;
193193

194194
mod macros;
195195
mod simple;
196+
mod message_queue;
196197

197198
pub mod basic;
198199
pub mod context;
@@ -314,7 +315,7 @@ pub struct SchedPool {
314315
#[deriving(Clone)]
315316
struct TaskState {
316317
cnt: UnsafeArc<AtomicUint>,
317-
done: SharedChan<()>,
318+
done: Chan<()>,
318319
}
319320

320321
impl SchedPool {
@@ -468,7 +469,7 @@ impl SchedPool {
468469

469470
impl TaskState {
470471
fn new() -> (Port<()>, TaskState) {
471-
let (p, c) = SharedChan::new();
472+
let (p, c) = Chan::new();
472473
(p, TaskState {
473474
cnt: UnsafeArc::new(AtomicUint::new(0)),
474475
done: c,

src/libgreen/message_queue.rs

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
use mpsc = std::sync::mpsc_queue;
12+
use std::sync::arc::UnsafeArc;
13+
14+
pub enum PopResult<T> {
15+
Inconsistent,
16+
Empty,
17+
Data(T),
18+
}
19+
20+
pub fn queue<T: Send>() -> (Consumer<T>, Producer<T>) {
21+
let (a, b) = UnsafeArc::new2(mpsc::Queue::new());
22+
(Consumer { inner: a }, Producer { inner: b })
23+
}
24+
25+
pub struct Producer<T> {
26+
priv inner: UnsafeArc<mpsc::Queue<T>>,
27+
}
28+
29+
pub struct Consumer<T> {
30+
priv inner: UnsafeArc<mpsc::Queue<T>>,
31+
}
32+
33+
impl<T: Send> Consumer<T> {
34+
pub fn pop(&mut self) -> PopResult<T> {
35+
match unsafe { (*self.inner.get()).pop() } {
36+
mpsc::Inconsistent => Inconsistent,
37+
mpsc::Empty => Empty,
38+
mpsc::Data(t) => Data(t),
39+
}
40+
}
41+
42+
pub fn casual_pop(&mut self) -> Option<T> {
43+
match unsafe { (*self.inner.get()).pop() } {
44+
mpsc::Inconsistent => None,
45+
mpsc::Empty => None,
46+
mpsc::Data(t) => Some(t),
47+
}
48+
}
49+
}
50+
51+
impl<T: Send> Producer<T> {
52+
pub fn push(&mut self, t: T) {
53+
unsafe { (*self.inner.get()).push(t); }
54+
}
55+
}
56+
57+
impl<T: Send> Clone for Producer<T> {
58+
fn clone(&self) -> Producer<T> {
59+
Producer { inner: self.inner.clone() }
60+
}
61+
}

src/libgreen/sched.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ use std::rt::task::Task;
1717
use std::sync::deque;
1818
use std::unstable::mutex::Mutex;
1919
use std::unstable::raw;
20-
use mpsc = std::sync::mpsc_queue;
2120

2221
use TaskState;
2322
use context::Context;
2423
use coroutine::Coroutine;
2524
use sleeper_list::SleeperList;
2625
use stack::StackPool;
2726
use task::{TypeSched, GreenTask, HomeSched, AnySched};
27+
use msgq = message_queue;
2828

2929
/// A scheduler is responsible for coordinating the execution of Tasks
3030
/// on a single thread. The scheduler runs inside a slightly modified
@@ -47,9 +47,9 @@ pub struct Scheduler {
4747
/// The queue of incoming messages from other schedulers.
4848
/// These are enqueued by SchedHandles after which a remote callback
4949
/// is triggered to handle the message.
50-
message_queue: mpsc::Consumer<SchedMessage, ()>,
50+
message_queue: msgq::Consumer<SchedMessage>,
5151
/// Producer used to clone sched handles from
52-
message_producer: mpsc::Producer<SchedMessage, ()>,
52+
message_producer: msgq::Producer<SchedMessage>,
5353
/// A shared list of sleeping schedulers. We'll use this to wake
5454
/// up schedulers when pushing work onto the work queue.
5555
sleeper_list: SleeperList,
@@ -143,7 +143,7 @@ impl Scheduler {
143143
state: TaskState)
144144
-> Scheduler {
145145

146-
let (consumer, producer) = mpsc::queue(());
146+
let (consumer, producer) = msgq::queue();
147147
let mut sched = Scheduler {
148148
pool_id: pool_id,
149149
sleeper_list: sleeper_list,
@@ -215,7 +215,7 @@ impl Scheduler {
215215

216216
// Should not have any messages
217217
let message = stask.sched.get_mut_ref().message_queue.pop();
218-
rtassert!(match message { mpsc::Empty => true, _ => false });
218+
rtassert!(match message { msgq::Empty => true, _ => false });
219219

220220
stask.task.get_mut_ref().destroyed = true;
221221
}
@@ -340,8 +340,8 @@ impl Scheduler {
340340
//
341341
// I have chosen to take route #2.
342342
match self.message_queue.pop() {
343-
mpsc::Data(t) => Some(t),
344-
mpsc::Empty | mpsc::Inconsistent => None
343+
msgq::Data(t) => Some(t),
344+
msgq::Empty | msgq::Inconsistent => None
345345
}
346346
};
347347

@@ -849,7 +849,7 @@ pub enum SchedMessage {
849849

850850
pub struct SchedHandle {
851851
priv remote: ~RemoteCallback,
852-
priv queue: mpsc::Producer<SchedMessage, ()>,
852+
priv queue: msgq::Producer<SchedMessage>,
853853
sched_id: uint
854854
}
855855

src/libnative/io/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
//! that you would find on the respective platform.
2323
2424
use std::c_str::CString;
25-
use std::comm::SharedChan;
2625
use std::io;
2726
use std::io::IoError;
2827
use std::io::net::ip::SocketAddr;
@@ -289,7 +288,7 @@ impl rtio::IoFactory for IoFactory {
289288
})
290289
}
291290
}
292-
fn signal(&mut self, _signal: Signum, _channel: SharedChan<Signum>)
291+
fn signal(&mut self, _signal: Signum, _channel: Chan<Signum>)
293292
-> IoResult<~RtioSignal> {
294293
Err(unimpl())
295294
}

src/libnative/io/timer_helper.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use task;
3333
// only torn down after everything else has exited. This means that these
3434
// variables are read-only during use (after initialization) and both of which
3535
// are safe to use concurrently.
36-
static mut HELPER_CHAN: *mut SharedChan<Req> = 0 as *mut SharedChan<Req>;
36+
static mut HELPER_CHAN: *mut Chan<Req> = 0 as *mut Chan<Req>;
3737
static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
3838

3939
pub fn boot(helper: fn(imp::signal, Port<Req>)) {
@@ -43,7 +43,9 @@ pub fn boot(helper: fn(imp::signal, Port<Req>)) {
4343
unsafe {
4444
LOCK.lock();
4545
if !INITIALIZED {
46-
let (msgp, msgc) = SharedChan::new();
46+
let (msgp, msgc) = Chan::new();
47+
// promote this to a shared channel
48+
drop(msgc.clone());
4749
HELPER_CHAN = cast::transmute(~msgc);
4850
let (receive, send) = imp::new();
4951
HELPER_SIGNAL = send;
@@ -84,8 +86,8 @@ fn shutdown() {
8486
// Clean up after ther helper thread
8587
unsafe {
8688
imp::close(HELPER_SIGNAL);
87-
let _chan: ~SharedChan<Req> = cast::transmute(HELPER_CHAN);
88-
HELPER_CHAN = 0 as *mut SharedChan<Req>;
89+
let _chan: ~Chan<Req> = cast::transmute(HELPER_CHAN);
90+
HELPER_CHAN = 0 as *mut Chan<Req>;
8991
HELPER_SIGNAL = 0 as imp::signal;
9092
}
9193
}

0 commit comments

Comments
 (0)