From c8b99b70b4feb12b5d72249880743cf5d72d6c4c Mon Sep 17 00:00:00 2001 From: paulobressan Date: Thu, 22 May 2025 18:55:27 -0300 Subject: [PATCH 1/3] feat: refactor server to support trp and implemented trp health method --- Cargo.toml | 10 ++-- src/main.rs | 26 +++++++-- src/server/grpc/mod.rs | 60 +++++++++++++++++++++ src/server/{ => grpc}/submit.rs | 0 src/server/mod.rs | 93 ++++++++++++++++++-------------- src/server/trp/methods.rs | 6 +++ src/server/trp/mod.rs | 69 ++++++++++++++++++++++++ src/server/utxorpc.rs | 95 --------------------------------- 8 files changed, 217 insertions(+), 142 deletions(-) create mode 100644 src/server/grpc/mod.rs rename src/server/{ => grpc}/submit.rs (100%) create mode 100644 src/server/trp/methods.rs create mode 100644 src/server/trp/mod.rs delete mode 100644 src/server/utxorpc.rs diff --git a/Cargo.toml b/Cargo.toml index 66df410..3a46162 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,9 +28,7 @@ protoc-wkt = "1.0.0" serde = { version = "1.0.217", features = ["derive"] } thiserror = "2.0.11" sqlx = { version = "0.8.3", features = ["runtime-tokio-rustls", "sqlite", "chrono"] } -tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread", "time"] } -tonic = { version = "0.12.3", features = ["transport", "tls", "tls-webpki-roots", "tls-roots"] } -tonic-reflection = "0.12.3" +tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread", "time", "signal"] } tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } futures = "0.3.31" @@ -41,3 +39,9 @@ rand = "0.9.0" prost = "0.13.5" vaultrs = "0.7.4" bip39 = "2.1.0" +tonic = { version = "0.12.3", features = ["transport", "tls", "tls-webpki-roots", "tls-roots"] } +tower-http = { version = "0.6.4", features = ["cors", "trace"]} +tonic-reflection = { version = "0.12.3" } +jsonrpsee = { version = "0.25.1", features = ["server"] } +tower = "0.5.2" +tokio-util = { version = "0.7.15", features = ["rt"] } diff --git a/src/main.rs b/src/main.rs index eec63ba..a874739 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,8 @@ use queue::{chaining::TxChaining, DEFAULT_QUEUE}; use serde::Deserialize; use storage::sqlite::{SqliteCursor, SqliteStorage, SqliteTransaction}; use tokio::try_join; -use tracing::Level; +use tokio_util::sync::CancellationToken; +use tracing::{debug, Level}; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; mod ledger; @@ -34,6 +35,8 @@ async fn main() -> Result<()> { .with(env_filter) .init(); + let cancellation_token = cancellation_token(); + let config = Config::new().expect("invalid config file"); let storage = Arc::new(SqliteStorage::new(path::Path::new(&config.storage.db_path)).await?); @@ -56,11 +59,13 @@ async fn main() -> Result<()> { Arc::clone(&tx_storage), Arc::clone(&cursor_storage), ); - let server = server::run( - config, + let server = server::serve( + config.server.clone(), + config.queues.clone(), u5c_data_adapter.clone(), Arc::clone(&tx_storage), Arc::clone(&tx_chaining), + cancellation_token.clone(), ); try_join!(pipeline, server)?; @@ -68,6 +73,21 @@ async fn main() -> Result<()> { Ok(()) } +fn cancellation_token() -> CancellationToken { + let cancel = CancellationToken::new(); + + let cancel2 = cancel.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for Ctrl+C"); + debug!("shutdown signal received"); + cancel2.cancel(); + }); + + cancel +} + #[derive(Deserialize, Clone)] struct Config { server: server::Config, diff --git a/src/server/grpc/mod.rs b/src/server/grpc/mod.rs new file mode 100644 index 0000000..3ee28f9 --- /dev/null +++ b/src/server/grpc/mod.rs @@ -0,0 +1,60 @@ +use std::{collections::HashSet, net::SocketAddr, sync::Arc}; + +use anyhow::Result; +use serde::Deserialize; +use spec::boros::v1 as spec; +use tokio_util::sync::CancellationToken; +use tonic::transport::Server; +use tracing::info; + +use crate::{ + ledger::u5c::U5cDataAdapter, + queue::{self, chaining::TxChaining}, + storage::sqlite::SqliteTransaction, +}; + +mod submit; + +pub async fn run( + config: Config, + queues: HashSet, + u5c_adapter: Arc, + tx_storage: Arc, + tx_chaining: Arc, + cancellation_token: CancellationToken, +) -> Result<()> { + let reflection = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(protoc_wkt::google::protobuf::FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(spec::submit::FILE_DESCRIPTOR_SET) + .build_v1alpha() + .unwrap(); + + let submit_service = submit::SubmitServiceImpl::new( + Arc::clone(&tx_storage), + Arc::clone(&tx_chaining), + Arc::clone(&u5c_adapter), + queues, + ); + let submit_service = + spec::submit::submit_service_server::SubmitServiceServer::new(submit_service); + + info!( + address = config.listen_address.to_string(), + "GRPC server running" + ); + + Server::builder() + .add_service(reflection) + .add_service(submit_service) + .serve_with_shutdown(config.listen_address, cancellation_token.cancelled()) + .await?; + + info!("gracefully shut down grpc"); + + Ok(()) +} + +#[derive(Deserialize, Clone)] +pub struct Config { + pub listen_address: SocketAddr, +} diff --git a/src/server/submit.rs b/src/server/grpc/submit.rs similarity index 100% rename from src/server/submit.rs rename to src/server/grpc/submit.rs diff --git a/src/server/mod.rs b/src/server/mod.rs index ec024ca..ae78bc4 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,61 +1,72 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{collections::HashSet, sync::Arc}; use anyhow::Result; +use futures::try_join; use serde::Deserialize; -use spec::boros::v1 as spec; -use tonic::transport::Server; -use tracing::{error, info}; +use tokio_util::sync::CancellationToken; use crate::{ - ledger::u5c::U5cDataAdapter, queue::chaining::TxChaining, storage::sqlite::SqliteTransaction, - Config as BorosConfig, + ledger::u5c::U5cDataAdapter, + queue::{self, chaining::TxChaining}, + storage::sqlite::SqliteTransaction, }; -mod submit; +mod grpc; +mod trp; -pub async fn run( - config: BorosConfig, +pub async fn serve( + config: Config, + queues: HashSet, u5c_adapter: Arc, tx_storage: Arc, tx_chaining: Arc, + cancellation_token: CancellationToken, ) -> Result<()> { - tokio::spawn(async move { - let reflection = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(protoc_wkt::google::protobuf::FILE_DESCRIPTOR_SET) - .register_encoded_file_descriptor_set(spec::submit::FILE_DESCRIPTOR_SET) - .build_v1alpha() - .unwrap(); - - let submit_service = submit::SubmitServiceImpl::new( - Arc::clone(&tx_storage), - Arc::clone(&tx_chaining), - Arc::clone(&u5c_adapter), - config.queues, - ); - let submit_service = - spec::submit::submit_service_server::SubmitServiceServer::new(submit_service); - - info!( - address = config.server.listen_address.to_string(), - "GRPC server running" - ); - - let result = Server::builder() - .add_service(reflection) - .add_service(submit_service) - .serve(config.server.listen_address) - .await; - - if let Err(error) = result { - error!(?error); - std::process::exit(1); - } + let grpc_task = config.grpc.map(|cfg| { + let queues = queues.clone(); + let u5c_adapter = Arc::clone(&u5c_adapter); + let tx_storage = Arc::clone(&tx_storage); + let tx_chaining = Arc::clone(&tx_chaining); + let cancellation_token = cancellation_token.clone(); + + grpc::run( + cfg, + queues, + u5c_adapter, + tx_storage, + tx_chaining, + cancellation_token, + ) + }); + + let trp_task = config.trp.map(|cfg| { + let tx_storage = Arc::clone(&tx_storage); + let cancellation_token = cancellation_token.clone(); + + trp::run(cfg, tx_storage, cancellation_token) }); + try_join!( + async { + if let Some(task) = grpc_task { + return task.await; + } + + Ok(()) + }, + async { + if let Some(task) = trp_task { + return task.await; + } + Ok(()) + }, + )?; + Ok(()) } #[derive(Deserialize, Clone)] pub struct Config { - pub listen_address: SocketAddr, + pub grpc: Option, + pub trp: Option, } diff --git a/src/server/trp/methods.rs b/src/server/trp/methods.rs new file mode 100644 index 0000000..efc71c8 --- /dev/null +++ b/src/server/trp/methods.rs @@ -0,0 +1,6 @@ +use super::Context; + +pub fn health(_context: &Context) -> bool { + // TODO: add health check + true +} diff --git a/src/server/trp/mod.rs b/src/server/trp/mod.rs new file mode 100644 index 0000000..864456b --- /dev/null +++ b/src/server/trp/mod.rs @@ -0,0 +1,69 @@ +use std::{net::SocketAddr, sync::Arc}; + +use anyhow::{Error, Result}; +use jsonrpsee::server::{RpcModule, Server}; +use serde::Deserialize; +use tokio::try_join; +use tokio_util::sync::CancellationToken; +use tower::ServiceBuilder; +use tower_http::cors::CorsLayer; +use tracing::info; + +use crate::storage::sqlite::SqliteTransaction; + +mod methods; + +#[derive(Clone)] +pub struct Context {} + +pub async fn run( + config: Config, + _tx_storage: Arc, + cancellation_token: CancellationToken, +) -> Result<()> { + let cors_layer = if config.permissive_cors.unwrap_or_default() { + CorsLayer::permissive() + } else { + CorsLayer::new() + }; + + let middleware = ServiceBuilder::new().layer(cors_layer); + let server = Server::builder() + .set_http_middleware(middleware) + .build(config.listen_address) + .await?; + + let mut module = RpcModule::new(Context {}); + + module.register_method("health", |_, context, _| methods::health(context))?; + + info!( + address = config.listen_address.to_string(), + "TRP server running" + ); + + let handle = server.start(module); + + let server = async { + handle.clone().stopped().await; + Ok::<(), Error>(()) + }; + + let cancellation = async { + cancellation_token.cancelled().await; + info!("gracefully shuting down trp"); + let _ = handle.stop(); // Empty result with AlreadyStoppedError, can be ignored. + Ok::<(), Error>(()) + }; + + try_join!(server, cancellation)?; + + Ok(()) +} + +#[derive(Deserialize, Clone)] +pub struct Config { + pub listen_address: SocketAddr, + pub max_optimize_rounds: u8, + pub permissive_cors: Option, +} diff --git a/src/server/utxorpc.rs b/src/server/utxorpc.rs deleted file mode 100644 index f88f9eb..0000000 --- a/src/server/utxorpc.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::{pin::Pin, sync::Arc}; - -use futures_core::Stream; -use pallas::{ - interop::utxorpc::spec::submit::{WaitForTxResponse, *}, - ledger::traverse::MultiEraTx, -}; -use tonic::{Request, Response, Status}; -use tracing::{error, info}; - -use crate::storage::{sqlite::SqliteTransaction, Transaction}; - -pub struct SubmitServiceImpl { - tx_storage: Arc, -} - -impl SubmitServiceImpl { - pub fn new(tx_storage: Arc) -> Self { - Self { tx_storage } - } -} - -#[async_trait::async_trait] -impl submit_service_server::SubmitService for SubmitServiceImpl { - type WaitForTxStream = - Pin> + Send + 'static>>; - - type WatchMempoolStream = - Pin> + Send + 'static>>; - - async fn submit_tx( - &self, - request: Request, - ) -> Result, Status> { - let message = request.into_inner(); - - // TODO: validate a better structure to have this code. - - let mut txs: Vec = Vec::default(); - let mut hashes = vec![]; - - for (idx, tx_bytes) in message.tx.into_iter().flat_map(|x| x.r#type).enumerate() { - match tx_bytes { - any_chain_tx::Type::Raw(bytes) => { - let tx = MultiEraTx::decode(&bytes).map_err(|error| { - error!(?error); - Status::failed_precondition(format!("invalid tx at index {idx}")) - })?; - let hash = tx.hash(); - - hashes.push(hash.to_vec().into()); - txs.push(Transaction::new(hash.to_string(), bytes.to_vec())) - } - } - } - - let hashes_str: Vec = hashes.iter().map(hex::encode).collect(); - info!(?hashes_str, "submitting txs"); - - self.tx_storage.create(&txs).await.map_err(|error| { - error!(?error); - Status::internal("internal error") - })?; - - Ok(Response::new(SubmitTxResponse { r#ref: hashes })) - } - - async fn wait_for_tx( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } - - async fn read_mempool( - &self, - _request: tonic::Request, - ) -> Result, tonic::Status> { - Err(Status::unimplemented("read_mempool is not yet available")) - } - - async fn watch_mempool( - &self, - _request: tonic::Request, - ) -> Result, tonic::Status> { - todo!() - } - - async fn eval_tx( - &self, - _request: tonic::Request, - ) -> Result, Status> { - todo!() - } -} From 404987c1c3c12e14f762139dee3e6581a94fabcb Mon Sep 17 00:00:00 2001 From: paulobressan Date: Fri, 23 May 2025 19:23:08 -0300 Subject: [PATCH 2/3] feat: implemented trp resolve --- Cargo.toml | 4 +- src/server/trp/methods.rs | 86 +++++++++++++++++++++++++++++++++++++++ src/server/trp/mod.rs | 6 +++ 3 files changed, 94 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3a46162..3caed6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ futures-util = "0.3.31" gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] } hex = "0.4.3" itertools = "0.14.0" -pallas = { version = "1.0.0-alpha.1", features = ["phase2", "wallet"] } +pallas = { version = "=1.0.0-alpha.1", features = ["phase2", "wallet"] } protoc-wkt = "1.0.0" serde = { version = "1.0.217", features = ["derive"] } thiserror = "2.0.11" @@ -42,6 +42,6 @@ bip39 = "2.1.0" tonic = { version = "0.12.3", features = ["transport", "tls", "tls-webpki-roots", "tls-roots"] } tower-http = { version = "0.6.4", features = ["cors", "trace"]} tonic-reflection = { version = "0.12.3" } -jsonrpsee = { version = "0.25.1", features = ["server"] } +jsonrpsee = { version = "0.25.1", features = ["server", "client"] } tower = "0.5.2" tokio-util = { version = "0.7.15", features = ["rt"] } diff --git a/src/server/trp/methods.rs b/src/server/trp/methods.rs index efc71c8..37d842e 100644 --- a/src/server/trp/methods.rs +++ b/src/server/trp/methods.rs @@ -1,5 +1,91 @@ +use std::sync::Arc; + +use jsonrpsee::{ + core::client::ClientT, + http_client::HttpClient, + types::{ErrorCode, ErrorObject, ErrorObjectOwned, Params}, +}; +use serde::{Deserialize, Serialize}; +use tracing::error; + use super::Context; +#[derive(Deserialize)] +pub enum Encoding { + Hex, + Base64, +} + +#[derive(Deserialize)] +pub struct TrpSubmitRequest { + pub encoding: Encoding, + pub payload: String, + pub version: String, +} + +#[derive(Serialize)] +pub struct TrpSubmitResponse { + pub hash: String, +} + +pub async fn trp_submit( + params: Params<'_>, + _context: Arc, +) -> Result { + let _request = params.parse::().map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::InvalidParams.code(), + "invalid params", + Some(error.to_string()), + ) + })?; + + todo!() +} + +pub async fn trp_resolve( + params: Params<'_>, + _context: Arc, +) -> Result { + tracing::info!(method = "trp.resolve", "Received TRP request."); + + let client = HttpClient::builder() + .build("http://localhost:8000") + .map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::InternalError.code(), + "Internal error", + Some("Internal error"), + ) + })?; + + let params = params.parse::().map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::InvalidParams.code(), + "invalid params", + Some(error.to_string()), + ) + }); + let params = jsonrpsee::core::rpc_params!(params); + + let response = client + .request("trp.resolve", params) + .await + .map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::InternalError.code(), + "failed to resolve", + Some(error.to_string()), + ) + })?; + + Ok(response) +} + pub fn health(_context: &Context) -> bool { // TODO: add health check true diff --git a/src/server/trp/mod.rs b/src/server/trp/mod.rs index 864456b..af2f8cf 100644 --- a/src/server/trp/mod.rs +++ b/src/server/trp/mod.rs @@ -35,6 +35,12 @@ pub async fn run( let mut module = RpcModule::new(Context {}); + module.register_async_method("trp.resolve", |params, context, _| async { + methods::trp_resolve(params, context).await + })?; + module.register_async_method("trp.submit", |params, context, _| async { + methods::trp_submit(params, context).await + })?; module.register_method("health", |_, context, _| methods::health(context))?; info!( From 8528abe901ca16c5359614829b2f09c13e5c3102 Mon Sep 17 00:00:00 2001 From: paulobressan Date: Mon, 26 May 2025 18:26:58 -0300 Subject: [PATCH 3/3] feat: implemented trp server proxy --- Cargo.toml | 1 + src/server/mod.rs | 12 +-- src/server/trp/methods.rs | 202 ++++++++++++++++++++++++++++++++------ src/server/trp/mod.rs | 34 +++++-- 4 files changed, 207 insertions(+), 42 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3caed6b..2534b4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,3 +45,4 @@ tonic-reflection = { version = "0.12.3" } jsonrpsee = { version = "0.25.1", features = ["server", "client"] } tower = "0.5.2" tokio-util = { version = "0.7.15", features = ["rt"] } +base64 = "0.22.1" diff --git a/src/server/mod.rs b/src/server/mod.rs index ae78bc4..fadbe1e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -29,34 +29,34 @@ pub async fn serve( let tx_chaining = Arc::clone(&tx_chaining); let cancellation_token = cancellation_token.clone(); - grpc::run( + tokio::spawn(grpc::run( cfg, queues, u5c_adapter, tx_storage, tx_chaining, cancellation_token, - ) + )) }); let trp_task = config.trp.map(|cfg| { let tx_storage = Arc::clone(&tx_storage); + let u5c_adapter = Arc::clone(&u5c_adapter); let cancellation_token = cancellation_token.clone(); - - trp::run(cfg, tx_storage, cancellation_token) + tokio::spawn(trp::run(cfg, tx_storage, u5c_adapter, cancellation_token)) }); try_join!( async { if let Some(task) = grpc_task { - return task.await; + return task.await?; } Ok(()) }, async { if let Some(task) = trp_task { - return task.await; + return task.await?; } Ok(()) }, diff --git a/src/server/trp/methods.rs b/src/server/trp/methods.rs index 37d842e..60d217b 100644 --- a/src/server/trp/methods.rs +++ b/src/server/trp/methods.rs @@ -1,26 +1,39 @@ use std::sync::Arc; +use base64::{prelude::BASE64_STANDARD, Engine}; use jsonrpsee::{ - core::client::ClientT, + core::{client::ClientT, params::ObjectParams}, http_client::HttpClient, types::{ErrorCode, ErrorObject, ErrorObjectOwned, Params}, }; +use pallas::ledger::traverse::MultiEraTx; use serde::{Deserialize, Serialize}; -use tracing::error; +use tracing::{error, info}; + +use crate::{ + storage::Transaction, + validation::{evaluate_tx, validate_tx}, +}; use super::Context; #[derive(Deserialize)] pub enum Encoding { + #[serde(rename = "hex")] Hex, + #[serde(rename = "base64")] Base64, } #[derive(Deserialize)] -pub struct TrpSubmitRequest { +pub struct TrpSubmitTxRequest { pub encoding: Encoding, pub payload: String, - pub version: String, +} + +#[derive(Deserialize)] +pub struct TrpSubmitRequest { + pub tx: TrpSubmitTxRequest, } #[derive(Serialize)] @@ -28,11 +41,31 @@ pub struct TrpSubmitResponse { pub hash: String, } -pub async fn trp_submit( +pub async fn trp_resolve( params: Params<'_>, - _context: Arc, + context: Arc, ) -> Result { - let _request = params.parse::().map_err(|error| { + tracing::info!(method = "trp.resolve", "Received TRP request."); + + let mut client_builder = HttpClient::builder(); + + if let Some(headers) = &context.config.server.headers { + let headermap = headers.try_into().unwrap(); + client_builder = client_builder.set_headers(headermap); + } + + let client = client_builder + .build(&context.config.server.uri) + .map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::InternalError.code(), + "Internal error", + Some("Internal error"), + ) + })?; + + let params = params.parse::().map_err(|error| { error!(?error); ErrorObject::owned( ErrorCode::InvalidParams.code(), @@ -41,52 +74,165 @@ pub async fn trp_submit( ) })?; - todo!() + let object_params = match params { + serde_json::Value::Object(map) => { + let mut obj = ObjectParams::new(); + map.into_iter() + .for_each(|(key, value)| obj.insert(&key, value).unwrap()); + Ok(obj) + } + _ => { + error!("invalid params"); + Err(ErrorObject::owned( + ErrorCode::InvalidParams.code(), + "invalid params", + Some("params must be a json"), + )) + } + }?; + + let response = client + .request("trp.resolve", object_params) + .await + .map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::InternalError.code(), + "failed to resolve", + Some(error.to_string()), + ) + })?; + + Ok(response) } -pub async fn trp_resolve( +pub async fn trp_submit( params: Params<'_>, - _context: Arc, + context: Arc, ) -> Result { - tracing::info!(method = "trp.resolve", "Received TRP request."); + tracing::info!(method = "trp.submit", "Received TRP request."); + + let request = params.parse::().map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::InvalidParams.code(), + "invalid params", + Some(error.to_string()), + ) + })?; - let client = HttpClient::builder() - .build("http://localhost:8000") + let raw = match request.tx.encoding { + Encoding::Hex => hex::decode(request.tx.payload).map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::ParseError.code(), + "invalid tx hex encoding", + Some(error.to_string()), + ) + })?, + Encoding::Base64 => BASE64_STANDARD + .decode(request.tx.payload) + .map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::ParseError.code(), + "invalid tx base64 encoding", + Some(error.to_string()), + ) + })?, + }; + + let metx = MultiEraTx::decode(&raw).map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::InvalidParams.code(), + "invalid tx", + Some(error.to_string()), + ) + })?; + + let hash = metx.hash(); + + if let Err(error) = validate_tx(&metx, context.u5c_adapter.clone()).await { + error!(?error); + return Err(ErrorObject::owned( + ErrorCode::InvalidRequest.code(), + "failed to submit tx", + Some(error.to_string()), + )); + } + + if let Err(error) = evaluate_tx(&metx, context.u5c_adapter.clone()).await { + error!(?error); + return Err(ErrorObject::owned( + ErrorCode::InvalidRequest.code(), + "failed to submit tx", + Some(error.to_string()), + )); + } + + let tx_storage = Transaction::new(hash.to_string(), raw.to_vec()); + + context + .tx_storage + .create(&vec![tx_storage]) + .await .map_err(|error| { error!(?error); ErrorObject::owned( - ErrorCode::InternalError.code(), - "Internal error", - Some("Internal error"), + ErrorCode::InvalidRequest.code(), + "internal error", + Some(error.to_string()), ) })?; - let params = params.parse::().map_err(|error| { + let hash = hex::encode(hash); + info!(?hash, "submitting tx"); + + let response = serde_json::to_value(TrpSubmitResponse { hash }).map_err(|error| { error!(?error); ErrorObject::owned( - ErrorCode::InvalidParams.code(), - "invalid params", + ErrorCode::InternalError.code(), + "transaction accepted, but error to encode response", Some(error.to_string()), ) - }); - let params = jsonrpsee::core::rpc_params!(params); + })?; + + Ok(response) +} + +pub async fn health(context: Arc) -> Result { + tracing::info!(method = "health", "Received TRP request."); + + let mut client_builder = HttpClient::builder(); + + if let Some(headers) = &context.config.server.headers { + let headermap = headers.try_into().unwrap(); + client_builder = client_builder.set_headers(headermap); + } + + let client = client_builder + .build(&context.config.server.uri) + .map_err(|error| { + error!(?error); + ErrorObject::owned( + ErrorCode::InternalError.code(), + "Internal error", + Some("Internal error"), + ) + })?; let response = client - .request("trp.resolve", params) + .request("health", ObjectParams::default()) .await .map_err(|error| { error!(?error); ErrorObject::owned( ErrorCode::InternalError.code(), - "failed to resolve", + "failed health check", Some(error.to_string()), ) })?; Ok(response) } - -pub fn health(_context: &Context) -> bool { - // TODO: add health check - true -} diff --git a/src/server/trp/mod.rs b/src/server/trp/mod.rs index af2f8cf..9c9231c 100644 --- a/src/server/trp/mod.rs +++ b/src/server/trp/mod.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use anyhow::{Error, Result}; use jsonrpsee::server::{RpcModule, Server}; @@ -9,16 +9,21 @@ use tower::ServiceBuilder; use tower_http::cors::CorsLayer; use tracing::info; -use crate::storage::sqlite::SqliteTransaction; +use crate::{ledger::u5c::U5cDataAdapter, storage::sqlite::SqliteTransaction}; mod methods; #[derive(Clone)] -pub struct Context {} +pub struct Context { + config: Config, + tx_storage: Arc, + u5c_adapter: Arc, +} pub async fn run( config: Config, - _tx_storage: Arc, + tx_storage: Arc, + u5c_adapter: Arc, cancellation_token: CancellationToken, ) -> Result<()> { let cors_layer = if config.permissive_cors.unwrap_or_default() { @@ -33,7 +38,11 @@ pub async fn run( .build(config.listen_address) .await?; - let mut module = RpcModule::new(Context {}); + let mut module = RpcModule::new(Context { + config: config.clone(), + tx_storage: tx_storage.clone(), + u5c_adapter: u5c_adapter.clone(), + }); module.register_async_method("trp.resolve", |params, context, _| async { methods::trp_resolve(params, context).await @@ -41,7 +50,10 @@ pub async fn run( module.register_async_method("trp.submit", |params, context, _| async { methods::trp_submit(params, context).await })?; - module.register_method("health", |_, context, _| methods::health(context))?; + + module.register_async_method("health", |_, context, _| async { + methods::health(context).await + })?; info!( address = config.listen_address.to_string(), @@ -67,9 +79,15 @@ pub async fn run( Ok(()) } -#[derive(Deserialize, Clone)] +#[derive(Debug, Deserialize, Clone)] +pub struct ServerConfig { + pub uri: String, + pub headers: Option>, +} + +#[derive(Debug, Deserialize, Clone)] pub struct Config { + pub server: ServerConfig, pub listen_address: SocketAddr, - pub max_optimize_rounds: u8, pub permissive_cors: Option, }