Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ futures-util = "0.3.31"
gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] }
hex = "0.4.3"
itertools = "0.14.0"
pallas = { version = "1.0.0-alpha.1", features = ["phase2", "wallet"] }
pallas = { version = "=1.0.0-alpha.1", features = ["phase2", "wallet"] }
protoc-wkt = "1.0.0"
serde = { version = "1.0.217", features = ["derive"] }
thiserror = "2.0.11"
sqlx = { version = "0.8.3", features = ["runtime-tokio-rustls", "sqlite", "chrono"] }
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread", "time"] }
tonic = { version = "0.12.3", features = ["transport", "tls", "tls-webpki-roots", "tls-roots"] }
tonic-reflection = "0.12.3"
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread", "time", "signal"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
futures = "0.3.31"
Expand All @@ -41,3 +39,10 @@ rand = "0.9.0"
prost = "0.13.5"
vaultrs = "0.7.4"
bip39 = "2.1.0"
tonic = { version = "0.12.3", features = ["transport", "tls", "tls-webpki-roots", "tls-roots"] }
tower-http = { version = "0.6.4", features = ["cors", "trace"]}
tonic-reflection = { version = "0.12.3" }
jsonrpsee = { version = "0.25.1", features = ["server", "client"] }
tower = "0.5.2"
tokio-util = { version = "0.7.15", features = ["rt"] }
base64 = "0.22.1"
26 changes: 23 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use queue::{chaining::TxChaining, DEFAULT_QUEUE};
use serde::Deserialize;
use storage::sqlite::{SqliteCursor, SqliteStorage, SqliteTransaction};
use tokio::try_join;
use tracing::Level;
use tokio_util::sync::CancellationToken;
use tracing::{debug, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

mod ledger;
Expand All @@ -34,6 +35,8 @@ async fn main() -> Result<()> {
.with(env_filter)
.init();

let cancellation_token = cancellation_token();

let config = Config::new().expect("invalid config file");

let storage = Arc::new(SqliteStorage::new(path::Path::new(&config.storage.db_path)).await?);
Expand All @@ -56,18 +59,35 @@ async fn main() -> Result<()> {
Arc::clone(&tx_storage),
Arc::clone(&cursor_storage),
);
let server = server::run(
config,
let server = server::serve(
config.server.clone(),
config.queues.clone(),
u5c_data_adapter.clone(),
Arc::clone(&tx_storage),
Arc::clone(&tx_chaining),
cancellation_token.clone(),
);

try_join!(pipeline, server)?;

Ok(())
}

fn cancellation_token() -> CancellationToken {
let cancel = CancellationToken::new();

let cancel2 = cancel.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("failed to listen for Ctrl+C");
debug!("shutdown signal received");
cancel2.cancel();
});

cancel
}

#[derive(Deserialize, Clone)]
struct Config {
server: server::Config,
Expand Down
60 changes: 60 additions & 0 deletions src/server/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::{collections::HashSet, net::SocketAddr, sync::Arc};

use anyhow::Result;
use serde::Deserialize;
use spec::boros::v1 as spec;
use tokio_util::sync::CancellationToken;
use tonic::transport::Server;
use tracing::info;

use crate::{
ledger::u5c::U5cDataAdapter,
queue::{self, chaining::TxChaining},
storage::sqlite::SqliteTransaction,
};

mod submit;

pub async fn run(
config: Config,
queues: HashSet<queue::Config>,
u5c_adapter: Arc<dyn U5cDataAdapter>,
tx_storage: Arc<SqliteTransaction>,
tx_chaining: Arc<TxChaining>,
cancellation_token: CancellationToken,
) -> Result<()> {
let reflection = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(protoc_wkt::google::protobuf::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(spec::submit::FILE_DESCRIPTOR_SET)
.build_v1alpha()
.unwrap();

let submit_service = submit::SubmitServiceImpl::new(
Arc::clone(&tx_storage),
Arc::clone(&tx_chaining),
Arc::clone(&u5c_adapter),
queues,
);
let submit_service =
spec::submit::submit_service_server::SubmitServiceServer::new(submit_service);

info!(
address = config.listen_address.to_string(),
"GRPC server running"
);

Server::builder()
.add_service(reflection)
.add_service(submit_service)
.serve_with_shutdown(config.listen_address, cancellation_token.cancelled())
.await?;

info!("gracefully shut down grpc");

Ok(())
}

#[derive(Deserialize, Clone)]
pub struct Config {
pub listen_address: SocketAddr,
}
File renamed without changes.
93 changes: 52 additions & 41 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,72 @@
use std::{net::SocketAddr, sync::Arc};
use std::{collections::HashSet, sync::Arc};

use anyhow::Result;
use futures::try_join;
use serde::Deserialize;
use spec::boros::v1 as spec;
use tonic::transport::Server;
use tracing::{error, info};
use tokio_util::sync::CancellationToken;

use crate::{
ledger::u5c::U5cDataAdapter, queue::chaining::TxChaining, storage::sqlite::SqliteTransaction,
Config as BorosConfig,
ledger::u5c::U5cDataAdapter,
queue::{self, chaining::TxChaining},
storage::sqlite::SqliteTransaction,
};

mod submit;
mod grpc;
mod trp;

pub async fn run(
config: BorosConfig,
pub async fn serve(
config: Config,
queues: HashSet<queue::Config>,
u5c_adapter: Arc<dyn U5cDataAdapter>,
tx_storage: Arc<SqliteTransaction>,
tx_chaining: Arc<TxChaining>,
cancellation_token: CancellationToken,
) -> Result<()> {
tokio::spawn(async move {
let reflection = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(protoc_wkt::google::protobuf::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(spec::submit::FILE_DESCRIPTOR_SET)
.build_v1alpha()
.unwrap();

let submit_service = submit::SubmitServiceImpl::new(
Arc::clone(&tx_storage),
Arc::clone(&tx_chaining),
Arc::clone(&u5c_adapter),
config.queues,
);
let submit_service =
spec::submit::submit_service_server::SubmitServiceServer::new(submit_service);

info!(
address = config.server.listen_address.to_string(),
"GRPC server running"
);

let result = Server::builder()
.add_service(reflection)
.add_service(submit_service)
.serve(config.server.listen_address)
.await;

if let Err(error) = result {
error!(?error);
std::process::exit(1);
}
let grpc_task = config.grpc.map(|cfg| {
let queues = queues.clone();
let u5c_adapter = Arc::clone(&u5c_adapter);
let tx_storage = Arc::clone(&tx_storage);
let tx_chaining = Arc::clone(&tx_chaining);
let cancellation_token = cancellation_token.clone();

tokio::spawn(grpc::run(
cfg,
queues,
u5c_adapter,
tx_storage,
tx_chaining,
cancellation_token,
))
});

let trp_task = config.trp.map(|cfg| {
let tx_storage = Arc::clone(&tx_storage);
let u5c_adapter = Arc::clone(&u5c_adapter);
let cancellation_token = cancellation_token.clone();
tokio::spawn(trp::run(cfg, tx_storage, u5c_adapter, cancellation_token))
});

try_join!(
async {
if let Some(task) = grpc_task {
return task.await?;
}

Ok(())
},
async {
if let Some(task) = trp_task {
return task.await?;
}
Ok(())
},
)?;

Ok(())
}

#[derive(Deserialize, Clone)]
pub struct Config {
pub listen_address: SocketAddr,
pub grpc: Option<grpc::Config>,
pub trp: Option<trp::Config>,
}
Loading