Skip to content

Reverse the order of the results of pipes::stream #4167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 12, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/libcore/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,10 +978,10 @@ pub enum Port<T:Send> {
These allow sending or receiving an unlimited number of messages.

*/
pub fn stream<T:Send>() -> (Chan<T>, Port<T>) {
pub fn stream<T:Send>() -> (Port<T>, Chan<T>) {
let (c, s) = streamp::init();

(Chan_({ mut endp: Some(move c) }), Port_({ mut endp: Some(move s) }))
(Port_({ mut endp: Some(move s) }), Chan_({ mut endp: Some(move c) }))
}

impl<T: Send> Chan<T>: GenericChan<T> {
Expand Down Expand Up @@ -1070,7 +1070,7 @@ impl<T: Send> PortSet<T> {
}

fn chan() -> Chan<T> {
let (ch, po) = stream();
let (po, ch) = stream();
self.add(move po);
move ch
}
Expand Down Expand Up @@ -1240,8 +1240,8 @@ pub mod rt {
pub mod test {
#[test]
pub fn test_select2() {
let (c1, p1) = pipes::stream();
let (c2, p2) = pipes::stream();
let (p1, c1) = pipes::stream();
let (p2, c2) = pipes::stream();

c1.send(~"abc");

Expand All @@ -1264,7 +1264,7 @@ pub mod test {

#[test]
fn test_peek_terminated() {
let (chan, port): (Chan<int>, Port<int>) = stream();
let (port, chan): (Port<int>, Chan<int>) = stream();

{
// Destroy the channel
Expand Down
2 changes: 1 addition & 1 deletion src/libcore/private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ pub mod tests {

for uint::range(0, num_tasks) |_i| {
let total = total.clone();
let (chan, port) = pipes::stream();
let (port, chan) = pipes::stream();
futures.push(move port);

do task::spawn |move total, move chan| {
Expand Down
10 changes: 5 additions & 5 deletions src/libcore/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl TaskBuilder {
}

// Construct the future and give it to the caller.
let (notify_pipe_ch, notify_pipe_po) = stream::<TaskResult>();
let (notify_pipe_po, notify_pipe_ch) = stream::<TaskResult>();

blk(move notify_pipe_po);

Expand Down Expand Up @@ -1211,7 +1211,7 @@ fn test_unkillable() {
#[ignore(cfg(windows))]
#[should_fail]
fn test_unkillable_nested() {
let (ch, po) = pipes::stream();
let (po, ch) = pipes::stream();

// We want to do this after failing
do spawn_unlinked |move ch| {
Expand Down Expand Up @@ -1277,7 +1277,7 @@ fn test_child_doesnt_ref_parent() {

#[test]
fn test_sched_thread_per_core() {
let (chan, port) = pipes::stream();
let (port, chan) = pipes::stream();

do spawn_sched(ThreadPerCore) |move chan| {
let cores = rt::rust_num_threads();
Expand All @@ -1291,15 +1291,15 @@ fn test_sched_thread_per_core() {

#[test]
fn test_spawn_thread_on_demand() {
let (chan, port) = pipes::stream();
let (port, chan) = pipes::stream();

do spawn_sched(ManualThreads(2)) |move chan| {
let max_threads = rt::rust_sched_threads();
assert(max_threads as int == 2);
let running_threads = rt::rust_sched_current_nonlazy_threads();
assert(running_threads as int == 1);

let (chan2, port2) = pipes::stream();
let (port2, chan2) = pipes::stream();

do spawn() |move chan2| {
chan2.send(());
Expand Down
4 changes: 2 additions & 2 deletions src/libcore/task/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ fn test_spawn_raw_unsupervise() {
#[test]
#[ignore(cfg(windows))]
fn test_spawn_raw_notify_success() {
let (notify_ch, notify_po) = pipes::stream();
let (notify_po, notify_ch) = pipes::stream();

let opts = {
notify_chan: Some(move notify_ch),
Expand All @@ -685,7 +685,7 @@ fn test_spawn_raw_notify_success() {
#[ignore(cfg(windows))]
fn test_spawn_raw_notify_failure() {
// New bindings for these
let (notify_ch, notify_po) = pipes::stream();
let (notify_po, notify_ch) = pipes::stream();

let opts = {
linked: false,
Expand Down
10 changes: 5 additions & 5 deletions src/librustdoc/markdown_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ fn pandoc_writer(
os::close(pipe_err.out);
os::close(pipe_in.out);

let (stdout_ch, stdout_po) = pipes::stream();
let (stdout_po, stdout_ch) = pipes::stream();
do task::spawn_sched(task::SingleThreaded) |move stdout_ch| {
stdout_ch.send(readclose(pipe_out.in));
}

let (stderr_ch, stderr_po) = pipes::stream();
let (stderr_po, stderr_ch) = pipes::stream();
do task::spawn_sched(task::SingleThreaded) |move stderr_ch| {
stderr_ch.send(readclose(pipe_err.in));
}
Expand Down Expand Up @@ -149,7 +149,7 @@ fn readclose(fd: libc::c_int) -> ~str {
}

fn generic_writer(+process: fn~(+markdown: ~str)) -> Writer {
let (setup_ch, setup_po) = pipes::stream();
let (setup_po, setup_ch) = pipes::stream();
do task::spawn |move process, move setup_ch| {
let po: comm::Port<WriteInstr> = comm::Port();
let ch = comm::Chan(&po);
Expand Down Expand Up @@ -279,7 +279,7 @@ pub fn future_writer_factory(
let markdown_po = comm::Port();
let markdown_ch = comm::Chan(&markdown_po);
let writer_factory = fn~(+page: doc::Page) -> Writer {
let (writer_ch, writer_po) = pipes::stream();
let (writer_po, writer_ch) = pipes::stream();
do task::spawn |move writer_ch| {
let (writer, future) = future_writer();
writer_ch.send(move writer);
Expand All @@ -293,7 +293,7 @@ pub fn future_writer_factory(
}

fn future_writer() -> (Writer, future::Future<~str>) {
let (chan, port) = pipes::stream();
let (port, chan) = pipes::stream();
let writer = fn~(move chan, +instr: WriteInstr) {
chan.send(copy instr);
};
Expand Down
12 changes: 6 additions & 6 deletions src/libstd/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ mod tests {
let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let arc_v = arc::ARC(v);

let (c, p) = pipes::stream();
let (p, c) = pipes::stream();

do task::spawn() |move c| {
let p = pipes::PortSet();
Expand Down Expand Up @@ -517,7 +517,7 @@ mod tests {
fn test_arc_condvar_poison() {
let arc = ~MutexARC(1);
let arc2 = ~arc.clone();
let (c,p) = pipes::stream();
let (p, c) = pipes::stream();

do task::spawn_unlinked |move arc2, move p| {
let _ = p.recv();
Expand Down Expand Up @@ -551,7 +551,7 @@ mod tests {
fn test_mutex_arc_unwrap_poison() {
let arc = MutexARC(1);
let arc2 = ~(&arc).clone();
let (c,p) = pipes::stream();
let (p, c) = pipes::stream();
do task::spawn |move c, move arc2| {
do arc2.access |one| {
c.send(());
Expand Down Expand Up @@ -649,7 +649,7 @@ mod tests {
fn test_rw_arc() {
let arc = ~RWARC(0);
let arc2 = ~arc.clone();
let (c,p) = pipes::stream();
let (p,c) = pipes::stream();

do task::spawn |move arc2, move c| {
do arc2.write |num| {
Expand Down Expand Up @@ -695,7 +695,7 @@ mod tests {
// Reader tasks
let mut reader_convos = ~[];
for 10.times {
let ((rc1,rp1),(rc2,rp2)) = (pipes::stream(),pipes::stream());
let ((rp1,rc1),(rp2,rc2)) = (pipes::stream(),pipes::stream());
reader_convos.push((move rc1, move rp2));
let arcn = ~arc.clone();
do task::spawn |move rp1, move rc2, move arcn| {
Expand All @@ -709,7 +709,7 @@ mod tests {

// Writer task
let arc2 = ~arc.clone();
let ((wc1,wp1),(wc2,wp2)) = (pipes::stream(),pipes::stream());
let ((wp1,wc1),(wp2,wc2)) = (pipes::stream(),pipes::stream());
do task::spawn |move arc2, move wc2, move wp1| {
wp1.recv();
do arc2.write_cond |state, cond| {
Expand Down
4 changes: 2 additions & 2 deletions src/libstd/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ impl<T: Send, U: Send> DuplexStream<T, U> : Selectable {
pub fn DuplexStream<T: Send, U: Send>()
-> (DuplexStream<T, U>, DuplexStream<U, T>)
{
let (c2, p1) = pipes::stream();
let (c1, p2) = pipes::stream();
let (p1, c2) = pipes::stream();
let (p2, c1) = pipes::stream();
(DuplexStream {
chan: move c1,
port: move p1
Expand Down
36 changes: 18 additions & 18 deletions src/libstd/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct Waitqueue { head: pipes::Port<SignalEnd>,
tail: pipes::Chan<SignalEnd> }

fn new_waitqueue() -> Waitqueue {
let (block_tail, block_head) = pipes::stream();
let (block_head, block_tail) = pipes::stream();
Waitqueue { head: move block_head, tail: move block_tail }
}

Expand Down Expand Up @@ -733,7 +733,7 @@ mod tests {
#[test]
fn test_sem_as_cvar() {
/* Child waits and parent signals */
let (c,p) = pipes::stream();
let (p,c) = pipes::stream();
let s = ~semaphore(0);
let s2 = ~s.clone();
do task::spawn |move s2, move c| {
Expand All @@ -745,7 +745,7 @@ mod tests {
let _ = p.recv();

/* Parent waits and child signals */
let (c,p) = pipes::stream();
let (p,c) = pipes::stream();
let s = ~semaphore(0);
let s2 = ~s.clone();
do task::spawn |move s2, move p| {
Expand All @@ -762,8 +762,8 @@ mod tests {
// time, and shake hands.
let s = ~semaphore(2);
let s2 = ~s.clone();
let (c1,p1) = pipes::stream();
let (c2,p2) = pipes::stream();
let (p1,c1) = pipes::stream();
let (p2,c2) = pipes::stream();
do task::spawn |move s2, move c1, move p2| {
do s2.access {
let _ = p2.recv();
Expand All @@ -782,7 +782,7 @@ mod tests {
do task::spawn_sched(task::ManualThreads(1)) {
let s = ~semaphore(1);
let s2 = ~s.clone();
let (c,p) = pipes::stream();
let (p,c) = pipes::stream();
let child_data = ~mut Some((move s2, move c));
do s.access {
let (s2,c) = option::swap_unwrap(child_data);
Expand All @@ -804,7 +804,7 @@ mod tests {
fn test_mutex_lock() {
// Unsafely achieve shared state, and do the textbook
// "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
let (c,p) = pipes::stream();
let (p,c) = pipes::stream();
let m = ~Mutex();
let m2 = ~m.clone();
let mut sharedstate = ~0;
Expand Down Expand Up @@ -847,7 +847,7 @@ mod tests {
cond.wait();
}
// Parent wakes up child
let (chan,port) = pipes::stream();
let (port,chan) = pipes::stream();
let m3 = ~m.clone();
do task::spawn |move chan, move m3| {
do m3.lock_cond |cond| {
Expand All @@ -870,7 +870,7 @@ mod tests {

for num_waiters.times {
let mi = ~m.clone();
let (chan, port) = pipes::stream();
let (port, chan) = pipes::stream();
ports.push(move port);
do task::spawn |move chan, move mi| {
do mi.lock_cond |cond| {
Expand Down Expand Up @@ -932,7 +932,7 @@ mod tests {
let m2 = ~m.clone();

let result: result::Result<(),()> = do task::try |move m2| {
let (c,p) = pipes::stream();
let (p,c) = pipes::stream();
do task::spawn |move p| { // linked
let _ = p.recv(); // wait for sibling to get in the mutex
task::yield();
Expand All @@ -954,12 +954,12 @@ mod tests {
fn test_mutex_killed_broadcast() {
let m = ~Mutex();
let m2 = ~m.clone();
let (c,p) = pipes::stream();
let (p,c) = pipes::stream();

let result: result::Result<(),()> = do task::try |move c, move m2| {
let mut sibling_convos = ~[];
for 2.times {
let (c,p) = pipes::stream();
let (p,c) = pipes::stream();
let c = ~mut Some(move c);
sibling_convos.push(move p);
let mi = ~m2.clone();
Expand Down Expand Up @@ -1022,7 +1022,7 @@ mod tests {
let result = do task::try {
let m = ~mutex_with_condvars(2);
let m2 = ~m.clone();
let (c,p) = pipes::stream();
let (p,c) = pipes::stream();
do task::spawn |move m2, move c| {
do m2.lock_cond |cond| {
c.send(());
Expand Down Expand Up @@ -1082,7 +1082,7 @@ mod tests {
mode2: RWlockMode) {
// Test mutual exclusion between readers and writers. Just like the
// mutex mutual exclusion test, a ways above.
let (c,p) = pipes::stream();
let (p,c) = pipes::stream();
let x2 = ~x.clone();
let mut sharedstate = ~0;
let ptr = ptr::addr_of(&(*sharedstate));
Expand Down Expand Up @@ -1127,8 +1127,8 @@ mod tests {
mode2: RWlockMode, make_mode2_go_first: bool) {
// Much like sem_multi_resource.
let x2 = ~x.clone();
let (c1,p1) = pipes::stream();
let (c2,p2) = pipes::stream();
let (p1,c1) = pipes::stream();
let (p2,c2) = pipes::stream();
do task::spawn |move c1, move x2, move p2| {
if !make_mode2_go_first {
let _ = p2.recv(); // parent sends to us once it locks, or ...
Expand Down Expand Up @@ -1193,7 +1193,7 @@ mod tests {
cond.wait();
}
// Parent wakes up child
let (chan,port) = pipes::stream();
let (port,chan) = pipes::stream();
let x3 = ~x.clone();
do task::spawn |move x3, move chan| {
do x3.write_cond |cond| {
Expand Down Expand Up @@ -1229,7 +1229,7 @@ mod tests {

for num_waiters.times {
let xi = ~x.clone();
let (chan, port) = pipes::stream();
let (port, chan) = pipes::stream();
ports.push(move port);
do task::spawn |move chan, move xi| {
do lock_cond(xi, dg1) |cond| {
Expand Down
2 changes: 1 addition & 1 deletion src/libstd/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub impl<T> TaskPool<T> {
assert n_tasks >= 1;

let channels = do vec::from_fn(n_tasks) |i| {
let (chan, port) = pipes::stream::<Msg<T>>();
let (port, chan) = pipes::stream::<Msg<T>>();
let init_fn = init_fn_factory();

let task_body: ~fn() = |move port, move init_fn| {
Expand Down
4 changes: 2 additions & 2 deletions src/test/bench/msgsend-pipes-shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ fn server(requests: Port<request>, responses: pipes::Chan<uint>) {
}

fn run(args: &[~str]) {
let (to_parent, from_child) = pipes::stream();
let (to_child, from_parent) = pipes::stream();
let (from_child, to_parent) = pipes::stream();
let (from_parent, to_child) = pipes::stream();

let to_child = SharedChan(move to_child);

Expand Down
Loading