Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 64 additions & 68 deletions examples/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ use std::time::Instant;

use ngx::core;
use ngx::ffi::{
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_handler_pt,
ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t,
ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t,
ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_module_t, ngx_int_t,
ngx_module_t, ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t,
NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_LOG_EMERG,
};
use ngx::http::{self, HttpModule, MergeConfigError};
use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule};
use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string};
use ngx::http::{self, HttpModule, HttpModuleLocationConf, HttpRequestHandler, MergeConfigError};
use ngx::{ngx_conf_log_error, ngx_log_debug_http, ngx_string};
use tokio::runtime::Runtime;

struct Module;
Expand All @@ -25,18 +23,10 @@ impl http::HttpModule for Module {

unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t {
// SAFETY: this function is called with non-NULL cf always
let cf = &mut *cf;
let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf");

let h = ngx_array_push(
&mut cmcf.phases[ngx_http_phases_NGX_HTTP_ACCESS_PHASE as usize].handlers,
) as *mut ngx_http_handler_pt;
if h.is_null() {
return core::Status::NGX_ERROR.into();
}
// set an Access phase handler
*h = Some(async_access_handler);
core::Status::NGX_OK.into()
let cf = unsafe { &mut *cf };
http::add_phase_handler::<AsyncAccessHandler, _>(cf)
.map_or(core::Status::NGX_ERROR, |_| core::Status::NGX_OK)
.into()
}
}

Expand Down Expand Up @@ -139,63 +129,69 @@ impl Drop for RequestCTX {
}
}

http_request_handler!(async_access_handler, |request: &mut http::Request| {
let co = Module::location_conf(request).expect("module config is none");
struct AsyncAccessHandler;

ngx_log_debug_http!(request, "async module enabled: {}", co.enable);
impl HttpRequestHandler<Option<ngx_int_t>> for AsyncAccessHandler {
const PHASE: ngx::http::Phases = ngx::http::Phases::Access;

if !co.enable {
return core::Status::NGX_DECLINED;
}
fn handler(request: &mut http::Request) -> Option<ngx_int_t> {
let co = Module::location_conf(request).expect("module config is none");

ngx_log_debug_http!(request, "async module enabled: {}", co.enable);

if let Some(ctx) =
unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) }
{
if !ctx.done.load(Ordering::Relaxed) {
return core::Status::NGX_AGAIN;
if !co.enable {
return Some(core::Status::NGX_DECLINED.into());
}

return core::Status::NGX_OK;
}
if let Some(ctx) =
unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) }
{
if !ctx.done.load(Ordering::Relaxed) {
return Some(core::Status::NGX_AGAIN.into());
}

let ctx = request.pool().allocate(RequestCTX::default());
if ctx.is_null() {
return core::Status::NGX_ERROR;
return Some(core::Status::NGX_OK.into());
}

let ctx = request.pool().allocate(RequestCTX::default());
if ctx.is_null() {
return Some(core::Status::NGX_ERROR.into());
}
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });

let ctx = unsafe { &mut *ctx };
ctx.event.handler = Some(check_async_work_done);
ctx.event.data = request.connection().cast();
ctx.event.log = unsafe { (*request.connection()).log };
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };

// Request is no longer needed and can be converted to something movable to the async block
let req = AtomicPtr::new(request.into());
let done_flag = ctx.done.clone();

let rt = ngx_http_async_runtime();
ctx.task = Some(rt.spawn(async move {
let start = Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
// not really thread safe, we should apply all these operation in nginx thread
// but this is just an example. proper way would be storing these headers in the request ctx
// and apply them when we get back to the nginx thread.
req.add_header_out(
"X-Async-Time",
start.elapsed().as_millis().to_string().as_str(),
);

done_flag.store(true, Ordering::Release);
// there is a small issue here. If traffic is low we may get stuck behind a 300ms timer
// in the nginx event loop. To workaround it we can notify the event loop using
// pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx
// and use the same trick as the thread pool)
}));

Some(core::Status::NGX_AGAIN.into())
}
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });

