Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 123 additions & 101 deletions src/hteapot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ mod request;
mod response;
mod status;

use self::response::IterError;

use self::response::HttpResponseCommon;

use self::response::EmptyHttpResponse;
use self::response::HttpResponseCommon;
use self::response::IterError;

pub use self::methods::HttpMethod;
pub use self::request::HttpRequest;
Expand All @@ -25,8 +23,7 @@ use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::{Duration, Instant};

const VERSION: &str = env!("CARGO_PKG_VERSION");
const BUFFER_SIZE: usize = 1024 * 2;
Expand Down Expand Up @@ -96,50 +93,32 @@ impl Hteapot {
return;
}
};

let pool: Arc<(Mutex<VecDeque<TcpStream>>, Condvar)> =
Arc::new((Mutex::new(VecDeque::new()), Condvar::new()));
let priority_list: Arc<Mutex<Vec<usize>>> = Arc::new(Mutex::new(Vec::new()));
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This handles the load balancing of the threads.
It not provides a significant performance boost (~3%), but maybe we could improve the LB system

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

let priority_list: Arc<Mutex<Vec<usize>>> =
Arc::new(Mutex::new(vec![0; self.threads as usize]));
let arc_action = Arc::new(action);
for _tn in 0..self.threads {
let _tn = _tn as usize;

for thread_index in 0..self.threads {
let pool_clone = pool.clone();
let action_clone = arc_action.clone();
let pl_clone = priority_list.clone();
{
let mut pl_lock = pl_clone.lock().expect("Error locking priority list");
pl_lock.push(0);
}
let priority_list_clone = priority_list.clone();

thread::spawn(move || {
let mut streams_to_handle = Vec::new();
loop {
{
let (lock, cvar) = &*pool_clone;
let mut pool = lock.lock().expect("Error locking pool");
let pl_copy;
{
let pl_lock = pl_clone.lock().expect("Error locking priority list");
pl_copy = pl_lock.clone();
}

if streams_to_handle.is_empty() {
pool = cvar
.wait_while(pool, |pool| pool.is_empty())
.expect("Error waiting on cvar");
} else if pl_copy.len() != 1
&& streams_to_handle.len() < 10
&& pl_copy
.iter()
.find(|&&v| streams_to_handle.len() > v)
.is_none()
{
(pool, _) = cvar
.wait_timeout_while(pool, Duration::from_millis(500), |pool| {
pool.is_empty()
})
.expect("Error waiting on cvar");
}

if !pool.is_empty() {
while let Some(stream) = pool.pop_back() {
let socket_status = SocketStatus {
ttl: Instant::now(),
reading: true,
Expand All @@ -149,34 +128,30 @@ impl Hteapot {
index_writed: 0,
};
let socket_data = SocketData {
stream: pool.pop_back().unwrap(),
stream,
status: Some(socket_status),
};
streams_to_handle.push(socket_data);

{
let mut pl_lock =
pl_clone.lock().expect("Errpr locking priority list");
pl_lock[_tn] = streams_to_handle.len();
}
}
}

{
let mut priority_list = priority_list_clone
.lock()
.expect("Error locking priority list");
priority_list[thread_index as usize] = streams_to_handle.len();
}

streams_to_handle.retain_mut(|s| {
if s.status.is_none() {
return false;
}
Hteapot::handle_client(s, &action_clone).is_some()
});
{
let mut pl_lock = pl_clone.lock().expect("Errpr locking priority list");
pl_lock[_tn] = streams_to_handle.len();
}
}
});
}

let pool_clone = pool.clone();
loop {
let stream = listener.accept();
if stream.is_err() {
Expand All @@ -185,77 +160,85 @@ impl Hteapot {
let (stream, _) = stream.unwrap();
stream
.set_nonblocking(true)
.expect("Error seting non blocking");
stream.set_nodelay(true).expect("Error seting no delay");
.expect("Error setting non-blocking");
stream.set_nodelay(true).expect("Error setting no delay");

{
let (lock, cvar) = &*pool_clone;
let (lock, cvar) = &*pool;
let mut pool = lock.lock().expect("Error locking pool");

// Add the connection to the pool for the least-loaded thread
pool.push_front(stream);
cvar.notify_one();
}
// Notify one waiting thread
}
}

// Handle the client when a request is received
fn handle_client(
socket_data: &mut SocketData,
action: &Arc<impl Fn(HttpRequest) -> Box<dyn HttpResponseCommon> + Send + Sync + 'static>,
) -> Option<()> {
let status = socket_data.status.as_mut()?;

// Fix by miky-rola 2025-04-08
// Check if the TTL (time-to-live) for the connection has expired.
// If the connection is idle for longer than `KEEP_ALIVE_TTL` and no data is being written,
// the connection is gracefully shut down to free resources.
if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL && !status.write {
let _ = socket_data.stream.shutdown(Shutdown::Both);
return None;
}

// If the request is not yet complete, read data from the stream into a buffer.
// This ensures that the server can handle partial or chunked requests.
if !status.request.done {
loop {
let mut buffer = [0; BUFFER_SIZE];
match socket_data.stream.read(&mut buffer) {
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => {
return Some(());
}
io::ErrorKind::ConnectionReset => {
return None;
}
_ => {
println!("R Error{:?}", e);
return None;
}
},
Ok(m) => {
status.ttl = Instant::now();
let r = status.request.append(buffer.to_vec());
if r.is_some() {
break;
}
if m == 0 {
return None;
} else if m < BUFFER_SIZE || status.request.done {
break;
}
let mut buffer = [0; BUFFER_SIZE];
match socket_data.stream.read(&mut buffer) {
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => {
return Some(());
}
};
io::ErrorKind::ConnectionReset => {
return None;
}
_ => {
eprintln!("Read error: {:?}", e);
return None;
}
},
Ok(m) => {
if m == 0 {
return None;
}
status.ttl = Instant::now();
let r = status.request.append(buffer[..m].to_vec());
if r.is_some() {
return Some(());
}
}
}
}

let request = status.request.get();
if request.is_none() {
return Some(());
}
let request = request.unwrap();
let keep_alive = match request.headers.get("Connection") {
Some(ch) => ch == "keep-alive",
None => false,
};

let keep_alive = request
.headers
.get("Connection")
.map(|v| v == "keep-alive")
.unwrap_or(false);

if !status.write {
let mut response = action(request); //Call closure
if !response.base().headers.contains_key("Connection") && keep_alive {
let mut response = action(request);
if keep_alive {
response
.base()
.headers
.insert("Connection".to_string(), "keep_alive".to_string());
.entry("Connection".to_string())
.or_insert("keep-alive".to_string());
response.base().headers.insert(
"Keep-Alive".to_string(),
format!("timeout={}", KEEP_ALIVE_TTL.as_secs()),
Expand All @@ -269,34 +252,32 @@ impl Hteapot {
status.write = true;
status.response = response;
}

// Write the response to the client in chunks using the `peek` and `next` methods.
// This ensures that large responses are sent incrementally without blocking the server.
loop {
match status.response.peek() {
Ok(n) => match socket_data.stream.write(&n) {
Ok(_size) => {
Ok(_) => {
status.ttl = Instant::now();
let _ = status.response.next();
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return Some(());
}
Err(e) => {
eprintln!("W error: {:?}", e);
eprintln!("Write error: {:?}", e);
return None;
}
},
Err(IterError::WouldBlock) => {
status.ttl = Instant::now();
//thread::sleep(Duration::from_millis(100));
return Some(());
}
Err(_) => break,
}
}

let r = socket_data.stream.flush();
if r.is_err() {
return Some(());
}
if keep_alive {
status.reading = true;
status.write = false;
Expand All @@ -311,17 +292,6 @@ impl Hteapot {
}

#[cfg(test)]
// #[test]
// fn test_http_parser() {
// let request =
// "GET / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.68.0\r\nAccept: */*\r\n\r\n";
// let parsed_request = Hteapot::request_parser(request.to_string()).unwrap();
// assert_eq!(parsed_request.method, HttpMethod::GET);
// assert_eq!(parsed_request.path, "/");
// assert_eq!(parsed_request.args.len(), 0);
// assert_eq!(parsed_request.headers.len(), 3);
// assert_eq!(parsed_request.body, "");
// }
#[test]
fn test_http_response_maker() {
let mut response = HttpResponse::new(HttpStatus::IAmATeapot, "Hello, World!", None);
Expand All @@ -332,3 +302,55 @@ fn test_http_response_maker() {
assert!(response.contains(item));
}
}

#[cfg(test)]
#[test]
fn test_keep_alive_connection() {
let mut response = HttpResponse::new(
HttpStatus::OK,
"Keep-Alive Test",
headers! {
"Connection" => "keep-alive",
"Content-Length" => "15"
},
);

response.base().headers.insert(
"Keep-Alive".to_string(),
format!("timeout={}", KEEP_ALIVE_TTL.as_secs()),
);

let response_bytes = response.to_bytes();
let response_str = String::from_utf8(response_bytes.clone()).unwrap();

assert!(response_str.contains("HTTP/1.1 200 OK"));
assert!(response_str.contains("Content-Length: 15"));
assert!(response_str.contains("Connection: keep-alive"));
assert!(response_str.contains("Keep-Alive: timeout=10"));
assert!(response_str.contains("Server: HTeaPot/"));
assert!(response_str.contains("Keep-Alive Test"));

let mut second_response = HttpResponse::new(
HttpStatus::OK,
"Second Request",
headers! {
"Connection" => "keep-alive",
"Content-Length" => "14" // Length for "Second Request"
},
);

second_response.base().headers.insert(
"Keep-Alive".to_string(),
format!("timeout={}", KEEP_ALIVE_TTL.as_secs()),
);

let second_response_bytes = second_response.to_bytes();
let second_response_str = String::from_utf8(second_response_bytes.clone()).unwrap();

assert!(second_response_str.contains("HTTP/1.1 200 OK"));
assert!(second_response_str.contains("Content-Length: 14"));
assert!(second_response_str.contains("Connection: keep-alive"));
assert!(second_response_str.contains("Keep-Alive: timeout=10"));
assert!(second_response_str.contains("Server: HTeaPot/"));
assert!(second_response_str.contains("Second Request"));
}