diff --git a/Cargo.toml b/Cargo.toml index 176db9f..e6a41fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ futures-util = "0.3.31" gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] } hex = "0.4.3" itertools = "0.14.0" -pallas = { git = "https://github.com/txpipe/pallas.git", features = ["phase-two"] } +pallas = { git = "https://github.com/txpipe/pallas.git", features = ["phase2", "wallet"] } protoc-wkt = "1.0.0" serde = { version = "1.0.217", features = ["derive"] } thiserror = "2.0.11" @@ -39,3 +39,5 @@ async-stream = "0.3.6" tokio-stream = "0.1.17" rand = "0.9.0" prost = "0.13.5" +vaultrs = "0.7.4" +bip39 = "2.1.0" diff --git a/src/ledger/u5c/mod.rs b/src/ledger/u5c/mod.rs index 2abfb3a..1931b44 100644 --- a/src/ledger/u5c/mod.rs +++ b/src/ledger/u5c/mod.rs @@ -1,10 +1,14 @@ -use std::{collections::HashMap, pin::Pin, str::FromStr, vec}; +use std::{ + collections::{BTreeMap, HashMap}, + pin::Pin, + str::FromStr, + vec, +}; use anyhow::bail; use async_stream::stream; use futures::{Stream, TryStreamExt}; use pallas::{ - codec::utils::KeyValuePairs, crypto::hash::Hash, interop::utxorpc::spec::{ cardano::{CostModel, Tx}, @@ -322,7 +326,7 @@ fn map_conway_pparams(pparams: &Params) -> ConwayProtParams { .unwrap_or(CostModel { values: vec![] }) .values, ), - unknown: KeyValuePairs::from(vec![]), + unknown: BTreeMap::new(), }, execution_costs: ExUnitPrices { mem_price: primitives::RationalNumber { diff --git a/src/main.rs b/src/main.rs index a8896a0..d39a2f7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,7 @@ mod network; mod pipeline; mod queue; mod server; +mod signing; mod storage; mod validation; @@ -56,7 +57,7 @@ async fn main() -> Result<()> { Arc::clone(&cursor_storage), ); let server = server::run( - config.server, + config, u5c_data_adapter.clone(), Arc::clone(&tx_storage), Arc::clone(&tx_chaining), @@ -76,6 +77,7 @@ struct Config { #[serde(default)] queues: HashSet, u5c: ledger::u5c::Config, + signing: signing::Config, } impl Config { diff --git a/src/pipeline/ingest.rs b/src/pipeline/ingest.rs index 1d64e90..e663f6c 100644 --- a/src/pipeline/ingest.rs +++ b/src/pipeline/ingest.rs @@ -7,7 +7,9 @@ use tokio::time::sleep; use tracing::info; use super::CAP; +use crate::signing::SigningAdapter; use crate::validation::{evaluate_tx, validate_tx}; +use crate::Config; use crate::{ ledger::u5c::U5cDataAdapter, queue::priority::Priority, @@ -20,6 +22,8 @@ pub struct Stage { storage: Arc, priority: Arc, u5c_adapter: Arc, + secret_adapter: Arc, + config: Config, pub output: OutputPort>, } @@ -28,11 +32,15 @@ impl Stage { storage: Arc, priority: Arc, u5c_adapter: Arc, + secret_adapter: Arc, + config: Config, ) -> Self { Self { storage, priority, u5c_adapter, + secret_adapter, + config, output: Default::default(), } } @@ -78,6 +86,20 @@ impl gasket::framework::Worker for Worker { ) -> Result<(), WorkerError> { for tx in unit { let mut tx = tx.clone(); + + let should_sign = stage + .config + .queues + .get(&tx.queue) + .map(|config| config.server_signing) + .unwrap_or(false); + + if should_sign { + info!("Signing transaction {} with server key", tx.id); + tx.raw = stage.secret_adapter.sign(tx.raw).await.or_retry()?; + info!("Transaction {} signed successfully", tx.id); + } + let metx = MultiEraTx::decode(&tx.raw).map_err(|_| WorkerError::Recv)?; if let Err(e) = validate_tx(&metx, stage.u5c_adapter.clone()).await { @@ -100,7 +122,6 @@ impl gasket::framework::Worker for Worker { info!("Failed to broadcast transaction: {}", e); } else { info!("Transaction {} broadcasted to receivers", tx.id); - let tip = stage.u5c_adapter.fetch_tip().await.or_retry()?; tx.status = TransactionStatus::InFlight; tx.slot = Some(tip.0); @@ -114,12 +135,12 @@ impl gasket::framework::Worker for Worker { #[cfg(test)] mod ingest_tests { - use std::collections::HashMap; + use std::collections::{BTreeMap, HashMap}; use std::str::FromStr; use std::sync::Arc; use anyhow::Ok; - use pallas::codec::utils::KeyValuePairs; + use pallas::crypto::hash::Hash; use pallas::ledger::primitives::conway::{ CostModels, DRepVotingThresholds, PoolVotingThresholds, @@ -420,7 +441,7 @@ mod ingest_tests { 20744, 32, 25933, 32, 24623, 32, 43053543, 10, 53384111, 14333, 10, 43574283, 26308, 10, ]), - unknown: KeyValuePairs::from(vec![]), + unknown: BTreeMap::new(), }, execution_costs: ExUnitPrices { mem_price: RationalNumber { diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 93061d9..6401c87 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -10,6 +10,7 @@ use crate::{ }, network::peer_manager::PeerManager, queue::priority::Priority, + signing::hashicorp::HashicorpVaultClient, storage::{ sqlite::{SqliteCursor, SqliteTransaction}, Cursor, @@ -46,12 +47,15 @@ pub async fn run( let peer_manager = Arc::new(peer_manager); - let priority = Arc::new(Priority::new(tx_storage.clone(), config.queues)); + let priority = Arc::new(Priority::new(tx_storage.clone(), config.queues.clone())); + let secret_adapter = Arc::new(HashicorpVaultClient::new(config.signing.clone())?); let mut ingest = ingest::Stage::new( tx_storage.clone(), priority.clone(), u5c_data_adapter.clone(), + secret_adapter, + config.clone(), ); ingest.output.connect(sender); diff --git a/src/queue/chaining.rs b/src/queue/chaining.rs index 4c6ab00..5b3f096 100644 --- a/src/queue/chaining.rs +++ b/src/queue/chaining.rs @@ -176,6 +176,7 @@ mod chaining_tests { name: "banana".into(), weight: 1, chained: true, + server_signing: false, }]); let chaining = TxChaining::new(Arc::clone(&storage), queues); @@ -206,6 +207,7 @@ mod chaining_tests { name: "banana".into(), weight: 1, chained: true, + server_signing: false, }]); let chaining = TxChaining::new(Arc::clone(&storage), queues); @@ -253,11 +255,13 @@ mod chaining_tests { name: "banana".into(), weight: 1, chained: true, + server_signing: false, }, Config { name: "orange".into(), weight: 1, chained: true, + server_signing: false, }, ]); @@ -305,6 +309,7 @@ mod chaining_tests { name: "banana".into(), weight: 1, chained: true, + server_signing: false, }]); let chaining = TxChaining::new(Arc::clone(&storage), queues); @@ -340,6 +345,7 @@ mod chaining_tests { name: "banana".into(), weight: 1, chained: true, + server_signing: false, }]); let chaining = TxChaining::new(Arc::clone(&storage), queues); @@ -356,6 +362,7 @@ mod chaining_tests { name: "banana".into(), weight: 1, chained: false, + server_signing: false, }]); let chaining = TxChaining::new(Arc::clone(&storage), queues); @@ -372,6 +379,7 @@ mod chaining_tests { name: "banana".into(), weight: 1, chained: true, + server_signing: false, }]); let chaining = TxChaining::new(Arc::clone(&storage), queues); @@ -406,6 +414,7 @@ mod chaining_tests { name: "banana".into(), weight: 1, chained: true, + server_signing: false, }]); let chaining = TxChaining::new(Arc::clone(&storage), queues); diff --git a/src/queue/mod.rs b/src/queue/mod.rs index 639e893..0804ce9 100644 --- a/src/queue/mod.rs +++ b/src/queue/mod.rs @@ -15,6 +15,7 @@ pub struct Config { pub weight: u8, #[serde(default)] pub chained: bool, + pub server_signing: bool, } impl Default for Config { fn default() -> Self { @@ -22,6 +23,7 @@ impl Default for Config { name: DEFAULT_QUEUE.into(), weight: DEFAULT_WEIGHT, chained: false, + server_signing: false, } } } diff --git a/src/queue/priority.rs b/src/queue/priority.rs index b101b99..a2e0408 100644 --- a/src/queue/priority.rs +++ b/src/queue/priority.rs @@ -113,16 +113,19 @@ mod priority_tests { name: "default".into(), weight: 1, chained: false, + server_signing: false, }, Config { name: "banana".into(), weight: 2, chained: false, + server_signing: false, }, Config { name: "orange".into(), weight: 1, chained: false, + server_signing: false, }, ]), ); @@ -155,6 +158,7 @@ mod priority_tests { name: "default".into(), weight: 1, chained: false, + server_signing: false, }]), ); diff --git a/src/server/mod.rs b/src/server/mod.rs index 4fbc721..ec024ca 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,14 +7,14 @@ use tonic::transport::Server; use tracing::{error, info}; use crate::{ - ledger::u5c::U5cDataAdapter, queue::chaining::TxChaining, - storage::sqlite::SqliteTransaction, + ledger::u5c::U5cDataAdapter, queue::chaining::TxChaining, storage::sqlite::SqliteTransaction, + Config as BorosConfig, }; mod submit; pub async fn run( - config: Config, + config: BorosConfig, u5c_adapter: Arc, tx_storage: Arc, tx_chaining: Arc, @@ -30,19 +30,20 @@ pub async fn run( Arc::clone(&tx_storage), Arc::clone(&tx_chaining), Arc::clone(&u5c_adapter), + config.queues, ); let submit_service = spec::submit::submit_service_server::SubmitServiceServer::new(submit_service); info!( - address = config.listen_address.to_string(), + address = config.server.listen_address.to_string(), "GRPC server running" ); let result = Server::builder() .add_service(reflection) .add_service(submit_service) - .serve(config.listen_address) + .serve(config.server.listen_address) .await; if let Err(error) = result { diff --git a/src/server/submit.rs b/src/server/submit.rs index 12ee709..eca468e 100644 --- a/src/server/submit.rs +++ b/src/server/submit.rs @@ -1,4 +1,4 @@ -use std::{pin::Pin, sync::Arc}; +use std::{collections::HashSet, pin::Pin, sync::Arc}; use pallas::ledger::traverse::MultiEraTx; use spec::boros::v1::submit::{ @@ -8,11 +8,11 @@ use spec::boros::v1::submit::{ use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::{Response, Status}; -use tracing::{error, info}; +use tracing::{error, info, warn}; use crate::{ ledger::u5c::U5cDataAdapter, - queue::chaining::TxChaining, + queue::{chaining::TxChaining, Config, DEFAULT_QUEUE}, storage::{sqlite::SqliteTransaction, Transaction}, validation::{evaluate_tx, validate_tx}, }; @@ -21,6 +21,7 @@ pub struct SubmitServiceImpl { tx_storage: Arc, tx_chaining: Arc, u5c_adapter: Arc, + queues: HashSet, } impl SubmitServiceImpl { @@ -28,11 +29,13 @@ impl SubmitServiceImpl { tx_storage: Arc, tx_chaining: Arc, u5c_adapter: Arc, + queues: HashSet, ) -> Self { Self { tx_storage, tx_chaining, u5c_adapter, + queues, } } } @@ -55,30 +58,39 @@ impl SubmitService for SubmitServiceImpl { Status::failed_precondition(format!("invalid tx at index {idx}")) })?; - if let Err(error) = validate_tx(&metx, self.u5c_adapter.clone()).await { - error!(?error); - continue; - } + let hash = metx.hash(); - if let Err(error) = evaluate_tx(&metx, self.u5c_adapter.clone()).await { - error!(?error); - continue; - } + let should_validate = tx + .queue + .as_ref() + .and_then(|queue_name| { + self.queues.get(queue_name).or_else(|| { + warn!(queue = ?queue_name, "Queue not found, using default queue"); + self.queues.iter().find(|q| q.name == *DEFAULT_QUEUE) + }) + }) + .is_none_or(|config| !config.server_signing); + + if should_validate { + if let Err(error) = validate_tx(&metx, self.u5c_adapter.clone()).await { + error!(?error); + continue; + } - let hash = metx.hash(); + if let Err(error) = evaluate_tx(&metx, self.u5c_adapter.clone()).await { + error!(?error); + continue; + } + } - hashes.push(hash.to_vec().into()); let mut tx_storage = Transaction::new(hash.to_string(), tx.raw.to_vec()); if let Some(queue) = tx.queue { if self.tx_chaining.is_chained_queue(&queue) { chained_queues.push(queue.clone()); - if !self - .tx_chaining - .is_valid_token(&queue, &tx.lock_token.unwrap_or_default()) - .await - { + let lock_token = tx.lock_token.unwrap_or_default(); + if !self.tx_chaining.is_valid_token(&queue, &lock_token).await { return Err(Status::permission_denied("invalid lock token")); } } @@ -86,6 +98,7 @@ impl SubmitService for SubmitServiceImpl { tx_storage.queue = queue; } + hashes.push(hash.to_vec().into()); txs.push(tx_storage) } diff --git a/src/signing/hashicorp.rs b/src/signing/hashicorp.rs new file mode 100644 index 0000000..91ea4bc --- /dev/null +++ b/src/signing/hashicorp.rs @@ -0,0 +1,71 @@ +use bip39::Mnemonic; +use pallas::{crypto::key::ed25519::Signature, ledger::traverse::MultiEraTx}; +use vaultrs::{ + client::{VaultClient, VaultClientSettingsBuilder}, + kv2, +}; + +use super::{ + key::{ + derive::get_ed25519_keypair, + sign::{sign_transaction, to_built_transaction}, + }, + Config as VaultConfig, Secret, SigningAdapter, SigningError, +}; + +pub struct HashicorpVaultClient { + client: VaultClient, + config: VaultConfig, +} + +impl HashicorpVaultClient { + pub fn new(config: VaultConfig) -> Result { + let client_settings = VaultClientSettingsBuilder::default() + .address(&config.api_addr) + .token(&config.token) + .build() + .map_err(|e| SigningError::Config(e.to_string()))?; + + let client = VaultClient::new(client_settings).map_err(SigningError::Client)?; + + Ok(Self { client, config }) + } + + async fn get_mnemonic(&self, key: &str) -> Result { + let secret: Secret = kv2::read(&self.client, "secret", &self.config.path) + .await + .map_err(SigningError::Client)?; + + let value = secret.values.get(key).ok_or_else(|| { + SigningError::SecretNotFound(format!("Key {} not found in secret", key)) + })?; + + let mnemonic = serde_json::from_value(value.clone())?; + Ok(mnemonic) + } +} + +#[async_trait::async_trait] +impl SigningAdapter for HashicorpVaultClient { + async fn sign(&self, data: Vec) -> Result, SigningError> { + let mnemonic = self.get_mnemonic(&self.config.key).await?; + + let metx = MultiEraTx::decode(&data) + .map_err(|e| SigningError::Decoding(format!("Failed to decode transaction: {}", e)))?; + + let built_tx = to_built_transaction(&metx); + let signed_tx = sign_transaction(built_tx, &mnemonic); + + Ok(signed_tx.tx_bytes.0) + } + + async fn verify(&self, data: Vec, signature: Vec) -> Result { + let mnemonic = self.get_mnemonic(&self.config.key).await?; + let (_, public_key) = get_ed25519_keypair(&mnemonic); + + let signature = Signature::try_from(signature.as_slice()) + .map_err(|e| SigningError::Decoding(format!("Failed to decode signature: {}", e)))?; + + Ok(public_key.verify(data, &signature)) + } +} diff --git a/src/signing/key/derive.rs b/src/signing/key/derive.rs new file mode 100644 index 0000000..66dea8f --- /dev/null +++ b/src/signing/key/derive.rs @@ -0,0 +1,79 @@ +use bip39::Mnemonic; +use pallas::{ + crypto::{hash::Hasher, key::ed25519::PublicKey}, + ledger::addresses::{ + Address, Network, ShelleyAddress, ShelleyDelegationPart, ShelleyPaymentPart, + }, + wallet::keystore::{ + hd::{Bip32PrivateKey, Bip32PublicKey}, + PrivateKey, + }, +}; + +pub fn get_ed25519_keypair(mnemonic: &Mnemonic) -> (PrivateKey, PublicKey) { + let account_key = generate_account_key(mnemonic); + let (private_key, _) = generate_payment_keypair(&account_key); + + to_ed25519_keypair(&private_key) +} + +pub fn generate_account_key(mnemonic: &Mnemonic) -> Bip32PrivateKey { + let root_key = Bip32PrivateKey::from_bip39_mnenomic(mnemonic.to_string(), "".into()).unwrap(); + root_key + .derive(1852 | 0x80000000) + .derive(1815 | 0x80000000) + .derive(0x80000000) +} + +pub fn generate_payment_keypair( + account_key: &Bip32PrivateKey, +) -> (Bip32PrivateKey, Bip32PublicKey) { + let external_key = account_key.derive(0); + let private_key = external_key.derive(0); + let public_key = private_key.to_public(); + (private_key, public_key) +} + +pub fn generate_delegation_keypair( + account_key: &Bip32PrivateKey, +) -> (Bip32PrivateKey, Bip32PublicKey) { + let delegation_key = account_key.derive(2); + let private_key = delegation_key.derive(0); + let public_key = private_key.to_public(); + (private_key, public_key) +} + +pub fn generate_address(public_key: &Bip32PublicKey) -> Address { + let payment_hash = Hasher::<224>::hash(&public_key.as_bytes()[..32]); + let address = ShelleyAddress::new( + Network::Testnet, + ShelleyPaymentPart::key_hash(payment_hash), + ShelleyDelegationPart::Null, + ); + + Address::Shelley(address) +} + +pub fn generate_address_with_delegation( + account_key: &Bip32PrivateKey, + public_key: &Bip32PublicKey, +) -> Address { + let payment_hash = Hasher::<224>::hash(public_key.as_bytes().as_slice()); + + let (_, public_key) = generate_delegation_keypair(account_key); + let delegation_hash = Hasher::<224>::hash(public_key.as_bytes().as_slice()); + + let address = ShelleyAddress::new( + Network::Testnet, + ShelleyPaymentPart::key_hash(payment_hash), + ShelleyDelegationPart::key_hash(delegation_hash), + ); + + Address::Shelley(address) +} + +pub fn to_ed25519_keypair(private_key: &Bip32PrivateKey) -> (PrivateKey, PublicKey) { + let private_key = private_key.to_ed25519_private_key(); + let public_key = private_key.public_key(); + (private_key, public_key) +} diff --git a/src/signing/key/mod.rs b/src/signing/key/mod.rs new file mode 100644 index 0000000..f4fc9e7 --- /dev/null +++ b/src/signing/key/mod.rs @@ -0,0 +1,3 @@ +#[allow(unused)] +pub mod derive; +pub mod sign; diff --git a/src/signing/key/sign.rs b/src/signing/key/sign.rs new file mode 100644 index 0000000..c12e48d --- /dev/null +++ b/src/signing/key/sign.rs @@ -0,0 +1,30 @@ +use bip39::Mnemonic; +use pallas::{ + ledger::traverse::MultiEraTx, + txbuilder::{BuildConway, BuiltTransaction, Bytes, Bytes32, StagingTransaction}, +}; + +use super::derive::get_ed25519_keypair; + +pub fn to_built_transaction(tx: &MultiEraTx) -> BuiltTransaction { + let staging_tx = StagingTransaction::new(); + let built_tx = staging_tx.build_conway_raw().unwrap(); + + BuiltTransaction { + version: built_tx.version, + era: built_tx.era, + status: built_tx.status, + tx_hash: Bytes32(*tx.hash()), + tx_bytes: Bytes(tx.encode()), + signatures: None, + } +} + +pub fn sign_transaction(built_tx: BuiltTransaction, mnemonic: &Mnemonic) -> BuiltTransaction { + let (signing_key, _) = get_ed25519_keypair(mnemonic); + let signed_tx = built_tx.sign(signing_key); + match signed_tx { + Ok(tx) => tx, + _ => panic!("Failed to sign transaction"), + } +} diff --git a/src/signing/mod.rs b/src/signing/mod.rs new file mode 100644 index 0000000..3585e13 --- /dev/null +++ b/src/signing/mod.rs @@ -0,0 +1,47 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use thiserror::Error; + +pub mod hashicorp; +pub mod key; + +#[derive(Error, Debug)] +pub enum SigningError { + #[error("Vault client error: {0}")] + Client(#[from] vaultrs::error::ClientError), + + #[error("Configuration error: {0}")] + Config(String), + + #[error("Secret not found error: {0}")] + SecretNotFound(String), + + #[error("Deserialization error: {0}")] + Deserialization(#[from] serde_json::Error), + + #[error("Decoding error: {0}")] + Decoding(String), +} + +#[derive(Deserialize, Clone)] +pub struct Config { + pub api_addr: String, + pub token: String, + pub path: String, + pub key: String, +} + +#[derive(Serialize, Deserialize, Debug)] +struct Secret { + #[serde(flatten)] + values: HashMap, +} + +#[async_trait::async_trait] +pub trait SigningAdapter: Send + Sync { + async fn sign(&self, data: Vec) -> Result, SigningError>; + #[allow(unused)] // to make clippy happy only + async fn verify(&self, data: Vec, signature: Vec) -> Result; +} diff --git a/src/validation/mod.rs b/src/validation/mod.rs index 97fae19..4510a63 100644 --- a/src/validation/mod.rs +++ b/src/validation/mod.rs @@ -6,8 +6,8 @@ use pallas::{ primitives::TransactionInput, traverse::{wellknown::GenesisValues, MultiEraInput, MultiEraOutput, MultiEraTx}, validate::{ - phase_one, phase_two, - uplc::{script_context::SlotConfig, EvalReport}, + phase1, + phase2::{self, script_context::SlotConfig, EvalReport}, utils::{AccountState, CertState, Environment, UTxOs}, }, }, @@ -61,7 +61,7 @@ pub async fn validate_tx( pallas_utxos.insert(input, output); } - phase_one::validate_tx(tx, 0, &env, &pallas_utxos, &mut CertState::default())?; + phase1::validate_tx(tx, 0, &env, &pallas_utxos, &mut CertState::default())?; Ok(()) } @@ -89,7 +89,7 @@ pub async fn evaluate_tx( .map(|((tx_hash, index), eracbor)| (From::from((*tx_hash, *index)), eracbor.clone())) .collect(); - let report = phase_two::evaluate_tx(tx, &pparams, &utxos, &slot_config) + let report = phase2::evaluate_tx(tx, &pparams, &utxos, &slot_config) .map_err(|e| anyhow::anyhow!("Error evaluating transaction: {:?}", e))?; Ok(report)