Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 84 additions & 72 deletions modules/accounts_state/src/accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ use drep_distribution_publisher::DRepDistributionPublisher;
mod spo_distribution_publisher;
use spo_distribution_publisher::SPODistributionPublisher;
mod state;
use state::State;
mod snapshot;
mod rewards;
use state::{LiveStakeAddressState, State};
mod monetary;
mod rest;
mod rewards;
mod snapshot;
use acropolis_common::queries::accounts::{
AccountInfo, AccountsStateQuery, AccountsStateQueryResponse,
};
use rest::{handle_drdd, handle_pots, handle_spdd};
use rest::handle_pots;

use crate::state::StakeAddressState;

const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state";
const DEFAULT_EPOCH_ACTIVITY_TOPIC: &str = "cardano.epoch.activity";
Expand All @@ -40,9 +42,7 @@ const DEFAULT_DREP_DISTRIBUTION_TOPIC: &str = "cardano.drep.distribution";
const DEFAULT_SPO_DISTRIBUTION_TOPIC: &str = "cardano.spo.distribution";
const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters";

const DEFAULT_HANDLE_SPDD_TOPIC: (&str, &str) = ("handle-topic-spdd", "rest.get.spdd");
const DEFAULT_HANDLE_POTS_TOPIC: (&str, &str) = ("handle-topic-pots", "rest.get.pots");
const DEFAULT_HANDLE_DRDD_TOPIC: (&str, &str) = ("handle-topic-drdd", "rest.get.drdd");

/// Accounts State module
#[module(
Expand All @@ -56,6 +56,7 @@ impl AccountsState {
/// Async run loop
async fn run(
history: Arc<Mutex<StateHistory<State>>>,
live: Arc<LiveStakeAddressState>,
mut drep_publisher: DRepDistributionPublisher,
mut spo_publisher: SPODistributionPublisher,
mut spos_subscription: Box<dyn Subscription<Message>>,
Expand Down Expand Up @@ -96,6 +97,7 @@ impl AccountsState {
}

if let Some(block_info) = current_block {
state.snapshot_from_live(&live);
history.lock().await.commit(&block_info, state);
}
}
Expand All @@ -119,12 +121,13 @@ impl AccountsState {
// Handle rollbacks on this topic only
if block_info.status == BlockStatus::RolledBack {
state = history.lock().await.get_rolled_back_state(&block_info);
state.restore_live_from_snapshot(&live);
}

current_block = Some(block_info.clone());
block_info.new_epoch && block_info.epoch > 0
}
_ => false
_ => false,
};

// Read from epoch-boundary messages only when it's a new epoch
Expand All @@ -138,17 +141,21 @@ impl AccountsState {
let (_, message) = dreps_message_f.await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::DRepState(dreps_msg))) => {
let span = info_span!("account_state.handle_drep_state",
block = block_info.number);
let span = info_span!(
"account_state.handle_drep_state",
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
state.handle_drep_state(&dreps_msg);

let drdd = state.generate_drdd();
let drdd = state.generate_drdd(&live);
if let Err(e) = drep_publisher.publish_drdd(block_info, drdd).await {
error!("Error publishing drep voting stake distribution: {e:#}")
}
}.instrument(span).await;
}
.instrument(span)
.await;
}

_ => error!("Unexpected message type: {message:?}"),
Expand All @@ -158,20 +165,22 @@ impl AccountsState {
let (_, message) = spos_message_f.await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::SPOState(spo_msg))) => {
let span = info_span!("account_state.handle_spo_state",
block = block_info.number);
let span =
info_span!("account_state.handle_spo_state", block = block_info.number);
async {
Self::check_sync(&current_block, &block_info);
state
.handle_spo_state(spo_msg)
.inspect_err(|e| error!("SPOState handling error: {e:#}"))
.ok();

let spdd = state.generate_spdd();
let spdd = State::generate_spdd(&live);
if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await {
error!("Error publishing SPO stake distribution: {e:#}")
}
}.instrument(span).await;
}
.instrument(span)
.await;
}

_ => error!("Unexpected message type: {message:?}"),
Expand All @@ -181,16 +190,20 @@ impl AccountsState {
let (_, message) = ea_message_f.await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::EpochActivity(ea_msg))) => {
let span = info_span!("account_state.handle_epoch_activity",
block = block_info.number);
let span = info_span!(
"account_state.handle_epoch_activity",
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
state
.handle_epoch_activity(ea_msg)
.handle_epoch_activity(&live, ea_msg)
.await
.inspect_err(|e| error!("EpochActivity handling error: {e:#}"))
.ok();
}.instrument(span).await;
}
.instrument(span)
.await;
}

_ => error!("Unexpected message type: {message:?}"),
Expand All @@ -201,8 +214,10 @@ impl AccountsState {
let (_, message) = params_message_f.await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::ProtocolParams(params_msg))) => {
let span = info_span!("account_state.handle_parameters",
block = block_info.number);
let span = info_span!(
"account_state.handle_parameters",
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
if let Some(ref block) = current_block {
Expand All @@ -219,7 +234,9 @@ impl AccountsState {
.handle_parameters(params_msg)
.inspect_err(|e| error!("Messaging handling error: {e}"))
.ok();
}.instrument(span).await;
}
.instrument(span)
.await;
}

_ => error!("Unexpected message type: {message:?}"),
Expand All @@ -233,11 +250,13 @@ impl AccountsState {
async {
Self::check_sync(&current_block, &block_info);
state
.handle_tx_certificates(tx_certs_msg)
.handle_tx_certificates(&live, tx_certs_msg)
.inspect_err(|e| error!("TxCertificates handling error: {e:#}"))
.ok();
}.instrument(span).await;
}
}
.instrument(span)
.await;
}

