fix(examples): assorted fixes in the async example #133

Merged
merged 4 commits on Feb 15, 2025
1 change: 1 addition & 0 deletions .github/workflows/nginx.yaml
@@ -75,6 +75,7 @@ jobs:
working-directory: nginx
env:
TEST_NGINX_GLOBALS: >-
load_module ${{ github.workspace }}/nginx/objs/ngx_http_async_module.so;
load_module ${{ github.workspace }}/nginx/objs/ngx_http_awssigv4_module.so;
load_module ${{ github.workspace }}/nginx/objs/ngx_http_curl_module.so;
load_module ${{ github.workspace }}/nginx/objs/ngx_http_upstream_custom_module.so;
161 changes: 81 additions & 80 deletions examples/async.rs
@@ -1,15 +1,16 @@
use std::ffi::{c_char, c_void};
use std::ptr::{addr_of, addr_of_mut};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::slice;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Instant;

use ngx::core;
use ngx::ffi::{
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_cycle, ngx_event_t, ngx_http_core_module, ngx_http_core_run_phases,
ngx_http_handler_pt, ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_http_request_t, ngx_int_t,
ngx_module_t, ngx_post_event, ngx_posted_events, ngx_str_t, ngx_uint_t, NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF,
NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE,
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_core_module,
ngx_http_handler_pt, ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t,
ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t, NGX_CONF_TAKE1,
NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE,
};
use ngx::http::{self, HTTPModule, MergeConfigError};
use ngx::{http_request_handler, ngx_log_debug_http, ngx_string};
@@ -36,22 +37,9 @@ impl http::HTTPModule for Module {
}
}

#[derive(Debug)]
#[derive(Debug, Default)]
struct ModuleConfig {
enable: bool,
rt: Runtime,
}

impl Default for ModuleConfig {
fn default() -> Self {
Self {
enable: Default::default(),
rt: tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
}
}
}

static mut NGX_HTTP_ASYNC_COMMANDS: [ngx_command_t; 2] = [
@@ -102,98 +90,99 @@ impl http::Merge for ModuleConfig {
}

unsafe extern "C" fn check_async_work_done(event: *mut ngx_event_t) {
let event = &mut (*event);
let data = Arc::from_raw(event.data as *const EventData);
let req = &mut (*(data.request as *const _ as *mut ngx_http_request_t));
if data.done_flag.load(std::sync::atomic::Ordering::Relaxed) {
(*req.main).set_count((*req.main).count() - 1);
ngx_http_core_run_phases(req);
let ctx = ngx::ngx_container_of!(event, RequestCTX, event);
let c: *mut ngx_connection_t = (*event).data.cast();

if (*ctx).done.load(Ordering::Relaxed) {
// Triggering async_access_handler again
ngx_post_event((*c).write, addr_of_mut!(ngx_posted_events));
} else {
// this doesn't have good performance, but it works as a simple thread-safe example and doesn't cause
// a segfault. The best method that provides both thread-safety and performance requires
// an nginx patch.
ngx_post_event(event, addr_of_mut!(ngx_posted_events));
ngx_post_event(event, addr_of_mut!(ngx_posted_next_events));
}
}

struct RequestCTX {
event_data: Option<Arc<EventData>>,
done: Arc<AtomicBool>,
event: ngx_event_t,
task: Option<tokio::task::JoinHandle<()>>,
}

struct EventData {
done_flag: AtomicBool,
request: *mut ngx_http_request_t,
impl Default for RequestCTX {
fn default() -> Self {
Self {
done: AtomicBool::new(false).into(),
event: unsafe { std::mem::zeroed() },
task: Default::default(),
}
}
}

unsafe impl Send for EventData {}
unsafe impl Sync for EventData {}
impl Drop for RequestCTX {
fn drop(&mut self) {
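// Cancel the background tokio task if it is still running.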
if let Some(handle) = self.task.take() {
handle.abort();
}

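// If the completion-check event is still sitting in a posted queue, remove it so
// it cannot fire after this context is freed.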
if self.event.posted() != 0 {
unsafe { ngx::ffi::ngx_delete_posted_event(&mut self.event) };
}
}
}

http_request_handler!(async_access_handler, |request: &mut http::Request| {
let co = unsafe { request.get_module_loc_conf::<ModuleConfig>(&*addr_of!(ngx_http_async_module)) };
let co = co.expect("module config is none");

ngx_log_debug_http!(request, "async module enabled: {}", co.enable);

if !co.enable {
return core::Status::NGX_DECLINED;
}

let event_data = unsafe {
let ctx = request.get_inner().ctx.add(ngx_http_async_module.ctx_index);
if (*ctx).is_null() {
let ctx_data = &mut *(request.pool().alloc(std::mem::size_of::<RequestCTX>()) as *mut RequestCTX);
ctx_data.event_data = Some(Arc::new(EventData {
done_flag: AtomicBool::new(false),
request: &request.get_inner() as *const _ as *mut _,
}));
*ctx = ctx_data as *const _ as _;
ctx_data.event_data.as_ref().unwrap().clone()
} else {
let ctx = &*(ctx as *const RequestCTX);
if ctx
.event_data
.as_ref()
.unwrap()
.done_flag
.load(std::sync::atomic::Ordering::Relaxed)
{
return core::Status::NGX_OK;
} else {
return core::Status::NGX_DONE;
}
if let Some(ctx) = unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) } {
if !ctx.done.load(Ordering::Relaxed) {
return core::Status::NGX_AGAIN;
}
};

event_data.done_flag.load(std::sync::atomic::Ordering::Relaxed);

// create a posted event
unsafe {
let event = &mut *(request.pool().calloc(std::mem::size_of::<ngx_event_t>()) as *mut ngx_event_t);
event.handler = Some(check_async_work_done);
event.data = Arc::into_raw(event_data.clone()) as _;
event.log = (*ngx_cycle).log;
return core::Status::NGX_OK;
}

ngx_post_event(event, addr_of_mut!(ngx_posted_events));
let ctx = request.pool().allocate(RequestCTX::default());
if ctx.is_null() {
return core::Status::NGX_ERROR;
}
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });

ngx_log_debug_http!(request, "async module enabled: {}", co.enable);
let ctx = unsafe { &mut *ctx };
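// Set up the completion-check event: it keeps re-posting itself until the task
// flips `done`, then wakes this request up through the connection's write event.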
ctx.event.handler = Some(check_async_work_done);
ctx.event.data = request.connection().cast();
ctx.event.log = unsafe { (*request.connection()).log };
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };

// The Request reference is no longer needed here; convert it into a raw pointer that can be moved into the async block
let req = AtomicPtr::new(request.into());
let done_flag = ctx.done.clone();

co.rt.spawn(async move {
let rt = ngx_http_async_runtime();
ctx.task = Some(rt.spawn(async move {
let start = Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let req = unsafe { http::Request::from_ngx_http_request(event_data.request) };
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
// not really thread safe; we should apply all these operations in the nginx thread,
// but this is just an example. The proper way would be to store these headers in the request ctx
// and apply them when we get back to the nginx thread.
req.add_header_out("X-Async-Time", start.elapsed().as_millis().to_string().as_str());

event_data.done_flag.store(true, std::sync::atomic::Ordering::Release);
done_flag.store(true, Ordering::Release);
// there is a small issue here: if traffic is low, we may get stuck behind a 300ms timer
// in the nginx event loop. To work around it, we can notify the event loop using
// pthread_kill(nginx_thread, SIGIO) to wake it up (or patch nginx and use the same trick as the thread pool).
});
}));

unsafe {
(*request.get_inner().main).set_count((*request.get_inner().main).count() + 1);
}
core::Status::NGX_DONE
core::Status::NGX_AGAIN
});

extern "C" fn ngx_http_async_commands_set_enable(
@@ -203,19 +192,31 @@ extern "C" fn ngx_http_async_commands_set_enable(
) -> *mut c_char {
unsafe {
let conf = &mut *(conf as *mut ModuleConfig);
let args = (*(*cf).args).elts as *mut ngx_str_t;

let val = (*args.add(1)).to_str();
let args = slice::from_raw_parts((*(*cf).args).elts as *mut ngx_str_t, (*(*cf).args).nelts);
let val = args[1].to_str();

// start from the default value before parsing the argument
conf.enable = false;

if val.len() == 2 && val.eq_ignore_ascii_case("on") {
if val.eq_ignore_ascii_case("on") {
conf.enable = true;
} else if val.len() == 3 && val.eq_ignore_ascii_case("off") {
} else if val.eq_ignore_ascii_case("off") {
conf.enable = false;
}
};

std::ptr::null_mut()
}

fn ngx_http_async_runtime() -> &'static Runtime {
// Should not be called from the master process
assert_ne!(unsafe { ngx::ffi::ngx_process }, ngx::ffi::NGX_PROCESS_MASTER as _);

static RUNTIME: OnceLock<Runtime> = OnceLock::new();
RUNTIME.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("tokio runtime init")
})
}
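The handler and the background task communicate through a single completion flag: the task stores `true` with Release ordering when it finishes, and the access handler keeps returning NGX_AGAIN until it sees the flag set. A minimal, self-contained sketch of that handshake, with a plain std thread standing in for the tokio task and illustrative names only:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Shared completion flag: one handle for the "handler", one for the "task".
    let done = Arc::new(AtomicBool::new(false));
    let done_flag = done.clone();

    let task = thread::spawn(move || {
        // ... long-running work would happen here ...
        done_flag.store(true, Ordering::Release);
    });

    // The handler side only polls the flag; it never blocks the event loop.
    while !done.load(Ordering::Acquire) {
        thread::yield_now();
    }
    task.join().unwrap();
}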
2 changes: 1 addition & 1 deletion examples/config
@@ -15,7 +15,7 @@ if [ $HTTP = YES ]; then
ngx_rust_target_type=EXAMPLE
ngx_rust_target_features=

if [ "$ngx_module_link" = DYNAMIC ]; then
if :; then
ngx_module_name=ngx_http_async_module
ngx_module_libs="-lm"
ngx_rust_target_name=async
56 changes: 56 additions & 0 deletions examples/t/async.t
@@ -0,0 +1,56 @@
#!/usr/bin/perl

# (C) Nginx, Inc

# Tests for ngx-rust example modules.

###############################################################################

use warnings;
use strict;

use Test::More;

BEGIN { use FindBin; chdir($FindBin::Bin); }

use lib 'lib';
use Test::Nginx;

###############################################################################

select STDERR; $| = 1;
select STDOUT; $| = 1;

my $t = Test::Nginx->new()->has(qw/http/)->plan(1)
->write_file_expand('nginx.conf', <<'EOF');

%%TEST_GLOBALS%%

daemon off;

events {
}

http {
%%TEST_GLOBALS_HTTP%%

server {
listen 127.0.0.1:8080;
server_name localhost;

location / {
async on;
}
}
}

EOF

$t->write_file('index.html', '');
$t->run();

###############################################################################

like(http_get('/index.html'), qr/X-Async-Time:/, 'async handler');

###############################################################################
13 changes: 13 additions & 0 deletions src/core/mod.rs
@@ -7,3 +7,16 @@ pub use buffer::*;
pub use pool::*;
pub use status::*;
pub use string::*;

/// Gets an outer object pointer from a pointer to one of its fields.
/// While there is no corresponding C macro, the pattern is common in the NGINX source.
///
/// # Safety
///
/// `$ptr` must be a valid pointer to the field `$field` of `$type`.
#[macro_export]
macro_rules! ngx_container_of {
($ptr:expr, $type:path, $field:ident) => {
$ptr.byte_sub(::core::mem::offset_of!($type, $field)).cast::<$type>()
};
}
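The async example above relies on this macro in check_async_work_done to get from the embedded ngx_event_t back to its RequestCTX. A small self-contained sketch of the same pattern with an illustrative struct (not part of this diff; it only assumes the macro exported by this crate):

struct Wrapper {
    id: u32,
    field: u64, // stands in for an embedded ngx_event_t
}

fn main() {
    let mut w = Wrapper { id: 7, field: 0 };
    let field_ptr: *mut u64 = &mut w.field;
    // Recover a pointer to the outer Wrapper from the pointer to its field.
    let outer = unsafe { ngx::ngx_container_of!(field_ptr, Wrapper, field) };
    assert_eq!(unsafe { (*outer).id }, 7);
}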
17 changes: 12 additions & 5 deletions src/http/request.rs
@@ -101,6 +101,18 @@ impl<'a> From<&'a mut Request> for *mut ngx_http_request_t {
}
}

impl AsRef<ngx_http_request_t> for Request {
fn as_ref(&self) -> &ngx_http_request_t {
&self.0
}
}

impl AsMut<ngx_http_request_t> for Request {
fn as_mut(&mut self) -> &mut ngx_http_request_t {
&mut self.0
}
}
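// Illustrative note (not part of this diff): with these impls in place, call sites
// that previously used the removed `Request::get_inner()` can go through the
// standard conversion traits instead, e.g.
// `let inner: &ngx_http_request_t = request.as_ref();`.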

impl Request {
/// Create a [`Request`] from an [`ngx_http_request_t`].
///
@@ -402,11 +414,6 @@ impl Request {
pub fn headers_out_iterator(&self) -> NgxListIterator {
unsafe { list_iterator(&self.0.headers_out.headers) }
}

/// Returns the inner data structure that the Request object is wrapping.
pub fn get_inner(&self) -> &ngx_http_request_t {
&self.0
}
}

// trait OnSubRequestDone {