@@ -20,7 +20,7 @@ use std::sync::atomics;
20
20
use std:: mem;
21
21
use std:: rt:: rtio:: { EventLoop , IoFactory , RemoteCallback } ;
22
22
use std:: rt:: rtio:: { PausableIdleCallback , Callback } ;
23
- use std:: unstable :: sync :: Exclusive ;
23
+ use std:: rt :: exclusive :: Exclusive ;
24
24
25
25
/// This is the only exported function from this module.
26
26
pub fn event_loop ( ) -> Box < EventLoop : Send > {
@@ -31,7 +31,7 @@ struct BasicLoop {
31
31
work : Vec < proc ( ) : Send > , // pending work
32
32
remotes : Vec < ( uint , Box < Callback : Send > ) > ,
33
33
next_remote : uint ,
34
- messages : Exclusive < Vec < Message > > ,
34
+ messages : Arc < Exclusive < Vec < Message > > > ,
35
35
idle : Option < Box < Callback : Send > > ,
36
36
idle_active : Option < Arc < atomics:: AtomicBool > > ,
37
37
}
@@ -46,7 +46,7 @@ impl BasicLoop {
46
46
idle_active : None ,
47
47
next_remote : 0 ,
48
48
remotes : vec ! [ ] ,
49
- messages : Exclusive :: new ( vec ! [ ] ) ,
49
+ messages : Arc :: new ( Exclusive :: new ( Vec :: new ( ) ) ) ,
50
50
}
51
51
}
52
52
@@ -61,19 +61,10 @@ impl BasicLoop {
61
61
62
62
fn remote_work ( & mut self ) {
63
63
let messages = unsafe {
64
- self . messages . with ( |messages| {
65
- if messages. len ( ) > 0 {
66
- Some ( mem:: replace ( messages, vec ! [ ] ) )
67
- } else {
68
- None
69
- }
70
- } )
71
- } ;
72
- let messages = match messages {
73
- Some ( m) => m, None => return
64
+ mem:: replace ( & mut * self . messages . lock ( ) , Vec :: new ( ) )
74
65
} ;
75
- for message in messages. iter ( ) {
76
- self . message ( * message) ;
66
+ for message in messages. move_iter ( ) {
67
+ self . message ( message) ;
77
68
}
78
69
}
79
70
@@ -125,13 +116,13 @@ impl EventLoop for BasicLoop {
125
116
}
126
117
127
118
unsafe {
119
+ let mut messages = self . messages . lock ( ) ;
128
120
// We block here if we have no messages to process and we may
129
121
// receive a message at a later date
130
- self . messages . hold_and_wait ( |messages| {
131
- self . remotes . len ( ) > 0 &&
132
- messages. len ( ) == 0 &&
133
- self . work . len ( ) == 0
134
- } )
122
+ if self . remotes . len ( ) > 0 && messages. len ( ) == 0 &&
123
+ self . work . len ( ) == 0 {
124
+ messages. wait ( )
125
+ }
135
126
}
136
127
}
137
128
}
@@ -165,33 +156,29 @@ impl EventLoop for BasicLoop {
165
156
}
166
157
167
158
struct BasicRemote {
168
- queue : Exclusive < Vec < Message > > ,
159
+ queue : Arc < Exclusive < Vec < Message > > > ,
169
160
id : uint ,
170
161
}
171
162
172
163
impl BasicRemote {
173
- fn new ( queue : Exclusive < Vec < Message > > , id : uint ) -> BasicRemote {
164
+ fn new ( queue : Arc < Exclusive < Vec < Message > > > , id : uint ) -> BasicRemote {
174
165
BasicRemote { queue : queue, id : id }
175
166
}
176
167
}
177
168
178
169
impl RemoteCallback for BasicRemote {
179
170
fn fire ( & mut self ) {
180
- unsafe {
181
- self . queue . hold_and_signal ( |queue| {
182
- queue. push ( RunRemote ( self . id ) ) ;
183
- } )
184
- }
171
+ let mut queue = unsafe { self . queue . lock ( ) } ;
172
+ queue. push ( RunRemote ( self . id ) ) ;
173
+ queue. signal ( ) ;
185
174
}
186
175
}
187
176
188
177
impl Drop for BasicRemote {
189
178
fn drop ( & mut self ) {
190
- unsafe {
191
- self . queue . hold_and_signal ( |queue| {
192
- queue. push ( RemoveRemote ( self . id ) ) ;
193
- } )
194
- }
179
+ let mut queue = unsafe { self . queue . lock ( ) } ;
180
+ queue. push ( RemoveRemote ( self . id ) ) ;
181
+ queue. signal ( ) ;
195
182
}
196
183
}
197
184
@@ -216,7 +203,7 @@ impl Drop for BasicPausable {
216
203
217
204
#[ cfg( test) ]
218
205
mod test {
219
- use std:: task:: TaskOpts ;
206
+ use std:: rt :: task:: TaskOpts ;
220
207
221
208
use basic;
222
209
use PoolConfig ;
0 commit comments