Open
41 commits
dfaf1b4
feat: add CLI config for operator doppelgänger protection
diegomrsantos Oct 15, 2025
5e2b95e
feat: implement operator doppelgänger detection service
diegomrsantos Oct 15, 2025
fd01236
feat: integrate operator doppelgänger detection with message receiver
diegomrsantos Oct 15, 2025
4237308
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 15, 2025
be56784
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 16, 2025
f776d11
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 17, 2025
eb99a99
fix: pass current epoch explicitly instead of using unwrap_or_else
diegomrsantos Oct 17, 2025
3117374
fix: handle slot clock read failure in doppelgänger check
diegomrsantos Oct 17, 2025
67b7126
refactor: simplify doppelgänger state management with Mutex
diegomrsantos Oct 17, 2025
4834057
chore: change stale message log from warn to debug
diegomrsantos Oct 17, 2025
8f98ea0
refactor: remove redundant enabled field from doppelgänger service
diegomrsantos Oct 20, 2025
91836e3
test: add comprehensive tests for doppelgänger service
diegomrsantos Oct 20, 2025
d93ef67
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 20, 2025
64146a6
refactor: extract operator doppelgänger initialization and add monito…
diegomrsantos Oct 20, 2025
e268715
refactor: apply best practices to operator doppelgänger feature
diegomrsantos Oct 20, 2025
982db2e
refactor: parameterize slot duration in operator doppelganger service
diegomrsantos Oct 20, 2025
892fcf6
refactor: remove update_and_check_freshness in favor of separate oper…
diegomrsantos Oct 20, 2025
8006a91
test: remove redundant operator doppelganger tests
diegomrsantos Oct 21, 2025
741c8ad
test: remove redundant initial state test
diegomrsantos Oct 21, 2025
58170c6
lint
diegomrsantos Oct 21, 2025
9276f25
refactor: extract operator_doppelganger to separate crate
diegomrsantos Oct 21, 2025
632f5d4
refactor: remove blocking wait for operator doppelganger monitoring
diegomrsantos Oct 21, 2025
db1d9b2
refactor: simplify operator doppelganger by removing intermediary cha…
diegomrsantos Oct 21, 2025
ffc12a2
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 22, 2025
90b58b1
docs: add architectural principles to CLAUDE.md
diegomrsantos Oct 22, 2025
e0bd8b2
refactor: replace height-based with grace period approach for operato…
diegomrsantos Oct 22, 2025
2d1b48c
style: apply cargo fmt
diegomrsantos Oct 22, 2025
fc962b0
feat: expand doppelgänger detection to all operator-signed messages
diegomrsantos Oct 22, 2025
00c7546
chore: simplify redundant grace period comments
diegomrsantos Oct 22, 2025
c06e7ce
refactor: simplify operator doppelgänger state management
diegomrsantos Oct 22, 2025
571c3f8
refactor: replace epoch-based monitoring with single sleep timer
diegomrsantos Oct 22, 2025
5a8ba76
refactor: remove unnecessary create_operator_doppelganger wrapper
diegomrsantos Oct 22, 2025
433feba
refactor: remove unnecessary generics from OperatorDoppelgangerService
diegomrsantos Oct 22, 2025
b4212b1
test: add async timer tests for operator doppelgänger monitoring
diegomrsantos Oct 22, 2025
ba86842
refactor: convert all detection logic tests to use async timers
diegomrsantos Oct 22, 2025
de1a339
refactor: simplify async timer tests by using single yield
diegomrsantos Oct 22, 2025
26ff439
refactor: replace boolean flags with explicit state enum
diegomrsantos Oct 22, 2025
2aa1ffb
refactor: move DoppelgangerState to private implementation
diegomrsantos Oct 22, 2025
56ef9a8
chore: remove unused dependencies from operator_doppelganger
diegomrsantos Oct 22, 2025
7122d72
perf: use RwLock for read-optimized state access
diegomrsantos Oct 22, 2025
885ab36
feat: block all outgoing messages during doppelgänger monitoring
diegomrsantos Oct 22, 2025
33 changes: 32 additions & 1 deletion CLAUDE.md
@@ -225,6 +225,16 @@ When contributing to Anchor, follow these Rust best practices:
6. **Simplicity First**: Always choose the simplest solution that elegantly solves the problem, follows existing patterns, maintains performance, and uses basic constructs over complex data structures
7. **Check Requirements First**: Before implementing or creating anything (PRs, commits, code), always read and follow existing templates, guidelines, and requirements in the codebase

### Architecture and Design

1. **Question Intermediaries**: If data flows A → B with no transformation, question why A → intermediate → B exists. Each layer should provide clear value (logging, transformation, validation, etc.). Ask: "What problem does this solve that direct communication doesn't?"

2. **Separation Through Interfaces, Not Layers**: Clean boundaries come from well-defined APIs, not intermediary components. A component receiving a `Sender<Event>` achieves separation without needing forwarding tasks or wrapper channels.

3. **Simplification is Always Valid**: Refactoring working code for simplicity is encouraged. Question architectural decisions even after tests pass. Fewer lines and fewer components often indicate better design.

4. **Challenge Complexity**: Every abstraction should justify its existence. "We might need it later" or "it provides separation" aren't sufficient reasons. Complexity must solve specific, current problems.
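
As an illustration of principle 2, here is a minimal sketch of a component that is handed a `Sender<Event>` directly. The `Event` and `Detector` names are hypothetical and do not come from the Anchor codebase, and the sketch uses the standard library channel rather than whatever channel type the project uses.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical event type, used only for this illustration.
#[derive(Debug, PartialEq)]
pub enum Event {
    TwinDetected { operator_id: u64 },
}

/// Hypothetical component. It owns the `Sender<Event>` itself, so no
/// forwarding task or wrapper channel sits between it and the consumer.
pub struct Detector {
    events: Sender<Event>,
}

impl Detector {
    pub fn new(events: Sender<Event>) -> Self {
        Self { events }
    }

    /// Sends a detection straight to whoever holds the receiving end.
    /// Returns `false` if the receiver has already been dropped.
    pub fn report(&self, operator_id: u64) -> bool {
        self.events
            .send(Event::TwinDetected { operator_id })
            .is_ok()
    }
}
```

The boundary here is the `Sender<Event>` type: the detector knows nothing about the consumer, and the consumer only needs to hold the matching `Receiver<Event>`.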

### Specific Guidelines

1. **Naming**:
@@ -409,10 +419,31 @@ When writing PR descriptions, follow these guidelines for maintainable and revie

- **Keep "Proposed Changes" section high-level** - focus on what components were changed and why
- **Avoid line-by-line documentation** - reviewers can see specific changes in the diff
- **Use component-level summaries** rather than file-by-file breakdowns
- **Emphasize the principles** being applied and operational impact
- **Be concise but complete** - provide context without overwhelming detail

### Code Review Culture

Effective code reviews question "why" architectural decisions exist:

**Questions to Ask:**
- "Why does this intermediary layer exist?"
- "What problem does this abstraction solve?"
- "Could components communicate directly?"
- "Is this complexity providing clear value?"

**Encourage Simplification:**
- Working code can still be improved
- Refactoring for clarity is valuable
- Fewer components usually means better architecture
- Test passing ≠ design complete

**Balance:**
- Question complexity, but respect existing patterns that solve real problems
- Not every layer is unnecessary - some provide genuine value
- Focus on "why" over "what"

## Development Tips

- This is a Rust project that follows standard Rust development practices
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -23,6 +23,7 @@ members = [
"anchor/message_sender",
"anchor/message_validator",
"anchor/network",
"anchor/operator_doppelganger",
"anchor/processor",
"anchor/qbft_manager",
"anchor/signature_collector",
@@ -54,6 +55,7 @@ message_receiver = { path = "anchor/message_receiver" }
message_sender = { path = "anchor/message_sender" }
message_validator = { path = "anchor/message_validator" }
network = { path = "anchor/network" }
operator_doppelganger = { path = "anchor/operator_doppelganger" }
operator_key = { path = "anchor/common/operator_key" }
processor = { path = "anchor/processor" }
qbft = { path = "anchor/common/qbft" }
1 change: 1 addition & 0 deletions anchor/client/Cargo.toml
@@ -32,6 +32,7 @@ multiaddr = { workspace = true }
network = { workspace = true }
network_utils = { workspace = true }
openssl = { workspace = true }
operator_doppelganger = { workspace = true }
operator_key = { workspace = true }
parking_lot = { workspace = true }
processor = { workspace = true }
25 changes: 25 additions & 0 deletions anchor/client/src/cli.rs
@@ -484,6 +484,31 @@ pub struct Node {
#[clap(long, help = "Disables gossipsub topic scoring.", hide = true)]
pub disable_gossipsub_topic_scoring: bool,

// Operator Doppelgänger Protection
#[clap(
long,
help = "Enable operator doppelgänger protection. When enabled, the node will monitor \
for messages signed by its operator ID on startup and shut down if a twin \
(duplicate operator) is detected. Enabled by default.",


Documentation improvement: Consider enhancing the help text to include:

help = "Enable operator doppelgänger protection. When enabled, the node will monitor \
        for messages signed by its operator ID on startup and shut down if a twin \
        (duplicate operator) is detected. During the monitoring period (~2 epochs + 5s grace), \
        incoming duties will be dropped to ensure safe detection. \
        Use --operator-dg=false to disable. Enabled by default."

This clarifies:

  • What happens during monitoring (duties dropped)
  • How to disable the feature
  • Approximate monitoring duration

Helps operators understand the behavior without reading source code.

display_order = 0,
default_value_t = true,
help_heading = FLAG_HEADER,
action = ArgAction::Set
)]
pub operator_dg: bool,

#[clap(
long,
value_name = "EPOCHS",
help = "Number of epochs to wait in monitor mode before starting normal operation. \
During this period, the node listens for messages from its own operator ID \
to detect if another instance is running.",
display_order = 0,
default_value_t = 2,
requires = "operator_dg"


Usability Issue: Remove requires = "operator_dg" from this line.

The requires directive hides these configuration options when operator_dg = false, making them undiscoverable. Users should be able to see these options in --help even if the feature is disabled.

The code already correctly handles these values only being used when the feature is enabled (see config.rs:256-258), so the requires constraint is unnecessary and reduces discoverability.


To Claude: `requires` does not prevent an option from being listed in `--help`.

)]
pub operator_dg_wait_epochs: u64,

#[clap(flatten)]
pub logging_flags: FileLoggingFlags,
}
10 changes: 10 additions & 0 deletions anchor/client/src/config.rs
@@ -72,6 +72,10 @@ pub struct Config {
pub prefer_builder_proposals: bool,
/// Controls whether the latency measurement service is enabled
pub disable_latency_measurement_service: bool,
/// Enable operator doppelgänger protection
pub operator_dg: bool,
/// Number of epochs to wait in monitor mode
pub operator_dg_wait_epochs: u64,
}

impl Config {
@@ -115,6 +119,8 @@ impl Config {
prefer_builder_proposals: false,
gas_limit: 36_000_000,
disable_latency_measurement_service: false,
operator_dg: true,
operator_dg_wait_epochs: 2,
}
}
}
@@ -243,6 +249,10 @@ pub fn from_cli(cli_args: &Node, global_config: GlobalConfig) -> Result<Config,
config.impostor = cli_args.impostor.map(OperatorId);
config.disable_latency_measurement_service = cli_args.disable_latency_measurement_service;

// Operator doppelgänger protection
config.operator_dg = cli_args.operator_dg;
config.operator_dg_wait_epochs = cli_args.operator_dg_wait_epochs;

// Performance options
if let Some(max_workers) = cli_args.max_workers {
config.processor.max_workers = max_workers;
66 changes: 59 additions & 7 deletions anchor/client/src/lib.rs
@@ -36,6 +36,7 @@ use message_sender::{MessageSender, NetworkMessageSender, impostor::ImpostorMess
use message_validator::Validator;
use network::Network;
use openssl::rsa::Rsa;
use operator_doppelganger::OperatorDoppelgangerService;
use parking_lot::RwLock;
use qbft_manager::QbftManager;
use sensitive_url::SensitiveUrl;
@@ -83,6 +84,30 @@ const HTTP_DEFAULT_TIMEOUT_QUOTIENT: u32 = 4;

pub struct Client {}

/// Start operator doppelgänger monitoring
///
/// Logs the monitoring start and spawns the background monitoring task
fn start_operator_doppelganger(
service: Arc<OperatorDoppelgangerService>,
wait_epochs: u64,
executor: &TaskExecutor,
) {
info!(
wait_epochs = wait_epochs,
grace_period_secs = network::OPERATOR_DOPPELGANGER_GRACE_PERIOD_SECS,
"Operator doppelgänger: starting monitoring period"
);

// Spawn background task to end monitoring after grace period + wait epochs
// Grace period prevents false positives from receiving our own old messages after
// restart (they remain in gossip cache for ~4.2s)
service.spawn_monitor_task(
Duration::from_secs(network::OPERATOR_DOPPELGANGER_GRACE_PERIOD_SECS),
wait_epochs,
executor,
);
}
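
The body of `spawn_monitor_task` is not shown in this diff. The sketch below only illustrates the arithmetic that the comments above and the CLI help text imply: the grace period plus `wait_epochs` full epochs. The function name `monitoring_duration` is hypothetical, and it assumes whole-second slots, as suggested by `Duration::from_secs(spec.seconds_per_slot)`.

```rust
use std::time::Duration;

/// Hypothetical helper: total time spent in monitor mode, computed as the
/// grace period plus `wait_epochs` full epochs.
/// Returns `None` if the arithmetic overflows.
pub fn monitoring_duration(
    grace_period: Duration,
    wait_epochs: u64,
    slots_per_epoch: u64,
    seconds_per_slot: u64,
) -> Option<Duration> {
    // Seconds covered by the configured number of epochs.
    let epoch_secs = wait_epochs
        .checked_mul(slots_per_epoch)?
        .checked_mul(seconds_per_slot)?;
    Duration::from_secs(epoch_secs).checked_add(grace_period)
}
```

Assuming 32 slots per epoch, 12-second slots, the default of 2 wait epochs, and the 5-second grace period mentioned in the review comment on the help text, this gives 2 × 32 × 12 + 5 = 773 seconds.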

impl Client {
/// Runs the Anchor Client
pub async fn run<E: EthSpec>(executor: TaskExecutor, config: Config) -> Result<(), String> {
@@ -394,6 +419,10 @@ impl Client {
"syncer",
);

// Create operator ID wrapper that watches the database for our operator ID.
// Follows the common pattern: pass OwnOperatorId to components, they call .get() only when
// needed. This allows initialization before sync completes (which populates the ID from
// chain).
let operator_id = OwnOperatorId::new(database.watch());

// Network sender/receiver
@@ -419,15 +448,30 @@ impl Client {
&executor,
);

// Create operator doppelgänger protection if enabled (will be started after sync)
let doppelganger_service = if config.operator_dg && config.impostor.is_none() {
Some(Arc::new(OperatorDoppelgangerService::new(
operator_id.clone(),
E::slots_per_epoch(),
Duration::from_secs(spec.seconds_per_slot),
executor.shutdown_sender(),
)))
} else {
None
};

let message_sender: Arc<dyn MessageSender> = if config.impostor.is_none() {
Arc::new(NetworkMessageSender::new(
processor_senders.clone(),
network_tx.clone(),
key.clone(),
operator_id.clone(),
Some(message_validator.clone()),
SUBNET_COUNT,
is_synced.clone(),
message_sender::NetworkMessageSenderConfig {
processor: processor_senders.clone(),
network_tx: network_tx.clone(),
private_key: key.clone(),
operator_id: operator_id.clone(),
validator: Some(message_validator.clone()),
subnet_count: SUBNET_COUNT,
is_synced: is_synced.clone(),
doppelganger_service: doppelganger_service.clone(),
},
)?)
} else {
Arc::new(ImpostorMessageSender::new(network_tx.clone(), SUBNET_COUNT))
@@ -474,6 +518,7 @@ impl Client {
is_synced.clone(),
outcome_tx,
message_validator,
doppelganger_service.clone(),
);

// Start the p2p network
@@ -573,6 +618,13 @@ impl Client {
.map_err(|_| "Sync watch channel closed")?;
info!("Sync complete, starting services...");

// Start operator doppelgänger monitoring (now that sync is complete and operator ID
// available). The service will automatically stop monitoring after the configured
// wait period. Messages will be checked but dropped during monitoring.
if let Some(service) = &doppelganger_service {
start_operator_doppelganger(service.clone(), config.operator_dg_wait_epochs, &executor);
}

let mut block_service_builder = BlockServiceBuilder::new()
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
2 changes: 2 additions & 0 deletions anchor/message_receiver/Cargo.toml
@@ -10,6 +10,7 @@ gossipsub = { workspace = true }
hex = { workspace = true }
libp2p = { workspace = true }
message_validator = { workspace = true }
operator_doppelganger = { workspace = true }
processor = { workspace = true }
qbft_manager = { workspace = true }
signature_collector = { workspace = true }
@@ -18,3 +19,4 @@ ssv_types = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
20 changes: 20 additions & 0 deletions anchor/message_receiver/src/manager.rs
@@ -6,6 +6,7 @@ use libp2p::PeerId;
use message_validator::{
DutiesProvider, ValidatedMessage, ValidatedSSVMessage, ValidationResult, Validator,
};
use operator_doppelganger::OperatorDoppelgangerService;
use qbft_manager::QbftManager;
use signature_collector::SignatureCollectorManager;
use slot_clock::SlotClock;
@@ -32,9 +33,11 @@ pub struct NetworkMessageReceiver<S: SlotClock, D: DutiesProvider> {
is_synced: watch::Receiver<bool>,
outcome_tx: mpsc::Sender<Outcome>,
validator: Arc<Validator<S, D>>,
doppelganger_service: Option<Arc<OperatorDoppelgangerService>>,
}

impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageReceiver<S, D> {
#[allow(clippy::too_many_arguments)]
pub fn new(
processor: processor::Senders,
qbft_manager: Arc<QbftManager>,
@@ -43,6 +46,7 @@ impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageReceiver<S, D> {
is_synced: watch::Receiver<bool>,
outcome_tx: mpsc::Sender<Outcome>,
validator: Arc<Validator<S, D>>,
doppelganger_service: Option<Arc<OperatorDoppelgangerService>>,
) -> Arc<Self> {
Arc::new(Self {
processor,
@@ -52,6 +56,7 @@ impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageReceiver<S, D> {
is_synced,
outcome_tx,
validator,
doppelganger_service,
})
}
}
@@ -159,6 +164,21 @@ impl<S: SlotClock + 'static, D: DutiesProvider> MessageReceiver
}
}

// Check for operator doppelgänger before processing any message
if let Some(service) = &receiver.doppelganger_service {
// If in monitoring mode, check for twin and drop message
if service.is_monitoring() {
// Extract QBFT message for detailed logging if twin detected
let qbft_msg = match &ssv_message {
ValidatedSSVMessage::QbftMessage(msg) => Some(msg),
ValidatedSSVMessage::PartialSignatureMessages(_) => None,
};
service.check_message(&signed_ssv_message, qbft_msg);
// Drop message during monitoring period - don't process


Observability suggestion: Consider adding metrics or periodic logging for messages dropped during monitoring.

This would help operators understand:

  • That monitoring is active
  • How many duties are being skipped
  • When monitoring period ends

Example:

if service.is_monitoring() {
    // ... existing check_message logic ...
    
    // Optional: track dropped messages
    metrics::inc_counter(&metrics::DOPPELGANGER_MESSAGES_DROPPED);
    return;
}

Not critical, but useful for operational visibility.
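
A minimal sketch of how such a counter could look using only the standard library. `DroppedMessages` and its methods are hypothetical names, not part of the PR or of any existing metrics crate. A real implementation would more likely register a counter with the project's metrics system.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counter for messages dropped while monitoring is active.
#[derive(Default)]
pub struct DroppedMessages {
    count: AtomicU64,
}

impl DroppedMessages {
    /// Records one dropped message and returns the new total.
    pub fn record(&self) -> u64 {
        self.count.fetch_add(1, Ordering::Relaxed) + 1
    }

    /// Returns the number of messages dropped so far.
    pub fn total(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Returns `true` on every `every`-th message, so callers can log
    /// periodically instead of once per message. An `every` of 0 never logs.
    pub fn should_log(total: u64, every: u64) -> bool {
        every != 0 && total % every == 0
    }
}
```

In the receiver, the monitoring branch could call `record()` just before `return` and emit a debug log whenever `should_log` returns `true`.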

return;
}
}

match ssv_message {
ValidatedSSVMessage::QbftMessage(qbft_message) => {
if let Err(err) = receiver
1 change: 1 addition & 0 deletions anchor/message_sender/Cargo.toml
@@ -12,6 +12,7 @@ database = { workspace = true }
ethereum_ssz = { workspace = true }
message_validator = { workspace = true }
openssl = { workspace = true }
operator_doppelganger = { workspace = true }
processor = { workspace = true }
slot_clock = { workspace = true }
ssv_types = { workspace = true }