diff --git a/Cargo.lock b/Cargo.lock index 5090e7a..4e2e25a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1214,7 +1214,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.1", "terminal_size", ] @@ -2331,6 +2331,41 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "darling" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.9.3", + "syn 1.0.109", +] + +[[package]] +name = "darling_macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" +dependencies = [ + "darling_core", + "quote", + "syn 1.0.109", +] + [[package]] name = "dashmap" version = "5.5.3" @@ -2436,6 +2471,31 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "derive_builder" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2658621297f2cf68762a6f7dc0bb7e1ff2cfd6583daef8ee0fed6f7ec468ec0" +dependencies = [ + "darling", + "derive_builder_core", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "derive_builder_core" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2791ea3e372c8495c0bc2033991d76b512cd799d07491fbd6890124db9458bef" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -2538,6 +2598,18 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "dns-lookup" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872" +dependencies = [ + "cfg-if", + "libc", + "socket2 0.4.10", + "winapi", +] + [[package]] name = "docify" version = "0.2.8" @@ -2720,6 +2792,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + +[[package]] +name = "enum-as-inner" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570d109b813e904becc80d8d5da38376818a143348413f7149f1340fe04754d4" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "enum-as-inner" version = "0.5.1" @@ -3983,6 +4073,20 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-system-resolver" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eea26c5d0b6ab9d72219f65000af310f042a740926f7b2fa3553e774036e2e7" +dependencies = [ + "derive_builder", + "dns-lookup", + "hyper", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -4019,6 +4123,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.2.3" @@ -4732,7 +4842,7 @@ dependencies = [ "smallvec", "socket2 0.4.10", "tokio", - "trust-dns-proto", + "trust-dns-proto 0.22.0", "void", ] @@ -5711,6 +5821,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.24.3" @@ -6335,6 +6454,7 @@ name = "pallet-container" version = "0.1.0" dependencies = [ "cumulus-primitives-core", + "derivative", "frame-benchmarking", "frame-support", "frame-system", @@ -8912,6 +9032,7 @@ dependencies = [ "polkadot-primitives", "popsicle-runtime", "primitives-container", + "public-ip", "reqwest", "ring 0.17.8", "sc-basic-authorship", @@ -9328,6 +9449,27 @@ dependencies = [ "cc", ] +[[package]] +name = "public-ip" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4c40db5262d93298c363a299f8bc1b3a956a78eecddba3bc0e58b76e2f419a" +dependencies = [ + "dns-lookup", + "futures-core", + "futures-util", + "http", + "hyper", + "hyper-system-resolver", + "pin-project-lite 0.2.14", + "thiserror", + "tokio", + "tracing", + "tracing-futures", + "trust-dns-client", + "trust-dns-proto 0.20.4", +] + [[package]] name = "quanta" version = "0.12.3" @@ -9415,6 +9557,16 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -12985,6 +13137,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "strsim" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" + [[package]] name = "strsim" version = "0.11.1" @@ -13677,6 +13835,8 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ + "futures", + "futures-task", "pin-project", "tracing", ] @@ -13772,6 +13932,51 @@ dependencies = [ "hash-db", ] +[[package]] +name = "trust-dns-client" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b4ef9b9bde0559b78a4abb00339143750085f05e5a453efb7b8bef1061f09dc" +dependencies = [ + "cfg-if", + "data-encoding", + "futures-channel", + "futures-util", + "lazy_static", + "log", + "radix_trie", + "rand", + "thiserror", + "time", + "tokio", + "trust-dns-proto 0.20.4", +] + +[[package]] +name = "trust-dns-proto" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca94d4e9feb6a181c690c4040d7a24ef34018d8313ac5044a61d21222ae24e31" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner 0.3.4", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.2.3", + "ipnet", + "lazy_static", + "log", + "rand", + "smallvec", + "thiserror", + "tinyvec", + "tokio", + "url", +] + [[package]] name = "trust-dns-proto" version = "0.22.0" @@ -13781,7 +13986,7 @@ dependencies = [ "async-trait", "cfg-if", "data-encoding", - "enum-as-inner", + "enum-as-inner 0.5.1", "futures-channel", "futures-io", "futures-util", @@ -13815,7 +14020,7 @@ dependencies = [ "thiserror", "tokio", "tracing", - "trust-dns-proto", + "trust-dns-proto 0.22.0", ] [[package]] diff --git a/node/Cargo.toml b/node/Cargo.toml index 8b6b13e..87f0122 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -29,6 +29,7 @@ tempfile="3.9.0" error-chain="0.12.4" ring="0.17.8" tokio = { version = "1.22.0", features = ["parking_lot", "rt-multi-thread", "time"] } +public-ip = "0.2" # Local primitives-container = { path = "../primitives/container"} diff --git a/node/src/container_task.rs b/node/src/container_task.rs index 574500a..9d44e43 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -1,11 +1,23 @@ +//! Container Spawner +//! +//! After the node is started, a background service is created, which will be executed until the +//! node program is closed. After receiving the block of the relay chain (forming a 6s cycle), the +//! background service will call the download application task (if a download signal is detected) +//! and the execution application task (if an execution signal is detected). Download application +//! task: Determine whether the currently running node is in a certain group. If not, end this task. +//! If the new group is the same as the previous group, end this task. If it is in a certain group +//! and is different from the previously assigned group (the first time the group information is +//! obtained, the subsequent logic will also be executed), then obtain the application information +//! corresponding to the group, check whether the application has been downloaded, if not, download +//! the application, and then start the synchronization block process. Execute application task: +//! Determine whether the effective time has arrived and there is a new application to be started. +//! If not, do not execute. If so, stop the old application and start the new application. use codec::Decode; -use cumulus_primitives_core::{ - relay_chain::BlockNumber as RelayBlockNumber, ParaId, PersistedValidationData, -}; +use cumulus_primitives_core::{ParaId, PersistedValidationData}; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use futures::{lock::Mutex, pin_mut, select, FutureExt, Stream, StreamExt}; use polkadot_primitives::OccupiedCoreAssumption; -use primitives_container::{ContainerRuntimeApi, DownloadInfo}; +use primitives_container::{ContainerRuntimeApi, DownloadInfo, ProcessorDownloadInfo}; use reqwest::{ self, header::{HeaderValue, CONTENT_LENGTH, RANGE}, @@ -23,9 +35,9 @@ use sp_runtime::{ AccountId32, }; use std::{ + collections::HashMap, error::Error, - fs, - fs::{File, Permissions}, + fs::{self, File, Permissions}, io::{BufReader, Read}, os::unix::fs::PermissionsExt, path::{Path, PathBuf}, @@ -33,9 +45,17 @@ use std::{ str::FromStr, sync::Arc, }; -pub const RUN_ARGS_KEY: &[u8] = b"run_args"; -pub const SYNC_ARGS_KEY: &[u8] = b"sync_args"; -pub const OPTION_ARGS_KEY: &[u8] = b"option_args"; + +// Consensus client startup consensus arguments. +pub const RUN_ARGS_KEY: &str = "run_args"; +// Consensus client startup sync block arguments. +pub const SYNC_ARGS_KEY: &str = "sync_args"; +// Consensus client run as docker container startup arguments. +pub const OPTION_ARGS_KEY: &str = "option_args"; +// Processor client startup arguments. +pub const P_RUN_ARGS_KEY: &str = "p_run_args"; +// Processor client run as docker container startup arguments. +pub const P_OPTION_ARGS_KEY: &str = "p_option_args"; struct PartialRangeIter { start: u64, @@ -68,6 +88,7 @@ impl Iterator for PartialRangeIter { } } +// Calculate the sha256 value of the file. async fn sha256_digest(mut reader: R) -> Result> { let mut context = Context::new(&SHA256); let mut buffer = [0; 1024]; @@ -83,9 +104,11 @@ async fn sha256_digest(mut reader: R) -> Result Result<(), Box> { //firt create dir @@ -100,7 +123,7 @@ async fn download_sdk( let client = reqwest::blocking::Client::new(); - let web_path = format!("{}/{}", url, std::str::from_utf8(&app_info.file_name)?); + let web_path = format!("{}/{}", url, file_name); log::info!("=============download:{:?}", web_path); let response = client.head(&web_path).send()?; @@ -113,7 +136,7 @@ async fn download_sdk( let length = u64::from_str(length.to_str()?).map_err(|_| "invalid Content-Length header")?; log::info!("==========total length:{:?}", length); - let download_path = format!("{}/{}", path_str, std::str::from_utf8(&app_info.file_name)?); + let download_path = format!("{}/{}", path_str, file_name); log::info!("=============download_path:{:?}", download_path); let download_dir = Path::new(&download_path); @@ -146,7 +169,7 @@ async fn download_sdk( let digest = sha256_digest(reader).await?; println!("SHA-256 digest is {:?}", digest); - if digest.as_ref() == app_info.app_hash.as_bytes() { + if digest.as_ref() == file_hash.as_bytes() { println!("check ok"); } else { println!("check fail"); @@ -155,6 +178,8 @@ async fn download_sdk( Ok(()) } + +// Determine whether to download an application whose hash is a certain value. async fn need_download( data_path: &str, app_hash: H256, @@ -174,6 +199,20 @@ async fn need_download( } } +// Determine whether you need to pull a Docker image with a hash value. +async fn need_pull_docker_image( + docker_image: &str, + file_hash: H256, +) -> Result> { + let exist = check_docker_image_exist(docker_image).await?; + let mut need = false; + if exist { + need = check_docker_image_hash(docker_image, file_hash).await?; + } + Ok(need) +} + +// Check if a docker image exists. async fn check_docker_image_exist( docker_image: &str, ) -> Result> { @@ -198,14 +237,20 @@ async fn check_docker_image_exist( Ok(result) } -async fn download_docker_image(docker_image: &str) -> Result<(), Box> { +// Pull docker image. +async fn download_docker_image( + docker_image: &str, + file_hash: H256, +) -> Result> { let pull_cmd = format!("pull {}", docker_image); let args: Vec<&str> = pull_cmd.split(' ').into_iter().map(|arg| arg).collect(); let mut instance = Command::new("docker").args(args).spawn()?; instance.wait()?; - Ok(()) + let result = check_docker_image_hash(docker_image, file_hash).await?; + Ok(result) } +// Remove docker container by name. async fn remove_docker_container(container_name: &str) -> Result<(), Box> { let mut docker_cmd = format!("container stop {}", container_name); let mut args: Vec<&str> = docker_cmd.split(' ').into_iter().map(|arg| arg).collect(); @@ -219,6 +264,7 @@ async fn remove_docker_container(container_name: &str) -> Result<(), Box Result> { + let ls_cmd = format!("image ls {} --format", docker_image); + let mut args: Vec<&str> = ls_cmd.split(' ').into_iter().map(|arg| arg).collect(); + args.push("table {{.Digest}}"); + let mut instance = Command::new("docker").stdout(Stdio::piped()).args(args).spawn()?; + let mut result = false; + if let Some(ls_output) = instance.stdout.take() { + let grep_cmd = Command::new("grep") + .arg("sha256") + .stdin(ls_output) + .stdout(Stdio::piped()) + .spawn()?; + + let grep_stdout = grep_cmd.wait_with_output()?; + instance.wait()?; + let sha256_hash = String::from_utf8(grep_stdout.stdout)?; + if sha256_hash.len() > 0 { + let hashs: Vec<&str> = sha256_hash.split(':').into_iter().map(|arg| arg).collect(); + if hashs.len() > 0 { + let get_hash = H256::from_str(hashs[hashs.len() - 1])?; + if get_hash == app_hash { + result = true; + } + } + } + } + Ok(result) +} + +// Redirect docker logs to a file. +// It will run as a background process, so you need to save the instance and kill it when switching. +// docker logs -f -t --details {container_name} + +async fn redirect_docker_container_log( + container_name: &str, + log_file: File, +) -> Result> { + let docker_cmd = format!("logs -f -t --details {}", container_name); + let args: Vec<&str> = docker_cmd.split(' ').into_iter().map(|arg| arg).collect(); + let instance = Command::new("docker") + .args(args) + .stdin(Stdio::piped()) + .stdout(Stdio::from(log_file)) + .spawn()?; + Ok(instance) +} + #[derive(Debug, PartialEq, Eq)] enum StartType { + /// Synchronous Block SYNC, + /// Running consensus RUN, } #[derive(Debug, PartialEq, Eq)] @@ -256,22 +357,69 @@ enum RunStatus { Downloaded, Running, } + +/// Client information of sequencer operation. +/// Use instance1 and instance2 to save old clients and new clients alternately. #[derive(Debug)] struct RunningApp { + /// Group id,start from 0. group_id: u32, + /// App id,start from 1. app_id: u32, + /// App sha256 + app_hash: H256, + /// Running status. running: RunStatus, + /// App info. app_info: Option, + /// Process instance 1. instance1: Option, + /// Process instance 2. instance2: Option, + /// Instance 1 is docker container or not. instance1_docker: bool, + /// Instance 2 is docker container or not. instance2_docker: bool, + /// Instance 1 docker container name. instance1_docker_name: Option>, + /// Instance 2 docker container name. instance2_docker_name: Option>, + /// Instance 1 docker container instance of linux process. + instance1_docker_log: Option, + /// Instance 2 docker container instance of linux process. + instance2_docker_log: Option, + /// Current instance,1 or 2. cur_ins: InstanceIndex, } -async fn process_download_task( +/// Client information of processor operation. +#[derive(Debug)] +struct ProcessorInstance { + /// App id,start from 1. + app_id: u32, + /// App sha256 + app_hash: H256, + /// Running status. + running: RunStatus, + /// App info. + processor_info: Option, + /// Instance of process. + instance: Option, + /// Instance is docker container or not. + instance_docker: bool, + /// Instance docker container name. + instance_docker_name: Option>, + /// Instance docker container instance of linux process. + instance_docker_log: Option, +} +#[derive(Debug)] +struct RunningProcessor { + /// app_id:processor info + processors: HashMap, +} + +// Download the sequencer client from the network and start syncing block data. +async fn app_download_task( data_path: PathBuf, app_info: DownloadInfo, running_app: Arc>, @@ -286,14 +434,19 @@ async fn process_download_task( log::info!("===========Download app from docker hub and run the application as a container========="); if let Some(image) = app_info.clone().docker_image { let docker_image = std::str::from_utf8(&image)?; - let mut exist_docker_image = check_docker_image_exist(docker_image).await?; - if !exist_docker_image { - download_docker_image(docker_image).await?; - exist_docker_image = check_docker_image_exist(docker_image).await?; - } - if exist_docker_image { - //Start docker container - start_flag = true; + let need_pull = need_pull_docker_image(docker_image, app_info.app_hash).await; + if let Ok(need_pull) = need_pull { + if need_pull { + let result = download_docker_image(docker_image, app_info.app_hash).await; + if result.is_ok() { + start_flag = true; + } else { + log::info!("pull docker image error:{:?}", result); + } + } else { + //Start docker container + start_flag = true; + } } } } else { @@ -313,7 +466,9 @@ async fn process_download_task( if let Ok(need_down) = need_download { if need_down { - let result = download_sdk(data_path.clone(), app_info.clone(), url).await; + let file_name = std::str::from_utf8(&app_info.file_name)?; + let result = + download_sdk(data_path.clone(), file_name, app_info.app_hash, url).await; if result.is_ok() { start_flag = true; @@ -327,7 +482,7 @@ async fn process_download_task( } if start_flag { log::info!("===============start app for sync================="); - let result = process_run_task( + let result = app_run_task( data_path, app_info.clone(), sync_args, @@ -348,7 +503,207 @@ async fn process_download_task( Ok(()) } -async fn process_run_task( +// Download and start the processor client. +async fn processor_run_task( + data_path: PathBuf, + processor_info: ProcessorDownloadInfo, + run_args: Option>, + option_args: Option>, + running_processor: Arc>, +) -> Result<(), Box> { + let run_as_docker = processor_info.is_docker_image; + + let extension_args_clone; + + let extension_args: Option> = if let Some(run_args) = run_args { + extension_args_clone = run_args.clone(); + + Some( + std::str::from_utf8(&extension_args_clone)? + .split(' ') + .into_iter() + .map(|arg| std::str::from_utf8(arg.as_bytes()).unwrap()) + .collect(), + ) + } else { + None + }; + + let app_args_clone; + + let mut args: Vec<&str> = if let Some(app_args) = processor_info.args { + app_args_clone = app_args.clone(); + + std::str::from_utf8(&app_args_clone)? + .split(' ') + .into_iter() + .map(|arg| std::str::from_utf8(arg.as_bytes()).unwrap()) + .collect() + } else { + Vec::new() + }; + + if let Some(extension_args) = extension_args { + args.extend(extension_args); + } + + let log_file_name; + + let log_file_buf; + + if processor_info.log.is_none() { + log_file_name = std::str::from_utf8(&processor_info.file_name)?; + } else { + log_file_buf = processor_info.log.unwrap(); + + log_file_name = std::str::from_utf8(&log_file_buf)?; + } + + log::info!("log_file_name:{:?}", log_file_name); + log::info!("args:{:?}", args); + let outputs = File::create(log_file_name)?; + + let errors = outputs.try_clone()?; + + // start new instance + let mut instance = None; + let mut docker_log_instance = None; + if run_as_docker { + let image_name = processor_info.docker_image.ok_or("docker image not exist")?; + let docker_image = std::str::from_utf8(&image_name)?; + let start_result = start_docker_container( + std::str::from_utf8(&processor_info.file_name)?, + docker_image, + args, + option_args, + outputs.try_clone()?, + ) + .await; + log::info!("start processor docker container :{:?}", start_result); + docker_log_instance = Some( + redirect_docker_container_log(std::str::from_utf8(&processor_info.file_name)?, outputs) + .await?, + ); + } else { + let download_path = format!( + "{}/sdk/{}", + data_path.as_os_str().to_str().unwrap(), + std::str::from_utf8(&processor_info.file_name)? + ); + instance = Some( + Command::new(download_path) + .stdin(Stdio::piped()) + .stderr(Stdio::from(outputs)) + .stdout(Stdio::from(errors)) + .args(args) + .spawn() + .expect("failed to execute process"), + ); + } + let mut running_processors = running_processor.lock().await; + let processor_instances = &mut running_processors.processors; + processor_instances.entry(processor_info.app_id).and_modify(|app| { + app.instance = instance; + app.instance_docker_log = docker_log_instance; + if run_as_docker { + app.instance_docker = true; + app.instance_docker_name = Some(processor_info.file_name); + } else { + app.instance_docker = false; + app.instance_docker_name = None; + } + }); + + log::info!("app:{:?}", running_processors); + Ok(()) +} + +// Background task of processor. +async fn processor_task( + data_path: PathBuf, + processor_info: ProcessorDownloadInfo, + running_processor: Arc>, + run_args: Option>, + option_args: Option>, +) -> Result<(), Box> { + let run_as_docker = processor_info.is_docker_image; + let mut start_flag = false; + + if run_as_docker { + log::info!("===========Download app from docker hub and run the application as a container========="); + if let Some(image) = processor_info.clone().docker_image { + let docker_image = std::str::from_utf8(&image)?; + let need_pull = need_pull_docker_image(docker_image, processor_info.app_hash).await; + if let Ok(need_pull) = need_pull { + if need_pull { + let result = download_docker_image(docker_image, processor_info.app_hash).await; + if result.is_ok() { + start_flag = true; + } else { + log::info!("pull docker image error:{:?}", result); + } + } else { + //Start docker container + start_flag = true; + } + } + } + } else { + log::info!( + "===========Download app from the web and run the application as a process=========" + ); + let url = std::str::from_utf8(&processor_info.url)?; + + let download_path = format!( + "{}/sdk/{}", + data_path.as_os_str().to_str().ok_or("invalid data_path")?, + std::str::from_utf8(&processor_info.file_name)? + ); + + let need_download = need_download(&download_path, processor_info.app_hash).await; + log::info!("need_download:{:?}", need_download); + + if let Ok(need_down) = need_download { + if need_down { + let file_name = std::str::from_utf8(&processor_info.file_name)?; + let result = + download_sdk(data_path.clone(), file_name, processor_info.app_hash, url).await; + + if result.is_ok() { + start_flag = true; + } else { + log::info!("download processor client error:{:?}", result); + } + } else { + start_flag = true; + } + } + } + let status = if start_flag { + log::info!("===============run processor================="); + let result = processor_run_task( + data_path, + processor_info.clone(), + run_args, + option_args, + running_processor.clone(), + ) + .await; + log::info!("start processor result:{:?}", result); + RunStatus::Running + } else { + RunStatus::Pending + }; + let mut running_processors = running_processor.lock().await; + let processor_instances = &mut running_processors.processors; + processor_instances + .entry(processor_info.app_id) + .and_modify(|instance| instance.running = status); + Ok(()) +} + +// Background task of sequncer. +async fn app_run_task( data_path: PathBuf, app_info: DownloadInfo, run_args: Option>, @@ -412,16 +767,32 @@ async fn process_run_task( let mut app = running_app.lock().await; - let (old_instance, instance_docker, op_docker_name) = if app.cur_ins == InstanceIndex::Instance1 - { - let is_docker_instance = app.instance1_docker; - let top_docker_name = app.instance1_docker_name.clone(); - (&mut app.instance1, is_docker_instance, top_docker_name) - } else { - let is_docker_instance = app.instance2_docker; - let top_docker_name = app.instance2_docker_name.clone(); - (&mut app.instance2, is_docker_instance, top_docker_name) - }; + let (old_instance, instance_docker, op_docker_name, cur_ins) = + if app.cur_ins == InstanceIndex::Instance1 { + let is_docker_instance = app.instance1_docker; + let top_docker_name = app.instance1_docker_name.clone(); + if is_docker_instance && top_docker_name.is_some() { + //kill old docker log instance + if let Some(ref mut log_instance) = app.instance1_docker_log { + log_instance.kill()?; + let kill_result = log_instance.wait()?; + log::info!("kill old docker log instance:{:?}:{:?}", 1, kill_result); + } + } + (&mut app.instance1, is_docker_instance, top_docker_name, 1) + } else { + let is_docker_instance = app.instance2_docker; + let top_docker_name = app.instance2_docker_name.clone(); + if is_docker_instance && top_docker_name.is_some() { + //kill old docker log instance + if let Some(ref mut log_instance) = app.instance2_docker_log { + log_instance.kill()?; + let kill_result = log_instance.wait()?; + log::info!("kill old docker log instance:{:?}:{:?}", 2, kill_result); + } + } + (&mut app.instance2, is_docker_instance, top_docker_name, 2) + }; // stop old instance if instance_docker { //reomve docker container @@ -433,11 +804,12 @@ async fn process_run_task( if let Some(ref mut old_instance) = old_instance { old_instance.kill()?; let kill_result = old_instance.wait()?; - log::info!("kill old instance:{:?}", kill_result); + log::info!("kill old instance:{:?}:{:?}", cur_ins, kill_result); } } // start new instance let mut instance: Option = None; + let mut docker_log_instance: Option = None; if run_as_docker { let image_name = app_info.docker_image.ok_or("docker image not exist")?; let docker_image = std::str::from_utf8(&image_name)?; @@ -450,6 +822,11 @@ async fn process_run_task( ) .await; log::info!("start docker container :{:?}", start_result); + // redirect docker log to file + docker_log_instance = Some( + redirect_docker_container_log(std::str::from_utf8(&app_info.file_name)?, outputs) + .await?, + ); } else { let download_path = format!( "{}/sdk/{}", @@ -471,12 +848,19 @@ async fn process_run_task( if app.instance2_docker { let docker_name_op = app.instance2_docker_name.clone(); if let Some(docker_name) = docker_name_op { + //kill old docker log instance first + if let Some(ref mut log_instance) = app.instance2_docker_log { + log_instance.kill()?; + let kill_result = log_instance.wait()?; + log::info!("kill old docker log instance2:{:?}", kill_result); + } let kill_result = remove_docker_container(std::str::from_utf8(&docker_name)?).await; log::info!("kill docker instance2:{:?}", kill_result); } app.instance2_docker_name = None; app.instance2_docker = false; + app.instance2_docker_log = None; } else { let other_instance = &mut app.instance2; if let Some(ref mut other_instance) = other_instance { @@ -484,20 +868,27 @@ async fn process_run_task( let kill_result = other_instance.wait()?; log::info!("kill instance2:{:?}", kill_result); } - app.instance1 = instance; app.instance2 = None; } + app.instance1 = instance; app.cur_ins = InstanceIndex::Instance2; } else { if app.instance1_docker { let docker_name_op = app.instance1_docker_name.clone(); if let Some(docker_name) = docker_name_op { + //kill old docker log instance first + if let Some(ref mut log_instance) = app.instance1_docker_log { + log_instance.kill()?; + let kill_result = log_instance.wait()?; + log::info!("kill old docker log instance1:{:?}", kill_result); + } let kill_result = remove_docker_container(std::str::from_utf8(&docker_name)?).await; log::info!("kill docker instance1:{:?}", kill_result); } app.instance1_docker_name = None; app.instance1_docker = false; + app.instance1_docker_log = None; } else { let other_instance = &mut app.instance1; if let Some(ref mut other_instance) = other_instance { @@ -505,9 +896,9 @@ async fn process_run_task( let kill_result = other_instance.wait()?; log::info!("kill instance1:{:?}", kill_result); } - app.instance2 = instance; app.instance1 = None; } + app.instance2 = instance; app.cur_ins = InstanceIndex::Instance1; } } else { @@ -516,6 +907,7 @@ async fn process_run_task( if run_as_docker { app.instance1_docker = true; app.instance1_docker_name = Some(app_info.file_name); + app.instance1_docker_log = docker_log_instance; } else { app.instance1_docker = false; app.instance1_docker_name = None; @@ -525,6 +917,7 @@ async fn process_run_task( if run_as_docker { app.instance2_docker = true; app.instance2_docker_name = Some(app_info.file_name); + app.instance2_docker_log = docker_log_instance; } else { app.instance2_docker = false; app.instance2_docker_name = None; @@ -535,17 +928,85 @@ async fn process_run_task( Ok(()) } +// Get storage of offchain. +async fn get_offchain_storage( + offchain_storage: Option, + args: &[u8], +) -> Option> +where + Block: BlockT, + TBackend: 'static + sc_client_api::backend::Backend + Send, +{ + if let Some(storage) = offchain_storage { + storage.get(&STORAGE_PREFIX, args) + } else { + None + } +} + +// Kill instance of processor. +async fn close_processor_instance( + instance: &mut ProcessorInstance, +) -> Result<(), Box> { + if instance.running == RunStatus::Running { + if instance.instance_docker { + //reomve docker container + if let Some(docker_name) = &instance.instance_docker_name { + //kill processor docker log instance first + if let Some(ref mut log_instance) = &mut instance.instance_docker_log { + log_instance.kill()?; + let kill_result = log_instance.wait()?; + log::info!("kill processor old docker log instance:{:?}", kill_result); + } + let kill_result = remove_docker_container(std::str::from_utf8(&docker_name)?).await; + log::info!("kill old docker instance of processor:{:?}", kill_result); + } + } else { + if let Some(ref mut old_instance) = &mut instance.instance { + old_instance.kill()?; + let kill_result = old_instance.wait()?; + log::info!("kill old instance of processor:{:?}", kill_result); + } + } + } + Ok(()) +} + +// Kill processor of not assigned group. +async fn filter_processor_instance( + processors: &mut HashMap, + processor_infos: &Vec, +) -> Vec { + let mut remove_entrys = Vec::new(); + { + for processor in &mut *processors { + let mut find_flag = false; + let app_id = processor.0; + for processor_info in processor_infos { + if *app_id == processor_info.app_id { + find_flag = true; + break; + } + } + if !find_flag { + let _ = close_processor_instance(processor.1).await; + remove_entrys.push(*app_id); + } + } + } + remove_entrys +} + +// Background task of sequencer and processor. async fn handle_new_best_parachain_head( validation_data: PersistedValidationData, - height: RelayBlockNumber, parachain: &P, keystore: KeystorePtr, - relay_chain: impl RelayChainInterface + Clone, - p_hash: H256, - para_id: ParaId, data_path: PathBuf, running_app: Arc>, + running_processor: Arc>, backend: Arc, + ip_address: &str, ) -> Result<(), Box> where Block: BlockT, @@ -556,17 +1017,6 @@ where { let offchain_storage = backend.offchain_storage(); - let (mut run_args, mut sync_args, mut option_args) = - if let Some(storage) = offchain_storage.clone() { - let prefix = &STORAGE_PREFIX; - ( - storage.get(prefix, RUN_ARGS_KEY), - storage.get(prefix, SYNC_ARGS_KEY), - storage.get(prefix, OPTION_ARGS_KEY), - ) - } else { - (None, None, None) - }; // Check if there is a download task let head = validation_data.clone().parent_head.0; @@ -579,10 +1029,69 @@ where let xx = keystore.sr25519_public_keys(sp_application_crypto::key_types::AURA)[0]; + // Processor client process + + log::info!("ip_address:{:?}", ip_address); + + let processor_infos: Vec = + parachain + .runtime_api() + .processor_run(hash, xx.into(), ip_address.as_bytes().to_vec())?; + + log::info!("processor download info:{:?}", processor_infos); + + { + let mut running_processors = running_processor.lock().await; + let processors = &mut running_processors.processors; + // Close the old client before starting the new client + let remove_entrys = filter_processor_instance(processors, &processor_infos).await; + for remove_entry in remove_entrys { + processors.remove_entry(&remove_entry); + } + for processor_info in processor_infos { + let app_id = processor_info.app_id; + let app_hash = processor_info.app_hash; + let processor = processors.entry(app_id).or_insert(ProcessorInstance { + app_id, + app_hash, + running: RunStatus::Pending, + processor_info: Some(processor_info.clone()), + instance: None, + instance_docker: false, + instance_docker_name: None, + instance_docker_log: None, + }); + let run_status = &processor.running; + + if *run_status == RunStatus::Pending { + processor.running = RunStatus::Downloading; + let app_id = processor.app_id; + let p_run_args_key = format!("{}:{}", P_RUN_ARGS_KEY, app_id); + let p_option_args_key = format!("{}:{}", P_OPTION_ARGS_KEY, app_id); + let run_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + p_run_args_key.as_bytes(), + ) + .await; + let option_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + p_option_args_key.as_bytes(), + ) + .await; + tokio::spawn(processor_task( + data_path.clone(), + processor_info, + running_processor.clone(), + run_args, + option_args, + )); + } + } + } + //Layer2 client process let should_load: Option = parachain.runtime_api().shuld_load(hash, xx.into())?; log::info!("app download info of sequencer's group:{:?}", should_load); - let number = (*parachain_head.number()).into(); { let mut app = running_app.lock().await; @@ -592,40 +1101,34 @@ where Some(app_info) => { let new_group = app_info.group; let app_id = app_info.app_id; + let app_hash = app_info.app_hash; let run_status = &app.running; if old_group_id != new_group && *run_status == RunStatus::Pending { - if sync_args == None { - sync_args = if let Some(storage) = offchain_storage.clone() { - let prefix = &STORAGE_PREFIX; - let sync_args = format!( - "{}:{}", - std::str::from_utf8(SYNC_ARGS_KEY).unwrap(), - app_id - ); - storage.get(prefix, sync_args.as_bytes()) - } else { - None - }; - } + let sync_args_key = format!("{}:{}", SYNC_ARGS_KEY, app_id); + + let option_args_key = format!("{}:{}", OPTION_ARGS_KEY, app_id); + + let sync_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + sync_args_key.as_bytes(), + ) + .await; + + let option_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + option_args_key.as_bytes(), + ) + .await; + log::info!("offchain_storage of sync_args:{:?}", sync_args); - if option_args == None { - option_args = if let Some(storage) = offchain_storage.clone() { - let prefix = &STORAGE_PREFIX; - let option_args = format!( - "{}:{}", - std::str::from_utf8(OPTION_ARGS_KEY).unwrap(), - app_id - ); - - storage.get(prefix, option_args.as_bytes()) - } else { - None - }; - } + log::info!("offchain_storage of option_args:{:?}", option_args); + app.running = RunStatus::Downloading; + app.app_id = app_id; - tokio::spawn(process_download_task( + app.app_hash = app_hash; + tokio::spawn(app_download_task( data_path.clone(), app_info, running_app.clone(), @@ -644,33 +1147,27 @@ where let mut app = running_app.lock().await; let run_status = &app.running; let app_id = app.app_id; + log::info!("run:{:?}", app); if let Some(app_info) = app.app_info.clone() { if *run_status == RunStatus::Downloaded { - log::info!("run:{:?}", app); - if run_args == None { - run_args = if let Some(storage) = offchain_storage.clone() { - let prefix = &STORAGE_PREFIX; - let run_args = - format!("{}:{}", std::str::from_utf8(RUN_ARGS_KEY).unwrap(), app_id); - - storage.get(prefix, run_args.as_bytes()) - } else { - None - }; - } - if option_args == None { - option_args = if let Some(storage) = offchain_storage { - let prefix = &STORAGE_PREFIX; - let option_args = - format!("{}:{}", std::str::from_utf8(OPTION_ARGS_KEY).unwrap(), app_id); - - storage.get(prefix, option_args.as_bytes()) - } else { - None - }; - } + let run_args_key = format!("{}:{}", RUN_ARGS_KEY, app_id); + + let option_args_key = format!("{}:{}", OPTION_ARGS_KEY, app_id); + + let run_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + run_args_key.as_bytes(), + ) + .await; + + let option_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + option_args_key.as_bytes(), + ) + .await; + log::info!("offchain_storage of run_args:{:?}", run_args); log::info!("offchain_storage of option_args:{:?}", option_args); - tokio::spawn(process_run_task( + tokio::spawn(app_run_task( data_path, app_info, run_args, @@ -722,6 +1219,18 @@ async fn relay_chain_notification( ::Number: Into, TBackend: 'static + sc_client_api::backend::Backend + Send, { + // let mut stop = false; + // loop{ + // if !stop{ + // let download_info = DownloadInfo{ + // file_name:"magport-node-b".into(), + // app_hash:H256::from_str("9b64d63367328fd980b6e88af0dc46c437bf2c3906a9b000eccd66a6e4599938"). + // unwrap(), ..Default::default() + // }; + // download_sdk(data_path.clone(), download_info, "http://43.134.60.202:88/static").await; + // stop = true; + // } + // } let new_best_heads = match new_best_heads(relay_chain.clone(), para_id).await { Ok(best_heads_stream) => best_heads_stream.fuse(), Err(_err) => { @@ -732,6 +1241,7 @@ async fn relay_chain_notification( let runing_app = Arc::new(Mutex::new(RunningApp { group_id: 0xFFFFFFFF, app_id: 0xFFFFFFFF, + app_hash: Default::default(), running: RunStatus::Pending, app_info: None, instance1: None, @@ -740,14 +1250,18 @@ async fn relay_chain_notification( instance2_docker: false, instance1_docker_name: None, instance2_docker_name: None, + instance1_docker_log: None, + instance2_docker_log: None, cur_ins: InstanceIndex::Instance1, })); + let runing_processor = Arc::new(Mutex::new(RunningProcessor { processors: HashMap::new() })); + let ip_address = public_ip::addr().await.expect("couldn't get an IP address").to_string(); loop { select! { h = new_best_heads.next() => { match h { - Some((height, head, hash)) => { - let _ = handle_new_best_parachain_head(head,height, &*parachain,keystore.clone(), relay_chain.clone(), hash, para_id, data_path.clone(), runing_app.clone(), backend.clone()).await; + Some((_height, head, _hash)) => { + let _ = handle_new_best_parachain_head(head, &*parachain,keystore.clone(), data_path.clone(), runing_app.clone(),runing_processor.clone(), backend.clone(), &ip_address).await; }, None => { return; diff --git a/pallets/container/Cargo.toml b/pallets/container/Cargo.toml index 8c5ee03..0a3f1d4 100644 --- a/pallets/container/Cargo.toml +++ b/pallets/container/Cargo.toml @@ -29,7 +29,7 @@ cumulus-primitives-core={ workspace = true } pallet-aura ={ workspace = true } sp-consensus-aura={ workspace = true } sp-core = { workspace = true } - +derivative = { version = "2.2.0", default-features = false, features = ["use_core"] } [dev-dependencies] serde = { workspace = true } diff --git a/pallets/container/src/benchmarking.rs b/pallets/container/src/benchmarking.rs index 16fafbb..b3bd257 100644 --- a/pallets/container/src/benchmarking.rs +++ b/pallets/container/src/benchmarking.rs @@ -25,16 +25,32 @@ benchmarks! { let file_size = 123; let args = Some(BoundedVec::try_from("--chain dev".as_bytes().to_vec()).unwrap()); let log = Some(BoundedVec::try_from("aaaa".as_bytes().to_vec()).unwrap()); + let consensus_client = AppClient{ + app_hash: hash, + file_name:file_name.clone(), + size: file_size, + args:args.clone(), + log:log.clone(), + is_docker_image: None, + docker_image: None, + }; + let batch_client = AppClient{ + app_hash: hash, + file_name, + size: file_size, + args, + log, + is_docker_image: None, + docker_image: None, + }; let caller: T::AccountId = whitelisted_caller(); - }: _(RawOrigin::Signed(caller), hash, + }: _(RawOrigin::Signed(caller), project_name, - file_name, - file_size, - args, - log) + Box::new(consensus_client), + Box::new(batch_client)) verify { let app = APPInfoMap::::get(1).unwrap(); - assert_eq!(app.app_hash, H256::from([1; 32])); + assert_eq!(app.consensus_client.app_hash, H256::from([1; 32])); } } diff --git a/pallets/container/src/lib.rs b/pallets/container/src/lib.rs index d983f11..3534b53 100644 --- a/pallets/container/src/lib.rs +++ b/pallets/container/src/lib.rs @@ -1,3 +1,18 @@ +//! # Container Pallet +//! +//! This pallet is named container, hoping to be a container for various clients. +//! The function it expects to achieve is to register various layer2 clients and start these clients +//! when appropriate conditions are met. +//! +//! The roles that complete this work are called sequencer and processor. +//! The sequencer is responsible for starting the consensus client, +//! and the processor is responsible for starting the batcher client. +//! +//! In order to achieve the goal of being compatible with all layer2, the layer2 client is +//! abstracted into a consensus client and a batcher client. The client can be started and run as a +//! process or a docker container. The operation of the consensus client is based on the time +//! sequence, and there are two steps: synchronization of blocks and consensus. + #![cfg_attr(not(feature = "std"), no_std)] pub use pallet::*; @@ -14,29 +29,59 @@ mod benchmarking; pub mod weights; use codec::{Decode, Encode, MaxEncodedLen}; use cumulus_primitives_core::relay_chain::Hash; +use derivative::Derivative; use frame_support::pallet_prelude::*; use frame_system::pallet_prelude::*; use pallet_sequencer_grouping::SequencerGroup; -use primitives_container::DownloadInfo; +use primitives_container::{DownloadInfo, ProcessorDownloadInfo}; use scale_info::{prelude::vec::Vec, TypeInfo}; use sp_runtime::BoundedVec; -use sp_std::vec; +use sp_std::{boxed::Box, vec}; pub use weights::*; +/// Client basic information structure, including consensus client and batcher client. +#[derive(Derivative, Encode, Decode, TypeInfo, MaxEncodedLen)] +#[derivative( + Clone(bound = ""), + Eq(bound = ""), + PartialEq(bound = ""), + Debug(bound = ""), + Default(bound = "") +)] +#[codec(encode_bound())] +#[codec(decode_bound())] +#[scale_info(bounds(), skip_type_params(T))] +pub struct AppClient { + /// Client hash(sha256), if client is run as docker container, this is digest. + pub app_hash: Hash, + /// Client file name. + pub file_name: BoundedVec, + /// Client file size, bytes. + pub size: u32, + /// Client startup common parameters. + pub args: Option>, + /// Client operation log file. + pub log: Option>, + /// Is started as a Docker container. + pub is_docker_image: Option, + /// Docker image name + pub docker_image: Option>, +} + +/// Registered application information structure. #[derive(Encode, Decode, Default, Clone, TypeInfo, MaxEncodedLen, Debug)] #[scale_info(skip_type_params(T))] pub struct APPInfo { - app_hash: Hash, + /// Account of register an application. creator: T::AccountId, + /// Project name,uniquely identifies. project_name: BoundedVec, - file_name: BoundedVec, - uploaded: bool, - size: u32, - args: Option>, - log: Option>, - is_docker_image: Option, - docker_image: Option>, + /// Consensus client. + consensus_client: AppClient, + /// Batcher client. + batch_client: AppClient, } + #[frame_support::pallet] pub mod pallet { use super::*; @@ -50,35 +95,40 @@ pub mod pallet { type RuntimeEvent: From> + IsType<::RuntimeEvent>; /// Type representing the weight of this pallet type WeightInfo: WeightInfo; - + /// Max length of file name #[pallet::constant] type MaxLengthFileName: Get; - + /// Max number of registered app. #[pallet::constant] type MaxRuningAPP: Get; - + /// Max length of url,for download client binary file. #[pallet::constant] type MaxUrlLength: Get; - + /// Max count of arguments. #[pallet::constant] type MaxArgCount: Get; - + /// Max length of arguments. #[pallet::constant] type MaxArgLength: Get; } + /// By default, the application number starts from 1. #[pallet::type_value] pub fn ApplicationIDOnEmpty() -> u32 { 1 } + + /// The next available application id. #[pallet::storage] #[pallet::getter(fn next_application_id)] pub type NextApplicationID = StorageValue<_, u32, ValueQuery, ApplicationIDOnEmpty>; + /// Url storage. #[pallet::storage] #[pallet::getter(fn default_url)] pub type DefaultUrl = StorageValue<_, BoundedVec, OptionQuery>; + /// Registered application information, map of app_id:app_info. #[pallet::storage] #[pallet::getter(fn appinfo_map)] pub type APPInfoMap = StorageMap<_, Twox64Concat, u32, APPInfo, OptionQuery>; @@ -97,11 +147,24 @@ pub mod pallet { #[pallet::generate_deposit(pub(super) fn deposit_event)] pub enum Event { ReisterApp { + /// Account of register client. + creator: T::AccountId, + /// Assigned app id. appid: u32, + /// Project name. project_name: BoundedVec, - file_name: BoundedVec, - hash: Hash, - size: u32, + /// File name of consensus client. + consensus_client: BoundedVec, + /// Hash of consensus client. + consensus_hash: Hash, + /// File size of consensus client. + consensus_size: u32, + /// File name of batcher client. + batch_client: BoundedVec, + /// Hash of batcher client. + batch_hash: Hash, + /// File size of batcher client. + batch_size: u32, }, SetDownloadURL { url: BoundedVec, @@ -111,19 +174,27 @@ pub mod pallet { #[pallet::error] pub enum Error { AppNotExist, + AccountInconsistent, } #[pallet::hooks] impl Hooks> for Pallet { - fn on_finalize(_: BlockNumberFor) { + /// The logic executed by each parachain block queries how many groups there are and binds + /// the registered applications to the groups. One application is bound to one group. + fn on_initialize(_n: BlockNumberFor) -> Weight + where + BlockNumberFor: From, + { let groups = Self::get_groups(); log::info!("groups:{:?}", groups); let mut inuse_apps = InuseMap::::get(); log::info!("inuse_apps:{:?}", inuse_apps); - + let mut read_count = 2; + let mut write_count = 0; for group in groups.iter() { let app = GroupAPPMap::::get(group); + read_count += 1; match app { Some(_app_id) => { // TODO:alloced app to group,do nothing?? @@ -142,7 +213,7 @@ pub mod pallet { InuseMap::::mutate(|inuses| inuses[index] = true); GroupAPPMap::::insert(group, (index + 1) as u32); - + write_count += 2; break; } index += 1; @@ -154,41 +225,52 @@ pub mod pallet { } } log::info!("inuse_apps:{:?}", inuse_apps); + T::DbWeight::get().reads_writes(read_count, write_count) } } #[pallet::call] impl Pallet { + /// Register layer2 application client. + /// + /// Parameters: + /// - `project_name`: The project name. + /// - `consensus_client`: Consensus client. + /// - `batch_client`: Batcher client. #[pallet::call_index(0)] #[pallet::weight(::WeightInfo::register_app())] pub fn register_app( origin: OriginFor, - app_hash: Hash, project_name: BoundedVec, - file_name: BoundedVec, - size: u32, - args: Option>, - log: Option>, - is_docker_image: Option, - docker_image: Option>, + consensus_client: Box>, + batch_client: Box>, ) -> DispatchResult { let who = ensure_signed(origin)?; let old_application_id = NextApplicationID::::get(); + let consensus_app = *consensus_client; + + let batch_app = *batch_client; + + // we can allow same app when register app. + // for app_id in 1..old_application_id { + // let p_app_info = APPInfoMap::::get(app_id); + // if let Some(app_info) = p_app_info { + // assert!( + // (app_info.consensus_client.app_hash != consensus_app.app_hash) && + // (app_info.batch_client.app_hash != batch_app.app_hash), + // "Client with the same hash exist!", + // ); + // } + // } APPInfoMap::::insert( old_application_id, APPInfo { - app_hash, - creator: who, + creator: who.clone(), project_name: project_name.clone(), - file_name: file_name.clone(), - uploaded: false, - size, - args, - log, - is_docker_image, - docker_image, + consensus_client: consensus_app.clone(), + batch_client: batch_app.clone(), }, ); @@ -200,16 +282,24 @@ pub mod pallet { InuseMap::::put(inuse_apps); Pallet::::deposit_event(Event::::ReisterApp { + creator: who, appid: old_application_id, project_name, - file_name, - hash: app_hash, - size, + consensus_client: consensus_app.file_name, + consensus_hash: consensus_app.app_hash, + consensus_size: consensus_app.size, + batch_client: batch_app.file_name, + batch_hash: batch_app.app_hash, + batch_size: batch_app.size, }); Ok(()) } + /// Set url for download client binary file. + /// + /// Parameters: + /// - `url`: Url. #[pallet::call_index(1)] #[pallet::weight(::WeightInfo::set_default_url())] pub fn set_default_url( @@ -230,7 +320,7 @@ impl Pallet { // Obtain application information corresponding to the group. // If no group has been assigned or there are no available apps in the group, return None pub fn shuld_load(author: T::AccountId) -> Option { - log::info!("============author:{:?}", author.encode()); + // log::info!("============author:{:?}", author.encode()); //Get the group ID of the sequencer, error when got 0xFFFFFFFF let group_id = Self::get_group_id(author); @@ -240,22 +330,24 @@ impl Pallet { let url = DefaultUrl::::get()?; - let args = app_info.args.and_then(|log| Some(log.as_slice().to_vec())); + let consensus_client = app_info.consensus_client; + + let args = consensus_client.args.and_then(|log| Some(log.as_slice().to_vec())); - let log = app_info.log.and_then(|log| Some(log.as_slice().to_vec())); + let log = consensus_client.log.and_then(|log| Some(log.as_slice().to_vec())); let is_docker_image = - if let Some(is_docker) = app_info.is_docker_image { is_docker } else { false }; + if let Some(is_docker) = consensus_client.is_docker_image { is_docker } else { false }; - let docker_image = app_info + let docker_image = consensus_client .docker_image .and_then(|docker_image| Some(docker_image.as_slice().to_vec())); Some(DownloadInfo { app_id, - app_hash: app_info.app_hash, - file_name: app_info.file_name.into(), - size: app_info.size, + app_hash: consensus_client.app_hash, + file_name: consensus_client.file_name.into(), + size: consensus_client.size, group: group_id, url: url.into(), args, @@ -265,6 +357,7 @@ impl Pallet { }) } + // Consensus client startup at which block number. pub fn should_run() -> bool { let next_round = >::next_round(); @@ -277,17 +370,75 @@ impl Pallet { } } + // Get sequencer group id. pub fn get_group_id(author: T::AccountId) -> u32 { let group_id_result = >::account_in_group(author); - log::info!("new groupID:{:?}", group_id_result); - - if group_id_result.is_ok() { - group_id_result.unwrap() + if let Ok(group_id) = group_id_result { + log::info!("new groupID:{:?}", group_id); + group_id } else { 0xFFFFFFFF } } + + // Get the assigned group id. pub fn get_groups() -> Vec { >::all_group_ids() } + + // Whether the account running the current node has been assigned a group, whether it is a + // processor, and whether the IP meets the requirements. + pub fn processor_run(author: T::AccountId, ip_address: Vec) -> Vec { + // let processors = >::get_group_ids(author); + let mut download_infos: Vec = Vec::new(); + if Self::get_groups().len() == 0 { + return download_infos; + } + let url = DefaultUrl::::get().expect("Need set url"); + + let processor_info = >::processor_info(); + let processors = if let Some((_, _, group_ids)) = processor_info + .iter() + .find(|(acc, ip, _)| (*acc == author) && (ip.to_vec() == ip_address)) + { + group_ids.clone().into_inner() + } else { + Vec::new() + }; + for group_id in processors { + if let Some(app_id) = GroupAPPMap::::get(group_id) { + let p_app_info = APPInfoMap::::get(app_id); + + if let Some(app_info) = p_app_info { + let batch_client = app_info.batch_client; + + let args = batch_client.args.and_then(|log| Some(log.as_slice().to_vec())); + + let log = batch_client.log.and_then(|log| Some(log.as_slice().to_vec())); + + let is_docker_image = if let Some(is_docker) = batch_client.is_docker_image { + is_docker + } else { + false + }; + + let docker_image = batch_client + .docker_image + .and_then(|docker_image| Some(docker_image.as_slice().to_vec())); + download_infos.push(ProcessorDownloadInfo { + app_id, + app_hash: batch_client.app_hash, + file_name: batch_client.file_name.into(), + size: batch_client.size, + url: url.clone().into(), + args, + log, + is_docker_image, + docker_image, + }); + } + } + } + download_infos + } } diff --git a/pallets/container/src/tests.rs b/pallets/container/src/tests.rs index 8db13d8..ea14f0c 100644 --- a/pallets/container/src/tests.rs +++ b/pallets/container/src/tests.rs @@ -1,4 +1,4 @@ -use crate::mock::*; +use crate::{mock::*, AppClient}; use frame_support::assert_ok; use sp_core::H256; use sp_runtime::BoundedVec; @@ -22,18 +22,31 @@ fn it_works_for_default_value() { #[test] fn register_app() { new_test_ext().execute_with(|| { + let consensus_client = AppClient { + app_hash: H256::from([1; 32]), + file_name: BoundedVec::try_from("test".as_bytes().to_vec()).unwrap(), + size: 123, + args: Some(BoundedVec::try_from("--chain dev".as_bytes().to_vec()).unwrap()), + log: None, + is_docker_image: None, + docker_image: None, + }; + let batch_client = AppClient { + app_hash: H256::from([1; 32]), + file_name: BoundedVec::try_from("test".as_bytes().to_vec()).unwrap(), + size: 123, + args: Some(BoundedVec::try_from("--chain dev".as_bytes().to_vec()).unwrap()), + log: None, + is_docker_image: None, + docker_image: None, + }; assert_ok!(ContainerModule::register_app( RuntimeOrigin::signed(1), - H256::from([1; 32]), BoundedVec::try_from("test".as_bytes().to_vec()).unwrap(), - BoundedVec::try_from("test".as_bytes().to_vec()).unwrap(), - 123, - Some(BoundedVec::try_from("--chain dev".as_bytes().to_vec()).unwrap()), - None, - None, - None, + Box::new(consensus_client), + Box::new(batch_client), )); let app = ContainerModule::appinfo_map(1).unwrap(); - assert_eq!(app.app_hash, H256::from([1; 32])); + assert_eq!(app.consensus_client.app_hash, H256::from([1; 32])); }); } diff --git a/pallets/container/src/weights.rs b/pallets/container/src/weights.rs index d392594..90c051e 100644 --- a/pallets/container/src/weights.rs +++ b/pallets/container/src/weights.rs @@ -1,33 +1,35 @@ -//! Autogenerated weights for pallet_template +//! Autogenerated weights for `pallet_container` //! -//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 4.0.0-dev -//! DATE: 2023-04-06, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]` +//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 35.0.1 +//! DATE: 2024-05-23, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]` //! WORST CASE MAP SIZE: `1000000` -//! HOSTNAME: `Alexs-MacBook-Pro-2.local`, CPU: `` -//! EXECUTION: Some(Wasm), WASM-EXECUTION: Compiled, CHAIN: Some("dev"), DB CACHE: 1024 +//! HOSTNAME: ``, CPU: `AMD Ryzen 7 5800U with Radeon Graphics` +//! WASM-EXECUTION: `Compiled`, CHAIN: `Some("popsicle-dev")`, DB CACHE: 1024 // Executed Command: -// ../../target/release/node-template +// ./Popsicle/target/release/popsicle-node // benchmark // pallet // --chain -// dev +// popsicle-dev +// --execution=wasm +// --wasm-execution=compiled // --pallet -// pallet_template +// pallet_container // --extrinsic // * -// --steps=50 -// --repeat=20 -// --wasm-execution=compiled +// --steps +// 50 +// --repeat +// 20 // --output -// pallets/template/src/weights.rs -// --template -// ../../.maintain/frame-weight-template.hbs +// weights.rs #![cfg_attr(rustfmt, rustfmt_skip)] #![allow(unused_parens)] #![allow(unused_imports)] +#![allow(missing_docs)] use frame_support::{traits::Get, weights::{Weight, constants::RocksDbWeight}}; use core::marker::PhantomData; @@ -47,8 +49,8 @@ impl WeightInfo for SubstrateWeight { // Proof Size summary in bytes: // Measured: `0` // Estimated: `0` - // Minimum execution time: 3_767_000 picoseconds. - Weight::from_parts(3_947_000, 0) + // Minimum execution time: 6_329_000 picoseconds. + Weight::from_parts(6_540_000, 0) .saturating_add(Weight::from_parts(0, 0)) .saturating_add(T::DbWeight::get().writes(1)) } @@ -57,13 +59,13 @@ impl WeightInfo for SubstrateWeight { /// Storage: `ContainerPallet::InuseMap` (r:1 w:1) /// Proof: `ContainerPallet::InuseMap` (`max_values`: Some(1), `max_size`: Some(102), added: 597, mode: `MaxEncodedLen`) /// Storage: `ContainerPallet::APPInfoMap` (r:0 w:1) - /// Proof: `ContainerPallet::APPInfoMap` (`max_values`: None, `max_size`: Some(700), added: 3175, mode: `MaxEncodedLen`) + /// Proof: `ContainerPallet::APPInfoMap` (`max_values`: None, `max_size`: Some(2136), added: 4611, mode: `MaxEncodedLen`) fn register_app() -> Weight { // Proof Size summary in bytes: // Measured: `42` // Estimated: `1587` - // Minimum execution time: 14_487_000 picoseconds. - Weight::from_parts(14_969_000, 0) + // Minimum execution time: 18_244_000 picoseconds. + Weight::from_parts(18_766_000, 0) .saturating_add(Weight::from_parts(0, 1587)) .saturating_add(T::DbWeight::get().reads(2)) .saturating_add(T::DbWeight::get().writes(3)) @@ -78,8 +80,8 @@ impl WeightInfo for () { // Proof Size summary in bytes: // Measured: `0` // Estimated: `0` - // Minimum execution time: 3_767_000 picoseconds. - Weight::from_parts(3_947_000, 0) + // Minimum execution time: 6_329_000 picoseconds. + Weight::from_parts(6_540_000, 0) .saturating_add(Weight::from_parts(0, 0)) .saturating_add(RocksDbWeight::get().writes(1)) } @@ -88,13 +90,13 @@ impl WeightInfo for () { /// Storage: `ContainerPallet::InuseMap` (r:1 w:1) /// Proof: `ContainerPallet::InuseMap` (`max_values`: Some(1), `max_size`: Some(102), added: 597, mode: `MaxEncodedLen`) /// Storage: `ContainerPallet::APPInfoMap` (r:0 w:1) - /// Proof: `ContainerPallet::APPInfoMap` (`max_values`: None, `max_size`: Some(700), added: 3175, mode: `MaxEncodedLen`) + /// Proof: `ContainerPallet::APPInfoMap` (`max_values`: None, `max_size`: Some(2136), added: 4611, mode: `MaxEncodedLen`) fn register_app() -> Weight { // Proof Size summary in bytes: // Measured: `42` // Estimated: `1587` - // Minimum execution time: 14_487_000 picoseconds. - Weight::from_parts(14_969_000, 0) + // Minimum execution time: 18_244_000 picoseconds. + Weight::from_parts(18_766_000, 0) .saturating_add(Weight::from_parts(0, 1587)) .saturating_add(RocksDbWeight::get().reads(2)) .saturating_add(RocksDbWeight::get().writes(3)) diff --git a/pallets/sequencer-grouping/src/benchmarking.rs b/pallets/sequencer-grouping/src/benchmarking.rs index 89f1d37..73416ed 100644 --- a/pallets/sequencer-grouping/src/benchmarking.rs +++ b/pallets/sequencer-grouping/src/benchmarking.rs @@ -5,11 +5,9 @@ use super::*; #[allow(unused)] use crate::Pallet as SequencerGrouping; use frame_benchmarking::{account, benchmarks, impl_benchmark_test_suite}; -use frame_support::BoundedVec; -use frame_support::pallet_prelude::Get; +use frame_support::{pallet_prelude::Get, BoundedVec}; use frame_system::RawOrigin; -use sp_std::vec; -use sp_std::vec::Vec; +use sp_std::{vec, vec::Vec}; benchmarks! { set_group_metric { diff --git a/pallets/sequencer-grouping/src/lib.rs b/pallets/sequencer-grouping/src/lib.rs index df42efc..cc4a412 100644 --- a/pallets/sequencer-grouping/src/lib.rs +++ b/pallets/sequencer-grouping/src/lib.rs @@ -97,8 +97,14 @@ pub mod pallet { #[pallet::storage] #[pallet::getter(fn processor_info)] - pub(crate) type ProcessorInfo = StorageValue<_, BoundedVec< - (T::AccountId, BoundedVec, BoundedVec), T::MaxRunningAPP>, ValueQuery>; + pub(crate) type ProcessorInfo = StorageValue< + _, + BoundedVec< + (T::AccountId, BoundedVec, BoundedVec), + T::MaxRunningAPP, + >, + ValueQuery, + >; #[pallet::genesis_config] pub struct GenesisConfig { @@ -187,7 +193,9 @@ pub mod pallet { let who = ensure_signed(origin)?; let mut processor_info = crate::pallet::ProcessorInfo::::get(); - if let Some(existing) = processor_info.iter_mut().find(|(account, _, _)| account == &who) { + if let Some(existing) = + processor_info.iter_mut().find(|(account, _, _)| account == &who) + { if existing.1 == ip_address { return Ok(()); } @@ -261,7 +269,9 @@ pub mod pallet { pub fn get_group_ids(account: T::AccountId) -> Vec { let processor_info = ProcessorInfo::::get(); - if let Some((_, _, group_ids)) = processor_info.iter().find(|(acc, _, _)| *acc == account) { + if let Some((_, _, group_ids)) = + processor_info.iter().find(|(acc, _, _)| *acc == account) + { group_ids.clone().into_inner() } else { Vec::new() diff --git a/pallets/sequencer-grouping/src/tests.rs b/pallets/sequencer-grouping/src/tests.rs index c2a0fe1..ea59c2c 100644 --- a/pallets/sequencer-grouping/src/tests.rs +++ b/pallets/sequencer-grouping/src/tests.rs @@ -4,8 +4,7 @@ use crate::{ Error, Event, GroupMembers, NextRound, SequencerGroup, }; use frame_support::{assert_noop, assert_ok, pallet_prelude::Get}; -use sp_core::bounded::BoundedVec; -use sp_core::bounded_vec; +use sp_core::{bounded::BoundedVec, bounded_vec}; use sp_runtime::{testing::H256, traits::BadOrigin}; #[test] @@ -76,11 +75,12 @@ fn trigger_group_works() { let parent_hash = H256::from_low_u64_be(12345); frame_system::Pallet::::set_parent_hash(parent_hash); - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::set_group_metric(RuntimeOrigin::root(), 2, 3)); assert_ok!(SequencerGrouping::trigger_group(vec![1, 2, 3, 4, 5, 6], 20, 3)); @@ -109,11 +109,12 @@ fn account_in_group_works() { let parent_hash = H256::from_low_u64_be(12345); frame_system::Pallet::::set_parent_hash(parent_hash); - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::set_group_metric(RuntimeOrigin::root(), 2, 3)); assert_ok!(SequencerGrouping::trigger_group(vec![1, 2, 3, 4, 5, 6], 1, 1)); println!("Group Members: {:?}", GroupMembers::::get()); @@ -134,11 +135,12 @@ fn account_in_group_fails() { let parent_hash = H256::from_low_u64_be(12345); frame_system::Pallet::::set_parent_hash(parent_hash); - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::set_group_metric(RuntimeOrigin::root(), 2, 3)); assert_ok!(SequencerGrouping::trigger_group(vec![1, 2, 3, 4, 5, 6], 1, 1)); @@ -155,11 +157,12 @@ fn all_group_ids_works() { let parent_hash = H256::from_low_u64_be(12345); frame_system::Pallet::::set_parent_hash(parent_hash); - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::set_group_metric(RuntimeOrigin::root(), 2, 3)); assert_ok!(SequencerGrouping::trigger_group(vec![1, 2, 3, 4, 5, 6], 15, 2)); @@ -174,11 +177,12 @@ fn get_next_round_works() { new_test_ext().execute_with(|| { assert_ok!(SequencerGrouping::set_group_metric(RuntimeOrigin::root(), 2, 3)); - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::trigger_group(vec![1, 2, 3, 4, 5, 6], 16, 3)); println!("Group Members: {:?}", GroupMembers::::get()); @@ -193,11 +197,12 @@ fn get_next_round_works() { fn register_new_processor_works() { new_test_ext().execute_with(|| { let account_id: u64 = 1; - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(account_id), - ip_address.clone() - )); + RuntimeOrigin::signed(account_id), + ip_address.clone() + )); let processor_info = SequencerGrouping::processor_info(); assert_eq!(processor_info.len(), 1); @@ -210,17 +215,19 @@ fn register_new_processor_works() { fn update_existing_processor_ip_works() { new_test_ext().execute_with(|| { let account_id: u64 = 1; - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; - let new_ip_address: BoundedVec::MaxLengthIP> = bounded_vec![192, 168, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; + let new_ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![192, 168, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(account_id), - ip_address.clone() - )); + RuntimeOrigin::signed(account_id), + ip_address.clone() + )); assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(account_id), - new_ip_address.clone() - )); + RuntimeOrigin::signed(account_id), + new_ip_address.clone() + )); let processor_info = SequencerGrouping::processor_info(); assert_eq!(processor_info.len(), 1); @@ -233,16 +240,17 @@ fn update_existing_processor_ip_works() { fn do_nothing_if_ip_unchanged() { new_test_ext().execute_with(|| { let account_id: u64 = 1; - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(account_id), - ip_address.clone() - )); + RuntimeOrigin::signed(account_id), + ip_address.clone() + )); assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(account_id), - ip_address.clone() - )); + RuntimeOrigin::signed(account_id), + ip_address.clone() + )); let processor_info = SequencerGrouping::processor_info(); assert_eq!(processor_info.len(), 1); @@ -254,20 +262,24 @@ fn do_nothing_if_ip_unchanged() { #[test] fn too_many_processors_fails() { new_test_ext().execute_with(|| { - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; let max_running_app: u32 = ::MaxRunningAPP::get(); - for i in 1..= max_running_app { + for i in 1..=max_running_app { assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(i as u64), - ip_address.clone() - )); + RuntimeOrigin::signed(i as u64), + ip_address.clone() + )); } assert_noop!( - SequencerGrouping::register_processor(RuntimeOrigin::signed(max_running_app as u64 + 1), ip_address.clone()), - Error::::TooManyProcessors - ); + SequencerGrouping::register_processor( + RuntimeOrigin::signed(max_running_app as u64 + 1), + ip_address.clone() + ), + Error::::TooManyProcessors + ); let processor_info = SequencerGrouping::processor_info(); assert_eq!(processor_info.len(), max_running_app as usize); @@ -277,16 +289,17 @@ fn too_many_processors_fails() { #[test] fn account_with_group_ids_works() { new_test_ext().execute_with(|| { - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(2), - ip_address.clone() - )); + RuntimeOrigin::signed(2), + ip_address.clone() + )); System::set_block_number(10); let parent_hash = H256::from_low_u64_be(12345); @@ -306,4 +319,3 @@ fn account_with_group_ids_works() { assert_eq!(result, vec![1]); }); } - diff --git a/primitives/container/src/lib.rs b/primitives/container/src/lib.rs index 1bf4cf3..5a64a5e 100644 --- a/primitives/container/src/lib.rs +++ b/primitives/container/src/lib.rs @@ -3,17 +3,52 @@ use codec::{Codec, Decode, Encode}; use scale_info::TypeInfo; use sp_core::H256; use sp_std::vec::Vec; + +/// Client info of sequencer. #[derive(Debug, Clone, TypeInfo, Encode, Decode, Default)] pub struct DownloadInfo { + /// App id. pub app_id: u32, + /// App hash. pub app_hash: H256, + /// File name of app. pub file_name: Vec, + /// File size of app. pub size: u32, + /// Group id of app. pub group: u32, + /// Url of download binary client file. + pub url: Vec, + /// Arguments of startup client. + pub args: Option>, + /// Log file of startup client. + pub log: Option>, + /// Is starup of docker container. + pub is_docker_image: bool, + /// Docker image + pub docker_image: Option>, +} + +/// Client info of processor. +#[derive(Debug, Clone, TypeInfo, Encode, Decode, Default)] +pub struct ProcessorDownloadInfo { + /// App id. + pub app_id: u32, + /// App hash. + pub app_hash: H256, + /// File name of app. + pub file_name: Vec, + /// File size of app. + pub size: u32, + /// Url of download binary client file. pub url: Vec, + /// Arguments of startup client. pub args: Option>, + /// Log file of startup client. pub log: Option>, + /// Is starup of docker container. pub is_docker_image: bool, + /// Docker image pub docker_image: Option>, } @@ -26,5 +61,6 @@ sp_api::decl_runtime_apis! { fn should_run()-> bool; fn get_group_id(author:AuthorityId) ->u32; fn get_groups()->Vec; + fn processor_run(author:AuthorityId, ip_address:Vec)->Vec; } } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 86af838..0378920 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -53,7 +53,7 @@ use pallet_sequencer_grouping::SimpleRandomness; use pallet_sequencer_staking::WeightInfo; use pallet_xcm::{EnsureXcm, IsVoiceOfBody}; use parachains_common::message_queue::{NarrowOriginToSibling, ParaIdToSibling}; -use primitives_container::DownloadInfo; +use primitives_container::{DownloadInfo, ProcessorDownloadInfo}; pub use sp_consensus_aura::sr25519::AuthorityId as AuraId; use sp_core::ConstU128; use sp_runtime::DispatchErrorWithPostInfo; @@ -811,6 +811,10 @@ impl_runtime_apis! { fn should_run()-> bool { ContainerPallet::should_run() } + + fn processor_run(author:AccountId32, ip_address:Vec)->Vec { + ContainerPallet::processor_run(author, ip_address) + } } impl cumulus_primitives_core::CollectCollationInfo for Runtime {