Skip to content

Commit 8f5f42d

Browse files
committed
Add async_eventfd feature to optionally optimize notify method on epoll
1 parent 18990c4 commit 8f5f42d

File tree

5 files changed

+185
-51
lines changed

5 files changed

+185
-51
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ async = [
5454
"dep:pin-project-lite",
5555
"dep:crossbeam-channel",
5656
]
57+
# Optimize async notification with eventfd, requires nginx epoll event module
58+
async_eventfd = ["async"]
5759
# Provides APIs that require allocations via the `alloc` crate.
5860
alloc = ["allocator-api2/alloc"]
5961
# Enables serialization support for some of the provided and re-exported types.

examples/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,4 @@ default = ["export-modules", "ngx/vendored"]
7575
export-modules = []
7676
linux = []
7777
async = ["ngx/async"]
78+
async_eventfd = ["async", "ngx/async_eventfd"]

examples/async.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,6 @@ use hyper_util::rt::TokioIo;
77
use nginx_sys::{ngx_http_core_loc_conf_t, NGX_LOG_ERR};
88
use ngx::async_::resolver::Resolver;
99
use ngx::async_::{spawn, Task};
10-
use std::cell::RefCell;
11-
use std::ffi::{c_char, c_void};
12-
use std::future::Future;
13-
use std::pin::Pin;
14-
use std::ptr::{addr_of, addr_of_mut, NonNull};
15-
use std::sync::atomic::{AtomicPtr, Ordering};
16-
use std::task::Poll;
17-
use std::time::Instant;
18-
use tokio::net::TcpStream;
19-
2010
use ngx::core::{self, Pool, Status};
2111
use ngx::ffi::{
2212
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_http_handler_pt,
@@ -29,6 +19,15 @@ use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule};
2919
use ngx::{
3020
http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_log_error, ngx_string,
3121
};
22+
use std::cell::RefCell;
23+
use std::ffi::{c_char, c_void};
24+
use std::future::Future;
25+
use std::pin::Pin;
26+
use std::ptr::{addr_of, addr_of_mut, NonNull};
27+
use std::sync::atomic::{AtomicPtr, Ordering};
28+
use std::task::Poll;
29+
use std::time::Instant;
30+
use tokio::net::TcpStream;
3231

3332
struct Module;
3433

