From a8530682d40542bfdc46218d8e309503b246fac0 Mon Sep 17 00:00:00 2001 From: Small_Ku Date: Wed, 8 Oct 2025 21:04:34 +0800 Subject: [PATCH] fix: optimize with aho-corasick --- Cargo.lock | 23 +++-- cli/src/main.rs | 1 - lib/Cargo.toml | 2 +- lib/src/lib.rs | 27 +---- lib/src/server/process.rs | 205 +++++++++++++++++++++++--------------- 5 files changed, 141 insertions(+), 117 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d58db1..af70c56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,15 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -1260,9 +1269,9 @@ dependencies = [ name = "rsrpc" version = "0.24.3" dependencies = [ + "aho-corasick", "chrono", "interprocess", - "rayon", "serde", "serde_json", "serde_with", @@ -1979,8 +1988,8 @@ version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ - "windows-implement 0.60.0", - "windows-interface 0.59.1", + "windows-implement 0.60.2", + "windows-interface 0.59.3", "windows-link", "windows-result 0.3.4", "windows-strings 0.4.2", @@ -1999,9 +2008,9 @@ dependencies = [ [[package]] name = "windows-implement" -version = "0.60.0" +version = "0.60.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", @@ -2021,9 +2030,9 @@ dependencies = [ [[package]] name = "windows-interface" -version = "0.59.1" +version = "0.59.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", diff --git a/cli/src/main.rs b/cli/src/main.rs index 179e96a..715ab2d 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -30,7 +30,6 @@ pub fn main() { .expect("Failed to create RPCServer") }; - // Starts the other threads (process detector, client connector, etc) client.start(); diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 27fe520..1cf486c 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -11,8 +11,8 @@ serde_json = "1.0" serde_with = "3.11" simple-websockets = { git = "https://github.com/SpikeHD/simple-websockets.git", branch = "master" } chrono = "0.4" -rayon = "1.7" interprocess = "2.2" +aho-corasick = "1.1" [target.'cfg(target_os = "windows")'.dependencies] winapi = { version = "0.3", features = ["namedpipeapi", "winbase"] } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index ff460dd..aab3fda 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,5 +1,4 @@ use detection::DetectableActivity; -use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; use serde_json::Value; use server::{ client_connector::ClientConnector, @@ -56,25 +55,6 @@ pub struct RPCServer { on_process_scan_complete: Option>>, } -// Make paths consistent, and fix some additional checks -fn normalize_detectables(detectable: &mut Vec) { - detectable.par_iter_mut().for_each(|activity| { - if let Some(ref mut execs) = activity.executables { - for exec in execs.iter_mut() { - // Replace backslashes with forward slashes and lowercase - exec.name = exec.name.replace('\\', "/").to_lowercase(); - - // Checks adapted from arrpc - if exec.name.starts_with(">") { - exec.name.replace_range(0..1, "/"); - } else if !exec.name.starts_with("/") { - exec.name.insert(0, '/'); - } - } - } - }); -} - impl RPCServer { pub fn from_json_str( detectable: impl AsRef, @@ -86,7 +66,7 @@ impl RPCServer { // Turn detectable into a vector of DetectableActivity let detectable_arr = detectable.as_array(); - let mut detectable: Vec; + let detectable: Vec; if let Some(detectable_arr) = detectable_arr { detectable = detectable_arr @@ -98,10 +78,6 @@ impl RPCServer { detectable = vec![]; } - log!("[RPC Server] Normalizing detectable activities..."); - normalize_detectables(&mut detectable); - log!("[RPC Server] Done!"); - Ok(Self { detectable: Arc::new(Mutex::new(detectable)), @@ -209,7 +185,6 @@ impl RPCServer { process_server: Arc::new(Mutex::new(ProcessServer::new( self.detectable.lock().unwrap().to_vec(), proc_event_sender, - 8, ProcessEventListeners { on_process_scan_complete: self.on_process_scan_complete.clone(), }, diff --git a/lib/src/server/process.rs b/lib/src/server/process.rs index c400220..e5fe8f9 100644 --- a/lib/src/server/process.rs +++ b/lib/src/server/process.rs @@ -1,4 +1,4 @@ -use rayon::prelude::*; +use aho_corasick::{AhoCorasick, PatternID}; use std::sync::atomic::AtomicBool; use std::sync::mpsc; use std::sync::Arc; @@ -36,11 +36,15 @@ pub struct ProcessDetectedEvent { #[derive(Clone)] pub struct ProcessServer { detected_list: Arc>>, - detectable_chunks: Arc>>>, custom_detectables: Arc>>, - thread_count: u16, scanning: Arc, + detectable_indexes: Arc>>, + detectable_ac: Arc>, + + custom_detectable_indexes: Arc>>, + custom_detectable_ac: Arc>>, + pub detectable_list: Vec, pub event_sender: mpsc::Sender, @@ -53,23 +57,42 @@ impl ProcessServer { pub fn new( detectable: Vec, event_sender: mpsc::Sender, - thread_count: u16, event_listeners: ProcessEventListeners, ) -> Self { + log!("[Process Scanner] Building Aho-Corasick patterns for main detectable activities..."); + let (ac, idx) = build_ac_patterns(&detectable); + log!("[Process Scanner] Done!"); + ProcessServer { scanning: Arc::new(AtomicBool::new(false)), - thread_count, detected_list: Arc::new(Mutex::new(vec![])), - detectable_chunks: Arc::new(Mutex::new(vec![])), custom_detectables: Arc::new(Mutex::new(vec![])), detectable_list: detectable, event_sender, + // Aho-Corasick matching with detectables mapping + detectable_indexes: Arc::new(Mutex::new(idx)), + detectable_ac: Arc::new(Mutex::new(ac)), + custom_detectable_indexes: Arc::new(Mutex::new(vec![])), + custom_detectable_ac: Arc::new(Mutex::new(None)), + // Event listeners event_listeners: Arc::new(Mutex::new(event_listeners)), } } + fn update_custom_detectables(&self) { + log!("[Process Scanner] Updating Aho-Corasick patterns for custom detectable activities..."); + let (ac, idx) = build_ac_patterns(&self.custom_detectables.lock().unwrap()); + if !idx.is_empty() { + *self.custom_detectable_ac.lock().unwrap() = Some(ac); + } else { + *self.custom_detectable_ac.lock().unwrap() = None; + } + *self.custom_detectable_indexes.lock().unwrap() = idx; + log!("[Process Scanner] Done!"); + } + pub fn append_detectables(&mut self, mut detectable: Vec) { // Append to detectable chunks, since that's what is actually scanned self @@ -77,6 +100,7 @@ impl ProcessServer { .lock() .unwrap() .append(&mut detectable); + self.update_custom_detectables(); } pub fn remove_detectable_by_name(&mut self, name: String) { @@ -85,31 +109,14 @@ impl ProcessServer { .lock() .unwrap() .retain(|x| x.name != name); + self.update_custom_detectables(); } pub fn start(&self) { let wait_time = Duration::from_secs(10); let clone = self.clone(); - // Evenly split the detectable list into chunks - let mut chunks: Vec> = vec![]; - - for _ in 0..self.thread_count { - chunks.push(vec![]); - } - - let mut i = 0; - for obj in &self.detectable_list { - chunks[i].push(obj.clone()); - - i += 1; - - if i >= self.thread_count.into() { - i = 0; - } - } - - *clone.detectable_chunks.lock().unwrap() = chunks; + self.update_custom_detectables(); std::thread::spawn(move || { // Run the process scan repeatedly (every 3 seconds) @@ -220,7 +227,7 @@ impl ProcessServer { processes.push(Exec { pid: proc.0.to_string().parse::()?, path: proc.1.exe().unwrap_or(Path::new("")).display().to_string(), - arguments: if let Some(_) = cmd.next() { + arguments: if cmd.next().is_some() { Some( cmd .map(|x| x.to_string_lossy()) @@ -283,7 +290,6 @@ impl ProcessServer { } pub fn scan_for_processes(&self) -> Result, Box> { - let chunks = self.detectable_chunks.lock().unwrap(); let processes = ProcessServer::process_list()?; log!("[Process Scanner] Process scan triggered"); @@ -295,64 +301,69 @@ impl ProcessServer { let process_scan_state = Mutex::new(ProcessScanState::default()); - let mut detected_list: Vec = (0..self.thread_count + 1) - .into_par_iter() - .flat_map(|i| { - // if this is the last thread, we are supposed to scan the custom detectables - let detectable_chunk: &Vec = if self.thread_count == i { - &self.custom_detectables.lock().unwrap() + let ac = self.detectable_ac.lock().unwrap(); + let custom_ac = self.custom_detectable_ac.lock().unwrap(); + + let mut detected_list: Vec = processes + .iter() + .filter_map(|process| { + // Process path (but consistent slashes, so we can compare properly) + let process_path = process.path.to_lowercase().replace('\\', "/"); + + if process_path.contains("obs64") || process_path.contains("streamlabs") { + process_scan_state.lock().unwrap().obs_open = true; + } + + // Aho-Corasick matching + let reversed_path: String = process_path.chars().rev().collect(); + let (obj, exe_index) = if let Some(mat) = ac.find(&reversed_path) { + let pattern_id: PatternID = mat.pattern(); + let exe_index = self.detectable_indexes.lock().unwrap()[pattern_id.as_usize()]; + (&self.detectable_list[exe_index[0]], exe_index[1]) + } else if custom_ac.is_some() { + let custom_ac = custom_ac.as_ref().unwrap(); + if let Some(mat) = custom_ac.find(&reversed_path) { + let pattern_id: PatternID = mat.pattern(); + let exe_index = self.custom_detectable_indexes.lock().unwrap()[pattern_id.as_usize()]; + ( + &self.custom_detectables.lock().unwrap()[exe_index[0]], + exe_index[1], + ) + } else { + return None; + } } else { - &chunks[i as usize] + return None; }; - detectable_chunk - .iter() - .filter_map(|obj| { - let mut new_activity = obj.clone(); - - if let Some(executables) = &obj.executables { - for executable in executables { - if executable.is_launcher { - continue; - } - - for process in &processes { - // Process path (but consistent slashes, so we can compare properly) - let process_path = process.path.to_lowercase().replace('\\', "/"); - - if process_path.contains("obs64") || process_path.contains("streamlabs") { - process_scan_state.lock().unwrap().obs_open = true; - } - - if !process_path.ends_with(&executable.name) { - continue; - } - - if let Some(exec_args) = &executable.arguments { - if let Some(process_args) = &process.arguments { - if !process_args.contains(exec_args) { - continue; - } - } else if executable.name.starts_with(">") { - continue; - } - } - - new_activity.pid = Some(process.pid); - new_activity.timestamp = Some(format!( - "{:?}", - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() - )); - return Some(new_activity); - } - } - } - None - }) - .collect::>() + // Argument checks + let mut new_activity = obj.clone(); + let executable = &obj.executables.as_ref().unwrap()[exe_index]; + + if let Some(exec_args) = &executable.arguments { + // Only require argument checks if executable starts with '>' + // like Minecraft: { arguments: "net.minecraft.client.main.Main", is_launcher: false, name: ">java", … } + // Other games might provide arguments but not necessary be checked + // like Left 4 Dead 2: { arguments: "-game left4dead2", is_launcher: false, name: "left 4 dead 2/left4dead2.exe", … } + if executable.name.starts_with(">") + && !process + .arguments + .as_ref() + .is_some_and(|args| args.contains(exec_args)) + { + return None; + } + } + + new_activity.pid = Some(process.pid); + new_activity.timestamp = Some(format!( + "{:?}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + )); + Some(new_activity) }) .collect(); @@ -374,6 +385,36 @@ impl ProcessServer { } } +fn build_ac_patterns(detectables: &[DetectableActivity]) -> (AhoCorasick, Vec<[usize; 2]>) { + let mut exe_patterns: Vec = Vec::new(); + let mut exe_indexes: Vec<[usize; 2]> = Vec::new(); + + for (activity_index, activity) in detectables.iter().enumerate() { + if let Some(executables) = &activity.executables { + for (exe_index, executable) in executables.iter().enumerate() { + if executable.is_launcher { + continue; + } + + // Make paths consistent, and fix some additional checks + let mut exec_name = executable.name.replace('\\', "/").to_lowercase(); + + // Checks adapted from arrpc, remain the '>' in DetectableActivity for later argument checks + if exec_name.starts_with(">") { + exec_name.replace_range(0..1, "/"); + } else if !exec_name.starts_with("/") { + exec_name.insert(0, '/'); + } + + exe_patterns.push(exec_name.chars().rev().collect::()); + exe_indexes.push([activity_index, exe_index]); + } + } + } + + (AhoCorasick::new(exe_patterns).unwrap(), exe_indexes) +} + // pub fn name_no_ext(name: &String) -> String { // if name.contains('.') { // // Split the name by the dot