Skip to content

Commit d664ca2

Browse files
committed
auto merge of #10080 : brson/rust/sched_queue, r=brson
Rebase and update of #9710
2 parents a120011 + a849c47 commit d664ca2

File tree

6 files changed

+446
-105
lines changed

6 files changed

+446
-105
lines changed

src/etc/licenseck.py

+2
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@
7676
"rt/isaac/randport.cpp", # public domain
7777
"rt/isaac/rand.h", # public domain
7878
"rt/isaac/standard.h", # public domain
79+
"libstd/rt/mpsc_queue.rs", # BSD
80+
"libstd/rt/mpmc_bounded_queue.rs", # BSD
7981
]
8082

8183
def check_license(name, contents):

src/libstd/rt/message_queue.rs

+11-49
Original file line numberDiff line numberDiff line change
@@ -11,83 +11,45 @@
1111
//! A concurrent queue that supports multiple producers and a
1212
//! single consumer.
1313
14-
use container::Container;
1514
use kinds::Send;
1615
use vec::OwnedVector;
17-
use cell::Cell;
18-
use option::*;
19-
use unstable::sync::{UnsafeArc, LittleLock};
16+
use option::Option;
2017
use clone::Clone;
18+
use rt::mpsc_queue::Queue;
2119

2220
pub struct MessageQueue<T> {
23-
priv state: UnsafeArc<State<T>>
24-
}
25-
26-
struct State<T> {
27-
count: uint,
28-
queue: ~[T],
29-
lock: LittleLock
21+
priv queue: Queue<T>
3022
}
3123

3224
impl<T: Send> MessageQueue<T> {
3325
pub fn new() -> MessageQueue<T> {
3426
MessageQueue {
35-
state: UnsafeArc::new(State {
36-
count: 0,
37-
queue: ~[],
38-
lock: LittleLock::new()
39-
})
27+
queue: Queue::new()
4028
}
4129
}
4230

31+
#[inline]
4332
pub fn push(&mut self, value: T) {
44-
unsafe {
45-
let value = Cell::new(value);
46-
let state = self.state.get();
47-
do (*state).lock.lock {
48-
(*state).count += 1;
49-
(*state).queue.push(value.take());
50-
}
51-
}
33+
self.queue.push(value)
5234
}
5335

36+
#[inline]
5437
pub fn pop(&mut self) -> Option<T> {
55-
unsafe {
56-
let state = self.state.get();
57-
do (*state).lock.lock {
58-
if !(*state).queue.is_empty() {
59-
(*state).count += 1;
60-
Some((*state).queue.shift())
61-
} else {
62-
None
63-
}
64-
}
65-
}
38+
self.queue.pop()
6639
}
6740

6841
/// A pop that may sometimes miss enqueued elements, but is much faster
6942
/// to give up without doing any synchronization
43+
#[inline]
7044
pub fn casual_pop(&mut self) -> Option<T> {
71-
unsafe {
72-
let state = self.state.get();
73-
// NB: Unsynchronized check
74-
if (*state).count == 0 { return None; }
75-
do (*state).lock.lock {
76-
if !(*state).queue.is_empty() {
77-
(*state).count += 1;
78-
Some((*state).queue.shift())
79-
} else {
80-
None
81-
}
82-
}
83-
}
45+
self.queue.pop()
8446
}
8547
}
8648

8749
impl<T: Send> Clone for MessageQueue<T> {
8850
fn clone(&self) -> MessageQueue<T> {
8951
MessageQueue {
90-
state: self.state.clone()
52+
queue: self.queue.clone()
9153
}
9254
}
9355
}

src/libstd/rt/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,12 @@ mod work_queue;
136136
/// A parallel queue.
137137
mod message_queue;
138138

139+
/// A mostly lock-free multi-producer, single consumer queue.
140+
mod mpsc_queue;
141+
142+
/// A lock-free multi-producer, multi-consumer bounded queue.
143+
mod mpmc_bounded_queue;
144+
139145
/// A parallel data structure for tracking sleeping schedulers.
140146
mod sleeper_list;
141147

src/libstd/rt/mpmc_bounded_queue.rs

