diff --git a/Cargo.lock b/Cargo.lock index 6ff7aa42..5ce4a29c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5029,36 +5029,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "watcher" -version = "1.0.0" -dependencies = [ - "affix", - "async-trait", - "color-eyre", - "config 0.10.1", - "dotenv", - "ethers", - "futures-util", - "log", - "nomad-base", - "nomad-core", - "nomad-ethereum", - "nomad-test", - "nomad-xyz-configuration", - "prometheus", - "rocksdb", - "serde 1.0.137", - "serde_json", - "serial_test", - "thiserror", - "tokio", - "tokio-test", - "tracing", - "tracing-futures", - "tracing-subscriber 0.3.14", -] - [[package]] name = "web-sys" version = "0.3.57" diff --git a/Cargo.toml b/Cargo.toml index 54350ccb..0613272c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ members = [ "agents/kathy", "agents/updater", "agents/relayer", - "agents/watcher", +# "agents/watcher", "agents/processor", "tools/kms-cli", "tools/nomad-cli", diff --git a/agents/kathy/src/kathy.rs b/agents/kathy/src/kathy.rs index 15a30f43..63a9c6bf 100644 --- a/agents/kathy/src/kathy.rs +++ b/agents/kathy/src/kathy.rs @@ -67,12 +67,13 @@ impl NomadAgent for Kathy { } fn build_channel(&self, replica: &str) -> Self::Channel { + let home = self.connections().home().expect("!home"); Self::Channel { base: self.channel_base(replica), home_lock: self.home_lock.clone(), generator: self.generator.clone(), messages_dispatched: self.messages_dispatched.with_label_values(&[ - self.home().name(), + home.name(), replica, Self::AGENT_NAME, ]), diff --git a/agents/processor/src/processor.rs b/agents/processor/src/processor.rs index c970b404..b15853f1 100644 --- a/agents/processor/src/processor.rs +++ b/agents/processor/src/processor.rs @@ -357,10 +357,11 @@ impl NomadAgent for Processor { } fn build_channel(&self, replica: &str) -> Self::Channel { + let home = self.connections().home().expect("!home"); Self::Channel { base: self.channel_base(replica), next_message_nonce: 
self.next_message_nonces.with_label_values(&[ - self.home().name(), + home.name(), replica, Self::AGENT_NAME, ]), @@ -395,15 +396,16 @@ impl NomadAgent for Processor { self.assert_home_not_failed().await??; info!("Starting Processor tasks"); + let home = self.connections().home().expect("!home"); // tree sync info!("Starting ProverSync"); - let db = NomadDB::new(self.home().name(), self.db()); + let db = NomadDB::new(home.name(), self.db()); let sync = ProverSync::from_disk(db.clone()); let prover_sync_task = sync.spawn(); info!("Starting indexer"); - let home_sync_task = self.home().sync(); + let home_sync_task = home.sync(); let home_fail_watch_task = self.watch_home_fail(self.interval); @@ -415,10 +417,11 @@ impl NomadAgent for Processor { if !self.subsidized_remotes.is_empty() { // Get intersection of specified remotes (replicas in settings) // and subsidized remotes + let replicas = self.connections().replicas().expect("!replicas"); let specified_subsidized: Vec<&str> = self .subsidized_remotes .iter() - .filter(|r| self.replicas().contains_key(*r)) + .filter(|r| replicas.contains_key(*r)) .map(AsRef::as_ref) .collect(); @@ -430,7 +433,8 @@ impl NomadAgent for Processor { // if we have a bucket, add a task to push to it if let Some(config) = &self.config { info!(bucket = %config.bucket, "Starting S3 push tasks"); - let pusher = Pusher::new(self.core.home.name(), &config.bucket, db.clone()).await; + let home = self.connections().home().expect("!home"); + let pusher = Pusher::new(home.name(), &config.bucket, db.clone()).await; tasks.push(pusher.spawn()) } diff --git a/agents/relayer/src/relayer.rs b/agents/relayer/src/relayer.rs index f773e630..66faa497 100644 --- a/agents/relayer/src/relayer.rs +++ b/agents/relayer/src/relayer.rs @@ -155,10 +155,11 @@ impl NomadAgent for Relayer { } fn build_channel(&self, replica: &str) -> Self::Channel { + let home = self.connections().home().expect("!home"); Self::Channel { base: self.channel_base(replica), 
updates_relayed_count: self.updates_relayed_counts.with_label_values(&[ - self.home().name(), + home.name(), replica, Self::AGENT_NAME, ]), @@ -195,8 +196,8 @@ impl NomadAgent for Relayer { mod test { use ethers::prelude::{ProviderError, H256}; use nomad_base::{ - chains::PageSettings, CommonIndexers, ContractSync, ContractSyncMetrics, CoreMetrics, - HomeIndexers, IndexSettings, NomadDB, + chains::PageSettings, AgentConnections, CommonIndexers, ContractSync, ContractSyncMetrics, + CoreMetrics, HomeIndexers, IndexSettings, NomadDB, }; use nomad_core::ChainCommunicationError; use nomad_test::mocks::{MockHomeContract, MockIndexer, MockReplicaContract}; @@ -293,8 +294,7 @@ mod test { // Setting agent let core = AgentCore { - home, - replicas, + connections: AgentConnections::Default { home, replicas }, db, metrics, indexer: IndexSettings::default(), diff --git a/agents/updater/src/updater.rs b/agents/updater/src/updater.rs index 2c360f83..583e62a3 100644 --- a/agents/updater/src/updater.rs +++ b/agents/updater/src/updater.rs @@ -39,7 +39,8 @@ impl Updater { finalization_seconds: u64, core: AgentCore, ) -> Self { - let home_name = core.home.name(); + let home = core.connections.home().expect("!home"); + let home_name = home.name(); let signed_attestation_count = core .metrics .new_int_counter( @@ -73,9 +74,10 @@ impl Updater { impl From<&Updater> for UpdaterChannel { fn from(updater: &Updater) -> Self { + let home = updater.connections().home().expect("!home"); UpdaterChannel { - home: updater.home(), - db: NomadDB::new(updater.home().name(), updater.db()), + home: home.clone(), + db: NomadDB::new(home.name(), updater.db()), signer: updater.signer.clone(), signed_attestation_count: updater.signed_attestation_count.clone(), submitted_update_count: updater.submitted_update_count.clone(), @@ -199,7 +201,7 @@ impl NomadAgent for Updater { let home_fail_watch_task = self.watch_home_fail(self.interval_seconds); info!("Starting updater sync task..."); - let sync_task = 
self.home().sync(); + let sync_task = self.connections().home().expect("!home").sync(); // Run a single error-catching task for producing and submitting // updates. While we use the agent channel pattern, this run task diff --git a/nomad-base/src/agent.rs b/nomad-base/src/agent.rs index e2832ca7..fc05398b 100644 --- a/nomad-base/src/agent.rs +++ b/nomad-base/src/agent.rs @@ -25,13 +25,55 @@ use tokio::{task::JoinHandle, time::sleep}; const MAX_EXPONENTIAL: u32 = 7; // 2^7 = 128 second timeout +/// General or agent-specific connection map +#[derive(Debug, Clone)] +pub enum AgentConnections { + /// Connections for watchers + Watcher { + /// A map of boxed Homes + homes: HashMap<String, Arc<Homes>>, + // ... + }, + /// Connections for other agents + Default { + /// A boxed Home + home: Arc<Homes>, + /// A map of boxed Replicas + replicas: HashMap<String, Arc<Replicas>>, + }, +} + +/// Accessor methods for AgentConnections +impl AgentConnections { + /// Get an optional clone of home + pub fn home(&self) -> Option<Arc<Homes>> { + use AgentConnections::*; + match self { + Default { home, .. } => Some(home.clone()), + _ => None, + } + } + + /// Get an optional clone of the map of replicas + pub fn replicas(&self) -> Option<HashMap<String, Arc<Replicas>>> { + use AgentConnections::*; + match self { + Default { replicas, .. 
} => Some(replicas.clone()), + _ => None, + } + } + + /// Get an optional clone of a replica by its name + pub fn replica_by_name(&self, name: &str) -> Option<Arc<Replicas>> { + self.replicas().and_then(|r| r.get(name).map(Clone::clone)) + } +} + /// Properties shared across all agents #[derive(Debug, Clone)] pub struct AgentCore { - /// A boxed Home - pub home: Arc<Homes>, - /// A map of boxed Replicas - pub replicas: HashMap<String, Arc<Replicas>>, + /// Agent connections + pub connections: AgentConnections, /// A persistent KV Store (currently implemented as rocksdb) pub db: DB, /// Prometheus metrics @@ -78,10 +120,14 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> { /// Build channel base for home <> replica channel fn channel_base(&self, replica: &str) -> ChannelBase { + let home = self.connections().home().expect("!home"); ChannelBase { - home: self.home(), - replica: self.replica_by_name(replica).expect("!replica exist"), - db: NomadDB::new(self.home().name(), self.db()), + home: home.clone(), + replica: self + .connections() + .replica_by_name(replica) + .expect("!replica exist"), + db: NomadDB::new(home.name(), self.db()), } } @@ -95,19 +141,9 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> { self.as_ref().db.clone() } - /// Return a reference to a home contract - fn home(&self) -> Arc<Homes> { - self.as_ref().home.clone() - } - - /// Get a reference to the replicas map - fn replicas(&self) -> &HashMap<String, Arc<Replicas>> { - &self.as_ref().replicas - } - - /// Get a reference to a replica by its name - fn replica_by_name(&self, name: &str) -> Option<Arc<Replicas>> { - self.replicas().get(name).map(Clone::clone) + /// Return a reference to the connections object + fn connections(&self) -> &AgentConnections { + &self.as_ref().connections } /// Run the agent with the given home and replica @@ -200,7 +236,8 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> { let span = info_span!("run_all"); tokio::spawn(async move { // this is the unused must use - let names: Vec<&str> = 
self.replicas().keys().map(|k| k.as_str()).collect(); + let replicas = self.connections().replicas().expect("!replicas"); + let names: Vec<&str> = replicas.keys().map(|k| k.as_str()).collect(); // quick check that at least 1 replica is configured names @@ -214,7 +251,7 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> { if Self::AGENT_NAME != "kathy" { // Only the processor needs to index messages so default is // just indexing updates - let sync_task = self.home().sync(); + let sync_task = self.connections().home().expect("!home").sync(); tasks.push(sync_task); } @@ -236,7 +273,7 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> { #[allow(clippy::unit_arg)] fn watch_home_fail(&self, interval: u64) -> Instrumented<JoinHandle<Result<()>>> { let span = info_span!("home_watch"); - let home = self.home(); + let home = self.connections().home().expect("!home"); let home_failure_checks = self.metrics().home_failure_checks(); let home_failure_observations = self.metrics().home_failure_observations(); @@ -259,7 +296,7 @@ fn assert_home_not_failed(&self) -> Instrumented<JoinHandle<Result<()>>> { use nomad_core::Common; let span = info_span!("check_home_state"); - let home = self.home(); + let home = self.connections().home().expect("!home"); tokio::spawn(async move { if home.state().await? == nomad_core::State::Failed { Err(BaseError::FailedHome.into()) diff --git a/nomad-base/src/settings/mod.rs b/nomad-base/src/settings/mod.rs index 0c3317a4..4f0a7c7e 100644 --- a/nomad-base/src/settings/mod.rs +++ b/nomad-base/src/settings/mod.rs @@ -11,8 +11,9 @@ //! corresponding env file and/or secrets.json file. 
use crate::{ - agent::AgentCore, CachingHome, CachingReplica, CommonIndexerVariants, CommonIndexers, - ContractSync, ContractSyncMetrics, HomeIndexerVariants, HomeIndexers, Homes, NomadDB, Replicas, + agent::AgentCore, AgentConnections, CachingHome, CachingReplica, CommonIndexerVariants, + CommonIndexers, ContractSync, ContractSyncMetrics, HomeIndexerVariants, HomeIndexers, Homes, + NomadDB, Replicas, }; use color_eyre::{eyre::bail, Result}; use nomad_core::{db::DB, Common, ContractLocator}; @@ -419,8 +420,7 @@ impl Settings { .await?; Ok(AgentCore { - home, - replicas, + connections: AgentConnections::Default { home, replicas }, db, settings: self.clone(), metrics,