diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 00000000..64771164
Binary files /dev/null and b/.DS_Store differ
diff --git a/.github/.DS_Store b/.github/.DS_Store
new file mode 100644
index 00000000..627bd5a0
Binary files /dev/null and b/.github/.DS_Store differ
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000..acba7998
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,48 @@
+# Use Debian as the base image
+FROM debian:latest
+
+# Avoid prompts from apt
+ENV DEBIAN_FRONTEND=noninteractive
+ENV TERM=xterm
+
+# Install required packages
+RUN apt-get update && \
+    apt-get install -y curl build-essential pkg-config libssl-dev git protobuf-compiler clang libclang-dev llvm-dev librocksdb-dev jq make && \
+    rm -rf /var/lib/apt/lists/*
+
+# Install Rust
+RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
+ENV PATH="/root/.cargo/bin:${PATH}"
+
+# Verify Rust installation
+RUN rustc --version && cargo --version
+
+# Copy the source code into the image
+COPY . /app
+WORKDIR /app
+
+# Build the Rust project
+RUN cargo build --release && \
+    rm -rf /app/target/debug
+
+# Expose the necessary ports
+EXPOSE 30333 30343 9944
+
+# Make the release and scripts directories and their contents executable
+# RUN chmod +x /app/target/release
+# RUN chmod +x /app/scripts
+
+RUN chmod -R +x /app/scripts
+
+
+# Keep the container running
+CMD ["/bin/sh", "-c", "/app/scripts/run_nodes.sh"]
+# CMD ["tail", "-f", "/dev/null"]
+# CMD ["/app/scripts/run_nodes.sh"]
+
+# # Copy and make the entrypoint script executable
+# COPY entrypoint.sh /usr/local/bin/entrypoint.sh
+# RUN chmod +x /usr/local/bin/entrypoint.sh
+
+# # Command to run when the container starts
+# ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
diff --git a/aleph-client/.DS_Store b/aleph-client/.DS_Store
new file mode 100644
index 00000000..35caf377
Binary files /dev/null and b/aleph-client/.DS_Store differ
diff --git a/baby-liminal-extension/.DS_Store b/baby-liminal-extension/.DS_Store
new file mode 100644
index 00000000..3d0df820
Binary files /dev/null and b/baby-liminal-extension/.DS_Store differ
diff --git a/bin/.DS_Store b/bin/.DS_Store
new file mode 100644
index 00000000..98ab5eb7
Binary files /dev/null and b/bin/.DS_Store differ
diff --git a/bin/node/src/service.rs b/bin/node/src/service.rs
index b18f76ab..e78ae0e1 100644
--- a/bin/node/src/service.rs
+++ b/bin/node/src/service.rs
@@ -7,10 +7,10 @@ use std::{
 use fake_runtime_api::fake_runtime::RuntimeApi;
 use finality_aleph::{
-    build_network, get_aleph_block_import, run_validator_node, AlephConfig, AllBlockMetrics,
-    BlockImporter, BuildNetworkOutput, ChannelProvider, FavouriteSelectChainProvider,
-    Justification, JustificationTranslator, MillisecsPerBlock, RateLimiterConfig,
-    RedirectingBlockImport, SessionPeriod, SubstrateChainStatus, SyncOracle, ValidatorAddressCache,
+    build_network, get_aleph_block_import, run_validator_node, AlephConfig, BlockImporter,
+    BuildNetworkOutput, ChannelProvider, FavouriteSelectChainProvider, Justification,
+    JustificationTranslator, MillisecsPerBlock, RateLimiterConfig, RedirectingBlockImport,
+    SessionPeriod, SubstrateChainStatus, SyncOracle, ValidatorAddressCache,
 };
 use log::warn;
 use pallet_aleph_runtime_api::AlephSessionApi;
@@ -51,7 +51,6 @@ pub struct ServiceComponents {
     pub keystore_container: KeystoreContainer,
     pub justification_channel_provider: ChannelProvider<Justification>,
     pub telemetry: Option<Telemetry>,
-    pub metrics: AllBlockMetrics,
 }

 struct LimitNonfinalized(u32);
@@ -133,14 +132,12 @@ pub fn new_partial(config: &Configuration)
-> Result Result; }
+impl pallet_template::Config for Runtime {
+    type RuntimeEvent = RuntimeEvent;
+    //type WeightInfo = pallet_template::weights::SubstrateWeight<Runtime>;
+}
+
 // Create the runtime by composing the FRAME pallets that were previously configured.
 construct_runtime!(
     pub struct Runtime {
@@ -1005,6 +1010,7 @@ construct_runtime!(
         SafeMode: pallet_safe_mode = 25,
         TxPause: pallet_tx_pause = 26,
         Operations: pallet_operations = 255,
+        TemplateModule: crate::pallet_template::{Pallet, Call, Storage, Event<T>} = 50,
     }
 );
@@ -1046,6 +1052,7 @@ mod benches {
         [pallet_feature_control, FeatureControl]
         [pallet_vk_storage, VkStorage]
         [baby_liminal_extension, baby_liminal_extension::ChainExtensionBenchmarking]
+        [pallet_template, TemplateModule]
     );
 }
@@ -1607,3 +1614,159 @@ mod tests {
         assert!(lhs < rhs);
     }
 }
+
+
+//#![cfg_attr(not(feature = "std"), no_std)]
+
+#[frame_support::pallet]
+pub mod pallet_template {
+    use frame_support::{pallet_prelude::*, traits::StorageVersion}; //, sp_runtime::RuntimeAppPublic};
+    use frame_system::pallet_prelude::*;
+    use scale_info::prelude::vec::Vec;
+
+    const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
+
+    #[pallet::pallet]
+    #[pallet::storage_version(STORAGE_VERSION)]
+    pub struct Pallet<T>(_);
+
+    #[pallet::config]
+    pub trait Config: frame_system::Config {
+        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
+    }
+
+    #[derive(Encode, Decode, MaxEncodedLen, TypeInfo, Debug, Clone, PartialEq, Eq)]
+    pub struct FSEvent {
+        pub eventtype: [u8; 64],
+        pub creationtime: [u8; 64],
+        pub filepath: [u8; 256],
+        pub eventkey: [u8; 128],
+    }
+
+    #[pallet::storage]
+    #[pallet::getter(fn info)]
+    // pub(super) type DisReAssembly<T: Config> = StorageMap<_, Blake2_128Concat, T::AccountId, FSEvent, OptionQuery>;
+    pub(super) type DisReAssembly<T: Config> = StorageDoubleMap<_, Blake2_128Concat, T::AccountId, Blake2_128Concat, u64, FSEvent, OptionQuery>;
+
+    #[pallet::storage]
+    #[pallet::getter(fn nonces)]
+    pub(super) type Nonces<T: Config> = StorageMap<_, Blake2_128Concat, T::AccountId, u64, ValueQuery>;
+
+    #[pallet::event]
+    #[pallet::generate_deposit(pub(super) fn deposit_event)]
+    pub enum Event<T: Config> {
+        FileDisassembled { who: T::AccountId, event: FSEvent },
+        FileReassembled { who: T::AccountId, event: FSEvent },
+    }
+
+    #[pallet::error]
+    pub enum Error<T> {
+        EventTypeTooLong,
+        CreationTimeTooLong,
+        FilePathTooLong,
+        EventKeyTooLong,
+    }
+
+    #[pallet::call]
+    impl<T: Config> Pallet<T> {
+        #[pallet::call_index(0)]
+        #[pallet::weight((Weight::from_parts(10_000, 0) + T::DbWeight::get().writes(1), DispatchClass::Operational))]
+        pub fn disassembled(
+            origin: OriginFor<T>,
+            event_type: Vec<u8>,
+            creation_time: Vec<u8>,
+            file_path: Vec<u8>,
+            event_key: Vec<u8>,
+        ) -> DispatchResult {
+            let sender = ensure_signed(origin)?;
+
+            ensure!(event_type.len() <= 64, Error::<T>::EventTypeTooLong);
+            ensure!(creation_time.len() <= 64, Error::<T>::CreationTimeTooLong);
+            ensure!(file_path.len() <= 256, Error::<T>::FilePathTooLong);
+            ensure!(event_key.len() <= 128, Error::<T>::EventKeyTooLong);
+
+            let event = FSEvent {
+                eventtype: {
+                    let mut arr = [0u8; 64];
+                    arr[..event_type.len()].copy_from_slice(&event_type);
+                    arr
+                },
+                creationtime: {
+                    let mut arr = [0u8; 64];
+                    arr[..creation_time.len()].copy_from_slice(&creation_time);
+                    arr
+                },
+                filepath: {
+                    let mut arr = [0u8; 256];
+                    arr[..file_path.len()].copy_from_slice(&file_path);
+                    arr
+                },
+                eventkey: {
+                    let mut arr = [0u8; 128];
+                    arr[..event_key.len()].copy_from_slice(&event_key);
+                    arr
+                },
+            };
+
+            let nonce = Nonces::<T>::get(&sender);
+            <DisReAssembly<T>>::insert(&sender, nonce, &event);
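+            // The per-account nonce is bumped after every stored event, so each
+            // (account, nonce) cell in `DisReAssembly` is written at most once and
+            // earlier events are never overwritten.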
+            Nonces::<T>::insert(&sender, nonce + 1);
+
+            // <DisReAssembly<T>>::insert(&sender, &event);
+
+            Self::deposit_event(Event::<T>::FileDisassembled { who: sender.clone(), event: event.clone() });
+
+            Ok(())
+        }
+
+        #[pallet::call_index(1)]
+        #[pallet::weight((Weight::from_parts(10_000, 0) + T::DbWeight::get().writes(1), DispatchClass::Operational))]
+        pub fn reassembled(
+            origin: OriginFor<T>,
+            event_type: Vec<u8>,
+            creation_time: Vec<u8>,
+            file_path: Vec<u8>,
+            event_key: Vec<u8>,
+        ) -> DispatchResult {
+            let sender = ensure_signed(origin)?;
+
+            ensure!(event_type.len() <= 64, Error::<T>::EventTypeTooLong);
+            ensure!(creation_time.len() <= 64, Error::<T>::CreationTimeTooLong);
+            ensure!(file_path.len() <= 256, Error::<T>::FilePathTooLong);
+            ensure!(event_key.len() <= 128, Error::<T>::EventKeyTooLong);
+
+            let event = FSEvent {
+                eventtype: {
+                    let mut arr = [0u8; 64];
+                    arr[..event_type.len()].copy_from_slice(&event_type);
+                    arr
+                },
+                creationtime: {
+                    let mut arr = [0u8; 64];
+                    arr[..creation_time.len()].copy_from_slice(&creation_time);
+                    arr
+                },
+                filepath: {
+                    let mut arr = [0u8; 256];
+                    arr[..file_path.len()].copy_from_slice(&file_path);
+                    arr
+                },
+                eventkey: {
+                    let mut arr = [0u8; 128];
+                    arr[..event_key.len()].copy_from_slice(&event_key);
+                    arr
+                },
+            };
+
+            let nonce = Nonces::<T>::get(&sender);
+            <DisReAssembly<T>>::insert(&sender, nonce, &event);
+            Nonces::<T>::insert(&sender, nonce + 1);
+
+            // <DisReAssembly<T>>::insert(&sender, &event);
+
+            Self::deposit_event(Event::<T>::FileReassembled { who: sender.clone(), event: event.clone() });
+
+            Ok(())
+        }
+    }
+}
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 04bad0af..d4711d27 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -7,12 +7,12 @@ RUN apt update && \
 EXPOSE 30333 30343 9944

-WORKDIR node
+WORKDIR /node

-COPY target/release/aleph-node /usr/local/bin
+COPY ../target/release/aleph-node /usr/local/bin
 RUN chmod +x /usr/local/bin/aleph-node

-COPY docker/docker_entrypoint.sh /node/docker_entrypoint.sh
+COPY docker_entrypoint.sh /node/docker_entrypoint.sh
 RUN chmod +x /node/docker_entrypoint.sh

 ENTRYPOINT ["./docker_entrypoint.sh"]
diff --git a/e2e-tests/.DS_Store b/e2e-tests/.DS_Store
new file mode 100644
index 00000000..e2d14888
Binary files /dev/null and b/e2e-tests/.DS_Store differ
diff --git a/entrypoint.sh b/entrypoint.sh
new file mode 100644
index 00000000..ffd375ad
--- /dev/null
+++ b/entrypoint.sh
@@ -0,0 +1,9 @@
+#!/bin/sh
+
+
+# Start the aleph-node
+/usr/local/bin/scripts/run_nodes.sh
+
+
+# Keep the container running
+tail -f /dev/null
\ No newline at end of file
diff --git a/finality-aleph/src/block/substrate/chain_status.rs b/finality-aleph/src/block/substrate/chain_status.rs
index 812136b9..45c3a7a9 100644
--- a/finality-aleph/src/block/substrate/chain_status.rs
+++ b/finality-aleph/src/block/substrate/chain_status.rs
@@ -64,6 +64,8 @@ impl Display for Error {
     }
 }

+impl std::error::Error for Error {}
+
 impl From<BackendError> for Error {
     fn from(value: BackendError) -> Self {
         Error::Backend(value)
     }
 }
@@ -159,6 +161,17 @@ impl SubstrateChainStatus {
     fn finalized_hash(&self) -> AlephHash {
         self.info().finalized_hash
     }
+
+    /// Computes the lowest common ancestor of two blocks. Warning: complexity is
+    /// O(distance between blocks).
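+    /// Used by the best-block metrics (`retracted_path_length` in `metrics/best_block.rs`)
+    /// to measure how many blocks were retracted when a new best block causes a reorg.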
+ pub fn lowest_common_ancestor(&self, from: &BlockId, to: &BlockId) -> Result { + let result = sp_blockchain::lowest_common_ancestor( + self.backend.blockchain(), + from.hash(), + to.hash(), + )?; + Ok((result.hash, result.number).into()) + } } impl ChainStatus for SubstrateChainStatus { diff --git a/finality-aleph/src/block/substrate/mod.rs b/finality-aleph/src/block/substrate/mod.rs index d69651d9..aab2729a 100644 --- a/finality-aleph/src/block/substrate/mod.rs +++ b/finality-aleph/src/block/substrate/mod.rs @@ -5,7 +5,7 @@ use sp_runtime::traits::{CheckedSub, Header as _, One}; use crate::{ aleph_primitives::{Block, Header}, block::{Block as BlockT, BlockId, BlockImport, Header as HeaderT, UnverifiedHeader}, - metrics::{AllBlockMetrics, Checkpoint}, + metrics::TimingBlockMetrics, }; mod chain_status; @@ -19,9 +19,12 @@ pub use justification::{ InnerJustification, Justification, JustificationTranslator, TranslateError, }; pub use status_notifier::SubstrateChainStatusNotifier; -pub use verification::{SessionVerifier, SubstrateFinalizationInfo, VerifierCache}; +pub use verification::{SubstrateFinalizationInfo, VerifierCache}; -use crate::block::{BestBlockSelector, BlockchainEvents}; +use crate::{ + block::{BestBlockSelector, BlockchainEvents}, + metrics::Checkpoint, +}; const LOG_TARGET: &str = "aleph-substrate"; @@ -60,18 +63,18 @@ impl HeaderT for Header { /// Wrapper around the trait object that we get from Substrate. pub struct BlockImporter { importer: Box>, - metrics: AllBlockMetrics, + metrics: TimingBlockMetrics, } impl BlockImporter { pub fn new(importer: Box>) -> Self { Self { importer, - metrics: AllBlockMetrics::new(None), + metrics: TimingBlockMetrics::noop(), } } - pub fn attach_metrics(&mut self, metrics: AllBlockMetrics) { + pub fn attach_metrics(&mut self, metrics: TimingBlockMetrics) { self.metrics = metrics; } } @@ -85,7 +88,6 @@ impl BlockImport for BlockImporter { false => BlockOrigin::NetworkBroadcast, }; let hash = block.header.hash(); - let number = *block.header.number(); let incoming_block = IncomingBlock:: { hash, header: Some(block.header), @@ -98,8 +100,7 @@ impl BlockImport for BlockImporter { import_existing: false, state: None, }; - self.metrics - .report_block(BlockId::new(hash, number), Checkpoint::Importing, Some(own)); + self.metrics.report_block(hash, Checkpoint::Importing); self.importer.import_blocks(origin, vec![incoming_block]); } } diff --git a/finality-aleph/src/data_io/data_provider.rs b/finality-aleph/src/data_io/data_provider.rs index ddcf086b..7a8a0be3 100644 --- a/finality-aleph/src/data_io/data_provider.rs +++ b/finality-aleph/src/data_io/data_provider.rs @@ -9,7 +9,7 @@ use crate::{ aleph_primitives::BlockNumber, block::{BestBlockSelector, Header, HeaderBackend, UnverifiedHeader}, data_io::{proposal::UnvalidatedAlephProposal, AlephData, MAX_DATA_BRANCH_LEN}, - metrics::{AllBlockMetrics, Checkpoint}, + metrics::{Checkpoint, TimingBlockMetrics}, party::manager::Runnable, BlockId, SessionBoundaries, }; @@ -161,7 +161,7 @@ where client: C, session_boundaries: SessionBoundaries, config: ChainTrackerConfig, - metrics: AllBlockMetrics, + metrics: TimingBlockMetrics, ) -> (Self, DataProvider) { let data_to_propose = Arc::new(Mutex::new(None)); ( @@ -317,7 +317,7 @@ where #[derive(Clone)] pub struct DataProvider { data_to_propose: Arc>>>, - metrics: AllBlockMetrics, + metrics: TimingBlockMetrics, } // Honest nodes propose data in session `k` as follows: @@ -335,7 +335,7 @@ impl DataProvider { if let Some(data) = &data_to_propose { let top_block = 
data.head_proposal.top_block(); self.metrics - .report_block(top_block, Checkpoint::Proposed, None); + .report_block(top_block.hash(), Checkpoint::Proposed); debug!(target: LOG_TARGET, "Outputting {:?} in get_data", data); }; @@ -355,7 +355,7 @@ mod tests { data_provider::{ChainTracker, ChainTrackerConfig}, AlephData, DataProvider, MAX_DATA_BRANCH_LEN, }, - metrics::AllBlockMetrics, + metrics::TimingBlockMetrics, party::manager::Runnable, testing::{ client_chain_builder::ClientChainBuilder, @@ -394,7 +394,7 @@ mod tests { client, session_boundaries, config, - AllBlockMetrics::new(None), + TimingBlockMetrics::noop(), ); let (exit_chain_tracker_tx, exit_chain_tracker_rx) = oneshot::channel(); diff --git a/finality-aleph/src/finalization.rs b/finality-aleph/src/finalization.rs index 5244a1ae..8f9d2877 100644 --- a/finality-aleph/src/finalization.rs +++ b/finality-aleph/src/finalization.rs @@ -11,7 +11,6 @@ use sp_runtime::{ use crate::{ aleph_primitives::{BlockHash, BlockNumber}, - metrics::{AllBlockMetrics, Checkpoint}, BlockId, }; @@ -26,7 +25,6 @@ where C: HeaderBackend + LockImportRun + Finalizer, { client: Arc, - metrics: AllBlockMetrics, phantom: PhantomData<(B, BE)>, } @@ -36,10 +34,9 @@ where BE: Backend, C: HeaderBackend + LockImportRun + Finalizer, { - pub(crate) fn new(client: Arc, metrics: AllBlockMetrics) -> Self { + pub(crate) fn new(client: Arc) -> Self { AlephFinalizer { client, - metrics, phantom: PhantomData, } } @@ -74,8 +71,6 @@ where match &update_res { Ok(_) => { debug!(target: "aleph-finality", "Successfully finalized block with hash {:?} and number {:?}. Current best: #{:?}.", hash, number, status.best_number); - self.metrics - .report_block(block, Checkpoint::Finalized, None); } Err(_) => { debug!(target: "aleph-finality", "Failed to finalize block with hash {:?} and number {:?}. Current best: #{:?}.", hash, number, status.best_number) diff --git a/finality-aleph/src/import.rs b/finality-aleph/src/import.rs index 0d24ed43..e9c6394c 100644 --- a/finality-aleph/src/import.rs +++ b/finality-aleph/src/import.rs @@ -9,14 +9,13 @@ use sc_consensus::{ BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult, JustificationImport, }; -use sp_consensus::{BlockOrigin, Error as ConsensusError, SelectChain}; +use sp_consensus::{Error as ConsensusError, SelectChain}; use sp_runtime::{traits::Header as HeaderT, Justification as SubstrateJustification}; use crate::{ aleph_primitives::{Block, BlockHash, BlockNumber, ALEPH_ENGINE_ID}, block::substrate::{Justification, JustificationTranslator, TranslateError}, justification::{backwards_compatible_decode, DecodeError}, - metrics::{AllBlockMetrics, Checkpoint}, BlockId, }; @@ -26,14 +25,12 @@ pub fn get_aleph_block_import( justification_tx: UnboundedSender, translator: JustificationTranslator, select_chain: SC, - metrics: AllBlockMetrics, ) -> impl BlockImport + JustificationImport + Clone where I: BlockImport + Send + Sync + Clone, SC: SelectChain + Send + Sync, { - let tracing_import = TracingBlockImport::new(inner, metrics); - let favourite_marker_import = FavouriteMarkerBlockImport::new(tracing_import, select_chain); + let favourite_marker_import = FavouriteMarkerBlockImport::new(inner, select_chain); AlephBlockImport::new(favourite_marker_import, justification_tx, translator) } @@ -92,68 +89,6 @@ where } } -/// A wrapper around a block import that also marks the start and end of the import of every block -/// in the metrics, if provided. 
-#[derive(Clone)] -struct TracingBlockImport -where - I: BlockImport + Send + Sync, -{ - inner: I, - metrics: AllBlockMetrics, -} - -impl TracingBlockImport -where - I: BlockImport + Send + Sync, -{ - pub fn new(inner: I, metrics: AllBlockMetrics) -> Self { - TracingBlockImport { inner, metrics } - } -} - -#[async_trait::async_trait] -impl BlockImport for TracingBlockImport -where - I: BlockImport + Send + Sync, -{ - type Error = I::Error; - - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { - self.inner.check_block(block).await - } - - async fn import_block( - &mut self, - block: BlockImportParams, - ) -> Result { - let post_hash = block.post_hash(); - let number = *block.post_header().number(); - let is_own = block.origin == BlockOrigin::Own; - // Self-created blocks are imported without using the import queue, - // so we need to report them here. - self.metrics.report_block( - BlockId::new(post_hash, number), - Checkpoint::Importing, - Some(is_own), - ); - - let result = self.inner.import_block(block).await; - - if let Ok(ImportResult::Imported(_)) = &result { - self.metrics.report_block( - BlockId::new(post_hash, number), - Checkpoint::Imported, - Some(is_own), - ); - } - result - } -} - /// A wrapper around a block import that also extracts any present justifications and sends them to /// our components which will process them further and possibly finalize the block. #[derive(Clone)] diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index 8a6ce39a..73d0fdd7 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -70,7 +70,6 @@ pub use crate::{ }, import::{get_aleph_block_import, AlephBlockImport, RedirectingBlockImport}, justification::AlephJustification, - metrics::{AllBlockMetrics, DefaultClock, FinalityRateMetrics, TimingBlockMetrics}, network::{ address_cache::{ValidatorAddressCache, ValidatorAddressingInfo}, build_network, BuildNetworkOutput, ProtocolNetwork, SubstratePeerId, @@ -271,7 +270,6 @@ pub struct AlephConfig { pub keystore: Arc, pub justification_channel_provider: ChannelProvider, pub block_rx: mpsc::UnboundedReceiver, - pub metrics: AllBlockMetrics, pub registry: Option, pub session_period: SessionPeriod, pub millisecs_per_block: MillisecsPerBlock, diff --git a/finality-aleph/src/metrics/all_block.rs b/finality-aleph/src/metrics/all_block.rs deleted file mode 100644 index ac663840..00000000 --- a/finality-aleph/src/metrics/all_block.rs +++ /dev/null @@ -1,53 +0,0 @@ -use log::warn; -use substrate_prometheus_endpoint::Registry; - -use super::{finality_rate::FinalityRateMetrics, timing::DefaultClock, Checkpoint}; -use crate::{metrics::LOG_TARGET, BlockId, TimingBlockMetrics}; - -/// Wrapper around various block-related metrics. 
-#[derive(Clone)] -pub struct AllBlockMetrics { - timing_metrics: TimingBlockMetrics, - finality_rate_metrics: FinalityRateMetrics, -} - -impl AllBlockMetrics { - pub fn new(registry: Option<&Registry>) -> Self { - let timing_metrics = match TimingBlockMetrics::new(registry, DefaultClock) { - Ok(timing_metrics) => timing_metrics, - Err(e) => { - warn!( - target: LOG_TARGET, - "Failed to register Prometheus block timing metrics: {:?}.", e - ); - TimingBlockMetrics::Noop - } - }; - let finality_rate_metrics = match FinalityRateMetrics::new(registry) { - Ok(finality_rate_metrics) => finality_rate_metrics, - Err(e) => { - warn!( - target: LOG_TARGET, - "Failed to register Prometheus finality rate metrics: {:?}.", e - ); - FinalityRateMetrics::Noop - } - }; - AllBlockMetrics { - timing_metrics, - finality_rate_metrics, - } - } - - /// Triggers all contained block metrics. - pub fn report_block(&self, block_id: BlockId, checkpoint: Checkpoint, own: Option) { - self.timing_metrics - .report_block(block_id.hash(), checkpoint); - self.finality_rate_metrics.report_block( - block_id.hash(), - block_id.number(), - checkpoint, - own, - ); - } -} diff --git a/finality-aleph/src/metrics/best_block.rs b/finality-aleph/src/metrics/best_block.rs new file mode 100644 index 00000000..a05a69cc --- /dev/null +++ b/finality-aleph/src/metrics/best_block.rs @@ -0,0 +1,99 @@ +use std::error::Error; + +use substrate_prometheus_endpoint::{ + register, Gauge, Histogram, HistogramOpts, PrometheusError, Registry, U64, +}; + +use crate::{BlockId, BlockNumber, SubstrateChainStatus}; + +#[derive(Clone)] +pub enum BestBlockMetrics { + Prometheus { + top_finalized_block: Gauge, + best_block: Gauge, + reorgs: Histogram, + best_block_id: BlockId, + chain_status: SubstrateChainStatus, + }, + Noop, +} + +impl BestBlockMetrics { + pub fn new( + registry: Option, + chain_status: SubstrateChainStatus, + ) -> Result { + let registry = match registry { + Some(registry) => registry, + None => return Ok(Self::Noop), + }; + + Ok(Self::Prometheus { + top_finalized_block: register( + Gauge::new("aleph_top_finalized_block", "Top finalized block number")?, + ®istry, + )?, + best_block: register( + Gauge::new( + "aleph_best_block", + "Best (or more precisely, favourite) block number", + )?, + ®istry, + )?, + reorgs: register( + Histogram::with_opts( + HistogramOpts::new("aleph_reorgs", "Number of reorgs by length") + .buckets(vec![1., 2., 4., 9.]), + )?, + ®istry, + )?, + best_block_id: (Default::default(), 0u32).into(), + chain_status, + }) + } + + pub fn report_best_block_imported(&mut self, block_id: BlockId) { + if let Self::Prometheus { + best_block, + ref mut best_block_id, + reorgs, + chain_status, + .. + } = self + { + let reorg_len = retracted_path_length(chain_status, best_block_id, &block_id); + best_block.set(block_id.number() as u64); + *best_block_id = block_id; + match reorg_len { + Ok(0) => {} + Ok(reorg_len) => { + reorgs.observe(reorg_len as f64); + } + Err(e) => { + log::warn!("Failed to calculate reorg length: {:?}", e); + } + } + } + } + + pub fn report_block_finalized(&self, block_id: BlockId) { + if let Self::Prometheus { + top_finalized_block, + .. 
+ } = self + { + top_finalized_block.set(block_id.number() as u64); + } + } +} + +fn retracted_path_length( + chain_status: &SubstrateChainStatus, + from: &BlockId, + to: &BlockId, +) -> Result> { + let lca = chain_status + .lowest_common_ancestor(from, to) + .map_err(Box::new)?; + Ok(from.number().saturating_sub(lca.number())) +} diff --git a/finality-aleph/src/metrics/chain_state.rs b/finality-aleph/src/metrics/chain_state.rs deleted file mode 100644 index e8b69b92..00000000 --- a/finality-aleph/src/metrics/chain_state.rs +++ /dev/null @@ -1,682 +0,0 @@ -use std::{ - fmt::Display, - num::NonZeroUsize, - time::{Duration, Instant}, -}; - -use futures::{stream::FusedStream, StreamExt}; -use lru::LruCache; -use sc_client_api::{ - BlockBackend, BlockImportNotification, FinalityNotification, FinalityNotifications, - ImportNotifications, -}; -use sp_blockchain::{lowest_common_ancestor, HeaderMetadata}; -use sp_runtime::{ - traits::{Block as BlockT, Extrinsic, Header as HeaderT, Zero}, - Saturating, -}; -use substrate_prometheus_endpoint::{ - register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError, Registry, U64, -}; - -use crate::{ - metrics::{exponential_buckets_two_sided, TransactionPoolInfoProvider}, - BlockNumber, -}; - -// Size of transaction cache: 32B (Hash) + 16B (Instant) * `100_000` is approximately 4.8MB -const TRANSACTION_CACHE_SIZE: usize = 100_000; - -const BUCKETS_FACTOR: f64 = 1.4; - -#[derive(Debug)] -pub enum Error { - NoRegistry, - UnableToCreateMetrics(PrometheusError), - BlockImportStreamClosed, - FinalizedBlocksStreamClosed, - TransactionStreamClosed, -} - -impl Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Error::NoRegistry => write!(f, "Registry can not be empty."), - Error::UnableToCreateMetrics(e) => { - write!(f, "Failed to create metrics: {e}.") - } - Error::BlockImportStreamClosed => { - write!(f, "Block import notification stream ended unexpectedly.") - } - Error::FinalizedBlocksStreamClosed => { - write!(f, "Finality notification stream ended unexpectedly.") - } - Error::TransactionStreamClosed => { - write!(f, "Transaction stream ended unexpectedly.") - } - } - } -} - -enum ChainStateMetrics { - Prometheus { - top_finalized_block: Gauge, - best_block: Gauge, - reorgs: Histogram, - time_till_block_inclusion: Histogram, - transactions_not_seen_in_the_pool: Counter, - }, - Noop, -} - -impl ChainStateMetrics { - fn new(registry: Option) -> Result { - let registry = match registry { - Some(registry) => registry, - None => return Ok(ChainStateMetrics::Noop), - }; - - Ok(ChainStateMetrics::Prometheus { - top_finalized_block: register( - Gauge::new("aleph_top_finalized_block", "no help")?, - ®istry, - )?, - best_block: register(Gauge::new("aleph_best_block", "no help")?, ®istry)?, - reorgs: register( - Histogram::with_opts( - HistogramOpts::new("aleph_reorgs", "Number of reorgs by length") - .buckets(vec![1., 2., 4., 9.]), - )?, - ®istry, - )?, - time_till_block_inclusion: register( - Histogram::with_opts( - HistogramOpts::new( - "aleph_transaction_to_block_time", - "Time from becoming ready in the pool to inclusion in some valid block.", - ) - .buckets(exponential_buckets_two_sided( - 2000.0, - BUCKETS_FACTOR, - 2, - 8, - )?), - )?, - ®istry, - )?, - transactions_not_seen_in_the_pool: register( - Counter::new("aleph_transactions_not_seen_in_the_pool", "no help")?, - ®istry, - )?, - }) - } - - fn update_best_block(&self, number: BlockNumber) { - if let ChainStateMetrics::Prometheus { best_block, .. 
} = self { - best_block.set(number as u64) - } - } - - fn update_top_finalized_block(&self, number: BlockNumber) { - if let ChainStateMetrics::Prometheus { - top_finalized_block, - .. - } = self - { - top_finalized_block.set(number as u64); - } - } - - fn report_reorg(&self, length: BlockNumber) { - if let ChainStateMetrics::Prometheus { reorgs, .. } = self { - reorgs.observe(length as f64); - } - } - - fn report_transaction_in_block(&self, elapsed: Duration) { - if let ChainStateMetrics::Prometheus { - time_till_block_inclusion, - .. - } = self - { - time_till_block_inclusion.observe(elapsed.as_secs_f64() * 1000.); - } - } - - fn report_transaction_not_seen_in_the_pool(&self) { - if let ChainStateMetrics::Prometheus { - transactions_not_seen_in_the_pool, - .. - } = self - { - transactions_not_seen_in_the_pool.inc(); - } - } -} - -pub async fn run_chain_state_metrics< - X, - HE: HeaderT, - B: BlockT
, - BE: HeaderMetadata + BlockBackend, - TP: TransactionPoolInfoProvider, ->( - backend: &BE, - mut import_notifications: ImportNotifications, - mut finality_notifications: FinalityNotifications, - registry: Option, - mut transaction_pool_info_provider: TP, -) -> Result<(), Error> { - if registry.is_none() { - return Err(Error::NoRegistry); - } - if import_notifications.is_terminated() { - return Err(Error::BlockImportStreamClosed); - } - if finality_notifications.is_terminated() { - return Err(Error::FinalizedBlocksStreamClosed); - } - - let metrics = ChainStateMetrics::new(registry).map_err(Error::UnableToCreateMetrics)?; - let mut cache: LruCache = LruCache::new( - NonZeroUsize::new(TRANSACTION_CACHE_SIZE).expect("the cache size is a non-zero constant"), - ); - - let mut previous_best: Option = None; - - loop { - tokio::select! { - maybe_block = import_notifications.next() => { - let block = maybe_block.ok_or(Error::BlockImportStreamClosed)?; - handle_block_imported( - block, - backend, - &metrics, - &mut transaction_pool_info_provider, - &mut cache, - &mut previous_best, - ); - }, - maybe_block = finality_notifications.next() => { - let block = maybe_block.ok_or(Error::FinalizedBlocksStreamClosed)?; - handle_block_finalized(block, &metrics); - }, - maybe_transaction = transaction_pool_info_provider.next_transaction() => { - let hash = maybe_transaction.ok_or(Error::TransactionStreamClosed)?; - handle_transaction_in_pool(hash, &mut cache); - } - } - } -} - -fn handle_block_imported< - X, - HE: HeaderT, - B: BlockT
, - BE: HeaderMetadata + BlockBackend, - TP: TransactionPoolInfoProvider, ->( - block: BlockImportNotification, - backend: &BE, - metrics: &ChainStateMetrics, - transaction_pool_info_provider: &mut TP, - cache: &mut LruCache, - previous_best: &mut Option, -) { - if block.is_new_best { - metrics.update_best_block(*block.header.number()); - if let Some(reorg_len) = detect_reorgs(backend, previous_best.clone(), block.header.clone()) - { - metrics.report_reorg(reorg_len); - } - *previous_best = Some(block.header); - } - if let Ok(Some(body)) = backend.block_body(block.hash) { - report_transactions_included_in_block( - transaction_pool_info_provider, - &body, - metrics, - cache, - ); - } -} - -fn handle_block_finalized, B: BlockT
>( - block: FinalityNotification, - metrics: &ChainStateMetrics, -) { - // Sometimes finalization can also cause best block update. However, - // RPC best block subscription won't notify about that immediately, so - // we also don't update there. Also in that case, substrate sets best_block to - // the newly finalized block (see test), so the best block will be updated - // after importing anything on the newly finalized branch. - metrics.update_top_finalized_block(*block.header.number()); -} - -fn handle_transaction_in_pool( - hash: TxHash, - cache: &mut LruCache, -) { - // Putting new transaction can evict the oldest one. However, even if the - // removed transaction was actually still in the pool, we don't have - // any guarantees that it would be eventually included in the block. - // Therefore, we ignore such transaction. - cache.put(hash, Instant::now()); -} - -fn detect_reorgs, B: BlockT
, BE: HeaderMetadata>( - backend: &BE, - prev_best: Option, - best: HE, -) -> Option { - let prev_best = prev_best?; - if best.hash() == prev_best.hash() || *best.parent_hash() == prev_best.hash() { - // Quit early when no change or the best is a child of the previous best. - return None; - } - let lca = lowest_common_ancestor(backend, best.hash(), prev_best.hash()).ok()?; - let len = prev_best.number().saturating_sub(lca.number); - if len == HE::Number::zero() { - return None; - } - Some(len) -} - -fn report_transactions_included_in_block< - 'a, - TP: TransactionPoolInfoProvider, - I: IntoIterator, ->( - pool: &'a TP, - body: I, - metrics: &ChainStateMetrics, - cache: &mut LruCache, -) where - ::TxHash: std::hash::Hash + PartialEq + Eq, -{ - for xt in body { - let hash = pool.hash_of(xt); - if let Some(insert_time) = cache.pop(&hash) { - let elapsed = insert_time.elapsed(); - metrics.report_transaction_in_block(elapsed); - } else if let Some(true) = xt.is_signed() { - // Either it was never in the pool (eg. submitted locally), or we've got BlockImport - // notification faster than transaction in pool one. The latter is more likely, - // so we report it as zero. - metrics.report_transaction_in_block(Duration::ZERO); - metrics.report_transaction_not_seen_in_the_pool(); - } - } -} - -#[cfg(test)] -mod test { - use std::{collections::HashMap, sync::Arc}; - - use futures::{FutureExt, Stream}; - use parity_scale_codec::Encode; - use sc_block_builder::BlockBuilderBuilder; - use sc_client_api::{BlockchainEvents, HeaderBackend}; - use substrate_test_runtime_client::AccountKeyring; - - use super::*; - use crate::{ - metrics::transaction_pool::test::TestTransactionPoolSetup, - testing::{ - client_chain_builder::ClientChainBuilder, - mocks::{TBlock, THash, TestClientBuilder, TestClientBuilderExt}, - }, - }; - - #[tokio::test] - async fn when_finalizing_with_reorg_best_block_is_set_to_that_finalized_block() { - let client = Arc::new(TestClientBuilder::new().build()); - let client_builder = Arc::new(TestClientBuilder::new().build()); - let mut chain_builder = ClientChainBuilder::new(client.clone(), client_builder); - - let chain_a = chain_builder - .build_and_import_branch_above(&chain_builder.genesis_hash(), 5) - .await; - - // (G) - A1 - A2 - A3 - A4 - A5; - - assert_eq!( - client.chain_info().finalized_hash, - chain_builder.genesis_hash() - ); - assert_eq!(client.chain_info().best_number, 5); - - let chain_b = chain_builder - .build_and_import_branch_above(&chain_a[0].header.hash(), 3) - .await; - chain_builder.finalize_block(&chain_b[0].header.hash()); - - // (G) - (A1) - A2 - A3 - A4 - A5 - // \ - // (B2) - B3 - B4 - - assert_eq!(client.chain_info().best_number, 2); - assert_eq!(client.chain_info().finalized_hash, chain_b[0].header.hash()); - } - - #[tokio::test] - async fn test_reorg_detection() { - let client = Arc::new(TestClientBuilder::new().build()); - let client_builder = Arc::new(TestClientBuilder::new().build()); - let mut chain_builder = ClientChainBuilder::new(client.clone(), client_builder); - - let a = chain_builder - .build_and_import_branch_above(&chain_builder.genesis_hash(), 5) - .await; - let b = chain_builder - .build_and_import_branch_above(&a[0].header.hash(), 3) - .await; - let c = chain_builder - .build_and_import_branch_above(&a[2].header.hash(), 2) - .await; - - // - C0 - C1 - // / - // G - A0 - A1 - A2 - A3 - A4 - // \ - // - B0 - B1 - B2 - - for (prev, new_best, expected) in [ - (&a[1], &a[2], None), - (&a[1], &a[4], None), - (&a[1], &a[1], None), - (&a[2], &b[0], 
Some(2)), - (&b[0], &a[2], Some(1)), - (&c[1], &b[2], Some(4)), - ] { - assert_eq!( - detect_reorgs( - client.as_ref(), - Some(prev.header().clone()), - new_best.header().clone() - ), - expected, - ); - } - } - - // Transaction pool metrics tests - struct TestSetup { - pub pool: TestTransactionPoolSetup, - pub metrics: ChainStateMetrics, - pub cache: LruCache, - pub block_import_notifications: - Box> + Unpin>, - pub finality_notifications: Box> + Unpin>, - } - - #[derive(PartialEq, Eq, Hash, Debug)] - enum NotificationType { - BlockImport, - Finality, - Transaction, - } - - impl TestSetup { - fn new() -> Self { - let client = Arc::new(TestClientBuilder::new().build()); - - let block_import_notifications = - Box::new(client.every_import_notification_stream().fuse()); - let finality_notifications = Box::new(client.finality_notification_stream().fuse()); - - let pool = TestTransactionPoolSetup::new(client); - - let registry = Registry::new(); - let metrics = ChainStateMetrics::new(Some(registry)).expect("metrics"); - let cache = LruCache::new(NonZeroUsize::new(10).expect("cache")); - - TestSetup { - pool, - metrics, - cache, - block_import_notifications, - finality_notifications, - } - } - - fn genesis(&self) -> THash { - self.pool.client.info().genesis_hash - } - - fn transactions_histogram(&self) -> &Histogram { - match &self.metrics { - ChainStateMetrics::Prometheus { - time_till_block_inclusion, - .. - } => time_till_block_inclusion, - _ => panic!("metrics"), - } - } - - fn process_notifications(&mut self) -> HashMap { - let mut block_imported_notifications = 0; - let mut finality_notifications = 0; - let mut transaction_notifications = 0; - - while let Some(block) = self.block_import_notifications.next().now_or_never() { - handle_block_imported( - block.expect("stream should not end"), - self.pool.client.as_ref(), - &self.metrics, - &mut self.pool.transaction_pool_info_provider, - &mut self.cache, - &mut None, - ); - block_imported_notifications += 1; - } - while let Some(finality) = self.finality_notifications.next().now_or_never() { - handle_block_finalized(finality.expect("stream should not end"), &self.metrics); - finality_notifications += 1; - } - while let Some(transaction) = self - .pool - .transaction_pool_info_provider - .next_transaction() - .now_or_never() - { - handle_transaction_in_pool( - transaction.expect("stream should not end"), - &mut self.cache, - ); - transaction_notifications += 1; - } - HashMap::from_iter(vec![ - (NotificationType::BlockImport, block_imported_notifications), - (NotificationType::Finality, finality_notifications), - (NotificationType::Transaction, transaction_notifications), - ]) - } - } - - fn blocks_imported(n: usize) -> HashMap { - HashMap::from_iter(vec![ - (NotificationType::BlockImport, n), - (NotificationType::Finality, 0), - (NotificationType::Transaction, 0), - ]) - } - fn transactions(n: usize) -> HashMap { - HashMap::from_iter(vec![ - (NotificationType::BlockImport, 0), - (NotificationType::Finality, 0), - (NotificationType::Transaction, n), - ]) - } - - const EPS: Duration = Duration::from_nanos(1); - - #[tokio::test] - async fn transactions_are_reported() { - let mut setup = TestSetup::new(); - let genesis = setup.genesis(); - let xt = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); - - let time_before_submit = Instant::now(); - setup.pool.submit(&genesis, xt).await; - - assert_eq!( - setup.process_notifications(), - transactions(1), - "'In pool' notification wasn't sent" - ); - let time_after_submit = 
Instant::now(); - - tokio::time::sleep(Duration::from_millis(20)).await; - - let time_before_import = Instant::now(); - let _block_1 = setup.pool.propose_block(genesis, None).await; - let pre_count = setup.transactions_histogram().get_sample_count(); - - assert_eq!( - setup.process_notifications(), - blocks_imported(1), - "Block import notification wasn't sent" - ); - - let time_after_import = Instant::now(); - - let duration = - Duration::from_secs_f64(setup.transactions_histogram().get_sample_sum() / 1000.); - - assert_eq!(pre_count, 0); - assert_eq!(setup.transactions_histogram().get_sample_count(), 1); - assert!(duration >= time_before_import - time_after_submit - EPS); - assert!(duration <= time_after_import - time_before_submit + EPS); - } - - #[tokio::test] - async fn transactions_are_reported_only_if_ready_when_added_to_the_pool() { - let mut setup = TestSetup::new(); - let genesis = setup.genesis(); - - let xt1 = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); - let xt2 = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 1); - let xt3 = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 2); - - setup.pool.submit(&genesis, xt2.clone()).await; - - // No notification for xt2 as it is not ready - assert_eq!( - setup.process_notifications(), - transactions(0), - "Future transactions should not be reported" - ); - - setup.pool.submit(&genesis, xt1.clone()).await; - setup.pool.submit(&genesis, xt3.clone()).await; - - // Notifications for xt1 and xt3 - assert_eq!(setup.process_notifications(), transactions(2)); - - let block_1 = setup.pool.propose_block(genesis, None).await; - // Block import notification. xt1 notification never appears - assert_eq!(setup.process_notifications(), blocks_imported(1)); - // All 3 extrinsics are included in the block - assert_eq!(block_1.extrinsics.len(), 3); - } - - #[tokio::test] - async fn retracted_transactions_are_reported_only_once() { - let mut setup = TestSetup::new(); - let genesis = setup.genesis(); - - let xt1 = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); - let xt2 = setup - .pool - .extrinsic(AccountKeyring::Charlie, AccountKeyring::Dave, 0); - - setup.pool.submit(&genesis, xt1.clone()).await; - setup.pool.submit(&genesis, xt2.clone()).await; - - // make sure import notifications are received before block import - assert_eq!(setup.process_notifications(), transactions(2)); - - let block_1a = setup.pool.propose_block(genesis, None).await; - assert_eq!(block_1a.extrinsics.len(), 2); - assert_eq!(setup.process_notifications(), blocks_imported(1)); - assert_eq!(setup.transactions_histogram().get_sample_count(), 2); - - let sum_before = setup.transactions_histogram().get_sample_sum(); - - // external fork block with xt1 - let mut block_1b_builder = BlockBuilderBuilder::new(&*setup.pool.client) - .on_parent_block(genesis) - .with_parent_block_number(0) - .build() - .unwrap(); - - block_1b_builder.push(xt1.into()).unwrap(); - let block_1b = block_1b_builder.build().unwrap().block; - setup.pool.import_block(block_1b.clone()).await; - setup.pool.finalize(block_1b.hash()).await; - - let block_2b = setup.pool.propose_block(block_1b.hash(), None).await; - - assert_eq!(block_2b.extrinsics.len(), 1); - assert_eq!(setup.transactions_histogram().get_sample_count(), 2); - assert_eq!(setup.transactions_histogram().get_sample_sum(), sum_before); - } - - #[tokio::test] - async fn transactions_skipped_in_block_authorship_are_not_reported_at_that_time() { - let mut setup 
= TestSetup::new(); - let genesis = setup.genesis(); - - let xt1 = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); - let xt2 = setup - .pool - .extrinsic(AccountKeyring::Charlie, AccountKeyring::Eve, 0); - - setup.pool.submit(&genesis, xt1.clone()).await; - setup.pool.submit(&genesis, xt2.clone()).await; - assert_eq!(setup.process_notifications(), transactions(2)); - - let time_after_submit = Instant::now(); - - let block_1 = setup - .pool - .propose_block(genesis, Some(2 * xt1.encoded_size() - 1)) - .await; - - assert_eq!(setup.process_notifications(), blocks_imported(1)); - assert_eq!(block_1.extrinsics.len(), 1); - assert_eq!(setup.transactions_histogram().get_sample_count(), 1); - let sample_1 = setup.transactions_histogram().get_sample_sum(); - - tokio::time::sleep(Duration::from_millis(10)).await; - - let time_before_block_2 = Instant::now(); - let block_2 = setup - .pool - .propose_block(block_1.hash(), Some(2 * xt1.encoded_size() - 1)) - .await; - - assert_eq!(setup.process_notifications(), blocks_imported(1)); - assert_eq!(block_2.extrinsics.len(), 1); - assert_eq!(setup.transactions_histogram().get_sample_count(), 2); - - let sample_2 = setup.transactions_histogram().get_sample_sum() - sample_1; - - let duration = Duration::from_secs_f64(sample_2 / 1000.0); - - assert!(duration >= time_before_block_2 - time_after_submit - EPS); - } -} diff --git a/finality-aleph/src/metrics/finality_rate.rs b/finality-aleph/src/metrics/finality_rate.rs index 09d084b5..cd2022b6 100644 --- a/finality-aleph/src/metrics/finality_rate.rs +++ b/finality-aleph/src/metrics/finality_rate.rs @@ -8,8 +8,7 @@ use sc_service::Arc; use sp_core::{bounded_vec::BoundedVec, ConstU32}; use substrate_prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; -use super::Checkpoint; -use crate::metrics::LOG_TARGET; +use crate::{metrics::LOG_TARGET, BlockId}; const MAX_CACHE_SIZE: usize = 1800; const MAX_INNER_SIZE: u32 = 64; @@ -36,11 +35,11 @@ impl FinalityRateMetrics { Ok(FinalityRateMetrics::Prometheus { own_finalized: register( - Counter::new("aleph_own_finalized_blocks", "no help")?, + Counter::new("aleph_own_finalized_blocks", "Number of self-produced blocks that became finalized")?, registry, )?, own_hopeless: register( - Counter::new("aleph_own_hopeless_blocks", "no help")?, + Counter::new("aleph_own_hopeless_blocks", "Number of self-produced blocks, such that some alternative block with the same block number was finalized")?, registry, )?, imported_cache: Arc::new(Mutex::new(LruCache::new( @@ -49,39 +48,21 @@ impl FinalityRateMetrics { }) } - pub fn report_block( - &self, - block_hash: BlockHash, - block_number: BlockNumber, - checkpoint: Checkpoint, - own: Option, - ) { - match checkpoint { - Checkpoint::Imported => { - if let Some(true) = own { - self.report_own_imported(block_hash, block_number); - } - } - Checkpoint::Finalized => self.report_finalized(block_hash, block_number), - _ => {} - } - } - /// Stores the imported block's hash. Assumes that the imported block is own. - fn report_own_imported(&self, hash: BlockHash, number: BlockNumber) { + pub fn report_own_imported(&self, id: BlockId) { let mut imported_cache = match self { FinalityRateMetrics::Prometheus { imported_cache, .. 
} => imported_cache.lock(), FinalityRateMetrics::Noop => return, }; let entry = imported_cache - .get_or_insert_mut(number, BoundedVec::<_, ConstU32>::new); + .get_or_insert_mut(id.number(), BoundedVec::<_, ConstU32>::new); - if entry.try_push(hash).is_err() { + if entry.try_push(id.hash()).is_err() { warn!( target: LOG_TARGET, "Finality Rate Metrics encountered too many own imported blocks at level {}", - number + id.number() ); } } @@ -89,7 +70,7 @@ impl FinalityRateMetrics { /// Counts the blocks at the level of `number` different than the passed block /// and reports them as hopeless. If `hash` is a hash of own block it will be found /// in `imported_cache` and reported as finalized. - fn report_finalized(&self, hash: BlockHash, number: BlockNumber) { + pub fn report_finalized(&self, id: BlockId) { let (own_finalized, own_hopeless, imported_cache) = match self { FinalityRateMetrics::Prometheus { own_finalized, @@ -100,12 +81,12 @@ impl FinalityRateMetrics { }; let mut imported_cache = imported_cache.lock(); - if let Some(hashes) = imported_cache.get_mut(&number) { - let new_hopeless_count = hashes.iter().filter(|h| **h != hash).count(); + if let Some(hashes) = imported_cache.get_mut(&id.number()) { + let new_hopeless_count = hashes.iter().filter(|h| **h != id.hash()).count(); own_hopeless.inc_by(new_hopeless_count as u64); own_finalized.inc_by((hashes.len() - new_hopeless_count) as u64); } - imported_cache.pop(&number); + imported_cache.pop(&id.number()); } } @@ -116,7 +97,7 @@ mod tests { use primitives::{BlockHash, BlockNumber}; use substrate_prometheus_endpoint::{Counter, Registry, U64}; - use crate::{metrics::finality_rate::ImportedHashesCache, FinalityRateMetrics}; + use super::{FinalityRateMetrics, ImportedHashesCache}; type FinalityRateMetricsInternals = (Counter, Counter, ImportedHashesCache); @@ -158,12 +139,12 @@ mod tests { verify_state(&metrics, 0, 0, HashMap::new()); let hash0 = BlockHash::random(); - metrics.report_own_imported(hash0, 0); + metrics.report_own_imported((hash0, 0).into()); verify_state(&metrics, 0, 0, HashMap::from([(0, vec![hash0])])); let hash1 = BlockHash::random(); - metrics.report_own_imported(hash1, 1); + metrics.report_own_imported((hash1, 1).into()); verify_state( &metrics, @@ -173,7 +154,7 @@ mod tests { ); let hash2 = BlockHash::random(); - metrics.report_own_imported(hash2, 1); + metrics.report_own_imported((hash2, 1).into()); verify_state( &metrics, @@ -182,11 +163,11 @@ mod tests { HashMap::from([(0, vec![hash0]), (1, vec![hash1, hash2])]), ); - metrics.report_finalized(hash0, 0); + metrics.report_finalized((hash0, 0).into()); verify_state(&metrics, 1, 0, HashMap::from([(1, vec![hash1, hash2])])); - metrics.report_finalized(BlockHash::random(), 1); + metrics.report_finalized((BlockHash::random(), 1).into()); verify_state(&metrics, 1, 2, HashMap::new()); } diff --git a/finality-aleph/src/metrics/mod.rs b/finality-aleph/src/metrics/mod.rs index 6e322fcd..4ea3ce6c 100644 --- a/finality-aleph/src/metrics/mod.rs +++ b/finality-aleph/src/metrics/mod.rs @@ -1,15 +1,13 @@ -mod all_block; -mod chain_state; +mod best_block; mod finality_rate; +mod slo; mod timing; pub mod transaction_pool; -pub use all_block::AllBlockMetrics; -pub use chain_state::run_chain_state_metrics; -pub use finality_rate::FinalityRateMetrics; +pub use slo::{run_metrics_service, SloMetrics}; +pub use timing::{Checkpoint, DefaultClock}; +pub type TimingBlockMetrics = timing::TimingBlockMetrics; use substrate_prometheus_endpoint::{exponential_buckets, prometheus}; -pub use 
timing::{Checkpoint, DefaultClock, TimingBlockMetrics}; -pub use transaction_pool::TransactionPoolInfoProvider; const LOG_TARGET: &str = "aleph-metrics"; diff --git a/finality-aleph/src/metrics/slo.rs b/finality-aleph/src/metrics/slo.rs new file mode 100644 index 00000000..b84ff4f4 --- /dev/null +++ b/finality-aleph/src/metrics/slo.rs @@ -0,0 +1,122 @@ +use futures::{Stream, StreamExt}; +use log::warn; +use parity_scale_codec::Encode; +use primitives::Block; +use sp_runtime::traits::Block as _; +use substrate_prometheus_endpoint::Registry; + +use super::{finality_rate::FinalityRateMetrics, timing::DefaultClock}; +use crate::{ + block::ChainStatus, + metrics::{ + best_block::BestBlockMetrics, timing::Checkpoint, transaction_pool::TransactionPoolMetrics, + TimingBlockMetrics, LOG_TARGET, + }, + BlockId, SubstrateChainStatus, +}; + +pub async fn run_metrics_service + Unpin>( + metrics: &SloMetrics, + transaction_pool_stream: &mut TS, +) { + if !metrics.is_noop() { + while let Some(tx) = transaction_pool_stream.next().await { + metrics.report_transaction_in_pool(tx); + } + warn!(target: LOG_TARGET, "SLO Metrics service terminated, because the transaction pool stream ended."); + } +} + +pub type Hashing = sp_runtime::traits::HashingFor; +pub type TxHash = ::Output; + +#[derive(Clone)] +pub struct SloMetrics { + timing_metrics: TimingBlockMetrics, + finality_rate_metrics: FinalityRateMetrics, + best_block_metrics: BestBlockMetrics, + transaction_metrics: TransactionPoolMetrics, + chain_status: SubstrateChainStatus, +} + +impl SloMetrics { + pub fn new(registry: Option<&Registry>, chain_status: SubstrateChainStatus) -> Self { + let warn_creation_failed = |name, e| warn!(target: LOG_TARGET, "Failed to register Prometheus {name} metrics: {e:?}."); + let timing_metrics = TimingBlockMetrics::new(registry, DefaultClock).unwrap_or_else(|e| { + warn!( + target: LOG_TARGET, + "Failed to register Prometheus block timing metrics: {:?}.", e + ); + TimingBlockMetrics::Noop + }); + let finality_rate_metrics = FinalityRateMetrics::new(registry).unwrap_or_else(|e| { + warn!( + target: LOG_TARGET, + "Failed to register Prometheus finality rate metrics: {:?}.", e + ); + FinalityRateMetrics::Noop + }); + let best_block_metrics = BestBlockMetrics::new(registry.cloned(), chain_status.clone()) + .unwrap_or_else(|e| { + warn_creation_failed("best block related", e); + BestBlockMetrics::Noop + }); + let transaction_metrics = TransactionPoolMetrics::new(registry, DefaultClock) + .unwrap_or_else(|e| { + warn_creation_failed("transaction pool", e); + TransactionPoolMetrics::Noop + }); + + SloMetrics { + timing_metrics, + finality_rate_metrics, + best_block_metrics, + transaction_metrics, + chain_status, + } + } + + pub fn is_noop(&self) -> bool { + matches!(self.timing_metrics, TimingBlockMetrics::Noop) + && matches!(self.finality_rate_metrics, FinalityRateMetrics::Noop) + && matches!(self.best_block_metrics, BestBlockMetrics::Noop) + && matches!(self.transaction_metrics, TransactionPoolMetrics::Noop) + } + + pub fn timing_metrics(&self) -> &TimingBlockMetrics { + &self.timing_metrics + } + + pub fn report_transaction_in_pool(&self, hash: TxHash) { + self.transaction_metrics.report_in_pool(hash); + } + + pub fn report_block_imported(&mut self, block_id: BlockId, is_new_best: bool, own: bool) { + self.timing_metrics + .report_block(block_id.hash(), Checkpoint::Imported); + if own { + self.finality_rate_metrics + .report_own_imported(block_id.clone()); + } + if is_new_best { + self.best_block_metrics + 
.report_best_block_imported(block_id.clone()); + } + if let Ok(Some(block)) = self.chain_status.block(block_id.clone()) { + // Skip inherents - there is always exactly one, namely the timestamp inherent. + for xt in block.extrinsics().iter().skip(1) { + self.transaction_metrics + .report_in_block(xt.using_encoded(::hash)); + } + } + } + + pub fn report_block_finalized(&self, block_id: BlockId) { + self.timing_metrics + .report_block(block_id.hash(), Checkpoint::Finalized); + self.finality_rate_metrics + .report_finalized(block_id.clone()); + self.best_block_metrics + .report_block_finalized(block_id.clone()); + } +} diff --git a/finality-aleph/src/metrics/timing.rs b/finality-aleph/src/metrics/timing.rs index 53b338cd..a71fae52 100644 --- a/finality-aleph/src/metrics/timing.rs +++ b/finality-aleph/src/metrics/timing.rs @@ -24,7 +24,7 @@ pub trait Clock { fn now(&self) -> Instant; } -#[derive(Clone)] +#[derive(Clone, Default)] pub struct DefaultClock; impl Clock for DefaultClock { fn now(&self) -> Instant { @@ -166,7 +166,7 @@ impl TimingBlockMetrics { let duration = checkpoint_time .checked_duration_since(*start) .unwrap_or_else(|| { - Self::warn_about_monotonicity_violation( + warn_about_monotonicity_violation( *start, checkpoint_time, checkpoint_type, @@ -190,7 +190,7 @@ impl TimingBlockMetrics { let duration = checkpoint_time .checked_duration_since(*start) .unwrap_or_else(|| { - Self::warn_about_monotonicity_violation( + warn_about_monotonicity_violation( *start, checkpoint_time, checkpoint_type, @@ -202,23 +202,6 @@ impl TimingBlockMetrics { } } } - - fn warn_about_monotonicity_violation( - start: Instant, - checkpoint_time: Instant, - checkpoint_type: Checkpoint, - hash: BlockHash, - ) { - warn!( - target: LOG_TARGET, - "Earlier metrics time {:?} is later that current one \ - {:?}. Checkpoint type {:?}, block: {:?}", - start, - checkpoint_time, - checkpoint_type, - hash - ); - } } #[derive(Clone, Copy, Debug, Display, Hash, PartialEq, Eq)] @@ -243,6 +226,23 @@ impl Checkpoint { } } +fn warn_about_monotonicity_violation( + start: Instant, + checkpoint_time: Instant, + checkpoint_type: Checkpoint, + hash: BlockHash, +) { + warn!( + target: LOG_TARGET, + "Earlier metrics time {:?} is later that current one \ + {:?}. 
Checkpoint type {:?}, block: {:?}", + start, + checkpoint_time, + checkpoint_type, + hash + ); +} + #[cfg(test)] mod tests { use std::{cell::RefCell, cmp::min}; diff --git a/finality-aleph/src/metrics/transaction_pool.rs b/finality-aleph/src/metrics/transaction_pool.rs index aa954051..7ed61a29 100644 --- a/finality-aleph/src/metrics/transaction_pool.rs +++ b/finality-aleph/src/metrics/transaction_pool.rs @@ -1,63 +1,145 @@ -use std::sync::Arc; +use std::{ + num::NonZeroUsize, + sync::Arc, + time::{Duration, Instant}, +}; -use futures::StreamExt; -use sc_transaction_pool_api::{ImportNotificationStream, TransactionFor, TransactionPool}; -use sp_runtime::traits::Member; +use lru::LruCache; +use parking_lot::Mutex; +use substrate_prometheus_endpoint::{ + register, Counter, Histogram, HistogramOpts, PrometheusError, Registry, U64, +}; -#[async_trait::async_trait] -pub trait TransactionPoolInfoProvider { - type TxHash: Member + std::hash::Hash; - type Extrinsic: sp_runtime::traits::Extrinsic; - async fn next_transaction(&mut self) -> Option; +use crate::metrics::{exponential_buckets_two_sided, timing::Clock}; - fn hash_of(&self, extrinsic: &Self::Extrinsic) -> Self::TxHash; -} +// Size of transaction cache: 32B (Hash) + 16B (Instant) * `100_000` is approximately 4.8MB +const TRANSACTION_CACHE_SIZE: usize = 100_000; +const BUCKETS_FACTOR: f64 = 1.4; -pub struct TransactionPoolWrapper { - pool: Arc, - import_notification_stream: ImportNotificationStream, +#[derive(Clone)] +pub enum TransactionPoolMetrics { + Prometheus { + time_till_block_inclusion: Histogram, + transactions_not_seen_in_the_pool: Counter, + cache: Arc>>, + clock: C, + }, + Noop, } -impl TransactionPoolWrapper { - pub fn new(pool: Arc) -> Self { - Self { - pool: pool.clone(), - import_notification_stream: pool.import_notification_stream(), - } - } -} +impl TransactionPoolMetrics { + pub fn new(registry: Option<&Registry>, clock: C) -> Result { + let registry = match registry { + None => return Ok(Self::Noop), + Some(registry) => registry, + }; -#[async_trait::async_trait] -impl TransactionPoolInfoProvider for TransactionPoolWrapper { - type TxHash = T::Hash; - type Extrinsic = TransactionFor; + Ok(Self::Prometheus { + time_till_block_inclusion: register( + Histogram::with_opts( + HistogramOpts::new( + "aleph_transaction_to_block_time", + "Time from becoming ready in the pool to inclusion in some valid block.", + ) + .buckets(exponential_buckets_two_sided( + 2000.0, + BUCKETS_FACTOR, + 2, + 8, + )?), + )?, + registry, + )?, + transactions_not_seen_in_the_pool: register( + Counter::new( + "aleph_transactions_not_seen_in_the_pool", + "\ + Number of transactions that were reported to be in block before reporting of \ + being in the ready queue in the transaction pool. This could happen \ + for many reasons, e.g. when a transaction has been added to the future pool, \ + has been submitted locally, or because of a race condition \ + (especially probable when there is an increased transaction load)", + )?, + registry, + )?, + cache: Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::new(TRANSACTION_CACHE_SIZE) + .expect("the cache size is a non-zero constant"), + ))), + clock, + }) + } - async fn next_transaction(&mut self) -> Option { - self.import_notification_stream.next().await + pub fn report_in_pool(&self, hash: TxHash) { + if let Self::Prometheus { cache, clock, .. } = self { + // Putting new transaction can evict the oldest one. 
However, even if the + // removed transaction was actually still in the pool, we don't have + // any guarantees that it would be eventually included in the block. + // Therefore, we ignore such transaction. + let cache = &mut *cache.lock(); + cache.put(hash, clock.now()); + } } - fn hash_of(&self, extrinsic: &Self::Extrinsic) -> Self::TxHash { - self.pool.hash_of(extrinsic) + pub fn report_in_block(&self, hash: TxHash) { + if let Self::Prometheus { + time_till_block_inclusion, + transactions_not_seen_in_the_pool, + cache, + .. + } = self + { + let cache = &mut *cache.lock(); + let elapsed = match cache.pop(&hash) { + Some(insert_time) => insert_time.elapsed(), + None => { + // Either it was never in the pool (e.g. submitted locally), or we've got BlockImport + // notification faster than transaction in pool one. The latter is much more likely, + // so we report it as zero. + transactions_not_seen_in_the_pool.inc(); + Duration::ZERO + } + }; + time_till_block_inclusion.observe(elapsed.as_secs_f64() * 1000.); + } } } #[cfg(test)] pub mod test { - use std::{sync::Arc, time::Duration}; + use std::{ + collections::HashMap, + hash::Hash, + sync::Arc, + time::{Duration, Instant}, + }; - use futures::{future, StreamExt}; + use futures::{future, FutureExt, Stream, StreamExt}; + use parity_scale_codec::Encode; use sc_basic_authorship::ProposerFactory; - use sc_client_api::{BlockchainEvents, HeaderBackend}; + use sc_block_builder::BlockBuilderBuilder; + use sc_client_api::{ + BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification, + HeaderBackend, + }; use sc_transaction_pool::{BasicPool, FullChainApi}; - use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool}; + use sc_transaction_pool_api::{ + ImportNotificationStream, MaintainedTransactionPool, TransactionPool, + }; use sp_consensus::{BlockOrigin, DisableProofRecording, Environment, Proposer as _}; use sp_runtime::{traits::Block as BlockT, transaction_validity::TransactionSource}; + use substrate_prometheus_endpoint::{Histogram, Registry}; + use substrate_test_client::TestClientBuilder; use substrate_test_runtime::{Extrinsic, ExtrinsicBuilder, Transfer}; use substrate_test_runtime_client::{AccountKeyring, ClientBlockImportExt, ClientExt}; use crate::{ - metrics::transaction_pool::TransactionPoolWrapper, - testing::mocks::{TBlock, THash, TestClient}, + metrics::{ + slo::{Hashing, TxHash}, + timing::DefaultClock, + transaction_pool::TransactionPoolMetrics, + }, + testing::mocks::{TBlock, THash, TestClient, TestClientBuilderExt}, }; type TChainApi = FullChainApi; @@ -68,7 +150,6 @@ pub mod test { pub client: Arc, pub pool: Arc, pub proposer_factory: TProposerFactory, - pub transaction_pool_info_provider: TransactionPoolWrapper>, } impl TestTransactionPoolSetup { @@ -81,7 +162,6 @@ pub mod test { spawner.clone(), client.clone(), ); - let transaction_pool_info_provider = TransactionPoolWrapper::new(pool.clone()); let proposer_factory = ProposerFactory::new(spawner, client.clone(), pool.clone(), None, None); @@ -90,10 +170,13 @@ pub mod test { client, pool, proposer_factory, - transaction_pool_info_provider, } } + pub fn import_notification_stream(&self) -> ImportNotificationStream { + self.pool.import_notification_stream() + } + pub async fn propose_block(&mut self, at: THash, weight_limit: Option) -> TBlock { let proposer = self .proposer_factory @@ -169,4 +252,279 @@ pub mod test { .unwrap(); } } + + // Transaction pool metrics tests + struct TestSetup { + pub pool: TestTransactionPoolSetup, + pub metrics: 
TransactionPoolMetrics, + pub block_import_notifications: + Box> + Unpin>, + pub finality_notifications: Box> + Unpin>, + pub pool_import_notifications: ImportNotificationStream, + } + + #[derive(PartialEq, Eq, Hash, Debug)] + enum NotificationType { + BlockImport, + Finality, + Transaction, + } + + impl TestSetup { + fn new() -> Self { + let client = Arc::new(TestClientBuilder::new().build()); + + let block_import_notifications = + Box::new(client.every_import_notification_stream().fuse()); + let finality_notifications = Box::new(client.finality_notification_stream().fuse()); + + let pool = TestTransactionPoolSetup::new(client); + let pool_import_notifications = pool.import_notification_stream(); + + let registry = Registry::new(); + let metrics = + TransactionPoolMetrics::new(Some(®istry), DefaultClock).expect("metrics"); + + TestSetup { + pool, + metrics, + block_import_notifications, + finality_notifications, + pool_import_notifications, + } + } + + fn genesis(&self) -> THash { + self.pool.client.info().genesis_hash + } + + fn transactions_histogram(&self) -> &Histogram { + match &self.metrics { + TransactionPoolMetrics::Prometheus { + time_till_block_inclusion, + .. + } => time_till_block_inclusion, + _ => panic!("metrics"), + } + } + + fn process_notifications(&mut self) -> HashMap { + let mut block_imported_notifications = 0; + let mut finality_notifications = 0; + let mut transaction_notifications = 0; + + while let Some(block) = self.block_import_notifications.next().now_or_never() { + let body = self + .pool + .client + .block_body(block.expect("stream should not end").hash) + .expect("block should exist") + .expect("block should have body"); + for xt in body { + let hash = xt.using_encoded(::hash); + self.metrics.report_in_block(hash); + } + block_imported_notifications += 1; + } + while self.finality_notifications.next().now_or_never().is_some() { + finality_notifications += 1; + } + while let Some(transaction) = self.pool_import_notifications.next().now_or_never() { + self.metrics + .report_in_pool(transaction.expect("stream should not end")); + transaction_notifications += 1; + } + HashMap::from_iter(vec![ + (NotificationType::BlockImport, block_imported_notifications), + (NotificationType::Finality, finality_notifications), + (NotificationType::Transaction, transaction_notifications), + ]) + } + } + + fn blocks_imported(n: usize) -> HashMap { + HashMap::from_iter(vec![ + (NotificationType::BlockImport, n), + (NotificationType::Finality, 0), + (NotificationType::Transaction, 0), + ]) + } + fn transactions(n: usize) -> HashMap { + HashMap::from_iter(vec![ + (NotificationType::BlockImport, 0), + (NotificationType::Finality, 0), + (NotificationType::Transaction, n), + ]) + } + + const EPS: Duration = Duration::from_nanos(1); + + #[tokio::test] + async fn transactions_are_reported() { + let mut setup = TestSetup::new(); + let genesis = setup.genesis(); + let xt = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); + + let time_before_submit = Instant::now(); + setup.pool.submit(&genesis, xt).await; + + assert_eq!( + setup.process_notifications(), + transactions(1), + "'In pool' notification wasn't sent" + ); + let time_after_submit = Instant::now(); + + tokio::time::sleep(Duration::from_millis(20)).await; + + let time_before_import = Instant::now(); + let _block_1 = setup.pool.propose_block(genesis, None).await; + let pre_count = setup.transactions_histogram().get_sample_count(); + + assert_eq!( + setup.process_notifications(), + blocks_imported(1), + 
"Block import notification wasn't sent" + ); + + let time_after_import = Instant::now(); + + let duration = + Duration::from_secs_f64(setup.transactions_histogram().get_sample_sum() / 1000.); + + assert_eq!(pre_count, 0); + assert_eq!(setup.transactions_histogram().get_sample_count(), 1); + assert!(duration >= time_before_import - time_after_submit - EPS); + assert!(duration <= time_after_import - time_before_submit + EPS); + } + + #[tokio::test] + async fn transactions_are_reported_only_if_ready_when_added_to_the_pool() { + let mut setup = TestSetup::new(); + let genesis = setup.genesis(); + + let xt1 = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); + let xt2 = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 1); + let xt3 = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 2); + + setup.pool.submit(&genesis, xt2.clone()).await; + + // No notification for xt2 as it is not ready + assert_eq!( + setup.process_notifications(), + transactions(0), + "Future transactions should not be reported" + ); + + setup.pool.submit(&genesis, xt1.clone()).await; + setup.pool.submit(&genesis, xt3.clone()).await; + + // Notifications for xt1 and xt3 + assert_eq!(setup.process_notifications(), transactions(2)); + + let block_1 = setup.pool.propose_block(genesis, None).await; + // Block import notification. xt1 notification never appears + assert_eq!(setup.process_notifications(), blocks_imported(1)); + // All 3 extrinsics are included in the block + assert_eq!(block_1.extrinsics.len(), 3); + } + + #[tokio::test] + async fn retracted_transactions_are_reported_only_once() { + let mut setup = TestSetup::new(); + let genesis = setup.genesis(); + + let xt1 = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); + let xt2 = setup + .pool + .extrinsic(AccountKeyring::Charlie, AccountKeyring::Dave, 0); + + setup.pool.submit(&genesis, xt1.clone()).await; + setup.pool.submit(&genesis, xt2.clone()).await; + + // make sure import notifications are received before block import + assert_eq!(setup.process_notifications(), transactions(2)); + + let block_1a = setup.pool.propose_block(genesis, None).await; + assert_eq!(block_1a.extrinsics.len(), 2); + assert_eq!(setup.process_notifications(), blocks_imported(1)); + assert_eq!(setup.transactions_histogram().get_sample_count(), 2); + + let sum_before = setup.transactions_histogram().get_sample_sum(); + + // external fork block with xt1 + let mut block_1b_builder = BlockBuilderBuilder::new(&*setup.pool.client) + .on_parent_block(genesis) + .with_parent_block_number(0) + .build() + .unwrap(); + + block_1b_builder.push(xt1.into()).unwrap(); + let block_1b = block_1b_builder.build().unwrap().block; + setup.pool.import_block(block_1b.clone()).await; + setup.pool.finalize(block_1b.hash()).await; + + let block_2b = setup.pool.propose_block(block_1b.hash(), None).await; + + assert_eq!(block_2b.extrinsics.len(), 1); + assert_eq!(setup.transactions_histogram().get_sample_count(), 2); + assert_eq!(setup.transactions_histogram().get_sample_sum(), sum_before); + } + + #[tokio::test] + async fn transactions_skipped_in_block_authorship_are_not_reported_at_that_time() { + let mut setup = TestSetup::new(); + let genesis = setup.genesis(); + + let xt1 = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); + let xt2 = setup + .pool + .extrinsic(AccountKeyring::Charlie, AccountKeyring::Eve, 0); + + setup.pool.submit(&genesis, xt1.clone()).await; + setup.pool.submit(&genesis, 
xt2.clone()).await; + assert_eq!(setup.process_notifications(), transactions(2)); + + let time_after_submit = Instant::now(); + + let block_1 = setup + .pool + .propose_block(genesis, Some(2 * xt1.encoded_size() - 1)) + .await; + + assert_eq!(setup.process_notifications(), blocks_imported(1)); + assert_eq!(block_1.extrinsics.len(), 1); + assert_eq!(setup.transactions_histogram().get_sample_count(), 1); + let sample_1 = setup.transactions_histogram().get_sample_sum(); + + tokio::time::sleep(Duration::from_millis(10)).await; + + let time_before_block_2 = Instant::now(); + let block_2 = setup + .pool + .propose_block(block_1.hash(), Some(2 * xt1.encoded_size() - 1)) + .await; + + assert_eq!(setup.process_notifications(), blocks_imported(1)); + assert_eq!(block_2.extrinsics.len(), 1); + assert_eq!(setup.transactions_histogram().get_sample_count(), 2); + + let sample_2 = setup.transactions_histogram().get_sample_sum() - sample_1; + + let duration = Duration::from_secs_f64(sample_2 / 1000.0); + + assert!(duration >= time_before_block_2 - time_after_submit - EPS); + } } diff --git a/finality-aleph/src/network/session/mod.rs b/finality-aleph/src/network/session/mod.rs index 81298e77..bae67600 100644 --- a/finality-aleph/src/network/session/mod.rs +++ b/finality-aleph/src/network/session/mod.rs @@ -32,7 +32,7 @@ pub use discovery::Discovery; #[cfg(test)] pub use handler::tests::authentication; pub use handler::{Handler as SessionHandler, HandlerError as SessionHandlerError}; -pub use service::{Config as ConnectionManagerConfig, ManagerError, Service as ConnectionManager}; +pub use service::{Config as ConnectionManagerConfig, Service as ConnectionManager}; /// The maximum size an authentication can have and be accepted. /// This leaves a generous margin of error, as the signature is 64 bytes, diff --git a/finality-aleph/src/nodes.rs b/finality-aleph/src/nodes.rs index e035c069..e1dd0afb 100644 --- a/finality-aleph/src/nodes.rs +++ b/finality-aleph/src/nodes.rs @@ -5,6 +5,7 @@ use futures::channel::oneshot; use log::{debug, error}; use network_clique::{RateLimitingDialer, RateLimitingListener, Service, SpawnHandleT}; use pallet_aleph_runtime_api::AlephSessionApi; +use primitives::TransactionHash; use rate_limiter::SleepingRateLimiter; use sc_client_api::Backend; use sc_keystore::{Keystore, LocalKeystore}; @@ -20,7 +21,7 @@ use crate::{ crypto::AuthorityPen, finalization::AlephFinalizer, idx_to_account::ValidatorIndexToAccountIdConverterImpl, - metrics::{run_chain_state_metrics, transaction_pool::TransactionPoolWrapper}, + metrics::{run_metrics_service, SloMetrics}, network::{ address_cache::validator_address_cache_updater, session::{ConnectionManager, ConnectionManagerConfig}, @@ -57,7 +58,7 @@ where C: crate::ClientForAleph + Send + Sync + 'static, C::Api: AlephSessionApi + AuraApi, BE: Backend + 'static, - TP: TransactionPool + 'static, + TP: TransactionPool + 'static, { let AlephConfig { authentication_network, @@ -68,7 +69,6 @@ where select_chain_provider, spawn_handle, keystore, - metrics, registry, unit_creation_delay, session_period, @@ -144,22 +144,17 @@ where let chain_events = client.chain_status_notifier(); - let client_for_slo_metrics = client.clone(); - let registry_for_slo_metrics = registry.clone(); - spawn_handle.spawn("aleph/slo-metrics", async move { - if let Err(err) = run_chain_state_metrics( - client_for_slo_metrics.as_ref(), - client_for_slo_metrics.every_import_notification_stream(), - client_for_slo_metrics.finality_notification_stream(), - registry_for_slo_metrics, - 
TransactionPoolWrapper::new(transaction_pool), - ) - .await - { - error!( - target: LOG_TARGET, - "ChainStateMetrics service finished with err: {err}." - ); + let slo_metrics = SloMetrics::new(registry.as_ref(), chain_status.clone()); + let timing_metrics = slo_metrics.timing_metrics().clone(); + + spawn_handle.spawn("aleph/slo-metrics", { + let slo_metrics = slo_metrics.clone(); + async move { + run_metrics_service( + &slo_metrics, + &mut transaction_pool.import_notification_stream(), + ) + .await; } }); @@ -177,8 +172,8 @@ where VERIFIER_CACHE_SIZE, genesis_header, ); - let finalizer = AlephFinalizer::new(client.clone(), metrics.clone()); - import_queue_handle.attach_metrics(metrics.clone()); + let finalizer = AlephFinalizer::new(client.clone()); + import_queue_handle.attach_metrics(timing_metrics.clone()); let justifications_for_sync = justification_channel_provider.get_sender(); let sync_io = SyncIO::new( SyncDatabaseIO::new(chain_status.clone(), finalizer, import_queue_handle), @@ -195,6 +190,7 @@ where session_info.clone(), sync_io, registry.clone(), + slo_metrics, favourite_block_user_requests, ) { Ok(x) => x, @@ -256,7 +252,7 @@ where justifications_for_sync, JustificationTranslator::new(chain_status.clone()), request_block, - metrics, + timing_metrics, spawn_handle, connection_manager, keystore, diff --git a/finality-aleph/src/party/manager/aggregator.rs b/finality-aleph/src/party/manager/aggregator.rs index a9eb0ac3..a28a58f7 100644 --- a/finality-aleph/src/party/manager/aggregator.rs +++ b/finality-aleph/src/party/manager/aggregator.rs @@ -19,7 +19,7 @@ use crate::{ }, crypto::Signature, justification::AlephJustification, - metrics::{AllBlockMetrics, Checkpoint}, + metrics::{Checkpoint, TimingBlockMetrics}, network::data::Network, party::{ manager::aggregator::AggregatorVersion::{Current, Legacy}, @@ -60,15 +60,14 @@ where async fn process_new_block_data( aggregator: &mut Aggregator, block: BlockId, - metrics: &mut AllBlockMetrics, + metrics: &mut TimingBlockMetrics, ) where CN: Network, LN: Network, { trace!(target: "aleph-party", "Received unit {:?} in aggregator.", block); let hash = block.hash(); - metrics.report_block(block, Checkpoint::Ordered, None); - + metrics.report_block(hash, Checkpoint::Ordered); aggregator.start_aggregation(hash).await; } @@ -108,7 +107,7 @@ async fn run_aggregator( io: IO, client: C, session_boundaries: &SessionBoundaries, - mut metrics: AllBlockMetrics, + mut metrics: TimingBlockMetrics, mut exit_rx: oneshot::Receiver<()>, ) -> Result<(), Error> where @@ -191,7 +190,7 @@ pub fn task( client: C, io: IO, session_boundaries: SessionBoundaries, - metrics: AllBlockMetrics, + metrics: TimingBlockMetrics, multikeychain: Keychain, version: AggregatorVersion, ) -> Task diff --git a/finality-aleph/src/party/manager/mod.rs b/finality-aleph/src/party/manager/mod.rs index 4d600f8f..6b1931b4 100644 --- a/finality-aleph/src/party/manager/mod.rs +++ b/finality-aleph/src/party/manager/mod.rs @@ -21,7 +21,7 @@ use crate::{ }, crypto::{AuthorityPen, AuthorityVerifier}, data_io::{ChainTracker, DataStore, OrderedDataInterpreter, SubstrateChainInfoProvider}, - metrics::AllBlockMetrics, + metrics::TimingBlockMetrics, mpsc, network::{ data::{ @@ -110,7 +110,7 @@ where justifications_for_sync: JS, justification_translator: JustificationTranslator, block_requester: RB, - metrics: AllBlockMetrics, + metrics: TimingBlockMetrics, spawn_handle: SpawnHandle, session_manager: SM, keystore: Arc, @@ -142,7 +142,7 @@ where justifications_for_sync: JS, justification_translator: 
JustificationTranslator, block_requester: RB, - metrics: AllBlockMetrics, + metrics: TimingBlockMetrics, spawn_handle: SpawnHandle, session_manager: SM, keystore: Arc, diff --git a/finality-aleph/src/sync/service.rs b/finality-aleph/src/sync/service.rs index 0f36ca02..ecfc24a6 100644 --- a/finality-aleph/src/sync/service.rs +++ b/finality-aleph/src/sync/service.rs @@ -15,6 +15,7 @@ use crate::{ EquivocationProof, Finalizer, Header, HeaderVerifier, Justification, JustificationVerifier, UnverifiedHeader, UnverifiedHeaderFor, }, + metrics::SloMetrics, network::GossipNetwork, session::SessionBoundaryInfo, sync::{ @@ -143,6 +144,7 @@ where blocks_from_creator: mpsc::UnboundedReceiver, major_sync_last_status: bool, metrics: Metrics, + slo_metrics: SloMetrics, favourite_block_request: mpsc::UnboundedReceiver>, } @@ -180,6 +182,7 @@ where session_info: SessionBoundaryInfo, io: IO, metrics_registry: Option, + slo_metrics: SloMetrics, favourite_block_request: mpsc::UnboundedReceiver>, ) -> Result<(Self, impl RequestBlocks), HandlerError> { let IO { @@ -196,13 +199,10 @@ where let broadcast_ticker = Ticker::new(TICK_PERIOD, BROADCAST_COOLDOWN); let chain_extension_ticker = Ticker::new(TICK_PERIOD, CHAIN_EXTENSION_COOLDOWN); let (block_requests_for_sync, block_requests_from_user) = mpsc::unbounded(); - let metrics = match Metrics::new(metrics_registry) { - Ok(metrics) => metrics, - Err(e) => { - warn!(target: LOG_TARGET, "Failed to create metrics: {}.", e); - Metrics::noop() - } - }; + let metrics = Metrics::new(metrics_registry).unwrap_or_else(|e| { + warn!(target: LOG_TARGET, "Failed to create metrics: {}.", e); + Metrics::noop() + }); Ok(( Service { @@ -217,6 +217,7 @@ where block_requests_from_user, major_sync_last_status: false, metrics, + slo_metrics, favourite_block_request, }, block_requests_for_sync, @@ -585,9 +586,12 @@ where match event { BlockImported(header) => { trace!(target: LOG_TARGET, "Handling a new imported block."); + let mut own_block = false; + let id = header.id(); self.metrics.report_event(Event::HandleBlockImported); match self.handler.block_imported(header) { Ok(Some(broadcast)) => { + own_block = true; if let Err(e) = self .network .broadcast(NetworkData::RequestResponse(broadcast)) @@ -607,10 +611,14 @@ where ); } } + let is_new_best = id == self.handler.favourite_block().id(); + self.slo_metrics + .report_block_imported(id, is_new_best, own_block); } - BlockFinalized(_) => { + BlockFinalized(header) => { trace!(target: LOG_TARGET, "Handling a new finalized block."); self.metrics.report_event(Event::HandleBlockFinalized); + self.slo_metrics.report_block_finalized(header.id()) } } // We either learned about a new finalized or best block, so we diff --git a/pallets/.DS_Store b/pallets/.DS_Store new file mode 100644 index 00000000..d84519c6 Binary files /dev/null and b/pallets/.DS_Store differ diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 0650268e..41ef259f 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -86,6 +86,9 @@ pub type BlockId = generic::BlockId; /// Block Hash type pub type BlockHash =
::Hash; +/// A hash of extrinsic. +pub type TransactionHash = Hash; + /// The address format for describing accounts. pub type Address = sp_runtime::MultiAddress; diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 7a20ddee..78386293 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.74" +channel = "1.74.1" targets = [ "wasm32-unknown-unknown" ] components = [ "rustfmt", "clippy", "rust-src" ] profile = "minimal" diff --git a/scripts/.DS_Store b/scripts/.DS_Store new file mode 100644 index 00000000..a8bb3fb1 Binary files /dev/null and b/scripts/.DS_Store differ diff --git a/scripts/accounts-invariants/aleph_chain_version.py b/scripts/accounts-invariants/aleph_chain_version.py new file mode 100644 index 00000000..58e00b00 --- /dev/null +++ b/scripts/accounts-invariants/aleph_chain_version.py @@ -0,0 +1,28 @@ +import enum + + +class AlephChainVersion(enum.IntEnum): + VERSION_11_4 = 65, + VERSION_12_0 = 67, + VERSION_12_2 = 68, + VERSION_13_0 = 70, + VERSION_13_2 = 71, + VERSION_12_3 = 72, + VERSION_13_3 = 73, + VERSION_14_X = 14000000, + + @classmethod + def from_spec_version(cls, spec_version): + return cls(spec_version) + + +def get_aleph_chain_version(chain_connection, block_hash): + """ + Retrieves spec_version from chain and returns an `AlephChainVersion` enum + :param chain_connection: WS handler + :param block_hash: Block hash to query state from + :return: AlephChainVersion + """ + runtime_version = chain_connection.get_block_runtime_version(block_hash) + spec_version = runtime_version['specVersion'] + return AlephChainVersion.from_spec_version(spec_version) diff --git a/scripts/accounts-invariants/common.py b/scripts/accounts-invariants/chain_operations.py similarity index 51% rename from scripts/accounts-invariants/common.py rename to scripts/accounts-invariants/chain_operations.py index c6bbdcb6..ddfced42 100644 --- a/scripts/accounts-invariants/common.py +++ b/scripts/accounts-invariants/chain_operations.py @@ -1,60 +1,9 @@ -#!/bin/python3 - -import enum -import logging -import datetime - import substrateinterface -import json - -def get_global_logger(): - log_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") - root_logger = logging.getLogger() - root_logger.setLevel('DEBUG') - - time_now = datetime.datetime.now().strftime("%d-%m-%Y_%H:%M:%S") - file_handler = logging.FileHandler(f"pallet-balances-maintenance-{time_now}.log") - file_handler.setFormatter(log_formatter) - file_handler.setLevel(logging.DEBUG) - root_logger.addHandler(file_handler) - - console_handler = logging.StreamHandler() - console_handler.setFormatter(log_formatter) - console_handler.setLevel(logging.INFO) - root_logger.addHandler(console_handler) - - return logging - - -log = get_global_logger() - - -class ChainMajorVersion(enum.Enum): - PRE_12_MAJOR_VERSION = 65, - AT_LEAST_12_MAJOR_VERSION = 68, - AT_LEAST_13_2_VERSION = 71, - - @classmethod - def from_spec_version(cls, spec_version): - if spec_version <= 65: - return cls(ChainMajorVersion.PRE_12_MAJOR_VERSION) - elif 68 <= spec_version < 71 or spec_version == 72: - return cls(ChainMajorVersion.AT_LEAST_12_MAJOR_VERSION) - elif spec_version >= 71: - return cls(ChainMajorVersion.AT_LEAST_13_2_VERSION) - +from tqdm import tqdm +import sys +import logging -def get_chain_major_version(chain_connection, block_hash): - """ - Retrieves spec_version from chain and returns an enum whether this is pre 12 version or at least 12 version - :param chain_connection: WS handler - :param 
block_hash: Block hash to query state from - :return: ChainMajorVersion - """ - runtime_version = chain_connection.get_block_runtime_version(block_hash) - spec_version = runtime_version['specVersion'] - major_version = ChainMajorVersion.from_spec_version(spec_version) - return major_version +log = logging.getLogger() def filter_accounts(chain_connection, @@ -80,22 +29,19 @@ def filter_accounts(chain_connection, page_size=1000, block_hash=block_hash) total_accounts_count = 0 - total_issuance = 0 - for (i, (account_id, info)) in enumerate(account_query): + for (i, (account_id, info)) in tqdm(iterable=enumerate(account_query), + desc="Accounts checked", + unit="", + file=sys.stdout): total_accounts_count += 1 - free = info['data']['free'].value - reserved = info['data']['reserved'].value - total_issuance += free + reserved if check_accounts_predicate(info, chain_major_version, ed): accounts_that_do_meet_predicate.append([account_id.value, info.serialize()]) - if i % 5000 == 0 and i > 0: - log.info(f"Checked {i} accounts") log.info( f"Total accounts that match given predicate {check_accounts_predicate_name} is {len(accounts_that_do_meet_predicate)}") log.info(f"Total accounts checked: {total_accounts_count}") - return accounts_that_do_meet_predicate, total_issuance + return accounts_that_do_meet_predicate def format_balance(chain_connection, amount): @@ -111,44 +57,6 @@ def format_balance(chain_connection, amount): return f"{amount} {token}" -def batch_transfer(chain_connection, - input_args, - accounts, - amount, - sender_keypair): - """ - Send Balance.Transfer calls in a batch - :param chain_connection: WS connection handler - :param input_args: script input arguments returned from argparse - :param accounts: transfer beneficents - :param amount: amount to be transferred - :param sender_keypair: keypair of sender account - :return: None. Can raise exception in case of SubstrateRequestException thrown - """ - for (i, account_ids_chunk) in enumerate(chunks(accounts, input_args.transfer_calls_in_batch)): - balance_calls = list(map(lambda account: chain_connection.compose_call( - call_module='Balances', - call_function='transfer', - call_params={ - 'dest': account, - 'value': amount, - }), account_ids_chunk)) - batch_call = chain_connection.compose_call( - call_module='Utility', - call_function='batch', - call_params={ - 'calls': balance_calls - } - ) - - extrinsic = chain_connection.create_signed_extrinsic(call=batch_call, keypair=sender_keypair) - log.info(f"About to send {len(balance_calls)} transfers, each with {format_balance(chain_connection, amount)} " - f"from {sender_keypair.ss58_address} to below accounts: " - f"{account_ids_chunk}") - - submit_extrinsic(chain_connection, extrinsic, len(balance_calls), args.dry_run) - - def submit_extrinsic(chain_connection, extrinsic, expected_number_of_events, @@ -193,18 +101,4 @@ def get_all_accounts(chain_connection, block_hash=None): chain_major_version=None, check_accounts_predicate=lambda x, y, z: True, check_accounts_predicate_name="\'all accounts\'", - block_hash=block_hash)[0] - - -def save_accounts_to_json_file(json_file_name, accounts): - with open(json_file_name, 'w') as f: - json.dump(accounts, f) - log.info(f"Wrote file '{json_file_name}'") - - -def chunks(list_of_elements, n): - """ - Lazily split 'list_of_elements' into 'n'-sized chunks. 
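
Note: the reworked filter_accounts above now streams accounts with tqdm progress reporting and returns only the matching entries (the per-account total-issuance tally was dropped from this helper). A minimal, hypothetical usage sketch follows; it assumes a node endpoint at ws://127.0.0.1:9944, that it is run from scripts/accounts-invariants/, and that the existential deposit is read via substrateinterface's get_constant — none of which is prescribed by this change:

    # Hypothetical driver, run from scripts/accounts-invariants/ against an assumed local node.
    import substrateinterface

    import logger
    from aleph_chain_version import get_aleph_chain_version
    from chain_operations import filter_accounts

    logger.setup_global_logger()

    # The endpoint and the way the existential deposit is fetched are assumptions for this sketch.
    chain = substrateinterface.SubstrateInterface(url="ws://127.0.0.1:9944")
    block_hash = chain.get_chain_head()
    ed = chain.get_constant('Balances', 'ExistentialDeposit').value

    version = get_aleph_chain_version(chain, block_hash)
    # Example predicate: accounts whose free balance sits below the existential deposit.
    dust = filter_accounts(chain_connection=chain,
                           ed=ed,
                           chain_major_version=version,
                           check_accounts_predicate=lambda info, ver, ed_: info['data']['free'].value < ed_,
                           check_accounts_predicate_name="'free balance below ED'",
                           block_hash=block_hash)
    print(f"Accounts with free balance below ED: {len(dust)}")
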
- """ - for i in range(0, len(list_of_elements), n): - yield list_of_elements[i:i + n] + block_hash=block_hash) diff --git a/scripts/accounts-invariants/contracts.py b/scripts/accounts-invariants/contracts.py new file mode 100644 index 00000000..fbe2ae53 --- /dev/null +++ b/scripts/accounts-invariants/contracts.py @@ -0,0 +1,44 @@ +import logging +from tqdm import tqdm +import sys + +log = logging.getLogger() + + +def query_contract_and_code_owners_accounts(chain_connection, block_hash): + """ + Returns contract accounts and code owners. + """ + code_owners = set() + contract_accounts = set() + + log.info(f"Querying code owners.") + code_info_of_query = chain_connection.query_map(module='Contracts', + storage_function='CodeInfoOf', + page_size=500, + block_hash=block_hash) + + for (i, (account_id, info)) in tqdm(iterable=enumerate(code_info_of_query), + desc="CodeInfoOfs checked", + unit="", + file=sys.stdout): + code_owners.add(info.serialize()['owner']) + + log.info(f"Total code owners is {len(code_owners)}") + log.debug(f"Code owners: {code_owners}") + + log.info(f"Querying contract accounts.") + contract_info_of_query = chain_connection.query_map(module='Contracts', + storage_function='ContractInfoOf', + page_size=1000, + block_hash=block_hash) + for (i, (account_id, info)) in tqdm(iterable=enumerate(contract_info_of_query), + desc="ContractInfoOfs checked", + unit="", + file=sys.stdout): + contract_accounts.add(account_id.value) + + log.info(f"Total contracts count is {len(contract_accounts)}") + log.debug(f"Contract accounts: {contract_accounts}") + + return code_owners, contract_accounts diff --git a/scripts/accounts-invariants/logger.py b/scripts/accounts-invariants/logger.py new file mode 100644 index 00000000..36885f7e --- /dev/null +++ b/scripts/accounts-invariants/logger.py @@ -0,0 +1,19 @@ +import logging +import datetime + + +def setup_global_logger(): + log_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") + root_logger = logging.getLogger() + root_logger.setLevel('DEBUG') + + time_now = datetime.datetime.now().strftime("%d-%m-%Y_%H:%M:%S") + file_handler = logging.FileHandler(f"pallet-balances-maintenance-{time_now}.log") + file_handler.setFormatter(log_formatter) + file_handler.setLevel(logging.DEBUG) + root_logger.addHandler(file_handler) + + console_handler = logging.StreamHandler() + console_handler.setFormatter(log_formatter) + console_handler.setLevel(logging.INFO) + root_logger.addHandler(console_handler) diff --git a/scripts/accounts-invariants/pallet-balances-maintenance.py b/scripts/accounts-invariants/pallet-balances-maintenance.py index 088a3fca..039803c8 100755 --- a/scripts/accounts-invariants/pallet-balances-maintenance.py +++ b/scripts/accounts-invariants/pallet-balances-maintenance.py @@ -1,16 +1,18 @@ #!/bin/python3 -import logging - -from common import * -import copy -import json +from chain_operations import * +from aleph_chain_version import * +from utils import * +from total_issuance import * import substrateinterface import argparse import os - -from substrateinterface.storage import StorageKey +import logger +import logging +import contracts +from tqdm import tqdm +import sys def get_args() -> argparse.Namespace: @@ -19,10 +21,9 @@ def get_args() -> argparse.Namespace: Script for maintenance operations on AlephNode chain with regards to pallet balances. 
It has following functionality: -* workaround for bug https://github.com/paritytech/polkadot-sdk/pull/2700/files, that make sure all - accounts have at least ED as their free balance, -* programmatic support for sending Balances.UpgradeAccounts ext for all accounts, * checking pallet balances and account reference counters invariants. +* calling pallet operations for maintenance actions +* checking total issuance By default, it connects to a AlephZero Testnet and performs sanity checks only ie not changing state of the chain at all. Accounts that do not satisfy those checks are written to accounts-with-failed-invariants.json file. @@ -42,19 +43,6 @@ def get_args() -> argparse.Namespace: parser.add_argument('--dry-run', action='store_true', help='Specify this switch if script should just print what if would do. Default: False') - parser.add_argument('--transfer-calls-in-batch', - type=int, - default=64, - help='How many transfer calls to perform in a single batch transaction. Default: 64') - parser.add_argument('--upgrade-accounts-in-batch', - type=int, - default=128, - help='How many accounts to upgrade in a single transaction. Default: 128') - parser.add_argument('--double-providers-accounts-to-fix', - type=int, - default=10, - help='How many accounts to fix in one script run session.' - ' Default: 10') parser.add_argument('--fix-consumers-calls-in-batch', type=int, default=10, @@ -64,29 +52,16 @@ def get_args() -> argparse.Namespace: type=str, default='', help='Block hash from which this script should query state from. Default: chain tip.') + parser.add_argument('--start-range-block-hash', + type=str, + default='', + help='A block hash which denotes starting interval when searching for total issuance ' + 'imbalance. Must be present when --check-total-issuance is set.') + parser.add_argument('--check-total-issuance', + action='store_true', + help='Specify this switch if script should compare total issuance aggregated over all ' + 'accounts with StorageValue balances.total_issuance') group = parser.add_mutually_exclusive_group() - group.add_argument('--fix-free-balance', - action='store_true', - help='Specify this switch if script should find all accounts ' - 'that have free < ED and send transfers so that free >= ED. ' - 'It requires AlephNode < 12.X version and SENDER_ACCOUNT env to be set with ' - 'a mnemonic phrase of sender account that has funds for transfers and fees. ' - 'dust-accounts.json file is saved with all such accounts.' - 'Default: False') - group.add_argument('--upgrade-accounts', - action='store_true', - help='Specify this switch if script should send Balances.UpgradeAccounts ' - 'for all accounts on a chain. It requires at least AlephNode 12.X version ' - 'and SENDER_ACCOUNT env to be set with a mnemonic phrase of sender account that has funds ' - 'for transfers and fees.' - 'Default: False') - group.add_argument('--fix-double-providers-count', - action='store_true', - help='Specify this switch if script should query all accounts that have providers == 2' - 'and decrease this counter by one using System.SetStorage call. ' - 'It requires at least AlephNode 12.X version ' - 'and SENDER_ACCOUNT env to be set with a sudo mnemonic phrase. ' - 'Default: False') group.add_argument('--fix-consumers-counter-underflow', action='store_true', help='Specify this switch if script should query all accounts that have underflow consumers ' @@ -94,12 +69,6 @@ def get_args() -> argparse.Namespace: 'Operations.fix_accounts_consumers_underflow call.' 
'Query operation is possible on all chains, but fix only on AlephNode >= 13.2' 'Default: False') - group.add_argument('--fix-consumers-counter-overflow', - action='store_true', - help='Specify this switch if script should query all accounts that have overflow consumers ' - 'counter, and decrease this counter by one using System.SetStorage call. ' - 'It requires at least AlephNode 12.X version ' - 'and SENDER_ACCOUNT env to be set with a sudo mnemonic phrase.') return parser.parse_args() @@ -121,7 +90,7 @@ def check_account_invariants(account, chain_major_version, ed): # balance so max possible value of providers is 1 account_ref_counter_invariant = (providers <= 1 and consumers == 0) or (consumers > 0 and providers == 1) - if chain_major_version == ChainMajorVersion.PRE_12_MAJOR_VERSION: + if chain_major_version <= AlephChainVersion.VERSION_11_4: misc_frozen = account['data']['misc_frozen'].value fee_frozen = account['data']['fee_frozen'].value @@ -159,485 +128,6 @@ def check_account_invariants(account, chain_major_version, ed): consumer_ref_applies_to_suspended_balances_invariant -def check_if_account_would_be_dust_in_12_version(account, chain_major_version, ed): - """ - This predicate checks if a valid account in pre-12 version will be invalid in version 12. - - :param account: AccountInfo struct (element of System.Accounts StorageMap) - :param chain_major_version: Must be < 12 - :param ed: existential deposit - :return: True if account free balance < ED, False otherwise - """ - - assert chain_major_version == ChainMajorVersion.PRE_12_MAJOR_VERSION, \ - "Chain major version must be less than 12!" - assert check_account_invariants(account, chain_major_version, ed), \ - f"Account {account} does not meet pre-12 version invariants!" - - free = account['data']['free'].value - - return free < ed - - -def find_dust_accounts(chain_connection, ed, chain_major_version, block_hash=None): - """ - This function finds all accounts that are valid in 11 version, but not on 12 version - """ - assert chain_major_version == ChainMajorVersion.PRE_12_MAJOR_VERSION, \ - "Chain major version must be less than 12!" - return filter_accounts(chain_connection=chain_connection, - ed=ed, - chain_major_version=chain_major_version, - check_accounts_predicate=check_if_account_would_be_dust_in_12_version, - check_accounts_predicate_name="\'account valid in pre-12 version but not in 12 version\'", - block_hash=block_hash)[0] - - -def is_account_not_upgraded(account, chain_major_version, ed): - """ - Checks if accounts has LSB flag not set, so it's not upgraded. - """ - assert chain_major_version in [ChainMajorVersion.AT_LEAST_12_MAJOR_VERSION, - ChainMajorVersion.AT_LEAST_13_2_VERSION], \ - "Chain major version must be at least 12!" - - flags = account['data']['flags'].value - is_account_already_upgraded = flags >= 2 ** 127 - return not is_account_already_upgraded - - -def upgrade_accounts(chain_connection, - input_args, - ed, - chain_major_version, - sender_keypair, - block_hash=None): - """ - Prepare and send Balances.UpgradeAccounts call for all accounts on a chain that are not upgraded yet - :param chain_connection: WS connection handler - :param input_args: script input arguments returned from argparse - :param ed: chain existential deposit - :param chain_major_version: enum ChainMajorVersion - :param sender_keypair: keypair of sender account - :param block_hash: A block hash to query state from - :return: None. 
Can raise exception in case of SubstrateRequestException thrown - """ - log.info("Querying all accounts.") - not_upgraded_accounts_and_info = filter_accounts(chain_connection=chain_connection, - ed=None, - chain_major_version=chain_major_version, - check_accounts_predicate=is_account_not_upgraded, - check_accounts_predicate_name="\'not upgraded accounts\'", - block_hash=block_hash)[0] - save_accounts_to_json_file("not-upgraded-accounts.json", not_upgraded_accounts_and_info) - not_upgraded_accounts = list(map(lambda x: x[0], not_upgraded_accounts_and_info)) - - for (i, account_ids_chunk) in enumerate(chunks(not_upgraded_accounts, input_args.upgrade_accounts_in_batch)): - upgrade_accounts_call = chain_connection.compose_call( - call_module='Balances', - call_function='upgrade_accounts', - call_params={ - 'who': account_ids_chunk, - } - ) - - extrinsic = chain_connection.create_signed_extrinsic(call=upgrade_accounts_call, keypair=sender_keypair) - log.info( - f"About to upgrade {len(account_ids_chunk)} accounts, each with " - f"{format_balance(chain_connection, existential_deposit)}") - - submit_extrinsic(chain_connection, extrinsic, len(account_ids_chunk), args.dry_run) - - -def check_if_account_has_double_providers(account, chain_major_version, ed): - """ - This predicate checks if an account's providers counter is equal to 2 - :param account: AccountInfo struct (element of System.Accounts StorageMap) - :param chain_major_version: Must be >= 12 - :param ed: existential deposit, not used - :return: True if providers == 2, False otherwise - """ - - assert chain_major_version == ChainMajorVersion.AT_LEAST_12_MAJOR_VERSION, \ - "Chain major version must be at least 12!" - - providers = account['providers'].value - - return providers == 2 - - -def state_sanity_check(chain_connection, - account_id, - set_storage_block_hash, - account_info_assert_function): - """ - Makes sure AccountInfo data changed only in terms of given assert function, by comparing state between block that - setStorage landed vs state of that block parent. - :param chain_connection WS connection handler - :param account_id: An account to check - :param set_storage_block_hash: a hash of a block that contains setStorage - :param account_info_assert_function: a function that checks for previous and current block state - :return: None, but might raise AssertionError. 
- """ - set_storage_block_number = chain_connection.get_block_number(set_storage_block_hash) - parent_block_number = set_storage_block_number - 1 - parent_block_hash = chain_connection.get_block_hash(parent_block_number) - - log.info(f"Comparing AccountInfo for account {account_id} " - f"between blocks #{parent_block_number} and #{set_storage_block_number}") - parent_account_info_state = chain_connection.query(module='System', - storage_function='Account', - params=[account_id], - block_hash=parent_block_hash).value - log.debug(f"Account state in the parent block: {parent_account_info_state}") - set_storage_info_state = chain_connection.query(module='System', - storage_function='Account', - params=[account_id], - block_hash=set_storage_block_hash).value - log.debug(f"Account state in the setStorage block: {set_storage_info_state}") - account_info_assert_function(account_id, parent_account_info_state, set_storage_block_number, - set_storage_info_state) - - -def assert_state_different_only_in_providers_counter(account_id, parent_account_info_state, set_storage_block_number, - set_storage_info_state): - assert parent_account_info_state['providers'] == 2, f"Providers before fix for account {account_id} is not 2!" - assert set_storage_info_state['providers'] == 1, f"Providers after fix for account {account_id} is not 1!" - changed_state_for_sake_of_assert = copy.deepcopy(parent_account_info_state) - changed_state_for_sake_of_assert['providers'] = 1 - assert changed_state_for_sake_of_assert == set_storage_info_state, \ - f"Parent account info state is different from set storage state with more than providers counter! " \ - f"Parent state: {parent_account_info_state} " \ - f"Set storage state: {set_storage_info_state}" \ - f"Set storage block number {set_storage_block_number}" - - -def assert_state_different_only_in_consumers_counter(account_id, parent_account_info_state, set_storage_block_number, - set_storage_info_state): - assert parent_account_info_state['consumers'] - 1 == set_storage_info_state['consumers'], \ - f"Consumers counter before fix for account {account_id} is not decremented!" - changed_state_for_sake_of_assert = copy.deepcopy(parent_account_info_state) - changed_state_for_sake_of_assert['consumers'] = set_storage_info_state['consumers'] - assert changed_state_for_sake_of_assert == set_storage_info_state, \ - f"Parent account info state is different from set storage state with more than consumers counter! " \ - f"Parent state: {parent_account_info_state} " \ - f"Set storage state: {set_storage_info_state}" \ - f"Set storage block number {set_storage_block_number}" - - -def get_system_account_metadata_scale_codec_type(chain_connection): - """ - Retrieves System.Account metadata decoder type - :param chain_connection WS connection handler - :return: string representation of System.Account decoder type - """ - system_account_storage_function = \ - next(filter( - lambda storage_function: storage_function['storage_name'] == 'Account' and - storage_function['module_id'] == 'System', - chain_connection.get_metadata_storage_functions())) - internal_metadata_scale_codec_type = system_account_storage_function['type_value'] - return internal_metadata_scale_codec_type - - -def fix_double_providers_count_for_account(chain_connection, - account_id, - sudo_sender_keypair, - internal_system_account_scale_codec_type, - input_args, - block_hash=None): - """ - Fixes double providers counter for a given account id. 
- :param chain_connection: WS connection handler - :param account_id: Account id to which we should fix providers counter - :param sudo_sender_keypair Mnemonic phrase of sudo - :param internal_system_account_scale_codec_type Internal metadata SCALE decoder/encoder type for System.Account - entry - :param block_hash: A block hash to query state from - """ - log.info(f"Fixing double providers counter for account {account_id}") - fix_account_info_with_set_storage(chain_connection=chain_connection, - account_id=account_id, - sudo_sender_keypair=sudo_sender_keypair, - internal_system_account_scale_codec_type=internal_system_account_scale_codec_type, - input_args=input_args, - account_info_functor=set_providers_counter_to_one, - account_info_check_functor=assert_state_different_only_in_providers_counter, - block_hash=block_hash) - - -def fix_account_info_with_set_storage(chain_connection, - account_id, - sudo_sender_keypair, - internal_system_account_scale_codec_type, - input_args, - account_info_functor, - account_info_check_functor, - block_hash=None): - """ - General function to fix AccountInfo using System.SetStorage call. - :param chain_connection: WS connection handler - :param account_id: Account id to which we should fix providers counter - :param sudo_sender_keypair Mnemonic phrase of sudo - :param internal_system_account_scale_codec_type Internal metadata SCALE decoder/encoder type for System.Account - entry - :param account_info_functor: a function which returns fixed account info - :param account_info_check_functor: a function which compares previous and current block states - :param block_hash: A block hash to query state from - """ - log.info(f"Querying state for account {account_id}") - result = chain_connection.query(module='System', - storage_function='Account', - params=[account_id], - block_hash=block_hash) - log.debug(f"Returned value: {result.value}") - account_id_and_account_info_data = [(account_id, result.value)] - raw_key_values = get_raw_key_values(chain_connection, - account_id_and_account_info_data, - internal_system_account_scale_codec_type, - account_info_functor) - - set_storage_call = chain_connection.compose_call( - call_module='System', - call_function='set_storage', - call_params={ - 'items': raw_key_values, - }) - # ref time is set to 400ms to make sure this is the only tx that ends up in a block - # 359 875 586 000 is a maximal weight (found empirically) that sudo_unchecked_weight is able to consume - max_weight = 359875586000 - sudo_unchecked_weight_call = chain_connection.compose_call( - call_module='Sudo', - call_function='sudo_unchecked_weight', - call_params={ - 'call': set_storage_call, - 'weight': { - 'proof_size': 0, - 'ref_time': max_weight, - }, - } - ) - - # add a small tip to make sure this will be the first transaction in the block - token_mili_unit = 1000000000 - extrinsic = chain_connection.create_signed_extrinsic(call=sudo_unchecked_weight_call, - keypair=sudo_sender_keypair, - tip=token_mili_unit) - set_storage_block_hash = submit_extrinsic(chain_connection, extrinsic, 1, input_args.dry_run) - if not input_args.dry_run: - state_sanity_check(chain_connection, - account_id, - set_storage_block_hash, - account_info_check_functor) - - -def fix_double_providers_count(chain_connection, - input_args, - chain_major_version, - sudo_sender_keypair, - block_hash=None): - """ - Queries those accounts using System.Account map which have providers == 2. - For each such account, performs System.SetStorage with the same data but providers set to 1. 
- Must be run on AlephNode chain with at least 12 version. - :param chain_connection: WS connection handler - :param input_args: script input arguments returned from argparse - :param chain_major_version: enum ChainMajorVersion - :param sudo_sender_keypair: sudo keypair of sender account - :param block_hash: A block hash to query state from - :return: None. Can raise exception in case of SubstrateRequestException thrown - """ - log.info("Querying all accounts that have double provider counter.") - double_providers_accounts = filter_accounts(chain_connection=chain_connection, - ed=None, - chain_major_version=chain_major_version, - check_accounts_predicate=check_if_account_has_double_providers, - check_accounts_predicate_name="\'double provider count\'", - block_hash=block_hash)[0] - log.info(f"Found {len(double_providers_accounts)} accounts with double provider counter.") - if len(double_providers_accounts) > 0: - save_accounts_to_json_file("double-providers-accounts.json", double_providers_accounts) - log.info(f"Will fix at most first {input_args.double_providers_accounts_to_fix} accounts.") - internal_system_account_scale_codec_type = get_system_account_metadata_scale_codec_type(chain_connection) - for account_id, _ in double_providers_accounts[:input_args.double_providers_accounts_to_fix]: - fix_double_providers_count_for_account(chain_connection, - account_id, - sudo_sender_keypair, - internal_system_account_scale_codec_type, - input_args, - block_hash) - - -def get_raw_key_values(chain_connection, - account_id_and_data_chunk, - internal_system_account_scale_codec_type, - account_info_functor): - """ - Prepares input arguments for System.setStorage calls wth fixed providers counter - :param chain_connection: WS connection handler. Used for passing metadata when creating storage keys, which - is a valid assumption that it's not going to change during this script execution - :param account_id_and_data_chunk: A list of tuples (account_id, decoded_account_info) - :param internal_system_account_scale_codec_type Internal metadata SCALE decoder/encoder type for System.Account - entry - :param account_info_functor: function that manipulates input account info and returns corrected data - :return: List of tuples (system_account_storage_key_hexstring, account_info_raw_value_hexstring) ready to be sent - to System.setStorage call - """ - account_ids_chunk = list( - map(lambda account_id_and_data: account_id_and_data[0], account_id_and_data_chunk)) - system_account_storage_keys_hexstrings = list(map( - lambda account_id: - StorageKey.create_from_storage_function(pallet="System", - storage_function="Account", - params=[account_id], - runtime_config=chain_connection.runtime_config, - metadata=chain_connection.metadata).to_hex(), - account_ids_chunk)) - account_info_chunk = list( - map(lambda account_id_and_data: account_id_and_data[1], account_id_and_data_chunk)) - raw_hexstring_values = list(map(lambda account_info: account_info_functor(chain_connection, - account_info, - internal_system_account_scale_codec_type), - account_info_chunk)) - raw_key_values = list(zip(system_account_storage_keys_hexstrings, raw_hexstring_values)) - log.info(f"Prepared {len(raw_key_values)} raw key value pairs.") - log.debug(f"{raw_key_values}") - return raw_key_values - - -def assert_same_data_except_providers_counter(account_data_hexstring, - account_data_with_fixed_providers_counter_hexstring): - """ - Function makes sure previous and fixed account data is different only in providers counter - :param 
account_data_hexstring: Hexstring (raw value) representation of original AccountData - :param account_data_with_fixed_providers_counter_hexstring: Hexstring representation (raw value) of AccountData with - fixed providers counter - :return: None, but raises AssertionError in case data is different not only in providers counter - """ - # example hexstring of AccountInfo is - # 0x00000000000000000100000000000000f4010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080 - # difference can be only on byte 20th, which is LSB of providers counter that must be equal to 1 in fixed data - assert account_data_hexstring[:19] == account_data_with_fixed_providers_counter_hexstring[:19], \ - f"First 20 bytes of original and fixed AccountInfo must be equal" - assert account_data_with_fixed_providers_counter_hexstring[19] == '1', \ - f"Providers counter of fixed AccountInfo must be 1" - assert account_data_hexstring[20:] == account_data_with_fixed_providers_counter_hexstring[20:], \ - f"Last but 21 bytes of original and fixed AccountInfo must be equal" - - -def assert_same_data_except_consumers_counter(account_data_hexstring, - account_data_with_decremented_consumers_counter_hexstring): - """ - Function makes sure previous and fixed account data is different only in consumers counter - :param account_data_hexstring: Hexstring (raw value) representation of original AccountData - :param account_data_with_decremented_consumers_counter_hexstring: Hexstring representation (raw value) of - AccountData with decremented consumers counter - :return: None, but raises AssertionError in case data is different not only in consumers counter - """ - # example hexstring of AccountInfo is that has consumers == 4 - # 0x1a0000000400000001000000000000008dbc99a42761730200000000000000000094ff113c0000000000000000000000c1015dcaffee71020000000000000000b5985591727f46020000000000000080 - # difference can be only on byte 12th, which is LSB of consumers counter - assert account_data_hexstring[:11] == account_data_with_decremented_consumers_counter_hexstring[:11], \ - f"First 12 bytes of original and fixed AccountInfo must be equal" - assert int(account_data_hexstring[11]) - 1 == int( - account_data_with_decremented_consumers_counter_hexstring[11]), \ - f"Consumers counter of fixed AccountInfo must be decremented" - assert account_data_hexstring[12:] == account_data_with_decremented_consumers_counter_hexstring[12:], \ - f"Last but 12 bytes of original and fixed AccountInfo must be equal" - - -def set_providers_counter_to_one(chain_connection, account_info, internal_system_account_scale_codec_type): - """ - Method sets provider counter for a System.Account to 1 using System.SetStorage call. Since we must replace whole - System.Account value, which contains also other account counters as well as balances data for the account, this - solution is prone to a race condition in which we this data is altered meanwhile we issue set_storage. Practically, - this can happen either that here is a transaction that ends up in the same block as set_storage or just before, - causing a (write) race condition. In order to prevent that one needs to read state of parent of the block that - contains setStorge transaction and make sure only difference in state is providers counter. - - This function encodes original AccountInfo with fixed providers count (set to 1). It also asserts - original and fixed AccountInfo, encoded as hexstrings, is different only on the providers counter. 
- - :param chain_connection: WS connection handler. Uses for passing metadata when creating storage keys, which - is a valid assumption that it's not going to change during this script execution - :param account_info: decoded AccountInfo that has double providers counter - :param internal_system_account_scale_codec_type Internal metadata SCALE decoder/encoder type for System.Account - entry - :return: Raw storage value hexstring representation of AccountInfo with providers counter set to 1 - """ - fixed_account_data = copy.deepcopy(account_info) - fixed_account_data['providers'] = 1 - scale_object = chain_connection.runtime_config.create_scale_object( - type_string=internal_system_account_scale_codec_type, metadata=chain_connection.metadata - ) - account_data_with_fixed_providers_counter = scale_object.encode(fixed_account_data) - fixed_account_info_hexstring = account_data_with_fixed_providers_counter.to_hex() - original_encoded_data_hexstring = scale_object.encode(account_info).to_hex() - assert_same_data_except_providers_counter(original_encoded_data_hexstring, - fixed_account_info_hexstring) - return fixed_account_info_hexstring - - -def decrement_consumers_counter(chain_connection, account_info, internal_system_account_scale_codec_type): - """ - See description of `set_providers_counter_to_one` for more details. - :param chain_connection: WS connection handler. Uses for passing metadata when creating storage keys, which - is a valid assumption that it's not going to change during this script execution - :param account_info: decoded AccountInfo that has consumers counter overflow - :param internal_system_account_scale_codec_type Internal metadata SCALE decoder/encoder type for System.Account - entry - :return: Raw storage value hexstring representation of AccountInfo with decremented consumers counter or - AssertionError if consumers is 0. - """ - fixed_account_data = copy.deepcopy(account_info) - assert fixed_account_data['consumers'] > 0, f"Consumers counter of account {account_info} must be > 0!" - fixed_account_data['consumers'] -= 1 - scale_object = chain_connection.runtime_config.create_scale_object( - type_string=internal_system_account_scale_codec_type, metadata=chain_connection.metadata - ) - account_data_with_fixed_consumers_counter = scale_object.encode(fixed_account_data) - fixed_account_info_hexstring = account_data_with_fixed_consumers_counter.to_hex() - original_encoded_data_hexstring = scale_object.encode(account_info).to_hex() - assert_same_data_except_consumers_counter(original_encoded_data_hexstring, - fixed_account_info_hexstring) - return fixed_account_info_hexstring - - -def query_contract_and_code_owners_accounts(chain_connection, block_hash): - """ - Returns contract accounts and code owners. 
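
Note: this in-script copy is removed in favour of the new scripts/accounts-invariants/contracts.py module shown earlier in the diff. A hypothetical standalone driver for that module might look as follows; the endpoint URL and running it directly (rather than through pallet-balances-maintenance.py) are assumptions for illustration only:

    # Hypothetical standalone run of the new contracts helper, from scripts/accounts-invariants/.
    import substrateinterface

    import logger
    from contracts import query_contract_and_code_owners_accounts

    logger.setup_global_logger()

    # The endpoint is an assumption; the maintenance script normally supplies the connection itself.
    chain = substrateinterface.SubstrateInterface(url="ws://127.0.0.1:9944")
    block_hash = chain.get_chain_head()

    code_owners, contract_accounts = query_contract_and_code_owners_accounts(chain, block_hash)
    print(f"{len(code_owners)} code owners, {len(contract_accounts)} contract accounts")
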
- """ - code_owners = set() - contract_accounts = set() - - log.info(f"Querying code owners.") - code_info_of_query = chain_connection.query_map(module='Contracts', - storage_function='CodeInfoOf', - page_size=1000, - block_hash=block_hash) - - for (i, (account_id, info)) in enumerate(code_info_of_query): - code_owners.add(info.serialize()['owner']) - if i % 5000 == 0 and i > 0: - log.info(f"Checked {i} code owners") - log.info(f"Total code owners is {len(code_owners)}") - log.debug(f"Code owners: {code_owners}") - - log.info(f"Querying contract accounts.") - contract_info_of_query = chain_connection.query_map(module='Contracts', - storage_function='ContractInfoOf', - page_size=1000, - block_hash=block_hash) - for (i, (account_id, info)) in enumerate(contract_info_of_query): - contract_accounts.add(account_id.value) - if i % 5000 == 0 and i > 0: - log.info(f"Checked {i} contracts") - log.info(f"Total contracts count is {len(contract_accounts)}") - log.debug(f"Contract accounts: {contract_accounts}") - - return code_owners, contract_accounts - - def reserved_or_frozen_non_zero(account_id, account_info): """ Checks if an account has zero consumers, but non-zero reserved amount. @@ -661,22 +151,6 @@ def get_vesting_lock(account_id_locks): return vesting_lock is not None -def has_account_consumer_overflow(account_id_and_info, locks, bonded, next_keys): - """ - Returns True if an account has consumers overflow - """ - account_id, account_info = account_id_and_info - consumers = account_info['consumers'] - if account_id in locks and len(locks[account_id]) > 0 and get_staking_lock(locks[account_id]) is not None and \ - consumers == 4 and \ - account_id in next_keys and \ - account_id in bonded and bonded[account_id] != account_id: - log.debug(f"Found an account that has four consumers, staking lock, next session key, " - f"and stash != controller: {account_id}") - return True - return False - - def has_account_consumer_underflow(account_id_and_info, locks, bonded, ledger, next_keys, contract_accounts): """ Returns True if an account has consumers underflow @@ -734,23 +208,6 @@ def get_expected_consumers_counter_in_13_version(account_id, return expected_consumers -def query_accounts_with_consumers_counter_overflow(chain_connection, block_hash=None): - """ - Queries all accounts that have an overflow of consumers counter, ie accounts which satisfy below conditions: - * `consumers` == 4, - `balances.Locks` contain entries with `id` == `staking`, - `staking.bonded(accountId) != accountId`, - accountId is in `session.nextKeys` - :param chain_connection: WS connection handler - :param block_hash: A block hash to query state from - """ - bonded, _, locks, next_keys = get_consumers_related_data(chain_connection, block_hash) - - log.info("Querying all accounts and filtering by consumers overflow predicate.") - return [account_id_and_info for account_id_and_info in get_all_accounts(chain_connection, block_hash) if - has_account_consumer_overflow(account_id_and_info, locks, bonded, next_keys)] - - def query_accounts_with_consumers_counter_underflow(chain_connection, block_hash=None): """ Queries all accounts that have an underflow of consumers counter by calculating expected number of consumers and @@ -760,7 +217,7 @@ def query_accounts_with_consumers_counter_underflow(chain_connection, block_hash :param block_hash: A block hash to query state from """ bonded, ledger, locks, next_keys = get_consumers_related_data(chain_connection, block_hash) - _, contract_accounts = query_contract_and_code_owners_accounts( 
+ _, contract_accounts = contracts.query_contract_and_code_owners_accounts( chain_connection=chain_ws_connection, block_hash=block_hash) @@ -822,7 +279,10 @@ def batch_fix_accounts_consumers_underflow(chain_connection, :param sender_keypair: keypair of sender account :return: None. Can raise exception in case of SubstrateRequestException thrown """ - for (i, account_ids_chunk) in enumerate(chunks(accounts, input_args.fix_consumers_calls_in_batch)): + for (i, account_ids_chunk) in tqdm(iterable=enumerate(chunks(accounts, input_args.fix_consumers_calls_in_batch)), + desc="Accounts checked", + unit="", + file=sys.stdout): operations_calls = list(map(lambda account: chain_connection.compose_call( call_module='Operations', call_function='fix_accounts_consumers_underflow', @@ -845,36 +305,10 @@ def batch_fix_accounts_consumers_underflow(chain_connection, submit_extrinsic(chain_connection, extrinsic, len(operations_calls), args.dry_run) -def fix_overflow_consumers_counter_for_account(chain_connection, - account_id, - sudo_sender_keypair, - internal_system_account_scale_codec_type, - input_args, - block_hash=None): - """ - Decrements consumers counter for fiven account id - :param chain_connection: WS connection handler - :param account_id: Account id to which we should decrement consumers counter - :param sudo_sender_keypair Mnemonic phrase of sudo - :param internal_system_account_scale_codec_type Internal metadata SCALE decoder/encoder type for System.Account - entry - :param block_hash: A block has to query state from - """ - log.info(f"Decrementing consumers counter for account {account_id}") - fix_account_info_with_set_storage(chain_connection=chain_connection, - account_id=account_id, - sudo_sender_keypair=sudo_sender_keypair, - internal_system_account_scale_codec_type=internal_system_account_scale_codec_type, - input_args=input_args, - account_info_functor=decrement_consumers_counter, - account_info_check_functor=assert_state_different_only_in_consumers_counter, - block_hash=block_hash) - - def perform_accounts_sanity_checks(chain_connection, ed, chain_major_version, - total_issuance_from_chain): + block_hash=None): """ Checks whether all accounts on a chain matches pallet balances invariants :param chain_connection: WS connection handler @@ -882,35 +316,28 @@ def perform_accounts_sanity_checks(chain_connection, :param chain_major_version: enum ChainMajorVersion :return:None """ - invalid_accounts, total_issuance_from_accounts = \ - filter_accounts(chain_connection=chain_connection, - ed=ed, - chain_major_version=chain_major_version, - check_accounts_predicate=lambda x, y, z: not check_account_invariants(x, y, z), - check_accounts_predicate_name="\'incorrect account invariants\'") + invalid_accounts = filter_accounts(chain_connection=chain_connection, + ed=ed, + chain_major_version=chain_major_version, + check_accounts_predicate=lambda x, y, z: not check_account_invariants(x, y, z), + check_accounts_predicate_name="\'incorrect account invariants\'", + block_hash=block_hash) if len(invalid_accounts) > 0: log.warning(f"Found {len(invalid_accounts)} accounts that do not meet balances invariants!") save_accounts_to_json_file("accounts-with-failed-invariants.json", invalid_accounts) else: log.info(f"All accounts on chain {chain_connection.chain} meet balances invariants.") - total_issuance_from_accounts_human = format_balance(chain_connection, total_issuance_from_accounts) - log.info(f"Total issuance computed from accounts: {total_issuance_from_accounts_human}") - if total_issuance_from_accounts 
!= total_issuance_from_chain: - total_issuance_from_chain_human = format_balance(chain_connection, total_issuance_from_chain) - delta_human = format_balance(chain_connection, - total_issuance_from_chain - total_issuance_from_accounts) - log.warning(f"TotalIssuance from chain: {total_issuance_from_chain_human} is different from computed: " - f"{total_issuance_from_accounts_human}, delta: {delta_human}") if __name__ == "__main__": + logger.setup_global_logger() + log = logging.getLogger() args = get_args() - if args.fix_free_balance or args.upgrade_accounts or args.fix_double_providers_count \ - or args.fix_consumers_counter_underflow or args.fix_consumers_counter_overflow: + if args.fix_consumers_counter_underflow: sender_origin_account_seed = os.getenv('SENDER_ACCOUNT') if sender_origin_account_seed is None: - log.error(f"When specifying --fix-free-balance or --upgrade-accounts or --fix-double-providers-count or " + log.error(f"When specifying " f"--fix-consumers-counter-underflow, env SENDER_ACCOUNT must exists. Exiting.") exit(1) if args.dry_run: @@ -923,85 +350,20 @@ def perform_accounts_sanity_checks(chain_connection, state_block_hash = chain_ws_connection.get_chain_head() log.info(f"Script uses block hash {state_block_hash} to query state from.") - chain_major_version = get_chain_major_version(chain_ws_connection, state_block_hash) - log.info(f"Major version of chain connected to is {chain_major_version}") - if args.fix_free_balance: - if chain_major_version is not ChainMajorVersion.PRE_12_MAJOR_VERSION: - log.error(f"--fix-free-balance can be used only on chains with pre-12 version. Exiting.") - exit(2) - if args.upgrade_accounts: - if chain_major_version is not ChainMajorVersion.AT_LEAST_12_MAJOR_VERSION: - log.error(f"--upgrade-accounts can be used only on chains with at least 12 version. Exiting.") - exit(3) - if args.fix_double_providers_count: - if chain_major_version is not ChainMajorVersion.AT_LEAST_12_MAJOR_VERSION: - log.error(f"--fix-double-providers-count can be used only on chains with at least 12 version. Exiting.") - exit(4) + aleph_chain_version = get_aleph_chain_version(chain_ws_connection, state_block_hash) + log.info(f"Version of an AlephNode chain connected to is {str(aleph_chain_version)}") if args.fix_consumers_counter_underflow: - if chain_major_version is not ChainMajorVersion.AT_LEAST_13_2_VERSION: + if aleph_chain_version < AlephChainVersion.VERSION_13_2: log.error( f"Fixing underflow consumers account can only be done on AlephNode chains with at least " f"13.2 version. Exiting.") exit(5) - if args.fix_consumers_counter_overflow: - if chain_major_version is not ChainMajorVersion.AT_LEAST_13_2_VERSION: - log.error( - f"Fixing underflow consumers account can only be done on AlephNode chains with at least " - f"13.2 version. 
Exiting.") - exit(6) - - total_issuance_from_chain = chain_ws_connection.query(module='Balances', - storage_function='TotalIssuance', - block_hash=state_block_hash).value - log.info(f"Chain total issuance is {format_balance(chain_ws_connection, total_issuance_from_chain)}") existential_deposit = chain_ws_connection.get_constant(module_name="Balances", constant_name="ExistentialDeposit", block_hash=state_block_hash).value log.info(f"Existential deposit is {format_balance(chain_ws_connection, existential_deposit)}") - if args.fix_free_balance: - sender_origin_account_keypair = substrateinterface.Keypair.create_from_uri(sender_origin_account_seed) - log.info(f"Using following account for transfers: {sender_origin_account_keypair.ss58_address}") - log.info(f"Will send at most {args.transfer_calls_in_batch} transfers in a batch.") - log.info(f"Looking for accounts that would be dust in 12 version.") - dust_accounts_in_12_version = find_dust_accounts(chain_connection=chain_ws_connection, - ed=existential_deposit, - chain_major_version=chain_major_version, - block_hash=state_block_hash) - if len(dust_accounts_in_12_version): - log.info(f"Found {len(dust_accounts_in_12_version)} accounts that will be invalid in 12 version.") - save_accounts_to_json_file("dust-accounts.json", dust_accounts_in_12_version) - log.info("Adjusting balances by sending transfers.") - batch_transfer(chain_connection=chain_ws_connection, - input_args=args, - accounts=list(map(lambda x: x[0], dust_accounts_in_12_version)), - amount=existential_deposit, - sender_keypair=sender_origin_account_keypair) - log.info(f"Transfers done.") - else: - log.info(f"No dust accounts found, skipping transfers.") - if args.upgrade_accounts: - sender_origin_account_keypair = substrateinterface.Keypair.create_from_uri(sender_origin_account_seed) - log.info(f"Using following account for upgrade_accounts: {sender_origin_account_keypair.ss58_address}") - log.info(f"Will upgrade at most {args.upgrade_accounts_in_batch} accounts in a batch.") - upgrade_accounts(chain_connection=chain_ws_connection, - input_args=args, - ed=existential_deposit, - chain_major_version=chain_major_version, - sender_keypair=sender_origin_account_keypair, - block_hash=state_block_hash) - log.info("Upgrade accounts done.") - if args.fix_double_providers_count: - sudo_account_keypair = substrateinterface.Keypair.create_from_uri(sender_origin_account_seed) - log.info(f"This script is going to query all accounts that have providers == 2 and decrease this counter " - f"by one using System.SetStorage extrinsic, which requires sudo.") - log.info(f"Using the following account for System.SetStorage calls: {sudo_account_keypair.ss58_address}") - fix_double_providers_count(chain_connection=chain_ws_connection, - input_args=args, - chain_major_version=chain_major_version, - sudo_sender_keypair=sudo_account_keypair, - block_hash=state_block_hash) if args.fix_consumers_counter_underflow: log.info(f"This script is going to query all accounts that have underflow of consumers counter, " f"and fix them using runtime Operations.fix_accounts_consumers_underflow extrinsic.") @@ -1011,7 +373,7 @@ def perform_accounts_sanity_checks(chain_connection, log.info(f"Found {len(accounts_with_consumers_underflow)} accounts with consumers underflow.") if len(accounts_with_consumers_underflow) > 0: save_accounts_to_json_file("accounts_with_consumers_underflow.json", accounts_with_consumers_underflow) - code_owners, contract_accounts = query_contract_and_code_owners_accounts( + code_owners, 
contract_accounts = contracts.query_contract_and_code_owners_accounts( chain_connection=chain_ws_connection, block_hash=state_block_hash) accounts_with_consumers_underflow_set = set(list(map(lambda x: x[0], accounts_with_consumers_underflow))) @@ -1028,29 +390,31 @@ def perform_accounts_sanity_checks(chain_connection, input_args=args, sender_keypair=sender_origin_account_keypair, accounts=list(accounts_with_consumers_underflow_set)) - if args.fix_consumers_counter_overflow: - log.info(f"This script is going to query all accounts that have overflow of consumers counter, " - f"and decrease this counter by one using System.SetStorage extrinsic, which requires sudo.") - sudo_account_keypair = substrateinterface.Keypair.create_from_uri(sender_origin_account_seed) - log.info(f"Using the following account for System.SetStorage calls: {sudo_account_keypair.ss58_address}") - accounts_with_consumers_overflow = \ - query_accounts_with_consumers_counter_overflow(chain_connection=chain_ws_connection, - block_hash=state_block_hash) - log.info(f"Found {len(accounts_with_consumers_overflow)} accounts with consumers overflow.") - if len(accounts_with_consumers_overflow) > 0: - save_accounts_to_json_file("accounts_with_consumers_overflow.json", accounts_with_consumers_overflow) - internal_system_account_scale_codec_type = get_system_account_metadata_scale_codec_type(chain_ws_connection) - for account_id, _ in accounts_with_consumers_overflow: - fix_overflow_consumers_counter_for_account(chain_ws_connection, - account_id, - sudo_account_keypair, - internal_system_account_scale_codec_type, - args, - block_hash=state_block_hash) + if args.check_total_issuance: + log.info(f"Comparing total issuance aggregated over all accounts with storage value balances.total_issuance") + total_issuance_from_chain, total_issuance_from_accounts = \ + get_total_issuance_imbalance(chain_connection=chain_ws_connection, + block_hash=state_block_hash) + log_total_issuance_imbalance(chain_connection=chain_ws_connection, + total_issuance_from_chain=total_issuance_from_chain, + total_issuance_from_accounts=total_issuance_from_accounts, + block_hash=state_block_hash) + delta = total_issuance_from_chain - total_issuance_from_accounts + if delta != 0: + if not args.start_range_block_hash: + log.error(f"--start-range-block-hash must be set when --check-total-issuance is set " + f"to perform further actions. Exiting.") + sys.exit(2) + log.warning(f"Total issuance retrieved from the chain storage is different than aggregated sum over" + f" all accounts. 
Finding the first block where it happened.") + first_block_hash_imbalance = find_block_hash_with_imbalance(chain_connection=chain_ws_connection, + start_block_hash=args.start_range_block_hash, + end_block_hash=state_block_hash) + log.info(f"The first block where it happened is {first_block_hash_imbalance}") log.info(f"Performing pallet balances sanity checks.") perform_accounts_sanity_checks(chain_connection=chain_ws_connection, ed=existential_deposit, - chain_major_version=chain_major_version, - total_issuance_from_chain=total_issuance_from_chain) + chain_major_version=aleph_chain_version, + block_hash=state_block_hash) log.info(f"DONE") diff --git a/scripts/accounts-invariants/total_issuance.py b/scripts/accounts-invariants/total_issuance.py new file mode 100644 index 00000000..d154f3ba --- /dev/null +++ b/scripts/accounts-invariants/total_issuance.py @@ -0,0 +1,130 @@ +import logging +import itertools +import functools + +from chain_operations import format_balance, get_all_accounts + +log = logging.getLogger() + + +def get_total_issuance_imbalance(chain_connection, block_hash): + """ + Compares total issuance computed from all accounts with the balances.total_issuance storage value + :param chain_connection: WS handler + :param block_hash: A block hash to query state from + :return: a tuple (total issuance from balances.total_issuance storage, total issuance aggregated over all accounts) + """ + total_issuance_from_chain = get_total_issuance_from_storage(chain_connection=chain_connection, + block_hash=block_hash) + all_accounts_and_infos = get_all_accounts(chain_connection, block_hash) + total_issuance_from_accounts = calculate_total_issuance( + map(lambda account_and_info: account_and_info[1], all_accounts_and_infos)) + return total_issuance_from_chain, total_issuance_from_accounts + + +def log_total_issuance_imbalance(chain_connection, + total_issuance_from_chain, + total_issuance_from_accounts, + block_hash): + """ + Logs imbalance data at a given block hash in a human-readable format.
+ :param chain_connection: WS handler + :param total_issuance_from_chain: balances.total_issuance storage value + :param total_issuance_from_accounts: total_issuance as sum aggregated over all accounts + :param block_hash: block hash from which above data was retrieved + :return: None + """ + total_issuance_from_accounts_human = format_balance(chain_connection, total_issuance_from_accounts) + total_issuance_from_chain_human = format_balance(chain_connection, total_issuance_from_chain) + delta = total_issuance_from_chain - total_issuance_from_accounts + delta_human = format_balance(chain_connection, delta) + log.info(f"Total issuance imbalance computed from block {block_hash}") + log.info( + f"balances.total_issuance storage value: {total_issuance_from_chain_human}") + log.info( + f"Total issuance computed as aggregated sum over all accounts: {total_issuance_from_accounts_human}") + log.info(f"Delta is: {delta_human}") + + +def calculate_total_issuance(account_infos): + """ + Calculates total issuance as the sum of free + reserved funds over all accounts + :param account_infos: A list of AccountInfo structs + :return: total issuance as number + """ + + def get_account_total_balance(account_info): + free = account_info['data']['free'] + reserved = account_info['data']['reserved'] + return free + reserved + + return \ + functools.reduce(lambda x, account_info: x + get_account_total_balance(account_info), account_infos, 0) + + +def get_total_issuance_from_storage(chain_connection, block_hash): + """ + Retrieves balances.total_issuance StorageValue + :param chain_connection: WS handler + :param block_hash: A block hash to query state from + :return: total issuance as number + """ + total_issuance_from_chain = chain_connection.query(module='Balances', + storage_function='TotalIssuance', + block_hash=block_hash).value + return total_issuance_from_chain + + +def find_block_hash_with_imbalance(chain_connection, start_block_hash, end_block_hash): + """ + Finds the first block hash that positively contributed to a total issuance imbalance. + + A positive contribution to the total issuance imbalance in block B is a situation in which the total issuance + imbalance increases in block B. The total issuance imbalance is the difference between the balances.total_issuance + storage value and the total balance aggregated over all accounts. It might happen that this difference is some + value X in block B and some value Y in parent(B), with X > Y. There might be many such blocks in the range + [start_block_hash; end_block_hash], and this method returns the first one. + + The method uses a bisection algorithm. It computes the mid-range block number as + mid_block_number = start_block_number + floor((end_block_number - start_block_number) / 2), + compares the total issuance imbalance at mid_block_number with the imbalance at the start of the range, + and adjusts the interval ends according to the bisection algorithm.
+ + :param chain_connection: WS handler + :param start_block_hash: first block hash in range to check + :param end_block_hash: end block hash in range to check + :return: the first block_hash that contributed positively to total issuance imbalance + """ + start_block_number = chain_connection.get_block_number(start_block_hash) + end_block_number = chain_connection.get_block_number(end_block_hash) + + start_total_issuance_imbalance = get_total_issuance_imbalance(chain_connection, start_block_hash) + log_total_issuance_imbalance(chain_connection=chain_connection, + total_issuance_from_chain=start_total_issuance_imbalance[0], + total_issuance_from_accounts=start_total_issuance_imbalance[1], + block_hash=start_block_hash) + delta_start_imbalance = start_total_issuance_imbalance[0] - start_total_issuance_imbalance[1] + + while end_block_number - 1 > start_block_number: + log.info(f"Finding first block that contributed to total issuance imbalance in range " + f"[{start_block_number}; {end_block_number}]") + + mid_range_block_number = start_block_number + (end_block_number - start_block_number) // 2 + mid_range_block_hash = chain_connection.get_block_hash(mid_range_block_number) + log.info(f"Mid-range block hash: {mid_range_block_hash}, number: {mid_range_block_number}") + mid_total_issuance_imbalance = get_total_issuance_imbalance(chain_connection, mid_range_block_hash) + log_total_issuance_imbalance(chain_connection=chain_connection, + total_issuance_from_chain=mid_total_issuance_imbalance[0], + total_issuance_from_accounts=mid_total_issuance_imbalance[1], + block_hash=mid_range_block_hash) + + delta_mid_imbalance = mid_total_issuance_imbalance[0] - mid_total_issuance_imbalance[1] + if delta_mid_imbalance > delta_start_imbalance: + end_block_hash = mid_range_block_hash + end_block_number = chain_connection.get_block_number(end_block_hash) + else: + start_block_hash = mid_range_block_hash + start_block_number = chain_connection.get_block_number(start_block_hash) + delta_start_imbalance = delta_mid_imbalance + + return chain_connection.get_block_hash(end_block_number) diff --git a/scripts/accounts-invariants/utils.py b/scripts/accounts-invariants/utils.py new file mode 100644 index 00000000..7ebc8094 --- /dev/null +++ b/scripts/accounts-invariants/utils.py @@ -0,0 +1,18 @@ +import json +import logging + +log = logging.getLogger() + + +def save_accounts_to_json_file(json_file_name, accounts): + with open(json_file_name, 'w') as f: + json.dump(accounts, f) + log.info(f"Wrote file '{json_file_name}'") + + +def chunks(list_of_elements, n): + """ + Lazily split 'list_of_elements' into 'n'-sized chunks. + """ + for i in range(0, len(list_of_elements), n): + yield list_of_elements[i:i + n]
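As a quick illustration of the new utils.chunks helper above: it lazily yields fixed-size batches, which is what batch_fix_accounts_consumers_underflow relies on when composing utility.batch calls. The sample list and batch size below are made up for demonstration only.

from utils import chunks

account_ids = [f"account-{i}" for i in range(7)]
for batch in chunks(account_ids, 3):
    print(batch)
# Prints:
# ['account-0', 'account-1', 'account-2']
# ['account-3', 'account-4', 'account-5']
# ['account-6']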
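A small worked example of calculate_total_issuance from the new total_issuance.py, using two made-up decoded AccountInfo dictionaries (only the fields the helper actually reads are shown); real inputs come from get_all_accounts.

from total_issuance import calculate_total_issuance

# Hypothetical decoded AccountInfo structs for illustration only.
account_infos = [
    {'data': {'free': 1_000_000_000_000, 'reserved': 250_000_000_000}},
    {'data': {'free': 500_000_000_000, 'reserved': 0}},
]
# Total issuance is the sum of free + reserved over all accounts.
assert calculate_total_issuance(account_infos) == 1_750_000_000_000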
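Finally, a minimal end-to-end sketch of the new --check-total-issuance flow, assuming the snippet runs from scripts/accounts-invariants (so total_issuance is importable), that py-substrate-interface is installed, and that the ws://127.0.0.1:9944 endpoint and the start-of-range block are placeholders rather than values taken from the actual script arguments.

import logging
import substrateinterface

from total_issuance import (get_total_issuance_imbalance,
                            log_total_issuance_imbalance,
                            find_block_hash_with_imbalance)

logging.basicConfig(level=logging.INFO)

# Hypothetical node endpoint; the script takes it from its CLI arguments instead.
chain_connection = substrateinterface.SubstrateInterface(url="ws://127.0.0.1:9944")
state_block_hash = chain_connection.get_chain_head()

# Compare balances.total_issuance with the sum of free + reserved over all accounts.
from_chain, from_accounts = get_total_issuance_imbalance(chain_connection=chain_connection,
                                                         block_hash=state_block_hash)
log_total_issuance_imbalance(chain_connection=chain_connection,
                             total_issuance_from_chain=from_chain,
                             total_issuance_from_accounts=from_accounts,
                             block_hash=state_block_hash)

if from_chain - from_accounts != 0:
    # Placeholder start of the search range; the script takes it from --start-range-block-hash.
    start_block_hash = chain_connection.get_block_hash(1)
    first_imbalanced = find_block_hash_with_imbalance(chain_connection=chain_connection,
                                                      start_block_hash=start_block_hash,
                                                      end_block_hash=state_block_hash)
    print(f"First block that increased the imbalance: {first_imbalanced}")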