diff --git a/.gitignore b/.gitignore index 819d3fe..f8338f4 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ src/.DS_Store .idea/vcs.xml .idea/modules.xml .idea/HTeaPot.iml +*.log diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index b58b603..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ diff --git a/Cargo.lock b/Cargo.lock index 8d0b91c..2c7d917 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,4 +4,4 @@ version = 3 [[package]] name = "hteapot" -version = "0.4.2" +version = "0.5.0" diff --git a/Cargo.toml b/Cargo.toml index 07b4be9..0527b97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hteapot" -version = "0.4.2" +version = "0.5.0" # release candidate exclude = ["hteapot.toml", "public/", "readme.md"] license = "MIT" keywords = ["HTTP", "HTTP-SERVER"] diff --git a/public/index.html b/public/index.html index f12f8f1..a06c843 100644 --- a/public/index.html +++ b/public/index.html @@ -1,87 +1,88 @@ - + - - - - HTeaPot Server - + + - - - + +
+

HTeaPot Server

+

+ Congratulations! HTeaPot, the HTTP server, is correctly + installed and configured. +

+

You are now ready to serve your web applications.

+ +
- - -
-

HTeaPot Server

-

Congratulations! HTeaPot, the HTTP server, is correctly installed and configured.

-

You are now ready to serve your web applications.

