@@ -5,12 +5,13 @@ use std::{
55 os:: unix:: io:: { AsRawFd , RawFd } ,
66 pin:: Pin ,
77 ptr,
8+ sync:: atomic:: { AtomicBool , Ordering } ,
89 task:: { self , Poll } ,
910} ;
1011
11- use log:: { error, warn} ;
12+ use log:: { debug , error, warn} ;
1213use pin_project:: pin_project;
13- use socket2:: { Domain , Protocol , Socket , Type } ;
14+ use socket2:: { Domain , Protocol , SockAddr , Socket , Type } ;
1415use tokio:: {
1516 io:: { AsyncRead , AsyncWrite , ReadBuf } ,
1617 net:: { TcpSocket , TcpStream as TokioTcpStream , UdpSocket } ,
@@ -19,6 +20,7 @@ use tokio_tfo::TfoStream;
1920
2021use crate :: net:: {
2122 sys:: { set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack} ,
23+ udp:: { BatchRecvMessage , BatchSendMessage } ,
2224 AddrFamily ,
2325 ConnectOpts ,
2426} ;
@@ -273,3 +275,172 @@ pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) ->
273275
274276 Ok ( socket)
275277}
278+
279+ /// https://github.com/apple/darwin-xnu/blob/main/bsd/sys/socket.h
280+ #[ repr( C ) ]
281+ struct msghdr_x {
282+ msg_name : * mut libc:: c_void , //< optional address
283+ msg_namelen : libc:: socklen_t , //< size of address
284+ msg_iov : * mut libc:: iovec , //< scatter/gather array
285+ msg_iovlen : libc:: c_int , //< # elements in msg_iov
286+ msg_control : * mut libc:: c_void , //< ancillary data, see below
287+ msg_controllen : libc:: socklen_t , //< ancillary data buffer len
288+ msg_flags : libc:: c_int , //< flags on received message
289+ msg_datalen : libc:: size_t , //< byte length of buffer in msg_iov
290+ }
291+
292+ extern "C" {
293+ fn recvmsg_x ( s : libc:: c_int , msgp : * const msghdr_x , cnt : libc:: c_uint , flags : libc:: c_int ) -> libc:: ssize_t ;
294+ fn sendmsg_x ( s : libc:: c_int , msgp : * const msghdr_x , cnt : libc:: c_uint , flags : libc:: c_int ) -> libc:: ssize_t ;
295+ }
296+
297+ static SUPPORT_BATCH_SEND_RECV_MSG : AtomicBool = AtomicBool :: new ( true ) ;
298+
299+ fn recvmsg_fallback < S : AsRawFd > ( sock : & S , msg : & mut BatchRecvMessage < ' _ > ) -> io:: Result < ( ) > {
300+ let mut hdr: libc:: msghdr = unsafe { mem:: zeroed ( ) } ;
301+
302+ let addr_storage: libc:: sockaddr_storage = unsafe { mem:: zeroed ( ) } ;
303+ let addr_len = mem:: size_of_val ( & addr_storage) as libc:: socklen_t ;
304+ let sock_addr = unsafe { SockAddr :: new ( addr_storage, addr_len) } ;
305+ hdr. msg_name = sock_addr. as_ptr ( ) as * mut _ ;
306+ hdr. msg_namelen = sock_addr. len ( ) as _ ;
307+
308+ hdr. msg_iov = msg. data . as_ptr ( ) as * mut _ ;
309+ hdr. msg_iovlen = msg. data . len ( ) as _ ;
310+
311+ let ret = unsafe { libc:: recvmsg ( sock. as_raw_fd ( ) , & mut hdr as * mut _ , 0 ) } ;
312+ if ret < 0 {
313+ return Err ( io:: Error :: last_os_error ( ) ) ;
314+ }
315+
316+ msg. addr = sock_addr. as_socket ( ) . expect ( "SockAddr.as_socket" ) ;
317+ msg. data_len = ret as usize ;
318+
319+ Ok ( ( ) )
320+ }
321+
322+ pub fn batch_recvmsg < S : AsRawFd > ( sock : & S , msgs : & mut [ BatchRecvMessage < ' _ > ] ) -> io:: Result < usize > {
323+ if msgs. is_empty ( ) {
324+ return Ok ( 0 ) ;
325+ }
326+
327+ if !SUPPORT_BATCH_SEND_RECV_MSG . load ( Ordering :: Acquire ) {
328+ recvmsg_fallback ( sock, & mut msgs[ 0 ] ) ?;
329+ return Ok ( 1 ) ;
330+ }
331+
332+ let mut vec_msg_name = Vec :: with_capacity ( msgs. len ( ) ) ;
333+ let mut vec_msg_hdr = Vec :: with_capacity ( msgs. len ( ) ) ;
334+
335+ for msg in msgs. iter_mut ( ) {
336+ let mut hdr: msghdr_x = unsafe { mem:: zeroed ( ) } ;
337+
338+ let addr_storage: libc:: sockaddr_storage = unsafe { mem:: zeroed ( ) } ;
339+ let addr_len = mem:: size_of_val ( & addr_storage) as libc:: socklen_t ;
340+
341+ vec_msg_name. push ( unsafe { SockAddr :: new ( addr_storage, addr_len) } ) ;
342+ let sock_addr = vec_msg_name. last_mut ( ) . unwrap ( ) ;
343+ hdr. msg_name = sock_addr. as_ptr ( ) as * mut _ ;
344+ hdr. msg_namelen = sock_addr. len ( ) as _ ;
345+
346+ hdr. msg_iov = msg. data . as_ptr ( ) as * mut _ ;
347+ hdr. msg_iovlen = msg. data . len ( ) as _ ;
348+
349+ vec_msg_hdr. push ( hdr) ;
350+ }
351+
352+ let ret = unsafe { recvmsg_x ( sock. as_raw_fd ( ) , vec_msg_hdr. as_ptr ( ) , vec_msg_hdr. len ( ) as _ , 0 ) } ;
353+ if ret < 0 {
354+ let err = io:: Error :: last_os_error ( ) ;
355+ if let Some ( libc:: ENOSYS ) = err. raw_os_error ( ) {
356+ debug ! ( "recvmsg_x is not supported, fallback to recvmsg, error: {:?}" , err) ;
357+ SUPPORT_BATCH_SEND_RECV_MSG . store ( false , Ordering :: Release ) ;
358+
359+ recvmsg_fallback ( sock, & mut msgs[ 0 ] ) ?;
360+ return Ok ( 1 ) ;
361+ }
362+ return Err ( err) ;
363+ }
364+
365+ for idx in 0 ..ret as usize {
366+ let msg = & mut msgs[ idx] ;
367+ let hdr = & vec_msg_hdr[ idx] ;
368+ let name = & vec_msg_name[ idx] ;
369+ msg. addr = name. as_socket ( ) . expect ( "SockAddr.as_socket" ) ;
370+ msg. data_len = hdr. msg_datalen as usize ;
371+ }
372+
373+ Ok ( ret as usize )
374+ }
375+
376+ fn sendmsg_fallback < S : AsRawFd > ( sock : & S , msg : & mut BatchSendMessage < ' _ > ) -> io:: Result < ( ) > {
377+ let mut hdr: libc:: msghdr = unsafe { mem:: zeroed ( ) } ;
378+
379+ let sock_addr = msg. addr . map ( SockAddr :: from) ;
380+ if let Some ( ref sa) = sock_addr {
381+ hdr. msg_name = sa. as_ptr ( ) as * mut _ ;
382+ hdr. msg_namelen = sa. len ( ) as _ ;
383+ }
384+
385+ hdr. msg_iov = msg. data . as_ptr ( ) as * mut _ ;
386+ hdr. msg_iovlen = msg. data . len ( ) as _ ;
387+
388+ let ret = unsafe { libc:: sendmsg ( sock. as_raw_fd ( ) , & hdr as * const _ , 0 ) } ;
389+ if ret < 0 {
390+ return Err ( io:: Error :: last_os_error ( ) ) ;
391+ }
392+ msg. data_len = ret as usize ;
393+
394+ Ok ( ( ) )
395+ }
396+
397+ pub fn batch_sendmsg < S : AsRawFd > ( sock : & S , msgs : & mut [ BatchSendMessage < ' _ > ] ) -> io:: Result < usize > {
398+ if msgs. is_empty ( ) {
399+ return Ok ( 0 ) ;
400+ }
401+
402+ if !SUPPORT_BATCH_SEND_RECV_MSG . load ( Ordering :: Acquire ) {
403+ sendmsg_fallback ( sock, & mut msgs[ 0 ] ) ?;
404+ return Ok ( 1 ) ;
405+ }
406+
407+ let mut vec_msg_name = Vec :: with_capacity ( msgs. len ( ) ) ;
408+ let mut vec_msg_hdr = Vec :: with_capacity ( msgs. len ( ) ) ;
409+
410+ for msg in msgs. iter_mut ( ) {
411+ let mut hdr: msghdr_x = unsafe { mem:: zeroed ( ) } ;
412+
413+ if let Some ( addr) = msg. addr {
414+ vec_msg_name. push ( SockAddr :: from ( addr) ) ;
415+ let sock_addr = vec_msg_name. last_mut ( ) . unwrap ( ) ;
416+ hdr. msg_name = sock_addr. as_ptr ( ) as * mut _ ;
417+ hdr. msg_namelen = sock_addr. len ( ) as _ ;
418+ }
419+
420+ hdr. msg_iov = msg. data . as_ptr ( ) as * mut _ ;
421+ hdr. msg_iovlen = msg. data . len ( ) as _ ;
422+
423+ vec_msg_hdr. push ( hdr) ;
424+ }
425+
426+ let ret = unsafe { sendmsg_x ( sock. as_raw_fd ( ) , vec_msg_hdr. as_ptr ( ) , vec_msg_hdr. len ( ) as _ , 0 ) } ;
427+ if ret < 0 {
428+ let err = io:: Error :: last_os_error ( ) ;
429+ if let Some ( libc:: ENOSYS ) = err. raw_os_error ( ) {
430+ debug ! ( "sendmsg_x is not supported, fallback to sendmsg, error: {:?}" , err) ;
431+ SUPPORT_BATCH_SEND_RECV_MSG . store ( false , Ordering :: Release ) ;
432+
433+ sendmsg_fallback ( sock, & mut msgs[ 0 ] ) ?;
434+ return Ok ( 1 ) ;
435+ }
436+ return Err ( err) ;
437+ }
438+
439+ for idx in 0 ..ret as usize {
440+ let msg = & mut msgs[ idx] ;
441+ let hdr = & vec_msg_hdr[ idx] ;
442+ msg. data_len = hdr. msg_datalen as usize ;
443+ }
444+
445+ Ok ( ret as usize )
446+ }
0 commit comments