_ => error!("Unexpected message type: {certs_message:?}"),
}
Expand All @@ -246,15 +265,19 @@ impl AccountsState {
let (_, message) = withdrawals_message_f.await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::Withdrawals(withdrawals_msg))) => {
let span = info_span!("account_state.handle_withdrawals",
block = block_info.number);
let span = info_span!(
"account_state.handle_withdrawals",
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
state
.handle_withdrawals(withdrawals_msg)
.handle_withdrawals(&live, withdrawals_msg)
.inspect_err(|e| error!("Withdrawals handling error: {e:#}"))
.ok();
}.instrument(span).await;
}
.instrument(span)
.await;
}

_ => error!("Unexpected message type: {message:?}"),
Expand All @@ -264,22 +287,27 @@ impl AccountsState {
let (_, message) = stake_message_f.await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::StakeAddressDeltas(deltas_msg))) => {
let span = info_span!("account_state.handle_stake_deltas",
block = block_info.number);
let span = info_span!(
"account_state.handle_stake_deltas",
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
state
.handle_stake_deltas(deltas_msg)
.handle_stake_deltas(&live, deltas_msg)
.inspect_err(|e| error!("StakeAddressDeltas handling error: {e:#}"))
.ok();
}.instrument(span).await;
}
.instrument(span)
.await;
}

_ => error!("Unexpected message type: {message:?}"),
}

// Commit the new state
if let Some(block_info) = current_block {
state.snapshot_from_live(&live);
history.lock().await.commit(&block_info, state);
}
}
Expand All @@ -292,7 +320,8 @@ impl AccountsState {
error!(
expected = block.number,
actual = actual.number,
"Messages out of sync");
"Messages out of sync"
);
}
}
}
Expand Down Expand Up @@ -347,31 +376,23 @@ impl AccountsState {
.unwrap_or(DEFAULT_SPO_DISTRIBUTION_TOPIC.to_string());

// REST handler topics
let handle_spdd_topic = config
.get_string(DEFAULT_HANDLE_SPDD_TOPIC.0)
.unwrap_or(DEFAULT_HANDLE_SPDD_TOPIC.1.to_string());
info!("Creating request handler on '{}'", handle_spdd_topic);

let handle_pots_topic = config
.get_string(DEFAULT_HANDLE_POTS_TOPIC.0)
.unwrap_or(DEFAULT_HANDLE_POTS_TOPIC.1.to_string());
info!("Creating request handler on '{}'", handle_pots_topic);

let handle_drdd_topic = config
.get_string(DEFAULT_HANDLE_DRDD_TOPIC.0)
.unwrap_or(DEFAULT_HANDLE_DRDD_TOPIC.1.to_string());
info!("Creating request handler on '{}'", handle_drdd_topic);

// Create history
let history = Arc::new(Mutex::new(StateHistory::<State>::new("AccountsState")));
let history_account_single = history.clone();
let history_spdd = history.clone();
let history_pots = history.clone();
let history_drdd = history.clone();
let history_tick = history.clone();

// Create live state
let live = Arc::new(LiveStakeAddressState::default());
let live_handle = live.clone();
let live_run = live.clone();

context.handle("accounts-state", move |message| {
let history = history_account_single.clone();
let live = live_handle.clone();
async move {
let Message::StateQuery(StateQuery::Accounts(query)) = message.as_ref() else {
return Arc::new(Message::StateQueryResponse(StateQueryResponse::Accounts(
Expand All @@ -381,24 +402,20 @@ impl AccountsState {
)));
};

let guard = history.lock().await;
let state = match guard.current() {
Some(s) => s,
None => {
return Arc::new(Message::StateQueryResponse(
StateQueryResponse::Accounts(AccountsStateQueryResponse::NotFound),
));
}
};

let response = match query {
AccountsStateQuery::GetAccountInfo { stake_key } => {
if let Some(account) = state.get_stake_state(stake_key) {
let account: Option<StakeAddressState> = {
let map = live.handle();
map.get::<Vec<u8>>(stake_key.as_ref()) // &Vec<u8>
.map(|acc| acc.as_ref().clone())
};

if let Some(acc) = account {
AccountsStateQueryResponse::AccountInfo(AccountInfo {
utxo_value: account.utxo_value,
rewards: account.rewards,
delegated_spo: account.delegated_spo.clone(),
delegated_drep: account.delegated_drep.clone(),
utxo_value: acc.utxo_value,
rewards: acc.rewards,
delegated_spo: acc.delegated_spo,
delegated_drep: acc.delegated_drep,
})
} else {
AccountsStateQueryResponse::NotFound
Expand All @@ -417,18 +434,10 @@ impl AccountsState {
}
});

handle_rest(context.clone(), &handle_spdd_topic, move || {
handle_spdd(history_spdd.clone())
});

handle_rest(context.clone(), &handle_pots_topic, move || {
handle_pots(history_pots.clone())
});

handle_rest(context.clone(), &handle_drdd_topic, move || {
handle_drdd(history_drdd.clone())
});

// Ticker to log stats
let mut tick_subscription = context.subscribe("clock.tick").await?;
context.clone().run(async move {
Expand All @@ -443,7 +452,9 @@ impl AccountsState {
if let Some(state) = history_tick.lock().await.current() {
state.tick().await.inspect_err(|e| error!("Tick error: {e}")).ok();
}
}.instrument(span).await;
}
.instrument(span)
.await;
}
}
}
Expand All @@ -468,6 +479,7 @@ impl AccountsState {
context.run(async move {
Self::run(
history,
live_run,
drep_publisher,
spo_publisher,
spos_subscription,
Expand Down
Loading