From 13e51b96624e59ec03a3647d1904e135c077a570 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Tue, 25 Feb 2025 18:07:09 +0100 Subject: [PATCH 01/21] reduce socket data clone --- src/hteapot/mod.rs | 61 +++++++++++++++--------------- src/hteapot/request.rs | 85 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 30 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index 3191bba..0dccff1 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -154,12 +154,7 @@ impl Hteapot { if stream_data.status.is_none() { continue; } - let r = Hteapot::handle_client( - &stream_data.stream, - stream_data.status.as_mut().unwrap().clone(), - &action_clone, - ); - stream_data.status = r; + Hteapot::handle_client(stream_data, &action_clone); } streams_to_handle.retain(|s| s.status.is_some()); { @@ -275,20 +270,19 @@ impl Hteapot { // Handle the client when a request is received fn handle_client( - stream: &TcpStream, - socket_status: SocketStatus, + socket_data: &mut SocketData, action: &Arc HttpResponse + Send + Sync + 'static>, - ) -> Option { - let mut reader = BufReader::new(stream); - let mut writer = BufWriter::new(stream); - let mut socket_status = socket_status.clone(); - if socket_status.reading { + ) -> Option<()> { + let mut reader = BufReader::new(&socket_data.stream); + let mut writer = BufWriter::new(&socket_data.stream); + let status = socket_data.status.as_mut()?; + if status.reading { loop { let mut buffer = [0; 1024]; match reader.read(&mut buffer) { Err(e) => match e.kind() { io::ErrorKind::WouldBlock => { - return Some(socket_status); + return Some(()); } io::ErrorKind::ConnectionReset => { return None; @@ -299,24 +293,30 @@ impl Hteapot { } }, Ok(m) => { + status.data_readed.append(&mut buffer.to_vec()); if m == 0 { return None; + } else if m < 1024 { + break; } } }; - socket_status.data_readed.append(&mut buffer.to_vec()); + //socket_status if buffer[0] == 0 { + println!("buffer 0 == 0"); break; }; if *buffer.last().unwrap() == 0 { + println!("buffer last == 0"); + break; } } - socket_status.reading = false; + status.reading = false; } - let request_string = String::from_utf8(socket_status.data_readed.clone()); + let request_string = String::from_utf8(status.data_readed.clone()); let request_string = if request_string.is_err() { //This proablly means the request is a https so for the moment GTFO return None; @@ -330,11 +330,12 @@ impl Hteapot { return None; } let request = request.unwrap(); + println!("[hteapot] body: {:?}", request.body.clone()); let keep_alive = match request.headers.get("Connection") { Some(ch) => ch == "keep-alive", None => false, }; - if socket_status.data_write.len() == 0 { + if status.data_write.len() == 0 { let mut response = action(request); if !response.headers.contains_key("Conection") && keep_alive { response @@ -345,35 +346,35 @@ impl Hteapot { .headers .insert("Connection".to_string(), "close".to_string()); } - socket_status.data_write = response.to_bytes(); + status.data_write = response.to_bytes(); } - for n in socket_status.index_writed..socket_status.data_write.len() { - let r = writer.write(&[socket_status.data_write[n]]); + for n in status.index_writed..status.data_write.len() { + let r = writer.write(&[status.data_write[n]]); if r.is_err() { let error = r.err().unwrap(); if error.kind() == io::ErrorKind::WouldBlock { - return Some(socket_status); + return Some(()); } else { eprintln!("W error: {:?}", error); return None; } } - socket_status.index_writed += r.unwrap(); + status.index_writed += r.unwrap(); } let r = writer.flush(); if r.is_err() { eprintln!("Error2: {}", r.err().unwrap()); - return Some(socket_status); + return Some(()); } if keep_alive { - socket_status.reading = true; - socket_status.data_readed = vec![]; - socket_status.data_write = vec![]; - socket_status.index_writed = 0; - return Some(socket_status); + status.reading = true; + status.data_readed = vec![]; + status.data_write = vec![]; + status.index_writed = 0; + return Some(()); } else { - let _ = stream.shutdown(Shutdown::Both); + let _ = socket_data.stream.shutdown(Shutdown::Both); None } } diff --git a/src/hteapot/request.rs b/src/hteapot/request.rs index cb9a3d0..0bde16f 100644 --- a/src/hteapot/request.rs +++ b/src/hteapot/request.rs @@ -9,3 +9,88 @@ pub struct HttpRequest { pub headers: HashMap, pub body: String, } + +impl HttpRequest { + pub fn default() -> Self { + HttpRequest { + method: HttpMethod::GET, + path: String::new(), + args: HashMap::new(), + headers: HashMap::new(), + body: String::new(), + } + } +} + +pub struct HttpRequestBuilder { + request: HttpRequest, + buffer: Vec, + done: bool, +} + +impl HttpRequestBuilder { + pub fn new() -> Self { + return HttpRequestBuilder { + request: HttpRequest { + method: HttpMethod::GET, + path: String::new(), + args: HashMap::new(), + headers: HashMap::new(), + body: String::new(), + }, + buffer: Vec::new(), + done: false, + }; + } + + pub fn append(&mut self, buffer: Vec) -> Option { + self.buffer.extend(buffer); + + while let Some(pos) = self.buffer.windows(2).position(|w| w == b"\r\n") { + let line = self.buffer.drain(..pos).collect::>(); // Extraer línea + self.buffer.drain(..2); // Eliminar `\r\n` + + let line_str = String::from_utf8_lossy(&line); + + if self.request.path.is_empty() { + // Primera línea: Método + Path + Versión HTTP + let parts: Vec<&str> = line_str.split_whitespace().collect(); + if parts.len() < 2 { + return None; // Request malformada + } + + self.request.method = HttpMethod::from_str(parts[0]); // Convierte a enum + let path_parts: Vec<&str> = parts[1].split('?').collect(); + self.request.path = path_parts[0].to_string(); + + // Si hay argumentos en la URL, los parseamos + if path_parts.len() > 1 { + self.request.args = path_parts[1] + .split('&') + .filter_map(|pair| { + let kv: Vec<&str> = pair.split('=').collect(); + if kv.len() == 2 { + Some((kv[0].to_string(), kv[1].to_string())) + } else { + None + } + }) + .collect(); + } + } else if !line_str.is_empty() { + // Cabeceras HTTP + if let Some((key, value)) = line_str.split_once(": ") { + self.request + .headers + .insert(key.to_string(), value.to_string()); + } + } else { + // Fin de las cabeceras + self.done = true; + return Some(std::mem::replace(&mut self.request, HttpRequest::default())); + } + } + + None // Aún no tenemos toda la request + } +} From 4a5c589135fd30f664bf5d2079d9968c10c12e29 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Tue, 25 Feb 2025 18:11:46 +0100 Subject: [PATCH 02/21] removed print and set status to none --- src/hteapot/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index 0dccff1..328f213 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -154,7 +154,10 @@ impl Hteapot { if stream_data.status.is_none() { continue; } - Hteapot::handle_client(stream_data, &action_clone); + let r = Hteapot::handle_client(stream_data, &action_clone); + if r.is_none() { + stream_data.status = None; + } } streams_to_handle.retain(|s| s.status.is_some()); { @@ -330,7 +333,6 @@ impl Hteapot { return None; } let request = request.unwrap(); - println!("[hteapot] body: {:?}", request.body.clone()); let keep_alive = match request.headers.get("Connection") { Some(ch) => ch == "keep-alive", None => false, From af126268996201265eecc75b7835870d720c3de5 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Wed, 26 Feb 2025 08:34:21 +0100 Subject: [PATCH 03/21] removed unnecessary iterator --- src/hteapot/mod.rs | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index 328f213..369d999 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -150,16 +150,21 @@ impl Hteapot { } } - for stream_data in streams_to_handle.iter_mut() { - if stream_data.status.is_none() { - continue; + // for stream_data in streams_to_handle.iter_mut() { + // if stream_data.status.is_none() { + // continue; + // } + // let r = Hteapot::handle_client(stream_data, &action_clone); + // if r.is_none() { + // stream_data.status = None; + // } + // } + streams_to_handle.retain_mut(|s| { + if s.status.is_none() { + return false; } - let r = Hteapot::handle_client(stream_data, &action_clone); - if r.is_none() { - stream_data.status = None; - } - } - streams_to_handle.retain(|s| s.status.is_some()); + Hteapot::handle_client(s, &action_clone).is_some() + }); { let mut pl_lock = pl_clone.lock().expect("Errpr locking prority list"); pl_lock[_tn] = streams_to_handle.len(); @@ -304,17 +309,6 @@ impl Hteapot { } } }; - - //socket_status - if buffer[0] == 0 { - println!("buffer 0 == 0"); - break; - }; - if *buffer.last().unwrap() == 0 { - println!("buffer last == 0"); - - break; - } } status.reading = false; } From f56d981c44fa95fe189272b221c3c560bffc487d Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Wed, 26 Feb 2025 09:05:16 +0100 Subject: [PATCH 04/21] Better write loop --- src/hteapot/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index 369d999..c720054 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -344,8 +344,8 @@ impl Hteapot { } status.data_write = response.to_bytes(); } - for n in status.index_writed..status.data_write.len() { - let r = writer.write(&[status.data_write[n]]); + for n in status.data_write.chunks(1024).skip(status.index_writed) { + let r = writer.write(&n); if r.is_err() { let error = r.err().unwrap(); if error.kind() == io::ErrorKind::WouldBlock { From 2baecb9bbb90f1a845fba9f55c58391c2e537396 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Wed, 26 Feb 2025 09:53:33 +0100 Subject: [PATCH 05/21] better stream managment --- src/hteapot/mod.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index c720054..94b24d5 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -281,13 +281,13 @@ impl Hteapot { socket_data: &mut SocketData, action: &Arc HttpResponse + Send + Sync + 'static>, ) -> Option<()> { - let mut reader = BufReader::new(&socket_data.stream); - let mut writer = BufWriter::new(&socket_data.stream); + // let mut reader = BufReader::new(&socket_data.stream); + // let mut writer = BufWriter::new(&socket_data.stream); let status = socket_data.status.as_mut()?; if status.reading { loop { let mut buffer = [0; 1024]; - match reader.read(&mut buffer) { + match socket_data.stream.read(&mut buffer) { Err(e) => match e.kind() { io::ErrorKind::WouldBlock => { return Some(()); @@ -345,24 +345,25 @@ impl Hteapot { status.data_write = response.to_bytes(); } for n in status.data_write.chunks(1024).skip(status.index_writed) { - let r = writer.write(&n); - if r.is_err() { - let error = r.err().unwrap(); - if error.kind() == io::ErrorKind::WouldBlock { + match socket_data.stream.write(&n) { + Ok(size) => { + status.index_writed += 1; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { return Some(()); - } else { - eprintln!("W error: {:?}", error); + } + Err(e) => { + eprintln!("W error: {:?}", e); return None; } } - status.index_writed += r.unwrap(); } - let r = writer.flush(); + let r = socket_data.stream.flush(); if r.is_err() { - eprintln!("Error2: {}", r.err().unwrap()); return Some(()); } + println!("Flushed!"); if keep_alive { status.reading = true; status.data_readed = vec![]; From c3a5d1453a31f65dca6be98fb8dd3295ee9a587c Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Wed, 26 Feb 2025 09:54:37 +0100 Subject: [PATCH 06/21] removed println --- src/hteapot/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index 94b24d5..28be98e 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -363,7 +363,6 @@ impl Hteapot { if r.is_err() { return Some(()); } - println!("Flushed!"); if keep_alive { status.reading = true; status.data_readed = vec![]; From 042b976e74dc1271f0ca9f74f2954b8218ecad15 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Wed, 26 Feb 2025 11:46:01 +0100 Subject: [PATCH 07/21] add Request Builder --- src/hteapot/mod.rs | 61 +++++++++++++++++++++--------------------- src/hteapot/request.rs | 2 +- 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index 28be98e..9fc6202 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -9,7 +9,7 @@ mod response; mod status; pub use self::methods::HttpMethod; -pub use self::request::HttpRequest; +pub use self::request::{HttpRequest, HttpRequestBuilder}; pub use self::response::HttpResponse; pub use self::status::HttpStatus; @@ -21,6 +21,7 @@ use std::thread; use std::time::Duration; const VERSION: &str = env!("CARGO_PKG_VERSION"); +const BUFFER_SIZE: usize = 1024; #[macro_export] macro_rules! headers { @@ -40,12 +41,12 @@ pub struct Hteapot { threads: u16, } -#[derive(Clone, Debug)] struct SocketStatus { // TODO: write proper ttl reading: bool, data_readed: Vec, data_write: Vec, + request: HttpRequestBuilder, index_writed: usize, } @@ -134,6 +135,7 @@ impl Hteapot { reading: true, data_readed: vec![], data_write: vec![], + request: HttpRequestBuilder::new(), index_writed: 0, }; let socket_data = SocketData { @@ -150,15 +152,6 @@ impl Hteapot { } } - // for stream_data in streams_to_handle.iter_mut() { - // if stream_data.status.is_none() { - // continue; - // } - // let r = Hteapot::handle_client(stream_data, &action_clone); - // if r.is_none() { - // stream_data.status = None; - // } - // } streams_to_handle.retain_mut(|s| { if s.status.is_none() { return false; @@ -281,12 +274,11 @@ impl Hteapot { socket_data: &mut SocketData, action: &Arc HttpResponse + Send + Sync + 'static>, ) -> Option<()> { - // let mut reader = BufReader::new(&socket_data.stream); - // let mut writer = BufWriter::new(&socket_data.stream); let status = socket_data.status.as_mut()?; + let mut request = None; if status.reading { loop { - let mut buffer = [0; 1024]; + let mut buffer = [0; BUFFER_SIZE]; match socket_data.stream.read(&mut buffer) { Err(e) => match e.kind() { io::ErrorKind::WouldBlock => { @@ -301,10 +293,11 @@ impl Hteapot { } }, Ok(m) => { - status.data_readed.append(&mut buffer.to_vec()); + request = status.request.append(buffer.to_vec()); + //status.data_readed.append(&mut buffer.to_vec()); if m == 0 { return None; - } else if m < 1024 { + } else if m < BUFFER_SIZE || request.is_some() { break; } } @@ -312,18 +305,22 @@ impl Hteapot { } status.reading = false; } - - let request_string = String::from_utf8(status.data_readed.clone()); - let request_string = if request_string.is_err() { - //This proablly means the request is a https so for the moment GTFO - return None; - } else { - request_string.unwrap() - }; - // let request_string = "GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n".to_string(); - let request = Self::request_parser(request_string); - if request.is_err() { - eprintln!("Request parse error {:?}", request.err().unwrap()); + //let request = HttpRequestBuilder::new().append(status.data_readed.clone()); + // println!("{:?}", request); + // let request_string = String::from_utf8(status.data_readed.clone()); + // let request_string = if request_string.is_err() { + // //This proablly means the request is a https so for the moment GTFO + // return None; + // } else { + // request_string.unwrap() + // }; + // let request = Self::request_parser(request_string); + // if request.is_err() { + // eprintln!("Request parse error {:?}", request.err().unwrap()); + // return None; + // } + if request.is_none() { + print!("Error parsing request"); return None; } let request = request.unwrap(); @@ -344,9 +341,13 @@ impl Hteapot { } status.data_write = response.to_bytes(); } - for n in status.data_write.chunks(1024).skip(status.index_writed) { + for n in status + .data_write + .chunks(BUFFER_SIZE) + .skip(status.index_writed) + { match socket_data.stream.write(&n) { - Ok(size) => { + Ok(_size) => { status.index_writed += 1; } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { diff --git a/src/hteapot/request.rs b/src/hteapot/request.rs index 0bde16f..926cd11 100644 --- a/src/hteapot/request.rs +++ b/src/hteapot/request.rs @@ -1,7 +1,7 @@ use super::HttpMethod; use std::collections::HashMap; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct HttpRequest { pub method: HttpMethod, pub path: String, From a70801a9f802458a6097f68ca0c165a2f095c89f Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Fri, 7 Mar 2025 12:29:07 +0100 Subject: [PATCH 08/21] better logger --- src/logger.rs | 165 +++++++++++++++++++++++++++----------------------- src/main.rs | 47 +++----------- src/utils.rs | 19 ++++++ 3 files changed, 119 insertions(+), 112 deletions(-) create mode 100644 src/utils.rs diff --git a/src/logger.rs b/src/logger.rs index 5ac0394..724e9ca 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -1,93 +1,109 @@ - -use std::time::{SystemTime, UNIX_EPOCH}; +use std::collections::VecDeque; use std::io::{BufWriter, Write}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::thread::{self, JoinHandle}; +use std::time::{SystemTime, UNIX_EPOCH}; struct SimpleTime; impl SimpleTime { - fn epoch_to_ymdhms(seconds: u64) -> (i32, u32, u32, u32, u32, u32) { - // Constants for time calculations - const SECONDS_IN_MINUTE: u64 = 60; - const SECONDS_IN_HOUR: u64 = 3600; - const SECONDS_IN_DAY: u64 = 86400; + fn epoch_to_ymdhms(seconds: u64) -> (i32, u32, u32, u32, u32, u32) { + // Constants for time calculations + const SECONDS_IN_MINUTE: u64 = 60; + const SECONDS_IN_HOUR: u64 = 3600; + const SECONDS_IN_DAY: u64 = 86400; + + // Leap year and normal year days + const DAYS_IN_YEAR: [u32; 2] = [365, 366]; + const DAYS_IN_MONTH: [[u32; 12]; 2] = [ + [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31], // Normal years + [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31], // Leap years + ]; + + // Calculate the number of days since the epoch + let mut remaining_days = seconds / SECONDS_IN_DAY; + + // Determine the current year + let mut year = 1970; + loop { + let leap_year = if (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) { + 1 + } else { + 0 + }; + if remaining_days < DAYS_IN_YEAR[leap_year] as u64 { + break; + } + remaining_days -= DAYS_IN_YEAR[leap_year] as u64; + year += 1; + } - // Leap year and normal year days - const DAYS_IN_YEAR: [u32; 2] = [365, 366]; - const DAYS_IN_MONTH: [[u32; 12]; 2] = [ - [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31], // Normal years - [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] // Leap years - ]; + // Determine the current month and day + let leap_year = if (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) { + 1 + } else { + 0 + }; + let mut month = 0; + while remaining_days >= DAYS_IN_MONTH[leap_year][month] as u64 { + remaining_days -= DAYS_IN_MONTH[leap_year][month] as u64; + month += 1; + } + let day = remaining_days + 1; // Days are 1-based - // Calculate the number of days since the epoch - let mut remaining_days = seconds / SECONDS_IN_DAY; + // Calculate the current hour, minute, and second + let remaining_seconds = seconds % SECONDS_IN_DAY; + let hour = (remaining_seconds / SECONDS_IN_HOUR) as u32; + let minute = ((remaining_seconds % SECONDS_IN_HOUR) / SECONDS_IN_MINUTE) as u32; + let second = (remaining_seconds % SECONDS_IN_MINUTE) as u32; - // Determine the current year - let mut year = 1970; - loop { - let leap_year = if (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) { 1 } else { 0 }; - if remaining_days < DAYS_IN_YEAR[leap_year] as u64 { - break; - } - remaining_days -= DAYS_IN_YEAR[leap_year] as u64; - year += 1; + (year, month as u32 + 1, day as u32, hour, minute, second) } - - // Determine the current month and day - let leap_year = if (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) { 1 } else { 0 }; - let mut month = 0; - while remaining_days >= DAYS_IN_MONTH[leap_year][month] as u64 { - remaining_days -= DAYS_IN_MONTH[leap_year][month] as u64; - month += 1; + pub fn get_current_timestamp() -> String { + let now = SystemTime::now(); + let since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); + let secs = since_epoch.as_secs(); + let (year, month, day, hour, minute, second) = Self::epoch_to_ymdhms(secs); + + format!( + "{:04}/{:02}/{:02} - {:02}:{:02}:{:02}", + year, month, day, hour, minute, second + ) } - let day = remaining_days + 1; // Days are 1-based - - // Calculate the current hour, minute, and second - let remaining_seconds = seconds % SECONDS_IN_DAY; - let hour = (remaining_seconds / SECONDS_IN_HOUR) as u32; - let minute = ((remaining_seconds % SECONDS_IN_HOUR) / SECONDS_IN_MINUTE) as u32; - let second = (remaining_seconds % SECONDS_IN_MINUTE) as u32; - - (year, month as u32 + 1, day as u32, hour, minute, second) -} - pub fn get_current_timestamp() -> String { - let now = SystemTime::now(); - let since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); - let secs = since_epoch.as_secs(); - let (year, month, day, hour, minute, second) = Self::epoch_to_ymdhms(secs); - - - format!("{:04}/{:02}/{:02} - {:02}:{:02}:{:02}", - year, month, day, hour, minute, second) - } } - - - pub struct Logger { - buffers: Vec>, + buffers: Vec>, + tx: Sender, + thread: JoinHandle<()>, } impl Logger { - pub fn new(writer: W) -> Logger { - let mut buffers = Vec::new(); - buffers.push(BufWriter::new(writer)); - Logger { - buffers: buffers, + pub fn new(writer: W) -> Logger { + let mut buffers = Vec::new(); + let (tx, rx) = channel::(); + buffers.push(BufWriter::new(writer)); + let thread = thread::spawn(move || { + while let Ok(msg) = rx.recv() { + println!("[{}] - {}", SimpleTime::get_current_timestamp(), msg); + } + }); + Logger { + buffers, + tx, + thread, + } } - } - - fn log(&mut self, content: String) { - for b in self.buffers.iter_mut() { - let _ = b.write(content.as_bytes()); - let _ = b.flush(); - }; - - } - pub fn msg(&mut self, content: String) { - self.log(format!("[{}] - {}\n",SimpleTime::get_current_timestamp() ,content)); - } + fn log(&mut self, content: String) { + for b in self.buffers.iter_mut() { + let _ = b.write(content.as_bytes()); + let _ = b.flush(); + } + } + pub fn msg(&self, content: String) { + self.tx.send(content); + } } #[cfg(test)] @@ -95,7 +111,6 @@ use std::io::stdout; #[test] fn test_basic() { - - let mut logs = Logger::new(stdout()); + let mut logs = Logger::new(stdout()); logs.msg("test".to_string()); -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index a22f3a2..df04d46 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,15 +2,18 @@ mod cache; mod config; pub mod hteapot; mod logger; +mod utils; use std::fs; use std::io; use std::path::Path; +use std::sync::Arc; use std::sync::Mutex; use cache::Cache; use config::Config; use hteapot::{Hteapot, HttpRequest, HttpResponse, HttpStatus}; +use utils::get_mime_tipe; use logger::Logger; @@ -41,24 +44,6 @@ fn is_proxy(config: &Config, req: HttpRequest) -> Option<(String, HttpRequest)> None } -fn get_mime_tipe(path: &String) -> String { - let extension = Path::new(path.as_str()) - .extension() - .unwrap() - .to_str() - .unwrap(); - let mimetipe = match extension { - "js" => "text/javascript", - "json" => "application/json", - "css" => "text/css", - "html" => "text/html", - "ico" => "image/x-icon", - _ => "text/plain", - }; - - mimetipe.to_string() -} - fn serve_file(path: &String) -> Option> { let r = fs::read(path); if r.is_ok() { @@ -119,40 +104,31 @@ fn main() { }; let proxy_only = config.proxy_rules.get("/").is_some(); - let logger = Mutex::new(Logger::new(io::stdout())); + let logger = Arc::new(Logger::new(io::stdout())); let cache: Mutex = Mutex::new(Cache::new(config.cache_ttl as u64)); let server = Hteapot::new_threaded(config.host.as_str(), config.port, config.threads); - logger.lock().expect("this doesnt work :C").msg(format!( + logger.msg(format!( "Server started at http://{}:{}", config.host, config.port )); if config.cache { - logger - .lock() - .expect("this doesnt work :C") - .msg("Cache Enabled".to_string()); + logger.msg("Cache Enabled".to_string()); } if proxy_only { logger - .lock() - .expect("this doesnt work :C") .msg("WARNING: All requests are proxied to /. Local paths won’t be used.".to_string()); } + let loggerc = logger.clone(); server.listen(move |req| { // SERVER CORE // for each request - logger.lock().expect("this doesnt work :C").msg(format!( - "Request {} {}", - req.method.to_str(), - req.path - )); - + loggerc.msg(format!("Request {} {}", req.method.to_str(), req.path)); let is_proxy = is_proxy(&config, req.clone()); if proxy_only || is_proxy.is_some() { - let (host, proxy_req) = is_proxy.unwrap(); + let (host, mut proxy_req) = is_proxy.unwrap(); let res = proxy_req.brew(host.as_str()); if res.is_ok() { return res.unwrap(); @@ -172,10 +148,7 @@ fn main() { } if !Path::new(full_path.as_str()).exists() { - logger - .lock() - .expect("this doesnt work :C") - .msg(format!("path {} does not exist", req.path)); + loggerc.msg(format!("path {} does not exist", req.path)); return HttpResponse::new(HttpStatus::NotFound, "Not found", None); } let mimetype = get_mime_tipe(&full_path); diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..ed7158d --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,19 @@ +use std::path::Path; + +pub fn get_mime_tipe(path: &String) -> String { + let extension = Path::new(path.as_str()) + .extension() + .unwrap() + .to_str() + .unwrap(); + let mimetipe = match extension { + "js" => "text/javascript", + "json" => "application/json", + "css" => "text/css", + "html" => "text/html", + "ico" => "image/x-icon", + _ => "text/plain", + }; + + mimetipe.to_string() +} From 4b31f4a293f8f52f1466e02de17347594f247c15 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Fri, 7 Mar 2025 13:00:17 +0100 Subject: [PATCH 09/21] add buffer to print thread --- src/logger.rs | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/src/logger.rs b/src/logger.rs index 724e9ca..182b51a 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use std::io::{BufWriter, Write}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread::{self, JoinHandle}; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; struct SimpleTime; impl SimpleTime { @@ -69,6 +69,12 @@ impl SimpleTime { year, month, day, hour, minute, second ) } + + pub fn get_seconds() -> u64 { + let now = SystemTime::now(); + let since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); + since_epoch.as_secs() + } } pub struct Logger { @@ -83,8 +89,29 @@ impl Logger { let (tx, rx) = channel::(); buffers.push(BufWriter::new(writer)); let thread = thread::spawn(move || { - while let Ok(msg) = rx.recv() { - println!("[{}] - {}", SimpleTime::get_current_timestamp(), msg); + let mut last_flush = Instant::now(); + let mut buff = Vec::new(); + loop { + let msg = rx.recv_timeout(Duration::from_millis(100)); + match msg { + Ok(msg) => buff.push(format!( + "[{}] - {}\n", + SimpleTime::get_current_timestamp(), + msg + )), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { /* No hacer nada, sigue el bucle */ + } + Err(_) => break, + } + + // Imprimir cada segundo o si el buffer tiene muchos logs + if last_flush.elapsed() >= Duration::from_secs(1) || buff.len() >= 100 { + if !buff.is_empty() { + print!("{}", buff.join("")); + buff.clear(); + } + last_flush = Instant::now(); + } } }); Logger { @@ -102,7 +129,7 @@ impl Logger { } pub fn msg(&self, content: String) { - self.tx.send(content); + let _ = self.tx.send(content); } } From f3500ea2d832190530b05dff2829881aff315915 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Sat, 8 Mar 2025 16:33:33 +0100 Subject: [PATCH 10/21] better logger and brew improvements --- .gitignore | 1 + .idea/.gitignore | 5 -- src/hteapot/brew.rs | 81 +++++++++-------------- src/hteapot/mod.rs | 142 +++++++---------------------------------- src/hteapot/request.rs | 102 +++++++++++++++++++++++++++-- src/logger.rs | 75 ++++++++++------------ src/main.rs | 22 ++++--- 7 files changed, 197 insertions(+), 231 deletions(-) delete mode 100644 .idea/.gitignore diff --git a/.gitignore b/.gitignore index 819d3fe..f8338f4 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ src/.DS_Store .idea/vcs.xml .idea/modules.xml .idea/HTeaPot.iml +*.log diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index b58b603..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ diff --git a/src/hteapot/brew.rs b/src/hteapot/brew.rs index b1b4baa..6c3ddbe 100644 --- a/src/hteapot/brew.rs +++ b/src/hteapot/brew.rs @@ -1,29 +1,16 @@ // Written by Alberto Ruiz 2024-04-08 // This is the HTTP client module, it will handle the requests and responses -use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{TcpStream, ToSocketAddrs}; use std::time::Duration; -use super::methods::HttpMethod; use super::request::HttpRequest; use super::response::HttpResponse; // use super::status::HttpStatus; // use std::net::{IpAddr, Ipv4Addr, SocketAddr}; impl HttpRequest { - pub fn new(method: HttpMethod, path: &str) -> HttpRequest { - let path = path.to_string(); - HttpRequest { - method, - path, - args: HashMap::new(), - headers: HashMap::new(), - body: String::new(), - } - } - pub fn arg(&mut self, key: &str, value: &str) -> &mut HttpRequest { self.args.insert(key.to_string(), value.to_string()); return self; @@ -34,12 +21,7 @@ impl HttpRequest { return self; } - pub fn body(&mut self, body: String) -> &mut HttpRequest { - self.body = body; - return self; - } - - pub fn to_string(&self) -> String { + pub fn to_string(&mut self) -> String { let path = if self.args.is_empty() { self.path.clone() } else { @@ -62,14 +44,14 @@ impl HttpRequest { for (k, v) in self.headers.iter() { result.push_str(format!("{}: {}\r\n", k, v).as_str()); } - if !self.body.is_empty() { - result.push_str(self.body.as_str()); + if self.has_body() { + result.push_str(self.text().unwrap().as_str()); } result.push_str("\r\n\r\n"); result } - pub fn brew(&self, addr: &str) -> Result { + pub fn brew(&mut self, addr: &str) -> Result { let mut addr = addr.to_string(); if addr.starts_with("http://") { addr = addr.strip_prefix("http://").unwrap().to_string(); @@ -101,7 +83,7 @@ impl HttpRequest { } } -pub fn brew(direction: &str, request: HttpRequest) -> Result { +pub fn brew(direction: &str, request: &mut HttpRequest) -> Result { return request.brew(direction); } @@ -111,16 +93,16 @@ pub fn brew(direction: &str, request: HttpRequest) -> Result Result { - let mut lines = request.lines(); - let first_line = lines.next(); - if first_line.is_none() { - println!("{}", request); - return Err("Invalid request".to_string()); - } - let first_line = first_line.unwrap(); - let mut words = first_line.split_whitespace(); - let method = words.next(); - if method.is_none() { - return Err("Invalid method".to_string()); - } - let method = method.unwrap(); - let path = words.next(); - if path.is_none() { - return Err("Invalid path".to_string()); - } - let mut path = path.unwrap().to_string(); - let mut headers: HashMap = HashMap::new(); - loop { - let line = lines.next(); - if line.is_none() { - break; - } - let line = line.unwrap(); - if line.is_empty() { - break; - } - let mut parts = line.split(": "); - let key = parts.next().unwrap().to_string(); - let value = parts.next().unwrap(); - headers.insert(key, value.to_string()); - } - let body = lines - .collect::>() - .join("") - .trim() - .trim_end_matches(char::from(0)) - .to_string(); - let mut args: HashMap = HashMap::new(); - //remove http or https from the path - if path.starts_with("http://") { - path = path.trim_start_matches("http://").to_string(); - } else if path.starts_with("https://") { - path = path.trim_start_matches("https://").to_string(); - } - //remove the host name if present - if !path.starts_with("/") { - //remove all the characters until the first / - let mut parts = path.split("/"); - parts.next(); - path = parts.collect::>().join("/"); - //add / to beggining - path = format!("/{}", path); - } - - if path.contains('?') { - let _path = path.clone(); - let mut parts = _path.split('?'); - path = parts.next().unwrap().to_string(); - let query = parts.next().unwrap(); - let query_parts: Vec<&str> = query.split('&').collect(); - for part in query_parts { - let mut parts = part.split('='); - let key = parts.next().unwrap().to_string(); - let value = parts.next().unwrap_or("").to_string().replace("%22", "\""); - args.insert(key, value); - } - } - - Ok(HttpRequest { - method: HttpMethod::from_str(method), - path: path.to_string(), - args, - headers, - body: body.trim_end().to_string(), - }) - } - // Handle the client when a request is received fn handle_client( socket_data: &mut SocketData, action: &Arc HttpResponse + Send + Sync + 'static>, ) -> Option<()> { let status = socket_data.status.as_mut()?; - let mut request = None; - if status.reading { + if !status.request.done { loop { let mut buffer = [0; BUFFER_SIZE]; match socket_data.stream.read(&mut buffer) { @@ -293,37 +212,24 @@ impl Hteapot { } }, Ok(m) => { - request = status.request.append(buffer.to_vec()); + status.request.append(buffer.to_vec()); //status.data_readed.append(&mut buffer.to_vec()); if m == 0 { return None; - } else if m < BUFFER_SIZE || request.is_some() { + } else if m < BUFFER_SIZE || status.request.done { break; } } }; } - status.reading = false; } - //let request = HttpRequestBuilder::new().append(status.data_readed.clone()); - // println!("{:?}", request); - // let request_string = String::from_utf8(status.data_readed.clone()); - // let request_string = if request_string.is_err() { - // //This proablly means the request is a https so for the moment GTFO - // return None; - // } else { - // request_string.unwrap() - // }; - // let request = Self::request_parser(request_string); - // if request.is_err() { - // eprintln!("Request parse error {:?}", request.err().unwrap()); - // return None; - // } + let request = status.request.get(); if request.is_none() { - print!("Error parsing request"); - return None; + // println!("[hteapot] Request not ready"); + return Some(()); } - let request = request.unwrap(); + let mut request = request.unwrap(); + request.set_stream(socket_data.stream.try_clone().expect("Cagamos")); let keep_alive = match request.headers.get("Connection") { Some(ch) => ch == "keep-alive", None => false, @@ -369,6 +275,7 @@ impl Hteapot { status.data_readed = vec![]; status.data_write = vec![]; status.index_writed = 0; + status.request = HttpRequestBuilder::new(); return Some(()); } else { let _ = socket_data.stream.shutdown(Shutdown::Both); @@ -378,18 +285,17 @@ impl Hteapot { } #[cfg(test)] -#[test] -fn test_http_parser() { - let request = - "GET / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.68.0\r\nAccept: */*\r\n\r\n"; - let parsed_request = Hteapot::request_parser(request.to_string()).unwrap(); - assert_eq!(parsed_request.method, HttpMethod::GET); - assert_eq!(parsed_request.path, "/"); - assert_eq!(parsed_request.args.len(), 0); - assert_eq!(parsed_request.headers.len(), 3); - assert_eq!(parsed_request.body, ""); -} - +// #[test] +// fn test_http_parser() { +// let request = +// "GET / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.68.0\r\nAccept: */*\r\n\r\n"; +// let parsed_request = Hteapot::request_parser(request.to_string()).unwrap(); +// assert_eq!(parsed_request.method, HttpMethod::GET); +// assert_eq!(parsed_request.path, "/"); +// assert_eq!(parsed_request.args.len(), 0); +// assert_eq!(parsed_request.headers.len(), 3); +// assert_eq!(parsed_request.body, ""); +// } #[test] fn test_http_response_maker() { let response = HttpResponse::new(HttpStatus::IAmATeapot, "Hello, World!", None); diff --git a/src/hteapot/request.rs b/src/hteapot/request.rs index 926cd11..9211bf5 100644 --- a/src/hteapot/request.rs +++ b/src/hteapot/request.rs @@ -1,31 +1,108 @@ use super::HttpMethod; -use std::collections::HashMap; +use std::{ + collections::HashMap, + io::{Read, Write}, + net::TcpStream, + str, +}; -#[derive(Clone, Debug)] pub struct HttpRequest { pub method: HttpMethod, pub path: String, pub args: HashMap, pub headers: HashMap, - pub body: String, + pub body: Vec, + stream: Option, } impl HttpRequest { + pub fn new(method: HttpMethod, path: &str) -> Self { + return HttpRequest { + method, + path: path.to_string(), + args: HashMap::new(), + headers: HashMap::new(), + body: Vec::new(), + stream: None, + }; + } + pub fn default() -> Self { HttpRequest { method: HttpMethod::GET, path: String::new(), args: HashMap::new(), headers: HashMap::new(), - body: String::new(), + body: Vec::new(), + stream: None, + } + } + + pub fn clone(&self) -> Self { + return HttpRequest { + method: self.method.clone(), + path: self.path.clone(), + args: self.args.clone(), + headers: self.headers.clone(), + body: self.body.clone(), + stream: None, + }; + } + + pub fn has_body(&self) -> bool { + let length = self.headers.get("Content-Length"); + if length.is_none() { + return false; + } + if self.stream.is_none() && self.body.is_empty() { + return false; + } + + return true; + } + + pub fn body(&mut self) -> Option> { + if self.has_body() { + let content_length = self.headers.get("Content-Length")?; + let content_length: usize = content_length.parse().unwrap(); + if content_length > self.body.len() { + println!("{}/{}", self.body.len(), content_length); + let _ = self.stream.as_ref().unwrap().flush(); + while content_length > self.body.len() { + let r = self.stream.as_ref().unwrap().read(&mut self.body); + if r.is_err() { + println!("Error: {:?}", r.err().unwrap()); + } else { + if r.unwrap() == 0 { + break; + } + } + } + } + Some(self.body.clone()) + } else { + None } } + + pub fn set_stream(&mut self, stream: TcpStream) { + self.stream = Some(stream); + } + + pub fn text(&mut self) -> Option { + self.body()?; + let body = match str::from_utf8(self.body.as_slice()) { + Ok(v) => Some(v.to_string()), + Err(_e) => None, + }; + return body; + } } pub struct HttpRequestBuilder { request: HttpRequest, buffer: Vec, - done: bool, + pub done: bool, } impl HttpRequestBuilder { @@ -36,15 +113,25 @@ impl HttpRequestBuilder { path: String::new(), args: HashMap::new(), headers: HashMap::new(), - body: String::new(), + body: Vec::new(), + stream: None, }, buffer: Vec::new(), done: false, }; } + pub fn get(&self) -> Option { + if self.done { + return Some(self.request.clone()); + } else { + None + } + } + pub fn append(&mut self, buffer: Vec) -> Option { self.buffer.extend(buffer); + self.buffer.retain(|&b| b != 0); while let Some(pos) = self.buffer.windows(2).position(|w| w == b"\r\n") { let line = self.buffer.drain(..pos).collect::>(); // Extraer línea @@ -86,8 +173,9 @@ impl HttpRequestBuilder { } } else { // Fin de las cabeceras + self.request.body = self.buffer.clone(); self.done = true; - return Some(std::mem::replace(&mut self.request, HttpRequest::default())); + return Some(self.request.clone()); } } diff --git a/src/logger.rs b/src/logger.rs index 182b51a..d6c4f8f 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -1,6 +1,5 @@ -use std::collections::VecDeque; -use std::io::{BufWriter, Write}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::io::Write; +use std::sync::mpsc::{channel, Sender}; use std::thread::{self, JoinHandle}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -69,45 +68,42 @@ impl SimpleTime { year, month, day, hour, minute, second ) } - - pub fn get_seconds() -> u64 { - let now = SystemTime::now(); - let since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); - since_epoch.as_secs() - } } -pub struct Logger { - buffers: Vec>, +pub struct Logger { tx: Sender, - thread: JoinHandle<()>, + _thread: JoinHandle<()>, } -impl Logger { - pub fn new(writer: W) -> Logger { - let mut buffers = Vec::new(); +impl Logger { + pub fn new(mut writer: W) -> Logger { let (tx, rx) = channel::(); - buffers.push(BufWriter::new(writer)); let thread = thread::spawn(move || { let mut last_flush = Instant::now(); let mut buff = Vec::new(); + let mut max_size = 100; + let timeout = Duration::from_secs(1); loop { - let msg = rx.recv_timeout(Duration::from_millis(100)); + let msg = rx.recv_timeout(timeout); match msg { - Ok(msg) => buff.push(format!( - "[{}] - {}\n", - SimpleTime::get_current_timestamp(), - msg - )), - Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { /* No hacer nada, sigue el bucle */ - } + Ok(msg) => buff.push(msg), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} Err(_) => break, } - // Imprimir cada segundo o si el buffer tiene muchos logs - if last_flush.elapsed() >= Duration::from_secs(1) || buff.len() >= 100 { + if last_flush.elapsed() >= timeout || buff.len() >= max_size { if !buff.is_empty() { - print!("{}", buff.join("")); + if buff.len() >= max_size { + max_size = (max_size * 10).min(1_000_000); + } else { + max_size = (max_size / 10).max(100); + } + let wr = writer.write_all(buff.join("").as_bytes()); + if wr.is_err() { + println!("{:?}", wr); + } + let _ = writer.flush(); + buff.clear(); } last_flush = Instant::now(); @@ -115,29 +111,22 @@ impl Logger { } }); Logger { - buffers, tx, - thread, - } - } - - fn log(&mut self, content: String) { - for b in self.buffers.iter_mut() { - let _ = b.write(content.as_bytes()); - let _ = b.flush(); + _thread: thread, } } pub fn msg(&self, content: String) { + let content = format!("[{}] - {}\n", SimpleTime::get_current_timestamp(), content); let _ = self.tx.send(content); } } -#[cfg(test)] -use std::io::stdout; +// #[cfg(test)] +// use std::io::stdout; -#[test] -fn test_basic() { - let mut logs = Logger::new(stdout()); - logs.msg("test".to_string()); -} +// #[test] +// fn test_basic() { +// let mut logs = Logger::new(stdout()); +// logs.msg("test".to_string()); +// } diff --git a/src/main.rs b/src/main.rs index df04d46..ba7b657 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,10 +4,11 @@ pub mod hteapot; mod logger; mod utils; -use std::fs; -use std::io; +use std::io::Write; +use std::{fs, io}; + use std::path::Path; -use std::sync::Arc; + use std::sync::Mutex; use cache::Cache; @@ -104,7 +105,14 @@ fn main() { }; let proxy_only = config.proxy_rules.get("/").is_some(); - let logger = Arc::new(Logger::new(io::stdout())); + let file = fs::File::create("./hteapot.log"); + if file.is_err() { + println!("file {:?}", file); + } + let file = file.unwrap(); + let logger = Logger::new(file); + + //let logger = Logger::new(io::stdout()); let cache: Mutex = Mutex::new(Cache::new(config.cache_ttl as u64)); let server = Hteapot::new_threaded(config.host.as_str(), config.port, config.threads); logger.msg(format!( @@ -118,13 +126,11 @@ fn main() { logger .msg("WARNING: All requests are proxied to /. Local paths won’t be used.".to_string()); } - - let loggerc = logger.clone(); server.listen(move |req| { // SERVER CORE // for each request - loggerc.msg(format!("Request {} {}", req.method.to_str(), req.path)); + logger.msg(format!("Request {} {}", req.method.to_str(), req.path)); let is_proxy = is_proxy(&config, req.clone()); if proxy_only || is_proxy.is_some() { @@ -148,7 +154,7 @@ fn main() { } if !Path::new(full_path.as_str()).exists() { - loggerc.msg(format!("path {} does not exist", req.path)); + logger.msg(format!("path {} does not exist", req.path)); return HttpResponse::new(HttpStatus::NotFound, "Not found", None); } let mimetype = get_mime_tipe(&full_path); From 2c2608d5e71114746611cb08c01316be8f40e277 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Sat, 8 Mar 2025 20:15:07 +0100 Subject: [PATCH 11/21] add log configuration --- src/config.rs | 3 ++ src/main.rs | 98 ++++++++++++++++++++++++--------------------------- src/utils.rs | 3 ++ 3 files changed, 53 insertions(+), 51 deletions(-) diff --git a/src/config.rs b/src/config.rs index 37d934f..fb689a5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -104,6 +104,7 @@ pub struct Config { pub cache: bool, pub cache_ttl: u16, pub threads: u16, + pub log_file: Option, pub index: String, // Index file to serve by default //pub error: String, // Error file to serve when a file is not found pub proxy_rules: HashMap, @@ -127,6 +128,7 @@ impl Config { host: "localhost".to_string(), root: "./".to_string(), index: "index.html".to_string(), + log_file: None, //error: "error.html".to_string(), threads: 1, cache: false, @@ -166,6 +168,7 @@ impl Config { cache: map.get2("cache").unwrap_or(false), cache_ttl: map.get2("cache_ttl").unwrap_or(3600), index: map.get2("index").unwrap_or("index.html".to_string()), + log_file: map.get2("log_file"), //error: map.get2("error").unwrap_or("error.html".to_string()), proxy_rules, } diff --git a/src/main.rs b/src/main.rs index ba7b657..b1f5f80 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,6 @@ pub mod hteapot; mod logger; mod utils; -use std::io::Write; use std::{fs, io}; use std::path::Path; @@ -56,61 +55,58 @@ fn serve_file(path: &String) -> Option> { fn main() { let args = std::env::args().collect::>(); - let mut serving_path = None; - if args.len() >= 2 { - match args[1].as_str() { - "--help" | "-h" => { - println!("Hteapot {}", VERSION); - println!("usage: {} ", args[0]); - return; - } - "--version" | "-v" => { - println!("Hteapot {}", VERSION); - return; - } - "--serve" | "-s" => { - serving_path = Some(args.get(2).unwrap().clone()); - } - _ => (), - }; + if args.len() == 1 { + println!("Hteapot {}", VERSION); + println!("usage: {} ", args[0]); + return; } - - let config = if args.len() == 2 { - config::Config::load_config(&args[1]) - } else if serving_path.is_some() { - let serving_path_str = serving_path.unwrap(); - let serving_path_str = serving_path_str.as_str(); - let serving_path = Path::new(serving_path_str); - let mut c = config::Config::new_default(); - c.host = "0.0.0.0".to_string(); - if serving_path.is_dir() { - c.root = serving_path.to_str().unwrap_or_default().to_string(); - } else { - c.index = serving_path - .file_name() - .unwrap() - .to_str() - .unwrap_or_default() - .to_string(); - c.root = serving_path - .parent() - .unwrap_or(Path::new("./")) - .to_str() - .unwrap_or_default() - .to_string(); + let config = match args[1].as_str() { + "--help" | "-h" => { + println!("Hteapot {}", VERSION); + println!("usage: {} ", args[0]); + return; } - c - } else { - config::Config::new_default() + "--version" | "-v" => { + println!("Hteapot {}", VERSION); + return; + } + "--serve" | "-s" => { + let mut c = config::Config::new_default(); + let serving_path = Some(args.get(2).unwrap().clone()); + let serving_path_str = serving_path.unwrap(); + let serving_path_str = serving_path_str.as_str(); + let serving_path = Path::new(serving_path_str); + if serving_path.is_dir() { + c.root = serving_path.to_str().unwrap_or_default().to_string(); + } else { + c.index = serving_path + .file_name() + .unwrap() + .to_str() + .unwrap_or_default() + .to_string(); + c.root = serving_path + .parent() + .unwrap_or(Path::new("./")) + .to_str() + .unwrap_or_default() + .to_string(); + } + c.host = "0.0.0.0".to_string(); + c + } + _ => config::Config::load_config(&args[1]), }; let proxy_only = config.proxy_rules.get("/").is_some(); - let file = fs::File::create("./hteapot.log"); - if file.is_err() { - println!("file {:?}", file); - } - let file = file.unwrap(); - let logger = Logger::new(file); + let logger = match config.log_file.clone() { + Some(file_name) => { + let file = fs::File::create(file_name.clone()); + let file = file.unwrap(); + Logger::new(file) + } + None => Logger::new(io::stdout()), + }; //let logger = Logger::new(io::stdout()); let cache: Mutex = Mutex::new(Cache::new(config.cache_ttl as u64)); diff --git a/src/utils.rs b/src/utils.rs index ed7158d..c20656b 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -17,3 +17,6 @@ pub fn get_mime_tipe(path: &String) -> String { mimetipe.to_string() } + +//TODO: make a parser args to config +//pub fn args_to_dict(list: Vec) -> HashMap {} From bf94fb105797329ffa97d7422d419b0e507ce926 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Sun, 9 Mar 2025 11:51:27 +0100 Subject: [PATCH 12/21] update version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8d0b91c..2c7d917 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,4 +4,4 @@ version = 3 [[package]] name = "hteapot" -version = "0.4.2" +version = "0.5.0" diff --git a/Cargo.toml b/Cargo.toml index 07b4be9..0527b97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hteapot" -version = "0.4.2" +version = "0.5.0" # release candidate exclude = ["hteapot.toml", "public/", "readme.md"] license = "MIT" keywords = ["HTTP", "HTTP-SERVER"] From fd837d71086727168c86e5fdb382b8eae2450854 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Sun, 9 Mar 2025 18:48:39 +0100 Subject: [PATCH 13/21] improve body reader --- src/hteapot/request.rs | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/hteapot/request.rs b/src/hteapot/request.rs index 9211bf5..efbb30b 100644 --- a/src/hteapot/request.rs +++ b/src/hteapot/request.rs @@ -63,22 +63,31 @@ impl HttpRequest { pub fn body(&mut self) -> Option> { if self.has_body() { + let mut stream = self.stream.as_ref().unwrap(); let content_length = self.headers.get("Content-Length")?; let content_length: usize = content_length.parse().unwrap(); if content_length > self.body.len() { - println!("{}/{}", self.body.len(), content_length); - let _ = self.stream.as_ref().unwrap().flush(); - while content_length > self.body.len() { - let r = self.stream.as_ref().unwrap().read(&mut self.body); - if r.is_err() { - println!("Error: {:?}", r.err().unwrap()); - } else { - if r.unwrap() == 0 { + let _ = stream.flush(); + let mut total_read = 0; + self.body.resize(content_length, 0); + while total_read < content_length { + match stream.read(&mut self.body[total_read..]) { + Ok(0) => { + break; + } + Ok(n) => { + total_read += n; + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { break; } } } } + Some(self.body.clone()) } else { None From 47c3690ed3477858a2f32f4464b1133ee0c1030c Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Thu, 13 Mar 2025 11:04:29 +0100 Subject: [PATCH 14/21] Body is always parsed --- src/hteapot/brew.rs | 12 ++-- src/hteapot/mod.rs | 7 +-- src/hteapot/request.rs | 119 ++++++++++++++++++---------------------- src/hteapot/response.rs | 17 +++++- src/main.rs | 2 +- 5 files changed, 77 insertions(+), 80 deletions(-) diff --git a/src/hteapot/brew.rs b/src/hteapot/brew.rs index 6c3ddbe..ffff566 100644 --- a/src/hteapot/brew.rs +++ b/src/hteapot/brew.rs @@ -21,7 +21,7 @@ impl HttpRequest { return self; } - pub fn to_string(&mut self) -> String { + pub fn to_string(&self) -> String { let path = if self.args.is_empty() { self.path.clone() } else { @@ -44,14 +44,14 @@ impl HttpRequest { for (k, v) in self.headers.iter() { result.push_str(format!("{}: {}\r\n", k, v).as_str()); } - if self.has_body() { - result.push_str(self.text().unwrap().as_str()); - } + + result.push_str(self.text().unwrap_or(String::new()).as_str()); + result.push_str("\r\n\r\n"); result } - pub fn brew(&mut self, addr: &str) -> Result { + pub fn brew(&self, addr: &str) -> Result { let mut addr = addr.to_string(); if addr.starts_with("http://") { addr = addr.strip_prefix("http://").unwrap().to_string(); @@ -97,7 +97,7 @@ mod tests { use super::*; #[test] fn test_http_request_new() { - let mut request = HttpRequest::new(HttpMethod::GET, "/example"); + let request = HttpRequest::new(HttpMethod::GET, "/example"); assert_eq!(request.method, HttpMethod::GET); assert_eq!(request.path, "/example"); assert!(request.args.is_empty()); diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index d6f6fcc..eb775ee 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -225,11 +225,10 @@ impl Hteapot { } let request = status.request.get(); if request.is_none() { - // println!("[hteapot] Request not ready"); return Some(()); } - let mut request = request.unwrap(); - request.set_stream(socket_data.stream.try_clone().expect("Cagamos")); + let request = request.unwrap(); + //request.set_stream(socket_data.stream.try_clone().expect("")); let keep_alive = match request.headers.get("Connection") { Some(ch) => ch == "keep-alive", None => false, @@ -298,7 +297,7 @@ impl Hteapot { // } #[test] fn test_http_response_maker() { - let response = HttpResponse::new(HttpStatus::IAmATeapot, "Hello, World!", None); + let mut response = HttpResponse::new(HttpStatus::IAmATeapot, "Hello, World!", None); let response = String::from_utf8(response.to_bytes()).unwrap(); let expected_response = format!("HTTP/1.1 418 I'm a teapot\r\nContent-Length: 13\r\nServer: HTeaPot/{}\r\n\r\nHello, World!\r\n",VERSION); let expected_response_list = expected_response.split("\r\n"); diff --git a/src/hteapot/request.rs b/src/hteapot/request.rs index efbb30b..f00262f 100644 --- a/src/hteapot/request.rs +++ b/src/hteapot/request.rs @@ -1,10 +1,5 @@ use super::HttpMethod; -use std::{ - collections::HashMap, - io::{Read, Write}, - net::TcpStream, - str, -}; +use std::{collections::HashMap, net::TcpStream, str}; pub struct HttpRequest { pub method: HttpMethod, @@ -49,57 +44,47 @@ impl HttpRequest { }; } - pub fn has_body(&self) -> bool { - let length = self.headers.get("Content-Length"); - if length.is_none() { - return false; - } - if self.stream.is_none() && self.body.is_empty() { - return false; - } - - return true; - } - - pub fn body(&mut self) -> Option> { - if self.has_body() { - let mut stream = self.stream.as_ref().unwrap(); - let content_length = self.headers.get("Content-Length")?; - let content_length: usize = content_length.parse().unwrap(); - if content_length > self.body.len() { - let _ = stream.flush(); - let mut total_read = 0; - self.body.resize(content_length, 0); - while total_read < content_length { - match stream.read(&mut self.body[total_read..]) { - Ok(0) => { - break; - } - Ok(n) => { - total_read += n; - } - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - break; - } - } - } - } - - Some(self.body.clone()) - } else { - None - } - } + // pub fn body(&mut self) -> Option> { + // if self.has_body() { + // let mut stream = self.stream.as_ref().unwrap(); + // let content_length = self.headers.get("Content-Length")?; + // let content_length: usize = content_length.parse().unwrap(); + // if content_length > self.body.len() { + // let _ = stream.flush(); + // let mut total_read = 0; + // self.body.resize(content_length, 0); + // while total_read < content_length { + // match stream.read(&mut self.body[total_read..]) { + // Ok(0) => { + // break; + // } + // Ok(n) => { + // total_read += n; + // } + // Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // continue; + // } + // Err(_e) => { + // break; + // } + // } + // } + // } + + // Some(self.body.clone()) + // } else { + // None + // } + // } pub fn set_stream(&mut self, stream: TcpStream) { self.stream = Some(stream); } - pub fn text(&mut self) -> Option { - self.body()?; + pub fn text(&self) -> Option { + if self.body.len() == 0 { + return None; + } let body = match str::from_utf8(self.body.as_slice()) { Ok(v) => Some(v.to_string()), Err(_e) => None, @@ -111,6 +96,7 @@ impl HttpRequest { pub struct HttpRequestBuilder { request: HttpRequest, buffer: Vec, + body_size: usize, pub done: bool, } @@ -125,6 +111,7 @@ impl HttpRequestBuilder { body: Vec::new(), stream: None, }, + body_size: 0, buffer: Vec::new(), done: false, }; @@ -141,25 +128,22 @@ impl HttpRequestBuilder { pub fn append(&mut self, buffer: Vec) -> Option { self.buffer.extend(buffer); self.buffer.retain(|&b| b != 0); - while let Some(pos) = self.buffer.windows(2).position(|w| w == b"\r\n") { - let line = self.buffer.drain(..pos).collect::>(); // Extraer línea - self.buffer.drain(..2); // Eliminar `\r\n` + let line = self.buffer.drain(..pos).collect::>(); + self.buffer.drain(..2); let line_str = String::from_utf8_lossy(&line); if self.request.path.is_empty() { - // Primera línea: Método + Path + Versión HTTP let parts: Vec<&str> = line_str.split_whitespace().collect(); if parts.len() < 2 { - return None; // Request malformada + return None; } - self.request.method = HttpMethod::from_str(parts[0]); // Convierte a enum + self.request.method = HttpMethod::from_str(parts[0]); let path_parts: Vec<&str> = parts[1].split('?').collect(); self.request.path = path_parts[0].to_string(); - // Si hay argumentos en la URL, los parseamos if path_parts.len() > 1 { self.request.args = path_parts[1] .split('&') @@ -174,20 +158,21 @@ impl HttpRequestBuilder { .collect(); } } else if !line_str.is_empty() { - // Cabeceras HTTP if let Some((key, value)) = line_str.split_once(": ") { + if key.to_lowercase() == "content-length" { + self.body_size = value.parse().unwrap_or(0); + } self.request .headers .insert(key.to_string(), value.to_string()); } - } else { - // Fin de las cabeceras - self.request.body = self.buffer.clone(); - self.done = true; - return Some(self.request.clone()); } } - - None // Aún no tenemos toda la request + self.request.body.append(&mut self.buffer.clone()); + if self.request.body.len() == self.body_size { + self.done = true; + return Some(self.request.clone()); + } + None } } diff --git a/src/hteapot/response.rs b/src/hteapot/response.rs index 8dc8cd1..c531bab 100644 --- a/src/hteapot/response.rs +++ b/src/hteapot/response.rs @@ -1,6 +1,9 @@ +use crate::HttpRequest; + use super::HttpStatus; use super::VERSION; use std::collections::HashMap; +use std::net::TcpStream; pub struct HttpResponse { pub status: HttpStatus, @@ -46,7 +49,7 @@ impl HttpResponse { self.is_raw } - pub fn to_bytes(&self) -> Vec { + pub fn to_bytes(&mut self) -> Vec { if self.is_raw() { return self.raw.clone().unwrap(); } @@ -62,9 +65,19 @@ impl HttpResponse { ); let mut response = Vec::new(); response.extend_from_slice(response_header.as_bytes()); - response.append(&mut self.content.clone()); + response.append(&mut self.content); response.push(0x0D); // Carriage Return response.push(0x0A); // Line Feed response } } + +pub struct StreamedResponse { + stream: TcpStream, +} + +impl StreamedResponse { + pub fn new(req: HttpRequest) -> Result { + Err("Request does not have a stream") + } +} diff --git a/src/main.rs b/src/main.rs index b1f5f80..e9a6e23 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,7 +130,7 @@ fn main() { let is_proxy = is_proxy(&config, req.clone()); if proxy_only || is_proxy.is_some() { - let (host, mut proxy_req) = is_proxy.unwrap(); + let (host, proxy_req) = is_proxy.unwrap(); let res = proxy_req.brew(host.as_str()); if res.is_ok() { return res.unwrap(); From 78f23066047cf29654f295e07cc9597bf9aa2751 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Wed, 19 Mar 2025 08:28:02 +0100 Subject: [PATCH 15/21] Add abstractions HttpResponse --- src/hteapot/mod.rs | 19 +++++++------- src/hteapot/response.rs | 55 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index eb775ee..c7b6658 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -8,6 +8,8 @@ mod request; mod response; mod status; +use self::response::{EmptyHttpResponse, HttpResponseConsumer}; + pub use self::methods::HttpMethod; pub use self::request::HttpRequest; use self::request::HttpRequestBuilder; @@ -47,6 +49,7 @@ struct SocketStatus { reading: bool, data_readed: Vec, data_write: Vec, + response: Box, request: HttpRequestBuilder, index_writed: usize, } @@ -136,6 +139,7 @@ impl Hteapot { reading: true, data_readed: vec![], data_write: vec![], + response: Box::new(EmptyHttpResponse {}), request: HttpRequestBuilder::new(), index_writed: 0, }; @@ -228,13 +232,12 @@ impl Hteapot { return Some(()); } let request = request.unwrap(); - //request.set_stream(socket_data.stream.try_clone().expect("")); let keep_alive = match request.headers.get("Connection") { Some(ch) => ch == "keep-alive", None => false, }; if status.data_write.len() == 0 { - let mut response = action(request); + let mut response = action(request); //Call closure if !response.headers.contains_key("Conection") && keep_alive { response .headers @@ -244,18 +247,16 @@ impl Hteapot { .headers .insert("Connection".to_string(), "close".to_string()); } - status.data_write = response.to_bytes(); + // status.data_write = response.to_bytes(); + status.response = Box::new(response); } - for n in status - .data_write - .chunks(BUFFER_SIZE) - .skip(status.index_writed) - { + while let Ok(n) = status.response.peek() { match socket_data.stream.write(&n) { Ok(_size) => { - status.index_writed += 1; + let _ = status.response.next(); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + println!("would block"); return Some(()); } Err(e) => { diff --git a/src/hteapot/response.rs b/src/hteapot/response.rs index c531bab..55fbb8e 100644 --- a/src/hteapot/response.rs +++ b/src/hteapot/response.rs @@ -1,9 +1,8 @@ -use crate::HttpRequest; - use super::HttpStatus; use super::VERSION; use std::collections::HashMap; -use std::net::TcpStream; + +const BUFFER_SIZE: usize = 1024; pub struct HttpResponse { pub status: HttpStatus, @@ -11,6 +10,18 @@ pub struct HttpResponse { pub content: Vec, raw: Option>, is_raw: bool, + index: usize, +} + +#[derive(Debug)] +pub enum IterError { + WouldBlock, + Finished, +} + +pub trait HttpResponseConsumer { + fn next(&mut self) -> Result, IterError>; + fn peek(&mut self) -> Result, IterError>; //TODO: come up with better solution } impl HttpResponse { @@ -32,6 +43,7 @@ impl HttpResponse { content: content.to_owned(), raw: None, is_raw: false, + index: 0, } } @@ -42,6 +54,7 @@ impl HttpResponse { content: vec![], raw: Some(raw), is_raw: true, + index: 0, } } @@ -72,12 +85,42 @@ impl HttpResponse { } } -pub struct StreamedResponse { - stream: TcpStream, +impl HttpResponseConsumer for HttpResponse { + fn next(&mut self) -> Result, IterError> { + let byte_chunk = self.peek()?; + self.index += 1; + return Ok(byte_chunk); + } + + fn peek(&mut self) -> Result, IterError> { + if self.raw.is_none() { + self.raw = Some(self.to_bytes()); + } + let raw = self.raw.as_ref().unwrap(); + let mut raw = raw.chunks(BUFFER_SIZE).skip(self.index); + // println!("{}/{}",self.) + let byte_chunk = raw.next().ok_or(IterError::Finished)?.to_vec(); + return Ok(byte_chunk); + } +} + +pub struct EmptyHttpResponse {} + +impl EmptyHttpResponse {} +impl HttpResponseConsumer for EmptyHttpResponse { + fn next(&mut self) -> Result, IterError> { + Err(IterError::Finished) + } + + fn peek(&mut self) -> Result, IterError> { + Err(IterError::Finished) + } } +pub struct StreamedResponse {} + impl StreamedResponse { - pub fn new(req: HttpRequest) -> Result { + pub fn new() -> Result { Err("Request does not have a stream") } } From bf79aa9522ad83cf7b22d3733c936ef0a361cbd3 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Thu, 20 Mar 2025 11:02:13 +0100 Subject: [PATCH 16/21] Add StreamedResponse and refactor Response abstractions --- src/hteapot/mod.rs | 52 ++++++++++----- src/hteapot/response.rs | 141 +++++++++++++++++++++++++++++++++------- src/main.rs | 35 ++++++++-- 3 files changed, 180 insertions(+), 48 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index c7b6658..8b70df7 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -8,12 +8,16 @@ mod request; mod response; mod status; -use self::response::{EmptyHttpResponse, HttpResponseConsumer}; +use self::response::IterError; + +use self::response::HttpResponseCommon; + +use self::response::EmptyHttpResponse; pub use self::methods::HttpMethod; pub use self::request::HttpRequest; use self::request::HttpRequestBuilder; -pub use self::response::HttpResponse; +pub use self::response::{HttpResponse, StreamedResponse}; pub use self::status::HttpStatus; use std::collections::VecDeque; @@ -49,7 +53,7 @@ struct SocketStatus { reading: bool, data_readed: Vec, data_write: Vec, - response: Box, + response: Box, request: HttpRequestBuilder, index_writed: usize, } @@ -80,7 +84,10 @@ impl Hteapot { } // Start the server - pub fn listen(&self, action: impl Fn(HttpRequest) -> HttpResponse + Send + Sync + 'static) { + pub fn listen( + &self, + action: impl Fn(HttpRequest) -> Box + Send + Sync + 'static, + ) { let addr = format!("{}:{}", self.address, self.port); let listener = TcpListener::bind(addr); let listener = match listener { @@ -196,7 +203,7 @@ impl Hteapot { // Handle the client when a request is received fn handle_client( socket_data: &mut SocketData, - action: &Arc HttpResponse + Send + Sync + 'static>, + action: &Arc Box + Send + Sync + 'static>, ) -> Option<()> { let status = socket_data.status.as_mut()?; if !status.request.done { @@ -238,31 +245,40 @@ impl Hteapot { }; if status.data_write.len() == 0 { let mut response = action(request); //Call closure - if !response.headers.contains_key("Conection") && keep_alive { + if !response.base().headers.contains_key("Conection") && keep_alive { response + .base() .headers .insert("Connection".to_string(), "keep_alive".to_string()); } else { response + .base() .headers .insert("Connection".to_string(), "close".to_string()); } // status.data_write = response.to_bytes(); - status.response = Box::new(response); + status.response = response; } - while let Ok(n) = status.response.peek() { - match socket_data.stream.write(&n) { - Ok(_size) => { - let _ = status.response.next(); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - println!("would block"); + loop { + match status.response.peek() { + Ok(n) => match socket_data.stream.write(&n) { + Ok(_size) => { + status.data_write.append(&mut n.clone()); //TODO: yapa. better status handling + let _ = status.response.next(); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + return Some(()); + } + Err(e) => { + eprintln!("W error: {:?}", e); + return None; + } + }, + Err(IterError::WouldBlock) => { + thread::sleep(Duration::from_millis(100)); return Some(()); } - Err(e) => { - eprintln!("W error: {:?}", e); - return None; - } + Err(_) => break, } } diff --git a/src/hteapot/response.rs b/src/hteapot/response.rs index 55fbb8e..8c3157f 100644 --- a/src/hteapot/response.rs +++ b/src/hteapot/response.rs @@ -1,35 +1,62 @@ use super::HttpStatus; -use super::VERSION; +use super::{BUFFER_SIZE, VERSION}; use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::Arc; +use std::thread; +use std::thread::JoinHandle; +use std::time::Duration; -const BUFFER_SIZE: usize = 1024; - -pub struct HttpResponse { +pub struct BaseResponse { pub status: HttpStatus, pub headers: HashMap, +} + +impl BaseResponse { + pub fn to_bytes(&mut self) -> Vec { + let mut headers_text = String::new(); + for (key, value) in self.headers.iter() { + headers_text.push_str(&format!("{}: {}\r\n", key, value)); + } + let response_header = format!( + "HTTP/1.1 {} {}\r\n{}\r\n", + self.status as u16, + self.status.to_string(), + headers_text + ); + let mut response = Vec::new(); + response.extend_from_slice(response_header.as_bytes()); + response + } +} + +pub struct HttpResponse { + base: BaseResponse, pub content: Vec, raw: Option>, is_raw: bool, index: usize, } +pub trait HttpResponseCommon { + fn base(&mut self) -> &mut BaseResponse; + fn next(&mut self) -> Result, IterError>; + fn peek(&mut self) -> Result, IterError>; +} + #[derive(Debug)] pub enum IterError { WouldBlock, Finished, } -pub trait HttpResponseConsumer { - fn next(&mut self) -> Result, IterError>; - fn peek(&mut self) -> Result, IterError>; //TODO: come up with better solution -} - impl HttpResponse { pub fn new>( status: HttpStatus, content: B, headers: Option>, - ) -> Self { + ) -> Box { let mut headers = headers.unwrap_or(HashMap::new()); let content = content.as_ref(); headers.insert("Content-Length".to_string(), content.len().to_string()); @@ -37,20 +64,21 @@ impl HttpResponse { "Server".to_string(), format!("HTeaPot/{}", VERSION).to_string(), ); - HttpResponse { - status, - headers, + Box::new(HttpResponse { + base: BaseResponse { status, headers }, content: content.to_owned(), raw: None, is_raw: false, index: 0, - } + }) } pub fn new_raw(raw: Vec) -> Self { HttpResponse { - status: HttpStatus::IAmATeapot, - headers: HashMap::new(), + base: BaseResponse { + status: HttpStatus::IAmATeapot, + headers: HashMap::new(), + }, content: vec![], raw: Some(raw), is_raw: true, @@ -67,13 +95,13 @@ impl HttpResponse { return self.raw.clone().unwrap(); } let mut headers_text = String::new(); - for (key, value) in self.headers.iter() { + for (key, value) in self.base.headers.iter() { headers_text.push_str(&format!("{}: {}\r\n", key, value)); } let response_header = format!( "HTTP/1.1 {} {}\r\n{}\r\n", - self.status as u16, - self.status.to_string(), + self.base.status as u16, + self.base.status.to_string(), headers_text ); let mut response = Vec::new(); @@ -85,7 +113,11 @@ impl HttpResponse { } } -impl HttpResponseConsumer for HttpResponse { +impl HttpResponseCommon for HttpResponse { + fn base(&mut self) -> &mut BaseResponse { + &mut self.base + } + fn next(&mut self) -> Result, IterError> { let byte_chunk = self.peek()?; self.index += 1; @@ -107,7 +139,10 @@ impl HttpResponseConsumer for HttpResponse { pub struct EmptyHttpResponse {} impl EmptyHttpResponse {} -impl HttpResponseConsumer for EmptyHttpResponse { +impl HttpResponseCommon for EmptyHttpResponse { + fn base(&mut self) -> &mut BaseResponse { + panic!("Invalid state") + } fn next(&mut self) -> Result, IterError> { Err(IterError::Finished) } @@ -117,10 +152,68 @@ impl HttpResponseConsumer for EmptyHttpResponse { } } -pub struct StreamedResponse {} +pub struct StreamedResponse { + base: BaseResponse, + receiver: Receiver>, + has_end: Arc, + join_handle: JoinHandle<()>, +} impl StreamedResponse { - pub fn new() -> Result { - Err("Request does not have a stream") + pub fn new(action: impl Fn(Sender>) + Send + Sync + 'static) -> Box { + let action = Arc::new(action); + let (tx, rx) = mpsc::channel(); + let action_clon = action.clone(); + let mut base = BaseResponse { + status: HttpStatus::OK, + headers: HashMap::new(), + }; + base.headers + .insert("Transfer-Encoding".to_string(), "chunked".to_string()); + base.headers.insert( + "Server".to_string(), + format!("HTeaPot/{}", VERSION).to_string(), + ); + let _ = tx.send(base.to_bytes()); + let has_end = Arc::new(AtomicBool::new(false)); + let has_end_clone = has_end.clone(); + let jh = thread::spawn(move || { + action_clon(tx); + println!("Ended!"); + has_end_clone.store(true, Ordering::SeqCst); + }); + Box::new(StreamedResponse { + base, + has_end, + receiver: rx, + join_handle: jh, + }) + } + + fn has_end(&self) -> bool { + self.has_end.load(Ordering::SeqCst) + } +} + +impl HttpResponseCommon for StreamedResponse { + fn base(&mut self) -> &mut BaseResponse { + &mut self.base + } + fn next(&mut self) -> Result, IterError> { + if self.has_end() { + return Err(IterError::Finished); + } + self.receiver + .recv_timeout(Duration::from_millis(100)) + .map_err(|_| IterError::WouldBlock) + } + + fn peek(&mut self) -> Result, IterError> { + if self.has_end() { + return Err(IterError::Finished); + } + self.receiver + .recv_timeout(Duration::from_millis(100)) + .map_err(|_| IterError::WouldBlock) } } diff --git a/src/main.rs b/src/main.rs index e9a6e23..60323f1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,8 @@ pub mod hteapot; mod logger; mod utils; +use std::thread; +use std::time::Duration; use std::{fs, io}; use std::path::Path; @@ -12,7 +14,7 @@ use std::sync::Mutex; use cache::Cache; use config::Config; -use hteapot::{Hteapot, HttpRequest, HttpResponse, HttpStatus}; +use hteapot::{Hteapot, HttpRequest, HttpResponse, HttpStatus, StreamedResponse}; use utils::get_mime_tipe; use logger::Logger; @@ -25,9 +27,6 @@ fn is_proxy(config: &Config, req: HttpRequest) -> Option<(String, HttpRequest)> if path_match.is_some() { let new_path = path_match.unwrap(); let url = config.proxy_rules.get(proxy_path).unwrap().clone(); - // if url.ends_with('/') { - // url = url.strip_suffix('/').to_owned(); - // } let mut proxy_req = req.clone(); proxy_req.path = new_path.to_string(); proxy_req.headers.remove("Host"); @@ -125,15 +124,39 @@ fn main() { server.listen(move |req| { // SERVER CORE // for each request - logger.msg(format!("Request {} {}", req.method.to_str(), req.path)); + if req.path == "/_stream_test".to_string() { + let times = req.args.get("t"); + let times = times.unwrap_or(&"3".to_string()).to_string(); + let times: usize = times.parse().unwrap_or(3); + return StreamedResponse::new(move |sender| { + let data = b"abcd".to_vec(); + for _ in 0..times { + let mut response = Vec::new(); + + let len_bytes = format!("{:X}\r\n", data.len()).into_bytes(); + response.extend(len_bytes); + + response.extend(&data); + response.extend(b"\r\n"); + + let _ = sender.send(response); + thread::sleep(Duration::from_secs(1)); // Simula streaming + } + + // Chunk final + let end = b"0\r\n\r\n".to_vec(); + let _ = sender.send(end); + }); + } + let is_proxy = is_proxy(&config, req.clone()); if proxy_only || is_proxy.is_some() { let (host, proxy_req) = is_proxy.unwrap(); let res = proxy_req.brew(host.as_str()); if res.is_ok() { - return res.unwrap(); + return Box::new(res.unwrap()); } else { return HttpResponse::new( HttpStatus::InternalServerError, From fb89561263b638c7ae151335e36fd0e61be08ce2 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Mon, 31 Mar 2025 10:08:44 +0200 Subject: [PATCH 17/21] Improve request handling add keep-alive TTL --- src/hteapot/brew.rs | 6 ++-- src/hteapot/mod.rs | 22 ++++++++++++-- src/hteapot/response.rs | 64 +++++++++++++++++++++++++++++------------ src/main.rs | 22 ++++---------- 4 files changed, 73 insertions(+), 41 deletions(-) diff --git a/src/hteapot/brew.rs b/src/hteapot/brew.rs index ffff566..59b141b 100644 --- a/src/hteapot/brew.rs +++ b/src/hteapot/brew.rs @@ -51,7 +51,7 @@ impl HttpRequest { result } - pub fn brew(&self, addr: &str) -> Result { + pub fn brew(&self, addr: &str) -> Result, &'static str> { let mut addr = addr.to_string(); if addr.starts_with("http://") { addr = addr.strip_prefix("http://").unwrap().to_string(); @@ -79,11 +79,11 @@ impl HttpRequest { let mut raw: Vec = Vec::new(); let _ = stream.read_to_end(&mut raw); - Ok(HttpResponse::new_raw(raw)) + Ok(Box::new(HttpResponse::new_raw(raw))) } } -pub fn brew(direction: &str, request: &mut HttpRequest) -> Result { +pub fn brew(direction: &str, request: &mut HttpRequest) -> Result, &'static str> { return request.brew(direction); } diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index 8b70df7..f40eb1b 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -26,9 +26,11 @@ use std::net::{Shutdown, TcpListener, TcpStream}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::Duration; +use std::time::Instant; const VERSION: &str = env!("CARGO_PKG_VERSION"); const BUFFER_SIZE: usize = 1024; +const KEEP_ALIVE_TTL: Duration = Duration::from_secs(10); #[macro_export] macro_rules! headers { @@ -50,6 +52,7 @@ pub struct Hteapot { struct SocketStatus { // TODO: write proper ttl + ttl: Instant, reading: bool, data_readed: Vec, data_write: Vec, @@ -79,7 +82,6 @@ impl Hteapot { port, address: address.to_string(), threads: if threads == 0 { 1 } else { threads }, - //cache: HashMap::new(), } } @@ -143,6 +145,7 @@ impl Hteapot { if !pool.is_empty() { let socket_status = SocketStatus { + ttl: Instant::now(), reading: true, data_readed: vec![], data_write: vec![], @@ -206,6 +209,14 @@ impl Hteapot { action: &Arc Box + Send + Sync + 'static>, ) -> Option<()> { let status = socket_data.status.as_mut()?; + + // Verificar si el TTL ha expirado + if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL { + println!("TTL expirado, cerrando conexión."); + let _ = socket_data.stream.shutdown(Shutdown::Both); + return None; + } + if !status.request.done { loop { let mut buffer = [0; BUFFER_SIZE]; @@ -223,6 +234,7 @@ impl Hteapot { } }, Ok(m) => { + status.ttl = Instant::now(); status.request.append(buffer.to_vec()); //status.data_readed.append(&mut buffer.to_vec()); if m == 0 { @@ -250,6 +262,10 @@ impl Hteapot { .base() .headers .insert("Connection".to_string(), "keep_alive".to_string()); + response.base().headers.insert( + "Keep-Alive".to_string(), + format!("timeout={}", KEEP_ALIVE_TTL.as_secs()), + ); } else { response .base() @@ -263,7 +279,8 @@ impl Hteapot { match status.response.peek() { Ok(n) => match socket_data.stream.write(&n) { Ok(_size) => { - status.data_write.append(&mut n.clone()); //TODO: yapa. better status handling + status.ttl = Instant::now(); + status.data_write.append(&mut n.clone()); //TODO: ñapa. better status handling let _ = status.response.next(); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { @@ -275,6 +292,7 @@ impl Hteapot { } }, Err(IterError::WouldBlock) => { + status.ttl = Instant::now(); thread::sleep(Duration::from_millis(100)); return Some(()); } diff --git a/src/hteapot/response.rs b/src/hteapot/response.rs index 8c3157f..052d1fd 100644 --- a/src/hteapot/response.rs +++ b/src/hteapot/response.rs @@ -1,8 +1,8 @@ use super::HttpStatus; use super::{BUFFER_SIZE, VERSION}; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SendError, Sender, TryRecvError}; use std::sync::Arc; use std::thread; use std::thread::JoinHandle; @@ -130,7 +130,6 @@ impl HttpResponseCommon for HttpResponse { } let raw = self.raw.as_ref().unwrap(); let mut raw = raw.chunks(BUFFER_SIZE).skip(self.index); - // println!("{}/{}",self.) let byte_chunk = raw.next().ok_or(IterError::Finished)?.to_vec(); return Ok(byte_chunk); } @@ -152,15 +151,34 @@ impl HttpResponseCommon for EmptyHttpResponse { } } +pub struct ChunkSender(Sender>); + +impl ChunkSender { + // fn new(sender: Sender>) -> Self { + // Self(sender) + // } + pub fn send(&self, msg: Vec) -> Result<(), SendError>> { + let mut response = Vec::new(); + let len_bytes = format!("{:X}\r\n", msg.len()).into_bytes(); + response.extend(len_bytes); + response.extend(&msg); + response.extend(b"\r\n"); + self.0.send(response) + } + + // fn end(&self) -> Result<(), SendError>> {} +} + pub struct StreamedResponse { base: BaseResponse, receiver: Receiver>, has_end: Arc, - join_handle: JoinHandle<()>, + _join_handle: JoinHandle<()>, + queue: VecDeque>, } impl StreamedResponse { - pub fn new(action: impl Fn(Sender>) + Send + Sync + 'static) -> Box { + pub fn new(action: impl Fn(ChunkSender) + Send + Sync + 'static) -> Box { let action = Arc::new(action); let (tx, rx) = mpsc::channel(); let action_clon = action.clone(); @@ -177,16 +195,19 @@ impl StreamedResponse { let _ = tx.send(base.to_bytes()); let has_end = Arc::new(AtomicBool::new(false)); let has_end_clone = has_end.clone(); + let jh = thread::spawn(move || { - action_clon(tx); - println!("Ended!"); + let chunk_sender = ChunkSender(tx.clone()); + action_clon(chunk_sender); + let _ = tx.clone().send(b"0\r\n\r\n".to_vec()); has_end_clone.store(true, Ordering::SeqCst); }); Box::new(StreamedResponse { base, has_end, receiver: rx, - join_handle: jh, + _join_handle: jh, + queue: VecDeque::new(), }) } @@ -200,20 +221,25 @@ impl HttpResponseCommon for StreamedResponse { &mut self.base } fn next(&mut self) -> Result, IterError> { - if self.has_end() { - return Err(IterError::Finished); - } - self.receiver - .recv_timeout(Duration::from_millis(100)) - .map_err(|_| IterError::WouldBlock) + self.peek() } fn peek(&mut self) -> Result, IterError> { - if self.has_end() { - return Err(IterError::Finished); + if self.queue.is_empty() { + let r = self.receiver.try_recv().map_err(|e| match e { + TryRecvError::Empty => IterError::WouldBlock, + TryRecvError::Disconnected => { + if self.has_end() { + IterError::Finished + } else { + IterError::WouldBlock + } + } + })?; + self.queue.push_back(r.clone()); + return Ok(r); + } else { + self.queue.pop_front().ok_or(IterError::WouldBlock) } - self.receiver - .recv_timeout(Duration::from_millis(100)) - .map_err(|_| IterError::WouldBlock) } } diff --git a/src/main.rs b/src/main.rs index 60323f1..b3e6783 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,23 +130,11 @@ fn main() { let times = times.unwrap_or(&"3".to_string()).to_string(); let times: usize = times.parse().unwrap_or(3); return StreamedResponse::new(move |sender| { - let data = b"abcd".to_vec(); - for _ in 0..times { - let mut response = Vec::new(); - - let len_bytes = format!("{:X}\r\n", data.len()).into_bytes(); - response.extend(len_bytes); - - response.extend(&data); - response.extend(b"\r\n"); - - let _ = sender.send(response); - thread::sleep(Duration::from_secs(1)); // Simula streaming + for i in 0..times { + let data = format!("{i}-abcd\n").as_bytes().to_vec(); + let _ = sender.send(data.clone()); + thread::sleep(Duration::from_secs(1)); } - - // Chunk final - let end = b"0\r\n\r\n".to_vec(); - let _ = sender.send(end); }); } @@ -156,7 +144,7 @@ fn main() { let (host, proxy_req) = is_proxy.unwrap(); let res = proxy_req.brew(host.as_str()); if res.is_ok() { - return Box::new(res.unwrap()); + return res.unwrap(); } else { return HttpResponse::new( HttpStatus::InternalServerError, From 936c21d1c05be30651a60b01f7bb57c43f84aa9d Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Mon, 31 Mar 2025 22:04:39 +0200 Subject: [PATCH 18/21] increase read buffer 1024 -> 2048 --- src/hteapot/mod.rs | 24 ++++++++++++------------ src/hteapot/response.rs | 3 +-- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index f40eb1b..8e13b65 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -29,7 +29,7 @@ use std::time::Duration; use std::time::Instant; const VERSION: &str = env!("CARGO_PKG_VERSION"); -const BUFFER_SIZE: usize = 1024; +const BUFFER_SIZE: usize = 1024 * 2; const KEEP_ALIVE_TTL: Duration = Duration::from_secs(10); #[macro_export] @@ -54,8 +54,7 @@ struct SocketStatus { // TODO: write proper ttl ttl: Instant, reading: bool, - data_readed: Vec, - data_write: Vec, + write: bool, response: Box, request: HttpRequestBuilder, index_writed: usize, @@ -147,8 +146,7 @@ impl Hteapot { let socket_status = SocketStatus { ttl: Instant::now(), reading: true, - data_readed: vec![], - data_write: vec![], + write: false, response: Box::new(EmptyHttpResponse {}), request: HttpRequestBuilder::new(), index_writed: 0, @@ -211,12 +209,11 @@ impl Hteapot { let status = socket_data.status.as_mut()?; // Verificar si el TTL ha expirado - if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL { + if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL && !status.write { println!("TTL expirado, cerrando conexión."); let _ = socket_data.stream.shutdown(Shutdown::Both); return None; } - if !status.request.done { loop { let mut buffer = [0; BUFFER_SIZE]; @@ -234,8 +231,12 @@ impl Hteapot { } }, Ok(m) => { + println!("bytes: {m}"); status.ttl = Instant::now(); - status.request.append(buffer.to_vec()); + let r = status.request.append(buffer.to_vec()); + if r.is_some() { + break; + } //status.data_readed.append(&mut buffer.to_vec()); if m == 0 { return None; @@ -255,7 +256,7 @@ impl Hteapot { Some(ch) => ch == "keep-alive", None => false, }; - if status.data_write.len() == 0 { + if !status.write { let mut response = action(request); //Call closure if !response.base().headers.contains_key("Conection") && keep_alive { response @@ -273,6 +274,7 @@ impl Hteapot { .insert("Connection".to_string(), "close".to_string()); } // status.data_write = response.to_bytes(); + status.write = true; status.response = response; } loop { @@ -280,7 +282,6 @@ impl Hteapot { Ok(n) => match socket_data.stream.write(&n) { Ok(_size) => { status.ttl = Instant::now(); - status.data_write.append(&mut n.clone()); //TODO: ñapa. better status handling let _ = status.response.next(); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { @@ -306,8 +307,7 @@ impl Hteapot { } if keep_alive { status.reading = true; - status.data_readed = vec![]; - status.data_write = vec![]; + status.write = false; status.index_writed = 0; status.request = HttpRequestBuilder::new(); return Some(()); diff --git a/src/hteapot/response.rs b/src/hteapot/response.rs index 052d1fd..dae9979 100644 --- a/src/hteapot/response.rs +++ b/src/hteapot/response.rs @@ -2,11 +2,10 @@ use super::HttpStatus; use super::{BUFFER_SIZE, VERSION}; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SendError, Sender, TryRecvError}; +use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError}; use std::sync::Arc; use std::thread; use std::thread::JoinHandle; -use std::time::Duration; pub struct BaseResponse { pub status: HttpStatus, From e7f8a0df61ce1924d33ff80460e82b3f00ffeb0f Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Fri, 4 Apr 2025 13:44:30 +0200 Subject: [PATCH 19/21] add encoding to html , remove println --- src/hteapot/mod.rs | 3 --- src/utils.rs | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index 8e13b65..fac8f20 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -208,9 +208,7 @@ impl Hteapot { ) -> Option<()> { let status = socket_data.status.as_mut()?; - // Verificar si el TTL ha expirado if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL && !status.write { - println!("TTL expirado, cerrando conexión."); let _ = socket_data.stream.shutdown(Shutdown::Both); return None; } @@ -231,7 +229,6 @@ impl Hteapot { } }, Ok(m) => { - println!("bytes: {m}"); status.ttl = Instant::now(); let r = status.request.append(buffer.to_vec()); if r.is_some() { diff --git a/src/utils.rs b/src/utils.rs index c20656b..a007fc9 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -10,7 +10,7 @@ pub fn get_mime_tipe(path: &String) -> String { "js" => "text/javascript", "json" => "application/json", "css" => "text/css", - "html" => "text/html", + "html" => "text/html; charset=utf-8", "ico" => "image/x-icon", _ => "text/plain", }; From b6e69c254b69f593066bb12f1d4d2c15d18a0964 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Fri, 4 Apr 2025 13:46:21 +0200 Subject: [PATCH 20/21] change alt --- public/index.html | 151 +++++++++++++++++++++++----------------------- 1 file changed, 76 insertions(+), 75 deletions(-) diff --git a/public/index.html b/public/index.html index f12f8f1..a06c843 100644 --- a/public/index.html +++ b/public/index.html @@ -1,87 +1,88 @@ - + - - - - HTeaPot Server - + + - - - + +
+

HTeaPot Server

+

+ Congratulations! HTeaPot, the HTTP server, is correctly + installed and configured. +

+

You are now ready to serve your web applications.

+ +
- - -
-

HTeaPot Server

-

Congratulations! HTeaPot, the HTTP server, is correctly installed and configured.

-

You are now ready to serve your web applications.

- -
- -
-
- tea -
-
- +
+
+ 🍵 +
+
+ From 646a5f6d9d6f6fcc1d35e248e3cd3b5e7ac5590f Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Mon, 7 Apr 2025 15:05:54 +0200 Subject: [PATCH 21/21] fix typos and remove sleep in writer --- src/hteapot/mod.rs | 17 ++++++----------- src/main.rs | 3 +-- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index fac8f20..d205375 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -51,7 +51,6 @@ pub struct Hteapot { } struct SocketStatus { - // TODO: write proper ttl ttl: Instant, reading: bool, write: bool, @@ -72,7 +71,6 @@ impl Hteapot { port, address: address.to_string(), threads: 1, - //cache: HashMap::new(), } } @@ -100,7 +98,6 @@ impl Hteapot { }; let pool: Arc<(Mutex>, Condvar)> = Arc::new((Mutex::new(VecDeque::new()), Condvar::new())); - //let statusPool = Arc::new(Mutex::new(HashMap::::new())); let priority_list: Arc>> = Arc::new(Mutex::new(Vec::new())); let arc_action = Arc::new(action); for _tn in 0..self.threads { @@ -109,7 +106,7 @@ impl Hteapot { let action_clone = arc_action.clone(); let pl_clone = priority_list.clone(); { - let mut pl_lock = pl_clone.lock().expect("Error locking prority list"); + let mut pl_lock = pl_clone.lock().expect("Error locking priority list"); pl_lock.push(0); } thread::spawn(move || { @@ -120,7 +117,7 @@ impl Hteapot { let mut pool = lock.lock().expect("Error locking pool"); let pl_copy; { - let pl_lock = pl_clone.lock().expect("Error locking prority list"); + let pl_lock = pl_clone.lock().expect("Error locking priority list"); pl_copy = pl_lock.clone(); } @@ -159,7 +156,7 @@ impl Hteapot { { let mut pl_lock = - pl_clone.lock().expect("Errpr locking prority list"); + pl_clone.lock().expect("Errpr locking priority list"); pl_lock[_tn] = streams_to_handle.len(); } } @@ -172,7 +169,7 @@ impl Hteapot { Hteapot::handle_client(s, &action_clone).is_some() }); { - let mut pl_lock = pl_clone.lock().expect("Errpr locking prority list"); + let mut pl_lock = pl_clone.lock().expect("Errpr locking priority list"); pl_lock[_tn] = streams_to_handle.len(); } } @@ -234,7 +231,6 @@ impl Hteapot { if r.is_some() { break; } - //status.data_readed.append(&mut buffer.to_vec()); if m == 0 { return None; } else if m < BUFFER_SIZE || status.request.done { @@ -255,7 +251,7 @@ impl Hteapot { }; if !status.write { let mut response = action(request); //Call closure - if !response.base().headers.contains_key("Conection") && keep_alive { + if !response.base().headers.contains_key("Connection") && keep_alive { response .base() .headers @@ -270,7 +266,6 @@ impl Hteapot { .headers .insert("Connection".to_string(), "close".to_string()); } - // status.data_write = response.to_bytes(); status.write = true; status.response = response; } @@ -291,7 +286,7 @@ impl Hteapot { }, Err(IterError::WouldBlock) => { status.ttl = Instant::now(); - thread::sleep(Duration::from_millis(100)); + //thread::sleep(Duration::from_millis(100)); return Some(()); } Err(_) => break, diff --git a/src/main.rs b/src/main.rs index b3e6783..e0ec529 100644 --- a/src/main.rs +++ b/src/main.rs @@ -107,7 +107,6 @@ fn main() { None => Logger::new(io::stdout()), }; - //let logger = Logger::new(io::stdout()); let cache: Mutex = Mutex::new(Cache::new(config.cache_ttl as u64)); let server = Hteapot::new_threaded(config.host.as_str(), config.port, config.threads); logger.msg(format!( @@ -133,7 +132,7 @@ fn main() { for i in 0..times { let data = format!("{i}-abcd\n").as_bytes().to_vec(); let _ = sender.send(data.clone()); - thread::sleep(Duration::from_secs(1)); + //thread::sleep(Duration::from_secs(1)); } }); }