Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 7 additions & 53 deletions cli/src/commands/replication/apply.rs
Original file line number Diff line number Diff line change
@@ -1,71 +1,25 @@
use std::{fs::{create_dir_all, remove_dir_all}, path::PathBuf};
use std::{fs::remove_dir_all, path::PathBuf, sync::Arc};

use anyhow::Result;
use clap::Parser;
use colorize::AnsiColor;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use source_wand_common::{
project_manipulator::{
local_project_manipulator::LocalProjectManipulator,
project_manipulator::ProjectManipulator
}
use source_wand_replication::{
apply::plan_executor::execute_plan,
model::replication_plan::ReplicationPlan,
plan::transformation_node::TransformationNode
};
use source_wand_replication::model::{package::Package, package_destination::PackageDestination, package_origin::PackageOrigin, replication_plan::ReplicationPlan};
use uuid::Uuid;

use crate::commands::replication::plan::plan_replication;

#[derive(Debug, Parser)]
pub struct ReplicationApplyArgs;

fn replicate_package(package: Package) -> Result<()> {
if let PackageOrigin::GoCache(origin) = package.origin {
let PackageDestination::Git(destination) = package.destination;

let uuid: Uuid = Uuid::new_v4();
let dependency_directory: PathBuf = PathBuf::from(format!("./source-wand/{}", uuid));

create_dir_all(&dependency_directory)?;

let sh: LocalProjectManipulator = LocalProjectManipulator::new(dependency_directory, true);

let ls_remote: Result<String> = sh.run_shell(format!("git ls-remote --exit-code --heads {} {}", destination.git, destination.reference));

if ls_remote.is_ok() {
println!("{}", format!("[skipped] {} ({}), already exists on remote", origin.name, origin.version).yellow());
return Ok(());
}

sh.run_shell(format!("cp -r {}/* .", origin.path))?;
sh.run_shell("git init".to_string())?;
sh.run_shell(format!("git remote add origin {}", destination.git))?;
sh.run_shell(format!("git checkout --orphan {}", destination.reference))?;
sh.run_shell("git add .".to_string())?;
sh.run_shell("git commit -m 'Replicate source code'".to_string())?;
sh.run_shell(format!("git push -u origin {}", destination.reference))?;

println!("{}", format!("[replicated] {} ({})", origin.name, origin.version).green());

sh.cleanup();
}

Ok(())
}

pub fn replicate_apply_command(_args: &ReplicationApplyArgs) -> Result<()> {
let replication_plan: ReplicationPlan = plan_replication()?;

let results: Vec<Result<()>> = replication_plan
.packages
.into_par_iter()
.map(replicate_package)
.collect();
let execution_graph: Vec<Arc<TransformationNode>> = replication_plan.to_execution_graph();
execute_plan(execution_graph)?;

remove_dir_all(PathBuf::from("./source-wand")).ok();

for result in results {
result?;
}

Ok(())
}
3 changes: 3 additions & 0 deletions replication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ edition = "2021"

[dependencies]
anyhow = "1.0.98"
colorize = "0.1.0"
rayon = "1.11.0"
regex = "1.11.1"
serde = { version = "1.0.219", features = ["derive"] }
uuid = { version = "1.18.0", features = ["v4"] }

source-wand-common = { path = "../common" }
2 changes: 2 additions & 0 deletions replication/src/apply/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod plan_executor;
pub mod plan_to_execution_graph;
137 changes: 137 additions & 0 deletions replication/src/apply/plan_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use std::{
collections::{HashMap, HashSet, VecDeque}, fs::create_dir_all, path::PathBuf, sync::{Arc, Mutex, MutexGuard}
};
use colorize::AnsiColor;
use rayon::prelude::*;
use source_wand_common::project_manipulator::local_project_manipulator::LocalProjectManipulator;
use anyhow::{anyhow, Result};
use uuid::Uuid;

use crate::plan::{context::Context, transformation_node::{NodeId, TransformationNode}};

pub fn execute_plan(nodes: Vec<Arc<TransformationNode>>) -> Result<()> {
let mut workdesk_contexts: HashMap<String, Context> = HashMap::new();
for node in &nodes {
if !workdesk_contexts.contains_key(&node.workdesk) {
let uuid: Uuid = Uuid::new_v4();
let source_directory: PathBuf = PathBuf::from(format!("./source-wand/{}", uuid));

create_dir_all(&source_directory)?;

let sh: LocalProjectManipulator = LocalProjectManipulator::new(
source_directory,
false,
);

workdesk_contexts.insert(node.workdesk.clone(), Context::new(sh));
}
}

let context_map: Arc<Mutex<HashMap<String, Context>>> = Arc::new(Mutex::new(workdesk_contexts));
let node_map: HashMap<NodeId, Arc<TransformationNode>> =
nodes.iter().map(|n| (n.id, Arc::clone(n))).collect();
let completed: Arc<Mutex<HashSet<usize>>> = Arc::new(Mutex::new(HashSet::new()));
let queue: Arc<Mutex<VecDeque<usize>>> = Arc::new(Mutex::new(
nodes
.iter()
.filter(|node| node.dependencies.is_empty())
.map(|node| node.id)
.collect::<VecDeque<_>>(),
));
let dependents: HashMap<NodeId, Vec<NodeId>> = {
let mut map: HashMap<NodeId, Vec<NodeId>> = HashMap::new();
for node in &nodes {
for &dep in &node.dependencies {
map.entry(dep).or_default().push(node.id);
}
}
map
};

let error: Arc<Mutex<std::result::Result<(), anyhow::Error>>> = Arc::new(Mutex::new(Ok(())));

loop {
let batch: Vec<NodeId> = {
let mut q: MutexGuard<'_, VecDeque<usize>> = queue.lock().unwrap();
q.drain(..).collect()
};

if batch.is_empty() {
break;
}

batch.par_iter().for_each(|&node_id| {
if error.lock().unwrap().is_err() {
return;
}

let node: &Arc<TransformationNode> = node_map.get(&node_id).unwrap();

let ctx: Option<Context> = {
let mut contexts: MutexGuard<'_, HashMap<String, Context>> = context_map.lock().unwrap();
contexts.get_mut(&node.workdesk).cloned()
};

if let Some(ctx) = ctx {
if let Some(reason) = node.transformation.should_skip(&ctx) {
println!(
"{:<120} context: {}",
format!(
"{} {} {}",
"[skip]".to_string().yellow(),
node.transformation.get_name().blue(),
reason.italic(),
),
node.workdesk,
);
}
else {
let transformation_result: Result<Context> = node.transformation.apply(ctx);
if let Err(e) = transformation_result {
*error.lock().unwrap() = Err(e);
return;
}
else {
println!(
"{:<106} context: {}",
format!(
"{} {}",
"[execute]".to_string().green(),
node.transformation.get_name().blue(),
),
node.workdesk,
);
}
}
} else {
*error.lock().unwrap() = Err(anyhow::anyhow!("Missing context for workdesk {}", node.workdesk));
return;
}

completed.lock().unwrap().insert(node_id);

if let Some(deps) = dependents.get(&node_id) {
for &dependent_id in deps {
let dependent: &Arc<TransformationNode> = node_map.get(&dependent_id).unwrap();
let ready: bool = dependent
.dependencies
.iter()
.all(|dep| completed.lock().unwrap().contains(dep));
if ready {
queue.lock().unwrap().push_back(dependent_id);
}
}
}
});

if error.lock().unwrap().is_err() {
break;
}
}

let error: MutexGuard<'_, std::result::Result<(), anyhow::Error>> = error.lock().unwrap();
match &*error {
Ok(()) => Ok(()),
Err(e) => Err(anyhow!(e.to_string())),
}
}
157 changes: 157 additions & 0 deletions replication/src/apply/plan_to_execution_graph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use std::sync::Arc;

use crate::{model::{package_destination::PackageDestination, package_origin::PackageOrigin, replication_plan::ReplicationPlan}, plan::{transformation_node::{NodeId, TransformationNode}, transformations::{git::{git_init::GitInit, git_push::GitPush}, golang::fetch_source::GolangFetchSource}}};

impl ReplicationPlan {
pub fn to_execution_graph(&self) -> Vec<Arc<TransformationNode>> {
let mut execution_graph: Vec<Arc<TransformationNode>> = Vec::new();

let mut id: NodeId = 0;
for package in &self.packages {
if let PackageOrigin::GoCache(origin) = &package.origin {
let PackageDestination::Git(destination) = &package.destination;

let workdesk: String = format!("{} ({})", origin.name, origin.version);

let fetch: TransformationNode = TransformationNode {
id,
workdesk: workdesk.clone(),
transformation: Arc::new(GolangFetchSource::new(origin.path.clone())),
dependencies: vec![],
dependents: vec![id + 1]
};

let init: TransformationNode = TransformationNode {
id: id + 1,
workdesk: workdesk.clone(),
transformation: Arc::new(GitInit::new(destination.git.clone(), destination.reference.clone())),
dependencies: vec![id],
dependents: vec![id + 2]
};

let push: TransformationNode = TransformationNode {
id: id + 2,
workdesk,
transformation: Arc::new(GitPush::new(destination.git.clone(), destination.reference.clone())),
dependencies: vec![id + 1],
dependents: vec![]
};

execution_graph.push(Arc::new(fetch));
execution_graph.push(Arc::new(init));
execution_graph.push(Arc::new(push));

id += 3;
}
}

execution_graph
}
}

// use std::{collections::HashMap, sync::Arc};

// use crate::{model::{package::Package, replication_plan::ReplicationPlan}, plan::{transformation_node::{NodeId, TransformationNode}, transformations::{git::{git_init::GitInit, git_push::GitPush}, golang::fetch_source::GolangFetchSource}}};


// impl ReplicationPlan {
// pub fn to_execution_graph(&self) -> Vec<Arc<TransformationNode>> {
// let mut nodes: Vec<Arc<TransformationNode>> = Vec::new();
// let mut node_id_counter = 0;

// let mut package_nodes: HashMap<String, (NodeId, NodeId)> = HashMap::new();

// for package in &self.packages {
// let golang_fetch = Arc::new(GolangFetchSource::new(match &package.origin {
// crate::model::package_origin::PackageOrigin::GoCache(origin) => origin.path.clone(),
// crate::model::package_origin::PackageOrigin::Git(origin) => origin.git.clone(),
// }));

// let golang_fetch_node_id = node_id_counter;
// node_id_counter += 1;

// let golang_fetch_node = TransformationNode {
// id: golang_fetch_node_id,
// transformation: golang_fetch,
// dependencies: Vec::new(),
// dependents: Vec::new(),
// };

// let (repo_url, reference) = match &package.destination {
// crate::model::package_destination::PackageDestination::Git(dest) => (&dest.git, &dest.reference),
// };
// let git_init = Arc::new(GitInit::new(repo_url.clone(), reference.clone()));

// let git_init_node_id = node_id_counter;
// node_id_counter += 1;

// let git_init_node = TransformationNode {
// id: git_init_node_id,
// transformation: git_init,
// dependencies: vec![golang_fetch_node_id],
// dependents: Vec::new(),
// };

// let git_push = Arc::new(GitPush::new(repo_url.clone(), reference.clone()));

// let git_push_node_id = node_id_counter;
// node_id_counter += 1;

// let git_push_node = TransformationNode {
// id: git_push_node_id,
// transformation: git_push,
// dependencies: vec![git_init_node_id],
// dependents: Vec::new(),
// };

// let mut golang_fetch_node = Arc::try_unwrap(golang_fetch_node.into())
// .unwrap_or_else(|arc| (*arc).clone());
// golang_fetch_node.dependents.push(git_init_node_id);
// let mut git_init_node = Arc::try_unwrap(git_init_node.into())
// .unwrap_or_else(|arc| (*arc).clone());
// git_init_node.dependents.push(git_push_node_id);

// nodes.push(Arc::new(golang_fetch_node));
// nodes.push(Arc::new(git_init_node));
// nodes.push(Arc::new(git_push_node));

// package_nodes.insert(package.origin_key(), (golang_fetch_node_id, git_push_node_id));
// }

// let mut id_to_node = nodes.iter().map(|n| (n.id, Arc::clone(n))).collect::<HashMap<_, _>>();

// for package in &self.packages {
// let (pkg_golang_fetch_id, _) = package_nodes.get(&package.origin_key()).unwrap();

// for dependency in &package.dependencies {
// if let Some((_, dep_git_push_id)) = package_nodes.get(&dependency.name) {
// let mut pkg_golang_fetch_node = Arc::try_unwrap(id_to_node[pkg_golang_fetch_id].clone())
// .unwrap_or_else(|arc| (*arc).clone());
// if !pkg_golang_fetch_node.dependencies.contains(dep_git_push_id) {
// pkg_golang_fetch_node.dependencies.push(*dep_git_push_id);
// }

// let mut dep_git_push_node = Arc::try_unwrap(id_to_node[dep_git_push_id].clone())
// .unwrap_or_else(|arc| (*arc).clone());
// if !dep_git_push_node.dependents.contains(pkg_golang_fetch_id) {
// dep_git_push_node.dependents.push(*pkg_golang_fetch_id);
// }

// id_to_node.insert(*pkg_golang_fetch_id, Arc::new(pkg_golang_fetch_node));
// id_to_node.insert(*dep_git_push_id, Arc::new(dep_git_push_node));
// }
// }
// }

// id_to_node.values().cloned().collect()
// }
// }

// impl Package {
// fn origin_key(&self) -> String {
// match &self.origin {
// crate::model::package_origin::PackageOrigin::Git(origin) => origin.git.clone(),
// crate::model::package_origin::PackageOrigin::GoCache(origin) => origin.name.clone(),
// }
// }
// }
1 change: 1 addition & 0 deletions replication/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod model;
pub mod plan;
pub mod apply;
Loading