let ctx = unsafe { &mut *ctx };
ctx.event.handler = Some(check_async_work_done);
ctx.event.data = request.connection().cast();
ctx.event.log = unsafe { (*request.connection()).log };
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };

// Request is no longer needed and can be converted to something movable to the async block
let req = AtomicPtr::new(request.into());
let done_flag = ctx.done.clone();

let rt = ngx_http_async_runtime();
ctx.task = Some(rt.spawn(async move {
let start = Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
// not really thread safe, we should apply all these operation in nginx thread
// but this is just an example. proper way would be storing these headers in the request ctx
// and apply them when we get back to the nginx thread.
req.add_header_out(
"X-Async-Time",
start.elapsed().as_millis().to_string().as_str(),
);

done_flag.store(true, Ordering::Release);
// there is a small issue here. If traffic is low we may get stuck behind a 300ms timer
// in the nginx event loop. To workaround it we can notify the event loop using
// pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx
// and use the same trick as the thread pool)
}));

core::Status::NGX_AGAIN
});
}

extern "C" fn ngx_http_async_commands_set_enable(
cf: *mut ngx_conf_t,
Expand Down
165 changes: 81 additions & 84 deletions examples/awssig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use std::ffi::{c_char, c_void};
use http::HeaderMap;
use ngx::core;
use ngx::ffi::{
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_http_handler_pt, ngx_http_module_t,
ngx_http_phases_NGX_HTTP_PRECONTENT_PHASE, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t,
ngx_command_t, ngx_conf_t, ngx_http_module_t, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t,
NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE,
NGX_HTTP_SRV_CONF, NGX_LOG_EMERG,
};
use ngx::http::*;
use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string};
use ngx::{ngx_conf_log_error, ngx_log_debug_http, ngx_string};

struct Module;

Expand All @@ -20,18 +19,10 @@ impl HttpModule for Module {

unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t {
// SAFETY: this function is called with non-NULL cf always
let cf = &mut *cf;
let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf");

let h = ngx_array_push(
&mut cmcf.phases[ngx_http_phases_NGX_HTTP_PRECONTENT_PHASE as usize].handlers,
) as *mut ngx_http_handler_pt;
if h.is_null() {
return core::Status::NGX_ERROR.into();
}
// set an phase handler
*h = Some(awssigv4_header_handler);
core::Status::NGX_OK.into()
let cf = unsafe { &mut *cf };
ngx::http::add_phase_handler::<AwsSigV4HeaderHandler, _>(cf)
.map_or(core::Status::NGX_ERROR, |_| core::Status::NGX_OK)
.into()
}
}

Expand Down Expand Up @@ -261,82 +252,88 @@ extern "C" fn ngx_http_awssigv4_commands_set_s3_endpoint(
ngx::core::NGX_CONF_OK
}

