Skip to content

Commit 529e268

Browse files
committed
Fallout of rewriting std::comm
1 parent bfa9064 commit 529e268

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+765
-2590
lines changed

src/etc/licenseck.py

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
"rt/isaac/rand.h", # public domain
7878
"rt/isaac/standard.h", # public domain
7979
"libstd/rt/mpsc_queue.rs", # BSD
80+
"libstd/rt/spsc_queue.rs", # BSD
8081
"libstd/rt/mpmc_bounded_queue.rs", # BSD
8182
]
8283

src/libextra/arc.rs

+13-14
Original file line numberDiff line numberDiff line change
@@ -597,15 +597,14 @@ mod tests {
597597

598598
use arc::*;
599599

600-
use std::comm;
601600
use std::task;
602601

603602
#[test]
604603
fn manually_share_arc() {
605604
let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
606605
let arc_v = Arc::new(v);
607606

608-
let (p, c) = comm::stream();
607+
let (p, c) = Chan::new();
609608

610609
do task::spawn {
611610
let arc_v: Arc<~[int]> = p.recv();
@@ -626,7 +625,7 @@ mod tests {
626625
fn test_mutex_arc_condvar() {
627626
let arc = ~MutexArc::new(false);
628627
let arc2 = ~arc.clone();
629-
let (p,c) = comm::oneshot();
628+
let (p,c) = Chan::new();
630629
do task::spawn {
631630
// wait until parent gets in
632631
p.recv();
@@ -638,7 +637,7 @@ mod tests {
638637

639638
let mut c = Some(c);
640639
arc.access_cond(|state, cond| {
641-
c.take_unwrap().send(());
640+
c.take_unwrawp().send(());
642641
assert!(!*state);
643642
while !*state {
644643
cond.wait();
@@ -650,7 +649,7 @@ mod tests {
650649
fn test_arc_condvar_poison() {
651650
let arc = ~MutexArc::new(1);
652651
let arc2 = ~arc.clone();
653-
let (p, c) = comm::stream();
652+
let (p, c) = Chan::new();
654653

655654
do spawn {
656655
let _ = p.recv();
@@ -687,7 +686,7 @@ mod tests {
687686
pub fn test_mutex_arc_unwrap_poison() {
688687
let arc = MutexArc::new(1);
689688
let arc2 = ~(&arc).clone();
690-
let (p, c) = comm::stream();
689+
let (p, c) = Chan::new();
691690
do task::spawn {
692691
arc2.access(|one| {
693692
c.send(());
@@ -804,7 +803,7 @@ mod tests {
804803
fn test_rw_arc() {
805804
let arc = RWArc::new(0);
806805
let arc2 = arc.clone();
807-
let (p, c) = comm::stream();
806+
let (p, c) = Chan::new();
808807

809808
do task::spawn {
810809
arc2.write(|num| {
@@ -832,7 +831,7 @@ mod tests {
832831
});
833832

834833
// Wait for children to pass their asserts
835-
for r in children.iter() {
834+
for r in children.mut_iter() {
836835
r.recv();
837836
}
838837

@@ -855,7 +854,7 @@ mod tests {
855854
// Reader tasks
856855
let mut reader_convos = ~[];
857856
10.times(|| {
858-
let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream());
857+
let ((rp1, rc1), (rp2, rc2)) = (Chan::new(), Chan::new());
859858
reader_convos.push((rc1, rp2));
860859
let arcn = arc.clone();
861860
do task::spawn {
@@ -869,7 +868,7 @@ mod tests {
869868

870869
// Writer task
871870
let arc2 = arc.clone();
872-
let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream());
871+
let ((wp1, wc1), (wp2, wc2)) = (Chan::new(), Chan::new());
873872
do task::spawn || {
874873
wp1.recv();
875874
arc2.write_cond(|state, cond| {
@@ -897,14 +896,14 @@ mod tests {
897896
assert_eq!(*state, 42);
898897
*state = 31337;
899898
// send to other readers
900-
for &(ref rc, _) in reader_convos.iter() {
899+
for &(ref mut rc, _) in reader_convos.mut_iter() {
901900
rc.send(())
902901
}
903902
});
904903
let read_mode = arc.downgrade(write_mode);
905904
read_mode.read(|state| {
906905
// complete handshake with other readers
907-
for &(_, ref rp) in reader_convos.iter() {
906+
for &(_, ref mut rp) in reader_convos.mut_iter() {
908907
rp.recv()
909908
}
910909
wc1.send(()); // tell writer to try again
@@ -926,7 +925,7 @@ mod tests {
926925
// "blk(&Condvar { order: opt_lock, ..*cond })"
927926
// with just "blk(cond)".
928927
let x = RWArc::new(true);
929-
let (wp, wc) = comm::stream();
928+
let (wp, wc) = Chan::new();
930929

931930
// writer task
932931
let xw = x.clone();
@@ -951,7 +950,7 @@ mod tests {
951950
});
952951
// make a reader task to trigger the "reader cloud lock" handoff
953952
let xr = x.clone();
954-
let (rp, rc) = comm::stream();
953+
let (rp, rc) = Chan::new();
955954
do task::spawn {
956955
rc.send(());
957956
xr.read(|_state| { })

src/libextra/comm.rs

+42-77
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,6 @@ Higher level communication abstractions.
1616

1717
#[allow(missing_doc)];
1818

19-
20-
use std::comm::{GenericChan, GenericSmartChan, GenericPort};
21-
use std::comm::{Chan, Port, Peekable};
22-
use std::comm;
23-
2419
/// An extension of `pipes::stream` that allows both sending and receiving.
2520
pub struct DuplexStream<T, U> {
2621
priv chan: Chan<T>,
@@ -29,108 +24,73 @@ pub struct DuplexStream<T, U> {
2924

3025
// Allow these methods to be used without import:
3126
impl<T:Send,U:Send> DuplexStream<T, U> {
27+
/// Creates a bidirectional stream.
28+
pub fn new() -> (DuplexStream<T, U>, DuplexStream<U, T>) {
29+
let (p1, c2) = Chan::new();
30+
let (p2, c1) = Chan::new();
31+
(DuplexStream { chan: c1, port: p1 },
32+
DuplexStream { chan: c2, port: p2 })
33+
}
3234
pub fn send(&self, x: T) {
3335
self.chan.send(x)
3436
}
3537
pub fn try_send(&self, x: T) -> bool {
3638
self.chan.try_send(x)
3739
}
38-
pub fn recv(&self, ) -> U {
40+
pub fn recv(&self) -> U {
3941
self.port.recv()
4042
}
4143
pub fn try_recv(&self) -> Option<U> {
4244
self.port.try_recv()
4345
}
44-
pub fn peek(&self) -> bool {
45-
self.port.peek()
46-
}
47-
}
48-
49-
impl<T:Send,U:Send> GenericChan<T> for DuplexStream<T, U> {
50-
fn send(&self, x: T) {
51-
self.chan.send(x)
52-
}
53-
}
54-
55-
impl<T:Send,U:Send> GenericSmartChan<T> for DuplexStream<T, U> {
56-
fn try_send(&self, x: T) -> bool {
57-
self.chan.try_send(x)
58-
}
59-
}
60-
61-
impl<T:Send,U:Send> GenericPort<U> for DuplexStream<T, U> {
62-
fn recv(&self) -> U {
63-
self.port.recv()
64-
}
65-
66-
fn try_recv(&self) -> Option<U> {
67-
self.port.try_recv()
68-
}
69-
}
70-
71-
impl<T:Send,U:Send> Peekable<U> for DuplexStream<T, U> {
72-
fn peek(&self) -> bool {
73-
self.port.peek()
46+
pub fn recv_opt(&self) -> Option<U> {
47+
self.port.recv_opt()
7448
}
7549
}
7650

77-
/// Creates a bidirectional stream.
78-
pub fn DuplexStream<T:Send,U:Send>()
79-
-> (DuplexStream<T, U>, DuplexStream<U, T>)
80-
{
81-
let (p1, c2) = comm::stream();
82-
let (p2, c1) = comm::stream();
83-
(DuplexStream {
84-
chan: c1,
85-
port: p1
86-
},
87-
DuplexStream {
88-
chan: c2,
89-
port: p2
90-
})
91-
}
92-
9351
/// An extension of `pipes::stream` that provides synchronous message sending.
9452
pub struct SyncChan<T> { priv duplex_stream: DuplexStream<T, ()> }
9553
/// An extension of `pipes::stream` that acknowledges each message received.
9654
pub struct SyncPort<T> { priv duplex_stream: DuplexStream<(), T> }
9755

98-
impl<T: Send> GenericChan<T> for SyncChan<T> {
99-
fn send(&self, val: T) {
56+
impl<T: Send> SyncChan<T> {
57+
pub fn send(&self, val: T) {
10058
assert!(self.try_send(val), "SyncChan.send: receiving port closed");
10159
}
102-
}
10360

104-
impl<T: Send> GenericSmartChan<T> for SyncChan<T> {
105-
/// Sends a message, or report if the receiver has closed the connection before receiving.
106-
fn try_send(&self, val: T) -> bool {
107-
self.duplex_stream.try_send(val) && self.duplex_stream.try_recv().is_some()
61+
/// Sends a message, or report if the receiver has closed the connection
62+
/// before receiving.
63+
pub fn try_send(&self, val: T) -> bool {
64+
self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some()
10865
}
10966
}
11067

111-
impl<T: Send> GenericPort<T> for SyncPort<T> {
112-
fn recv(&self) -> T {
113-
self.try_recv().expect("SyncPort.recv: sending channel closed")
68+
impl<T: Send> SyncPort<T> {
69+
pub fn recv(&self) -> T {
70+
self.recv_opt().expect("SyncPort.recv: sending channel closed")
11471
}
11572

116-
fn try_recv(&self) -> Option<T> {
117-
self.duplex_stream.try_recv().map(|val| {
73+
pub fn recv_opt(&self) -> Option<T> {
74+
self.duplex_stream.recv_opt().map(|val| {
11875
self.duplex_stream.try_send(());
11976
val
12077
})
12178
}
122-
}
12379

124-
impl<T: Send> Peekable<T> for SyncPort<T> {
125-
fn peek(&self) -> bool {
126-
self.duplex_stream.peek()
80+
pub fn try_recv(&self) -> Option<T> {
81+
self.duplex_stream.try_recv().map(|val| {
82+
self.duplex_stream.try_send(());
83+
val
84+
})
12785
}
12886
}
12987

130-
/// Creates a stream whose channel, upon sending a message, blocks until the message is received.
88+
/// Creates a stream whose channel, upon sending a message, blocks until the
89+
/// message is received.
13190
pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) {
132-
let (chan_stream, port_stream) = DuplexStream();
133-
(SyncPort { duplex_stream: port_stream }, SyncChan { duplex_stream: chan_stream })
91+
let (chan_stream, port_stream) = DuplexStream::new();
92+
(SyncPort { duplex_stream: port_stream },
93+
SyncChan { duplex_stream: chan_stream })
13494
}
13595

13696
#[cfg(test)]
@@ -141,7 +101,7 @@ mod test {
141101

142102
#[test]
143103
pub fn DuplexStream1() {
144-
let (left, right) = DuplexStream();
104+
let (mut left, mut right) = DuplexStream::new();
145105

146106
left.send(~"abc");
147107
right.send(123);
@@ -152,9 +112,10 @@ mod test {
152112
153113
#[test]
154114
pub fn basic_rendezvous_test() {
155-
let (port, chan) = rendezvous();
115+
let (mut port, chan) = rendezvous();
156116
157117
do spawn {
118+
let mut chan = chan;
158119
chan.send("abc");
159120
}
160121
@@ -165,8 +126,9 @@ mod test {
165126
fn recv_a_lot() {
166127
// Rendezvous streams should be able to handle any number of messages being sent
167128
do run_in_uv_task {
168-
let (port, chan) = rendezvous();
129+
let (mut port, chan) = rendezvous();
169130
do spawn {
131+
let mut chan = chan;
170132
1000000.times(|| { chan.send(()) })
171133
}
172134
1000000.times(|| { port.recv() })
@@ -175,8 +137,9 @@ mod test {
175137

176138
#[test]
177139
fn send_and_fail_and_try_recv() {
178-
let (port, chan) = rendezvous();
140+
let (mut port, chan) = rendezvous();
179141
do spawn {
142+
let mut chan = chan;
180143
chan.duplex_stream.send(()); // Can't access this field outside this module
181144
fail!()
182145
}
@@ -185,8 +148,9 @@ mod test {
185148

186149
#[test]
187150
fn try_send_and_recv_then_fail_before_ack() {
188-
let (port, chan) = rendezvous();
151+
let (port, mut chan) = rendezvous();
189152
do spawn {
153+
let mut port = port;
190154
port.duplex_stream.recv();
191155
fail!()
192156
}
@@ -196,8 +160,9 @@ mod test {
196160
#[test]
197161
#[should_fail]
198162
fn send_and_recv_then_fail_before_ack() {
199-
let (port, chan) = rendezvous();
163+
let (port, mut chan) = rendezvous();
200164
do spawn {
165+
let mut port = port;
201166
port.duplex_stream.recv();
202167
fail!()
203168
}

src/libextra/future.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
#[allow(missing_doc)];
2727

28-
use std::comm::{PortOne, oneshot};
2928
use std::util::replace;
3029

3130
/// A type encapsulating the result of a computation which may not be complete
@@ -104,7 +103,7 @@ impl<A> Future<A> {
104103
}
105104

106105
impl<A:Send> Future<A> {
107-
pub fn from_port(port: PortOne<A>) -> Future<A> {
106+
pub fn from_port(port: Port<A>) -> Future<A> {
108107
/*!
109108
* Create a future from a port
110109
*
@@ -125,7 +124,7 @@ impl<A:Send> Future<A> {
125124
* value of the future.
126125
*/
127126

128-
let (port, chan) = oneshot();
127+
let (port, chan) = Chan::new();
129128

130129
do spawn {
131130
chan.send(blk());
@@ -139,7 +138,6 @@ impl<A:Send> Future<A> {
139138
mod test {
140139
use future::Future;
141140

142-
use std::comm::oneshot;
143141
use std::task;
144142

145143
#[test]
@@ -150,7 +148,7 @@ mod test {
150148
151149
#[test]
152150
fn test_from_port() {
153-
let (po, ch) = oneshot();
151+
let (po, ch) = Chan::new();
154152
ch.send(~"whale");
155153
let mut f = Future::from_port(po);
156154
assert_eq!(f.get(), ~"whale");

0 commit comments

Comments
 (0)