+213
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/* Multi-producer/multi-consumer bounded queue
2+
* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
3+
* Redistribution and use in source and binary forms, with or without
4+
* modification, are permitted provided that the following conditions are met:
5+
*
6+
* 1. Redistributions of source code must retain the above copyright notice,
7+
* this list of conditions and the following disclaimer.
8+
*
9+
* 2. Redistributions in binary form must reproduce the above copyright
10+
* notice, this list of conditions and the following disclaimer in the
11+
* documentation and/or other materials provided with the distribution.
12+
*
13+
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
14+
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
15+
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
16+
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
17+
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
18+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
19+
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
20+
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
21+
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
22+
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23+
*
24+
* The views and conclusions contained in the software and documentation are
25+
* those of the authors and should not be interpreted as representing official
26+
* policies, either expressed or implied, of Dmitry Vyukov.
27+
*/
28+
29+
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
30+
31+
use unstable::sync::UnsafeArc;
32+
use unstable::atomics::{AtomicUint,Relaxed,Release,Acquire};
33+
use option::*;
34+
use vec;
35+
use clone::Clone;
36+
use kinds::Send;
37+
use num::{Exponential,Algebraic,Round};
38+
39+
struct Node<T> {
40+
sequence: AtomicUint,
41+
value: Option<T>,
42+
}
43+
44+
struct State<T> {
45+
pad0: [u8, ..64],
46+
buffer: ~[Node<T>],
47+
mask: uint,
48+
pad1: [u8, ..64],
49+
enqueue_pos: AtomicUint,
50+
pad2: [u8, ..64],
51+
dequeue_pos: AtomicUint,
52+
pad3: [u8, ..64],
53+
}
54+
55+
struct Queue<T> {
56+
priv state: UnsafeArc<State<T>>,
57+
}
58+
59+
impl<T: Send> State<T> {
60+
fn with_capacity(capacity: uint) -> State<T> {
61+
let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
62+
if capacity < 2 {
63+
2u
64+
} else {
65+
// use next power of 2 as capacity
66+
2f64.pow(&((capacity as f64).log2().ceil())) as uint
67+
}
68+
} else {
69+
capacity
70+
};
71+
let buffer = do vec::from_fn(capacity) |i:uint| {
72+
Node{sequence:AtomicUint::new(i),value:None}
73+
};
74+
State{
75+
pad0: [0, ..64],
76+
buffer: buffer,
77+
mask: capacity-1,
78+
pad1: [0, ..64],
79+
enqueue_pos: AtomicUint::new(0),
80+
pad2: [0, ..64],
81+
dequeue_pos: AtomicUint::new(0),
82+
pad3: [0, ..64],
83+
}
84+
}
85+
86+
fn push(&mut self, value: T) -> bool {
87+
let mask = self.mask;
88+
let mut pos = self.enqueue_pos.load(Relaxed);
89+
loop {
90+
let node = &mut self.buffer[pos & mask];
91+
let seq = node.sequence.load(Acquire);
92+
let diff: int = seq as int - pos as int;
93+
94+
if diff == 0 {
95+
let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
96+
if enqueue_pos == pos {
97+
node.value = Some(value);
98+
node.sequence.store(pos+1, Release);
99+
break
100+
} else {
101+
pos = enqueue_pos;
102+
}
103+
} else if (diff < 0) {
104+
return false
105+
} else {
106+
pos = self.enqueue_pos.load(Relaxed);
107+
}
108+
}
109+
true
110+
}
111+
112+
fn pop(&mut self) -> Option<T> {
113+
let mask = self.mask;
114+
let mut pos = self.dequeue_pos.load(Relaxed);
115+
loop {
116+
let node = &mut self.buffer[pos & mask];
117+
let seq = node.sequence.load(Acquire);
118+
let diff: int = seq as int - (pos + 1) as int;
119+
if diff == 0 {
120+
let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
121+
if dequeue_pos == pos {
122+
let value = node.value.take();
123+
node.sequence.store(pos + mask + 1, Release);
124+
return value
125+
} else {
126+
pos = dequeue_pos;
127+
}
128+
} else if diff < 0 {
129+
return None
130+
} else {
131+
pos = self.dequeue_pos.load(Relaxed);
132+
}
133+
}
134+
}
135+
}
136+
137+
impl<T: Send> Queue<T> {
138+
pub fn with_capacity(capacity: uint) -> Queue<T> {
139+
Queue{
140+
state: UnsafeArc::new(State::with_capacity(capacity))
141+
}
142+
}
143+
144+
pub fn push(&mut self, value: T) -> bool {
145+
unsafe { (*self.state.get()).push(value) }
146+
}
147+
148+
pub fn pop(&mut self) -> Option<T> {
149+
unsafe { (*self.state.get()).pop() }
150+
}
151+
}
152+
153+
impl<T: Send> Clone for Queue<T> {
154+
fn clone(&self) -> Queue<T> {
155+
Queue {
156+
state: self.state.clone()
157+
}
158+
}
159+
}
160+
161+
#[cfg(test)]
162+
mod tests {
163+
use prelude::*;
164+
use option::*;
165+
use task;
166+
use comm;
167+
use super::Queue;
168+
169+
#[test]
170+
fn test() {
171+
let nthreads = 8u;
172+
let nmsgs = 1000u;
173+
let mut q = Queue::with_capacity(nthreads*nmsgs);
174+
assert_eq!(None, q.pop());
175+
176+
for _ in range(0, nthreads) {
177+
let (port, chan) = comm::stream();
178+
chan.send(q.clone());
179+
do task::spawn_sched(task::SingleThreaded) {
180+
let mut q = port.recv();
181+
for i in range(0, nmsgs) {
182+
assert!(q.push(i));
183+
}
184+
}
185+
}
186+
187+
let mut completion_ports = ~[];
188+
for _ in range(0, nthreads) {
189+
let (completion_port, completion_chan) = comm::stream();
190+
completion_ports.push(completion_port);
191+
let (port, chan) = comm::stream();
192+
chan.send(q.clone());
193+
do task::spawn_sched(task::SingleThreaded) {
194+
let mut q = port.recv();
195+
let mut i = 0u;
196+
loop {
197+
match q.pop() {
198+
None => {},
199+
Some(_) => {
200+
i += 1;
201+
if i == nmsgs { break }
202+
}
203+
}
204+
}
205+
completion_chan.send(i);
206+
}
207+
}
208+
209+
for completion_port in completion_ports.iter() {
210+
assert_eq!(nmsgs, completion_port.recv());
211+
}
212+
}
213+
}

0 commit comments

Comments
 (0)