diff --git a/replication/src/apply/plan_executor.rs b/replication/src/apply/plan_executor.rs index 706076d..8f478f3 100644 --- a/replication/src/apply/plan_executor.rs +++ b/replication/src/apply/plan_executor.rs @@ -1,12 +1,24 @@ use std::{ - collections::{HashMap, HashSet, VecDeque}, fs::create_dir_all, path::PathBuf, sync::{Arc, Mutex, MutexGuard} + collections::{ + HashMap, + HashSet, + }, + fs::create_dir_all, + path::PathBuf, + sync::{ + Arc, + Mutex, + MutexGuard, + } }; + use colorize::AnsiColor; use rayon::prelude::*; -use source_wand_common::project_manipulator::local_project_manipulator::LocalProjectManipulator; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Error, Result}; use uuid::Uuid; +use source_wand_common::project_manipulator::local_project_manipulator::LocalProjectManipulator; + use crate::plan::{context::Context, transformation_node::{NodeId, TransformationNode}}; pub fn execute_plan(nodes: Vec>) -> Result<()> { @@ -28,112 +40,88 @@ pub fn execute_plan(nodes: Vec>) -> Result<()> { } let context_map: Arc>> = Arc::new(Mutex::new(workdesk_contexts)); - let node_map: HashMap> = - nodes.iter().map(|n| (n.id, Arc::clone(n))).collect(); - let completed: Arc>> = Arc::new(Mutex::new(HashSet::new())); - let queue: Arc>> = Arc::new(Mutex::new( - nodes - .iter() - .filter(|node| node.dependencies.is_empty()) - .map(|node| node.id) - .collect::>(), - )); - let dependents: HashMap> = { - let mut map: HashMap> = HashMap::new(); - for node in &nodes { - for &dep in &node.dependencies { - map.entry(dep).or_default().push(node.id); - } - } - map - }; - - let error: Arc>> = Arc::new(Mutex::new(Ok(()))); - - loop { - let batch: Vec = { - let mut q: MutexGuard<'_, VecDeque> = queue.lock().unwrap(); - q.drain(..).collect() - }; - - if batch.is_empty() { - break; - } - - batch.par_iter().for_each(|&node_id| { - if error.lock().unwrap().is_err() { - return; - } + let completed: Arc>> = Arc::new(Mutex::new(HashSet::new())); - let node: &Arc = node_map.get(&node_id).unwrap(); - - let ctx: Option = { - let mut contexts: MutexGuard<'_, HashMap> = context_map.lock().unwrap(); - contexts.get_mut(&node.workdesk).cloned() - }; - - if let Some(ctx) = ctx { - if let Some(reason) = node.transformation.should_skip(&ctx) { - println!( - "{:<120} context: {}", - format!( - "{} {} {}", - "[skip]".to_string().yellow(), - node.transformation.get_name().blue(), - reason.italic(), - ), - node.workdesk, - ); - } - else { - let transformation_result: Result> = node.transformation.apply(ctx); - match transformation_result { - Ok(message) => { - let message: String = message.unwrap_or_default(); + let error: Arc>> = Arc::new(Mutex::new(Ok(()))); + while completed.lock().unwrap().len() < nodes.len() { + let ready_nodes: Vec> = nodes + .iter() + .filter( + |node| + !completed.lock().unwrap().contains(&node.id) && + node.dependencies + .iter() + .all( + |dependency| + completed + .lock() + .unwrap() + .contains(dependency) + ) + ) + .map(|node| node.clone()) + .collect(); + + ready_nodes + .par_iter() + .for_each( + |node| { + let ctx: Option = { + let mut contexts: MutexGuard<'_, HashMap> = context_map.lock().unwrap(); + contexts.get_mut(&node.workdesk).cloned() + }; + + if let Some(ctx) = ctx { + if let Some(reason) = node.transformation.should_skip(&ctx) { println!( "{:<120} context: {}", format!( "{} {} {}", - "[execute]".to_string().green(), + "[skip]".to_string().yellow(), node.transformation.get_name().blue(), - message.italic(), + reason.italic(), ), node.workdesk, ); - }, - Err(e) => { - *error.lock().unwrap() = Err(e); - return; } + else { + let transformation_result: Result> = node.transformation.apply(ctx); + match transformation_result { + Ok(message) => { + let message: String = message.unwrap_or_default(); + + println!( + "{:<120} context: {}", + format!( + "{} {} {}", + "[execute]".to_string().green(), + node.transformation.get_name().blue(), + message.italic(), + ), + node.workdesk, + ); + }, + Err(e) => { + *error.lock().unwrap() = Err(e); + return; + } + } + } + } else { + *error.lock().unwrap() = Err(anyhow::anyhow!("Missing context for workdesk {}", node.workdesk)); + return; } - } - } else { - *error.lock().unwrap() = Err(anyhow::anyhow!("Missing context for workdesk {}", node.workdesk)); - return; - } - completed.lock().unwrap().insert(node_id); - - if let Some(deps) = dependents.get(&node_id) { - for &dependent_id in deps { - let dependent: &Arc = node_map.get(&dependent_id).unwrap(); - let ready: bool = dependent - .dependencies - .iter() - .all(|dep| completed.lock().unwrap().contains(dep)); - if ready { - queue.lock().unwrap().push_back(dependent_id); - } + completed.lock().unwrap().insert(node.id); } - } - }); - + ); + if error.lock().unwrap().is_err() { break; } } - + let error: MutexGuard<'_, std::result::Result<(), anyhow::Error>> = error.lock().unwrap(); match &*error { Ok(()) => Ok(()), diff --git a/replication/src/apply/plan_to_execution_graph.rs b/replication/src/apply/plan_to_execution_graph.rs index dd07a9e..d56076d 100644 --- a/replication/src/apply/plan_to_execution_graph.rs +++ b/replication/src/apply/plan_to_execution_graph.rs @@ -8,8 +8,11 @@ use crate::{ }, plan::{ environment::Environment, + execution_graph_builder::{ + ExecutionGraphBuilder, + RcExecutionNodeBuilder + }, transformation_node::{ - NodeId, TransformationNode }, transformations::{ @@ -26,9 +29,8 @@ use crate::{ impl ReplicationPlan { pub fn to_execution_graph(&self) -> Vec> { - let mut execution_graph: Vec> = Vec::new(); + let mut execution_graph_builder: ExecutionGraphBuilder = ExecutionGraphBuilder::new(); - let mut id: NodeId = 0; for package in &self.packages { if let PackageOrigin::GoCache(origin) = &package.origin { let PackageDestination::Git(destination) = &package.destination; @@ -36,10 +38,9 @@ impl ReplicationPlan { let environment: Environment = Environment::new(&origin.name, &origin.version); let workdesk: String = format!("{} ({}-24.04/edge)", environment.name, environment.version_retrocompatible); - let initialize_project: TransformationNode = TransformationNode { - id: id, - workdesk: workdesk.clone(), - transformation: Arc::new( + let initialize_project: RcExecutionNodeBuilder = execution_graph_builder.create_node( + workdesk.clone(), + Arc::new( InitializeProject::new( GitInit::new( destination.git.clone(), @@ -53,57 +54,49 @@ impl ReplicationPlan { ), GolangFetchSource::new(origin.path.clone()), ) - ), - dependencies: vec![], - dependents: vec![id + 1], - }; - - let push_code: TransformationNode = TransformationNode { - id: id + 1, - workdesk: workdesk.clone(), - transformation: Arc::new(GitPush::new( - destination.reference.clone(), - "Replicate source code".to_string(), - )), - dependencies: vec![id], - dependents: vec![id + 2] - }; + ) + ); - let initialize_sourcecraft: TransformationNode = TransformationNode { - id: id + 2, - workdesk: workdesk.clone(), - transformation: Arc::new(SourcecraftInitialize::new( - environment.name.clone(), - format!("{}-24.04", environment.version_retrocompatible.clone()), - "ubuntu@24.04".to_string(), - vec!["amd64".to_string()], - package.dependencies.clone(), - package.is_library, - )), - dependencies: vec![id + 1], - dependents: vec![id + 3] - }; + let push_code: RcExecutionNodeBuilder = execution_graph_builder.create_node( + workdesk.clone(), + Arc::new( + GitPush::new( + destination.reference.clone(), + "Replicate source code".to_string(), + ) + ) + ); - let push_sourcecraft_metadata: TransformationNode = TransformationNode { - id: id + 3, - workdesk: workdesk, - transformation: Arc::new(GitPush::new( - destination.reference.clone(), - "Initialize sourcecraft".to_string(), - )), - dependencies: vec![id + 2], - dependents: vec![] - }; + let initialize_sourcecraft: RcExecutionNodeBuilder = execution_graph_builder.create_node( + workdesk.clone(), + Arc::new( + SourcecraftInitialize::new( + environment.name.clone(), + format!("{}-24.04", environment.version_retrocompatible.clone()), + "ubuntu@24.04".to_string(), + vec!["amd64".to_string()], + package.dependencies.clone(), + package.is_library, + ) + ) + ); - execution_graph.push(Arc::new(initialize_project)); - execution_graph.push(Arc::new(push_code)); - execution_graph.push(Arc::new(initialize_sourcecraft)); - execution_graph.push(Arc::new(push_sourcecraft_metadata)); + let push_sourcecraft_metadata: RcExecutionNodeBuilder = execution_graph_builder.create_node( + workdesk.clone(), + Arc::new( + GitPush::new( + destination.reference.clone(), + "Initialize sourcecraft".to_string(), + ) + ) + ); - id += 4; + push_code.borrow_mut().depends_on(&initialize_project); + initialize_sourcecraft.borrow_mut().depends_on(&push_code); + push_sourcecraft_metadata.borrow_mut().depends_on(&initialize_sourcecraft); } } - execution_graph + execution_graph_builder.build() } } diff --git a/replication/src/plan/execution_graph_builder.rs b/replication/src/plan/execution_graph_builder.rs new file mode 100644 index 0000000..05c1f39 --- /dev/null +++ b/replication/src/plan/execution_graph_builder.rs @@ -0,0 +1,44 @@ +use std::{cell::RefCell, rc::Rc, sync::Arc}; + +use crate::plan::{transformation::Transformation, transformation_node::TransformationNode}; + +pub type RcExecutionNodeBuilder = Rc>; + +pub struct ExecutionGraphBuilder { + pub node_builders: Vec, +} + +impl ExecutionGraphBuilder { + pub fn new() -> Self { + ExecutionGraphBuilder { node_builders: Vec::new() } + } + + pub fn create_node(&mut self, workdesk: String, transformation: Arc) -> RcExecutionNodeBuilder { + let node: Rc> = Rc::new(RefCell::new(ExecutionNodeBuilder::new(workdesk, transformation))); + self.node_builders.push(node.clone()); + node + } + + pub fn build(&self) -> Vec> { + self.node_builders.iter().map(|node| Arc::new(node.borrow().build())).collect() + } +} + +pub struct ExecutionNodeBuilder { + pub node: TransformationNode, +} + +impl ExecutionNodeBuilder { + pub fn new(workdesk: String, transformation: Arc) -> Self { + let node: TransformationNode = TransformationNode::new(workdesk, transformation, Vec::new()); + ExecutionNodeBuilder { node } + } + + pub fn depends_on(&mut self, other: &RcExecutionNodeBuilder) { + self.node.dependencies.push(other.borrow().node.id); + } + + pub fn build(&self) -> TransformationNode { + self.node.clone() + } +} diff --git a/replication/src/plan/mod.rs b/replication/src/plan/mod.rs index b3a8172..54339b4 100644 --- a/replication/src/plan/mod.rs +++ b/replication/src/plan/mod.rs @@ -2,6 +2,7 @@ pub mod transformation; pub mod transformations; pub mod transformation_node; +pub mod execution_graph_builder; pub mod context; pub mod environment; diff --git a/replication/src/plan/transformation_node.rs b/replication/src/plan/transformation_node.rs index 17b1d90..ab0c4e5 100644 --- a/replication/src/plan/transformation_node.rs +++ b/replication/src/plan/transformation_node.rs @@ -1,8 +1,10 @@ use std::sync::Arc; +use uuid::Uuid; + use crate::plan::transformation::Transformation; -pub type NodeId = usize; +pub type NodeId = Uuid; #[derive(Clone)] pub struct TransformationNode { @@ -10,5 +12,11 @@ pub struct TransformationNode { pub workdesk: String, pub transformation: Arc, pub dependencies: Vec, - pub dependents: Vec, +} + +impl TransformationNode { + pub fn new(workdesk: String, transformation: Arc, dependencies: Vec) -> Self { + let id: NodeId = Uuid::new_v4(); + TransformationNode { id, workdesk, transformation, dependencies } + } }