Skip to content

Commit

Permalink
fix(examples): assorted fixes in the async example
Browse files Browse the repository at this point in the history
* Use single tokio runtime instance. Move runtime initialization to the
  worker process.
* Use `ngx_posted_next_events` to avoid posting events to the tail of
  currently executed queue.
* Move `event` and `done` to the request context, remove `EventData`.
* Ensure the async task cancellation when the request is aborted.
  • Loading branch information
bavshin-f5 committed Feb 15, 2025
1 parent aed46ba commit 7bfeabf
Showing 1 changed file with 81 additions and 80 deletions.
161 changes: 81 additions & 80 deletions examples/async.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::ffi::{c_char, c_void};
use std::ptr::{addr_of, addr_of_mut};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::slice;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Instant;

use ngx::core;
use ngx::ffi::{
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_cycle, ngx_event_t, ngx_http_core_module, ngx_http_core_run_phases,
ngx_http_handler_pt, ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_http_request_t, ngx_int_t,
ngx_module_t, ngx_post_event, ngx_posted_events, ngx_str_t, ngx_uint_t, NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF,
NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE,
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_core_module,
ngx_http_handler_pt, ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t,
ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t, NGX_CONF_TAKE1,
NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE,
};
use ngx::http::{self, HTTPModule, MergeConfigError};
use ngx::{http_request_handler, ngx_log_debug_http, ngx_string};
Expand All @@ -36,22 +37,9 @@ impl http::HTTPModule for Module {
}
}

#[derive(Debug)]
#[derive(Debug, Default)]
struct ModuleConfig {
enable: bool,
rt: Runtime,
}

impl Default for ModuleConfig {
fn default() -> Self {
Self {
enable: Default::default(),
rt: tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
}
}
}

