Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 79 additions & 91 deletions replication/src/apply/plan_executor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
use std::{
collections::{HashMap, HashSet, VecDeque}, fs::create_dir_all, path::PathBuf, sync::{Arc, Mutex, MutexGuard}
collections::{
HashMap,
HashSet,
},
fs::create_dir_all,
path::PathBuf,
sync::{
Arc,
Mutex,
MutexGuard,
}
};

use colorize::AnsiColor;
use rayon::prelude::*;
use source_wand_common::project_manipulator::local_project_manipulator::LocalProjectManipulator;
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Error, Result};
use uuid::Uuid;

use source_wand_common::project_manipulator::local_project_manipulator::LocalProjectManipulator;

use crate::plan::{context::Context, transformation_node::{NodeId, TransformationNode}};

pub fn execute_plan(nodes: Vec<Arc<TransformationNode>>) -> Result<()> {
Expand All @@ -28,112 +40,88 @@ pub fn execute_plan(nodes: Vec<Arc<TransformationNode>>) -> Result<()> {
}

let context_map: Arc<Mutex<HashMap<String, Context>>> = Arc::new(Mutex::new(workdesk_contexts));
let node_map: HashMap<NodeId, Arc<TransformationNode>> =
nodes.iter().map(|n| (n.id, Arc::clone(n))).collect();
let completed: Arc<Mutex<HashSet<usize>>> = Arc::new(Mutex::new(HashSet::new()));
let queue: Arc<Mutex<VecDeque<usize>>> = Arc::new(Mutex::new(
nodes
.iter()
.filter(|node| node.dependencies.is_empty())
.map(|node| node.id)
.collect::<VecDeque<_>>(),
));
let dependents: HashMap<NodeId, Vec<NodeId>> = {
let mut map: HashMap<NodeId, Vec<NodeId>> = HashMap::new();
for node in &nodes {
for &dep in &node.dependencies {
map.entry(dep).or_default().push(node.id);
}
}
map
};

let error: Arc<Mutex<std::result::Result<(), anyhow::Error>>> = Arc::new(Mutex::new(Ok(())));

loop {
let batch: Vec<NodeId> = {
let mut q: MutexGuard<'_, VecDeque<usize>> = queue.lock().unwrap();
q.drain(..).collect()
};

if batch.is_empty() {
break;
}

batch.par_iter().for_each(|&node_id| {
if error.lock().unwrap().is_err() {
return;
}
let completed: Arc<Mutex<HashSet<NodeId>>> = Arc::new(Mutex::new(HashSet::new()));

let node: &Arc<TransformationNode> = node_map.get(&node_id).unwrap();

let ctx: Option<Context> = {
let mut contexts: MutexGuard<'_, HashMap<String, Context>> = context_map.lock().unwrap();
contexts.get_mut(&node.workdesk).cloned()
};

if let Some(ctx) = ctx {
if let Some(reason) = node.transformation.should_skip(&ctx) {
println!(
"{:<120} context: {}",
format!(
"{} {} {}",
"[skip]".to_string().yellow(),
node.transformation.get_name().blue(),
reason.italic(),
),
node.workdesk,
);
}
else {
let transformation_result: Result<Option<String>> = node.transformation.apply(ctx);
match transformation_result {
Ok(message) => {
let message: String = message.unwrap_or_default();
let error: Arc<Mutex<Result<(), Error>>> = Arc::new(Mutex::new(Ok(())));

while completed.lock().unwrap().len() < nodes.len() {
let ready_nodes: Vec<Arc<TransformationNode>> = nodes
.iter()
.filter(
|node|
!completed.lock().unwrap().contains(&node.id) &&
node.dependencies
.iter()
.all(
|dependency|
completed
.lock()
.unwrap()
.contains(dependency)
)
)
.map(|node| node.clone())
.collect();

ready_nodes
.par_iter()
.for_each(
|node| {
let ctx: Option<Context> = {
let mut contexts: MutexGuard<'_, HashMap<String, Context>> = context_map.lock().unwrap();
contexts.get_mut(&node.workdesk).cloned()
};

if let Some(ctx) = ctx {
if let Some(reason) = node.transformation.should_skip(&ctx) {
println!(
"{:<120} context: {}",
format!(
"{} {} {}",
"[execute]".to_string().green(),
"[skip]".to_string().yellow(),
node.transformation.get_name().blue(),
message.italic(),
reason.italic(),
),
node.workdesk,
);
},
Err(e) => {
*error.lock().unwrap() = Err(e);
return;
}
else {
let transformation_result: Result<Option<String>> = node.transformation.apply(ctx);
match transformation_result {
Ok(message) => {
let message: String = message.unwrap_or_default();

println!(
"{:<120} context: {}",
format!(
"{} {} {}",
"[execute]".to_string().green(),
node.transformation.get_name().blue(),
message.italic(),
),
node.workdesk,
);
},
Err(e) => {
*error.lock().unwrap() = Err(e);
return;
}
}
}
} else {
*error.lock().unwrap() = Err(anyhow::anyhow!("Missing context for workdesk {}", node.workdesk));
return;
}
}
} else {
*error.lock().unwrap() = Err(anyhow::anyhow!("Missing context for workdesk {}", node.workdesk));
return;
}

completed.lock().unwrap().insert(node_id);

if let Some(deps) = dependents.get(&node_id) {
for &dependent_id in deps {
let dependent: &Arc<TransformationNode> = node_map.get(&dependent_id).unwrap();
let ready: bool = dependent
.dependencies
.iter()
.all(|dep| completed.lock().unwrap().contains(dep));
if ready {
queue.lock().unwrap().push_back(dependent_id);
}
completed.lock().unwrap().insert(node.id);
}
}
});

);

if error.lock().unwrap().is_err() {
break;
}
}

let error: MutexGuard<'_, std::result::Result<(), anyhow::Error>> = error.lock().unwrap();
match &*error {
Ok(()) => Ok(()),
Expand Down
97 changes: 45 additions & 52 deletions replication/src/apply/plan_to_execution_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ use crate::{
},
plan::{
environment::Environment,
execution_graph_builder::{
ExecutionGraphBuilder,
RcExecutionNodeBuilder
},
transformation_node::{
NodeId,
TransformationNode
},
transformations::{
Expand All @@ -26,20 +29,18 @@ use crate::{

impl ReplicationPlan {
pub fn to_execution_graph(&self) -> Vec<Arc<TransformationNode>> {
let mut execution_graph: Vec<Arc<TransformationNode>> = Vec::new();
let mut execution_graph_builder: ExecutionGraphBuilder = ExecutionGraphBuilder::new();

let mut id: NodeId = 0;
for package in &self.packages {
if let PackageOrigin::GoCache(origin) = &package.origin {
let PackageDestination::Git(destination) = &package.destination;

let environment: Environment = Environment::new(&origin.name, &origin.version);
let workdesk: String = format!("{} ({}-24.04/edge)", environment.name, environment.version_retrocompatible);

let initialize_project: TransformationNode = TransformationNode {
id: id,
workdesk: workdesk.clone(),
transformation: Arc::new(
let initialize_project: RcExecutionNodeBuilder = execution_graph_builder.create_node(
workdesk.clone(),
Arc::new(
InitializeProject::new(
GitInit::new(
destination.git.clone(),
Expand All @@ -53,57 +54,49 @@ impl ReplicationPlan {
),
GolangFetchSource::new(origin.path.clone()),
)
),
dependencies: vec![],
dependents: vec![id + 1],
};

let push_code: TransformationNode = TransformationNode {
id: id + 1,
workdesk: workdesk.clone(),
transformation: Arc::new(GitPush::new(
destination.reference.clone(),
"Replicate source code".to_string(),
)),
dependencies: vec![id],
dependents: vec![id + 2]
};
)
);

let initialize_sourcecraft: TransformationNode = TransformationNode {
id: id + 2,
workdesk: workdesk.clone(),
transformation: Arc::new(SourcecraftInitialize::new(
environment.name.clone(),
format!("{}-24.04", environment.version_retrocompatible.clone()),
"ubuntu@24.04".to_string(),
vec!["amd64".to_string()],
package.dependencies.clone(),
package.is_library,
)),
dependencies: vec![id + 1],
dependents: vec![id + 3]
};
let push_code: RcExecutionNodeBuilder = execution_graph_builder.create_node(
workdesk.clone(),
Arc::new(
GitPush::new(
destination.reference.clone(),
"Replicate source code".to_string(),
)
)
);

let push_sourcecraft_metadata: TransformationNode = TransformationNode {
id: id + 3,
workdesk: workdesk,
transformation: Arc::new(GitPush::new(
destination.reference.clone(),
"Initialize sourcecraft".to_string(),
)),
dependencies: vec![id + 2],
dependents: vec![]
};
let initialize_sourcecraft: RcExecutionNodeBuilder = execution_graph_builder.create_node(
workdesk.clone(),
Arc::new(
SourcecraftInitialize::new(
environment.name.clone(),
format!("{}-24.04", environment.version_retrocompatible.clone()),
"ubuntu@24.04".to_string(),
vec!["amd64".to_string()],
package.dependencies.clone(),
package.is_library,
)
)
);

execution_graph.push(Arc::new(initialize_project));
execution_graph.push(Arc::new(push_code));
execution_graph.push(Arc::new(initialize_sourcecraft));
execution_graph.push(Arc::new(push_sourcecraft_metadata));
let push_sourcecraft_metadata: RcExecutionNodeBuilder = execution_graph_builder.create_node(
workdesk.clone(),
Arc::new(
GitPush::new(
destination.reference.clone(),
"Initialize sourcecraft".to_string(),
)
)
);

id += 4;
push_code.borrow_mut().depends_on(&initialize_project);
initialize_sourcecraft.borrow_mut().depends_on(&push_code);
push_sourcecraft_metadata.borrow_mut().depends_on(&initialize_sourcecraft);
}
}

execution_graph
execution_graph_builder.build()
}
}
44 changes: 44 additions & 0 deletions replication/src/plan/execution_graph_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use std::{cell::RefCell, rc::Rc, sync::Arc};

use crate::plan::{transformation::Transformation, transformation_node::TransformationNode};

pub type RcExecutionNodeBuilder = Rc<RefCell<ExecutionNodeBuilder>>;

pub struct ExecutionGraphBuilder {
pub node_builders: Vec<RcExecutionNodeBuilder>,
}

impl ExecutionGraphBuilder {
pub fn new() -> Self {
ExecutionGraphBuilder { node_builders: Vec::new() }
}

pub fn create_node(&mut self, workdesk: String, transformation: Arc<dyn Transformation>) -> RcExecutionNodeBuilder {
let node: Rc<RefCell<ExecutionNodeBuilder>> = Rc::new(RefCell::new(ExecutionNodeBuilder::new(workdesk, transformation)));
self.node_builders.push(node.clone());
node
}

pub fn build(&self) -> Vec<Arc<TransformationNode>> {
self.node_builders.iter().map(|node| Arc::new(node.borrow().build())).collect()
}
}

pub struct ExecutionNodeBuilder {
pub node: TransformationNode,
}

impl ExecutionNodeBuilder {
pub fn new(workdesk: String, transformation: Arc<dyn Transformation>) -> Self {
let node: TransformationNode = TransformationNode::new(workdesk, transformation, Vec::new());
ExecutionNodeBuilder { node }
}

pub fn depends_on(&mut self, other: &RcExecutionNodeBuilder) {
self.node.dependencies.push(other.borrow().node.id);
}

pub fn build(&self) -> TransformationNode {
self.node.clone()
}
}
1 change: 1 addition & 0 deletions replication/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod transformation;
pub mod transformations;

pub mod transformation_node;
pub mod execution_graph_builder;

pub mod context;
pub mod environment;
Loading