- -
- -
-
- tea -
-
- +
+
+ 🍵 +
+
+ diff --git a/src/config.rs b/src/config.rs index 37d934f..fb689a5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -104,6 +104,7 @@ pub struct Config { pub cache: bool, pub cache_ttl: u16, pub threads: u16, + pub log_file: Option, pub index: String, // Index file to serve by default //pub error: String, // Error file to serve when a file is not found pub proxy_rules: HashMap, @@ -127,6 +128,7 @@ impl Config { host: "localhost".to_string(), root: "./".to_string(), index: "index.html".to_string(), + log_file: None, //error: "error.html".to_string(), threads: 1, cache: false, @@ -166,6 +168,7 @@ impl Config { cache: map.get2("cache").unwrap_or(false), cache_ttl: map.get2("cache_ttl").unwrap_or(3600), index: map.get2("index").unwrap_or("index.html".to_string()), + log_file: map.get2("log_file"), //error: map.get2("error").unwrap_or("error.html".to_string()), proxy_rules, } diff --git a/src/hteapot/brew.rs b/src/hteapot/brew.rs index b1b4baa..59b141b 100644 --- a/src/hteapot/brew.rs +++ b/src/hteapot/brew.rs @@ -1,29 +1,16 @@ // Written by Alberto Ruiz 2024-04-08 // This is the HTTP client module, it will handle the requests and responses -use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{TcpStream, ToSocketAddrs}; use std::time::Duration; -use super::methods::HttpMethod; use super::request::HttpRequest; use super::response::HttpResponse; // use super::status::HttpStatus; // use std::net::{IpAddr, Ipv4Addr, SocketAddr}; impl HttpRequest { - pub fn new(method: HttpMethod, path: &str) -> HttpRequest { - let path = path.to_string(); - HttpRequest { - method, - path, - args: HashMap::new(), - headers: HashMap::new(), - body: String::new(), - } - } - pub fn arg(&mut self, key: &str, value: &str) -> &mut HttpRequest { self.args.insert(key.to_string(), value.to_string()); return self; @@ -34,11 +21,6 @@ impl HttpRequest { return self; } - pub fn body(&mut self, body: String) -> &mut HttpRequest { - self.body = body; - return self; - } - pub fn to_string(&self) -> String { let path = if self.args.is_empty() { self.path.clone() @@ -62,14 +44,14 @@ impl HttpRequest { for (k, v) in self.headers.iter() { result.push_str(format!("{}: {}\r\n", k, v).as_str()); } - if !self.body.is_empty() { - result.push_str(self.body.as_str()); - } + + result.push_str(self.text().unwrap_or(String::new()).as_str()); + result.push_str("\r\n\r\n"); result } - pub fn brew(&self, addr: &str) -> Result { + pub fn brew(&self, addr: &str) -> Result, &'static str> { let mut addr = addr.to_string(); if addr.starts_with("http://") { addr = addr.strip_prefix("http://").unwrap().to_string(); @@ -97,11 +79,11 @@ impl HttpRequest { let mut raw: Vec = Vec::new(); let _ = stream.read_to_end(&mut raw); - Ok(HttpResponse::new_raw(raw)) + Ok(Box::new(HttpResponse::new_raw(raw))) } } -pub fn brew(direction: &str, request: HttpRequest) -> Result { +pub fn brew(direction: &str, request: &mut HttpRequest) -> Result, &'static str> { return request.brew(direction); } @@ -111,8 +93,8 @@ pub fn brew(direction: &str, request: HttpRequest) -> Result, - data_write: Vec, + write: bool, + response: Box, + request: HttpRequestBuilder, index_writed: usize, } @@ -61,7 +71,6 @@ impl Hteapot { port, address: address.to_string(), threads: 1, - //cache: HashMap::new(), } } @@ -70,12 +79,14 @@ impl Hteapot { port, address: address.to_string(), threads: if threads == 0 { 1 } else { threads }, - //cache: HashMap::new(), } } // Start the server - pub fn listen(&self, action: impl Fn(HttpRequest) -> HttpResponse + Send + Sync + 'static) { + pub fn listen( + &self, + action: impl Fn(HttpRequest) -> Box + Send + Sync + 'static, + ) { let addr = format!("{}:{}", self.address, self.port); let listener = TcpListener::bind(addr); let listener = match listener { @@ -87,7 +98,6 @@ impl Hteapot { }; let pool: Arc<(Mutex>, Condvar)> = Arc::new((Mutex::new(VecDeque::new()), Condvar::new())); - //let statusPool = Arc::new(Mutex::new(HashMap::::new())); let priority_list: Arc>> = Arc::new(Mutex::new(Vec::new())); let arc_action = Arc::new(action); for _tn in 0..self.threads { @@ -96,7 +106,7 @@ impl Hteapot { let action_clone = arc_action.clone(); let pl_clone = priority_list.clone(); { - let mut pl_lock = pl_clone.lock().expect("Error locking prority list"); + let mut pl_lock = pl_clone.lock().expect("Error locking priority list"); pl_lock.push(0); } thread::spawn(move || { @@ -107,7 +117,7 @@ impl Hteapot { let mut pool = lock.lock().expect("Error locking pool"); let pl_copy; { - let pl_lock = pl_clone.lock().expect("Error locking prority list"); + let pl_lock = pl_clone.lock().expect("Error locking priority list"); pl_copy = pl_lock.clone(); } @@ -131,9 +141,11 @@ impl Hteapot { if !pool.is_empty() { let socket_status = SocketStatus { + ttl: Instant::now(), reading: true, - data_readed: vec![], - data_write: vec![], + write: false, + response: Box::new(EmptyHttpResponse {}), + request: HttpRequestBuilder::new(), index_writed: 0, }; let socket_data = SocketData { @@ -144,26 +156,20 @@ impl Hteapot { { let mut pl_lock = - pl_clone.lock().expect("Errpr locking prority list"); + pl_clone.lock().expect("Errpr locking priority list"); pl_lock[_tn] = streams_to_handle.len(); } } } - for stream_data in streams_to_handle.iter_mut() { - if stream_data.status.is_none() { - continue; + streams_to_handle.retain_mut(|s| { + if s.status.is_none() { + return false; } - let r = Hteapot::handle_client( - &stream_data.stream, - stream_data.status.as_mut().unwrap().clone(), - &action_clone, - ); - stream_data.status = r; - } - streams_to_handle.retain(|s| s.status.is_some()); + Hteapot::handle_client(s, &action_clone).is_some() + }); { - let mut pl_lock = pl_clone.lock().expect("Errpr locking prority list"); + let mut pl_lock = pl_clone.lock().expect("Errpr locking priority list"); pl_lock[_tn] = streams_to_handle.len(); } } @@ -192,103 +198,24 @@ impl Hteapot { } } - // Parse the request - pub fn request_parser(request: String) -> Result { - let mut lines = request.lines(); - let first_line = lines.next(); - if first_line.is_none() { - println!("{}", request); - return Err("Invalid request".to_string()); - } - let first_line = first_line.unwrap(); - let mut words = first_line.split_whitespace(); - let method = words.next(); - if method.is_none() { - return Err("Invalid method".to_string()); - } - let method = method.unwrap(); - let path = words.next(); - if path.is_none() { - return Err("Invalid path".to_string()); - } - let mut path = path.unwrap().to_string(); - let mut headers: HashMap = HashMap::new(); - loop { - let line = lines.next(); - if line.is_none() { - break; - } - let line = line.unwrap(); - if line.is_empty() { - break; - } - let mut parts = line.split(": "); - let key = parts.next().unwrap().to_string(); - let value = parts.next().unwrap(); - headers.insert(key, value.to_string()); - } - let body = lines - .collect::>() - .join("") - .trim() - .trim_end_matches(char::from(0)) - .to_string(); - let mut args: HashMap = HashMap::new(); - //remove http or https from the path - if path.starts_with("http://") { - path = path.trim_start_matches("http://").to_string(); - } else if path.starts_with("https://") { - path = path.trim_start_matches("https://").to_string(); - } - //remove the host name if present - if !path.starts_with("/") { - //remove all the characters until the first / - let mut parts = path.split("/"); - parts.next(); - path = parts.collect::>().join("/"); - //add / to beggining - path = format!("/{}", path); - } - - if path.contains('?') { - let _path = path.clone(); - let mut parts = _path.split('?'); - path = parts.next().unwrap().to_string(); - let query = parts.next().unwrap(); - let query_parts: Vec<&str> = query.split('&').collect(); - for part in query_parts { - let mut parts = part.split('='); - let key = parts.next().unwrap().to_string(); - let value = parts.next().unwrap_or("").to_string().replace("%22", "\""); - args.insert(key, value); - } - } - - Ok(HttpRequest { - method: HttpMethod::from_str(method), - path: path.to_string(), - args, - headers, - body: body.trim_end().to_string(), - }) - } - // Handle the client when a request is received fn handle_client( - stream: &TcpStream, - socket_status: SocketStatus, - action: &Arc HttpResponse + Send + Sync + 'static>, - ) -> Option { - let mut reader = BufReader::new(stream); - let mut writer = BufWriter::new(stream); - let mut socket_status = socket_status.clone(); - if socket_status.reading { + socket_data: &mut SocketData, + action: &Arc Box + Send + Sync + 'static>, + ) -> Option<()> { + let status = socket_data.status.as_mut()?; + + if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL && !status.write { + let _ = socket_data.stream.shutdown(Shutdown::Both); + return None; + } + if !status.request.done { loop { - let mut buffer = [0; 1024]; - match reader.read(&mut buffer) { + let mut buffer = [0; BUFFER_SIZE]; + match socket_data.stream.read(&mut buffer) { Err(e) => match e.kind() { io::ErrorKind::WouldBlock => { - return Some(socket_status); + return Some(()); } io::ErrorKind::ConnectionReset => { return None; @@ -299,102 +226,105 @@ impl Hteapot { } }, Ok(m) => { + status.ttl = Instant::now(); + let r = status.request.append(buffer.to_vec()); + if r.is_some() { + break; + } if m == 0 { return None; + } else if m < BUFFER_SIZE || status.request.done { + break; } } }; - socket_status.data_readed.append(&mut buffer.to_vec()); - //socket_status - if buffer[0] == 0 { - break; - }; - if *buffer.last().unwrap() == 0 { - break; - } } - socket_status.reading = false; } - - let request_string = String::from_utf8(socket_status.data_readed.clone()); - let request_string = if request_string.is_err() { - //This proablly means the request is a https so for the moment GTFO - return None; - } else { - request_string.unwrap() - }; - // let request_string = "GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n".to_string(); - let request = Self::request_parser(request_string); - if request.is_err() { - eprintln!("Request parse error {:?}", request.err().unwrap()); - return None; + let request = status.request.get(); + if request.is_none() { + return Some(()); } let request = request.unwrap(); let keep_alive = match request.headers.get("Connection") { Some(ch) => ch == "keep-alive", None => false, }; - if socket_status.data_write.len() == 0 { - let mut response = action(request); - if !response.headers.contains_key("Conection") && keep_alive { + if !status.write { + let mut response = action(request); //Call closure + if !response.base().headers.contains_key("Connection") && keep_alive { response + .base() .headers .insert("Connection".to_string(), "keep_alive".to_string()); + response.base().headers.insert( + "Keep-Alive".to_string(), + format!("timeout={}", KEEP_ALIVE_TTL.as_secs()), + ); } else { response + .base() .headers .insert("Connection".to_string(), "close".to_string()); } - socket_status.data_write = response.to_bytes(); + status.write = true; + status.response = response; } - for n in socket_status.index_writed..socket_status.data_write.len() { - let r = writer.write(&[socket_status.data_write[n]]); - if r.is_err() { - let error = r.err().unwrap(); - if error.kind() == io::ErrorKind::WouldBlock { - return Some(socket_status); - } else { - eprintln!("W error: {:?}", error); - return None; + loop { + match status.response.peek() { + Ok(n) => match socket_data.stream.write(&n) { + Ok(_size) => { + status.ttl = Instant::now(); + let _ = status.response.next(); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + return Some(()); + } + Err(e) => { + eprintln!("W error: {:?}", e); + return None; + } + }, + Err(IterError::WouldBlock) => { + status.ttl = Instant::now(); + //thread::sleep(Duration::from_millis(100)); + return Some(()); } + Err(_) => break, } - socket_status.index_writed += r.unwrap(); } - let r = writer.flush(); + let r = socket_data.stream.flush(); if r.is_err() { - eprintln!("Error2: {}", r.err().unwrap()); - return Some(socket_status); + return Some(()); } if keep_alive { - socket_status.reading = true; - socket_status.data_readed = vec![]; - socket_status.data_write = vec![]; - socket_status.index_writed = 0; - return Some(socket_status); + status.reading = true; + status.write = false; + status.index_writed = 0; + status.request = HttpRequestBuilder::new(); + return Some(()); } else { - let _ = stream.shutdown(Shutdown::Both); + let _ = socket_data.stream.shutdown(Shutdown::Both); None } } } #[cfg(test)] -#[test] -fn test_http_parser() { - let request = - "GET / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.68.0\r\nAccept: */*\r\n\r\n"; - let parsed_request = Hteapot::request_parser(request.to_string()).unwrap(); - assert_eq!(parsed_request.method, HttpMethod::GET); - assert_eq!(parsed_request.path, "/"); - assert_eq!(parsed_request.args.len(), 0); - assert_eq!(parsed_request.headers.len(), 3); - assert_eq!(parsed_request.body, ""); -} - +// #[test] +// fn test_http_parser() { +// let request = +// "GET / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.68.0\r\nAccept: */*\r\n\r\n"; +// let parsed_request = Hteapot::request_parser(request.to_string()).unwrap(); +// assert_eq!(parsed_request.method, HttpMethod::GET); +// assert_eq!(parsed_request.path, "/"); +// assert_eq!(parsed_request.args.len(), 0); +// assert_eq!(parsed_request.headers.len(), 3); +// assert_eq!(parsed_request.body, ""); +// } #[test] fn test_http_response_maker() { - let response = HttpResponse::new(HttpStatus::IAmATeapot, "Hello, World!", None); + let mut response = HttpResponse::new(HttpStatus::IAmATeapot, "Hello, World!", None); let response = String::from_utf8(response.to_bytes()).unwrap(); let expected_response = format!("HTTP/1.1 418 I'm a teapot\r\nContent-Length: 13\r\nServer: HTeaPot/{}\r\n\r\nHello, World!\r\n",VERSION); let expected_response_list = expected_response.split("\r\n"); diff --git a/src/hteapot/request.rs b/src/hteapot/request.rs index cb9a3d0..f00262f 100644 --- a/src/hteapot/request.rs +++ b/src/hteapot/request.rs @@ -1,11 +1,178 @@ use super::HttpMethod; -use std::collections::HashMap; +use std::{collections::HashMap, net::TcpStream, str}; -#[derive(Clone)] pub struct HttpRequest { pub method: HttpMethod, pub path: String, pub args: HashMap, pub headers: HashMap, - pub body: String, + pub body: Vec, + stream: Option, +} + +impl HttpRequest { + pub fn new(method: HttpMethod, path: &str) -> Self { + return HttpRequest { + method, + path: path.to_string(), + args: HashMap::new(), + headers: HashMap::new(), + body: Vec::new(), + stream: None, + }; + } + + pub fn default() -> Self { + HttpRequest { + method: HttpMethod::GET, + path: String::new(), + args: HashMap::new(), + headers: HashMap::new(), + body: Vec::new(), + stream: None, + } + } + + pub fn clone(&self) -> Self { + return HttpRequest { + method: self.method.clone(), + path: self.path.clone(), + args: self.args.clone(), + headers: self.headers.clone(), + body: self.body.clone(), + stream: None, + }; + } + + // pub fn body(&mut self) -> Option> { + // if self.has_body() { + // let mut stream = self.stream.as_ref().unwrap(); + // let content_length = self.headers.get("Content-Length")?; + // let content_length: usize = content_length.parse().unwrap(); + // if content_length > self.body.len() { + // let _ = stream.flush(); + // let mut total_read = 0; + // self.body.resize(content_length, 0); + // while total_read < content_length { + // match stream.read(&mut self.body[total_read..]) { + // Ok(0) => { + // break; + // } + // Ok(n) => { + // total_read += n; + // } + // Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // continue; + // } + // Err(_e) => { + // break; + // } + // } + // } + // } + + // Some(self.body.clone()) + // } else { + // None + // } + // } + + pub fn set_stream(&mut self, stream: TcpStream) { + self.stream = Some(stream); + } + + pub fn text(&self) -> Option { + if self.body.len() == 0 { + return None; + } + let body = match str::from_utf8(self.body.as_slice()) { + Ok(v) => Some(v.to_string()), + Err(_e) => None, + }; + return body; + } +} + +pub struct HttpRequestBuilder { + request: HttpRequest, + buffer: Vec, + body_size: usize, + pub done: bool, +} + +impl HttpRequestBuilder { + pub fn new() -> Self { + return HttpRequestBuilder { + request: HttpRequest { + method: HttpMethod::GET, + path: String::new(), + args: HashMap::new(), + headers: HashMap::new(), + body: Vec::new(), + stream: None, + }, + body_size: 0, + buffer: Vec::new(), + done: false, + }; + } + + pub fn get(&self) -> Option { + if self.done { + return Some(self.request.clone()); + } else { + None + } + } + + pub fn append(&mut self, buffer: Vec) -> Option { + self.buffer.extend(buffer); + self.buffer.retain(|&b| b != 0); + while let Some(pos) = self.buffer.windows(2).position(|w| w == b"\r\n") { + let line = self.buffer.drain(..pos).collect::>(); + self.buffer.drain(..2); + + let line_str = String::from_utf8_lossy(&line); + + if self.request.path.is_empty() { + let parts: Vec<&str> = line_str.split_whitespace().collect(); + if parts.len() < 2 { + return None; + } + + self.request.method = HttpMethod::from_str(parts[0]); + let path_parts: Vec<&str> = parts[1].split('?').collect(); + self.request.path = path_parts[0].to_string(); + + if path_parts.len() > 1 { + self.request.args = path_parts[1] + .split('&') + .filter_map(|pair| { + let kv: Vec<&str> = pair.split('=').collect(); + if kv.len() == 2 { + Some((kv[0].to_string(), kv[1].to_string())) + } else { + None + } + }) + .collect(); + } + } else if !line_str.is_empty() { + if let Some((key, value)) = line_str.split_once(": ") { + if key.to_lowercase() == "content-length" { + self.body_size = value.parse().unwrap_or(0); + } + self.request + .headers + .insert(key.to_string(), value.to_string()); + } + } + } + self.request.body.append(&mut self.buffer.clone()); + if self.request.body.len() == self.body_size { + self.done = true; + return Some(self.request.clone()); + } + None + } } diff --git a/src/hteapot/response.rs b/src/hteapot/response.rs index 8dc8cd1..dae9979 100644 --- a/src/hteapot/response.rs +++ b/src/hteapot/response.rs @@ -1,13 +1,53 @@ use super::HttpStatus; -use super::VERSION; -use std::collections::HashMap; +use super::{BUFFER_SIZE, VERSION}; +use std::collections::{HashMap, VecDeque}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError}; +use std::sync::Arc; +use std::thread; +use std::thread::JoinHandle; -pub struct HttpResponse { +pub struct BaseResponse { pub status: HttpStatus, pub headers: HashMap, +} + +impl BaseResponse { + pub fn to_bytes(&mut self) -> Vec { + let mut headers_text = String::new(); + for (key, value) in self.headers.iter() { + headers_text.push_str(&format!("{}: {}\r\n", key, value)); + } + let response_header = format!( + "HTTP/1.1 {} {}\r\n{}\r\n", + self.status as u16, + self.status.to_string(), + headers_text + ); + let mut response = Vec::new(); + response.extend_from_slice(response_header.as_bytes()); + response + } +} + +pub struct HttpResponse { + base: BaseResponse, pub content: Vec, raw: Option>, is_raw: bool, + index: usize, +} + +pub trait HttpResponseCommon { + fn base(&mut self) -> &mut BaseResponse; + fn next(&mut self) -> Result, IterError>; + fn peek(&mut self) -> Result, IterError>; +} + +#[derive(Debug)] +pub enum IterError { + WouldBlock, + Finished, } impl HttpResponse { @@ -15,7 +55,7 @@ impl HttpResponse { status: HttpStatus, content: B, headers: Option>, - ) -> Self { + ) -> Box { let mut headers = headers.unwrap_or(HashMap::new()); let content = content.as_ref(); headers.insert("Content-Length".to_string(), content.len().to_string()); @@ -23,22 +63,25 @@ impl HttpResponse { "Server".to_string(), format!("HTeaPot/{}", VERSION).to_string(), ); - HttpResponse { - status, - headers, + Box::new(HttpResponse { + base: BaseResponse { status, headers }, content: content.to_owned(), raw: None, is_raw: false, - } + index: 0, + }) } pub fn new_raw(raw: Vec) -> Self { HttpResponse { - status: HttpStatus::IAmATeapot, - headers: HashMap::new(), + base: BaseResponse { + status: HttpStatus::IAmATeapot, + headers: HashMap::new(), + }, content: vec![], raw: Some(raw), is_raw: true, + index: 0, } } @@ -46,25 +89,156 @@ impl HttpResponse { self.is_raw } - pub fn to_bytes(&self) -> Vec { + pub fn to_bytes(&mut self) -> Vec { if self.is_raw() { return self.raw.clone().unwrap(); } let mut headers_text = String::new(); - for (key, value) in self.headers.iter() { + for (key, value) in self.base.headers.iter() { headers_text.push_str(&format!("{}: {}\r\n", key, value)); } let response_header = format!( "HTTP/1.1 {} {}\r\n{}\r\n", - self.status as u16, - self.status.to_string(), + self.base.status as u16, + self.base.status.to_string(), headers_text ); let mut response = Vec::new(); response.extend_from_slice(response_header.as_bytes()); - response.append(&mut self.content.clone()); + response.append(&mut self.content); response.push(0x0D); // Carriage Return response.push(0x0A); // Line Feed response } } + +impl HttpResponseCommon for HttpResponse { + fn base(&mut self) -> &mut BaseResponse { + &mut self.base + } + + fn next(&mut self) -> Result, IterError> { + let byte_chunk = self.peek()?; + self.index += 1; + return Ok(byte_chunk); + } + + fn peek(&mut self) -> Result, IterError> { + if self.raw.is_none() { + self.raw = Some(self.to_bytes()); + } + let raw = self.raw.as_ref().unwrap(); + let mut raw = raw.chunks(BUFFER_SIZE).skip(self.index); + let byte_chunk = raw.next().ok_or(IterError::Finished)?.to_vec(); + return Ok(byte_chunk); + } +} + +pub struct EmptyHttpResponse {} + +impl EmptyHttpResponse {} +impl HttpResponseCommon for EmptyHttpResponse { + fn base(&mut self) -> &mut BaseResponse { + panic!("Invalid state") + } + fn next(&mut self) -> Result, IterError> { + Err(IterError::Finished) + } + + fn peek(&mut self) -> Result, IterError> { + Err(IterError::Finished) + } +} + +pub struct ChunkSender(Sender>); + +impl ChunkSender { + // fn new(sender: Sender>) -> Self { + // Self(sender) + // } + pub fn send(&self, msg: Vec) -> Result<(), SendError>> { + let mut response = Vec::new(); + let len_bytes = format!("{:X}\r\n", msg.len()).into_bytes(); + response.extend(len_bytes); + response.extend(&msg); + response.extend(b"\r\n"); + self.0.send(response) + } + + // fn end(&self) -> Result<(), SendError>> {} +} + +pub struct StreamedResponse { + base: BaseResponse, + receiver: Receiver>, + has_end: Arc, + _join_handle: JoinHandle<()>, + queue: VecDeque>, +} + +impl StreamedResponse { + pub fn new(action: impl Fn(ChunkSender) + Send + Sync + 'static) -> Box { + let action = Arc::new(action); + let (tx, rx) = mpsc::channel(); + let action_clon = action.clone(); + let mut base = BaseResponse { + status: HttpStatus::OK, + headers: HashMap::new(), + }; + base.headers + .insert("Transfer-Encoding".to_string(), "chunked".to_string()); + base.headers.insert( + "Server".to_string(), + format!("HTeaPot/{}", VERSION).to_string(), + ); + let _ = tx.send(base.to_bytes()); + let has_end = Arc::new(AtomicBool::new(false)); + let has_end_clone = has_end.clone(); + + let jh = thread::spawn(move || { + let chunk_sender = ChunkSender(tx.clone()); + action_clon(chunk_sender); + let _ = tx.clone().send(b"0\r\n\r\n".to_vec()); + has_end_clone.store(true, Ordering::SeqCst); + }); + Box::new(StreamedResponse { + base, + has_end, + receiver: rx, + _join_handle: jh, + queue: VecDeque::new(), + }) + } + + fn has_end(&self) -> bool { + self.has_end.load(Ordering::SeqCst) + } +} + +impl HttpResponseCommon for StreamedResponse { + fn base(&mut self) -> &mut BaseResponse { + &mut self.base + } + fn next(&mut self) -> Result, IterError> { + self.peek() + } + + fn peek(&mut self) -> Result, IterError> { + if self.queue.is_empty() { + let r = self.receiver.try_recv().map_err(|e| match e { + TryRecvError::Empty => IterError::WouldBlock, + TryRecvError::Disconnected => { + if self.has_end() { + IterError::Finished + } else { + IterError::WouldBlock + } + } + })?; + self.queue.push_back(r.clone()); + return Ok(r); + } else { + self.queue.pop_front().ok_or(IterError::WouldBlock) + } + } +} diff --git a/src/logger.rs b/src/logger.rs index 5ac0394..d6c4f8f 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -1,101 +1,132 @@ - -use std::time::{SystemTime, UNIX_EPOCH}; -use std::io::{BufWriter, Write}; +use std::io::Write; +use std::sync::mpsc::{channel, Sender}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; struct SimpleTime; impl SimpleTime { - fn epoch_to_ymdhms(seconds: u64) -> (i32, u32, u32, u32, u32, u32) { - // Constants for time calculations - const SECONDS_IN_MINUTE: u64 = 60; - const SECONDS_IN_HOUR: u64 = 3600; - const SECONDS_IN_DAY: u64 = 86400; + fn epoch_to_ymdhms(seconds: u64) -> (i32, u32, u32, u32, u32, u32) { + // Constants for time calculations + const SECONDS_IN_MINUTE: u64 = 60; + const SECONDS_IN_HOUR: u64 = 3600; + const SECONDS_IN_DAY: u64 = 86400; + + // Leap year and normal year days + const DAYS_IN_YEAR: [u32; 2] = [365, 366]; + const DAYS_IN_MONTH: [[u32; 12]; 2] = [ + [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31], // Normal years + [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31], // Leap years + ]; + + // Calculate the number of days since the epoch + let mut remaining_days = seconds / SECONDS_IN_DAY; + + // Determine the current year + let mut year = 1970; + loop { + let leap_year = if (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) { + 1 + } else { + 0 + }; + if remaining_days < DAYS_IN_YEAR[leap_year] as u64 { + break; + } + remaining_days -= DAYS_IN_YEAR[leap_year] as u64; + year += 1; + } - // Leap year and normal year days - const DAYS_IN_YEAR: [u32; 2] = [365, 366]; - const DAYS_IN_MONTH: [[u32; 12]; 2] = [ - [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31], // Normal years - [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] // Leap years - ]; + // Determine the current month and day + let leap_year = if (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) { + 1 + } else { + 0 + }; + let mut month = 0; + while remaining_days >= DAYS_IN_MONTH[leap_year][month] as u64 { + remaining_days -= DAYS_IN_MONTH[leap_year][month] as u64; + month += 1; + } + let day = remaining_days + 1; // Days are 1-based - // Calculate the number of days since the epoch - let mut remaining_days = seconds / SECONDS_IN_DAY; + // Calculate the current hour, minute, and second + let remaining_seconds = seconds % SECONDS_IN_DAY; + let hour = (remaining_seconds / SECONDS_IN_HOUR) as u32; + let minute = ((remaining_seconds % SECONDS_IN_HOUR) / SECONDS_IN_MINUTE) as u32; + let second = (remaining_seconds % SECONDS_IN_MINUTE) as u32; - // Determine the current year - let mut year = 1970; - loop { - let leap_year = if (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) { 1 } else { 0 }; - if remaining_days < DAYS_IN_YEAR[leap_year] as u64 { - break; - } - remaining_days -= DAYS_IN_YEAR[leap_year] as u64; - year += 1; + (year, month as u32 + 1, day as u32, hour, minute, second) } - - // Determine the current month and day - let leap_year = if (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) { 1 } else { 0 }; - let mut month = 0; - while remaining_days >= DAYS_IN_MONTH[leap_year][month] as u64 { - remaining_days -= DAYS_IN_MONTH[leap_year][month] as u64; - month += 1; + pub fn get_current_timestamp() -> String { + let now = SystemTime::now(); + let since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); + let secs = since_epoch.as_secs(); + let (year, month, day, hour, minute, second) = Self::epoch_to_ymdhms(secs); + + format!( + "{:04}/{:02}/{:02} - {:02}:{:02}:{:02}", + year, month, day, hour, minute, second + ) } - let day = remaining_days + 1; // Days are 1-based - - // Calculate the current hour, minute, and second - let remaining_seconds = seconds % SECONDS_IN_DAY; - let hour = (remaining_seconds / SECONDS_IN_HOUR) as u32; - let minute = ((remaining_seconds % SECONDS_IN_HOUR) / SECONDS_IN_MINUTE) as u32; - let second = (remaining_seconds % SECONDS_IN_MINUTE) as u32; - - (year, month as u32 + 1, day as u32, hour, minute, second) -} - pub fn get_current_timestamp() -> String { - let now = SystemTime::now(); - let since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); - let secs = since_epoch.as_secs(); - let (year, month, day, hour, minute, second) = Self::epoch_to_ymdhms(secs); - - - format!("{:04}/{:02}/{:02} - {:02}:{:02}:{:02}", - year, month, day, hour, minute, second) - } } - - - -pub struct Logger { - buffers: Vec>, +pub struct Logger { + tx: Sender, + _thread: JoinHandle<()>, } -impl Logger { - pub fn new(writer: W) -> Logger { - let mut buffers = Vec::new(); - buffers.push(BufWriter::new(writer)); - Logger { - buffers: buffers, +impl Logger { + pub fn new(mut writer: W) -> Logger { + let (tx, rx) = channel::(); + let thread = thread::spawn(move || { + let mut last_flush = Instant::now(); + let mut buff = Vec::new(); + let mut max_size = 100; + let timeout = Duration::from_secs(1); + loop { + let msg = rx.recv_timeout(timeout); + match msg { + Ok(msg) => buff.push(msg), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} + Err(_) => break, + } + + if last_flush.elapsed() >= timeout || buff.len() >= max_size { + if !buff.is_empty() { + if buff.len() >= max_size { + max_size = (max_size * 10).min(1_000_000); + } else { + max_size = (max_size / 10).max(100); + } + let wr = writer.write_all(buff.join("").as_bytes()); + if wr.is_err() { + println!("{:?}", wr); + } + let _ = writer.flush(); + + buff.clear(); + } + last_flush = Instant::now(); + } + } + }); + Logger { + tx, + _thread: thread, + } } - } - - fn log(&mut self, content: String) { - for b in self.buffers.iter_mut() { - let _ = b.write(content.as_bytes()); - let _ = b.flush(); - }; - - } - - pub fn msg(&mut self, content: String) { - self.log(format!("[{}] - {}\n",SimpleTime::get_current_timestamp() ,content)); - } + pub fn msg(&self, content: String) { + let content = format!("[{}] - {}\n", SimpleTime::get_current_timestamp(), content); + let _ = self.tx.send(content); + } } -#[cfg(test)] -use std::io::stdout; - -#[test] -fn test_basic() { +// #[cfg(test)] +// use std::io::stdout; - let mut logs = Logger::new(stdout()); - logs.msg("test".to_string()); -} \ No newline at end of file +// #[test] +// fn test_basic() { +// let mut logs = Logger::new(stdout()); +// logs.msg("test".to_string()); +// } diff --git a/src/main.rs b/src/main.rs index a22f3a2..e0ec529 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,15 +2,20 @@ mod cache; mod config; pub mod hteapot; mod logger; +mod utils; + +use std::thread; +use std::time::Duration; +use std::{fs, io}; -use std::fs; -use std::io; use std::path::Path; + use std::sync::Mutex; use cache::Cache; use config::Config; -use hteapot::{Hteapot, HttpRequest, HttpResponse, HttpStatus}; +use hteapot::{Hteapot, HttpRequest, HttpResponse, HttpStatus, StreamedResponse}; +use utils::get_mime_tipe; use logger::Logger; @@ -22,9 +27,6 @@ fn is_proxy(config: &Config, req: HttpRequest) -> Option<(String, HttpRequest)> if path_match.is_some() { let new_path = path_match.unwrap(); let url = config.proxy_rules.get(proxy_path).unwrap().clone(); - // if url.ends_with('/') { - // url = url.strip_suffix('/').to_owned(); - // } let mut proxy_req = req.clone(); proxy_req.path = new_path.to_string(); proxy_req.headers.remove("Host"); @@ -41,24 +43,6 @@ fn is_proxy(config: &Config, req: HttpRequest) -> Option<(String, HttpRequest)> None } -fn get_mime_tipe(path: &String) -> String { - let extension = Path::new(path.as_str()) - .extension() - .unwrap() - .to_str() - .unwrap(); - let mimetipe = match extension { - "js" => "text/javascript", - "json" => "application/json", - "css" => "text/css", - "html" => "text/html", - "ico" => "image/x-icon", - _ => "text/plain", - }; - - mimetipe.to_string() -} - fn serve_file(path: &String) -> Option> { let r = fs::read(path); if r.is_ok() { @@ -70,84 +54,88 @@ fn serve_file(path: &String) -> Option> { fn main() { let args = std::env::args().collect::>(); - let mut serving_path = None; - if args.len() >= 2 { - match args[1].as_str() { - "--help" | "-h" => { - println!("Hteapot {}", VERSION); - println!("usage: {} ", args[0]); - return; - } - "--version" | "-v" => { - println!("Hteapot {}", VERSION); - return; - } - "--serve" | "-s" => { - serving_path = Some(args.get(2).unwrap().clone()); - } - _ => (), - }; + if args.len() == 1 { + println!("Hteapot {}", VERSION); + println!("usage: {} ", args[0]); + return; } - - let config = if args.len() == 2 { - config::Config::load_config(&args[1]) - } else if serving_path.is_some() { - let serving_path_str = serving_path.unwrap(); - let serving_path_str = serving_path_str.as_str(); - let serving_path = Path::new(serving_path_str); - let mut c = config::Config::new_default(); - c.host = "0.0.0.0".to_string(); - if serving_path.is_dir() { - c.root = serving_path.to_str().unwrap_or_default().to_string(); - } else { - c.index = serving_path - .file_name() - .unwrap() - .to_str() - .unwrap_or_default() - .to_string(); - c.root = serving_path - .parent() - .unwrap_or(Path::new("./")) - .to_str() - .unwrap_or_default() - .to_string(); + let config = match args[1].as_str() { + "--help" | "-h" => { + println!("Hteapot {}", VERSION); + println!("usage: {} ", args[0]); + return; } - c - } else { - config::Config::new_default() + "--version" | "-v" => { + println!("Hteapot {}", VERSION); + return; + } + "--serve" | "-s" => { + let mut c = config::Config::new_default(); + let serving_path = Some(args.get(2).unwrap().clone()); + let serving_path_str = serving_path.unwrap(); + let serving_path_str = serving_path_str.as_str(); + let serving_path = Path::new(serving_path_str); + if serving_path.is_dir() { + c.root = serving_path.to_str().unwrap_or_default().to_string(); + } else { + c.index = serving_path + .file_name() + .unwrap() + .to_str() + .unwrap_or_default() + .to_string(); + c.root = serving_path + .parent() + .unwrap_or(Path::new("./")) + .to_str() + .unwrap_or_default() + .to_string(); + } + c.host = "0.0.0.0".to_string(); + c + } + _ => config::Config::load_config(&args[1]), }; let proxy_only = config.proxy_rules.get("/").is_some(); - let logger = Mutex::new(Logger::new(io::stdout())); + let logger = match config.log_file.clone() { + Some(file_name) => { + let file = fs::File::create(file_name.clone()); + let file = file.unwrap(); + Logger::new(file) + } + None => Logger::new(io::stdout()), + }; + let cache: Mutex = Mutex::new(Cache::new(config.cache_ttl as u64)); let server = Hteapot::new_threaded(config.host.as_str(), config.port, config.threads); - logger.lock().expect("this doesnt work :C").msg(format!( + logger.msg(format!( "Server started at http://{}:{}", config.host, config.port )); if config.cache { - logger - .lock() - .expect("this doesnt work :C") - .msg("Cache Enabled".to_string()); + logger.msg("Cache Enabled".to_string()); } if proxy_only { logger - .lock() - .expect("this doesnt work :C") .msg("WARNING: All requests are proxied to /. Local paths won’t be used.".to_string()); } - server.listen(move |req| { // SERVER CORE // for each request - - logger.lock().expect("this doesnt work :C").msg(format!( - "Request {} {}", - req.method.to_str(), - req.path - )); + logger.msg(format!("Request {} {}", req.method.to_str(), req.path)); + if req.path == "/_stream_test".to_string() { + let times = req.args.get("t"); + let times = times.unwrap_or(&"3".to_string()).to_string(); + let times: usize = times.parse().unwrap_or(3); + return StreamedResponse::new(move |sender| { + for i in 0..times { + let data = format!("{i}-abcd\n").as_bytes().to_vec(); + let _ = sender.send(data.clone()); + //thread::sleep(Duration::from_secs(1)); + } + }); + } let is_proxy = is_proxy(&config, req.clone()); @@ -172,10 +160,7 @@ fn main() { } if !Path::new(full_path.as_str()).exists() { - logger - .lock() - .expect("this doesnt work :C") - .msg(format!("path {} does not exist", req.path)); + logger.msg(format!("path {} does not exist", req.path)); return HttpResponse::new(HttpStatus::NotFound, "Not found", None); } let mimetype = get_mime_tipe(&full_path); diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..a007fc9 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,22 @@ +use std::path::Path; + +pub fn get_mime_tipe(path: &String) -> String { + let extension = Path::new(path.as_str()) + .extension() + .unwrap() + .to_str() + .unwrap(); + let mimetipe = match extension { + "js" => "text/javascript", + "json" => "application/json", + "css" => "text/css", + "html" => "text/html; charset=utf-8", + "ico" => "image/x-icon", + _ => "text/plain", + }; + + mimetipe.to_string() +} + +//TODO: make a parser args to config +//pub fn args_to_dict(list: Vec) -> HashMap {}