diff --git a/CLAUDE.md b/CLAUDE.md index 943553bf5..52ac6b57d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -225,6 +225,16 @@ When contributing to Anchor, follow these Rust best practices: 6. **Simplicity First**: Always choose the simplest solution that elegantly solves the problem, follows existing patterns, maintains performance, and uses basic constructs over complex data structures 7. **Check Requirements First**: Before implementing or creating anything (PRs, commits, code), always read and follow existing templates, guidelines, and requirements in the codebase +### Architecture and Design + +1. **Question Intermediaries**: If data flows A → B with no transformation, question why A → intermediate → B exists. Each layer should provide clear value (logging, transformation, validation, etc.). Ask: "What problem does this solve that direct communication doesn't?" + +2. **Separation Through Interfaces, Not Layers**: Clean boundaries come from well-defined APIs, not intermediary components. A component receiving a `Sender` achieves separation without needing forwarding tasks or wrapper channels. + +3. **Simplification is Always Valid**: Refactoring working code for simplicity is encouraged. Question architectural decisions even after tests pass. Fewer lines and fewer components often indicates better design. + +4. **Challenge Complexity**: Every abstraction should justify its existence. "We might need it later" or "it provides separation" aren't sufficient reasons. Complexity must solve specific, current problems. + ### Specific Guidelines 1. **Naming**: @@ -409,10 +419,31 @@ When writing PR descriptions, follow these guidelines for maintainable and revie - **Keep "Proposed Changes" section high-level** - focus on what components were changed and why - **Avoid line-by-line documentation** - reviewers can see specific changes in the diff -- **Use component-level summaries** rather than file-by-file breakdowns +- **Use component-level summaries** rather than file-by-file breakdowns - **Emphasize the principles** being applied and operational impact - **Be concise but complete** - provide context without overwhelming detail +### Code Review Culture + +Effective code reviews question "why" architectural decisions exist: + +**Questions to Ask:** +- "Why does this intermediary layer exist?" +- "What problem does this abstraction solve?" +- "Could components communicate directly?" +- "Is this complexity providing clear value?" 
+ +**Encourage Simplification:** +- Working code can still be improved +- Refactoring for clarity is valuable +- Fewer components usually means better architecture +- Test passing ≠ design complete + +**Balance:** +- Question complexity, but respect existing patterns that solve real problems +- Not every layer is unnecessary - some provide genuine value +- Focus on "why" over "what" + ## Development Tips - This is a Rust project that follows standard Rust development practices diff --git a/Cargo.lock b/Cargo.lock index 44906ac71..5ffa56f75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1843,6 +1843,7 @@ dependencies = [ "network", "network_utils", "openssl", + "operator_doppelganger", "operator_key", "parking_lot", "processor", @@ -5161,6 +5162,7 @@ dependencies = [ "libp2p", "libp2p-gossipsub", "message_validator", + "operator_doppelganger", "processor", "qbft_manager", "signature_collector", @@ -5169,6 +5171,7 @@ dependencies = [ "thiserror 2.0.17", "tokio", "tracing", + "types", ] [[package]] @@ -5179,6 +5182,7 @@ dependencies = [ "ethereum_ssz", "message_validator", "openssl", + "operator_doppelganger", "processor", "slot_clock", "ssv_types", @@ -5815,6 +5819,21 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "operator_doppelganger" +version = "0.1.0" +dependencies = [ + "async-channel 1.9.0", + "database", + "futures", + "parking_lot", + "ssv_types", + "task_executor", + "tokio", + "tracing", + "types", +] + [[package]] name = "operator_key" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 959e51f63..a0cdd062d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "anchor/message_sender", "anchor/message_validator", "anchor/network", + "anchor/operator_doppelganger", "anchor/processor", "anchor/qbft_manager", "anchor/signature_collector", @@ -54,6 +55,7 @@ message_receiver = { path = "anchor/message_receiver" } message_sender = { path = "anchor/message_sender" } message_validator = { path = "anchor/message_validator" } network = { path = "anchor/network" } +operator_doppelganger = { path = "anchor/operator_doppelganger" } operator_key = { path = "anchor/common/operator_key" } processor = { path = "anchor/processor" } qbft = { path = "anchor/common/qbft" } diff --git a/anchor/client/Cargo.toml b/anchor/client/Cargo.toml index c1f3637f7..91c537bcb 100644 --- a/anchor/client/Cargo.toml +++ b/anchor/client/Cargo.toml @@ -32,6 +32,7 @@ multiaddr = { workspace = true } network = { workspace = true } network_utils = { workspace = true } openssl = { workspace = true } +operator_doppelganger = { workspace = true } operator_key = { workspace = true } parking_lot = { workspace = true } processor = { workspace = true } diff --git a/anchor/client/src/cli.rs b/anchor/client/src/cli.rs index 992a6f513..ed6458f62 100644 --- a/anchor/client/src/cli.rs +++ b/anchor/client/src/cli.rs @@ -491,6 +491,31 @@ pub struct Node { #[clap(long, help = "Disables gossipsub topic scoring.", hide = true)] pub disable_gossipsub_topic_scoring: bool, + // Operator Doppelgänger Protection + #[clap( + long, + help = "Enable operator doppelgänger protection. When enabled, the node will monitor \ + for messages signed by its operator ID on startup and shut down if a twin \ + (duplicate operator) is detected. Enabled by default.", + display_order = 0, + default_value_t = true, + help_heading = FLAG_HEADER, + action = ArgAction::Set + )] + pub operator_dg: bool, + + #[clap( + long, + value_name = "EPOCHS", + help = "Number of epochs to wait in monitor mode before starting normal operation. 
\ + During this period, the node listens for messages from its own operator ID \ + to detect if another instance is running.", + display_order = 0, + default_value_t = 2, + requires = "operator_dg" + )] + pub operator_dg_wait_epochs: u64, + #[clap(flatten)] pub logging_flags: FileLoggingFlags, } diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index 775dc68e2..e6d6c4b69 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -72,6 +72,10 @@ pub struct Config { pub prefer_builder_proposals: bool, /// Controls whether the latency measurement service is enabled pub disable_latency_measurement_service: bool, + /// Enable operator doppelgänger protection + pub operator_dg: bool, + /// Number of epochs to wait in monitor mode + pub operator_dg_wait_epochs: u64, } impl Config { @@ -115,6 +119,8 @@ impl Config { prefer_builder_proposals: false, gas_limit: 36_000_000, disable_latency_measurement_service: false, + operator_dg: true, + operator_dg_wait_epochs: 2, } } } @@ -246,6 +252,10 @@ pub fn from_cli(cli_args: &Node, global_config: GlobalConfig) -> Result, + wait_epochs: u64, + executor: &TaskExecutor, +) { + info!( + wait_epochs = wait_epochs, + grace_period_secs = network::OPERATOR_DOPPELGANGER_GRACE_PERIOD_SECS, + "Operator doppelgänger: starting monitoring period" + ); + + // Spawn background task to end monitoring after grace period + wait epochs + // Grace period prevents false positives from receiving our own old messages after + // restart (they remain in gossip cache for ~4.2s) + service.spawn_monitor_task( + Duration::from_secs(network::OPERATOR_DOPPELGANGER_GRACE_PERIOD_SECS), + wait_epochs, + executor, + ); +} + impl Client { /// Runs the Anchor Client pub async fn run(executor: TaskExecutor, config: Config) -> Result<(), String> { @@ -394,6 +419,10 @@ impl Client { "syncer", ); + // Create operator ID wrapper that watches the database for our operator ID. + // Follows the common pattern: pass OwnOperatorId to components, they call .get() only when + // needed. This allows initialization before sync completes (which populates the ID from + // chain). let operator_id = OwnOperatorId::new(database.watch()); // Network sender/receiver @@ -419,15 +448,30 @@ impl Client { &executor, ); + // Create operator doppelgänger protection if enabled (will be started after sync) + let doppelganger_service = if config.operator_dg && config.impostor.is_none() { + Some(Arc::new(OperatorDoppelgangerService::new( + operator_id.clone(), + E::slots_per_epoch(), + Duration::from_secs(spec.seconds_per_slot), + executor.shutdown_sender(), + ))) + } else { + None + }; + let message_sender: Arc = if config.impostor.is_none() { Arc::new(NetworkMessageSender::new( - processor_senders.clone(), - network_tx.clone(), - key.clone(), - operator_id.clone(), - Some(message_validator.clone()), - SUBNET_COUNT, - is_synced.clone(), + message_sender::NetworkMessageSenderConfig { + processor: processor_senders.clone(), + network_tx: network_tx.clone(), + private_key: key.clone(), + operator_id: operator_id.clone(), + validator: Some(message_validator.clone()), + subnet_count: SUBNET_COUNT, + is_synced: is_synced.clone(), + doppelganger_service: doppelganger_service.clone(), + }, )?) 
} else { Arc::new(ImpostorMessageSender::new(network_tx.clone(), SUBNET_COUNT)) @@ -474,6 +518,7 @@ impl Client { is_synced.clone(), outcome_tx, message_validator, + doppelganger_service.clone(), ); // Start the p2p network @@ -573,6 +618,13 @@ impl Client { .map_err(|_| "Sync watch channel closed")?; info!("Sync complete, starting services..."); + // Start operator doppelgänger monitoring (now that sync is complete and operator ID + // available). The service will automatically stop monitoring after the configured + // wait period. Messages will be checked but dropped during monitoring. + if let Some(service) = &doppelganger_service { + start_operator_doppelganger(service.clone(), config.operator_dg_wait_epochs, &executor); + } + let mut block_service_builder = BlockServiceBuilder::new() .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) diff --git a/anchor/message_receiver/Cargo.toml b/anchor/message_receiver/Cargo.toml index b21d6a6be..baf62b209 100644 --- a/anchor/message_receiver/Cargo.toml +++ b/anchor/message_receiver/Cargo.toml @@ -10,6 +10,7 @@ gossipsub = { workspace = true } hex = { workspace = true } libp2p = { workspace = true } message_validator = { workspace = true } +operator_doppelganger = { workspace = true } processor = { workspace = true } qbft_manager = { workspace = true } signature_collector = { workspace = true } @@ -18,3 +19,4 @@ ssv_types = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +types = { workspace = true } diff --git a/anchor/message_receiver/src/manager.rs b/anchor/message_receiver/src/manager.rs index 42e4d87cf..47b1108da 100644 --- a/anchor/message_receiver/src/manager.rs +++ b/anchor/message_receiver/src/manager.rs @@ -6,6 +6,7 @@ use libp2p::PeerId; use message_validator::{ DutiesProvider, ValidatedMessage, ValidatedSSVMessage, ValidationResult, Validator, }; +use operator_doppelganger::OperatorDoppelgangerService; use qbft_manager::QbftManager; use signature_collector::SignatureCollectorManager; use slot_clock::SlotClock; @@ -32,9 +33,11 @@ pub struct NetworkMessageReceiver { is_synced: watch::Receiver, outcome_tx: mpsc::Sender, validator: Arc>, + doppelganger_service: Option>, } impl NetworkMessageReceiver { + #[allow(clippy::too_many_arguments)] pub fn new( processor: processor::Senders, qbft_manager: Arc, @@ -43,6 +46,7 @@ impl NetworkMessageReceiver { is_synced: watch::Receiver, outcome_tx: mpsc::Sender, validator: Arc>, + doppelganger_service: Option>, ) -> Arc { Arc::new(Self { processor, @@ -52,6 +56,7 @@ impl NetworkMessageReceiver { is_synced, outcome_tx, validator, + doppelganger_service, }) } } @@ -159,6 +164,21 @@ impl MessageReceiver } } + // Check for operator doppelgänger before processing any message + if let Some(service) = &receiver.doppelganger_service { + // If in monitoring mode, check for twin and drop message + if service.is_monitoring() { + // Extract QBFT message for detailed logging if twin detected + let qbft_msg = match &ssv_message { + ValidatedSSVMessage::QbftMessage(msg) => Some(msg), + ValidatedSSVMessage::PartialSignatureMessages(_) => None, + }; + service.check_message(&signed_ssv_message, qbft_msg); + // Drop message during monitoring period - don't process + return; + } + } + match ssv_message { ValidatedSSVMessage::QbftMessage(qbft_message) => { if let Err(err) = receiver diff --git a/anchor/message_sender/Cargo.toml b/anchor/message_sender/Cargo.toml index e41102145..66905756c 100644 --- a/anchor/message_sender/Cargo.toml 
+++ b/anchor/message_sender/Cargo.toml @@ -12,6 +12,7 @@ database = { workspace = true } ethereum_ssz = { workspace = true } message_validator = { workspace = true } openssl = { workspace = true } +operator_doppelganger = { workspace = true } processor = { workspace = true } slot_clock = { workspace = true } ssv_types = { workspace = true } diff --git a/anchor/message_sender/src/network.rs b/anchor/message_sender/src/network.rs index 7603f0d15..5a0384bd8 100644 --- a/anchor/message_sender/src/network.rs +++ b/anchor/message_sender/src/network.rs @@ -8,6 +8,7 @@ use openssl::{ rsa::Rsa, sign::Signer, }; +use operator_doppelganger::OperatorDoppelgangerService; use slot_clock::SlotClock; use ssv_types::{ CommitteeId, RSA_SIGNATURE_SIZE, consensus::UnsignedSSVMessage, message::SignedSSVMessage, @@ -22,6 +23,18 @@ use crate::{Error, MessageCallback, MessageSender, SigningError}; const SIGNER_NAME: &str = "message_sign_and_send"; const SENDER_NAME: &str = "message_send"; +/// Configuration for creating a NetworkMessageSender +pub struct NetworkMessageSenderConfig { + pub processor: processor::Senders, + pub network_tx: mpsc::Sender<(SubnetId, Vec)>, + pub private_key: Rsa, + pub operator_id: OwnOperatorId, + pub validator: Option>>, + pub subnet_count: usize, + pub is_synced: watch::Receiver, + pub doppelganger_service: Option>, +} + pub struct NetworkMessageSender { processor: processor::Senders, network_tx: mpsc::Sender<(SubnetId, Vec)>, @@ -30,6 +43,7 @@ pub struct NetworkMessageSender { validator: Option>>, subnet_count: usize, is_synced: watch::Receiver, + doppelganger_service: Option>, } impl MessageSender for Arc> { @@ -39,6 +53,14 @@ impl MessageSender for Arc>, ) -> Result<(), Error> { + // Check if in doppelgänger monitoring period - silently drop + if let Some(dg) = &self.doppelganger_service + && dg.is_monitoring() + { + trace!("Dropping message send - in doppelgänger monitoring period"); + return Ok(()); + } + if self.network_tx.is_closed() { return Err(Error::NetworkQueueClosed); } @@ -84,6 +106,14 @@ impl MessageSender for Arc Result<(), Error> { + // Check if in doppelgänger monitoring period - silently drop + if let Some(dg) = &self.doppelganger_service + && dg.is_monitoring() + { + trace!("Dropping message send - in doppelgänger monitoring period"); + return Ok(()); + } + if self.network_tx.is_closed() { return Err(Error::NetworkQueueClosed); } @@ -105,25 +135,18 @@ impl MessageSender for Arc NetworkMessageSender { - pub fn new( - processor: processor::Senders, - network_tx: mpsc::Sender<(SubnetId, Vec)>, - private_key: Rsa, - operator_id: OwnOperatorId, - validator: Option>>, - subnet_count: usize, - is_synced: watch::Receiver, - ) -> Result, String> { - let private_key = PKey::from_rsa(private_key) + pub fn new(config: NetworkMessageSenderConfig) -> Result, String> { + let private_key = PKey::from_rsa(config.private_key) .map_err(|err| format!("Failed to create PKey from RSA: {err}"))?; Ok(Arc::new(Self { - processor, - network_tx, + processor: config.processor, + network_tx: config.network_tx, private_key, - operator_id, - validator, - subnet_count, - is_synced, + operator_id: config.operator_id, + validator: config.validator, + subnet_count: config.subnet_count, + is_synced: config.is_synced, + doppelganger_service: config.doppelganger_service, })) } diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 6cc473493..5e3d5cebf 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -19,6 +19,23 @@ use crate::{ const 
MAX_TRANSMIT_SIZE_BYTES: usize = 5_000_000; +/// Gossipsub heartbeat interval in milliseconds (how often messages are propagated) +pub const GOSSIPSUB_HEARTBEAT_INTERVAL_MILLIS: u64 = 700; + +/// Gossipsub history length (number of heartbeat intervals messages stay in cache) +/// Messages remain in mcache for: history_length × heartbeat_interval (6 × 700ms = 4.2s) +pub const GOSSIPSUB_HISTORY_LENGTH: usize = 6; + +/// Operator doppelgänger grace period in seconds +/// +/// Wait after startup before detecting twins to prevent false positives from own old messages. +/// Automatically derived as: (gossip_cache_window + 1 second buffer) +/// Default: 5s (cache window is 4.2s) +pub const OPERATOR_DOPPELGANGER_GRACE_PERIOD_SECS: u64 = { + let cache_window_millis = GOSSIPSUB_HISTORY_LENGTH as u64 * GOSSIPSUB_HEARTBEAT_INTERVAL_MILLIS; + cache_window_millis / 1000 + 1 +}; + /// Custom message ID function matching Go-SSV implementation. /// Uses xxhash64 of the full message to ensure uniqueness across operators. /// @@ -104,8 +121,8 @@ impl AnchorBehaviour { .mesh_n_low(6) // Dlo .mesh_n_high(12) // Dhi .mesh_outbound_min(4) // Dout - .heartbeat_interval(Duration::from_millis(700)) - .history_length(6) + .heartbeat_interval(Duration::from_millis(GOSSIPSUB_HEARTBEAT_INTERVAL_MILLIS)) + .history_length(GOSSIPSUB_HISTORY_LENGTH) .history_gossip(4) .max_ihave_length(1500) .max_ihave_messages(32) diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 507bf7f18..81ba6eee2 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -11,6 +11,7 @@ mod peer_manager; mod scoring; mod transport; +pub use behaviour::OPERATOR_DOPPELGANGER_GRACE_PERIOD_SECS; pub use config::{Config, DEFAULT_DISC_PORT, DEFAULT_QUIC_PORT, DEFAULT_TCP_PORT}; pub use network::Network; pub use network_utils::listen_addr::{ListenAddr, ListenAddress}; diff --git a/anchor/operator_doppelganger/Cargo.toml b/anchor/operator_doppelganger/Cargo.toml new file mode 100644 index 000000000..2d91bf1ce --- /dev/null +++ b/anchor/operator_doppelganger/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "operator_doppelganger" +version = "0.1.0" +edition = { workspace = true } + +[dependencies] +database = { workspace = true } +futures = { workspace = true } +parking_lot = { workspace = true } +ssv_types = { workspace = true } +task_executor = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +async-channel = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } +types = { workspace = true } diff --git a/anchor/operator_doppelganger/src/lib.rs b/anchor/operator_doppelganger/src/lib.rs new file mode 100644 index 000000000..b3aa21971 --- /dev/null +++ b/anchor/operator_doppelganger/src/lib.rs @@ -0,0 +1,3 @@ +mod service; + +pub use service::OperatorDoppelgangerService; diff --git a/anchor/operator_doppelganger/src/service.rs b/anchor/operator_doppelganger/src/service.rs new file mode 100644 index 000000000..717094a82 --- /dev/null +++ b/anchor/operator_doppelganger/src/service.rs @@ -0,0 +1,542 @@ +use std::{sync::Arc, time::Duration}; + +use database::OwnOperatorId; +use futures::channel::mpsc; +use parking_lot::{Mutex, RwLock}; +use ssv_types::{consensus::QbftMessage, message::SignedSSVMessage}; +use task_executor::{ShutdownReason, TaskExecutor}; +use tracing::{error, info}; + +/// State of operator doppelgänger detection +/// +/// ## State Transitions +/// +/// ```text +/// GracePeriod → Monitoring → Completed +/// ``` +/// +/// - 
**GracePeriod**: Waiting for network message caches to expire before checking +/// - **Monitoring**: Actively checking messages for doppelgängers +/// - **Completed**: Monitoring period finished, no longer checking +/// +/// ## Implementation Note +/// +/// Stored in `RwLock` for read-optimized access in hot path (message validation). +/// Read locks have minimal overhead and avoid contention for the entire node +/// lifetime after the brief monitoring period ends. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum DoppelgangerState { + /// In startup grace period - not yet checking for doppelgängers + /// + /// ## Why we need a grace period + /// + /// Gossipsub stores messages in a sliding window cache (mcache) for gossip propagation. + /// Messages remain in this cache for `history_length × heartbeat_interval` (~4.2s in Anchor). + /// + /// **The restart vulnerability:** + /// 1. Node sends messages (QBFT + partial signatures) at t=0 + /// 2. Messages propagate to peers' mcache + /// 3. Node crashes/restarts at t=2s + /// 4. Gossipsub seen_cache is cleared (not persisted) + /// 5. Messages still in peers' mcache (~2s remaining) + /// 6. Node reconnects and receives its own messages via IHAVE/IWANT + /// 7. FALSE POSITIVE: Messages have our operator ID but we think they're from a twin! + /// + /// **Solution:** + /// Wait for gossip cache expiry (~5s) before checking messages. This ensures our own + /// old messages have expired from the network before we start detecting twins. + GracePeriod, + + /// Actively monitoring for doppelgängers - checking all messages + Monitoring, + + /// Monitoring period completed - no longer checking for doppelgängers + Completed, +} + +impl Default for DoppelgangerState { + fn default() -> Self { + Self::new() + } +} + +impl DoppelgangerState { + /// Create a new doppelgänger state starting in grace period + const fn new() -> Self { + Self::GracePeriod + } + + /// Check if actively monitoring + const fn is_monitoring(self) -> bool { + matches!(self, Self::Monitoring) + } + + /// Transition from grace period to monitoring + fn end_grace_period(&mut self) { + *self = Self::Monitoring; + } + + /// Transition from monitoring to completed + fn end_monitoring(&mut self) { + *self = Self::Completed; + } +} + +pub struct OperatorDoppelgangerService { + /// Our operator ID to watch for (wraps database watch) + own_operator_id: OwnOperatorId, + /// Current state (RwLock for read-optimized access in hot path) + state: Arc>, + /// Number of slots per epoch (for calculating monitoring duration) + slots_per_epoch: u64, + /// Duration of a slot (for calculating monitoring duration) + slot_duration: Duration, + /// Shutdown sender (triggers fatal shutdown on twin detection) + shutdown_sender: Mutex>, +} + +impl OperatorDoppelgangerService { + /// Create a new operator doppelgänger service + pub fn new( + own_operator_id: OwnOperatorId, + slots_per_epoch: u64, + slot_duration: Duration, + shutdown_sender: mpsc::Sender, + ) -> Self { + let state = Arc::new(RwLock::new(DoppelgangerState::new())); + + Self { + own_operator_id, + state, + slots_per_epoch, + slot_duration, + shutdown_sender: Mutex::new(shutdown_sender), + } + } + + /// Spawn a background task to end monitoring after the configured wait period + /// + /// The task sleeps for the grace period plus the monitoring duration, then transitions + /// to active mode. + /// + /// # Arguments + /// + /// * `grace_period` - Duration to wait before starting twin detection. 
Should be slightly + /// longer than the gossip message cache window (history_length × heartbeat_interval ≈ 4.2s) + /// to ensure our own old messages have expired from the network. See `DoppelgangerState` + /// documentation for details on why this prevents false positives. + /// * `wait_epochs` - Number of epochs to monitor for doppelgängers after grace period ends + pub fn spawn_monitor_task( + self: Arc, + grace_period: Duration, + wait_epochs: u64, + executor: &TaskExecutor, + ) { + // Calculate monitoring duration (after grace period) + let monitoring_duration = + Duration::from_secs(wait_epochs * self.slots_per_epoch * self.slot_duration.as_secs()); + + executor.spawn_without_exit( + async move { + // Wait for grace period - prevents false positives from own old messages + tokio::time::sleep(grace_period).await; + + // Grace period complete - start detecting twins + self.state.write().end_grace_period(); + + // Wait for monitoring period to complete + tokio::time::sleep(monitoring_duration).await; + + // Monitoring complete - stop checking for doppelgängers + self.end_monitoring_period(); + }, + "doppelganger-monitor", + ); + } + + /// End the monitoring period + /// + /// This should be called when the monitoring period completes. + /// After this, messages will no longer be checked for doppelgängers. + /// + /// Note: This is automatically called by `spawn_monitor_task()`. Made public for testing. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn end_monitoring_period(&self) { + let mut state = self.state.write(); + if state.is_monitoring() { + state.end_monitoring(); + info!("Operator doppelgänger: monitoring period ended"); + } + } + + /// Check if a message indicates a potential doppelgänger (detection logic only) + /// + /// Returns `true` if a twin operator is detected, `false` otherwise. + /// This method performs pure detection logic without side effects (except logging). + /// + /// Checks all single-signer messages (QBFT consensus and partial signatures) signed + /// with our operator ID. + pub fn is_doppelganger( + &self, + signed_message: &SignedSSVMessage, + qbft_message: Option<&QbftMessage>, + ) -> bool { + // Fast path: read lock for checking state (lock-free for readers) + let state = *self.state.read(); + + // Only check when actively monitoring (not during grace period or after completion) + if !state.is_monitoring() { + return false; + } + + // Get operator ID - return early if not yet available (still syncing) + let Some(own_operator_id) = self.own_operator_id.get() else { + return false; + }; + + // Check if this is a single-signer message with our operator ID + let operator_ids = signed_message.operator_ids(); + if operator_ids.len() != 1 { + // Not a single-signer message (could be aggregate/decided) + return false; + } + + let signer = operator_ids[0]; + if signer != own_operator_id { + // Not signed by us + return false; + } + + // Single-signer message with our operator ID = twin detected! + let msg_id = signed_message.ssv_message().msg_id(); + error!( + operator_id = *own_operator_id, + duty_executor = ?msg_id.duty_executor(), + height = ?qbft_message.map(|m| m.height), + round = ?qbft_message.map(|m| m.round), + qbft_type = ?qbft_message.map(|m| m.qbft_message_type), + "OPERATOR DOPPELGÄNGER DETECTED: Received message signed with our operator ID. \ + Another instance of this operator is running. Shutting down to prevent equivocation." 
+ ); + + true + } + + /// Check if a message indicates a potential doppelgänger + /// + /// Checks the message and triggers shutdown if a twin is detected + pub fn check_message( + &self, + signed_message: &SignedSSVMessage, + qbft_message: Option<&QbftMessage>, + ) { + if self.is_doppelganger(signed_message, qbft_message) { + // Trigger shutdown + let _ = self + .shutdown_sender + .lock() + .try_send(ShutdownReason::Failure("Operator doppelgänger detected")); + } + } + + /// Check if actively monitoring for doppelgängers + /// + /// Returns `true` only during the monitoring state (after grace period, + /// before completion). Returns `false` during grace period or after completion. + pub fn is_monitoring(&self) -> bool { + self.state.read().is_monitoring() + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, time::Duration}; + + use database::OwnOperatorId; + use ssv_types::{ + CommitteeId, OperatorId, RSA_SIGNATURE_SIZE, + consensus::{QbftMessage, QbftMessageType}, + domain_type::DomainType, + message::{MsgType, SSVMessage, SignedSSVMessage}, + msgid::{DutyExecutor, MessageId, Role}, + }; + use task_executor::TaskExecutor; + use types::Hash256; + + use super::*; + + /// Helper to create a TaskExecutor for testing + fn create_test_executor() -> TaskExecutor { + let handle = tokio::runtime::Handle::current(); + let (_signal, exit) = async_channel::bounded(1); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + TaskExecutor::new(handle, exit, shutdown_tx, "doppelganger_test".into()) + } + + /// Helper to spawn monitor task and advance time past grace period + /// + /// Returns the service in monitoring mode with grace period complete, + /// ready for twin detection tests. + async fn spawn_and_advance_past_grace_period( + service: Arc, + executor: &TaskExecutor, + ) { + let grace_period = Duration::from_secs(5); + let wait_epochs = 2; + + // Spawn monitor task + service + .clone() + .spawn_monitor_task(grace_period, wait_epochs, executor); + + // Give the spawned task a chance to start + tokio::task::yield_now().await; + + // Advance time past grace period + tokio::time::advance(grace_period).await; + + // Allow timer to fire and task to process (single yield is sufficient) + tokio::task::yield_now().await; + } + + fn create_service() -> OperatorDoppelgangerService { + let own_operator_id = OwnOperatorId::from(OperatorId(1)); + let slots_per_epoch = 1; + let slot_duration = Duration::from_secs(12); + + // Create a shutdown channel for testing + let (shutdown_tx, _shutdown_rx) = mpsc::channel(1); + + OperatorDoppelgangerService::new( + own_operator_id, + slots_per_epoch, + slot_duration, + shutdown_tx, + ) + } + + /// Helper to create test messages for doppelgänger detection + /// + /// # Arguments + /// * `committee_id` - The committee identifier + /// * `operator_ids` - Vector of operator IDs (single for non-aggregated, multiple for + /// aggregated) + /// * `height` - QBFT consensus height + /// * `round` - QBFT consensus round + fn create_test_message( + committee_id: CommitteeId, + operator_ids: Vec, + height: u64, + round: u64, + ) -> (SignedSSVMessage, QbftMessage) { + // Create MessageId for committee messages + let message_id = MessageId::new( + &DomainType([0; 4]), + Role::Committee, + &DutyExecutor::Committee(committee_id), + ); + + // Create QbftMessage + let qbft_message = QbftMessage { + qbft_message_type: QbftMessageType::Prepare, + height, + round, + identifier: message_id.as_ref().to_vec().into(), + root: Hash256::from([0u8; 32]), + data_round: 0, + 
round_change_justification: vec![].try_into().unwrap(), + prepare_justification: vec![].try_into().unwrap(), + }; + + // Create SSVMessage with serialized QbftMessage + // Note: Since ethereum_ssz::Encode isn't directly accessible in this test module, + // we use a minimal test payload. This is acceptable since we're testing the + // doppelgänger detection logic, not QBFT message serialization. + let qbft_bytes = vec![0u8; 100]; + let ssv_message = SSVMessage::new(MsgType::SSVConsensusMsgType, message_id, qbft_bytes) + .expect("should create SSVMessage"); + + // Create signatures (one per operator) + let signatures: Vec<[u8; RSA_SIGNATURE_SIZE]> = operator_ids + .iter() + .map(|_| [0u8; RSA_SIGNATURE_SIZE]) + .collect(); + + // Create SignedSSVMessage + let signed_message = SignedSSVMessage::new( + signatures, + operator_ids, + ssv_message, + vec![], // empty full_data for non-proposal messages + ) + .expect("should create SignedSSVMessage"); + + (signed_message, qbft_message) + } + + #[test] + fn test_state_lifecycle() { + let mut state = DoppelgangerState::new(); + + // Start in grace period - not monitoring + assert!(!state.is_monitoring()); + + // Transition to monitoring + state.end_grace_period(); + assert!(state.is_monitoring()); + + // Complete monitoring + state.end_monitoring(); + assert!(!state.is_monitoring()); + } + + #[test] + fn test_service_creation() { + let service = create_service(); + + // Start in grace period, not yet monitoring + assert!(!service.is_monitoring()); + } + + // High-value tests for check_message functionality + + #[tokio::test(start_paused = true)] + async fn test_twin_detected_after_grace_period_timer() { + let service = Arc::new(create_service()); + let committee_id = CommitteeId([1u8; 32]); + let executor = create_test_executor(); + + // Advance past grace period via timer + spawn_and_advance_past_grace_period(service.clone(), &executor).await; + + // Grace period should be complete, now monitoring + assert!(service.is_monitoring()); + + // Create a single-signer message with our operator ID (1) + let (signed_message, qbft_message) = + create_test_message(committee_id, vec![OperatorId(1)], 10, 0); + + // This should detect a twin (grace period ended via timer) + let result = service.is_doppelganger(&signed_message, Some(&qbft_message)); + assert!( + result, + "Single-signer message with our operator ID should detect twin after grace period" + ); + } + + #[tokio::test(start_paused = true)] + async fn test_no_twin_during_grace_period() { + let service = Arc::new(create_service()); + let committee_id = CommitteeId([1u8; 32]); + let executor = create_test_executor(); + + // Spawn the monitor task with grace period + let grace_period = Duration::from_secs(5); + let wait_epochs = 2; + service + .clone() + .spawn_monitor_task(grace_period, wait_epochs, &executor); + + // Still in grace period (don't advance time) + assert!(!service.is_monitoring()); + + // Create a single-signer message with our operator ID (1) + let (signed_message, qbft_message) = + create_test_message(committee_id, vec![OperatorId(1)], 10, 0); + + // This should NOT detect a twin (still in grace period) + let result = service.is_doppelganger(&signed_message, Some(&qbft_message)); + assert!( + !result, + "Message during grace period should NOT detect twin (prevents false positives from own old messages)" + ); + } + + #[tokio::test(start_paused = true)] + async fn test_no_twin_multi_signer_aggregate_message() { + let service = Arc::new(create_service()); + let committee_id = 
CommitteeId([1u8; 32]); + let executor = create_test_executor(); + + // Advance past grace period via timer + spawn_and_advance_past_grace_period(service.clone(), &executor).await; + + // Create a multi-signer aggregate message (includes our operator ID) + let (signed_message, qbft_message) = create_test_message( + committee_id, + vec![OperatorId(1), OperatorId(2), OperatorId(3)], + 10, + 0, + ); + + // This should NOT detect a twin (aggregate message) + let result = service.is_doppelganger(&signed_message, Some(&qbft_message)); + assert!( + !result, + "Multi-signer aggregate message should NOT detect twin" + ); + } + + #[tokio::test(start_paused = true)] + async fn test_no_twin_different_operator_id() { + let service = Arc::new(create_service()); + let committee_id = CommitteeId([1u8; 32]); + let executor = create_test_executor(); + + // Advance past grace period via timer + spawn_and_advance_past_grace_period(service.clone(), &executor).await; + + // Create a single-signer message from a different operator (2, not 1) + let (signed_message, qbft_message) = + create_test_message(committee_id, vec![OperatorId(2)], 10, 0); + + // This should NOT detect a twin (different operator) + let result = service.is_doppelganger(&signed_message, Some(&qbft_message)); + assert!( + !result, + "Message from different operator should NOT detect twin" + ); + } + + #[tokio::test(start_paused = true)] + async fn test_no_twin_after_monitoring_period_timer() { + let service = Arc::new(create_service()); + let committee_id = CommitteeId([1u8; 32]); + let executor = create_test_executor(); + + // Spawn the monitor task + let grace_period = Duration::from_secs(5); + let wait_epochs = 2; + service + .clone() + .spawn_monitor_task(grace_period, wait_epochs, &executor); + + // Give the spawned task a chance to start + tokio::task::yield_now().await; + + // Calculate monitoring duration + let monitoring_duration = Duration::from_secs(wait_epochs * 12); // epochs * (slots_per_epoch=1) * (slot_duration=12s) + + // Advance time past grace period first + tokio::time::advance(grace_period).await; + tokio::task::yield_now().await; + + // Now advance time past monitoring period + tokio::time::advance(monitoring_duration).await; + tokio::task::yield_now().await; + + // Monitoring should be complete + assert!(!service.is_monitoring()); + + // Create a single-signer message with our operator ID + let (signed_message, qbft_message) = + create_test_message(committee_id, vec![OperatorId(1)], 10, 0); + + // This should NOT detect a twin (monitoring period ended via timer) + let result = service.is_doppelganger(&signed_message, Some(&qbft_message)); + assert!( + !result, + "Message after monitoring period should NOT detect twin" + ); + } +}
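
The "Separation Through Interfaces, Not Layers" guideline added to CLAUDE.md above can be made concrete with a minimal sketch: a component that is simply handed the `tokio::sync::mpsc::Sender` it needs gets a clean boundary without a forwarding task or wrapper channel in between. The `TwinAlert` and `DetectionComponent` names below are hypothetical illustrations, not part of this change.

// Minimal sketch of direct sender injection instead of an intermediary layer.
// Hypothetical types; only tokio's mpsc API is assumed.
use tokio::sync::mpsc;

struct TwinAlert {
    operator_id: u64,
}

struct DetectionComponent {
    // The component owns the sender directly; nothing forwards on its behalf.
    alert_tx: mpsc::Sender<TwinAlert>,
}

impl DetectionComponent {
    fn new(alert_tx: mpsc::Sender<TwinAlert>) -> Self {
        Self { alert_tx }
    }

    async fn on_twin_detected(&self, operator_id: u64) {
        // Direct send to the consumer; errors are ignored if the receiver is gone.
        let _ = self.alert_tx.send(TwinAlert { operator_id }).await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    let component = DetectionComponent::new(tx);
    component.on_twin_detected(1).await;
    if let Some(alert) = rx.recv().await {
        println!("twin alert for operator {}", alert.operator_id);
    }
}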
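
The timing values in this change compose as follows. The gossipsub figures below mirror the constants added in `behaviour.rs`; the 32-slot epoch and 12-second slot used for the monitoring window are assumed mainnet parameters, not values taken from this diff.

// Worked sketch of the grace period and default monitoring window.
const GOSSIPSUB_HEARTBEAT_INTERVAL_MILLIS: u64 = 700;
const GOSSIPSUB_HISTORY_LENGTH: u64 = 6;

fn main() {
    // Gossip cache window: 6 heartbeats x 700 ms = 4200 ms (~4.2 s).
    let cache_window_millis = GOSSIPSUB_HISTORY_LENGTH * GOSSIPSUB_HEARTBEAT_INTERVAL_MILLIS;
    // Grace period: whole seconds of the cache window plus a 1 s buffer = 5 s.
    let grace_period_secs = cache_window_millis / 1000 + 1;
    assert_eq!(grace_period_secs, 5);

    // Monitoring window with the default --operator-dg-wait-epochs of 2,
    // assuming mainnet values of 32 slots per epoch and 12-second slots.
    let wait_epochs: u64 = 2;
    let slots_per_epoch: u64 = 32;
    let slot_secs: u64 = 12;
    let monitoring_secs = wait_epochs * slots_per_epoch * slot_secs;
    assert_eq!(monitoring_secs, 768); // ~12.8 minutes before normal operation starts
}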
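
The detection rule implemented in `is_doppelganger` reduces to a small predicate over the message's signer set; the sketch below restates it with simplified stand-in types (plain `u64` operator IDs rather than `SignedSSVMessage`/`OperatorId`) to show which messages count as twin evidence while monitoring.

// Distilled sketch of the twin-detection predicate: only a single-signer
// message carrying our own operator ID is evidence of a duplicate instance.
fn is_twin_evidence(own_operator_id: u64, message_signers: &[u64]) -> bool {
    match message_signers {
        // Single-signer message signed with our ID: another instance is running.
        [signer] => *signer == own_operator_id,
        // Aggregate/decided messages legitimately contain our ID; not evidence.
        _ => false,
    }
}

fn main() {
    assert!(is_twin_evidence(1, &[1])); // our ID, single signer -> twin
    assert!(!is_twin_evidence(1, &[2])); // different operator
    assert!(!is_twin_evidence(1, &[1, 2, 3])); // aggregate containing our ID
    assert!(!is_twin_evidence(1, &[])); // no signers
}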