static mut NGX_HTTP_ASYNC_COMMANDS: [ngx_command_t; 2] = [
Expand Down Expand Up @@ -102,98 +90,99 @@ impl http::Merge for ModuleConfig {
}

unsafe extern "C" fn check_async_work_done(event: *mut ngx_event_t) {
let event = &mut (*event);
let data = Arc::from_raw(event.data as *const EventData);
let req = &mut (*(data.request as *const _ as *mut ngx_http_request_t));
if data.done_flag.load(std::sync::atomic::Ordering::Relaxed) {
(*req.main).set_count((*req.main).count() - 1);
ngx_http_core_run_phases(req);
let ctx = ngx::ngx_container_of!(event, RequestCTX, event);
let c: *mut ngx_connection_t = (*event).data.cast();

if (*ctx).done.load(Ordering::Relaxed) {
// Triggering async_access_handler again
ngx_post_event((*c).write, addr_of_mut!(ngx_posted_events));
} else {
// this doesn't have have good performance but works as a simple thread-safe example and doesn't causes
// segfault. The best method that provides both thread-safety and performance requires
// an nginx patch.
ngx_post_event(event, addr_of_mut!(ngx_posted_events));
ngx_post_event(event, addr_of_mut!(ngx_posted_next_events));
}
}

struct RequestCTX {
event_data: Option<Arc<EventData>>,
done: Arc<AtomicBool>,
event: ngx_event_t,
task: Option<tokio::task::JoinHandle<()>>,
}

struct EventData {
done_flag: AtomicBool,
request: *mut ngx_http_request_t,
impl Default for RequestCTX {
fn default() -> Self {
Self {
done: AtomicBool::new(false).into(),
event: unsafe { std::mem::zeroed() },
task: Default::default(),
}
}
}

unsafe impl Send for EventData {}
unsafe impl Sync for EventData {}
impl Drop for RequestCTX {
fn drop(&mut self) {
if let Some(handle) = self.task.take() {
handle.abort();
}

if self.event.posted() != 0 {
unsafe { ngx::ffi::ngx_delete_posted_event(&mut self.event) };
}
}
}

http_request_handler!(async_access_handler, |request: &mut http::Request| {
let co = unsafe { request.get_module_loc_conf::<ModuleConfig>(&*addr_of!(ngx_http_async_module)) };
let co = co.expect("module config is none");

ngx_log_debug_http!(request, "async module enabled: {}", co.enable);

if !co.enable {
return core::Status::NGX_DECLINED;
}

let event_data = unsafe {
let ctx = request.as_ref().ctx.add(ngx_http_async_module.ctx_index);
if (*ctx).is_null() {
let ctx_data = &mut *(request.pool().alloc(std::mem::size_of::<RequestCTX>()) as *mut RequestCTX);
ctx_data.event_data = Some(Arc::new(EventData {
done_flag: AtomicBool::new(false),
request: &request.as_ref() as *const _ as *mut _,
}));
*ctx = ctx_data as *const _ as _;
ctx_data.event_data.as_ref().unwrap().clone()
} else {
let ctx = &*(ctx as *const RequestCTX);
if ctx
.event_data
.as_ref()
.unwrap()
.done_flag
.load(std::sync::atomic::Ordering::Relaxed)
{
return core::Status::NGX_OK;
} else {
return core::Status::NGX_DONE;
}
if let Some(ctx) = unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) } {
if !ctx.done.load(Ordering::Relaxed) {
return core::Status::NGX_AGAIN;
}
};

event_data.done_flag.load(std::sync::atomic::Ordering::Relaxed);

// create a posted event
unsafe {
let event = &mut *(request.pool().calloc(std::mem::size_of::<ngx_event_t>()) as *mut ngx_event_t);
event.handler = Some(check_async_work_done);
event.data = Arc::into_raw(event_data.clone()) as _;
event.log = (*ngx_cycle).log;
return core::Status::NGX_OK;
}

ngx_post_event(event, addr_of_mut!(ngx_posted_events));
let ctx = request.pool().allocate(RequestCTX::default());
if ctx.is_null() {
return core::Status::NGX_ERROR;
}
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });

ngx_log_debug_http!(request, "async module enabled: {}", co.enable);
let ctx = unsafe { &mut *ctx };
ctx.event.handler = Some(check_async_work_done);
ctx.event.data = request.connection().cast();
ctx.event.log = unsafe { (*request.connection()).log };
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };

// Request is no longer needed and can be converted to something movable to the async block
let req = AtomicPtr::new(request.into());
let done_flag = ctx.done.clone();

co.rt.spawn(async move {
let rt = ngx_http_async_runtime();
ctx.task = Some(rt.spawn(async move {
let start = Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let req = unsafe { http::Request::from_ngx_http_request(event_data.request) };
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
// not really thread safe, we should apply all these operation in nginx thread
// but this is just an example. proper way would be storing these headers in the request ctx
// and apply them when we get back to the nginx thread.
req.add_header_out("X-Async-Time", start.elapsed().as_millis().to_string().as_str());

event_data.done_flag.store(true, std::sync::atomic::Ordering::Release);
done_flag.store(true, Ordering::Release);
// there is a small issue here. If traffic is low we may get stuck behind a 300ms timer
// in the nginx event loop. To workaround it we can notify the event loop using pthread_kill( nginx_thread, SIGIO )
// to wake up the event loop. (or patch nginx and use the same trick as the thread pool)
});
}));

unsafe {
(*request.as_ref().main).set_count((*request.as_ref().main).count() + 1);
}
core::Status::NGX_DONE
core::Status::NGX_AGAIN
});

extern "C" fn ngx_http_async_commands_set_enable(
Expand All @@ -203,19 +192,31 @@ extern "C" fn ngx_http_async_commands_set_enable(
) -> *mut c_char {
unsafe {
let conf = &mut *(conf as *mut ModuleConfig);
let args = (*(*cf).args).elts as *mut ngx_str_t;

let val = (*args.add(1)).to_str();
let args = slice::from_raw_parts((*(*cf).args).elts as *mut ngx_str_t, (*(*cf).args).nelts);
let val = args[1].to_str();

// set default value optionally
conf.enable = false;

if val.len() == 2 && val.eq_ignore_ascii_case("on") {
if val.eq_ignore_ascii_case("on") {
conf.enable = true;
} else if val.len() == 3 && val.eq_ignore_ascii_case("off") {
} else if val.eq_ignore_ascii_case("off") {
conf.enable = false;
}
};

std::ptr::null_mut()
}

fn ngx_http_async_runtime() -> &'static Runtime {
// Should not be called from the master process
assert_ne!(unsafe { ngx::ffi::ngx_process }, ngx::ffi::NGX_PROCESS_MASTER as _);

static RUNTIME: OnceLock<Runtime> = OnceLock::new();
RUNTIME.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("tokio runtime init")
})
}

0 comments on commit 7bfeabf

Please sign in to comment.