1111} ;
1212
1313/// Bidirectional interprocess communication
14+ ///
15+ /// # SYNOPSIS
16+ ///
17+ /// A channel is a bidirectional transport of messages consisting of some
18+ /// amount of byte data and some number of handles.
19+ ///
20+ /// # DESCRIPTION
21+ ///
22+ /// The process of sending a message via a channel has two steps. The first is to
23+ /// atomically write the data into the channel and move ownership of all handles in
24+ /// the message into this channel. This operation always consumes the handles: at
25+ /// the end of the call, all handles either are all in the channel or are all
26+ /// discarded. The second operation, channel read, is similar: on success
27+ /// all the handles in the next message are atomically moved into the
28+ /// receiving process' handle table. On failure, the channel retains ownership.
1429pub struct Channel {
1530 base : KObjectBase ,
1631 _counter : CountHelper ,
@@ -87,9 +102,9 @@ impl Channel {
87102 /// Write a packet to the channel
88103 pub fn write ( & self , msg : T ) -> ZxResult {
89104 let peer = self . peer . upgrade ( ) . ok_or ( ZxError :: PEER_CLOSED ) ?;
90- if msg . data . len ( ) >= 4 {
91- // check first 4 bytes: whether it is a call reply?
92- let txid = TxID :: from_ne_bytes ( msg . data [ .. 4 ] . try_into ( ) . unwrap ( ) ) ;
105+ // check first 4 bytes: whether it is a call reply?
106+ let txid = msg . get_txid ( ) ;
107+ if txid != 0 {
93108 if let Some ( sender) = peer. call_reply . lock ( ) . remove ( & txid) {
94109 let _ = sender. send ( Ok ( msg) ) ;
95110 return Ok ( ( ) ) ;
@@ -100,10 +115,17 @@ impl Channel {
100115 }
101116
102117 /// Send a message to a channel and await a reply.
118+ ///
119+ /// The first four bytes of the written and read back messages are treated as a
120+ /// transaction ID. The kernel generates a txid for the
121+ /// written message, replacing that part of the message as read from userspace.
122+ ///
123+ /// `msg.data` must have at lease a length of 4 bytes.
103124 pub async fn call ( self : & Arc < Self > , mut msg : T ) -> ZxResult < T > {
125+ assert ! ( msg. data. len( ) >= 4 ) ;
104126 let peer = self . peer . upgrade ( ) . ok_or ( ZxError :: PEER_CLOSED ) ?;
105127 let txid = self . new_txid ( ) ;
106- msg. data [ .. 4 ] . copy_from_slice ( & txid. to_ne_bytes ( ) ) ;
128+ msg. set_txid ( txid) ;
107129 peer. push_general ( msg) ;
108130 let ( sender, receiver) = oneshot:: channel ( ) ;
109131 self . call_reply . lock ( ) . insert ( txid, sender) ;
@@ -143,17 +165,54 @@ impl Drop for Channel {
143165 }
144166}
145167
146- #[ derive( Default ) ]
168+ /// The message transferred in the channel.
169+ /// See [Channel](struct.Channel.html) for details.
170+ #[ derive( Default , Debug ) ]
147171pub struct MessagePacket {
172+ /// The data carried by the message packet
148173 pub data : Vec < u8 > ,
174+ /// See [Channel](struct.Channel.html) for details.
149175 pub handles : Vec < Handle > ,
150176}
151177
178+ impl MessagePacket {
179+ /// Set txid (the first 4 bytes)
180+ pub fn set_txid ( & mut self , txid : TxID ) {
181+ if self . data . len ( ) >= core:: mem:: size_of :: < TxID > ( ) {
182+ self . data [ ..4 ] . copy_from_slice ( & txid. to_ne_bytes ( ) ) ;
183+ }
184+ }
185+
186+ /// Get txid (the first 4 bytes)
187+ pub fn get_txid ( & self ) -> TxID {
188+ if self . data . len ( ) >= core:: mem:: size_of :: < TxID > ( ) {
189+ TxID :: from_ne_bytes ( self . data [ ..4 ] . try_into ( ) . unwrap ( ) )
190+ } else {
191+ 0
192+ }
193+ }
194+ }
195+
152196#[ cfg( test) ]
153197mod tests {
154198 use super :: * ;
155199 use alloc:: boxed:: Box ;
156200 use core:: sync:: atomic:: * ;
201+ use core:: time:: Duration ;
202+
203+ #[ test]
204+ fn test_basics ( ) {
205+ let ( end0, end1) = Channel :: create ( ) ;
206+ assert ! ( Arc :: ptr_eq(
207+ & end0. peer( ) . unwrap( ) . downcast_arc( ) . unwrap( ) ,
208+ & end1
209+ ) ) ;
210+ assert_eq ! ( end0. related_koid( ) , end1. id( ) ) ;
211+
212+ drop ( end1) ;
213+ assert_eq ! ( end0. peer( ) . unwrap_err( ) , ZxError :: PEER_CLOSED ) ;
214+ assert_eq ! ( end0. related_koid( ) , 0 ) ;
215+ }
157216
158217 #[ test]
159218 fn read_write ( ) {
@@ -239,4 +298,68 @@ mod tests {
239298 drop ( channel1) ;
240299 assert ! ( peer_closed. load( Ordering :: SeqCst ) ) ;
241300 }
301+
302+ #[ async_std:: test]
303+ async fn call ( ) {
304+ let ( channel0, channel1) = Channel :: create ( ) ;
305+ async_std:: task:: spawn ( {
306+ let channel1 = channel1. clone ( ) ;
307+ async move {
308+ async_std:: task:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
309+ let recv_msg = channel1. read ( ) . unwrap ( ) ;
310+ let txid = recv_msg. get_txid ( ) ;
311+ assert_eq ! ( txid, 0x8000_0000 ) ;
312+ assert_eq ! ( txid. to_ne_bytes( ) , & recv_msg. data[ ..4 ] ) ;
313+ assert_eq ! ( & recv_msg. data[ 4 ..] , b"o 0" ) ;
314+ // write an irrelevant message
315+ channel1
316+ . write ( MessagePacket {
317+ data : Vec :: from ( "hello 1" ) ,
318+ handles : Vec :: new ( ) ,
319+ } )
320+ . unwrap ( ) ;
321+ // reply the call
322+ let mut data: Vec < u8 > = vec ! [ ] ;
323+ data. append ( & mut txid. to_ne_bytes ( ) . to_vec ( ) ) ;
324+ data. append ( & mut Vec :: from ( "hello 2" ) ) ;
325+ channel1
326+ . write ( MessagePacket {
327+ data,
328+ handles : Vec :: new ( ) ,
329+ } )
330+ . unwrap ( ) ;
331+ }
332+ } ) ;
333+
334+ let recv_msg = channel0
335+ . call ( MessagePacket {
336+ data : Vec :: from ( "hello 0" ) ,
337+ handles : Vec :: new ( ) ,
338+ } )
339+ . await
340+ . unwrap ( ) ;
341+ let txid = recv_msg. get_txid ( ) ;
342+ assert_eq ! ( txid, 0x8000_0000 ) ;
343+ assert_eq ! ( txid. to_ne_bytes( ) , & recv_msg. data[ ..4 ] ) ;
344+ assert_eq ! ( & recv_msg. data[ 4 ..] , b"hello 2" ) ;
345+
346+ // peer dropped when calling
347+ let ( channel0, channel1) = Channel :: create ( ) ;
348+ async_std:: task:: spawn ( {
349+ async move {
350+ async_std:: task:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
351+ let _ = channel1;
352+ }
353+ } ) ;
354+ assert_eq ! (
355+ channel0
356+ . call( MessagePacket {
357+ data: Vec :: from( "hello 0" ) ,
358+ handles: Vec :: new( ) ,
359+ } )
360+ . await
361+ . unwrap_err( ) ,
362+ ZxError :: PEER_CLOSED
363+ ) ;
364+ }
242365}
0 commit comments