Skip to content

Commit d2eaa28

Browse files
authored
fix: Handle historical bug where messages were not correctly revoked (#503)
Fix for processing blocks when #484 was active. Also supports future protocol version upgrades via EngineVersion.
1 parent af11dae commit d2eaa28

File tree

11 files changed

+398
-60
lines changed

11 files changed

+398
-60
lines changed

src/consensus/proposer.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use crate::core::types::{
2-
proto, Address, Height, ShardHash, ShardId, SnapchainShard, FARCASTER_EPOCH,
3-
};
1+
use crate::core::types::{proto, Address, Height, ShardHash, ShardId, SnapchainShard};
2+
use crate::core::util::FarcasterTime;
43
use crate::proto::{
54
full_proposal, Block, BlockHeader, Commits, FullProposal, ShardChunk, ShardChunkWitness,
65
ShardHeader, ShardWitness,
@@ -9,6 +8,7 @@ use crate::storage::store::engine::{BlockEngine, ShardEngine, ShardStateChange};
98
use crate::storage::store::stores::Stores;
109
use crate::storage::store::BlockStorageError;
1110
use crate::utils::statsd_wrapper::StatsdClientWrapper;
11+
use crate::version::version::EngineVersion;
1212
use informalsystems_malachitebft_core_types::{Round, Validity};
1313
use prost::Message;
1414
use std::collections::{BTreeMap, HashMap};
@@ -23,14 +23,6 @@ pub const PROTOCOL_VERSION: u32 = 1;
2323
pub const GENESIS_MESSAGE: &str =
2424
"It occurs to me that our survival may depend upon our talking to one another.";
2525

26-
pub fn current_time() -> u64 {
27-
std::time::SystemTime::now()
28-
.duration_since(std::time::UNIX_EPOCH)
29-
.unwrap()
30-
.as_secs()
31-
- (FARCASTER_EPOCH / 1000)
32-
}
33-
3426
#[allow(async_fn_in_trait)] // TODO
3527
pub trait Proposer {
3628
// Create a new block/shard chunk for the given height that will be proposed for confirmation to the other validators
@@ -167,12 +159,12 @@ impl Proposer for ShardProposer {
167159
None => vec![0, 32],
168160
};
169161

170-
let state_change = self
171-
.engine
172-
.propose_state_change(self.shard_id.shard_id(), messages);
162+
let state_change =
163+
self.engine
164+
.propose_state_change(self.shard_id.shard_id(), messages, None);
173165
let shard_header = ShardHeader {
174166
parent_hash,
175-
timestamp: current_time(),
167+
timestamp: state_change.timestamp.into(),
176168
height: Some(height.clone()),
177169
shard_root: state_change.new_state_root.clone(),
178170
};
@@ -205,7 +197,11 @@ impl Proposer for ShardProposer {
205197
let height = header.height.unwrap();
206198
self.proposed_chunks
207199
.add_proposed_value(full_proposal.clone());
208-
let receive_delay = current_time().saturating_sub(header.timestamp);
200+
let timestamp = FarcasterTime::new(header.timestamp);
201+
let receive_delay = FarcasterTime::current()
202+
.to_u64()
203+
.saturating_sub(timestamp.to_u64());
204+
let version = EngineVersion::version_for(&timestamp);
209205
self.statsd_client.gauge_with_shard(
210206
self.shard_id.shard_id(),
211207
"proposer.receive_delay",
@@ -225,6 +221,8 @@ impl Proposer for ShardProposer {
225221

226222
let state = ShardStateChange {
227223
shard_id: height.shard_index,
224+
timestamp,
225+
version,
228226
new_state_root: header.shard_root.clone(),
229227
transactions: chunk.transactions.clone(),
230228
events: vec![],
@@ -469,7 +467,7 @@ impl Proposer for BlockProposer {
469467
parent_hash,
470468
chain_id: self.network as i32,
471469
version: PROTOCOL_VERSION,
472-
timestamp: current_time(),
470+
timestamp: FarcasterTime::current().into(),
473471
height: Some(height.clone()),
474472
shard_witnesses_hash: witness_hash,
475473
};

src/core/util.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,45 @@
11
use crate::core::error::HubError;
22
use crate::core::types::FARCASTER_EPOCH;
33

4+
#[derive(Clone, Debug)]
5+
pub struct FarcasterTime {
6+
time: u64, // seconds since the farcaster epoch
7+
}
8+
9+
impl FarcasterTime {
10+
pub fn current() -> Self {
11+
let time = std::time::SystemTime::now()
12+
.duration_since(std::time::UNIX_EPOCH)
13+
.unwrap()
14+
.as_secs()
15+
- (FARCASTER_EPOCH / 1000);
16+
FarcasterTime { time }
17+
}
18+
19+
pub fn new(time: u64) -> Self {
20+
FarcasterTime { time }
21+
}
22+
23+
pub fn from_unix_seconds(time: u64) -> Self {
24+
let time = time - (FARCASTER_EPOCH / 1000);
25+
FarcasterTime { time }
26+
}
27+
28+
pub fn to_unix_seconds(&self) -> u64 {
29+
self.time + (FARCASTER_EPOCH / 1000)
30+
}
31+
32+
pub fn to_u64(&self) -> u64 {
33+
self.time
34+
}
35+
}
36+
37+
impl Into<u64> for FarcasterTime {
38+
fn into(self) -> u64 {
39+
self.time
40+
}
41+
}
42+
443
#[allow(dead_code)]
544
pub fn to_farcaster_time(time_ms: u64) -> Result<u64, HubError> {
645
if time_ms < FARCASTER_EPOCH {
@@ -26,10 +65,6 @@ pub fn from_farcaster_time(time: u64) -> u64 {
2665
time * 1000 + FARCASTER_EPOCH
2766
}
2867

29-
pub fn farcaster_time_to_unix_seconds(time: u64) -> u64 {
30-
time + (FARCASTER_EPOCH / 1000)
31-
}
32-
3368
#[allow(dead_code)]
3469
pub fn get_farcaster_time() -> Result<u64, HubError> {
3570
let now = std::time::SystemTime::now()

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod node;
99
pub mod perf;
1010
pub mod storage;
1111
pub mod utils;
12+
pub mod version;
1213

1314
mod tests;
1415

src/mempool/mempool.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::{
1111
use tokio::sync::{broadcast, mpsc, oneshot};
1212

1313
use crate::core::error::HubError;
14-
use crate::core::util::farcaster_time_to_unix_seconds;
14+
use crate::core::util::FarcasterTime;
1515
use crate::proto::OnChainEventType;
1616
use crate::{
1717
core::types::SnapchainValidatorContext,
@@ -171,7 +171,7 @@ impl proto::Message {
171171
// TODO: Consider revisiting choice of timestamp here as backdated messages currently are prioritized.
172172
return MempoolKey::new(
173173
MempoolMessageKind::UserMessage,
174-
farcaster_time_to_unix_seconds(data.timestamp as u64),
174+
FarcasterTime::new(data.timestamp as u64).to_unix_seconds(),
175175
self.hex_hash(),
176176
);
177177
}

src/perf/engine_only_perftest.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ pub async fn run() -> Result<(), Box<dyn Error>> {
9999
}
100100

101101
let messages = engine.pull_messages(Duration::from_millis(50)).await?;
102-
let state_change = engine.propose_state_change(1, messages);
102+
let state_change = engine.propose_state_change(1, messages, None);
103103

104104
let valid = engine.validate_state_change(&state_change);
105105
assert!(valid);

src/perf/perftest.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1+
use crate::core::util::FarcasterTime;
12
use crate::perf::gen_single::SingleUser;
23
use crate::perf::generate::{new_generator, GeneratorTypes};
34
use crate::perf::{gen_single, generate};
45
use crate::proto;
6+
use crate::proto::admin_service_client::AdminServiceClient;
57
use crate::proto::hub_service_client::HubServiceClient;
68
use crate::proto::Block;
79
use crate::utils::cli::follow_blocks;
810
use crate::utils::cli::send_on_chain_event;
9-
use crate::{consensus::proposer::current_time, proto::admin_service_client::AdminServiceClient};
1011
use clap::Parser;
1112
use figment::{
1213
providers::{Env, Format, Toml},
@@ -181,7 +182,7 @@ pub async fn run() -> Result<(), Box<dyn Error>> {
181182

182183
let start = Instant::now();
183184
let mut stats_calculation_timer = time::interval(cfg.stats_calculation_interval);
184-
let start_time = current_time();
185+
let start_time = FarcasterTime::current().to_u64();
185186
let mut block_count = 0;
186187
let mut num_messages_confirmed = 0;
187188
let mut num_messages_submitted = 0;

src/storage/store/engine.rs

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::account::UsernameProofStore;
22
use super::account::{IntoU8, OnchainEventStorageError, UserDataStore};
33
use crate::core::error::HubError;
44
use crate::core::types::Height;
5+
use crate::core::util::FarcasterTime;
56
use crate::core::validations;
67
use crate::core::validations::verification;
78
use crate::mempool::mempool::MempoolMessagesRequest;
@@ -17,6 +18,7 @@ use crate::storage::store::BlockStore;
1718
use crate::storage::trie;
1819
use crate::storage::trie::merkle_trie;
1920
use crate::utils::statsd_wrapper::StatsdClientWrapper;
21+
use crate::version::version::{EngineVersion, ProtocolFeature};
2022
use informalsystems_malachitebft_core_types::Round;
2123
use itertools::Itertools;
2224
use merkle_trie::TrieKey;
@@ -122,9 +124,11 @@ impl MempoolMessage {
122124
#[derive(Clone)]
123125
pub struct ShardStateChange {
124126
pub shard_id: u32,
127+
pub timestamp: FarcasterTime,
125128
pub new_state_root: Vec<u8>,
126129
pub transactions: Vec<Transaction>,
127130
pub events: Vec<HubEvent>,
131+
pub version: EngineVersion,
128132
}
129133

130134
#[derive(Clone)]
@@ -279,18 +283,21 @@ impl ShardEngine {
279283
txn_batch: &mut RocksDbTransactionBatch,
280284
shard_id: u32,
281285
messages: Vec<MempoolMessage>,
286+
timestamp: &FarcasterTime,
282287
) -> Result<ShardStateChange, EngineError> {
283288
self.count("prepare_proposal.recv_messages", messages.len() as u64);
284289

285290
let mut snapchain_txns = self.create_transactions_from_mempool(messages)?;
286291
let mut events = vec![];
287292
let mut validation_error_count = 0;
293+
let version = EngineVersion::version_for(timestamp);
288294
for snapchain_txn in &mut snapchain_txns {
289295
let (account_root, txn_events, validation_errors) = self.replay_snapchain_txn(
290296
trie_ctx,
291297
&snapchain_txn,
292298
txn_batch,
293299
ProposalSource::Propose,
300+
version,
294301
)?;
295302
snapchain_txn.account_root = account_root;
296303
events.extend(txn_events);
@@ -310,6 +317,8 @@ impl ShardEngine {
310317
let new_root_hash = self.stores.trie.root_hash()?;
311318
let result = ShardStateChange {
312319
shard_id,
320+
timestamp: timestamp.clone(),
321+
version,
313322
new_state_root: new_root_hash.clone(),
314323
transactions: snapchain_txns,
315324
events,
@@ -375,6 +384,7 @@ impl ShardEngine {
375384
&mut self,
376385
shard: u32,
377386
messages: Vec<MempoolMessage>,
387+
timestamp: Option<FarcasterTime>,
378388
) -> ShardStateChange {
379389
let now = std::time::Instant::now();
380390
let mut txn = RocksDbTransactionBatch::new();
@@ -386,12 +396,14 @@ impl ShardEngine {
386396
count_fn("trie.mem_get_count.total", read_count.1);
387397
count_fn("trie.mem_get_count.for_propose", read_count.1);
388398
};
399+
let timestamp = timestamp.unwrap_or_else(FarcasterTime::current);
389400
let result = self
390401
.prepare_proposal(
391402
&merkle_trie::Context::with_callback(count_callback),
392403
&mut txn,
393404
shard,
394405
messages,
406+
&timestamp,
395407
)
396408
.unwrap(); //TODO: don't unwrap()
397409

@@ -470,6 +482,7 @@ impl ShardEngine {
470482
transactions: &[Transaction],
471483
shard_root: &[u8],
472484
source: ProposalSource,
485+
version: EngineVersion,
473486
) -> Result<Vec<HubEvent>, EngineError> {
474487
let now = std::time::Instant::now();
475488
let mut events = vec![];
@@ -500,8 +513,13 @@ impl ShardEngine {
500513
}
501514

502515
for snapchain_txn in transactions {
503-
let (account_root, txn_events, _) =
504-
self.replay_snapchain_txn(trie_ctx, snapchain_txn, txn_batch, source.clone())?;
516+
let (account_root, txn_events, _) = self.replay_snapchain_txn(
517+
trie_ctx,
518+
snapchain_txn,
519+
txn_batch,
520+
source.clone(),
521+
version,
522+
)?;
505523
// Reject early if account roots fail to match (shard roots will definitely fail)
506524
if &account_root != &snapchain_txn.account_root {
507525
warn!(
@@ -544,6 +562,7 @@ impl ShardEngine {
544562
snapchain_txn: &Transaction,
545563
txn_batch: &mut RocksDbTransactionBatch,
546564
source: ProposalSource,
565+
version: EngineVersion,
547566
) -> Result<(Vec<u8>, Vec<HubEvent>, Vec<MessageValidationError>), EngineError> {
548567
let now = std::time::Instant::now();
549568
let total_user_messages = snapchain_txn.user_messages.len();
@@ -577,8 +596,7 @@ impl ShardEngine {
577596
system_messages_count += 1;
578597
match &onchain_event.body {
579598
Some(proto::on_chain_event::Body::SignerEventBody(signer_event)) => {
580-
if signer_event.event_type == proto::SignerEventType::Remove as i32
581-
{
599+
if Self::should_revoke_signer(&signer_event, version) {
582600
revoked_signers.insert(signer_event.key.clone());
583601
}
584602
}
@@ -1145,6 +1163,7 @@ impl ShardEngine {
11451163
transactions,
11461164
shard_root,
11471165
ProposalSource::Validate,
1166+
shard_state_change.version,
11481167
);
11491168

11501169
match proposal_result {
@@ -1279,20 +1298,17 @@ impl ShardEngine {
12791298
"No valid cached transaction to apply. Replaying proposal"
12801299
);
12811300
// If we need to replay, reset the sequence number on the event id generator, just in case
1282-
let block_number = &shard_chunk
1283-
.header
1284-
.as_ref()
1285-
.unwrap()
1286-
.height
1287-
.unwrap()
1288-
.block_number;
1289-
self.stores.event_handler.set_current_height(*block_number);
1301+
let header = &shard_chunk.header.as_ref().unwrap();
1302+
let block_number = header.height.unwrap().block_number;
1303+
self.stores.event_handler.set_current_height(block_number);
1304+
let version = EngineVersion::version_for(&FarcasterTime::new(header.timestamp));
12901305
match self.replay_proposal(
12911306
trie_ctx,
12921307
&mut txn,
12931308
transactions,
12941309
shard_root,
12951310
ProposalSource::Commit,
1311+
version,
12961312
) {
12971313
Err(err) => {
12981314
error!("State change commit failed: {}", err);
@@ -1316,11 +1332,13 @@ impl ShardEngine {
13161332
system_messages: vec![],
13171333
user_messages: vec![message.clone()],
13181334
};
1335+
let version = EngineVersion::version_for(&FarcasterTime::current());
13191336
let result = self.replay_snapchain_txn(
13201337
&merkle_trie::Context::new(),
13211338
&snapchain_txn,
13221339
&mut txn,
13231340
ProposalSource::Simulate,
1341+
version,
13241342
);
13251343

13261344
match result {
@@ -1490,6 +1508,14 @@ impl ShardEngine {
14901508
pub fn trie_num_items(&mut self) -> usize {
14911509
self.stores.trie.items().unwrap()
14921510
}
1511+
1512+
fn should_revoke_signer(signer_event: &proto::SignerEventBody, version: EngineVersion) -> bool {
1513+
// When this bug was active, we did not revoke any signers, so, always return false
1514+
if version.is_enabled(ProtocolFeature::SignerRevokeBug) {
1515+
return false;
1516+
}
1517+
signer_event.event_type == proto::SignerEventType::Remove as i32
1518+
}
14931519
}
14941520

14951521
pub struct BlockEngine {

0 commit comments

Comments
 (0)