Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ futures-util = "0.3.31"
gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] }
hex = "0.4.3"
itertools = "0.14.0"
pallas = { git = "https://github.com/txpipe/pallas.git", features = ["phase-two"] }
pallas = { git = "https://github.com/txpipe/pallas.git", features = ["phase2", "wallet"] }
protoc-wkt = "1.0.0"
serde = { version = "1.0.217", features = ["derive"] }
thiserror = "2.0.11"
Expand All @@ -39,3 +39,5 @@ async-stream = "0.3.6"
tokio-stream = "0.1.17"
rand = "0.9.0"
prost = "0.13.5"
vaultrs = "0.7.4"
bip39 = "2.1.0"
10 changes: 7 additions & 3 deletions src/ledger/u5c/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::{collections::HashMap, pin::Pin, str::FromStr, vec};
use std::{
collections::{BTreeMap, HashMap},
pin::Pin,
str::FromStr,
vec,
};

use anyhow::bail;
use async_stream::stream;
use futures::{Stream, TryStreamExt};
use pallas::{
codec::utils::KeyValuePairs,
crypto::hash::Hash,
interop::utxorpc::spec::{
cardano::{CostModel, Tx},
Expand Down Expand Up @@ -322,7 +326,7 @@ fn map_conway_pparams(pparams: &Params) -> ConwayProtParams {
.unwrap_or(CostModel { values: vec![] })
.values,
),
unknown: KeyValuePairs::from(vec![]),
unknown: BTreeMap::new(),
},
execution_costs: ExUnitPrices {
mem_price: primitives::RationalNumber {
Expand Down
4 changes: 3 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod network;
mod pipeline;
mod queue;
mod server;
mod signing;
mod storage;
mod validation;

Expand Down Expand Up @@ -56,7 +57,7 @@ async fn main() -> Result<()> {
Arc::clone(&cursor_storage),
);
let server = server::run(
config.server,
config,
u5c_data_adapter.clone(),
Arc::clone(&tx_storage),
Arc::clone(&tx_chaining),
Expand All @@ -76,6 +77,7 @@ struct Config {
#[serde(default)]
queues: HashSet<queue::Config>,
u5c: ledger::u5c::Config,
signing: signing::Config,
}

impl Config {
Expand Down
29 changes: 25 additions & 4 deletions src/pipeline/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use tokio::time::sleep;
use tracing::info;

use super::CAP;
use crate::signing::SigningAdapter;
use crate::validation::{evaluate_tx, validate_tx};
use crate::Config;
use crate::{
ledger::u5c::U5cDataAdapter,
queue::priority::Priority,
Expand All @@ -20,6 +22,8 @@ pub struct Stage {
storage: Arc<SqliteTransaction>,
priority: Arc<Priority>,
u5c_adapter: Arc<dyn U5cDataAdapter>,
secret_adapter: Arc<dyn SigningAdapter>,
config: Config,
pub output: OutputPort<Vec<u8>>,
}

Expand All @@ -28,11 +32,15 @@ impl Stage {
storage: Arc<SqliteTransaction>,
priority: Arc<Priority>,
u5c_adapter: Arc<dyn U5cDataAdapter>,
secret_adapter: Arc<dyn SigningAdapter>,
config: Config,
) -> Self {
Self {
storage,
priority,
u5c_adapter,
secret_adapter,
config,
output: Default::default(),
}
}
Expand Down Expand Up @@ -78,6 +86,20 @@ impl gasket::framework::Worker<Stage> for Worker {
) -> Result<(), WorkerError> {
for tx in unit {
let mut tx = tx.clone();

let should_sign = stage
.config
.queues
.get(&tx.queue)
.map(|config| config.server_signing)
.unwrap_or(false);

if should_sign {
info!("Signing transaction {} with server key", tx.id);
tx.raw = stage.secret_adapter.sign(tx.raw).await.or_retry()?;
info!("Transaction {} signed successfully", tx.id);
}

let metx = MultiEraTx::decode(&tx.raw).map_err(|_| WorkerError::Recv)?;

if let Err(e) = validate_tx(&metx, stage.u5c_adapter.clone()).await {
Expand All @@ -100,7 +122,6 @@ impl gasket::framework::Worker<Stage> for Worker {
info!("Failed to broadcast transaction: {}", e);
} else {
info!("Transaction {} broadcasted to receivers", tx.id);

let tip = stage.u5c_adapter.fetch_tip().await.or_retry()?;
tx.status = TransactionStatus::InFlight;
tx.slot = Some(tip.0);
Expand All @@ -114,12 +135,12 @@ impl gasket::framework::Worker<Stage> for Worker {

#[cfg(test)]
mod ingest_tests {
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::Arc;

use anyhow::Ok;
use pallas::codec::utils::KeyValuePairs;

use pallas::crypto::hash::Hash;
use pallas::ledger::primitives::conway::{
CostModels, DRepVotingThresholds, PoolVotingThresholds,
Expand Down Expand Up @@ -420,7 +441,7 @@ mod ingest_tests {
20744, 32, 25933, 32, 24623, 32, 43053543, 10, 53384111, 14333, 10, 43574283,
26308, 10,
]),
unknown: KeyValuePairs::from(vec![]),
unknown: BTreeMap::new(),
},
execution_costs: ExUnitPrices {
mem_price: RationalNumber {
Expand Down
6 changes: 5 additions & 1 deletion src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
},
network::peer_manager::PeerManager,
queue::priority::Priority,
signing::hashicorp::HashicorpVaultClient,
storage::{
sqlite::{SqliteCursor, SqliteTransaction},
Cursor,
Expand Down Expand Up @@ -46,12 +47,15 @@ pub async fn run(

let peer_manager = Arc::new(peer_manager);

let priority = Arc::new(Priority::new(tx_storage.clone(), config.queues));
let priority = Arc::new(Priority::new(tx_storage.clone(), config.queues.clone()));
let secret_adapter = Arc::new(HashicorpVaultClient::new(config.signing.clone())?);

let mut ingest = ingest::Stage::new(
tx_storage.clone(),
priority.clone(),
u5c_data_adapter.clone(),
secret_adapter,
config.clone(),
);

ingest.output.connect(sender);
Expand Down
9 changes: 9 additions & 0 deletions src/queue/chaining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ mod chaining_tests {
name: "banana".into(),
weight: 1,
chained: true,
server_signing: false,
}]);

let chaining = TxChaining::new(Arc::clone(&storage), queues);
Expand Down Expand Up @@ -206,6 +207,7 @@ mod chaining_tests {
name: "banana".into(),
weight: 1,
chained: true,
server_signing: false,
}]);

let chaining = TxChaining::new(Arc::clone(&storage), queues);
Expand Down Expand Up @@ -253,11 +255,13 @@ mod chaining_tests {
name: "banana".into(),
weight: 1,
chained: true,
server_signing: false,
},
Config {
name: "orange".into(),
weight: 1,
chained: true,
server_signing: false,
},
]);

Expand Down Expand Up @@ -305,6 +309,7 @@ mod chaining_tests {
name: "banana".into(),
weight: 1,
chained: true,
server_signing: false,
}]);

let chaining = TxChaining::new(Arc::clone(&storage), queues);
Expand Down Expand Up @@ -340,6 +345,7 @@ mod chaining_tests {
name: "banana".into(),
weight: 1,
chained: true,
server_signing: false,
}]);

let chaining = TxChaining::new(Arc::clone(&storage), queues);
Expand All @@ -356,6 +362,7 @@ mod chaining_tests {
name: "banana".into(),
weight: 1,
chained: false,
server_signing: false,
}]);

let chaining = TxChaining::new(Arc::clone(&storage), queues);
Expand All @@ -372,6 +379,7 @@ mod chaining_tests {
name: "banana".into(),
weight: 1,
chained: true,
server_signing: false,
}]);

