From 72d7d910fd9a02313b9b55b1a57cc8ce6f015a14 Mon Sep 17 00:00:00 2001
From: sulijia <984115358@qq.com>
Date: Thu, 16 May 2024 22:21:58 +0800
Subject: [PATCH 01/16] Feat:Add register processor function to pallet

---
 node/src/container_task.rs      |  11 +-
 pallets/container/src/lib.rs    | 192 +++++++++++++++++++++++++++++++-
 primitives/container/src/lib.rs |  13 +++
 runtime/src/lib.rs              |   8 +-
 4 files changed, 215 insertions(+), 9 deletions(-)

diff --git a/node/src/container_task.rs b/node/src/container_task.rs
index 574500a..d661e6e 100644
--- a/node/src/container_task.rs
+++ b/node/src/container_task.rs
@@ -5,7 +5,7 @@ use cumulus_primitives_core::{
 use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
 use futures::{lock::Mutex, pin_mut, select, FutureExt, Stream, StreamExt};
 use polkadot_primitives::OccupiedCoreAssumption;
-use primitives_container::{ContainerRuntimeApi, DownloadInfo};
+use primitives_container::{ContainerRuntimeApi, DownloadInfo, ProcessorDownloadInfo};
 use reqwest::{
     self,
     header::{HeaderValue, CONTENT_LENGTH, RANGE},
@@ -36,7 +36,8 @@ use std::{
 pub const RUN_ARGS_KEY: &[u8] = b"run_args";
 pub const SYNC_ARGS_KEY: &[u8] = b"sync_args";
 pub const OPTION_ARGS_KEY: &[u8] = b"option_args";
-
+pub const P_RUN_ARGS_KEY: &[u8] = b"p_run_args";
+pub const P_OPTION_ARGS_KEY: &[u8] = b"p_option_args";
 struct PartialRangeIter {
     start: u64,
     end: u64,
@@ -578,7 +579,11 @@ where
     let hash = parachain_head.hash();

     let xx = keystore.sr25519_public_keys(sp_application_crypto::key_types::AURA)[0];
-
+    // Processor client process
+    let processor_run: Option<ProcessorDownloadInfo> =
+        parachain.runtime_api().processor_run(hash, Vec::from("127.0.0.1"))?;
+    log::info!("processor download info:{:?}", processor_run);
+    //Layer2 client process
     let should_load: Option<DownloadInfo> = parachain.runtime_api().shuld_load(hash, xx.into())?;
     log::info!("app download info of sequencer's group:{:?}", should_load);

diff --git a/pallets/container/src/lib.rs b/pallets/container/src/lib.rs
index d983f11..debe879 100644
--- a/pallets/container/src/lib.rs
+++ b/pallets/container/src/lib.rs
@@ -17,7 +17,7 @@ use cumulus_primitives_core::relay_chain::Hash;
 use frame_support::pallet_prelude::*;
 use frame_system::pallet_prelude::*;
 use pallet_sequencer_grouping::SequencerGroup;
-use primitives_container::DownloadInfo;
+use primitives_container::{DownloadInfo, ProcessorDownloadInfo};
 use scale_info::{prelude::vec::Vec, TypeInfo};
 use sp_runtime::BoundedVec;
 use sp_std::vec;
@@ -37,6 +37,22 @@ pub struct APPInfo<T: Config> {
     is_docker_image: Option<bool>,
     docker_image: Option>,
 }
+
+#[derive(Encode, Decode, Default, Clone, TypeInfo, MaxEncodedLen, Debug)]
+#[scale_info(skip_type_params(T))]
+pub struct ProcessorInfo<T: Config> {
+    app_hash: Hash,
+    creator: T::AccountId,
+    ip_address: BoundedVec<u8, T::MaxLengthIP>,
+    project_name: BoundedVec,
+    file_name: BoundedVec,
+    size: u32,
+    args: Option>,
+    log: Option>,
+    is_docker_image: Option<bool>,
+    docker_image: Option>,
+}
+
 #[frame_support::pallet]
 pub mod pallet {
     use super::*;
@@ -65,6 +81,9 @@ pub mod pallet {

     #[pallet::constant]
     type MaxArgLength: Get<u32>;
+
+    #[pallet::constant]
+    type MaxLengthIP: Get<u32>;
     }

     #[pallet::type_value]
@@ -83,6 +102,11 @@ pub mod pallet {
     #[pallet::getter(fn appinfo_map)]
     pub type APPInfoMap<T: Config> = StorageMap<_, Twox64Concat, u32, APPInfo<T>, OptionQuery>;

+    #[pallet::storage]
+    #[pallet::getter(fn processorinfo_map)]
+    pub type ProcessorInfoMap<T: Config> =
+        StorageMap<_, Twox64Concat, u32, ProcessorInfo<T>, OptionQuery>;
+
     // app_id,inuse
     #[pallet::storage]
     #[pallet::getter(fn inuse_map)]
@@ -111,6 +135,7 @@ pub mod pallet {
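The hunks below add the `AccountInconsistent` error and the `register_processor` / `update_processor` extrinsics. As committed here, both extrinsics locate an existing entry by scanning `ProcessorInfoMap` for a matching `project_name`, so a project's first processor registration can never satisfy the `processor_id > 0` check; PATCH 03 of this series switches the scan to `APPInfoMap`. A minimal mock-runtime sketch of the intended flow after that fix follows; the account id, hashes, and sizes are made-up values, and `ContainerModule`, `RuntimeOrigin`, and `new_test_ext` reuse the pallet's existing test scaffolding:

    #[test]
    fn register_processor_after_register_app() {
        new_test_ext().execute_with(|| {
            // register_app creates APPInfoMap entry 1 for project "test".
            assert_ok!(ContainerModule::register_app(
                RuntimeOrigin::signed(1),
                H256::from([1; 32]),
                BoundedVec::try_from("test".as_bytes().to_vec()).unwrap(), // project_name
                BoundedVec::try_from("app".as_bytes().to_vec()).unwrap(),  // file_name
                123,
                None, None, None, None,
            ));
            // register_processor must come from the same creator and reuse the
            // project_name, otherwise AccountInconsistent / AppNotExist is returned.
            assert_ok!(ContainerModule::register_processor(
                RuntimeOrigin::signed(1),
                BoundedVec::try_from("127.0.0.1".as_bytes().to_vec()).unwrap(), // ip_address
                H256::from([2; 32]),
                BoundedVec::try_from("test".as_bytes().to_vec()).unwrap(),
                BoundedVec::try_from("processor".as_bytes().to_vec()).unwrap(),
                456,
                None, None, None, None,
            ));
        });
    }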
 #[pallet::error]
 pub enum Error<T> {
     AppNotExist,
+    AccountInconsistent,
 }

 #[pallet::hooks]
@@ -223,6 +248,103 @@ pub mod pallet {
             Pallet::<T>::deposit_event(Event::<T>::SetDownloadURL { url });
             Ok(())
         }
+
+        #[pallet::call_index(2)]
+        #[pallet::weight(<T as Config>::WeightInfo::register_app())]
+        pub fn register_processor(
+            origin: OriginFor<T>,
+            ip_address: BoundedVec<u8, T::MaxLengthIP>,
+            app_hash: Hash,
+            project_name: BoundedVec,
+            file_name: BoundedVec,
+            size: u32,
+            args: Option>,
+            log: Option>,
+            is_docker_image: Option<bool>,
+            docker_image: Option>,
+        ) -> DispatchResult {
+            let who = ensure_signed(origin)?;
+
+            let max_app_id = NextApplicationID::<T>::get();
+
+            let mut processor_id = 0;
+
+            for app_id in 1..max_app_id {
+                let app_info = ProcessorInfoMap::<T>::get(app_id);
+                if let Some(app) = app_info {
+                    if app.project_name == project_name {
+                        processor_id = app_id;
+
+                        ensure!(app.creator == who, Error::<T>::AccountInconsistent);
+                    }
+                }
+            }
+            ensure!(processor_id > 0, Error::<T>::AppNotExist);
+            ProcessorInfoMap::<T>::insert(
+                processor_id,
+                ProcessorInfo {
+                    app_hash,
+                    creator: who,
+                    ip_address,
+                    project_name: project_name.clone(),
+                    file_name: file_name.clone(),
+                    size,
+                    args,
+                    log,
+                    is_docker_image,
+                    docker_image,
+                },
+            );
+            Ok(())
+        }
+
+        #[pallet::call_index(3)]
+        #[pallet::weight(<T as Config>::WeightInfo::register_app())]
+        pub fn update_processor(
+            origin: OriginFor<T>,
+            ip_address: BoundedVec<u8, T::MaxLengthIP>,
+            app_hash: Hash,
+            project_name: BoundedVec,
+            file_name: BoundedVec,
+            size: u32,
+            args: Option>,
+            log: Option>,
+            is_docker_image: Option<bool>,
+            docker_image: Option>,
+        ) -> DispatchResult {
+            let who = ensure_signed(origin)?;
+
+            let max_app_id = NextApplicationID::<T>::get();
+
+            let mut processor_id = 0;
+
+            for app_id in 1..max_app_id {
+                let app_info = ProcessorInfoMap::<T>::get(app_id);
+                if let Some(app) = app_info {
+                    if app.project_name == project_name {
+                        processor_id = app_id;
+
+                        ensure!(app.creator == who, Error::<T>::AccountInconsistent);
+                    }
+                }
+            }
+            ensure!(processor_id > 0, Error::<T>::AppNotExist);
+            ProcessorInfoMap::<T>::mutate(processor_id, |info| {
+                *info = Some(ProcessorInfo {
+                    app_hash,
+                    creator: who,
+                    ip_address,
+                    project_name: project_name.clone(),
+                    file_name: file_name.clone(),
+                    size,
+                    args,
+                    log,
+                    is_docker_image,
+                    docker_image,
+                })
+            });
+            Ok(())
+        }
     }
 }

@@ -279,10 +401,9 @@ impl<T: Config> Pallet<T> {
     pub fn get_group_id(author: T::AccountId) -> u32 {
         let group_id_result =
             <T::SequencerGroup as SequencerGroup<T::AccountId, BlockNumberFor<T>>>::account_in_group(author);
-        log::info!("new groupID:{:?}", group_id_result);
-
-        if group_id_result.is_ok() {
-            group_id_result.unwrap()
+        if let Ok(group_id) = group_id_result {
+            log::info!("new groupID:{:?}", group_id);
+            group_id
         } else {
             0xFFFFFFFF
         }
@@ -290,4 +411,65 @@ impl<T: Config> Pallet<T> {
     pub fn get_groups() -> Vec<u32> {
         <T::SequencerGroup as SequencerGroup<T::AccountId, BlockNumberFor<T>>>::all_group_ids()
     }
+
+    pub fn processor_run(ip: Vec<u8>) -> Option<ProcessorDownloadInfo> {
+        let max_app_id = NextApplicationID::<T>::get();
+
+        let mut processor_id = 0;
+
+        for app_id in 1..max_app_id {
+            let app_info = ProcessorInfoMap::<T>::get(app_id);
+            if let Some(app) = app_info {
+                if app.ip_address == ip {
+                    processor_id = app_id;
+                }
+            }
+        }
+        if processor_id == 0 {
+            return None;
+        }
+
+        let groups = Self::get_groups();
+        let mut valid_id = 0;
+        for group in groups.iter() {
+            let app = GroupAPPMap::<T>::get(group);
+            match app {
+                Some(app_id) =>
+                    if processor_id == app_id {
+                        valid_id = app_id;
+                        break;
+                    },
+                None => {},
+            }
+        }
+        if processor_id == 0 {
+            return None;
+        }
+        let app_id = valid_id;
+        let app_info = ProcessorInfoMap::<T>::get(app_id).ok_or(Error::<T>::AppNotExist).ok()?;
+
+        let url = DefaultUrl::<T>::get()?;
+
+        let args = app_info.args.and_then(|log| Some(log.as_slice().to_vec()));
+
+        let log = app_info.log.and_then(|log| Some(log.as_slice().to_vec()));
+
+        let is_docker_image =
+            if let Some(is_docker) = app_info.is_docker_image { is_docker } else { false };
+
+        let docker_image = app_info
+            .docker_image
+            .and_then(|docker_image| Some(docker_image.as_slice().to_vec()));
+
+        Some(ProcessorDownloadInfo {
+            app_hash: app_info.app_hash,
+            file_name: app_info.file_name.into(),
+            size: app_info.size,
+            url: url.into(),
+            args,
+            log,
+            is_docker_image,
+            docker_image,
+        })
+    }
 }

diff --git a/primitives/container/src/lib.rs b/primitives/container/src/lib.rs
index 1bf4cf3..ef466a8 100644
--- a/primitives/container/src/lib.rs
+++ b/primitives/container/src/lib.rs
@@ -17,6 +17,18 @@ pub struct DownloadInfo {
     pub docker_image: Option<Vec<u8>>,
 }

+#[derive(Debug, Clone, TypeInfo, Encode, Decode, Default)]
+pub struct ProcessorDownloadInfo {
+    pub app_hash: H256,
+    pub file_name: Vec<u8>,
+    pub size: u32,
+    pub url: Vec<u8>,
+    pub args: Option<Vec<u8>>,
+    pub log: Option<Vec<u8>>,
+    pub is_docker_image: bool,
+    pub docker_image: Option<Vec<u8>>,
+}
+
 sp_api::decl_runtime_apis! {
     #[api_version(2)]
     pub trait ContainerRuntimeApi<AuthorityId> where
         AuthorityId:Codec
@@ -26,5 +38,6 @@ sp_api::decl_runtime_apis! {
         fn should_run()-> bool;
         fn get_group_id(author:AuthorityId) ->u32;
         fn get_groups()->Vec<u32>;
+        fn processor_run(ip:Vec<u8>)->Option<ProcessorDownloadInfo>;
     }
 }

diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs
index b709381..4a64859 100644
--- a/runtime/src/lib.rs
+++ b/runtime/src/lib.rs
@@ -53,7 +53,7 @@ use pallet_sequencer_grouping::SimpleRandomness;
 use pallet_sequencer_staking::WeightInfo;
 use pallet_xcm::{EnsureXcm, IsVoiceOfBody};
 use parachains_common::message_queue::{NarrowOriginToSibling, ParaIdToSibling};
-use primitives_container::DownloadInfo;
+use primitives_container::{DownloadInfo, ProcessorDownloadInfo};
 pub use sp_consensus_aura::sr25519::AuthorityId as AuraId;
 use sp_core::ConstU128;
 use sp_runtime::DispatchErrorWithPostInfo;
@@ -585,6 +585,7 @@ parameter_types! {
     pub const MaxUrlLength: u32 = 300;
     pub const MaxArgCount: u32 = 10;
     pub const MaxArgLength: u32 = 100;
+    pub const MaxLengthIP: u32 = 30;
 }

 impl pallet_container::Config for Runtime {
@@ -595,6 +596,7 @@ impl pallet_container::Config for Runtime {
     type MaxUrlLength = MaxUrlLength;
     type MaxArgCount = MaxArgCount;
     type MaxArgLength = MaxArgLength;
+    type MaxLengthIP = MaxLengthIP;
 }

 // Create the runtime by composing the FRAME pallets that were previously configured.
@@ -809,6 +811,10 @@ impl_runtime_apis!
{ fn should_run()-> bool { ContainerPallet::should_run() } + + fn processor_run(ip:Vec)->Option { + ContainerPallet::processor_run(ip) + } } impl cumulus_primitives_core::CollectCollationInfo for Runtime { From a696a9dfcd340f2fa47d9d4dd5232de2375ed248 Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Sat, 18 May 2024 13:34:31 +0800 Subject: [PATCH 02/16] Feat:Add the logic code of the node part of the processor --- Cargo.lock | 155 +++++++++++++++++++ node/Cargo.toml | 1 + node/src/container_task.rs | 307 +++++++++++++++++++++++++++++++++---- 3 files changed, 432 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5090e7a..4b3dd52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2538,6 +2538,27 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + +[[package]] +name = "dns-lookup" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" +dependencies = [ + "cfg-if", + "libc", + "socket2 0.5.6", + "windows-sys 0.48.0", +] + [[package]] name = "docify" version = "0.2.8" @@ -3606,6 +3627,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "get_local_info" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a602b5db70aabc62c623c723997426190e1cd95fc3cdf357a343669bf801239" +dependencies = [ + "chrono", + "dns-lookup", + "pnet", + "reqwest", + "rust-ini", +] + [[package]] name = "gethostname" version = "0.2.3" @@ -4225,6 +4259,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "ipnetwork" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e" +dependencies = [ + "serde", +] + [[package]] name = "is-terminal" version = "0.4.12" @@ -6002,6 +6045,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-multimap" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e" +dependencies = [ + "dlv-list", + "hashbrown 0.13.2", +] + [[package]] name = "pallet-asset-conversion" version = "13.0.0" @@ -7594,6 +7647,97 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7" +[[package]] +name = "pnet" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "130c5b738eeda2dc5796fe2671e49027e6935e817ab51b930a36ec9e6a206a64" +dependencies = [ + "ipnetwork", + "pnet_base", + "pnet_datalink", + "pnet_packet", + "pnet_sys", + "pnet_transport", +] + +[[package]] +name = "pnet_base" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe4cf6fb3ab38b68d01ab2aea03ed3d1132b4868fa4e06285f29f16da01c5f4c" +dependencies = [ + "no-std-net", +] + +[[package]] +name = "pnet_datalink" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad5854abf0067ebbd3967f7d45ebc8976ff577ff0c7bd101c4973ae3c70f98fe" +dependencies = [ + 
"ipnetwork", + "libc", + "pnet_base", + "pnet_sys", + "winapi", +] + +[[package]] +name = "pnet_macros" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688b17499eee04a0408aca0aa5cba5fc86401d7216de8a63fdf7a4c227871804" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "syn 2.0.58", +] + +[[package]] +name = "pnet_macros_support" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eea925b72f4bd37f8eab0f221bbe4c78b63498350c983ffa9dd4bcde7e030f56" +dependencies = [ + "pnet_base", +] + +[[package]] +name = "pnet_packet" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9a005825396b7fe7a38a8e288dbc342d5034dac80c15212436424fef8ea90ba" +dependencies = [ + "glob", + "pnet_base", + "pnet_macros", + "pnet_macros_support", +] + +[[package]] +name = "pnet_sys" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "417c0becd1b573f6d544f73671070b039051e5ad819cc64aa96377b536128d00" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "pnet_transport" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2637e14d7de974ee2f74393afccbc8704f3e54e6eb31488715e72481d1662cc3" +dependencies = [ + "libc", + "pnet_base", + "pnet_packet", + "pnet_sys", +] + [[package]] name = "polkadot-approval-distribution" version = "10.0.0" @@ -8904,6 +9048,7 @@ dependencies = [ "frame-benchmarking", "frame-benchmarking-cli", "futures", + "get_local_info", "jsonrpsee", "log", "pallet-transaction-payment-rpc", @@ -9925,6 +10070,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rust-ini" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + [[package]] name = "rustc-demangle" version = "0.1.23" diff --git a/node/Cargo.toml b/node/Cargo.toml index 8b6b13e..64d8d1d 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -29,6 +29,7 @@ tempfile="3.9.0" error-chain="0.12.4" ring="0.17.8" tokio = { version = "1.22.0", features = ["parking_lot", "rt-multi-thread", "time"] } +get_local_info = "0.2.4" # Local primitives-container = { path = "../primitives/container"} diff --git a/node/src/container_task.rs b/node/src/container_task.rs index d661e6e..68efb21 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -5,6 +5,7 @@ use cumulus_primitives_core::{ use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use futures::{lock::Mutex, pin_mut, select, FutureExt, Stream, StreamExt}; use polkadot_primitives::OccupiedCoreAssumption; +use popsicle_runtime::pallet_container::ProcessorInfo; use primitives_container::{ContainerRuntimeApi, DownloadInfo, ProcessorDownloadInfo}; use reqwest::{ self, @@ -24,15 +25,15 @@ use sp_runtime::{ }; use std::{ error::Error, - fs, - fs::{File, Permissions}, + fs::{self, File, Permissions}, io::{BufReader, Read}, - os::unix::fs::PermissionsExt, + os::unix::{fs::PermissionsExt, process}, path::{Path, PathBuf}, process::{Child, Command, Stdio}, str::FromStr, sync::Arc, }; + pub const RUN_ARGS_KEY: &[u8] = b"run_args"; pub const SYNC_ARGS_KEY: &[u8] = b"sync_args"; pub const OPTION_ARGS_KEY: &[u8] = b"option_args"; @@ -86,7 +87,8 @@ async fn sha256_digest(mut reader: R) -> Result Result<(), Box> { //firt create dir @@ -101,7 +103,7 @@ 
async fn download_sdk( let client = reqwest::blocking::Client::new(); - let web_path = format!("{}/{}", url, std::str::from_utf8(&app_info.file_name)?); + let web_path = format!("{}/{}", url, file_name); log::info!("=============download:{:?}", web_path); let response = client.head(&web_path).send()?; @@ -114,7 +116,7 @@ async fn download_sdk( let length = u64::from_str(length.to_str()?).map_err(|_| "invalid Content-Length header")?; log::info!("==========total length:{:?}", length); - let download_path = format!("{}/{}", path_str, std::str::from_utf8(&app_info.file_name)?); + let download_path = format!("{}/{}", path_str, file_name); log::info!("=============download_path:{:?}", download_path); let download_dir = Path::new(&download_path); @@ -147,7 +149,7 @@ async fn download_sdk( let digest = sha256_digest(reader).await?; println!("SHA-256 digest is {:?}", digest); - if digest.as_ref() == app_info.app_hash.as_bytes() { + if digest.as_ref() == file_hash.as_bytes() { println!("check ok"); } else { println!("check fail"); @@ -271,8 +273,16 @@ struct RunningApp { instance2_docker_name: Option>, cur_ins: InstanceIndex, } +#[derive(Debug)] +struct RunningProcessor { + running: RunStatus, + processor_info: Option, + instance: Option, + instance_docker: bool, + instance_docker_name: Option>, +} -async fn process_download_task( +async fn app_download_task( data_path: PathBuf, app_info: DownloadInfo, running_app: Arc>, @@ -314,7 +324,9 @@ async fn process_download_task( if let Ok(need_down) = need_download { if need_down { - let result = download_sdk(data_path.clone(), app_info.clone(), url).await; + let file_name = std::str::from_utf8(&app_info.file_name)?; + let result = + download_sdk(data_path.clone(), file_name, app_info.app_hash, url).await; if result.is_ok() { start_flag = true; @@ -328,7 +340,7 @@ async fn process_download_task( } if start_flag { log::info!("===============start app for sync================="); - let result = process_run_task( + let result = app_run_task( data_path, app_info.clone(), sync_args, @@ -349,7 +361,186 @@ async fn process_download_task( Ok(()) } -async fn process_run_task( +async fn processor_run_task( + data_path: PathBuf, + processor_info: ProcessorDownloadInfo, + run_args: Option>, + option_args: Option>, + running_processor: Arc>, +) -> Result<(), Box> { + let run_as_docker = processor_info.is_docker_image; + + let extension_args_clone; + + let extension_args: Option> = if let Some(run_args) = run_args { + extension_args_clone = run_args.clone(); + + Some( + std::str::from_utf8(&extension_args_clone)? + .split(' ') + .into_iter() + .map(|arg| std::str::from_utf8(arg.as_bytes()).unwrap()) + .collect(), + ) + } else { + None + }; + + let app_args_clone; + + let mut args: Vec<&str> = if let Some(app_args) = processor_info.args { + app_args_clone = app_args.clone(); + + std::str::from_utf8(&app_args_clone)? 
+ .split(' ') + .into_iter() + .map(|arg| std::str::from_utf8(arg.as_bytes()).unwrap()) + .collect() + } else { + Vec::new() + }; + + if let Some(extension_args) = extension_args { + args.extend(extension_args); + } + + let log_file_name; + + let log_file_buf; + + if processor_info.log.is_none() { + log_file_name = std::str::from_utf8(&processor_info.file_name)?; + } else { + log_file_buf = processor_info.log.unwrap(); + + log_file_name = std::str::from_utf8(&log_file_buf)?; + } + + log::info!("log_file_name:{:?}", log_file_name); + log::info!("args:{:?}", args); + let outputs = File::create(log_file_name)?; + + let errors = outputs.try_clone()?; + + let mut app = running_processor.lock().await; + + // start new instance + let mut instance: Option = None; + if run_as_docker { + let image_name = processor_info.docker_image.ok_or("docker image not exist")?; + let docker_image = std::str::from_utf8(&image_name)?; + let start_result = start_docker_container( + std::str::from_utf8(&processor_info.file_name)?, + docker_image, + args, + option_args, + outputs.try_clone()?, + ) + .await; + log::info!("start docker container :{:?}", start_result); + } else { + let download_path = format!( + "{}/sdk/{}", + data_path.as_os_str().to_str().unwrap(), + std::str::from_utf8(&processor_info.file_name)? + ); + instance = Some( + Command::new(download_path) + .stdin(Stdio::piped()) + .stderr(Stdio::from(outputs)) + .stdout(Stdio::from(errors)) + .args(args) + .spawn() + .expect("failed to execute process"), + ); + } + app.instance = instance; + if run_as_docker { + app.instance_docker = true; + app.instance_docker_name = Some(processor_info.file_name); + } else { + app.instance_docker = false; + app.instance_docker_name = None; + } + log::info!("app:{:?}", app); + Ok(()) +} +async fn processor_task( + data_path: PathBuf, + processor_info: ProcessorDownloadInfo, + running_processor: Arc>, + run_args: Option>, + option_args: Option>, +) -> Result<(), Box> { + let run_as_docker = processor_info.is_docker_image; + let mut start_flag = false; + + if run_as_docker { + log::info!("===========Download app from docker hub and run the application as a container========="); + if let Some(image) = processor_info.clone().docker_image { + let docker_image = std::str::from_utf8(&image)?; + let mut exist_docker_image = check_docker_image_exist(docker_image).await?; + if !exist_docker_image { + download_docker_image(docker_image).await?; + exist_docker_image = check_docker_image_exist(docker_image).await?; + } + if exist_docker_image { + //Start docker container + start_flag = true; + } + } + } else { + log::info!( + "===========Download app from the web and run the application as a process=========" + ); + let url = std::str::from_utf8(&processor_info.url)?; + + let download_path = format!( + "{}/sdk/{}", + data_path.as_os_str().to_str().ok_or("invalid data_path")?, + std::str::from_utf8(&processor_info.file_name)? 
+        );
+
+        let need_download = need_download(&download_path, processor_info.app_hash).await;
+        log::info!("need_download:{:?}", need_download);
+
+        if let Ok(need_down) = need_download {
+            if need_down {
+                let file_name = std::str::from_utf8(&processor_info.file_name)?;
+                let result =
+                    download_sdk(data_path.clone(), file_name, processor_info.app_hash, url).await;
+
+                if result.is_ok() {
+                    start_flag = true;
+                } else {
+                    log::info!("download processor client error:{:?}", result);
+                }
+            } else {
+                start_flag = true;
+            }
+        }
+    }
+    if start_flag {
+        log::info!("===============run processor=================");
+        let result = processor_run_task(
+            data_path,
+            processor_info.clone(),
+            run_args,
+            option_args,
+            running_processor.clone(),
+        )
+        .await;
+        log::info!("start processor result:{:?}", result);
+        let mut processor = running_processor.lock().await;
+        processor.running = RunStatus::Downloaded;
+    } else {
+        let mut processor = running_processor.lock().await;
+        processor.running = RunStatus::Pending;
+    }
+    Ok(())
+}
+
-async fn process_run_task(
+async fn app_run_task(
     data_path: PathBuf,
     app_info: DownloadInfo,
     run_args: Option<Vec<u8>>,
@@ -413,16 +604,16 @@ async fn process_run_task(

     let mut app = running_app.lock().await;

-    let (old_instance, instance_docker, op_docker_name) = if app.cur_ins == InstanceIndex::Instance1
-    {
-        let is_docker_instance = app.instance1_docker;
-        let top_docker_name = app.instance1_docker_name.clone();
-        (&mut app.instance1, is_docker_instance, top_docker_name)
-    } else {
-        let is_docker_instance = app.instance2_docker;
-        let top_docker_name = app.instance2_docker_name.clone();
-        (&mut app.instance2, is_docker_instance, top_docker_name)
-    };
+    let (old_instance, instance_docker, op_docker_name, cur_ins) =
+        if app.cur_ins == InstanceIndex::Instance1 {
+            let is_docker_instance = app.instance1_docker;
+            let top_docker_name = app.instance1_docker_name.clone();
+            (&mut app.instance1, is_docker_instance, top_docker_name, 1)
+        } else {
+            let is_docker_instance = app.instance2_docker;
+            let top_docker_name = app.instance2_docker_name.clone();
+            (&mut app.instance2, is_docker_instance, top_docker_name, 2)
+        };
     // stop old instance
     if instance_docker {
         // remove docker container
@@ -434,7 +625,11 @@ async fn process_run_task(
     if let Some(ref mut old_instance) = old_instance {
         old_instance.kill()?;
         let kill_result = old_instance.wait()?;
-        log::info!("kill old instance:{:?}", kill_result);
+        log::info!("kill old instance:{:?}:{:?}", cur_ins, kill_result);
+        match kill_result.code() {
+            Some(code) => log::info!("Exited with status code: {code}"),
+            None => log::info!("Process terminated by signal"),
+        }
     }
     }
     // start new instance
@@ -538,14 +733,11 @@ async fn process_run_task(

 async fn handle_new_best_parachain_head<P, Block, TBackend>(
     validation_data: PersistedValidationData,
-    height: RelayBlockNumber,
     parachain: &P,
     keystore: KeystorePtr,
-    relay_chain: impl RelayChainInterface + Clone,
-    p_hash: H256,
-    para_id: ParaId,
     data_path: PathBuf,
     running_app: Arc<Mutex<RunningApp>>,
+    running_processor: Arc<Mutex<RunningProcessor>>,
     backend: Arc<TBackend>,
 ) -> Result<(), Box<dyn Error>>
 where
@@ -579,15 +771,49 @@ where
     let hash = parachain_head.hash();

     let xx = keystore.sr25519_public_keys(sp_application_crypto::key_types::AURA)[0];
     // Processor client process
+    let ip_address = get_local_info::get_pc_ipv4();
+
+    log::info!("ip_address:{:?}", ip_address);
+
     let processor_run: Option<ProcessorDownloadInfo> =
-        parachain.runtime_api().processor_run(hash, Vec::from("127.0.0.1"))?;
+        parachain.runtime_api().processor_run(hash, Vec::from(ip_address))?;

     log::info!("processor download info:{:?}", processor_run);
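The match added below feeds the processor task with optional CLI arguments read from the node's persistent offchain storage under P_RUN_ARGS_KEY / P_OPTION_ARGS_KEY (PATCH 05 later suffixes these keys with the app hash). A sketch of how a node-side helper could seed those keys, assuming access to some backend's offchain storage; the argument values here are placeholders:

    use sp_core::offchain::OffchainStorage;
    use sp_offchain::STORAGE_PREFIX;

    // Writes the processor's run/option args where the container task's
    // `storage.get(prefix, P_RUN_ARGS_KEY)` lookups below will find them.
    fn seed_processor_args<S: OffchainStorage>(storage: &mut S) {
        storage.set(&STORAGE_PREFIX, P_RUN_ARGS_KEY, b"--dev --rpc-port 9944");
        storage.set(&STORAGE_PREFIX, P_OPTION_ARGS_KEY, b"--log info");
    }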
+ + match processor_run { + Some(pcsInfo) => { + let mut processor = running_processor.lock().await; + + let run_status = &processor.running; + + if *run_status == RunStatus::Pending { + processor.running = RunStatus::Downloading; + + let (mut run_args, mut option_args) = if let Some(storage) = + offchain_storage.clone() + { + let prefix = &STORAGE_PREFIX; + (storage.get(prefix, P_RUN_ARGS_KEY), storage.get(prefix, P_OPTION_ARGS_KEY)) + } else { + (None, None) + }; + tokio::spawn(processor_task( + data_path.clone(), + pcsInfo, + running_processor.clone(), + run_args, + option_args, + )); + } + }, + None => log::info!("Processor None"), + }; //Layer2 client process let should_load: Option = parachain.runtime_api().shuld_load(hash, xx.into())?; log::info!("app download info of sequencer's group:{:?}", should_load); - let number = (*parachain_head.number()).into(); { let mut app = running_app.lock().await; @@ -630,7 +856,7 @@ where log::info!("offchain_storage of option_args:{:?}", option_args); app.running = RunStatus::Downloading; app.app_id = app_id; - tokio::spawn(process_download_task( + tokio::spawn(app_download_task( data_path.clone(), app_info, running_app.clone(), @@ -675,7 +901,7 @@ where }; } log::info!("offchain_storage of option_args:{:?}", option_args); - tokio::spawn(process_run_task( + tokio::spawn(app_run_task( data_path, app_info, run_args, @@ -727,6 +953,18 @@ async fn relay_chain_notification( ::Number: Into, TBackend: 'static + sc_client_api::backend::Backend + Send, { + // let mut stop = false; + // loop{ + // if !stop{ + // let download_info = DownloadInfo{ + // file_name:"magport-node-b".into(), + // app_hash:H256::from_str("9b64d63367328fd980b6e88af0dc46c437bf2c3906a9b000eccd66a6e4599938"). + // unwrap(), ..Default::default() + // }; + // download_sdk(data_path.clone(), download_info, "http://43.134.60.202:88/static").await; + // stop = true; + // } + // } let new_best_heads = match new_best_heads(relay_chain.clone(), para_id).await { Ok(best_heads_stream) => best_heads_stream.fuse(), Err(_err) => { @@ -747,12 +985,19 @@ async fn relay_chain_notification( instance2_docker_name: None, cur_ins: InstanceIndex::Instance1, })); + let runing_processor = Arc::new(Mutex::new(RunningProcessor { + running: RunStatus::Pending, + processor_info: None, + instance: None, + instance_docker: false, + instance_docker_name: None, + })); loop { select! 
{ h = new_best_heads.next() => { match h { Some((height, head, hash)) => { - let _ = handle_new_best_parachain_head(head,height, &*parachain,keystore.clone(), relay_chain.clone(), hash, para_id, data_path.clone(), runing_app.clone(), backend.clone()).await; + let _ = handle_new_best_parachain_head(head, &*parachain,keystore.clone(), data_path.clone(), runing_app.clone(),runing_processor.clone(), backend.clone()).await; }, None => { return; From 7725190586f91d63737cd9da71fb63addf4fb1aa Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Sun, 19 May 2024 15:34:14 +0800 Subject: [PATCH 03/16] Fix:kill docker container then store instance --- Cargo.lock | 1 + node/src/container_task.rs | 4 ++-- pallets/container/Cargo.toml | 2 +- pallets/container/src/lib.rs | 6 +++--- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b3dd52..c964cba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6388,6 +6388,7 @@ name = "pallet-container" version = "0.1.0" dependencies = [ "cumulus-primitives-core", + "derivative", "frame-benchmarking", "frame-support", "frame-system", diff --git a/node/src/container_task.rs b/node/src/container_task.rs index 68efb21..b050e77 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -680,9 +680,9 @@ async fn app_run_task( let kill_result = other_instance.wait()?; log::info!("kill instance2:{:?}", kill_result); } - app.instance1 = instance; app.instance2 = None; } + app.instance1 = instance; app.cur_ins = InstanceIndex::Instance2; } else { if app.instance1_docker { @@ -701,9 +701,9 @@ async fn app_run_task( let kill_result = other_instance.wait()?; log::info!("kill instance1:{:?}", kill_result); } - app.instance2 = instance; app.instance1 = None; } + app.instance2 = instance; app.cur_ins = InstanceIndex::Instance1; } } else { diff --git a/pallets/container/Cargo.toml b/pallets/container/Cargo.toml index 8c5ee03..0a3f1d4 100644 --- a/pallets/container/Cargo.toml +++ b/pallets/container/Cargo.toml @@ -29,7 +29,7 @@ cumulus-primitives-core={ workspace = true } pallet-aura ={ workspace = true } sp-consensus-aura={ workspace = true } sp-core = { workspace = true } - +derivative = { version = "2.2.0", default-features = false, features = ["use_core"] } [dev-dependencies] serde = { workspace = true } diff --git a/pallets/container/src/lib.rs b/pallets/container/src/lib.rs index debe879..6242dc2 100644 --- a/pallets/container/src/lib.rs +++ b/pallets/container/src/lib.rs @@ -270,7 +270,7 @@ pub mod pallet { let mut processor_id = 0; for app_id in 1..max_app_id { - let app_info = ProcessorInfoMap::::get(app_id); + let app_info = APPInfoMap::::get(app_id); if let Some(app) = app_info { if app.project_name == project_name { processor_id = app_id; @@ -319,7 +319,7 @@ pub mod pallet { let mut processor_id = 0; for app_id in 1..max_app_id { - let app_info = ProcessorInfoMap::::get(app_id); + let app_info = APPInfoMap::::get(app_id); if let Some(app) = app_info { if app.project_name == project_name { processor_id = app_id; @@ -352,7 +352,7 @@ impl Pallet { // Obtain application information corresponding to the group. 
// If no group has been assigned or there are no available apps in the group, return None pub fn shuld_load(author: T::AccountId) -> Option { - log::info!("============author:{:?}", author.encode()); + // log::info!("============author:{:?}", author.encode()); //Get the group ID of the sequencer, error when got 0xFFFFFFFF let group_id = Self::get_group_id(author); From f979ea1eb10980a9541d7b29f421d6c43b9acfa1 Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Thu, 23 May 2024 17:31:24 +0800 Subject: [PATCH 04/16] Feat:add processor to run batch client --- node/src/container_task.rs | 98 +++++---- pallets/container/src/benchmarking.rs | 28 ++- pallets/container/src/lib.rs | 306 ++++++++------------------ pallets/container/src/tests.rs | 31 ++- pallets/container/src/weights.rs | 50 +++-- primitives/container/src/lib.rs | 2 +- runtime/src/lib.rs | 6 +- 7 files changed, 224 insertions(+), 297 deletions(-) diff --git a/node/src/container_task.rs b/node/src/container_task.rs index b050e77..dd798b7 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -1,11 +1,8 @@ use codec::Decode; -use cumulus_primitives_core::{ - relay_chain::BlockNumber as RelayBlockNumber, ParaId, PersistedValidationData, -}; +use cumulus_primitives_core::{ParaId, PersistedValidationData}; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use futures::{lock::Mutex, pin_mut, select, FutureExt, Stream, StreamExt}; use polkadot_primitives::OccupiedCoreAssumption; -use popsicle_runtime::pallet_container::ProcessorInfo; use primitives_container::{ContainerRuntimeApi, DownloadInfo, ProcessorDownloadInfo}; use reqwest::{ self, @@ -24,10 +21,11 @@ use sp_runtime::{ AccountId32, }; use std::{ + collections::HashMap, error::Error, fs::{self, File, Permissions}, io::{BufReader, Read}, - os::unix::{fs::PermissionsExt, process}, + os::unix::fs::PermissionsExt, path::{Path, PathBuf}, process::{Child, Command, Stdio}, str::FromStr, @@ -273,14 +271,20 @@ struct RunningApp { instance2_docker_name: Option>, cur_ins: InstanceIndex, } + #[derive(Debug)] -struct RunningProcessor { +struct ProcessorInstance { + app_hash: H256, running: RunStatus, processor_info: Option, instance: Option, instance_docker: bool, instance_docker_name: Option>, } +#[derive(Debug)] +struct RunningProcessor { + processors: HashMap, +} async fn app_download_task( data_path: PathBuf, @@ -422,10 +426,8 @@ async fn processor_run_task( let errors = outputs.try_clone()?; - let mut app = running_processor.lock().await; - // start new instance - let mut instance: Option = None; + let mut instance = None; if run_as_docker { let image_name = processor_info.docker_image.ok_or("docker image not exist")?; let docker_image = std::str::from_utf8(&image_name)?; @@ -454,15 +456,20 @@ async fn processor_run_task( .expect("failed to execute process"), ); } - app.instance = instance; - if run_as_docker { - app.instance_docker = true; - app.instance_docker_name = Some(processor_info.file_name); - } else { - app.instance_docker = false; - app.instance_docker_name = None; - } - log::info!("app:{:?}", app); + let mut running_processors = running_processor.lock().await; + let processor_instances = &mut running_processors.processors; + processor_instances.entry(processor_info.app_hash).and_modify(|app| { + app.instance = instance; + if run_as_docker { + app.instance_docker = true; + app.instance_docker_name = Some(processor_info.file_name); + } else { + app.instance_docker = false; + app.instance_docker_name = None; + } + }); + + 
log::info!("app:{:?}", running_processors); Ok(()) } async fn processor_task( @@ -520,7 +527,7 @@ async fn processor_task( } } } - if start_flag { + let status = if start_flag { log::info!("===============run processor================="); let result = processor_run_task( data_path, @@ -531,12 +538,15 @@ async fn processor_task( ) .await; log::info!("start processor result:{:?}", result); - let mut processor = running_processor.lock().await; - processor.running = RunStatus::Downloaded; + RunStatus::Downloaded } else { - let mut processor = running_processor.lock().await; - processor.running = RunStatus::Pending; - } + RunStatus::Pending + }; + let mut running_processors = running_processor.lock().await; + let processor_instances = &mut running_processors.processors; + processor_instances + .entry(processor_info.app_hash) + .and_modify(|instance| instance.running = status); Ok(()) } @@ -777,15 +787,24 @@ where log::info!("ip_address:{:?}", ip_address); - let processor_run: Option = - parachain.runtime_api().processor_run(hash, Vec::from(ip_address))?; - - log::info!("processor download info:{:?}", processor_run); + let processor_infos: Vec = + parachain.runtime_api().processor_run(hash, xx.into())?; - match processor_run { - Some(pcsInfo) => { - let mut processor = running_processor.lock().await; + log::info!("processor download info:{:?}", processor_infos); + { + let mut running_processors = running_processor.lock().await; + let processors = &mut running_processors.processors; + for processor_info in processor_infos { + let app_hash = processor_info.app_hash; + let processor = processors.entry(app_hash).or_insert(ProcessorInstance { + app_hash, + running: RunStatus::Pending, + processor_info: None, + instance: None, + instance_docker: false, + instance_docker_name: None, + }); let run_status = &processor.running; if *run_status == RunStatus::Pending { @@ -801,15 +820,14 @@ where }; tokio::spawn(processor_task( data_path.clone(), - pcsInfo, + processor_info, running_processor.clone(), run_args, option_args, )); } - }, - None => log::info!("Processor None"), - }; + } + } //Layer2 client process let should_load: Option = parachain.runtime_api().shuld_load(hash, xx.into())?; log::info!("app download info of sequencer's group:{:?}", should_load); @@ -985,18 +1003,12 @@ async fn relay_chain_notification( instance2_docker_name: None, cur_ins: InstanceIndex::Instance1, })); - let runing_processor = Arc::new(Mutex::new(RunningProcessor { - running: RunStatus::Pending, - processor_info: None, - instance: None, - instance_docker: false, - instance_docker_name: None, - })); + let runing_processor = Arc::new(Mutex::new(RunningProcessor { processors: HashMap::new() })); loop { select! { h = new_best_heads.next() => { match h { - Some((height, head, hash)) => { + Some((_height, head, _hash)) => { let _ = handle_new_best_parachain_head(head, &*parachain,keystore.clone(), data_path.clone(), runing_app.clone(),runing_processor.clone(), backend.clone()).await; }, None => { diff --git a/pallets/container/src/benchmarking.rs b/pallets/container/src/benchmarking.rs index 16fafbb..b3bd257 100644 --- a/pallets/container/src/benchmarking.rs +++ b/pallets/container/src/benchmarking.rs @@ -25,16 +25,32 @@ benchmarks! 
{ let file_size = 123; let args = Some(BoundedVec::try_from("--chain dev".as_bytes().to_vec()).unwrap()); let log = Some(BoundedVec::try_from("aaaa".as_bytes().to_vec()).unwrap()); + let consensus_client = AppClient{ + app_hash: hash, + file_name:file_name.clone(), + size: file_size, + args:args.clone(), + log:log.clone(), + is_docker_image: None, + docker_image: None, + }; + let batch_client = AppClient{ + app_hash: hash, + file_name, + size: file_size, + args, + log, + is_docker_image: None, + docker_image: None, + }; let caller: T::AccountId = whitelisted_caller(); - }: _(RawOrigin::Signed(caller), hash, + }: _(RawOrigin::Signed(caller), project_name, - file_name, - file_size, - args, - log) + Box::new(consensus_client), + Box::new(batch_client)) verify { let app = APPInfoMap::::get(1).unwrap(); - assert_eq!(app.app_hash, H256::from([1; 32])); + assert_eq!(app.consensus_client.app_hash, H256::from([1; 32])); } } diff --git a/pallets/container/src/lib.rs b/pallets/container/src/lib.rs index 6242dc2..ad81291 100644 --- a/pallets/container/src/lib.rs +++ b/pallets/container/src/lib.rs @@ -14,43 +14,53 @@ mod benchmarking; pub mod weights; use codec::{Decode, Encode, MaxEncodedLen}; use cumulus_primitives_core::relay_chain::Hash; +use derivative::Derivative; use frame_support::pallet_prelude::*; use frame_system::pallet_prelude::*; use pallet_sequencer_grouping::SequencerGroup; use primitives_container::{DownloadInfo, ProcessorDownloadInfo}; use scale_info::{prelude::vec::Vec, TypeInfo}; use sp_runtime::BoundedVec; -use sp_std::vec; +use sp_std::{boxed::Box, vec}; pub use weights::*; -#[derive(Encode, Decode, Default, Clone, TypeInfo, MaxEncodedLen, Debug)] -#[scale_info(skip_type_params(T))] -pub struct APPInfo { - app_hash: Hash, - creator: T::AccountId, - project_name: BoundedVec, - file_name: BoundedVec, - uploaded: bool, - size: u32, - args: Option>, - log: Option>, - is_docker_image: Option, - docker_image: Option>, +#[derive(Derivative, Encode, Decode, TypeInfo, MaxEncodedLen)] +#[derivative( + Clone(bound = ""), + Eq(bound = ""), + PartialEq(bound = ""), + Debug(bound = ""), + Default(bound = "") +)] +#[codec(encode_bound())] +#[codec(decode_bound())] +#[scale_info(bounds(), skip_type_params(T))] +pub struct AppClient { + pub app_hash: Hash, + + pub file_name: BoundedVec, + + pub size: u32, + + pub args: Option>, + + pub log: Option>, + + pub is_docker_image: Option, + + pub docker_image: Option>, } #[derive(Encode, Decode, Default, Clone, TypeInfo, MaxEncodedLen, Debug)] #[scale_info(skip_type_params(T))] -pub struct ProcessorInfo { - app_hash: Hash, +pub struct APPInfo { creator: T::AccountId, - ip_address: BoundedVec, + project_name: BoundedVec, - file_name: BoundedVec, - size: u32, - args: Option>, - log: Option>, - is_docker_image: Option, - docker_image: Option>, + + consensus_client: AppClient, + + batch_client: AppClient, } #[frame_support::pallet] @@ -81,9 +91,6 @@ pub mod pallet { #[pallet::constant] type MaxArgLength: Get; - - #[pallet::constant] - type MaxLengthIP: Get; } #[pallet::type_value] @@ -102,11 +109,6 @@ pub mod pallet { #[pallet::getter(fn appinfo_map)] pub type APPInfoMap = StorageMap<_, Twox64Concat, u32, APPInfo, OptionQuery>; - #[pallet::storage] - #[pallet::getter(fn processorinfo_map)] - pub type ProcessorInfoMap = - StorageMap<_, Twox64Concat, u32, ProcessorInfo, OptionQuery>; - // app_id,inuse #[pallet::storage] #[pallet::getter(fn inuse_map)] @@ -121,11 +123,15 @@ pub mod pallet { #[pallet::generate_deposit(pub(super) fn deposit_event)] pub enum Event 
{ ReisterApp { + creator: T::AccountId, appid: u32, project_name: BoundedVec, - file_name: BoundedVec, - hash: Hash, - size: u32, + consensus_client: BoundedVec, + consensus_hash: Hash, + consensus_size: u32, + batch_client: BoundedVec, + batch_hash: Hash, + batch_size: u32, }, SetDownloadURL { url: BoundedVec, @@ -188,32 +194,22 @@ pub mod pallet { #[pallet::weight(::WeightInfo::register_app())] pub fn register_app( origin: OriginFor, - app_hash: Hash, project_name: BoundedVec, - file_name: BoundedVec, - size: u32, - args: Option>, - log: Option>, - is_docker_image: Option, - docker_image: Option>, + consensus_client: Box>, + batch_client: Box>, ) -> DispatchResult { let who = ensure_signed(origin)?; let old_application_id = NextApplicationID::::get(); - + let consensus_app = *consensus_client; + let batch_app = *batch_client; APPInfoMap::::insert( old_application_id, APPInfo { - app_hash, - creator: who, + creator: who.clone(), project_name: project_name.clone(), - file_name: file_name.clone(), - uploaded: false, - size, - args, - log, - is_docker_image, - docker_image, + consensus_client: consensus_app.clone(), + batch_client: batch_app.clone(), }, ); @@ -225,11 +221,15 @@ pub mod pallet { InuseMap::::put(inuse_apps); Pallet::::deposit_event(Event::::ReisterApp { + creator: who, appid: old_application_id, project_name, - file_name, - hash: app_hash, - size, + consensus_client: consensus_app.file_name, + consensus_hash: consensus_app.app_hash, + consensus_size: consensus_app.size, + batch_client: batch_app.file_name, + batch_hash: batch_app.app_hash, + batch_size: batch_app.size, }); Ok(()) @@ -248,103 +248,6 @@ pub mod pallet { Pallet::::deposit_event(Event::::SetDownloadURL { url }); Ok(()) } - - #[pallet::call_index(2)] - #[pallet::weight(::WeightInfo::register_app())] - pub fn register_processor( - origin: OriginFor, - ip_address: BoundedVec, - app_hash: Hash, - project_name: BoundedVec, - file_name: BoundedVec, - size: u32, - args: Option>, - log: Option>, - is_docker_image: Option, - docker_image: Option>, - ) -> DispatchResult { - let who = ensure_signed(origin)?; - - let max_app_id = NextApplicationID::::get(); - - let mut processor_id = 0; - - for app_id in 1..max_app_id { - let app_info = APPInfoMap::::get(app_id); - if let Some(app) = app_info { - if app.project_name == project_name { - processor_id = app_id; - - ensure!(app.creator == who, Error::::AccountInconsistent); - } - } - } - ensure!(processor_id > 0, Error::::AppNotExist); - ProcessorInfoMap::::insert( - processor_id, - ProcessorInfo { - app_hash, - creator: who, - ip_address, - project_name: project_name.clone(), - file_name: file_name.clone(), - size, - args, - log, - is_docker_image, - docker_image, - }, - ); - Ok(()) - } - - #[pallet::call_index(3)] - #[pallet::weight(::WeightInfo::register_app())] - pub fn update_processor( - origin: OriginFor, - ip_address: BoundedVec, - app_hash: Hash, - project_name: BoundedVec, - file_name: BoundedVec, - size: u32, - args: Option>, - log: Option>, - is_docker_image: Option, - docker_image: Option>, - ) -> DispatchResult { - let who = ensure_signed(origin)?; - - let max_app_id = NextApplicationID::::get(); - - let mut processor_id = 0; - - for app_id in 1..max_app_id { - let app_info = APPInfoMap::::get(app_id); - if let Some(app) = app_info { - if app.project_name == project_name { - processor_id = app_id; - - ensure!(app.creator == who, Error::::AccountInconsistent); - } - } - } - ensure!(processor_id > 0, Error::::AppNotExist); - ProcessorInfoMap::::mutate(processor_id, 
|info| { - *info = Some(ProcessorInfo { - app_hash, - creator: who, - ip_address, - project_name: project_name.clone(), - file_name: file_name.clone(), - size, - args, - log, - is_docker_image, - docker_image, - }) - }); - Ok(()) - } } } @@ -362,22 +265,24 @@ impl Pallet { let url = DefaultUrl::::get()?; - let args = app_info.args.and_then(|log| Some(log.as_slice().to_vec())); + let consensus_client = app_info.consensus_client; + + let args = consensus_client.args.and_then(|log| Some(log.as_slice().to_vec())); - let log = app_info.log.and_then(|log| Some(log.as_slice().to_vec())); + let log = consensus_client.log.and_then(|log| Some(log.as_slice().to_vec())); let is_docker_image = - if let Some(is_docker) = app_info.is_docker_image { is_docker } else { false }; + if let Some(is_docker) = consensus_client.is_docker_image { is_docker } else { false }; - let docker_image = app_info + let docker_image = consensus_client .docker_image .and_then(|docker_image| Some(docker_image.as_slice().to_vec())); Some(DownloadInfo { app_id, - app_hash: app_info.app_hash, - file_name: app_info.file_name.into(), - size: app_info.size, + app_hash: consensus_client.app_hash, + file_name: consensus_client.file_name.into(), + size: consensus_client.size, group: group_id, url: url.into(), args, @@ -412,64 +317,45 @@ impl Pallet { >::all_group_ids() } - pub fn processor_run(ip: Vec) -> Option { - let max_app_id = NextApplicationID::::get(); - - let mut processor_id = 0; - - for app_id in 1..max_app_id { - let app_info = ProcessorInfoMap::::get(app_id); - if let Some(app) = app_info { - if app.ip_address == ip { - processor_id = app_id; - } - } - } - if processor_id == 0 { - return None; - } - - let groups = Self::get_groups(); - let mut valid_id = 0; - for group in groups.iter() { - let app = GroupAPPMap::::get(group); - match app { - Some(app_id) => - if processor_id == app_id { - valid_id = app_id; - break; - }, - None => {}, - } + pub fn processor_run(author: T::AccountId) -> Vec { + let processors = vec![1, 2]; + let mut download_infos: Vec = Vec::new(); + if Self::get_groups().len() == 0 { + return download_infos; } - if processor_id == 0 { - return None; - } - let app_id = valid_id; - let app_info = ProcessorInfoMap::::get(app_id).ok_or(Error::::AppNotExist).ok()?; + let url = DefaultUrl::::get().expect("Need set url"); - let url = DefaultUrl::::get()?; + for app_id in processors { + let p_app_info = APPInfoMap::::get(app_id); - let args = app_info.args.and_then(|log| Some(log.as_slice().to_vec())); + if let Some(app_info) = p_app_info { + let batch_client = app_info.batch_client; - let log = app_info.log.and_then(|log| Some(log.as_slice().to_vec())); + let args = batch_client.args.and_then(|log| Some(log.as_slice().to_vec())); - let is_docker_image = - if let Some(is_docker) = app_info.is_docker_image { is_docker } else { false }; + let log = batch_client.log.and_then(|log| Some(log.as_slice().to_vec())); - let docker_image = app_info - .docker_image - .and_then(|docker_image| Some(docker_image.as_slice().to_vec())); + let is_docker_image = if let Some(is_docker) = batch_client.is_docker_image { + is_docker + } else { + false + }; - Some(ProcessorDownloadInfo { - app_hash: app_info.app_hash, - file_name: app_info.file_name.into(), - size: app_info.size, - url: url.into(), - args, - log, - is_docker_image, - docker_image, - }) + let docker_image = batch_client + .docker_image + .and_then(|docker_image| Some(docker_image.as_slice().to_vec())); + download_infos.push(ProcessorDownloadInfo { + app_hash: 
batch_client.app_hash, + file_name: batch_client.file_name.into(), + size: batch_client.size, + url: url.clone().into(), + args, + log, + is_docker_image, + docker_image, + }); + } + } + download_infos } } diff --git a/pallets/container/src/tests.rs b/pallets/container/src/tests.rs index 8db13d8..ea14f0c 100644 --- a/pallets/container/src/tests.rs +++ b/pallets/container/src/tests.rs @@ -1,4 +1,4 @@ -use crate::mock::*; +use crate::{mock::*, AppClient}; use frame_support::assert_ok; use sp_core::H256; use sp_runtime::BoundedVec; @@ -22,18 +22,31 @@ fn it_works_for_default_value() { #[test] fn register_app() { new_test_ext().execute_with(|| { + let consensus_client = AppClient { + app_hash: H256::from([1; 32]), + file_name: BoundedVec::try_from("test".as_bytes().to_vec()).unwrap(), + size: 123, + args: Some(BoundedVec::try_from("--chain dev".as_bytes().to_vec()).unwrap()), + log: None, + is_docker_image: None, + docker_image: None, + }; + let batch_client = AppClient { + app_hash: H256::from([1; 32]), + file_name: BoundedVec::try_from("test".as_bytes().to_vec()).unwrap(), + size: 123, + args: Some(BoundedVec::try_from("--chain dev".as_bytes().to_vec()).unwrap()), + log: None, + is_docker_image: None, + docker_image: None, + }; assert_ok!(ContainerModule::register_app( RuntimeOrigin::signed(1), - H256::from([1; 32]), BoundedVec::try_from("test".as_bytes().to_vec()).unwrap(), - BoundedVec::try_from("test".as_bytes().to_vec()).unwrap(), - 123, - Some(BoundedVec::try_from("--chain dev".as_bytes().to_vec()).unwrap()), - None, - None, - None, + Box::new(consensus_client), + Box::new(batch_client), )); let app = ContainerModule::appinfo_map(1).unwrap(); - assert_eq!(app.app_hash, H256::from([1; 32])); + assert_eq!(app.consensus_client.app_hash, H256::from([1; 32])); }); } diff --git a/pallets/container/src/weights.rs b/pallets/container/src/weights.rs index d392594..90c051e 100644 --- a/pallets/container/src/weights.rs +++ b/pallets/container/src/weights.rs @@ -1,33 +1,35 @@ -//! Autogenerated weights for pallet_template +//! Autogenerated weights for `pallet_container` //! -//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 4.0.0-dev -//! DATE: 2023-04-06, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]` +//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 35.0.1 +//! DATE: 2024-05-23, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]` //! WORST CASE MAP SIZE: `1000000` -//! HOSTNAME: `Alexs-MacBook-Pro-2.local`, CPU: `` -//! EXECUTION: Some(Wasm), WASM-EXECUTION: Compiled, CHAIN: Some("dev"), DB CACHE: 1024 +//! HOSTNAME: ``, CPU: `AMD Ryzen 7 5800U with Radeon Graphics` +//! 
WASM-EXECUTION: `Compiled`, CHAIN: `Some("popsicle-dev")`, DB CACHE: 1024 // Executed Command: -// ../../target/release/node-template +// ./Popsicle/target/release/popsicle-node // benchmark // pallet // --chain -// dev +// popsicle-dev +// --execution=wasm +// --wasm-execution=compiled // --pallet -// pallet_template +// pallet_container // --extrinsic // * -// --steps=50 -// --repeat=20 -// --wasm-execution=compiled +// --steps +// 50 +// --repeat +// 20 // --output -// pallets/template/src/weights.rs -// --template -// ../../.maintain/frame-weight-template.hbs +// weights.rs #![cfg_attr(rustfmt, rustfmt_skip)] #![allow(unused_parens)] #![allow(unused_imports)] +#![allow(missing_docs)] use frame_support::{traits::Get, weights::{Weight, constants::RocksDbWeight}}; use core::marker::PhantomData; @@ -47,8 +49,8 @@ impl WeightInfo for SubstrateWeight { // Proof Size summary in bytes: // Measured: `0` // Estimated: `0` - // Minimum execution time: 3_767_000 picoseconds. - Weight::from_parts(3_947_000, 0) + // Minimum execution time: 6_329_000 picoseconds. + Weight::from_parts(6_540_000, 0) .saturating_add(Weight::from_parts(0, 0)) .saturating_add(T::DbWeight::get().writes(1)) } @@ -57,13 +59,13 @@ impl WeightInfo for SubstrateWeight { /// Storage: `ContainerPallet::InuseMap` (r:1 w:1) /// Proof: `ContainerPallet::InuseMap` (`max_values`: Some(1), `max_size`: Some(102), added: 597, mode: `MaxEncodedLen`) /// Storage: `ContainerPallet::APPInfoMap` (r:0 w:1) - /// Proof: `ContainerPallet::APPInfoMap` (`max_values`: None, `max_size`: Some(700), added: 3175, mode: `MaxEncodedLen`) + /// Proof: `ContainerPallet::APPInfoMap` (`max_values`: None, `max_size`: Some(2136), added: 4611, mode: `MaxEncodedLen`) fn register_app() -> Weight { // Proof Size summary in bytes: // Measured: `42` // Estimated: `1587` - // Minimum execution time: 14_487_000 picoseconds. - Weight::from_parts(14_969_000, 0) + // Minimum execution time: 18_244_000 picoseconds. + Weight::from_parts(18_766_000, 0) .saturating_add(Weight::from_parts(0, 1587)) .saturating_add(T::DbWeight::get().reads(2)) .saturating_add(T::DbWeight::get().writes(3)) @@ -78,8 +80,8 @@ impl WeightInfo for () { // Proof Size summary in bytes: // Measured: `0` // Estimated: `0` - // Minimum execution time: 3_767_000 picoseconds. - Weight::from_parts(3_947_000, 0) + // Minimum execution time: 6_329_000 picoseconds. + Weight::from_parts(6_540_000, 0) .saturating_add(Weight::from_parts(0, 0)) .saturating_add(RocksDbWeight::get().writes(1)) } @@ -88,13 +90,13 @@ impl WeightInfo for () { /// Storage: `ContainerPallet::InuseMap` (r:1 w:1) /// Proof: `ContainerPallet::InuseMap` (`max_values`: Some(1), `max_size`: Some(102), added: 597, mode: `MaxEncodedLen`) /// Storage: `ContainerPallet::APPInfoMap` (r:0 w:1) - /// Proof: `ContainerPallet::APPInfoMap` (`max_values`: None, `max_size`: Some(700), added: 3175, mode: `MaxEncodedLen`) + /// Proof: `ContainerPallet::APPInfoMap` (`max_values`: None, `max_size`: Some(2136), added: 4611, mode: `MaxEncodedLen`) fn register_app() -> Weight { // Proof Size summary in bytes: // Measured: `42` // Estimated: `1587` - // Minimum execution time: 14_487_000 picoseconds. - Weight::from_parts(14_969_000, 0) + // Minimum execution time: 18_244_000 picoseconds. 
+ Weight::from_parts(18_766_000, 0) .saturating_add(Weight::from_parts(0, 1587)) .saturating_add(RocksDbWeight::get().reads(2)) .saturating_add(RocksDbWeight::get().writes(3)) diff --git a/primitives/container/src/lib.rs b/primitives/container/src/lib.rs index ef466a8..774521f 100644 --- a/primitives/container/src/lib.rs +++ b/primitives/container/src/lib.rs @@ -38,6 +38,6 @@ sp_api::decl_runtime_apis! { fn should_run()-> bool; fn get_group_id(author:AuthorityId) ->u32; fn get_groups()->Vec; - fn processor_run(ip:Vec)->Option; + fn processor_run(author:AuthorityId)->Vec; } } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 4a64859..6a130b5 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -585,7 +585,6 @@ parameter_types! { pub const MaxUrlLength: u32 = 300; pub const MaxArgCount: u32 = 10; pub const MaxArgLength: u32 = 100; - pub const MaxLengthIP: u32 = 30; } impl pallet_container::Config for Runtime { @@ -596,7 +595,6 @@ impl pallet_container::Config for Runtime { type MaxUrlLength = MaxUrlLength; type MaxArgCount = MaxArgCount; type MaxArgLength = MaxArgLength; - type MaxLengthIP = MaxLengthIP; } // Create the runtime by composing the FRAME pallets that were previously configured. @@ -812,8 +810,8 @@ impl_runtime_apis! { ContainerPallet::should_run() } - fn processor_run(ip:Vec)->Option { - ContainerPallet::processor_run(ip) + fn processor_run(author:AccountId32)->Vec { + ContainerPallet::processor_run(author) } } From f91cf619e9f2fb5eef1adb523ad6c867db78712b Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Sat, 25 May 2024 21:37:43 +0800 Subject: [PATCH 05/16] Feat:Bind application parameters to hash --- node/src/container_task.rs | 158 +++++++++++++++++++------------------ 1 file changed, 81 insertions(+), 77 deletions(-) diff --git a/node/src/container_task.rs b/node/src/container_task.rs index dd798b7..ddfbb75 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -13,7 +13,7 @@ use ring::digest::{Context, Digest, SHA256}; use sc_client_api::UsageProvider; use sc_service::TaskManager; use sp_api::ProvideRuntimeApi; -use sp_core::{offchain::OffchainStorage, H256}; +use sp_core::{hexdisplay::HexDisplay, offchain::OffchainStorage, H256}; use sp_keystore::KeystorePtr; use sp_offchain::STORAGE_PREFIX; use sp_runtime::{ @@ -32,11 +32,11 @@ use std::{ sync::Arc, }; -pub const RUN_ARGS_KEY: &[u8] = b"run_args"; -pub const SYNC_ARGS_KEY: &[u8] = b"sync_args"; -pub const OPTION_ARGS_KEY: &[u8] = b"option_args"; -pub const P_RUN_ARGS_KEY: &[u8] = b"p_run_args"; -pub const P_OPTION_ARGS_KEY: &[u8] = b"p_option_args"; +pub const RUN_ARGS_KEY: &str = "run_args"; +pub const SYNC_ARGS_KEY: &str = "sync_args"; +pub const OPTION_ARGS_KEY: &str = "option_args"; +pub const P_RUN_ARGS_KEY: &str = "p_run_args"; +pub const P_OPTION_ARGS_KEY: &str = "p_option_args"; struct PartialRangeIter { start: u64, end: u64, @@ -261,6 +261,7 @@ enum RunStatus { struct RunningApp { group_id: u32, app_id: u32, + app_hash: H256, running: RunStatus, app_info: Option, instance1: Option, @@ -741,6 +742,21 @@ async fn app_run_task( Ok(()) } +async fn get_offchain_storage( + offchain_storage: Option, + args: &[u8], +) -> Option> +where + Block: BlockT, + TBackend: 'static + sc_client_api::backend::Backend + Send, +{ + if let Some(storage) = offchain_storage { + storage.get(&STORAGE_PREFIX, args) + } else { + None + } +} + async fn handle_new_best_parachain_head( validation_data: PersistedValidationData, parachain: &P, @@ -759,17 +775,6 @@ where { let offchain_storage = 
backend.offchain_storage(); - let (mut run_args, mut sync_args, mut option_args) = - if let Some(storage) = offchain_storage.clone() { - let prefix = &STORAGE_PREFIX; - ( - storage.get(prefix, RUN_ARGS_KEY), - storage.get(prefix, SYNC_ARGS_KEY), - storage.get(prefix, OPTION_ARGS_KEY), - ) - } else { - (None, None, None) - }; // Check if there is a download task let head = validation_data.clone().parent_head.0; @@ -800,7 +805,7 @@ where let processor = processors.entry(app_hash).or_insert(ProcessorInstance { app_hash, running: RunStatus::Pending, - processor_info: None, + processor_info: Some(processor_info.clone()), instance: None, instance_docker: false, instance_docker_name: None, @@ -809,15 +814,21 @@ where if *run_status == RunStatus::Pending { processor.running = RunStatus::Downloading; - - let (mut run_args, mut option_args) = if let Some(storage) = - offchain_storage.clone() - { - let prefix = &STORAGE_PREFIX; - (storage.get(prefix, P_RUN_ARGS_KEY), storage.get(prefix, P_OPTION_ARGS_KEY)) - } else { - (None, None) - }; + let app_hash = processor.app_hash; + let p_run_args_key = + format!("{}:{}", P_RUN_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); + let p_option_args_key = + format!("{}:{}", P_OPTION_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); + let run_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + p_run_args_key.as_bytes(), + ) + .await; + let option_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + p_option_args_key.as_bytes(), + ) + .await; tokio::spawn(processor_task( data_path.clone(), processor_info, @@ -841,39 +852,35 @@ where Some(app_info) => { let new_group = app_info.group; let app_id = app_info.app_id; + let app_hash = app_info.app_hash; let run_status = &app.running; if old_group_id != new_group && *run_status == RunStatus::Pending { - if sync_args == None { - sync_args = if let Some(storage) = offchain_storage.clone() { - let prefix = &STORAGE_PREFIX; - let sync_args = format!( - "{}:{}", - std::str::from_utf8(SYNC_ARGS_KEY).unwrap(), - app_id - ); - storage.get(prefix, sync_args.as_bytes()) - } else { - None - }; - } + let sync_args_key = + format!("{}:{}", SYNC_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); + + let option_args_key = + format!("{}:{}", OPTION_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); + + let sync_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + sync_args_key.as_bytes(), + ) + .await; + + let option_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + option_args_key.as_bytes(), + ) + .await; + log::info!("offchain_storage of sync_args:{:?}", sync_args); - if option_args == None { - option_args = if let Some(storage) = offchain_storage.clone() { - let prefix = &STORAGE_PREFIX; - let option_args = format!( - "{}:{}", - std::str::from_utf8(OPTION_ARGS_KEY).unwrap(), - app_id - ); - - storage.get(prefix, option_args.as_bytes()) - } else { - None - }; - } + log::info!("offchain_storage of option_args:{:?}", option_args); + app.running = RunStatus::Downloading; + app.app_id = app_id; + app.app_hash = app_hash; tokio::spawn(app_download_task( data_path.clone(), app_info, @@ -892,32 +899,28 @@ where if should_run { let mut app = running_app.lock().await; let run_status = &app.running; - let app_id = app.app_id; + let app_hash = app.app_hash; if let Some(app_info) = app.app_info.clone() { if *run_status == RunStatus::Downloaded { log::info!("run:{:?}", app); - if run_args == None { - run_args = if let Some(storage) = 
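// Patch 05's key scheme, sketched: instead of one global "run_args" entry,
// every argument slot is now looked up under a key suffixed with the
// hex-encoded app hash, so each application gets its own slot in the node's
// local offchain storage, read back via OffchainStorage::get(&STORAGE_PREFIX, &key).
// The helper name below is illustrative, not part of the patch:
//
//     fn per_app_args_key(base: &str, app_hash: &H256) -> Vec<u8> {
//         format!("{}:{}", base, HexDisplay::from(&app_hash.as_bytes())).into_bytes()
//     }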
offchain_storage.clone() { - let prefix = &STORAGE_PREFIX; - let run_args = - format!("{}:{}", std::str::from_utf8(RUN_ARGS_KEY).unwrap(), app_id); - - storage.get(prefix, run_args.as_bytes()) - } else { - None - }; - } - if option_args == None { - option_args = if let Some(storage) = offchain_storage { - let prefix = &STORAGE_PREFIX; - let option_args = - format!("{}:{}", std::str::from_utf8(OPTION_ARGS_KEY).unwrap(), app_id); - - storage.get(prefix, option_args.as_bytes()) - } else { - None - }; - } + let run_args_key = + format!("{}:{}", RUN_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); + + let option_args_key = + format!("{}:{}", OPTION_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); + + let run_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + run_args_key.as_bytes(), + ) + .await; + + let option_args = get_offchain_storage::<_, TBackend>( + offchain_storage.clone(), + option_args_key.as_bytes(), + ) + .await; + log::info!("offchain_storage of run_args:{:?}", run_args); log::info!("offchain_storage of option_args:{:?}", option_args); tokio::spawn(app_run_task( data_path, @@ -993,6 +996,7 @@ async fn relay_chain_notification( let runing_app = Arc::new(Mutex::new(RunningApp { group_id: 0xFFFFFFFF, app_id: 0xFFFFFFFF, + app_hash: Default::default(), running: RunStatus::Pending, app_info: None, instance1: None, From 6677ba88cf200cc06be4ed31460e40054e26a595 Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Mon, 27 May 2024 23:18:57 +0800 Subject: [PATCH 06/16] Feat:merge processor group code --- pallets/container/src/lib.rs | 2 +- .../sequencer-grouping/src/benchmarking.rs | 6 +- pallets/sequencer-grouping/src/lib.rs | 18 ++- pallets/sequencer-grouping/src/tests.rs | 126 ++++++++++-------- 4 files changed, 86 insertions(+), 66 deletions(-) diff --git a/pallets/container/src/lib.rs b/pallets/container/src/lib.rs index ad81291..3d719dd 100644 --- a/pallets/container/src/lib.rs +++ b/pallets/container/src/lib.rs @@ -318,7 +318,7 @@ impl Pallet { } pub fn processor_run(author: T::AccountId) -> Vec { - let processors = vec![1, 2]; + let processors = >::get_group_ids(author); let mut download_infos: Vec = Vec::new(); if Self::get_groups().len() == 0 { return download_infos; diff --git a/pallets/sequencer-grouping/src/benchmarking.rs b/pallets/sequencer-grouping/src/benchmarking.rs index 89f1d37..73416ed 100644 --- a/pallets/sequencer-grouping/src/benchmarking.rs +++ b/pallets/sequencer-grouping/src/benchmarking.rs @@ -5,11 +5,9 @@ use super::*; #[allow(unused)] use crate::Pallet as SequencerGrouping; use frame_benchmarking::{account, benchmarks, impl_benchmark_test_suite}; -use frame_support::BoundedVec; -use frame_support::pallet_prelude::Get; +use frame_support::{pallet_prelude::Get, BoundedVec}; use frame_system::RawOrigin; -use sp_std::vec; -use sp_std::vec::Vec; +use sp_std::{vec, vec::Vec}; benchmarks! 
{ set_group_metric { diff --git a/pallets/sequencer-grouping/src/lib.rs b/pallets/sequencer-grouping/src/lib.rs index df42efc..cc4a412 100644 --- a/pallets/sequencer-grouping/src/lib.rs +++ b/pallets/sequencer-grouping/src/lib.rs @@ -97,8 +97,14 @@ pub mod pallet { #[pallet::storage] #[pallet::getter(fn processor_info)] - pub(crate) type ProcessorInfo = StorageValue<_, BoundedVec< - (T::AccountId, BoundedVec, BoundedVec), T::MaxRunningAPP>, ValueQuery>; + pub(crate) type ProcessorInfo = StorageValue< + _, + BoundedVec< + (T::AccountId, BoundedVec, BoundedVec), + T::MaxRunningAPP, + >, + ValueQuery, + >; #[pallet::genesis_config] pub struct GenesisConfig { @@ -187,7 +193,9 @@ pub mod pallet { let who = ensure_signed(origin)?; let mut processor_info = crate::pallet::ProcessorInfo::::get(); - if let Some(existing) = processor_info.iter_mut().find(|(account, _, _)| account == &who) { + if let Some(existing) = + processor_info.iter_mut().find(|(account, _, _)| account == &who) + { if existing.1 == ip_address { return Ok(()); } @@ -261,7 +269,9 @@ pub mod pallet { pub fn get_group_ids(account: T::AccountId) -> Vec { let processor_info = ProcessorInfo::::get(); - if let Some((_, _, group_ids)) = processor_info.iter().find(|(acc, _, _)| *acc == account) { + if let Some((_, _, group_ids)) = + processor_info.iter().find(|(acc, _, _)| *acc == account) + { group_ids.clone().into_inner() } else { Vec::new() diff --git a/pallets/sequencer-grouping/src/tests.rs b/pallets/sequencer-grouping/src/tests.rs index c2a0fe1..ea59c2c 100644 --- a/pallets/sequencer-grouping/src/tests.rs +++ b/pallets/sequencer-grouping/src/tests.rs @@ -4,8 +4,7 @@ use crate::{ Error, Event, GroupMembers, NextRound, SequencerGroup, }; use frame_support::{assert_noop, assert_ok, pallet_prelude::Get}; -use sp_core::bounded::BoundedVec; -use sp_core::bounded_vec; +use sp_core::{bounded::BoundedVec, bounded_vec}; use sp_runtime::{testing::H256, traits::BadOrigin}; #[test] @@ -76,11 +75,12 @@ fn trigger_group_works() { let parent_hash = H256::from_low_u64_be(12345); frame_system::Pallet::::set_parent_hash(parent_hash); - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::set_group_metric(RuntimeOrigin::root(), 2, 3)); assert_ok!(SequencerGrouping::trigger_group(vec![1, 2, 3, 4, 5, 6], 20, 3)); @@ -109,11 +109,12 @@ fn account_in_group_works() { let parent_hash = H256::from_low_u64_be(12345); frame_system::Pallet::::set_parent_hash(parent_hash); - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::set_group_metric(RuntimeOrigin::root(), 2, 3)); assert_ok!(SequencerGrouping::trigger_group(vec![1, 2, 3, 4, 5, 6], 1, 1)); println!("Group Members: {:?}", GroupMembers::::get()); @@ -134,11 +135,12 @@ fn account_in_group_fails() { let parent_hash = H256::from_low_u64_be(12345); frame_system::Pallet::::set_parent_hash(parent_hash); - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; 
assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::set_group_metric(RuntimeOrigin::root(), 2, 3)); assert_ok!(SequencerGrouping::trigger_group(vec![1, 2, 3, 4, 5, 6], 1, 1)); @@ -155,11 +157,12 @@ fn all_group_ids_works() { let parent_hash = H256::from_low_u64_be(12345); frame_system::Pallet::::set_parent_hash(parent_hash); - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::set_group_metric(RuntimeOrigin::root(), 2, 3)); assert_ok!(SequencerGrouping::trigger_group(vec![1, 2, 3, 4, 5, 6], 15, 2)); @@ -174,11 +177,12 @@ fn get_next_round_works() { new_test_ext().execute_with(|| { assert_ok!(SequencerGrouping::set_group_metric(RuntimeOrigin::root(), 2, 3)); - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::trigger_group(vec![1, 2, 3, 4, 5, 6], 16, 3)); println!("Group Members: {:?}", GroupMembers::::get()); @@ -193,11 +197,12 @@ fn get_next_round_works() { fn register_new_processor_works() { new_test_ext().execute_with(|| { let account_id: u64 = 1; - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(account_id), - ip_address.clone() - )); + RuntimeOrigin::signed(account_id), + ip_address.clone() + )); let processor_info = SequencerGrouping::processor_info(); assert_eq!(processor_info.len(), 1); @@ -210,17 +215,19 @@ fn register_new_processor_works() { fn update_existing_processor_ip_works() { new_test_ext().execute_with(|| { let account_id: u64 = 1; - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; - let new_ip_address: BoundedVec::MaxLengthIP> = bounded_vec![192, 168, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; + let new_ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![192, 168, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(account_id), - ip_address.clone() - )); + RuntimeOrigin::signed(account_id), + ip_address.clone() + )); assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(account_id), - new_ip_address.clone() - )); + RuntimeOrigin::signed(account_id), + new_ip_address.clone() + )); let processor_info = SequencerGrouping::processor_info(); assert_eq!(processor_info.len(), 1); @@ -233,16 +240,17 @@ fn update_existing_processor_ip_works() { fn do_nothing_if_ip_unchanged() { new_test_ext().execute_with(|| { let account_id: u64 = 1; - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(account_id), - ip_address.clone() - )); + RuntimeOrigin::signed(account_id), + ip_address.clone() + )); assert_ok!(SequencerGrouping::register_processor( - 
RuntimeOrigin::signed(account_id), - ip_address.clone() - )); + RuntimeOrigin::signed(account_id), + ip_address.clone() + )); let processor_info = SequencerGrouping::processor_info(); assert_eq!(processor_info.len(), 1); @@ -254,20 +262,24 @@ fn do_nothing_if_ip_unchanged() { #[test] fn too_many_processors_fails() { new_test_ext().execute_with(|| { - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; let max_running_app: u32 = ::MaxRunningAPP::get(); - for i in 1..= max_running_app { + for i in 1..=max_running_app { assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(i as u64), - ip_address.clone() - )); + RuntimeOrigin::signed(i as u64), + ip_address.clone() + )); } assert_noop!( - SequencerGrouping::register_processor(RuntimeOrigin::signed(max_running_app as u64 + 1), ip_address.clone()), - Error::::TooManyProcessors - ); + SequencerGrouping::register_processor( + RuntimeOrigin::signed(max_running_app as u64 + 1), + ip_address.clone() + ), + Error::::TooManyProcessors + ); let processor_info = SequencerGrouping::processor_info(); assert_eq!(processor_info.len(), max_running_app as usize); @@ -277,16 +289,17 @@ fn too_many_processors_fails() { #[test] fn account_with_group_ids_works() { new_test_ext().execute_with(|| { - let ip_address: BoundedVec::MaxLengthIP> = bounded_vec![127, 0, 0, 1]; + let ip_address: BoundedVec::MaxLengthIP> = + bounded_vec![127, 0, 0, 1]; assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(1), - ip_address.clone() - )); + RuntimeOrigin::signed(1), + ip_address.clone() + )); assert_ok!(SequencerGrouping::register_processor( - RuntimeOrigin::signed(2), - ip_address.clone() - )); + RuntimeOrigin::signed(2), + ip_address.clone() + )); System::set_block_number(10); let parent_hash = H256::from_low_u64_be(12345); @@ -306,4 +319,3 @@ fn account_with_group_ids_works() { assert_eq!(result, vec![1]); }); } - From 28a4d8572aade27391e9a052f2ddcf9d24f5ff32 Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Tue, 28 May 2024 20:04:34 +0800 Subject: [PATCH 07/16] Feat:1.Verify whether the same hash app exists 2.change final hook to initial --- pallets/container/src/lib.rs | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/pallets/container/src/lib.rs b/pallets/container/src/lib.rs index 3d719dd..d6e940c 100644 --- a/pallets/container/src/lib.rs +++ b/pallets/container/src/lib.rs @@ -146,15 +146,20 @@ pub mod pallet { #[pallet::hooks] impl Hooks> for Pallet { - fn on_finalize(_: BlockNumberFor) { + fn on_initialize(_n: BlockNumberFor) -> Weight + where + BlockNumberFor: From, + { let groups = Self::get_groups(); log::info!("groups:{:?}", groups); let mut inuse_apps = InuseMap::::get(); log::info!("inuse_apps:{:?}", inuse_apps); - + let mut read_count = 2; + let mut write_count = 0; for group in groups.iter() { let app = GroupAPPMap::::get(group); + read_count += 1; match app { Some(_app_id) => { // TODO:alloced app to group,do nothing?? 
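The hunk that follows completes patch 07's switch from on_finalize to on_initialize: unlike on_finalize, an on_initialize hook must report the block weight it consumes, so the pallet counts every storage read and write as it goes and prices the totals at the end. A minimal sketch of that pattern (the counter values are illustrative, not taken from the pallet):

fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
    let mut reads: u64 = 0;
    let mut writes: u64 = 0;

    // ...do the allocation work, bumping a counter for every storage op...
    reads += 1;  // e.g. one StorageMap::get
    writes += 2; // e.g. the InuseMap and GroupAPPMap updates

    // Price the counted operations with the runtime's database weights.
    T::DbWeight::get().reads_writes(reads, writes)
}

Returning an honest weight here keeps block production from under-accounting work that now runs at the start of every block.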
@@ -173,7 +178,7 @@ pub mod pallet { InuseMap::::mutate(|inuses| inuses[index] = true); GroupAPPMap::::insert(group, (index + 1) as u32); - + write_count += 2; break; } index += 1; @@ -185,6 +190,7 @@ pub mod pallet { } } log::info!("inuse_apps:{:?}", inuse_apps); + T::DbWeight::get().reads_writes(read_count, write_count) } } @@ -201,8 +207,21 @@ pub mod pallet { let who = ensure_signed(origin)?; let old_application_id = NextApplicationID::::get(); + let consensus_app = *consensus_client; + + let batch_app = *batch_client; + + for app_id in 1..old_application_id { + let p_app_info = APPInfoMap::::get(app_id); + if let Some(app_info) = p_app_info { + assert!( + (app_info.consensus_client.app_hash != consensus_app.app_hash) && + (app_info.batch_client.app_hash != batch_app.app_hash), + "Client with the same hash exists!", + ); + } + } APPInfoMap::::insert( old_application_id, APPInfo {
From baad56137ce2fab011d1f9235d30d2580abc5637 Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Tue, 28 May 2024 23:01:17 +0800 Subject: [PATCH 08/16] Feat:Shut down processor clients that are not in the group --- node/src/container_task.rs | 50 +++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-)
diff --git a/node/src/container_task.rs b/node/src/container_task.rs index ddfbb75..d0a2d14 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -539,7 +539,7 @@ async fn processor_task( ) .await; log::info!("start processor result:{:?}", result); - RunStatus::Downloaded + RunStatus::Running } else { RunStatus::Pending }; @@ -757,6 +757,49 @@ where } } +async fn close_processor_instance( + instance: &mut ProcessorInstance, +) -> Result<(), Box> { + if instance.running == RunStatus::Running { + if instance.instance_docker { + // remove the docker container + if let Some(docker_name) = &instance.instance_docker_name { + let kill_result = remove_docker_container(std::str::from_utf8(&docker_name)?).await; + log::info!("kill old docker instance of processor:{:?}", kill_result); + } + } else { + if let Some(ref mut old_instance) = &mut instance.instance { + old_instance.kill()?; + let kill_result = old_instance.wait()?; + log::info!("kill old instance of processor:{:?}", kill_result); + } + } + } + Ok(()) +} +async fn filter_processor_instance( + processors: &mut HashMap, + processor_infos: &Vec, +) -> Vec { + let mut remove_entrys = Vec::new(); + { + for processor in &mut *processors { + let mut find_flag = false; + let app_hash = processor.0; + for processor_info in processor_infos { + if *app_hash == processor_info.app_hash { + find_flag = true; + break; + } + } + if !find_flag { + let _ = close_processor_instance(processor.1).await; + remove_entrys.push(*app_hash); + } + } + } + remove_entrys +} async fn handle_new_best_parachain_head( validation_data: PersistedValidationData, parachain: &P, @@ -800,7 +843,11 @@ where { let mut running_processors = running_processor.lock().await; let processors = &mut running_processors.processors; + // Close clients that are no longer assigned before starting new ones + let remove_entrys = filter_processor_instance(processors, &processor_infos).await; + for remove_entry in remove_entrys { + processors.remove_entry(&remove_entry); + } for processor_info in processor_infos { let app_hash = processor_info.app_hash; let processor = processors.entry(app_hash).or_insert(ProcessorInstance {
From 7fcf63e92a071f6e167118d1d8dd2461259ec52f Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Wed, 29 May 2024 23:06:11 +0800 Subject: [PATCH
09/16] Feat:Add a function to redirect docker logs to a file --- node/src/container_task.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/node/src/container_task.rs b/node/src/container_task.rs index d0a2d14..3097aee 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -240,6 +240,24 @@ async fn start_docker_container( instance.wait()?; Ok(()) } + +// Redirect docker logs to a file +// docker logs {container_name}> {log_file} 2>&1 +// docker logs {container_name}>& {log_file} +async fn redirect_docker_container_log( + container_name: &str, + in_args: Vec<&str>, + log_file: File, +) -> Result<(), Box> { + let docker_cmd = format!("logs {}", container_name); + let mut args: Vec<&str> = docker_cmd.split(' ').into_iter().map(|arg| arg).collect(); + args.extend(in_args); + log::info!("=======================args:{:?}", args); + let mut instance = Command::new("docker").args(args).stdout(Stdio::from(log_file)).spawn()?; + instance.wait()?; + Ok(()) +} + #[derive(Debug, PartialEq, Eq)] enum StartType { SYNC, From 7cb20bf06515fd0f85b05561ced8c23ef13dd06a Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Thu, 30 May 2024 23:03:21 +0800 Subject: [PATCH 10/16] Fix:redirect docker log to file function --- node/src/container_task.rs | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/node/src/container_task.rs b/node/src/container_task.rs index 3097aee..2277ab1 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -241,21 +241,22 @@ async fn start_docker_container( Ok(()) } -// Redirect docker logs to a file -// docker logs {container_name}> {log_file} 2>&1 -// docker logs {container_name}>& {log_file} +// Redirect docker logs to a file. +// It will run as a background process, so you need to save the instance and kill it when switching. +// docker logs -f -t --details {container_name} + async fn redirect_docker_container_log( container_name: &str, - in_args: Vec<&str>, log_file: File, -) -> Result<(), Box> { - let docker_cmd = format!("logs {}", container_name); - let mut args: Vec<&str> = docker_cmd.split(' ').into_iter().map(|arg| arg).collect(); - args.extend(in_args); - log::info!("=======================args:{:?}", args); - let mut instance = Command::new("docker").args(args).stdout(Stdio::from(log_file)).spawn()?; - instance.wait()?; - Ok(()) +) -> Result> { + let docker_cmd = format!("logs -f -t --details {}", container_name); + let args: Vec<&str> = docker_cmd.split(' ').into_iter().map(|arg| arg).collect(); + let instance = Command::new("docker") + .args(args) + .stdin(Stdio::piped()) + .stdout(Stdio::from(log_file)) + .spawn()?; + Ok(instance) } #[derive(Debug, PartialEq, Eq)] @@ -288,6 +289,9 @@ struct RunningApp { instance2_docker: bool, instance1_docker_name: Option>, instance2_docker_name: Option>, + // TODO:need self test. 
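// A quick map of the state these fields track (a sketch based on how
// RunStatus is used across this series, not text from the patch):
//
//     running: RunStatus        Pending -> Downloading -> Downloaded -> Running
//     instance1 / instance2     the two alternating client processes
//     cur_ins: InstanceIndex    which slot the next start will use
//
// The commented-out fields below reserve room for the docker log followers
// that patch 11 wires in for real.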
+ // instance1_docker_log: Option, + // instance2_docker_log: Option, cur_ins: InstanceIndex, } @@ -650,6 +654,7 @@ async fn app_run_task( let kill_result = remove_docker_container(std::str::from_utf8(&docker_name)?).await; log::info!("kill old docker instance:{:?}", kill_result); } + // TODO:kill old docker log } else { if let Some(ref mut old_instance) = old_instance { old_instance.kill()?; @@ -675,6 +680,8 @@ async fn app_run_task( ) .await; log::info!("start docker container :{:?}", start_result); + // TODO:redirect docker log to file,nee self test + // redirect_docker_container_log(std::str::from_utf8(&app_info.file_name)?, outputs).await; } else { let download_path = format!( "{}/sdk/{}", @@ -700,6 +707,7 @@ async fn app_run_task( remove_docker_container(std::str::from_utf8(&docker_name)?).await; log::info!("kill docker instance2:{:?}", kill_result); } + // TODO:kill old docker log instance app.instance2_docker_name = None; app.instance2_docker = false; } else { @@ -721,6 +729,7 @@ async fn app_run_task( remove_docker_container(std::str::from_utf8(&docker_name)?).await; log::info!("kill docker instance1:{:?}", kill_result); } + // TODO:kill old docker log instance app.instance1_docker_name = None; app.instance1_docker = false; } else { From f2b8bd6ea742a51b2a8b80f7e4d14d20efc3a986 Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Fri, 31 May 2024 22:34:53 +0800 Subject: [PATCH 11/16] Fix:consensus client run as docker container output log success --- node/src/container_task.rs | 55 +++++++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/node/src/container_task.rs b/node/src/container_task.rs index 2277ab1..c103311 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -289,9 +289,8 @@ struct RunningApp { instance2_docker: bool, instance1_docker_name: Option>, instance2_docker_name: Option>, - // TODO:need self test. 
- // instance1_docker_log: Option, - // instance2_docker_log: Option, + instance1_docker_log: Option, + instance2_docker_log: Option, cur_ins: InstanceIndex, } @@ -641,10 +640,26 @@ async fn app_run_task( if app.cur_ins == InstanceIndex::Instance1 { let is_docker_instance = app.instance1_docker; let top_docker_name = app.instance1_docker_name.clone(); + if is_docker_instance && top_docker_name.is_some() { + //kill old docker log instance + if let Some(ref mut log_instance) = app.instance1_docker_log { + log_instance.kill()?; + let kill_result = log_instance.wait()?; + log::info!("kill old docker log instance:{:?}:{:?}", 1, kill_result); + } + } (&mut app.instance1, is_docker_instance, top_docker_name, 1) } else { let is_docker_instance = app.instance2_docker; let top_docker_name = app.instance2_docker_name.clone(); + if is_docker_instance && top_docker_name.is_some() { + //kill old docker log instance + if let Some(ref mut log_instance) = app.instance2_docker_log { + log_instance.kill()?; + let kill_result = log_instance.wait()?; + log::info!("kill old docker log instance:{:?}:{:?}", 2, kill_result); + } + } (&mut app.instance2, is_docker_instance, top_docker_name, 2) }; // stop old instance @@ -654,20 +669,16 @@ async fn app_run_task( let kill_result = remove_docker_container(std::str::from_utf8(&docker_name)?).await; log::info!("kill old docker instance:{:?}", kill_result); } - // TODO:kill old docker log } else { if let Some(ref mut old_instance) = old_instance { old_instance.kill()?; let kill_result = old_instance.wait()?; log::info!("kill old instance:{:?}:{:?}", cur_ins, kill_result); - match kill_result.code() { - Some(code) => log::info!("Exited with status code: {code}"), - None => log::info!("Process terminated by signal"), - } } } // start new instance let mut instance: Option = None; + let mut docker_log_instance: Option = None; if run_as_docker { let image_name = app_info.docker_image.ok_or("docker image not exist")?; let docker_image = std::str::from_utf8(&image_name)?; @@ -680,8 +691,11 @@ async fn app_run_task( ) .await; log::info!("start docker container :{:?}", start_result); - // TODO:redirect docker log to file,nee self test - // redirect_docker_container_log(std::str::from_utf8(&app_info.file_name)?, outputs).await; + // redirect docker log to file + docker_log_instance = Some( + redirect_docker_container_log(std::str::from_utf8(&app_info.file_name)?, outputs) + .await?, + ); } else { let download_path = format!( "{}/sdk/{}", @@ -703,13 +717,19 @@ async fn app_run_task( if app.instance2_docker { let docker_name_op = app.instance2_docker_name.clone(); if let Some(docker_name) = docker_name_op { + //kill old docker log instance first + if let Some(ref mut log_instance) = app.instance2_docker_log { + log_instance.kill()?; + let kill_result = log_instance.wait()?; + log::info!("kill old docker log instance2:{:?}", kill_result); + } let kill_result = remove_docker_container(std::str::from_utf8(&docker_name)?).await; log::info!("kill docker instance2:{:?}", kill_result); } - // TODO:kill old docker log instance app.instance2_docker_name = None; app.instance2_docker = false; + app.instance2_docker_log = None; } else { let other_instance = &mut app.instance2; if let Some(ref mut other_instance) = other_instance { @@ -725,6 +745,12 @@ async fn app_run_task( if app.instance1_docker { let docker_name_op = app.instance1_docker_name.clone(); if let Some(docker_name) = docker_name_op { + //kill old docker log instance first + if let Some(ref mut log_instance) = 
app.instance1_docker_log { + log_instance.kill()?; + let kill_result = log_instance.wait()?; + log::info!("kill old docker log instance1:{:?}", kill_result); + } let kill_result = remove_docker_container(std::str::from_utf8(&docker_name)?).await; log::info!("kill docker instance1:{:?}", kill_result); @@ -732,6 +758,7 @@ async fn app_run_task( // TODO:kill old docker log instance app.instance1_docker_name = None; app.instance1_docker = false; + app.instance1_docker_log = None; } else { let other_instance = &mut app.instance1; if let Some(ref mut other_instance) = other_instance { @@ -750,6 +777,7 @@ async fn app_run_task( if run_as_docker { app.instance1_docker = true; app.instance1_docker_name = Some(app_info.file_name); + app.instance1_docker_log = docker_log_instance; } else { app.instance1_docker = false; app.instance1_docker_name = None; @@ -759,6 +787,7 @@ async fn app_run_task( if run_as_docker { app.instance2_docker = true; app.instance2_docker_name = Some(app_info.file_name); + app.instance2_docker_log = docker_log_instance; } else { app.instance2_docker = false; app.instance2_docker_name = None; @@ -975,9 +1004,9 @@ where let mut app = running_app.lock().await; let run_status = &app.running; let app_hash = app.app_hash; + log::info!("run:{:?}", app); if let Some(app_info) = app.app_info.clone() { if *run_status == RunStatus::Downloaded { - log::info!("run:{:?}", app); let run_args_key = format!("{}:{}", RUN_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); @@ -1080,6 +1109,8 @@ async fn relay_chain_notification( instance2_docker: false, instance1_docker_name: None, instance2_docker_name: None, + instance1_docker_log: None, + instance2_docker_log: None, cur_ins: InstanceIndex::Instance1, })); let runing_processor = Arc::new(Mutex::new(RunningProcessor { processors: HashMap::new() })); From 5d8f7074d522a157f70bf91342a432efcc39077b Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Sat, 1 Jun 2024 14:07:48 +0800 Subject: [PATCH 12/16] Fix:processor run as docker container should direct log to file --- node/src/container_task.rs | 18 ++++++++++- pallets/container/src/lib.rs | 60 +++++++++++++++++++----------------- 2 files changed, 48 insertions(+), 30 deletions(-) diff --git a/node/src/container_task.rs b/node/src/container_task.rs index c103311..57a1bde 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -302,6 +302,7 @@ struct ProcessorInstance { instance: Option, instance_docker: bool, instance_docker_name: Option>, + instance_docker_log: Option, } #[derive(Debug)] struct RunningProcessor { @@ -450,6 +451,7 @@ async fn processor_run_task( // start new instance let mut instance = None; + let mut docker_log_instance = None; if run_as_docker { let image_name = processor_info.docker_image.ok_or("docker image not exist")?; let docker_image = std::str::from_utf8(&image_name)?; @@ -461,7 +463,11 @@ async fn processor_run_task( outputs.try_clone()?, ) .await; - log::info!("start docker container :{:?}", start_result); + log::info!("start processor docker container :{:?}", start_result); + docker_log_instance = Some( + redirect_docker_container_log(std::str::from_utf8(&processor_info.file_name)?, outputs) + .await?, + ); } else { let download_path = format!( "{}/sdk/{}", @@ -482,6 +488,7 @@ async fn processor_run_task( let processor_instances = &mut running_processors.processors; processor_instances.entry(processor_info.app_hash).and_modify(|app| { app.instance = instance; + app.instance_docker_log = docker_log_instance; if run_as_docker { 
app.instance_docker = true; app.instance_docker_name = Some(processor_info.file_name); @@ -494,6 +501,7 @@ async fn processor_run_task( log::info!("app:{:?}", running_processors); Ok(()) } + async fn processor_task( data_path: PathBuf, processor_info: ProcessorDownloadInfo, @@ -820,6 +828,12 @@ async fn close_processor_instance( if instance.instance_docker { //reomve docker container if let Some(docker_name) = &instance.instance_docker_name { + //kill processor docker log instance first + if let Some(ref mut log_instance) = &mut instance.instance_docker_log { + log_instance.kill()?; + let kill_result = log_instance.wait()?; + log::info!("kill processor old docker log instance:{:?}", kill_result); + } let kill_result = remove_docker_container(std::str::from_utf8(&docker_name)?).await; log::info!("kill old docker instance of processor:{:?}", kill_result); } @@ -833,6 +847,7 @@ async fn close_processor_instance( } Ok(()) } + async fn filter_processor_instance( processors: &mut HashMap, processor_infos: &Vec, @@ -913,6 +928,7 @@ where instance: None, instance_docker: false, instance_docker_name: None, + instance_docker_log: None, }); let run_status = &processor.running; diff --git a/pallets/container/src/lib.rs b/pallets/container/src/lib.rs index d6e940c..2864987 100644 --- a/pallets/container/src/lib.rs +++ b/pallets/container/src/lib.rs @@ -344,35 +344,37 @@ impl Pallet { } let url = DefaultUrl::::get().expect("Need set url"); - for app_id in processors { - let p_app_info = APPInfoMap::::get(app_id); - - if let Some(app_info) = p_app_info { - let batch_client = app_info.batch_client; - - let args = batch_client.args.and_then(|log| Some(log.as_slice().to_vec())); - - let log = batch_client.log.and_then(|log| Some(log.as_slice().to_vec())); - - let is_docker_image = if let Some(is_docker) = batch_client.is_docker_image { - is_docker - } else { - false - }; - - let docker_image = batch_client - .docker_image - .and_then(|docker_image| Some(docker_image.as_slice().to_vec())); - download_infos.push(ProcessorDownloadInfo { - app_hash: batch_client.app_hash, - file_name: batch_client.file_name.into(), - size: batch_client.size, - url: url.clone().into(), - args, - log, - is_docker_image, - docker_image, - }); + for group_id in processors { + if let Some(app_id) = GroupAPPMap::::get(group_id) { + let p_app_info = APPInfoMap::::get(app_id); + + if let Some(app_info) = p_app_info { + let batch_client = app_info.batch_client; + + let args = batch_client.args.and_then(|log| Some(log.as_slice().to_vec())); + + let log = batch_client.log.and_then(|log| Some(log.as_slice().to_vec())); + + let is_docker_image = if let Some(is_docker) = batch_client.is_docker_image { + is_docker + } else { + false + }; + + let docker_image = batch_client + .docker_image + .and_then(|docker_image| Some(docker_image.as_slice().to_vec())); + download_infos.push(ProcessorDownloadInfo { + app_hash: batch_client.app_hash, + file_name: batch_client.file_name.into(), + size: batch_client.size, + url: url.clone().into(), + args, + log, + is_docker_image, + docker_image, + }); + } } } download_infos From 37313640202df4d6381f7ef7a6c4f50f60549be8 Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Sat, 1 Jun 2024 22:07:59 +0800 Subject: [PATCH 13/16] Feat:check hash when pull docker image --- node/src/container_task.rs | 96 ++++++++++++++++++++++++++++++-------- 1 file changed, 77 insertions(+), 19 deletions(-) diff --git a/node/src/container_task.rs b/node/src/container_task.rs index 57a1bde..70f1148 100644 --- 
a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -175,6 +175,17 @@ async fn need_download( } } +async fn need_pull_docker_image( + docker_image: &str, + file_hash: H256, +) -> Result> { + let exist = check_docker_image_exist(docker_image).await?; + // Pull when the image is missing, or present but with a digest that does not match file_hash. + let mut need = true; + if exist { + need = !check_docker_image_hash(docker_image, file_hash).await?; + } + Ok(need) +} async fn check_docker_image_exist( docker_image: &str, ) -> Result> { @@ -199,12 +210,16 @@ async fn check_docker_image_exist( Ok(result) } -async fn download_docker_image(docker_image: &str) -> Result<(), Box> { +async fn download_docker_image( + docker_image: &str, + file_hash: H256, +) -> Result> { let pull_cmd = format!("pull {}", docker_image); let args: Vec<&str> = pull_cmd.split(' ').into_iter().map(|arg| arg).collect(); let mut instance = Command::new("docker").args(args).spawn()?; instance.wait()?; - Ok(()) + let result = check_docker_image_hash(docker_image, file_hash).await?; + Ok(result) } async fn remove_docker_container(container_name: &str) -> Result<(), Box> { @@ -241,6 +256,40 @@ async fn start_docker_container( Ok(()) } +/// Verify that the digest of the local docker image matches the expected hash. +/// Equivalent shell command: docker image ls {docker_image} --format "table {{.Digest}}" | grep sha256 +async fn check_docker_image_hash( + docker_image: &str, + app_hash: H256, +) -> Result> { + let ls_cmd = format!("image ls {} --format", docker_image); + let mut args: Vec<&str> = ls_cmd.split(' ').into_iter().map(|arg| arg).collect(); + args.push("table {{.Digest}}"); + let mut instance = Command::new("docker").stdout(Stdio::piped()).args(args).spawn()?; + let mut result = false; + if let Some(ls_output) = instance.stdout.take() { + let grep_cmd = Command::new("grep") + .arg("sha256") + .stdin(ls_output) + .stdout(Stdio::piped()) + .spawn()?; + + let grep_stdout = grep_cmd.wait_with_output()?; + instance.wait()?; + let sha256_hash = String::from_utf8(grep_stdout.stdout)?; + if sha256_hash.len() > 0 { + // The digest line looks like "sha256:<hex>"; take the part after the last colon. + let hashs: Vec<&str> = sha256_hash.split(':').into_iter().map(|arg| arg).collect(); + if hashs.len() > 0 { + let get_hash = H256::from_str(hashs[hashs.len() - 1])?; + if get_hash == app_hash { + result = true; + } + } + } + } + Ok(result) +} + // Redirect docker logs to a file. // It will run as a background process, so you need to save the instance and kill it when switching.
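// A minimal sketch of that lifecycle (container name and log file are
// placeholders): spawn the follower next to the container, keep the Child,
// and kill it when the application is swapped out:
//
//     let log_file = File::create("consensus.log")?;
//     let mut log_child =
//         redirect_docker_container_log("container_name", log_file).await?;
//     // ... later, when switching applications:
//     log_child.kill()?;
//     log_child.wait()?;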
// docker logs -f -t --details {container_name} @@ -324,14 +373,19 @@ async fn app_download_task( log::info!("===========Download app from docker hub and run the application as a container========="); if let Some(image) = app_info.clone().docker_image { let docker_image = std::str::from_utf8(&image)?; - let mut exist_docker_image = check_docker_image_exist(docker_image).await?; - if !exist_docker_image { - download_docker_image(docker_image).await?; - exist_docker_image = check_docker_image_exist(docker_image).await?; - } - if exist_docker_image { - //Start docker container - start_flag = true; + let need_pull = need_pull_docker_image(docker_image, app_info.app_hash).await; + if let Ok(need_pull) = need_pull { + if need_pull { + let result = download_docker_image(docker_image, app_info.app_hash).await; + if result.is_ok() { + start_flag = true; + } else { + log::info!("pull docker image error:{:?}", result); + } + } else { + //Start docker container + start_flag = true; + } } } } else { @@ -516,14 +570,19 @@ async fn processor_task( log::info!("===========Download app from docker hub and run the application as a container========="); if let Some(image) = processor_info.clone().docker_image { let docker_image = std::str::from_utf8(&image)?; - let mut exist_docker_image = check_docker_image_exist(docker_image).await?; - if !exist_docker_image { - download_docker_image(docker_image).await?; - exist_docker_image = check_docker_image_exist(docker_image).await?; - } - if exist_docker_image { - //Start docker container - start_flag = true; + let need_pull = need_pull_docker_image(docker_image, processor_info.app_hash).await; + if let Ok(need_pull) = need_pull { + if need_pull { + let result = download_docker_image(docker_image, processor_info.app_hash).await; + if result.is_ok() { + start_flag = true; + } else { + log::info!("pull docker image error:{:?}", result); + } + } else { + //Start docker container + start_flag = true; + } } } } else { @@ -763,7 +822,6 @@ async fn app_run_task( remove_docker_container(std::str::from_utf8(&docker_name)?).await; log::info!("kill docker instance1:{:?}", kill_result); } - // TODO:kill old docker log instance app.instance1_docker_name = None; app.instance1_docker = false; app.instance1_docker_log = None; From 5100e9530b59732dfa64791b74365127950a0861 Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Sat, 1 Jun 2024 22:42:46 +0800 Subject: [PATCH 14/16] Fix:The startup application parameters need to be bound to the app id --- node/src/container_task.rs | 45 +++++++++++++++------------------ pallets/container/src/lib.rs | 22 ++++++++-------- primitives/container/src/lib.rs | 1 + 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/node/src/container_task.rs b/node/src/container_task.rs index 70f1148..fbd2d62 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -13,7 +13,7 @@ use ring::digest::{Context, Digest, SHA256}; use sc_client_api::UsageProvider; use sc_service::TaskManager; use sp_api::ProvideRuntimeApi; -use sp_core::{hexdisplay::HexDisplay, offchain::OffchainStorage, H256}; +use sp_core::{offchain::OffchainStorage, H256}; use sp_keystore::KeystorePtr; use sp_offchain::STORAGE_PREFIX; use sp_runtime::{ @@ -345,6 +345,7 @@ struct RunningApp { #[derive(Debug)] struct ProcessorInstance { + app_id: u32, app_hash: H256, running: RunStatus, processor_info: Option, @@ -355,7 +356,7 @@ struct ProcessorInstance { } #[derive(Debug)] struct RunningProcessor { - processors: HashMap, + processors: HashMap, } async fn 
app_download_task( @@ -540,7 +541,7 @@ async fn processor_run_task( } let mut running_processors = running_processor.lock().await; let processor_instances = &mut running_processors.processors; - processor_instances.entry(processor_info.app_hash).and_modify(|app| { + processor_instances.entry(processor_info.app_id).and_modify(|app| { app.instance = instance; app.instance_docker_log = docker_log_instance; if run_as_docker { @@ -634,7 +635,7 @@ async fn processor_task( let mut running_processors = running_processor.lock().await; let processor_instances = &mut running_processors.processors; processor_instances - .entry(processor_info.app_hash) + .entry(processor_info.app_id) .and_modify(|instance| instance.running = status); Ok(()) } @@ -907,23 +908,23 @@ async fn close_processor_instance( } async fn filter_processor_instance( - processors: &mut HashMap, + processors: &mut HashMap, processor_infos: &Vec, -) -> Vec { +) -> Vec { let mut remove_entrys = Vec::new(); { for processor in &mut *processors { let mut find_flag = false; - let app_hash = processor.0; + let app_id = processor.0; for processor_info in processor_infos { - if *app_hash == processor_info.app_hash { + if *app_id == processor_info.app_id { find_flag = true; break; } } if !find_flag { let _ = close_processor_instance(processor.1).await; - remove_entrys.push(*app_hash); + remove_entrys.push(*app_id); } } } @@ -978,8 +979,10 @@ where processors.remove_entry(&remove_entry); } for processor_info in processor_infos { + let app_id = processor_info.app_id; let app_hash = processor_info.app_hash; - let processor = processors.entry(app_hash).or_insert(ProcessorInstance { + let processor = processors.entry(app_id).or_insert(ProcessorInstance { + app_id, app_hash, running: RunStatus::Pending, processor_info: Some(processor_info.clone()), @@ -992,11 +995,9 @@ where if *run_status == RunStatus::Pending { processor.running = RunStatus::Downloading; - let app_hash = processor.app_hash; - let p_run_args_key = - format!("{}:{}", P_RUN_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); - let p_option_args_key = - format!("{}:{}", P_OPTION_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); + let app_id = processor.app_id; + let p_run_args_key = format!("{}:{}", P_RUN_ARGS_KEY, app_id); + let p_option_args_key = format!("{}:{}", P_OPTION_ARGS_KEY, app_id); let run_args = get_offchain_storage::<_, TBackend>( offchain_storage.clone(), p_run_args_key.as_bytes(), @@ -1033,11 +1034,9 @@ where let app_hash = app_info.app_hash; let run_status = &app.running; if old_group_id != new_group && *run_status == RunStatus::Pending { - let sync_args_key = - format!("{}:{}", SYNC_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); + let sync_args_key = format!("{}:{}", SYNC_ARGS_KEY, app_id); - let option_args_key = - format!("{}:{}", OPTION_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); + let option_args_key = format!("{}:{}", OPTION_ARGS_KEY, app_id); let sync_args = get_offchain_storage::<_, TBackend>( offchain_storage.clone(), @@ -1077,15 +1076,13 @@ where if should_run { let mut app = running_app.lock().await; let run_status = &app.running; - let app_hash = app.app_hash; + let app_id = app.app_id; log::info!("run:{:?}", app); if let Some(app_info) = app.app_info.clone() { if *run_status == RunStatus::Downloaded { - let run_args_key = - format!("{}:{}", RUN_ARGS_KEY, HexDisplay::from(&app_hash.as_bytes())); + let run_args_key = format!("{}:{}", RUN_ARGS_KEY, app_id); - let option_args_key = - format!("{}:{}", OPTION_ARGS_KEY, 
HexDisplay::from(&app_hash.as_bytes())); + let option_args_key = format!("{}:{}", OPTION_ARGS_KEY, app_id); let run_args = get_offchain_storage::<_, TBackend>( offchain_storage.clone(), diff --git a/pallets/container/src/lib.rs b/pallets/container/src/lib.rs index 2864987..ac9ed17 100644 --- a/pallets/container/src/lib.rs +++ b/pallets/container/src/lib.rs @@ -212,16 +212,17 @@ pub mod pallet { let batch_app = *batch_client; - for app_id in 1..old_application_id { - let p_app_info = APPInfoMap::::get(app_id); - if let Some(app_info) = p_app_info { - assert!( - (app_info.consensus_client.app_hash != consensus_app.app_hash) && - (app_info.batch_client.app_hash != batch_app.app_hash), - "Client with the same hash exist!", - ); - } - } + // we can allow same app when register app. + // for app_id in 1..old_application_id { + // let p_app_info = APPInfoMap::::get(app_id); + // if let Some(app_info) = p_app_info { + // assert!( + // (app_info.consensus_client.app_hash != consensus_app.app_hash) && + // (app_info.batch_client.app_hash != batch_app.app_hash), + // "Client with the same hash exist!", + // ); + // } + // } APPInfoMap::::insert( old_application_id, APPInfo { @@ -365,6 +366,7 @@ impl Pallet { .docker_image .and_then(|docker_image| Some(docker_image.as_slice().to_vec())); download_infos.push(ProcessorDownloadInfo { + app_id, app_hash: batch_client.app_hash, file_name: batch_client.file_name.into(), size: batch_client.size, diff --git a/primitives/container/src/lib.rs b/primitives/container/src/lib.rs index 774521f..92b235a 100644 --- a/primitives/container/src/lib.rs +++ b/primitives/container/src/lib.rs @@ -19,6 +19,7 @@ pub struct DownloadInfo { #[derive(Debug, Clone, TypeInfo, Encode, Decode, Default)] pub struct ProcessorDownloadInfo { + pub app_id: u32, pub app_hash: H256, pub file_name: Vec, pub size: u32, From ea269684f46e1b04b7a1ce5416b72339525db055 Mon Sep 17 00:00:00 2001 From: sulijia <984115358@qq.com> Date: Sun, 2 Jun 2024 06:47:20 +0800 Subject: [PATCH 15/16] Feat:Both the IP and the account ID must be met to run the processor client --- Cargo.lock | 351 ++++++++++++++++++-------------- node/Cargo.toml | 2 +- node/src/container_task.rs | 9 +- pallets/container/src/lib.rs | 13 +- primitives/container/src/lib.rs | 2 +- runtime/src/lib.rs | 4 +- 6 files changed, 221 insertions(+), 160 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c964cba..4e2e25a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1214,7 +1214,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.1", "terminal_size", ] @@ -2331,6 +2331,41 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "darling" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.9.3", + "syn 1.0.109", +] + +[[package]] +name = "darling_macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" +dependencies = [ + "darling_core", + "quote", + "syn 1.0.109", +] + [[package]] name = "dashmap" version = "5.5.3" @@ -2436,6 +2471,31 @@ 
dependencies = [ "syn 2.0.58", ] +[[package]] +name = "derive_builder" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2658621297f2cf68762a6f7dc0bb7e1ff2cfd6583daef8ee0fed6f7ec468ec0" +dependencies = [ + "darling", + "derive_builder_core", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "derive_builder_core" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2791ea3e372c8495c0bc2033991d76b512cd799d07491fbd6890124db9458bef" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -2538,25 +2598,16 @@ dependencies = [ "syn 2.0.58", ] -[[package]] -name = "dlv-list" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" -dependencies = [ - "const-random", -] - [[package]] name = "dns-lookup" -version = "2.0.4" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" +checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872" dependencies = [ "cfg-if", "libc", - "socket2 0.5.6", - "windows-sys 0.48.0", + "socket2 0.4.10", + "winapi", ] [[package]] @@ -2741,6 +2792,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + +[[package]] +name = "enum-as-inner" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570d109b813e904becc80d8d5da38376818a143348413f7149f1340fe04754d4" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "enum-as-inner" version = "0.5.1" @@ -3627,19 +3696,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "get_local_info" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a602b5db70aabc62c623c723997426190e1cd95fc3cdf357a343669bf801239" -dependencies = [ - "chrono", - "dns-lookup", - "pnet", - "reqwest", - "rust-ini", -] - [[package]] name = "gethostname" version = "0.2.3" @@ -4017,6 +4073,20 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-system-resolver" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eea26c5d0b6ab9d72219f65000af310f042a740926f7b2fa3553e774036e2e7" +dependencies = [ + "derive_builder", + "dns-lookup", + "hyper", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -4053,6 +4123,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.2.3" @@ -4259,15 +4335,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" -[[package]] -name = "ipnetwork" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e" -dependencies = [ - "serde", -] - [[package]] name = "is-terminal" 
version = "0.4.12" @@ -4775,7 +4842,7 @@ dependencies = [ "smallvec", "socket2 0.4.10", "tokio", - "trust-dns-proto", + "trust-dns-proto 0.22.0", "void", ] @@ -5754,6 +5821,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.24.3" @@ -6045,16 +6121,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "ordered-multimap" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e" -dependencies = [ - "dlv-list", - "hashbrown 0.13.2", -] - [[package]] name = "pallet-asset-conversion" version = "13.0.0" @@ -7648,97 +7714,6 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7" -[[package]] -name = "pnet" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "130c5b738eeda2dc5796fe2671e49027e6935e817ab51b930a36ec9e6a206a64" -dependencies = [ - "ipnetwork", - "pnet_base", - "pnet_datalink", - "pnet_packet", - "pnet_sys", - "pnet_transport", -] - -[[package]] -name = "pnet_base" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe4cf6fb3ab38b68d01ab2aea03ed3d1132b4868fa4e06285f29f16da01c5f4c" -dependencies = [ - "no-std-net", -] - -[[package]] -name = "pnet_datalink" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad5854abf0067ebbd3967f7d45ebc8976ff577ff0c7bd101c4973ae3c70f98fe" -dependencies = [ - "ipnetwork", - "libc", - "pnet_base", - "pnet_sys", - "winapi", -] - -[[package]] -name = "pnet_macros" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "688b17499eee04a0408aca0aa5cba5fc86401d7216de8a63fdf7a4c227871804" -dependencies = [ - "proc-macro2", - "quote", - "regex", - "syn 2.0.58", -] - -[[package]] -name = "pnet_macros_support" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eea925b72f4bd37f8eab0f221bbe4c78b63498350c983ffa9dd4bcde7e030f56" -dependencies = [ - "pnet_base", -] - -[[package]] -name = "pnet_packet" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9a005825396b7fe7a38a8e288dbc342d5034dac80c15212436424fef8ea90ba" -dependencies = [ - "glob", - "pnet_base", - "pnet_macros", - "pnet_macros_support", -] - -[[package]] -name = "pnet_sys" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "417c0becd1b573f6d544f73671070b039051e5ad819cc64aa96377b536128d00" -dependencies = [ - "libc", - "winapi", -] - -[[package]] -name = "pnet_transport" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2637e14d7de974ee2f74393afccbc8704f3e54e6eb31488715e72481d1662cc3" -dependencies = [ - "libc", - "pnet_base", - "pnet_packet", - "pnet_sys", -] - [[package]] name = "polkadot-approval-distribution" version = "10.0.0" @@ -9049,7 +9024,6 @@ dependencies = [ "frame-benchmarking", "frame-benchmarking-cli", "futures", - "get_local_info", "jsonrpsee", "log", "pallet-transaction-payment-rpc", @@ -9058,6 +9032,7 @@ dependencies = [ "polkadot-primitives", 
"popsicle-runtime", "primitives-container", + "public-ip", "reqwest", "ring 0.17.8", "sc-basic-authorship", @@ -9474,6 +9449,27 @@ dependencies = [ "cc", ] +[[package]] +name = "public-ip" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4c40db5262d93298c363a299f8bc1b3a956a78eecddba3bc0e58b76e2f419a" +dependencies = [ + "dns-lookup", + "futures-core", + "futures-util", + "http", + "hyper", + "hyper-system-resolver", + "pin-project-lite 0.2.14", + "thiserror", + "tokio", + "tracing", + "tracing-futures", + "trust-dns-client", + "trust-dns-proto 0.20.4", +] + [[package]] name = "quanta" version = "0.12.3" @@ -9561,6 +9557,16 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -10071,16 +10077,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "rust-ini" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091" -dependencies = [ - "cfg-if", - "ordered-multimap", -] - [[package]] name = "rustc-demangle" version = "0.1.23" @@ -13141,6 +13137,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "strsim" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" + [[package]] name = "strsim" version = "0.11.1" @@ -13833,6 +13835,8 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ + "futures", + "futures-task", "pin-project", "tracing", ] @@ -13928,6 +13932,51 @@ dependencies = [ "hash-db", ] +[[package]] +name = "trust-dns-client" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b4ef9b9bde0559b78a4abb00339143750085f05e5a453efb7b8bef1061f09dc" +dependencies = [ + "cfg-if", + "data-encoding", + "futures-channel", + "futures-util", + "lazy_static", + "log", + "radix_trie", + "rand", + "thiserror", + "time", + "tokio", + "trust-dns-proto 0.20.4", +] + +[[package]] +name = "trust-dns-proto" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca94d4e9feb6a181c690c4040d7a24ef34018d8313ac5044a61d21222ae24e31" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner 0.3.4", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.2.3", + "ipnet", + "lazy_static", + "log", + "rand", + "smallvec", + "thiserror", + "tinyvec", + "tokio", + "url", +] + [[package]] name = "trust-dns-proto" version = "0.22.0" @@ -13937,7 +13986,7 @@ dependencies = [ "async-trait", "cfg-if", "data-encoding", - "enum-as-inner", + "enum-as-inner 0.5.1", "futures-channel", "futures-io", "futures-util", @@ -13971,7 +14020,7 @@ dependencies = [ "thiserror", "tokio", "tracing", - "trust-dns-proto", + "trust-dns-proto 0.22.0", ] [[package]] diff --git a/node/Cargo.toml b/node/Cargo.toml index 64d8d1d..87f0122 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -29,7 +29,7 @@ tempfile="3.9.0" 
error-chain="0.12.4" ring="0.17.8" tokio = { version = "1.22.0", features = ["parking_lot", "rt-multi-thread", "time"] } -get_local_info = "0.2.4" +public-ip = "0.2" # Local primitives-container = { path = "../primitives/container"} diff --git a/node/src/container_task.rs b/node/src/container_task.rs index fbd2d62..848a535 100644 --- a/node/src/container_task.rs +++ b/node/src/container_task.rs @@ -938,6 +938,7 @@ async fn handle_new_best_parachain_head( running_app: Arc>, running_processor: Arc>, backend: Arc, + ip_address: &str, ) -> Result<(), Box> where Block: BlockT, @@ -961,12 +962,13 @@ where let xx = keystore.sr25519_public_keys(sp_application_crypto::key_types::AURA)[0]; // Processor client process - let ip_address = get_local_info::get_pc_ipv4(); log::info!("ip_address:{:?}", ip_address); let processor_infos: Vec = - parachain.runtime_api().processor_run(hash, xx.into())?; + parachain + .runtime_api() + .processor_run(hash, xx.into(), ip_address.as_bytes().to_vec())?; log::info!("processor download info:{:?}", processor_infos); @@ -1185,12 +1187,13 @@ async fn relay_chain_notification( cur_ins: InstanceIndex::Instance1, })); let runing_processor = Arc::new(Mutex::new(RunningProcessor { processors: HashMap::new() })); + let ip_address = public_ip::addr().await.expect("couldn't get an IP address").to_string(); loop { select! { h = new_best_heads.next() => { match h { Some((_height, head, _hash)) => { - let _ = handle_new_best_parachain_head(head, &*parachain,keystore.clone(), data_path.clone(), runing_app.clone(),runing_processor.clone(), backend.clone()).await; + let _ = handle_new_best_parachain_head(head, &*parachain,keystore.clone(), data_path.clone(), runing_app.clone(),runing_processor.clone(), backend.clone(), &ip_address).await; }, None => { return; diff --git a/pallets/container/src/lib.rs b/pallets/container/src/lib.rs index ac9ed17..5617af8 100644 --- a/pallets/container/src/lib.rs +++ b/pallets/container/src/lib.rs @@ -337,14 +337,23 @@ impl Pallet { >::all_group_ids() } - pub fn processor_run(author: T::AccountId) -> Vec { - let processors = >::get_group_ids(author); + pub fn processor_run(author: T::AccountId, ip_address: Vec) -> Vec { + // let processors = >::get_group_ids(author); let mut download_infos: Vec = Vec::new(); if Self::get_groups().len() == 0 { return download_infos; } let url = DefaultUrl::::get().expect("Need set url"); + let processor_info = >::processor_info(); + let processors = if let Some((_, _, group_ids)) = processor_info + .iter() + .find(|(acc, ip, _)| (*acc == author) && (ip.to_vec() == ip_address)) + { + group_ids.clone().into_inner() + } else { + Vec::new() + }; for group_id in processors { if let Some(app_id) = GroupAPPMap::::get(group_id) { let p_app_info = APPInfoMap::::get(app_id); diff --git a/primitives/container/src/lib.rs b/primitives/container/src/lib.rs index 92b235a..f257a26 100644 --- a/primitives/container/src/lib.rs +++ b/primitives/container/src/lib.rs @@ -39,6 +39,6 @@ sp_api::decl_runtime_apis! { fn should_run()-> bool; fn get_group_id(author:AuthorityId) ->u32; fn get_groups()->Vec; - fn processor_run(author:AuthorityId)->Vec; + fn processor_run(author:AuthorityId, ip_address:Vec)->Vec; } } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index b896264..0378920 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -812,8 +812,8 @@ impl_runtime_apis! 
 {
 			ContainerPallet::should_run()
 		}
-		fn processor_run(author:AccountId32)->Vec {
-			ContainerPallet::processor_run(author)
+		fn processor_run(author:AccountId32, ip_address:Vec)->Vec {
+			ContainerPallet::processor_run(author, ip_address)
 		}
 	}

From 9dc1f4b260ad73acea9c2b2668e1d52c9393a285 Mon Sep 17 00:00:00 2001
From: sulijia <984115358@qq.com>
Date: Sun, 2 Jun 2024 16:41:19 +0800
Subject: [PATCH 16/16] Feat:Add code comments

---
 node/src/container_task.rs      | 68 ++++++++++++++++++++++++++++
 pallets/container/src/lib.rs    | 79 +++++++++++++++++++++++++++------
 primitives/container/src/lib.rs | 22 +++++++++
 3 files changed, 155 insertions(+), 14 deletions(-)

diff --git a/node/src/container_task.rs b/node/src/container_task.rs
index 848a535..9d44e43 100644
--- a/node/src/container_task.rs
+++ b/node/src/container_task.rs
@@ -1,3 +1,17 @@
+//! Container Spawner
+//!
+//! When the node starts, a background service is spawned that runs until the node
+//! shuts down. On every relay-chain block it receives (a 6-second cadence), the
+//! service runs the application download task (if a download signal is detected)
+//! and the application execution task (if an execution signal is detected).
+//! Download task: determine whether this node belongs to a group; if not, end the
+//! task. If the newly assigned group is the same as the previous one, end the
+//! task. If the node is in a group that differs from the previously assigned one
+//! (the first time group information is obtained also takes this path), fetch the
+//! application information for that group, check whether the application has
+//! already been downloaded, download it if not, and then start the block
+//! synchronization process. Execution task: check whether the effective time has
+//! arrived and a new application is pending; if so, stop the old application and start the new one.
 use codec::Decode;
 use cumulus_primitives_core::{ParaId, PersistedValidationData};
 use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
@@ -32,11 +46,17 @@ use std::{
 	sync::Arc,
 };

+// Startup arguments for the consensus client's consensus phase.
 pub const RUN_ARGS_KEY: &str = "run_args";
+// Startup arguments for the consensus client's block-sync phase.
 pub const SYNC_ARGS_KEY: &str = "sync_args";
+// Startup arguments when the consensus client runs as a docker container.
 pub const OPTION_ARGS_KEY: &str = "option_args";
+// Startup arguments for the processor client.
 pub const P_RUN_ARGS_KEY: &str = "p_run_args";
+// Startup arguments when the processor client runs as a docker container.
 pub const P_OPTION_ARGS_KEY: &str = "p_option_args";
+
 struct PartialRangeIter {
 	start: u64,
 	end: u64,
@@ -68,6 +88,7 @@ impl Iterator for PartialRangeIter {
 	}
 }

+// Calculate the sha256 value of a file.
 async fn sha256_digest(mut reader: R) -> Result> {
 	let mut context = Context::new(&SHA256);
 	let mut buffer = [0; 1024];
@@ -83,6 +104,7 @@ async fn sha256_digest(mut reader: R) -> Result Result> {
@@ -210,6 +237,7 @@ async fn check_docker_image_exist(
 	Ok(result)
 }

+// Pull the docker image.
 async fn download_docker_image(
 	docker_image: &str,
 	file_hash: H256,
@@ -222,6 +250,7 @@ async fn download_docker_image(
 	Ok(result)
 }

+// Remove a docker container by name.
 async fn remove_docker_container(container_name: &str) -> Result<(), Box> {
 	let mut docker_cmd = format!("container stop {}", container_name);
 	let mut args: Vec<&str> = docker_cmd.split(' ').into_iter().map(|arg| arg).collect();
@@ -235,6 +264,7 @@ async fn remove_docker_container(container_name: &str) -> Result<(), Box,
+	/// Process instance 1.
 	instance1: Option,
+	/// Process instance 2.
 	instance2: Option,
+	/// Whether instance 1 is a docker container.
 	instance1_docker: bool,
+	/// Whether instance 2 is a docker container.
 	instance2_docker: bool,
+	/// Docker container name of instance 1.
 	instance1_docker_name: Option>,
+	/// Docker container name of instance 2.
 	instance2_docker_name: Option>,
+	/// Linux process handle of instance 1's docker container.
 	instance1_docker_log: Option,
+	/// Linux process handle of instance 2's docker container.
 	instance2_docker_log: Option,
+	/// Current instance, 1 or 2.
 	cur_ins: InstanceIndex,
 }

+/// Running-state information for a processor client.
 #[derive(Debug)]
 struct ProcessorInstance {
+	/// App id, starting from 1.
 	app_id: u32,
+	/// App sha256 hash.
 	app_hash: H256,
+	/// Running status.
 	running: RunStatus,
+	/// App info.
 	processor_info: Option,
+	/// Process instance.
 	instance: Option,
+	/// Whether the instance is a docker container.
 	instance_docker: bool,
+	/// Docker container name of the instance.
 	instance_docker_name: Option>,
+	/// Linux process handle of the instance's docker container.
 	instance_docker_log: Option,
 }

 #[derive(Debug)]
 struct RunningProcessor {
+	/// Map of app_id to processor instance state.
 	processors: HashMap,
 }

+// Download the sequencer client from the network and start syncing block data.
 async fn app_download_task(
 	data_path: PathBuf,
 	app_info: DownloadInfo,
@@ -443,6 +503,7 @@ async fn app_download_task(
 	Ok(())
 }

+// Download and start the processor client.
 async fn processor_run_task(
 	data_path: PathBuf,
 	processor_info: ProcessorDownloadInfo,
@@ -557,6 +618,7 @@ async fn processor_run_task(
 	Ok(())
 }

+// Background task of the processor.
 async fn processor_task(
 	data_path: PathBuf,
 	processor_info: ProcessorDownloadInfo,
@@ -640,6 +702,7 @@ async fn processor_task(
 	Ok(())
 }

+// Background task of the sequencer.
 async fn app_run_task(
 	data_path: PathBuf,
 	app_info: DownloadInfo,
@@ -865,6 +928,7 @@ async fn app_run_task(
 	Ok(())
 }

+// Read a value from offchain storage.
 async fn get_offchain_storage(
 	offchain_storage: Option,
 	args: &[u8],
@@ -880,6 +944,7 @@ where
 	}
 }

+// Kill a processor instance.
 async fn close_processor_instance(
 	instance: &mut ProcessorInstance,
 ) -> Result<(), Box> {
@@ -907,6 +972,7 @@ async fn close_processor_instance(
 	Ok(())
 }

+// Kill processor instances whose group is no longer assigned.
 async fn filter_processor_instance(
 	processors: &mut HashMap,
 	processor_infos: &Vec,
@@ -930,6 +996,8 @@ async fn filter_processor_instance(
 	}
 	remove_entrys
 }
+
+// Background task of the sequencer and the processor.
 async fn handle_new_best_parachain_head(
 	validation_data: PersistedValidationData,
 	parachain: &P,
diff --git a/pallets/container/src/lib.rs b/pallets/container/src/lib.rs
index 5617af8..3534b53 100644
--- a/pallets/container/src/lib.rs
+++ b/pallets/container/src/lib.rs
@@ -1,3 +1,18 @@
+//! # Container Pallet
+//!
+//! This pallet is named "container" because it is meant to act as a container for
+//! all kinds of clients: it registers layer2 clients and starts them when the
+//! appropriate conditions are met.
+//!
+//! Two roles carry out this work: the sequencer and the processor.
+//! The sequencer is responsible for starting the consensus client,
+//! and the processor is responsible for starting the batcher client.
+//!
+//! To remain compatible with any layer2, a layer2 client is abstracted into a
+//! consensus client and a batcher client. Either client can be started and run
+//! as a process or as a docker container. The consensus client operates on a
+//! time schedule in two steps: block synchronization, then consensus.
+
 #![cfg_attr(not(feature = "std"), no_std)]

 pub use pallet::*;
@@ -24,6 +39,7 @@ use sp_runtime::BoundedVec;
 use sp_std::{boxed::Box, vec};
 pub use weights::*;

+/// Basic client information, covering both the consensus client and the batcher client.
 #[derive(Derivative, Encode, Decode, TypeInfo, MaxEncodedLen)]
 #[derivative(
 	Clone(bound = ""),
@@ -36,30 +52,33 @@ pub use weights::*;
 #[codec(decode_bound())]
 #[scale_info(bounds(), skip_type_params(T))]
 pub struct AppClient {
+	/// Client hash (sha256); if the client runs as a docker container, this is the image digest.
 	pub app_hash: Hash,
-
+	/// Client file name.
 	pub file_name: BoundedVec,
-
+	/// Client file size, in bytes.
 	pub size: u32,
-
+	/// Common client startup parameters.
 	pub args: Option>,
-
+	/// Client log file.
 	pub log: Option>,
-
+	/// Whether the client is started as a docker container.
 	pub is_docker_image: Option,
-
+	/// Docker image name.
 	pub docker_image: Option>,
 }

+/// Information about a registered application.
 #[derive(Encode, Decode, Default, Clone, TypeInfo, MaxEncodedLen, Debug)]
 #[scale_info(skip_type_params(T))]
 pub struct APPInfo {
+	/// Account that registered the application.
 	creator: T::AccountId,
-
+	/// Project name; uniquely identifies the application.
 	project_name: BoundedVec,
-
+	/// Consensus client.
 	consensus_client: AppClient,
-
+	/// Batcher client.
 	batch_client: AppClient,
 }

@@ -76,35 +95,40 @@ pub mod pallet {
 		type RuntimeEvent: From> + IsType<::RuntimeEvent>;
 		/// Type representing the weight of this pallet
 		type WeightInfo: WeightInfo;
-
+		/// Max length of a file name.
 		#[pallet::constant]
 		type MaxLengthFileName: Get;
-
+		/// Max number of registered apps.
 		#[pallet::constant]
 		type MaxRuningAPP: Get;
-
+		/// Max URL length for downloading client binary files.
 		#[pallet::constant]
 		type MaxUrlLength: Get;
-
+		/// Max count of arguments.
 		#[pallet::constant]
 		type MaxArgCount: Get;
-
+		/// Max length of a single argument.
 		#[pallet::constant]
 		type MaxArgLength: Get;
 	}

+	/// By default, application ids start from 1.
 	#[pallet::type_value]
 	pub fn ApplicationIDOnEmpty() -> u32 {
 		1
 	}
+
+	/// The next available application id.
 	#[pallet::storage]
 	#[pallet::getter(fn next_application_id)]
 	pub type NextApplicationID = StorageValue<_, u32, ValueQuery, ApplicationIDOnEmpty>;

+	/// Storage for the default download URL.
 	#[pallet::storage]
 	#[pallet::getter(fn default_url)]
 	pub type DefaultUrl = StorageValue<_, BoundedVec, OptionQuery>;

+	/// Registered application information, a map of app_id to app_info.
 	#[pallet::storage]
 	#[pallet::getter(fn appinfo_map)]
 	pub type APPInfoMap = StorageMap<_, Twox64Concat, u32, APPInfo, OptionQuery>;
@@ -123,14 +147,23 @@ pub mod pallet {
 	#[pallet::generate_deposit(pub(super) fn deposit_event)]
 	pub enum Event {
 		ReisterApp {
+			/// Account that registered the client.
 			creator: T::AccountId,
+			/// Assigned app id.
 			appid: u32,
+			/// Project name.
 			project_name: BoundedVec,
+			/// File name of the consensus client.
 			consensus_client: BoundedVec,
+			/// Hash of the consensus client.
 			consensus_hash: Hash,
+			/// File size of the consensus client.
 			consensus_size: u32,
+			/// File name of the batcher client.
 			batch_client: BoundedVec,
+			/// Hash of the batcher client.
 			batch_hash: Hash,
+			/// File size of the batcher client.
 			batch_size: u32,
 		},
 		SetDownloadURL {
@@ -146,6 +179,8 @@ pub mod pallet {

 	#[pallet::hooks]
 	impl Hooks> for Pallet {
+		/// Executed on every parachain block: query how many groups exist and bind the
+		/// registered applications to them, one application per group.
 		fn on_initialize(_n: BlockNumberFor) -> Weight
 		where
 			BlockNumberFor: From,
@@ -196,6 +231,12 @@ pub mod pallet {

 	#[pallet::call]
 	impl Pallet {
+		/// Register a layer2 application client.
+		///
+		/// Parameters:
+		/// - `project_name`: The project name.
+		/// - `consensus_client`: The consensus client info.
+		/// - `batch_client`: The batcher client info.
 		#[pallet::call_index(0)]
 		#[pallet::weight(::WeightInfo::register_app())]
 		pub fn register_app(
@@ -255,6 +296,10 @@ pub mod pallet {
 			Ok(())
 		}

+		/// Set the URL used to download client binary files.
+		///
+		/// Parameters:
+		/// - `url`: The download URL.
 		#[pallet::call_index(1)]
 		#[pallet::weight(::WeightInfo::set_default_url())]
 		pub fn set_default_url(
@@ -312,6 +357,7 @@ impl Pallet {
 		})
 	}

+	// Whether the consensus client should start at the current block number.
 	pub fn should_run() -> bool {
 		let next_round = >::next_round();
@@ -324,6 +370,7 @@ impl Pallet {
 		}
 	}

+	// Get the sequencer's group id.
 	pub fn get_group_id(author: T::AccountId) -> u32 {
 		let group_id_result = >::account_in_group(author);
 		if let Ok(group_id) = group_id_result {
@@ -333,10 +380,14 @@ impl Pallet {
 			0xFFFFFFFF
 		}
 	}
+
+	// Get all assigned group ids.
 	pub fn get_groups() -> Vec {
 		>::all_group_ids()
 	}

+	// Check whether the account running the current node has been assigned a group, is a
+	// processor, and has an IP address matching the registered one.
 	pub fn processor_run(author: T::AccountId, ip_address: Vec) -> Vec {
 		// let processors = >::get_group_ids(author);
 		let mut download_infos: Vec = Vec::new();
diff --git a/primitives/container/src/lib.rs b/primitives/container/src/lib.rs
index f257a26..5a64a5e 100644
--- a/primitives/container/src/lib.rs
+++ b/primitives/container/src/lib.rs
@@ -3,30 +3,52 @@ use codec::{Codec, Decode, Encode};
 use scale_info::TypeInfo;
 use sp_core::H256;
 use sp_std::vec::Vec;
+
+/// Client info for the sequencer.
 #[derive(Debug, Clone, TypeInfo, Encode, Decode, Default)]
 pub struct DownloadInfo {
+	/// App id.
 	pub app_id: u32,
+	/// App hash.
 	pub app_hash: H256,
+	/// File name of the app.
 	pub file_name: Vec,
+	/// File size of the app.
 	pub size: u32,
+	/// Group id of the app.
 	pub group: u32,
+	/// URL for downloading the client binary file.
 	pub url: Vec,
+	/// Client startup arguments.
 	pub args: Option>,
+	/// Client log file.
 	pub log: Option>,
+	/// Whether the client starts as a docker container.
 	pub is_docker_image: bool,
+	/// Docker image.
 	pub docker_image: Option>,
 }

+/// Client info for the processor.
 #[derive(Debug, Clone, TypeInfo, Encode, Decode, Default)]
 pub struct ProcessorDownloadInfo {
+	/// App id.
 	pub app_id: u32,
+	/// App hash.
 	pub app_hash: H256,
+	/// File name of the app.
 	pub file_name: Vec,
+	/// File size of the app.
 	pub size: u32,
+	/// URL for downloading the client binary file.
 	pub url: Vec,
+	/// Client startup arguments.
 	pub args: Option>,
+	/// Client log file.
 	pub log: Option>,
+	/// Whether the client starts as a docker container.
 	pub is_docker_image: bool,
+	/// Docker image.
 	pub docker_image: Option>,
 }
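
A minimal, self-contained sketch of the node-side flow these patches wire up: discover the node's public IP once at startup (the `public-ip` crate replaces `get_local_info`, as in `relay_chain_notification` above) and hand its textual form to the `processor_run` runtime API as bytes. The hand-built tokio runtime only needs the `rt-multi-thread` feature already declared in node/Cargo.toml; the standalone `main` and the closing `println!` are illustrative stand-ins for the node's service loop and the real runtime-API call, which this sketch does not reproduce.

use std::net::IpAddr;

fn main() {
    // Build a runtime by hand so the sketch needs no extra tokio features.
    let rt = tokio::runtime::Runtime::new().expect("failed to build tokio runtime");
    rt.block_on(async {
        // `public_ip::addr()` probes public resolvers (trust-dns based, hence the
        // new lockfile entries above) and yields `Option<IpAddr>`; a failed
        // discovery panics here, mirroring the `.expect(...)` used in
        // relay_chain_notification.
        let ip: IpAddr = public_ip::addr().await.expect("couldn't get an IP address");
        let ip_address = ip.to_string();

        // The runtime API takes the address as raw bytes; on the pallet side,
        // processor_run compares these bytes against the registered IP.
        let ip_bytes: Vec<u8> = ip_address.as_bytes().to_vec();

        // Illustrative stand-in for the real call made once per relay-chain block:
        // parachain.runtime_api().processor_run(hash, author, ip_bytes)
        println!("processor_run would receive {:?}", ip_bytes);
    });
}

Passing the IP as plain bytes keeps the runtime API SCALE-friendly: the pallet compares the bytes directly against the IP stored at registration, so no address parsing has to happen on-chain.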