http_request_handler!(awssigv4_header_handler, |request: &mut Request| {
// get Module Config from request
let conf = Module::location_conf(request).expect("module conf");
ngx_log_debug_http!(request, "AWS signature V4 module {}", {
if conf.enable {
"enabled"
} else {
"disabled"
}
});
if !conf.enable {
return core::Status::NGX_DECLINED;
}
struct AwsSigV4HeaderHandler;

// TODO: build url properly from the original URL from client
let method = request.method();
if !matches!(method, ngx::http::Method::HEAD | ngx::http::Method::GET) {
return HTTPStatus::FORBIDDEN.into();
}
impl HttpRequestHandler<Option<ngx_int_t>> for AwsSigV4HeaderHandler {
const PHASE: Phases = Phases::PreContent;

let datetime = chrono::Utc::now();
let uri = match request.unparsed_uri().to_str() {
Ok(v) => format!("https://{}.{}{}", conf.s3_bucket, conf.s3_endpoint, v),
Err(_) => return core::Status::NGX_DECLINED,
};
fn handler(request: &mut Request) -> Option<ngx_int_t> {
// get Module Config from request
let conf = Module::location_conf(request).expect("module conf");
ngx_log_debug_http!(request, "AWS signature V4 module {}", {
if conf.enable {
"enabled"
} else {
"disabled"
}
});
if !conf.enable {
return Some(core::Status::NGX_DECLINED.into());
}

// TODO: build url properly from the original URL from client
let method = request.method();
if !matches!(method, ngx::http::Method::HEAD | ngx::http::Method::GET) {
return Some(HTTPStatus::FORBIDDEN.into());
}

let datetime_now = datetime.format("%Y%m%dT%H%M%SZ");
let datetime_now = datetime_now.to_string();
let datetime = chrono::Utc::now();
let uri = match request.unparsed_uri().to_str() {
Ok(v) => format!("https://{}.{}{}", conf.s3_bucket, conf.s3_endpoint, v),
Err(_) => return Some(core::Status::NGX_DECLINED.into()),
};

let signature = {
// NOTE: aws_sign_v4::AwsSign::new() implementation requires a HeaderMap.
// Iterate over requests headers_in and copy into HeaderMap
// Copy only headers that will be used to sign the request
let mut headers = HeaderMap::new();
for (name, value) in request.headers_in_iterator() {
if let Ok(name) = name.to_str() {
if name.to_lowercase() == "host" {
if let Ok(value) = http::HeaderValue::from_bytes(value.as_bytes()) {
headers.insert(http::header::HOST, value);
} else {
return core::Status::NGX_DECLINED;
let datetime_now = datetime.format("%Y%m%dT%H%M%SZ");
let datetime_now = datetime_now.to_string();

let signature = {
// NOTE: aws_sign_v4::AwsSign::new() implementation requires a HeaderMap.
// Iterate over requests headers_in and copy into HeaderMap
// Copy only headers that will be used to sign the request
let mut headers = HeaderMap::new();
for (name, value) in request.headers_in_iterator() {
if let Ok(name) = name.to_str() {
if name.to_lowercase() == "host" {
if let Ok(value) = http::HeaderValue::from_bytes(value.as_bytes()) {
headers.insert(http::header::HOST, value);
} else {
return Some(core::Status::NGX_DECLINED.into());
}
}
} else {
return Some(core::Status::NGX_DECLINED.into());
}
} else {
return core::Status::NGX_DECLINED;
}
}
headers.insert("X-Amz-Date", datetime_now.parse().unwrap());
ngx_log_debug_http!(request, "headers {:?}", headers);
ngx_log_debug_http!(request, "method {:?}", method);
ngx_log_debug_http!(request, "uri {:?}", uri);
ngx_log_debug_http!(request, "datetime_now {:?}", datetime_now);

let s = aws_sign_v4::AwsSign::new(
method.as_str(),
&uri,
&datetime,
&headers,
"us-east-1",
conf.access_key.as_str(),
conf.secret_key.as_str(),
"s3",
"",
);
s.sign()
};
headers.insert("X-Amz-Date", datetime_now.parse().unwrap());
ngx_log_debug_http!(request, "headers {:?}", headers);
ngx_log_debug_http!(request, "method {:?}", method);
ngx_log_debug_http!(request, "uri {:?}", uri);
ngx_log_debug_http!(request, "datetime_now {:?}", datetime_now);

let s = aws_sign_v4::AwsSign::new(
method.as_str(),
&uri,
&datetime,
&headers,
"us-east-1",
conf.access_key.as_str(),
conf.secret_key.as_str(),
"s3",
"",
);
s.sign()
};

request.add_header_in("authorization", signature.as_str());
request.add_header_in("X-Amz-Date", datetime_now.as_str());
request.add_header_in("authorization", signature.as_str());
request.add_header_in("X-Amz-Date", datetime_now.as_str());

for (name, value) in request.headers_out_iterator() {
ngx_log_debug_http!(request, "headers_out {name}: {value}",);
}
for (name, value) in request.headers_in_iterator() {
ngx_log_debug_http!(request, "headers_in {name}: {value}",);
}
for (name, value) in request.headers_out_iterator() {
ngx_log_debug_http!(request, "headers_out {name}: {value}",);
}
for (name, value) in request.headers_in_iterator() {
ngx_log_debug_http!(request, "headers_in {name}: {value}",);
}

core::Status::NGX_OK
});
Some(core::Status::NGX_OK.into())
}
}
Loading
Loading