diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index d205375..4e1fadf 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -8,11 +8,9 @@ mod request; mod response; mod status; -use self::response::IterError; - -use self::response::HttpResponseCommon; - use self::response::EmptyHttpResponse; +use self::response::HttpResponseCommon; +use self::response::IterError; pub use self::methods::HttpMethod; pub use self::request::HttpRequest; @@ -25,8 +23,7 @@ use std::io::{self, Read, Write}; use std::net::{Shutdown, TcpListener, TcpStream}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; -use std::time::Duration; -use std::time::Instant; +use std::time::{Duration, Instant}; const VERSION: &str = env!("CARGO_PKG_VERSION"); const BUFFER_SIZE: usize = 1024 * 2; @@ -96,50 +93,32 @@ impl Hteapot { return; } }; + let pool: Arc<(Mutex>, Condvar)> = Arc::new((Mutex::new(VecDeque::new()), Condvar::new())); - let priority_list: Arc>> = Arc::new(Mutex::new(Vec::new())); + let priority_list: Arc>> = + Arc::new(Mutex::new(vec![0; self.threads as usize])); let arc_action = Arc::new(action); - for _tn in 0..self.threads { - let _tn = _tn as usize; + + for thread_index in 0..self.threads { let pool_clone = pool.clone(); let action_clone = arc_action.clone(); - let pl_clone = priority_list.clone(); - { - let mut pl_lock = pl_clone.lock().expect("Error locking priority list"); - pl_lock.push(0); - } + let priority_list_clone = priority_list.clone(); + thread::spawn(move || { let mut streams_to_handle = Vec::new(); loop { { let (lock, cvar) = &*pool_clone; let mut pool = lock.lock().expect("Error locking pool"); - let pl_copy; - { - let pl_lock = pl_clone.lock().expect("Error locking priority list"); - pl_copy = pl_lock.clone(); - } if streams_to_handle.is_empty() { pool = cvar .wait_while(pool, |pool| pool.is_empty()) .expect("Error waiting on cvar"); - } else if pl_copy.len() != 1 - && streams_to_handle.len() < 10 - && pl_copy - .iter() - .find(|&&v| streams_to_handle.len() > v) - .is_none() - { - (pool, _) = cvar - .wait_timeout_while(pool, Duration::from_millis(500), |pool| { - pool.is_empty() - }) - .expect("Error waiting on cvar"); } - if !pool.is_empty() { + while let Some(stream) = pool.pop_back() { let socket_status = SocketStatus { ttl: Instant::now(), reading: true, @@ -149,34 +128,30 @@ impl Hteapot { index_writed: 0, }; let socket_data = SocketData { - stream: pool.pop_back().unwrap(), + stream, status: Some(socket_status), }; streams_to_handle.push(socket_data); - - { - let mut pl_lock = - pl_clone.lock().expect("Errpr locking priority list"); - pl_lock[_tn] = streams_to_handle.len(); - } } } + { + let mut priority_list = priority_list_clone + .lock() + .expect("Error locking priority list"); + priority_list[thread_index as usize] = streams_to_handle.len(); + } + streams_to_handle.retain_mut(|s| { if s.status.is_none() { return false; } Hteapot::handle_client(s, &action_clone).is_some() }); - { - let mut pl_lock = pl_clone.lock().expect("Errpr locking priority list"); - pl_lock[_tn] = streams_to_handle.len(); - } } }); } - let pool_clone = pool.clone(); loop { let stream = listener.accept(); if stream.is_err() { @@ -185,77 +160,85 @@ impl Hteapot { let (stream, _) = stream.unwrap(); stream .set_nonblocking(true) - .expect("Error seting non blocking"); - stream.set_nodelay(true).expect("Error seting no delay"); + .expect("Error setting non-blocking"); + stream.set_nodelay(true).expect("Error setting no delay"); + { - let (lock, cvar) = &*pool_clone; + let (lock, cvar) = &*pool; let mut pool = lock.lock().expect("Error locking pool"); + // Add the connection to the pool for the least-loaded thread pool.push_front(stream); cvar.notify_one(); } - // Notify one waiting thread } } - // Handle the client when a request is received fn handle_client( socket_data: &mut SocketData, action: &Arc Box + Send + Sync + 'static>, ) -> Option<()> { let status = socket_data.status.as_mut()?; + // Fix by miky-rola 2025-04-08 + // Check if the TTL (time-to-live) for the connection has expired. + // If the connection is idle for longer than `KEEP_ALIVE_TTL` and no data is being written, + // the connection is gracefully shut down to free resources. if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL && !status.write { let _ = socket_data.stream.shutdown(Shutdown::Both); return None; } + + // If the request is not yet complete, read data from the stream into a buffer. + // This ensures that the server can handle partial or chunked requests. if !status.request.done { - loop { - let mut buffer = [0; BUFFER_SIZE]; - match socket_data.stream.read(&mut buffer) { - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock => { - return Some(()); - } - io::ErrorKind::ConnectionReset => { - return None; - } - _ => { - println!("R Error{:?}", e); - return None; - } - }, - Ok(m) => { - status.ttl = Instant::now(); - let r = status.request.append(buffer.to_vec()); - if r.is_some() { - break; - } - if m == 0 { - return None; - } else if m < BUFFER_SIZE || status.request.done { - break; - } + let mut buffer = [0; BUFFER_SIZE]; + match socket_data.stream.read(&mut buffer) { + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => { + return Some(()); } - }; + io::ErrorKind::ConnectionReset => { + return None; + } + _ => { + eprintln!("Read error: {:?}", e); + return None; + } + }, + Ok(m) => { + if m == 0 { + return None; + } + status.ttl = Instant::now(); + let r = status.request.append(buffer[..m].to_vec()); + if r.is_some() { + return Some(()); + } + } } } + let request = status.request.get(); if request.is_none() { return Some(()); } let request = request.unwrap(); - let keep_alive = match request.headers.get("Connection") { - Some(ch) => ch == "keep-alive", - None => false, - }; + + let keep_alive = request + .headers + .get("Connection") + .map(|v| v == "keep-alive") + .unwrap_or(false); + if !status.write { - let mut response = action(request); //Call closure - if !response.base().headers.contains_key("Connection") && keep_alive { + let mut response = action(request); + if keep_alive { response .base() .headers - .insert("Connection".to_string(), "keep_alive".to_string()); + .entry("Connection".to_string()) + .or_insert("keep-alive".to_string()); response.base().headers.insert( "Keep-Alive".to_string(), format!("timeout={}", KEEP_ALIVE_TTL.as_secs()), @@ -269,10 +252,13 @@ impl Hteapot { status.write = true; status.response = response; } + + // Write the response to the client in chunks using the `peek` and `next` methods. + // This ensures that large responses are sent incrementally without blocking the server. loop { match status.response.peek() { Ok(n) => match socket_data.stream.write(&n) { - Ok(_size) => { + Ok(_) => { status.ttl = Instant::now(); let _ = status.response.next(); } @@ -280,23 +266,18 @@ impl Hteapot { return Some(()); } Err(e) => { - eprintln!("W error: {:?}", e); + eprintln!("Write error: {:?}", e); return None; } }, Err(IterError::WouldBlock) => { status.ttl = Instant::now(); - //thread::sleep(Duration::from_millis(100)); return Some(()); } Err(_) => break, } } - let r = socket_data.stream.flush(); - if r.is_err() { - return Some(()); - } if keep_alive { status.reading = true; status.write = false; @@ -311,17 +292,6 @@ impl Hteapot { } #[cfg(test)] -// #[test] -// fn test_http_parser() { -// let request = -// "GET / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.68.0\r\nAccept: */*\r\n\r\n"; -// let parsed_request = Hteapot::request_parser(request.to_string()).unwrap(); -// assert_eq!(parsed_request.method, HttpMethod::GET); -// assert_eq!(parsed_request.path, "/"); -// assert_eq!(parsed_request.args.len(), 0); -// assert_eq!(parsed_request.headers.len(), 3); -// assert_eq!(parsed_request.body, ""); -// } #[test] fn test_http_response_maker() { let mut response = HttpResponse::new(HttpStatus::IAmATeapot, "Hello, World!", None); @@ -332,3 +302,55 @@ fn test_http_response_maker() { assert!(response.contains(item)); } } + +#[cfg(test)] +#[test] +fn test_keep_alive_connection() { + let mut response = HttpResponse::new( + HttpStatus::OK, + "Keep-Alive Test", + headers! { + "Connection" => "keep-alive", + "Content-Length" => "15" + }, + ); + + response.base().headers.insert( + "Keep-Alive".to_string(), + format!("timeout={}", KEEP_ALIVE_TTL.as_secs()), + ); + + let response_bytes = response.to_bytes(); + let response_str = String::from_utf8(response_bytes.clone()).unwrap(); + + assert!(response_str.contains("HTTP/1.1 200 OK")); + assert!(response_str.contains("Content-Length: 15")); + assert!(response_str.contains("Connection: keep-alive")); + assert!(response_str.contains("Keep-Alive: timeout=10")); + assert!(response_str.contains("Server: HTeaPot/")); + assert!(response_str.contains("Keep-Alive Test")); + + let mut second_response = HttpResponse::new( + HttpStatus::OK, + "Second Request", + headers! { + "Connection" => "keep-alive", + "Content-Length" => "14" // Length for "Second Request" + }, + ); + + second_response.base().headers.insert( + "Keep-Alive".to_string(), + format!("timeout={}", KEEP_ALIVE_TTL.as_secs()), + ); + + let second_response_bytes = second_response.to_bytes(); + let second_response_str = String::from_utf8(second_response_bytes.clone()).unwrap(); + + assert!(second_response_str.contains("HTTP/1.1 200 OK")); + assert!(second_response_str.contains("Content-Length: 14")); + assert!(second_response_str.contains("Connection: keep-alive")); + assert!(second_response_str.contains("Keep-Alive: timeout=10")); + assert!(second_response_str.contains("Server: HTeaPot/")); + assert!(second_response_str.contains("Second Request")); +}