diff --git a/cli/src/commands/replication/apply.rs b/cli/src/commands/replication/apply.rs index 39adb26..7a27cd4 100644 --- a/cli/src/commands/replication/apply.rs +++ b/cli/src/commands/replication/apply.rs @@ -1,71 +1,25 @@ -use std::{fs::{create_dir_all, remove_dir_all}, path::PathBuf}; +use std::{fs::remove_dir_all, path::PathBuf, sync::Arc}; use anyhow::Result; use clap::Parser; -use colorize::AnsiColor; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use source_wand_common::{ - project_manipulator::{ - local_project_manipulator::LocalProjectManipulator, - project_manipulator::ProjectManipulator - } +use source_wand_replication::{ + apply::plan_executor::execute_plan, + model::replication_plan::ReplicationPlan, + plan::transformation_node::TransformationNode }; -use source_wand_replication::model::{package::Package, package_destination::PackageDestination, package_origin::PackageOrigin, replication_plan::ReplicationPlan}; -use uuid::Uuid; use crate::commands::replication::plan::plan_replication; #[derive(Debug, Parser)] pub struct ReplicationApplyArgs; -fn replicate_package(package: Package) -> Result<()> { - if let PackageOrigin::GoCache(origin) = package.origin { - let PackageDestination::Git(destination) = package.destination; - - let uuid: Uuid = Uuid::new_v4(); - let dependency_directory: PathBuf = PathBuf::from(format!("./source-wand/{}", uuid)); - - create_dir_all(&dependency_directory)?; - - let sh: LocalProjectManipulator = LocalProjectManipulator::new(dependency_directory, true); - - let ls_remote: Result = sh.run_shell(format!("git ls-remote --exit-code --heads {} {}", destination.git, destination.reference)); - - if ls_remote.is_ok() { - println!("{}", format!("[skipped] {} ({}), already exists on remote", origin.name, origin.version).yellow()); - return Ok(()); - } - - sh.run_shell(format!("cp -r {}/* .", origin.path))?; - sh.run_shell("git init".to_string())?; - sh.run_shell(format!("git remote add origin {}", destination.git))?; - sh.run_shell(format!("git checkout --orphan {}", destination.reference))?; - sh.run_shell("git add .".to_string())?; - sh.run_shell("git commit -m 'Replicate source code'".to_string())?; - sh.run_shell(format!("git push -u origin {}", destination.reference))?; - - println!("{}", format!("[replicated] {} ({})", origin.name, origin.version).green()); - - sh.cleanup(); - } - - Ok(()) -} - pub fn replicate_apply_command(_args: &ReplicationApplyArgs) -> Result<()> { let replication_plan: ReplicationPlan = plan_replication()?; - let results: Vec> = replication_plan - .packages - .into_par_iter() - .map(replicate_package) - .collect(); + let execution_graph: Vec> = replication_plan.to_execution_graph(); + execute_plan(execution_graph)?; remove_dir_all(PathBuf::from("./source-wand")).ok(); - for result in results { - result?; - } - Ok(()) } diff --git a/replication/Cargo.toml b/replication/Cargo.toml index 6c6cbdc..3f7c862 100644 --- a/replication/Cargo.toml +++ b/replication/Cargo.toml @@ -5,7 +5,10 @@ edition = "2021" [dependencies] anyhow = "1.0.98" +colorize = "0.1.0" +rayon = "1.11.0" regex = "1.11.1" serde = { version = "1.0.219", features = ["derive"] } +uuid = { version = "1.18.0", features = ["v4"] } source-wand-common = { path = "../common" } diff --git a/replication/src/apply/mod.rs b/replication/src/apply/mod.rs new file mode 100644 index 0000000..37ef9ae --- /dev/null +++ b/replication/src/apply/mod.rs @@ -0,0 +1,2 @@ +pub mod plan_executor; +pub mod plan_to_execution_graph; diff --git a/replication/src/apply/plan_executor.rs b/replication/src/apply/plan_executor.rs new file mode 100644 index 0000000..c652b4a --- /dev/null +++ b/replication/src/apply/plan_executor.rs @@ -0,0 +1,137 @@ +use std::{ + collections::{HashMap, HashSet, VecDeque}, fs::create_dir_all, path::PathBuf, sync::{Arc, Mutex, MutexGuard} +}; +use colorize::AnsiColor; +use rayon::prelude::*; +use source_wand_common::project_manipulator::local_project_manipulator::LocalProjectManipulator; +use anyhow::{anyhow, Result}; +use uuid::Uuid; + +use crate::plan::{context::Context, transformation_node::{NodeId, TransformationNode}}; + +pub fn execute_plan(nodes: Vec>) -> Result<()> { + let mut workdesk_contexts: HashMap = HashMap::new(); + for node in &nodes { + if !workdesk_contexts.contains_key(&node.workdesk) { + let uuid: Uuid = Uuid::new_v4(); + let source_directory: PathBuf = PathBuf::from(format!("./source-wand/{}", uuid)); + + create_dir_all(&source_directory)?; + + let sh: LocalProjectManipulator = LocalProjectManipulator::new( + source_directory, + false, + ); + + workdesk_contexts.insert(node.workdesk.clone(), Context::new(sh)); + } + } + + let context_map: Arc>> = Arc::new(Mutex::new(workdesk_contexts)); + let node_map: HashMap> = + nodes.iter().map(|n| (n.id, Arc::clone(n))).collect(); + let completed: Arc>> = Arc::new(Mutex::new(HashSet::new())); + let queue: Arc>> = Arc::new(Mutex::new( + nodes + .iter() + .filter(|node| node.dependencies.is_empty()) + .map(|node| node.id) + .collect::>(), + )); + let dependents: HashMap> = { + let mut map: HashMap> = HashMap::new(); + for node in &nodes { + for &dep in &node.dependencies { + map.entry(dep).or_default().push(node.id); + } + } + map + }; + + let error: Arc>> = Arc::new(Mutex::new(Ok(()))); + + loop { + let batch: Vec = { + let mut q: MutexGuard<'_, VecDeque> = queue.lock().unwrap(); + q.drain(..).collect() + }; + + if batch.is_empty() { + break; + } + + batch.par_iter().for_each(|&node_id| { + if error.lock().unwrap().is_err() { + return; + } + + let node: &Arc = node_map.get(&node_id).unwrap(); + + let ctx: Option = { + let mut contexts: MutexGuard<'_, HashMap> = context_map.lock().unwrap(); + contexts.get_mut(&node.workdesk).cloned() + }; + + if let Some(ctx) = ctx { + if let Some(reason) = node.transformation.should_skip(&ctx) { + println!( + "{:<120} context: {}", + format!( + "{} {} {}", + "[skip]".to_string().yellow(), + node.transformation.get_name().blue(), + reason.italic(), + ), + node.workdesk, + ); + } + else { + let transformation_result: Result = node.transformation.apply(ctx); + if let Err(e) = transformation_result { + *error.lock().unwrap() = Err(e); + return; + } + else { + println!( + "{:<106} context: {}", + format!( + "{} {}", + "[execute]".to_string().green(), + node.transformation.get_name().blue(), + ), + node.workdesk, + ); + } + } + } else { + *error.lock().unwrap() = Err(anyhow::anyhow!("Missing context for workdesk {}", node.workdesk)); + return; + } + + completed.lock().unwrap().insert(node_id); + + if let Some(deps) = dependents.get(&node_id) { + for &dependent_id in deps { + let dependent: &Arc = node_map.get(&dependent_id).unwrap(); + let ready: bool = dependent + .dependencies + .iter() + .all(|dep| completed.lock().unwrap().contains(dep)); + if ready { + queue.lock().unwrap().push_back(dependent_id); + } + } + } + }); + + if error.lock().unwrap().is_err() { + break; + } + } + + let error: MutexGuard<'_, std::result::Result<(), anyhow::Error>> = error.lock().unwrap(); + match &*error { + Ok(()) => Ok(()), + Err(e) => Err(anyhow!(e.to_string())), + } +} diff --git a/replication/src/apply/plan_to_execution_graph.rs b/replication/src/apply/plan_to_execution_graph.rs new file mode 100644 index 0000000..0680865 --- /dev/null +++ b/replication/src/apply/plan_to_execution_graph.rs @@ -0,0 +1,157 @@ +use std::sync::Arc; + +use crate::{model::{package_destination::PackageDestination, package_origin::PackageOrigin, replication_plan::ReplicationPlan}, plan::{transformation_node::{NodeId, TransformationNode}, transformations::{git::{git_init::GitInit, git_push::GitPush}, golang::fetch_source::GolangFetchSource}}}; + +impl ReplicationPlan { + pub fn to_execution_graph(&self) -> Vec> { + let mut execution_graph: Vec> = Vec::new(); + + let mut id: NodeId = 0; + for package in &self.packages { + if let PackageOrigin::GoCache(origin) = &package.origin { + let PackageDestination::Git(destination) = &package.destination; + + let workdesk: String = format!("{} ({})", origin.name, origin.version); + + let fetch: TransformationNode = TransformationNode { + id, + workdesk: workdesk.clone(), + transformation: Arc::new(GolangFetchSource::new(origin.path.clone())), + dependencies: vec![], + dependents: vec![id + 1] + }; + + let init: TransformationNode = TransformationNode { + id: id + 1, + workdesk: workdesk.clone(), + transformation: Arc::new(GitInit::new(destination.git.clone(), destination.reference.clone())), + dependencies: vec![id], + dependents: vec![id + 2] + }; + + let push: TransformationNode = TransformationNode { + id: id + 2, + workdesk, + transformation: Arc::new(GitPush::new(destination.git.clone(), destination.reference.clone())), + dependencies: vec![id + 1], + dependents: vec![] + }; + + execution_graph.push(Arc::new(fetch)); + execution_graph.push(Arc::new(init)); + execution_graph.push(Arc::new(push)); + + id += 3; + } + } + + execution_graph + } +} + +// use std::{collections::HashMap, sync::Arc}; + +// use crate::{model::{package::Package, replication_plan::ReplicationPlan}, plan::{transformation_node::{NodeId, TransformationNode}, transformations::{git::{git_init::GitInit, git_push::GitPush}, golang::fetch_source::GolangFetchSource}}}; + + +// impl ReplicationPlan { +// pub fn to_execution_graph(&self) -> Vec> { +// let mut nodes: Vec> = Vec::new(); +// let mut node_id_counter = 0; + +// let mut package_nodes: HashMap = HashMap::new(); + +// for package in &self.packages { +// let golang_fetch = Arc::new(GolangFetchSource::new(match &package.origin { +// crate::model::package_origin::PackageOrigin::GoCache(origin) => origin.path.clone(), +// crate::model::package_origin::PackageOrigin::Git(origin) => origin.git.clone(), +// })); + +// let golang_fetch_node_id = node_id_counter; +// node_id_counter += 1; + +// let golang_fetch_node = TransformationNode { +// id: golang_fetch_node_id, +// transformation: golang_fetch, +// dependencies: Vec::new(), +// dependents: Vec::new(), +// }; + +// let (repo_url, reference) = match &package.destination { +// crate::model::package_destination::PackageDestination::Git(dest) => (&dest.git, &dest.reference), +// }; +// let git_init = Arc::new(GitInit::new(repo_url.clone(), reference.clone())); + +// let git_init_node_id = node_id_counter; +// node_id_counter += 1; + +// let git_init_node = TransformationNode { +// id: git_init_node_id, +// transformation: git_init, +// dependencies: vec![golang_fetch_node_id], +// dependents: Vec::new(), +// }; + +// let git_push = Arc::new(GitPush::new(repo_url.clone(), reference.clone())); + +// let git_push_node_id = node_id_counter; +// node_id_counter += 1; + +// let git_push_node = TransformationNode { +// id: git_push_node_id, +// transformation: git_push, +// dependencies: vec![git_init_node_id], +// dependents: Vec::new(), +// }; + +// let mut golang_fetch_node = Arc::try_unwrap(golang_fetch_node.into()) +// .unwrap_or_else(|arc| (*arc).clone()); +// golang_fetch_node.dependents.push(git_init_node_id); +// let mut git_init_node = Arc::try_unwrap(git_init_node.into()) +// .unwrap_or_else(|arc| (*arc).clone()); +// git_init_node.dependents.push(git_push_node_id); + +// nodes.push(Arc::new(golang_fetch_node)); +// nodes.push(Arc::new(git_init_node)); +// nodes.push(Arc::new(git_push_node)); + +// package_nodes.insert(package.origin_key(), (golang_fetch_node_id, git_push_node_id)); +// } + +// let mut id_to_node = nodes.iter().map(|n| (n.id, Arc::clone(n))).collect::>(); + +// for package in &self.packages { +// let (pkg_golang_fetch_id, _) = package_nodes.get(&package.origin_key()).unwrap(); + +// for dependency in &package.dependencies { +// if let Some((_, dep_git_push_id)) = package_nodes.get(&dependency.name) { +// let mut pkg_golang_fetch_node = Arc::try_unwrap(id_to_node[pkg_golang_fetch_id].clone()) +// .unwrap_or_else(|arc| (*arc).clone()); +// if !pkg_golang_fetch_node.dependencies.contains(dep_git_push_id) { +// pkg_golang_fetch_node.dependencies.push(*dep_git_push_id); +// } + +// let mut dep_git_push_node = Arc::try_unwrap(id_to_node[dep_git_push_id].clone()) +// .unwrap_or_else(|arc| (*arc).clone()); +// if !dep_git_push_node.dependents.contains(pkg_golang_fetch_id) { +// dep_git_push_node.dependents.push(*pkg_golang_fetch_id); +// } + +// id_to_node.insert(*pkg_golang_fetch_id, Arc::new(pkg_golang_fetch_node)); +// id_to_node.insert(*dep_git_push_id, Arc::new(dep_git_push_node)); +// } +// } +// } + +// id_to_node.values().cloned().collect() +// } +// } + +// impl Package { +// fn origin_key(&self) -> String { +// match &self.origin { +// crate::model::package_origin::PackageOrigin::Git(origin) => origin.git.clone(), +// crate::model::package_origin::PackageOrigin::GoCache(origin) => origin.name.clone(), +// } +// } +// } diff --git a/replication/src/lib.rs b/replication/src/lib.rs index f743b81..8118f86 100644 --- a/replication/src/lib.rs +++ b/replication/src/lib.rs @@ -1,2 +1,3 @@ pub mod model; pub mod plan; +pub mod apply; diff --git a/replication/src/plan/context.rs b/replication/src/plan/context.rs new file mode 100644 index 0000000..0db9d06 --- /dev/null +++ b/replication/src/plan/context.rs @@ -0,0 +1,12 @@ +use source_wand_common::project_manipulator::local_project_manipulator::LocalProjectManipulator; + +#[derive(Debug, Clone)] +pub struct Context { + pub sh: LocalProjectManipulator, +} + +impl Context { + pub fn new(sh: LocalProjectManipulator) -> Self { + Context { sh } + } +} diff --git a/replication/src/plan/fetch_source.rs b/replication/src/plan/fetch_source.rs deleted file mode 100644 index 3124e1a..0000000 --- a/replication/src/plan/fetch_source.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::{fs::create_dir_all, path::PathBuf}; - -use anyhow::{bail, Ok, Result}; -use regex::Regex; -use source_wand_common::{ - project::Project, - project_manipulator::{ - local_project_manipulator::LocalProjectManipulator, - project_manipulator::ProjectManipulator - } -}; - -use crate::model::{ - package_origin::PackageOrigin, - package_origin_git::PackageOriginGit -}; - -pub fn fetch_source(project: &Project) -> Result { - let project_directory: PathBuf = PathBuf::from( - format!( - "./packages/{}-{}/repository/", - project.name.replace("/", "-"), - project.version.replace("/", "-"), - ) - ); - create_dir_all(&project_directory)?; - - let project_manipulator: LocalProjectManipulator = LocalProjectManipulator::new(project_directory, false); - - let short_name: &str = match project.name.split("/").last() { - Some(short_name) => short_name, - None => project.name.as_str(), - }; - - let tags_raw: String = project_manipulator.run_shell(format!("git ls-remote --tags {}", project.repository))?; - let tags: Vec<&str> = tags_raw.lines() - .into_iter() - .filter_map(|tag| tag.split("\t").last()) - .collect(); - - let branches_raw: String = project_manipulator.run_shell(format!("git ls-remote --heads {}", project.repository))?; - let branches: Vec<&str> = branches_raw.lines() - .filter_map(|branch| branch.split("\t").last()) - .collect(); - - let commit_hash_regex = Regex::new( - r"^v\d+\.\d+\.\d+(?:-[^+]+)?-(\d{14})-([a-f0-9]+)(?:\+incompatible)?$" - )?; - let potential_commit_hash: Option = commit_hash_regex - .captures(project.version.as_str()) - .and_then(|captures| captures.get(2).map(|part| part.as_str().to_string())); - - // let mut path: Option = None; - - let version_tag: &str = project.version.split('+').next().unwrap_or(&project.version); - let checkout: String = - if let Some(tag) = tags.iter().find(|tag| tag.contains(&format!("{}/{}", short_name, version_tag))) { - // path = Some(short_name.to_string()); - tag.to_string() - } - else if let Some(tag) = tags.iter().find(|tag| tag.contains(version_tag)) { - tag.to_string() - } - else if let Some(branch) = branches.iter().find(|branch| branch.contains(version_tag)) { - branch.to_string() - } - else if let Some(potential_commit_hash) = potential_commit_hash { - project_manipulator.run_shell(format!("git clone --no-checkout {} .", project.repository))?; - project_manipulator.run_shell(format!("git checkout {}", potential_commit_hash))?; - potential_commit_hash - } - else { - bail!("No tag, branch or commit matches the package version") - }; - - Ok(PackageOriginGit::new(project.repository.clone(), checkout)) -} diff --git a/replication/src/plan/mod.rs b/replication/src/plan/mod.rs index 4cfcc99..bb3ca15 100644 --- a/replication/src/plan/mod.rs +++ b/replication/src/plan/mod.rs @@ -1,2 +1,6 @@ -pub mod plan_replication; -pub mod fetch_source; +pub mod transformation; +pub mod transformations; + +pub mod transformation_node; + +pub mod context; diff --git a/replication/src/plan/plan_replication.rs b/replication/src/plan/plan_replication.rs deleted file mode 100644 index 4aee67b..0000000 --- a/replication/src/plan/plan_replication.rs +++ /dev/null @@ -1,7 +0,0 @@ -use anyhow::Result; - -use crate::model::replication_manifest::ReplicationManifest; - -pub fn plan_replication() -> Result { - todo!() -} diff --git a/replication/src/plan/transformation.rs b/replication/src/plan/transformation.rs new file mode 100644 index 0000000..1d916b6 --- /dev/null +++ b/replication/src/plan/transformation.rs @@ -0,0 +1,25 @@ +use anyhow::Result; + +use crate::plan::context::Context; + +pub trait Transformation: Send + Sync + TransformationClone { + fn apply(&self, ctx: Context) -> Result; + fn should_skip(&self, ctx: &Context) -> Option; + fn get_name(&self) -> String; +} + +pub trait TransformationClone { + fn clone_box(&self) -> Box; +} + +impl TransformationClone for T where T: 'static + Transformation + Clone { + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +impl Clone for Box { + fn clone(&self) -> Self { + self.clone_box() + } +} diff --git a/replication/src/plan/transformation_node.rs b/replication/src/plan/transformation_node.rs new file mode 100644 index 0000000..17b1d90 --- /dev/null +++ b/replication/src/plan/transformation_node.rs @@ -0,0 +1,14 @@ +use std::sync::Arc; + +use crate::plan::transformation::Transformation; + +pub type NodeId = usize; + +#[derive(Clone)] +pub struct TransformationNode { + pub id: NodeId, + pub workdesk: String, + pub transformation: Arc, + pub dependencies: Vec, + pub dependents: Vec, +} diff --git a/replication/src/plan/transformations/git/git_init.rs b/replication/src/plan/transformations/git/git_init.rs new file mode 100644 index 0000000..1f49500 --- /dev/null +++ b/replication/src/plan/transformations/git/git_init.rs @@ -0,0 +1,47 @@ +use anyhow::Result; +use source_wand_common::project_manipulator::project_manipulator::ProjectManipulator; + +use crate::plan::{context::Context, transformation::Transformation}; + +#[derive(Debug, Clone)] +pub struct GitInit { + repository_url: String, + reference: String, +} + +impl GitInit { + pub fn new(repository_url: String, reference: String) -> Self { + GitInit { repository_url, reference } + } +} + +impl Transformation for GitInit { + fn apply(&self, ctx: Context) -> Result { + ctx.sh.run_shell("git init".to_string())?; + ctx.sh.run_shell(format!("git remote add origin {}", self.repository_url))?; + ctx.sh.run_shell(format!("git checkout --orphan {}", self.reference))?; + + Ok(ctx) + } + + fn should_skip(&self, ctx: &Context) -> Option { + let ls_remote: Result = ctx.sh.run_shell( + format!( + "git ls-remote --exit-code --heads {} {}", + self.repository_url, + self.reference + ) + ); + + if ls_remote.is_ok() { + Some("reference already exists on remote".to_string()) + } + else { + None + } + } + + fn get_name(&self) -> String { + "initialize git repository".to_string() + } +} diff --git a/replication/src/plan/transformations/git/git_push.rs b/replication/src/plan/transformations/git/git_push.rs new file mode 100644 index 0000000..584ece6 --- /dev/null +++ b/replication/src/plan/transformations/git/git_push.rs @@ -0,0 +1,47 @@ +use anyhow::Result; +use source_wand_common::project_manipulator::project_manipulator::ProjectManipulator; + +use crate::plan::{context::Context, transformation::Transformation}; + +#[derive(Debug, Clone)] +pub struct GitPush { + repository_url: String, + reference: String, +} + +impl GitPush { + pub fn new(repository_url: String, reference: String) -> Self { + GitPush { repository_url, reference } + } +} + +impl Transformation for GitPush { + fn apply(&self, ctx: Context) -> Result { + ctx.sh.run_shell("git add .".to_string())?; + ctx.sh.run_shell("git commit -m 'Replicate source code'".to_string())?; + ctx.sh.run_shell(format!("git push -u origin {}", self.reference))?; + + Ok(ctx) + } + + fn should_skip(&self, ctx: &Context) -> Option { + let ls_remote: Result = ctx.sh.run_shell( + format!( + "git ls-remote --exit-code --heads {} {}", + self.repository_url, + self.reference + ) + ); + + if ls_remote.is_ok() { + Some("reference already exists on remote".to_string()) + } + else { + None + } + } + + fn get_name(&self) -> String { + "push to git".to_string() + } +} diff --git a/replication/src/plan/transformations/git/mod.rs b/replication/src/plan/transformations/git/mod.rs new file mode 100644 index 0000000..8246f8f --- /dev/null +++ b/replication/src/plan/transformations/git/mod.rs @@ -0,0 +1,2 @@ +pub mod git_init; +pub mod git_push; diff --git a/replication/src/plan/transformations/golang/fetch_source.rs b/replication/src/plan/transformations/golang/fetch_source.rs new file mode 100644 index 0000000..7b5c1b4 --- /dev/null +++ b/replication/src/plan/transformations/golang/fetch_source.rs @@ -0,0 +1,30 @@ +use anyhow::Result; +use source_wand_common::project_manipulator::project_manipulator::ProjectManipulator; + +use crate::plan::{context::Context, transformation::Transformation}; + +#[derive(Debug, Clone)] +pub struct GolangFetchSource { + pub origin: String, +} + +impl GolangFetchSource { + pub fn new(origin: String) -> Self { + GolangFetchSource { origin } + } +} + +impl Transformation for GolangFetchSource { + fn apply(&self, ctx: Context) -> Result { + ctx.sh.run_shell(format!("cp -r {}/* .", self.origin))?; + Ok(ctx) + } + + fn should_skip(&self, _: &Context) -> Option { + None + } + + fn get_name(&self) -> String { + "fetch go source code".to_string() + } +} diff --git a/replication/src/plan/transformations/golang/mod.rs b/replication/src/plan/transformations/golang/mod.rs new file mode 100644 index 0000000..321be6b --- /dev/null +++ b/replication/src/plan/transformations/golang/mod.rs @@ -0,0 +1 @@ +pub mod fetch_source; diff --git a/replication/src/plan/transformations/mod.rs b/replication/src/plan/transformations/mod.rs new file mode 100644 index 0000000..e9645c5 --- /dev/null +++ b/replication/src/plan/transformations/mod.rs @@ -0,0 +1,2 @@ +pub mod golang; +pub mod git;