@@ -166,7 +165,7 @@ async fn resolve_something(
166165
.expect("resolution");
167166

168167
(
169-
format!("X-Resolve-Time"),
168+
"X-Resolve-Time".to_string(),
170169
start.elapsed().as_millis().to_string(),
171170
)
172171
}
@@ -188,7 +187,7 @@ async fn reqwest_something() -> (String, String) {
188187
async fn hyper_something() -> (String, String) {
189188
let start = Instant::now();
190189
// see https://hyper.rs/guides/1/client/basic/
191-
let url = "http://httpbin.org/ip".parse::<hyper::Uri>().expect("uri");
190+
let url = "https://example.com".parse::<hyper::Uri>().expect("uri");
192191
let host = url.host().expect("uri has no host");
193192
let port = url.port_u16().unwrap_or(80);
194193

nginx-sys/build/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const NGX_CONF_FEATURES: &[&str] = &[
2020
"compat",
2121
"debug",
2222
"have_epollrdhup",
23+
"have_eventfd",
2324
"have_file_aio",
2425
"have_kqueue",
2526
"have_memalign",

src/async_/spawn.rs

Lines changed: 170 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,173 @@
11
extern crate std;
22

3-
use core::ffi::c_int;
43
use core::sync::atomic::{AtomicI64, Ordering};
5-
use core::{mem, ptr};
6-
use std::sync::OnceLock;
74

85
use core::future::Future;
6+
use std::sync::OnceLock;
97

10-
use alloc::boxed::Box;
118
pub use async_task::Task;
129
use async_task::{Runnable, ScheduleInfo, WithInfo};
1310
use crossbeam_channel::{unbounded, Receiver, Sender};
14-
use nginx_sys::{kill, ngx_event_t, ngx_post_event, ngx_posted_next_events, ngx_thread_tid, SIGIO};
11+
use nginx_sys::{ngx_event_t, ngx_thread_tid};
1512

1613
use crate::log::ngx_cycle_log;
1714
use crate::ngx_log_debug;
1815

16+
#[cfg(not(feature = "async_eventfd"))]
17+
mod sigio {
18+
extern crate std;
19+
20+
use core::mem;
21+
use std::{process::id, sync::OnceLock};
22+
23+
use nginx_sys::{kill, ngx_event_t, ngx_post_event, ngx_posted_next_events, SIGIO};
24+
25+
use super::async_handler;
26+
use crate::log::ngx_cycle_log;
27+
use crate::ngx_log_debug;
28+
29+
struct NotifyContext {
30+
ev: ngx_event_t,
31+
}
32+
static mut CTX: NotifyContext = NotifyContext {
33+
ev: unsafe { mem::zeroed() },
34+
};
35+
36+
static INIT: OnceLock<()> = OnceLock::new();
37+
38+
fn ensure_init() {
39+
let _ = INIT.get_or_init(|| {
40+
#[allow(clippy::deref_addrof)]
41+
let ctx = unsafe { &mut *&raw mut CTX };
42+
43+
ctx.ev.log = ngx_cycle_log().as_ptr();
44+
ctx.ev.handler = Some(async_handler);
45+
});
46+
}
47+
48+
pub(crate) fn notify() {
49+
ensure_init();
50+
51+
unsafe { ngx_post_event(&raw mut CTX.ev, &raw mut ngx_posted_next_events) };
52+
53+
let rc = unsafe { kill(id().try_into().unwrap(), SIGIO.try_into().unwrap()) };
54+
if rc != 0 {
55+
panic!("async: kill rc={rc}");
56+
}
57+
58+
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: notified (SIGIO)");
59+
}
60+
61+
// called from async_handler
62+
pub(crate) fn confirm_notification() {
63+
// nop
64+
}
65+
}
66+
#[cfg(not(feature = "async_eventfd"))]
67+
use sigio::*;
68+
69+
#[cfg(feature = "async_eventfd")]
70+
mod eventfd {
71+
extern crate std;
72+
73+
use core::ffi::c_void;
74+
use core::mem;
75+
use std::sync::OnceLock;
76+
77+
use nginx_sys::{
78+
eventfd, ngx_connection_t, ngx_event_actions, ngx_event_t, read, write, EFD_CLOEXEC,
79+
EFD_NONBLOCK, EPOLL_EVENTS_EPOLLET, EPOLL_EVENTS_EPOLLIN, EPOLL_EVENTS_EPOLLRDHUP, NGX_OK,
80+
};
81+
82+
use super::async_handler;
83+
use crate::log::ngx_cycle_log;
84+
use crate::ngx_log_debug;
85+
86+
#[cfg(not(ngx_feature = "have_eventfd"))]
87+
compile_error!("feature async_eventfd requires eventfd(), NGX_HAVE_EVENTFD");
88+
89+
struct NotifyContext {
90+
c: ngx_connection_t,
91+
rev: ngx_event_t,
92+
wev: ngx_event_t,
93+
fd: i32,
94+
}
95+
static mut CTX: NotifyContext = NotifyContext {
96+
c: unsafe { mem::zeroed() },
97+
rev: unsafe { mem::zeroed() },
98+
wev: unsafe { mem::zeroed() },
99+
fd: -1,
100+
};
101+
102+
static INIT: OnceLock<()> = OnceLock::new();
103+
104+
extern "C" fn _dummy_write_handler(_ev: *mut ngx_event_t) {}
105+
106+
fn ensure_init() {
107+
let _ = INIT.get_or_init(|| {
108+
let fd = unsafe { eventfd(0, (EFD_NONBLOCK | EFD_CLOEXEC).try_into().unwrap()) };
109+
110+
if fd == -1 {
111+
panic!("async: eventfd = -1");
112+
}
113+
114+
#[allow(clippy::deref_addrof)]
115+
let ctx = unsafe { &mut *&raw mut CTX };
116+
117+
let log = ngx_cycle_log().as_ptr();
118+
119+
ctx.c.log = log;
120+
ctx.c.fd = fd;
121+
ctx.c.read = &raw mut ctx.rev;
122+
ctx.c.write = &raw mut ctx.wev;
123+
124+
ctx.rev.log = log;
125+
ctx.rev.data = (&raw mut ctx.c).cast();
126+
ctx.rev.set_active(1);
127+
ctx.rev.handler = Some(async_handler);
128+
129+
ctx.wev.log = log;
130+
ctx.wev.data = (&raw mut ctx.c).cast();
131+
ctx.wev.handler = Some(_dummy_write_handler); // can't be null
132+
let rc = unsafe {
133+
ngx_event_actions.add.unwrap()(
134+
&raw mut ctx.rev,
135+
(EPOLL_EVENTS_EPOLLIN | EPOLL_EVENTS_EPOLLRDHUP) as isize,
136+
EPOLL_EVENTS_EPOLLET as usize,
137+
)
138+
};
139+
if rc != NGX_OK as isize {
140+
panic!("async: ngx_add_event rc={rc}");
141+
}
142+
143+
ctx.fd = fd;
144+
});
145+
}
146+
147+
pub(crate) fn notify() {
148+
ensure_init();
149+
150+
let val: u64 = 1;
151+
let ptr = &val as *const u64 as *const c_void;
152+
let res = unsafe { write(CTX.fd, ptr, core::mem::size_of::<u64>()) };
153+
if res != core::mem::size_of::<u64>() as isize {
154+
panic!("eventfd write failed: {res}");
155+
}
156+
157+
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: notified (eventfd)");
158+
}
159+
160+
// called from async_handler
161+
pub(crate) fn confirm_notification() {
162+
let mut buf: u64 = 0;
163+
let ptr = &mut buf as *mut u64 as *mut c_void;
164+
let _ = unsafe { read(CTX.fd, ptr, core::mem::size_of::<u64>()) };
165+
}
166+
}
167+
168+
#[cfg(feature = "async_eventfd")]
169+
use eventfd::*;
170+
19171
static MAIN_TID: AtomicI64 = AtomicI64::new(-1);
20172

21173
#[inline]
@@ -25,34 +177,24 @@ fn on_event_thread() -> bool {
25177
main_tid == tid
26178
}
27179

28-
extern "C" fn async_handler(ev: *mut ngx_event_t) {
180+
extern "C" fn async_handler(_ev: *mut ngx_event_t) {
29181
// initialize MAIN_TID on first execution
30182
let tid = unsafe { ngx_thread_tid().into() };
31183
let _ = MAIN_TID.compare_exchange(-1, tid, Ordering::Relaxed, Ordering::Relaxed);
184+
185+
confirm_notification();
186+
32187
let scheduler = scheduler();
188+
189+
if scheduler.rx.is_empty() {
190+
return;
191+
}
33192
let mut cnt = 0;
34193
while let Ok(r) = scheduler.rx.try_recv() {
35194
r.run();
36195
cnt += 1;
37196
}
38-
ngx_log_debug!(
39-
unsafe { (*ev).log },
40-
"async: processed {cnt} items"
41-
);
42-
43-
unsafe {
44-
drop(Box::from_raw(ev));
45-
}
46-
}
47-
48-
fn notify() -> c_int {
49-
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: notify via SIGIO");
50-
unsafe {
51-
kill(
52-
std::process::id().try_into().unwrap(),
53-
SIGIO.try_into().unwrap(),
54-
)
55-
}
197+
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: processed {cnt} items");
56198
}
57199

58200
struct Scheduler {
@@ -69,28 +211,17 @@ impl Scheduler {
69211
fn schedule(&self, runnable: Runnable, info: ScheduleInfo) {
70212
let oet = on_event_thread();
71213
// If we are on the event loop thread it's safe to simply run the Runnable, otherwise we
72-
// enqueue the Runnable, post our event, and SIGIO to interrupt epoll. The event handler
73-
// then runs the Runnable on the event loop thread.
214+
// enqueue the Runnable, post our event, and notify. The event handler then runs the
215+
// Runnable on the event loop thread.
74216
//
75217
// If woken_while_running, it indicates that a task has yielded itself to the Scheduler.
76-
// Force round-trip via queue to limit reentrancy (skipping SIGIO).
218+
// Force round-trip via queue to limit reentrancy.
77219
if oet && !info.woken_while_running {
78220
runnable.run();
79221
} else {
80222
self.tx.send(runnable).expect("send");
81-
unsafe {
82-
let event: *mut ngx_event_t = Box::into_raw(Box::new(mem::zeroed()));
83-
(*event).handler = Some(async_handler);
84-
(*event).log = ngx_cycle_log().as_ptr();
85-
ngx_post_event(event, ptr::addr_of_mut!(ngx_posted_next_events));
86-
}
87223

88-
if !oet {
89-
let rc = notify();
90-
if rc != 0 {
91-
panic!("kill: {rc}")
92-
}
93-
}
224+
notify();
94225
}
95226
}
96227
}

0 commit comments

Comments
 (0)