Skip to content

Commit ba10819

Browse files
committed
Added a message send that uses shared chans. They are slower than port selectors, but scale better.
1 parent 75e55c1 commit ba10819

File tree

1 file changed

+208
-0
lines changed

1 file changed

+208
-0
lines changed
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// A port of the simplistic benchmark from
2+
//
3+
// http://github.com/PaulKeeble/ScalaVErlangAgents
4+
//
5+
// I *think* it's the same, more or less.
6+
7+
// This version uses pipes with a shared send endpoint. It should have
8+
// different scalability characteristics compared to the select
9+
// version.
10+
11+
use std;
12+
import io::writer;
13+
import io::writer_util;
14+
15+
import arc::methods;
16+
import pipes::{port, chan};
17+
18+
macro_rules! move {
19+
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
20+
}
21+
22+
enum request {
23+
get_count,
24+
bytes(uint),
25+
stop
26+
}
27+
28+
fn server(requests: port<request>, responses: pipes::chan<uint>) {
29+
let mut count = 0u;
30+
let mut done = false;
31+
while !done {
32+
alt requests.try_recv() {
33+
some(get_count) { responses.send(copy count); }
34+
some(bytes(b)) {
35+
//#error("server: received %? bytes", b);
36+
count += b;
37+
}
38+
none { done = true; }
39+
_ { }
40+
}
41+
}
42+
responses.send(count);
43+
//#error("server exiting");
44+
}
45+
46+
fn run(args: &[str]) {
47+
let (to_parent, from_child) = pipes::stream();
48+
let (to_child, from_parent) = pipes::stream();
49+
50+
let to_child = shared_chan(to_child);
51+
52+
let size = option::get(uint::from_str(args[1]));
53+
let workers = option::get(uint::from_str(args[2]));
54+
let num_bytes = 100;
55+
let start = std::time::precise_time_s();
56+
let mut worker_results = ~[];
57+
for uint::range(0u, workers) |i| {
58+
let builder = task::builder();
59+
vec::push(worker_results, task::future_result(builder));
60+
let to_child = to_child.clone();
61+
do task::run(builder) {
62+
for uint::range(0u, size / workers) |_i| {
63+
//#error("worker %?: sending %? bytes", i, num_bytes);
64+
to_child.send(bytes(num_bytes));
65+
}
66+
//#error("worker %? exiting", i);
67+
};
68+
}
69+
do task::spawn {
70+
server(from_parent, to_parent);
71+
}
72+
73+
vec::iter(worker_results, |r| { future::get(r); } );
74+
//#error("sending stop message");
75+
to_child.send(stop);
76+
move!{to_child};
77+
let result = from_child.recv();
78+
let end = std::time::precise_time_s();
79+
let elapsed = end - start;
80+
io::stdout().write_str(#fmt("Count is %?\n", result));
81+
io::stdout().write_str(#fmt("Test took %? seconds\n", elapsed));
82+
let thruput = ((size / workers * workers) as float) / (elapsed as float);
83+
io::stdout().write_str(#fmt("Throughput=%f per sec\n", thruput));
84+
assert result == num_bytes * size;
85+
}
86+
87+
fn main(args: ~[str]) {
88+
let args = if os::getenv("RUST_BENCH").is_some() {
89+
~["", "1000000", "10000"]
90+
} else if args.len() <= 1u {
91+
~["", "10000", "4"]
92+
} else {
93+
copy args
94+
};
95+
96+
#debug("%?", args);
97+
run(args);
98+
}
99+
100+
// Treat a whole bunch of ports as one.
101+
class box<T> {
102+
let mut contents: option<T>;
103+
new(+x: T) { self.contents = some(x); }
104+
105+
fn swap(f: fn(+T) -> T) {
106+
let mut tmp = none;
107+
self.contents <-> tmp;
108+
self.contents = some(f(option::unwrap(tmp)));
109+
}
110+
111+
fn unwrap() -> T {
112+
let mut tmp = none;
113+
self.contents <-> tmp;
114+
option::unwrap(tmp)
115+
}
116+
}
117+
118+
class port_set<T: send> {
119+
let mut ports: ~[pipes::port<T>];
120+
121+
new() { self.ports = ~[]; }
122+
123+
fn add(+port: pipes::port<T>) {
124+
vec::push(self.ports, port)
125+
}
126+
127+
fn try_recv() -> option<T> {
128+
let mut result = none;
129+
while result == none && self.ports.len() > 0 {
130+
let i = pipes::wait_many(self.ports.map(|p| p.header()));
131+
// dereferencing an unsafe pointer nonsense to appease the
132+
// borrowchecker.
133+
alt unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} {
134+
some(m) {
135+
result = some(move!{m});
136+
}
137+
none {
138+
// Remove this port.
139+
let mut ports = ~[];
140+
self.ports <-> ports;
141+
vec::consume(ports,
142+
|j, x| if i != j { vec::push(self.ports, x) });
143+
}
144+
}
145+
}
146+
/*
147+
while !done {
148+
do self.ports.swap |ports| {
149+
if ports.len() > 0 {
150+
let old_len = ports.len();
151+
let (_, m, ports) = pipes::select(ports);
152+
alt m {
153+
some(pipes::streamp::data(x, next)) {
154+
result = some(move!{x});
155+
done = true;
156+
assert ports.len() == old_len - 1;
157+
vec::append_one(ports, move!{next})
158+
}
159+
none {
160+
//#error("pipe closed");
161+
assert ports.len() == old_len - 1;
162+
ports
163+
}
164+
}
165+
}
166+
else {
167+
//#error("no more pipes");
168+
done = true;
169+
~[]
170+
}
171+
}
172+
}
173+
*/
174+
result
175+
}
176+
177+
fn recv() -> T {
178+
option::unwrap(self.try_recv())
179+
}
180+
}
181+
182+
impl private_methods/&<T: send> for pipes::port<T> {
183+
pure fn header() -> *pipes::packet_header unchecked {
184+
alt self.endp {
185+
some(endp) {
186+
endp.header()
187+
}
188+
none { fail "peeking empty stream" }
189+
}
190+
}
191+
}
192+
193+
type shared_chan<T: send> = arc::exclusive<pipes::chan<T>>;
194+
195+
impl chan<T: send> for shared_chan<T> {
196+
fn send(+x: T) {
197+
let mut xx = some(x);
198+
do self.with |_c, chan| {
199+
let mut x = none;
200+
x <-> xx;
201+
chan.send(option::unwrap(x))
202+
}
203+
}
204+
}
205+
206+
fn shared_chan<T:send>(+c: pipes::chan<T>) -> shared_chan<T> {
207+
arc::exclusive(c)
208+
}

0 commit comments

Comments
 (0)