From 5f8f0fc6864e0d27103e41921ab369df8cecc247 Mon Sep 17 00:00:00 2001 From: Matthew Smith Date: Fri, 22 Jul 2022 00:26:49 +0700 Subject: [PATCH 1/6] Stub AgentConnections --- nomad-base/src/agent.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/nomad-base/src/agent.rs b/nomad-base/src/agent.rs index e2832ca7..95266b39 100644 --- a/nomad-base/src/agent.rs +++ b/nomad-base/src/agent.rs @@ -25,13 +25,29 @@ use tokio::{task::JoinHandle, time::sleep}; const MAX_EXPONENTIAL: u32 = 7; // 2^7 = 128 second timeout +/// General or agent-specific connection map +#[derive(Debug, Clone)] +pub enum AgentConnections { + /// Connections for watchers + Watcher { + /// A map of boxed Homes + homes: HashMap>, + // ... + }, + /// Connections for other agents + Default { + /// A boxed Home + home: Arc, + /// A map of boxed Replicas + replicas: HashMap>, + }, +} + /// Properties shared across all agents #[derive(Debug, Clone)] pub struct AgentCore { - /// A boxed Home - pub home: Arc, - /// A map of boxed Replicas - pub replicas: HashMap>, + /// Agent connections + pub connections: AgentConnections, /// A persistent KV Store (currently implemented as rocksdb) pub db: DB, /// Prometheus metrics From 230322c4bab4d00965cd10787e879ca1b053d57d Mon Sep 17 00:00:00 2001 From: Matthew Smith Date: Fri, 22 Jul 2022 21:28:58 +0700 Subject: [PATCH 2/6] Remove watcher from workspace --- Cargo.lock | 30 ------------------------------ Cargo.toml | 2 +- 2 files changed, 1 insertion(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ff7aa42..5ce4a29c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5029,36 +5029,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "watcher" -version = "1.0.0" -dependencies = [ - "affix", - "async-trait", - "color-eyre", - "config 0.10.1", - "dotenv", - "ethers", - "futures-util", - "log", - "nomad-base", - "nomad-core", - "nomad-ethereum", - "nomad-test", - "nomad-xyz-configuration", - "prometheus", - "rocksdb", - "serde 1.0.137", - "serde_json", - "serial_test", - "thiserror", - "tokio", - "tokio-test", - "tracing", - "tracing-futures", - "tracing-subscriber 0.3.14", -] - [[package]] name = "web-sys" version = "0.3.57" diff --git a/Cargo.toml b/Cargo.toml index 54350ccb..0613272c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ members = [ "agents/kathy", "agents/updater", "agents/relayer", - "agents/watcher", +# "agents/watcher", "agents/processor", "tools/kms-cli", "tools/nomad-cli", From 5b62e48620244cba820d3176315cedc777f2a381 Mon Sep 17 00:00:00 2001 From: Matthew Smith Date: Fri, 22 Jul 2022 21:32:31 +0700 Subject: [PATCH 3/6] Move home and replica accessors from AgentCore to AgentConnections --- nomad-base/src/agent.rs | 42 ++++++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/nomad-base/src/agent.rs b/nomad-base/src/agent.rs index 95266b39..d4051f7d 100644 --- a/nomad-base/src/agent.rs +++ b/nomad-base/src/agent.rs @@ -43,6 +43,32 @@ pub enum AgentConnections { }, } +/// Accessor methods for AgentConnections +impl AgentConnections { + /// Get an optional clone of home + pub fn home(&self) -> Option> { + use AgentConnections::*; + match self { + Default { home, .. } => Some(home.clone()), + _ => None, + } + } + + /// Get an optional clone of the map of replicas + pub fn replicas(&self) -> Option>> { + use AgentConnections::*; + match self { + Default { replicas, .. } => Some(replicas.clone()), + _ => None, + } + } + + /// Get an optional clone of a replica by its name + pub fn replica_by_name(&self, name: &str) -> Option> { + self.replicas().and_then(|r| r.get(name).map(Clone::clone)) + } +} + /// Properties shared across all agents #[derive(Debug, Clone)] pub struct AgentCore { @@ -111,19 +137,9 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef { self.as_ref().db.clone() } - /// Return a reference to a home contract - fn home(&self) -> Arc { - self.as_ref().home.clone() - } - - /// Get a reference to the replicas map - fn replicas(&self) -> &HashMap> { - &self.as_ref().replicas - } - - /// Get a reference to a replica by its name - fn replica_by_name(&self, name: &str) -> Option> { - self.replicas().get(name).map(Clone::clone) + /// Return a reference to the connections object + fn connections(&self) -> &AgentConnections { + &self.as_ref().connections } /// Run the agent with the given home and replica From 91d27dba21276bb06c5e7d7fe4347e367e794b97 Mon Sep 17 00:00:00 2001 From: Matthew Smith Date: Fri, 22 Jul 2022 21:34:15 +0700 Subject: [PATCH 4/6] Use new home and replica accessor methods --- agents/kathy/src/kathy.rs | 3 ++- agents/processor/src/processor.rs | 14 +++++++++----- agents/relayer/src/relayer.rs | 3 ++- agents/updater/src/updater.rs | 10 ++++++---- nomad-base/src/agent.rs | 19 ++++++++++++------- 5 files changed, 31 insertions(+), 18 deletions(-) diff --git a/agents/kathy/src/kathy.rs b/agents/kathy/src/kathy.rs index 15a30f43..63a9c6bf 100644 --- a/agents/kathy/src/kathy.rs +++ b/agents/kathy/src/kathy.rs @@ -67,12 +67,13 @@ impl NomadAgent for Kathy { } fn build_channel(&self, replica: &str) -> Self::Channel { + let home = self.connections().home().expect("!home"); Self::Channel { base: self.channel_base(replica), home_lock: self.home_lock.clone(), generator: self.generator.clone(), messages_dispatched: self.messages_dispatched.with_label_values(&[ - self.home().name(), + home.name(), replica, Self::AGENT_NAME, ]), diff --git a/agents/processor/src/processor.rs b/agents/processor/src/processor.rs index c970b404..b15853f1 100644 --- a/agents/processor/src/processor.rs +++ b/agents/processor/src/processor.rs @@ -357,10 +357,11 @@ impl NomadAgent for Processor { } fn build_channel(&self, replica: &str) -> Self::Channel { + let home = self.connections().home().expect("!home"); Self::Channel { base: self.channel_base(replica), next_message_nonce: self.next_message_nonces.with_label_values(&[ - self.home().name(), + home.name(), replica, Self::AGENT_NAME, ]), @@ -395,15 +396,16 @@ impl NomadAgent for Processor { self.assert_home_not_failed().await??; info!("Starting Processor tasks"); + let home = self.connections().home().expect("!home"); // tree sync info!("Starting ProverSync"); - let db = NomadDB::new(self.home().name(), self.db()); + let db = NomadDB::new(home.name(), self.db()); let sync = ProverSync::from_disk(db.clone()); let prover_sync_task = sync.spawn(); info!("Starting indexer"); - let home_sync_task = self.home().sync(); + let home_sync_task = home.sync(); let home_fail_watch_task = self.watch_home_fail(self.interval); @@ -415,10 +417,11 @@ impl NomadAgent for Processor { if !self.subsidized_remotes.is_empty() { // Get intersection of specified remotes (replicas in settings) // and subsidized remotes + let replicas = self.connections().replicas().expect("!replicas"); let specified_subsidized: Vec<&str> = self .subsidized_remotes .iter() - .filter(|r| self.replicas().contains_key(*r)) + .filter(|r| replicas.contains_key(*r)) .map(AsRef::as_ref) .collect(); @@ -430,7 +433,8 @@ impl NomadAgent for Processor { // if we have a bucket, add a task to push to it if let Some(config) = &self.config { info!(bucket = %config.bucket, "Starting S3 push tasks"); - let pusher = Pusher::new(self.core.home.name(), &config.bucket, db.clone()).await; + let home = self.connections().home().expect("!home"); + let pusher = Pusher::new(home.name(), &config.bucket, db.clone()).await; tasks.push(pusher.spawn()) } diff --git a/agents/relayer/src/relayer.rs b/agents/relayer/src/relayer.rs index f773e630..65562a4c 100644 --- a/agents/relayer/src/relayer.rs +++ b/agents/relayer/src/relayer.rs @@ -155,10 +155,11 @@ impl NomadAgent for Relayer { } fn build_channel(&self, replica: &str) -> Self::Channel { + let home = self.connections().home().expect("!home"); Self::Channel { base: self.channel_base(replica), updates_relayed_count: self.updates_relayed_counts.with_label_values(&[ - self.home().name(), + home.name(), replica, Self::AGENT_NAME, ]), diff --git a/agents/updater/src/updater.rs b/agents/updater/src/updater.rs index 2c360f83..583e62a3 100644 --- a/agents/updater/src/updater.rs +++ b/agents/updater/src/updater.rs @@ -39,7 +39,8 @@ impl Updater { finalization_seconds: u64, core: AgentCore, ) -> Self { - let home_name = core.home.name(); + let home = core.connections.home().expect("!home"); + let home_name = home.name(); let signed_attestation_count = core .metrics .new_int_counter( @@ -73,9 +74,10 @@ impl Updater { impl From<&Updater> for UpdaterChannel { fn from(updater: &Updater) -> Self { + let home = updater.connections().home().expect("!home"); UpdaterChannel { - home: updater.home(), - db: NomadDB::new(updater.home().name(), updater.db()), + home: home.clone(), + db: NomadDB::new(home.name(), updater.db()), signer: updater.signer.clone(), signed_attestation_count: updater.signed_attestation_count.clone(), submitted_update_count: updater.submitted_update_count.clone(), @@ -199,7 +201,7 @@ impl NomadAgent for Updater { let home_fail_watch_task = self.watch_home_fail(self.interval_seconds); info!("Starting updater sync task..."); - let sync_task = self.home().sync(); + let sync_task = self.connections().home().expect("!home").sync(); // Run a single error-catching task for producing and submitting // updates. While we use the agent channel pattern, this run task diff --git a/nomad-base/src/agent.rs b/nomad-base/src/agent.rs index d4051f7d..fc05398b 100644 --- a/nomad-base/src/agent.rs +++ b/nomad-base/src/agent.rs @@ -120,10 +120,14 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef { /// Build channel base for home <> replica channel fn channel_base(&self, replica: &str) -> ChannelBase { + let home = self.connections().home().expect("!home"); ChannelBase { - home: self.home(), - replica: self.replica_by_name(replica).expect("!replica exist"), - db: NomadDB::new(self.home().name(), self.db()), + home: home.clone(), + replica: self + .connections() + .replica_by_name(replica) + .expect("!replica exist"), + db: NomadDB::new(home.name(), self.db()), } } @@ -232,7 +236,8 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef { let span = info_span!("run_all"); tokio::spawn(async move { // this is the unused must use - let names: Vec<&str> = self.replicas().keys().map(|k| k.as_str()).collect(); + let replicas = self.connections().replicas().expect("!replicas"); + let names: Vec<&str> = replicas.keys().map(|k| k.as_str()).collect(); // quick check that at least 1 replica is configured names @@ -246,7 +251,7 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef { if Self::AGENT_NAME != "kathy" { // Only the processor needs to index messages so default is // just indexing updates - let sync_task = self.home().sync(); + let sync_task = self.connections().home().expect("!home").sync(); tasks.push(sync_task); } @@ -268,7 +273,7 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef { #[allow(clippy::unit_arg)] fn watch_home_fail(&self, interval: u64) -> Instrumented>> { let span = info_span!("home_watch"); - let home = self.home(); + let home = self.connections().home().expect("!home"); let home_failure_checks = self.metrics().home_failure_checks(); let home_failure_observations = self.metrics().home_failure_observations(); @@ -291,7 +296,7 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef { fn assert_home_not_failed(&self) -> Instrumented>> { use nomad_core::Common; let span = info_span!("check_home_state"); - let home = self.home(); + let home = self.connections().home().expect("!home"); tokio::spawn(async move { if home.state().await? == nomad_core::State::Failed { Err(BaseError::FailedHome.into()) From dda08aaa58042c0da79cab02991cbe899687d138 Mon Sep 17 00:00:00 2001 From: Matthew Smith Date: Fri, 22 Jul 2022 21:34:37 +0700 Subject: [PATCH 5/6] Use AgentConnections --- nomad-base/src/settings/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nomad-base/src/settings/mod.rs b/nomad-base/src/settings/mod.rs index 0c3317a4..4f0a7c7e 100644 --- a/nomad-base/src/settings/mod.rs +++ b/nomad-base/src/settings/mod.rs @@ -11,8 +11,9 @@ //! corresponding env file and/or secrets.json file. use crate::{ - agent::AgentCore, CachingHome, CachingReplica, CommonIndexerVariants, CommonIndexers, - ContractSync, ContractSyncMetrics, HomeIndexerVariants, HomeIndexers, Homes, NomadDB, Replicas, + agent::AgentCore, AgentConnections, CachingHome, CachingReplica, CommonIndexerVariants, + CommonIndexers, ContractSync, ContractSyncMetrics, HomeIndexerVariants, HomeIndexers, Homes, + NomadDB, Replicas, }; use color_eyre::{eyre::bail, Result}; use nomad_core::{db::DB, Common, ContractLocator}; @@ -419,8 +420,7 @@ impl Settings { .await?; Ok(AgentCore { - home, - replicas, + connections: AgentConnections::Default { home, replicas }, db, settings: self.clone(), metrics, From 22c1195779ecb7f34f7b23710a54a843e7762d8d Mon Sep 17 00:00:00 2001 From: Matthew Smith Date: Fri, 22 Jul 2022 21:39:48 +0700 Subject: [PATCH 6/6] Fix tests --- agents/relayer/src/relayer.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/agents/relayer/src/relayer.rs b/agents/relayer/src/relayer.rs index 65562a4c..66faa497 100644 --- a/agents/relayer/src/relayer.rs +++ b/agents/relayer/src/relayer.rs @@ -196,8 +196,8 @@ impl NomadAgent for Relayer { mod test { use ethers::prelude::{ProviderError, H256}; use nomad_base::{ - chains::PageSettings, CommonIndexers, ContractSync, ContractSyncMetrics, CoreMetrics, - HomeIndexers, IndexSettings, NomadDB, + chains::PageSettings, AgentConnections, CommonIndexers, ContractSync, ContractSyncMetrics, + CoreMetrics, HomeIndexers, IndexSettings, NomadDB, }; use nomad_core::ChainCommunicationError; use nomad_test::mocks::{MockHomeContract, MockIndexer, MockReplicaContract}; @@ -294,8 +294,7 @@ mod test { // Setting agent let core = AgentCore { - home, - replicas, + connections: AgentConnections::Default { home, replicas }, db, metrics, indexer: IndexSettings::default(),