let chaining = TxChaining::new(Arc::clone(&storage), queues);
Expand Down Expand Up @@ -406,6 +414,7 @@ mod chaining_tests {
name: "banana".into(),
weight: 1,
chained: true,
server_signing: false,
}]);

let chaining = TxChaining::new(Arc::clone(&storage), queues);
Expand Down
2 changes: 2 additions & 0 deletions src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ pub struct Config {
pub weight: u8,
#[serde(default)]
pub chained: bool,
pub server_signing: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
name: DEFAULT_QUEUE.into(),
weight: DEFAULT_WEIGHT,
chained: false,
server_signing: false,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/queue/priority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,19 @@ mod priority_tests {
name: "default".into(),
weight: 1,
chained: false,
server_signing: false,
},
Config {
name: "banana".into(),
weight: 2,
chained: false,
server_signing: false,
},
Config {
name: "orange".into(),
weight: 1,
chained: false,
server_signing: false,
},
]),
);
Expand Down Expand Up @@ -155,6 +158,7 @@ mod priority_tests {
name: "default".into(),
weight: 1,
chained: false,
server_signing: false,
}]),
);

Expand Down
11 changes: 6 additions & 5 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use tonic::transport::Server;
use tracing::{error, info};

use crate::{
ledger::u5c::U5cDataAdapter, queue::chaining::TxChaining,
storage::sqlite::SqliteTransaction,
ledger::u5c::U5cDataAdapter, queue::chaining::TxChaining, storage::sqlite::SqliteTransaction,
Config as BorosConfig,
};

mod submit;

pub async fn run(
config: Config,
config: BorosConfig,
u5c_adapter: Arc<dyn U5cDataAdapter>,
tx_storage: Arc<SqliteTransaction>,
tx_chaining: Arc<TxChaining>,
Expand All @@ -30,19 +30,20 @@ pub async fn run(
Arc::clone(&tx_storage),
Arc::clone(&tx_chaining),
Arc::clone(&u5c_adapter),
config.queues,
);
let submit_service =
spec::submit::submit_service_server::SubmitServiceServer::new(submit_service);

info!(
address = config.listen_address.to_string(),
address = config.server.listen_address.to_string(),
"GRPC server running"
);

let result = Server::builder()
.add_service(reflection)
.add_service(submit_service)
.serve(config.listen_address)
.serve(config.server.listen_address)
.await;

if let Err(error) = result {
Expand Down
Loading