Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions components/controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ cesrox = { version = "0.1.4" }
itertools = "0.11.0"
rusqlite = { version = "0.32.1", features = ["bundled"], optional = true}
serde = "1.0.219"
reqwest = "0.12.22"
async-trait = "0.1.57"

[dev-dependencies]
witness = { path = "../witness" }
Expand Down
82 changes: 79 additions & 3 deletions components/controller/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use keri_core::{
},
transport::{Transport, TransportError},
};
use teliox::transport::GeneralTelTransport;
use teliox::{event::verifiable_event::VerifiableEvent, query::SignedTelQuery};

use crate::{
error::ControllerError,
Expand All @@ -31,8 +31,14 @@ pub enum SendingError {
#[error("Transport error: {0}")]
TransportError(keri_core::transport::TransportError),

#[error("Http request error: {0}")]
HTTPError(#[from] reqwest::Error),

#[error(transparent)]
OobiError(#[from] OobiRetrieveError),

#[error("Invalid url: {0}")]
InvalidUrl(#[from] url::ParseError),
}

impl From<TransportError> for SendingError {
Expand All @@ -50,14 +56,14 @@ impl From<TransportError> for SendingError {
pub struct Communication {
pub events: Arc<KnownEvents>,
pub transport: Box<dyn Transport + Send + Sync>,
pub tel_transport: Box<dyn GeneralTelTransport + Send + Sync>,
pub tel_transport: Box<dyn IdentifierTelTransport + Send + Sync>,
}

impl Communication {
pub fn new(
known_events: Arc<KnownEvents>,
transport: Box<dyn Transport<ActorError> + Send + Sync>,
tel_transport: Box<dyn GeneralTelTransport + Send + Sync>,
tel_transport: Box<dyn IdentifierTelTransport + Send + Sync>,
) -> Self {
Communication {
events: known_events,
Expand Down Expand Up @@ -223,4 +229,74 @@ impl Communication {

Ok(())
}

pub async fn send_tel_query(
&self,
qry: SignedTelQuery,
location: LocationScheme,
) -> Result<String, SendingError> {
self.tel_transport.send_query(qry, location).await
}

pub async fn send_tel_event(
&self,
event: VerifiableEvent,
location: LocationScheme,
) -> Result<(), SendingError> {
self.tel_transport.send_tel_event(event, location).await
}
}

#[async_trait::async_trait]
pub trait IdentifierTelTransport {
async fn send_query(
&self,
qry: SignedTelQuery,
location: LocationScheme,
) -> Result<String, SendingError>;
async fn send_tel_event(
&self,
qry: VerifiableEvent,
location: LocationScheme,
) -> Result<(), SendingError>;
}

pub struct HTTPTelTransport;

#[async_trait::async_trait]
impl IdentifierTelTransport for HTTPTelTransport {
async fn send_query(
&self,
qry: SignedTelQuery,
location: LocationScheme,
) -> Result<String, SendingError> {
let url = match location.scheme {
Scheme::Http => location.url.join("query/tel")?,
Scheme::Tcp => todo!(),
};
let resp = reqwest::Client::new()
.post(url)
.body(qry.to_cesr().unwrap())
.send()
.await?;

Ok(resp.text().await?)
}

async fn send_tel_event(
&self,
event: VerifiableEvent,
location: LocationScheme,
) -> Result<(), SendingError> {
let url = match location.scheme {
Scheme::Http => location.url.join("process/tel")?,
Scheme::Tcp => todo!(),
};
let client = reqwest::Client::new();
let query = event.serialize().unwrap();
let resp = client.post(url).body(query).send().await?;
resp.text().await?;

Ok(())
}
}
7 changes: 4 additions & 3 deletions components/controller/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ use keri_core::{
processor::escrow::EscrowConfig,
transport::{default::DefaultTransport, Transport},
};
use teliox::transport::{GeneralTelTransport, TelTransport};

use crate::communication::{HTTPTelTransport, IdentifierTelTransport};

pub struct ControllerConfig {
pub db_path: PathBuf,
pub initial_oobis: Vec<LocationScheme>,
pub escrow_config: EscrowConfig,
pub transport: Box<dyn Transport + Send + Sync>,
pub tel_transport: Box<dyn GeneralTelTransport + Send + Sync>,
pub tel_transport: Box<dyn IdentifierTelTransport + Send + Sync>,
}

impl Default for ControllerConfig {
Expand All @@ -22,7 +23,7 @@ impl Default for ControllerConfig {
initial_oobis: vec![],
escrow_config: EscrowConfig::default(),
transport: Box::new(DefaultTransport::new()),
tel_transport: Box::new(TelTransport),
tel_transport: Box::new(HTTPTelTransport),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ impl Identifier {
.clone();
for event in &to_notify {
self.communication
.tel_transport
.send_tel_event(event.clone(), location.clone())
.await
.map_err(|e| MechanicsError::OtherError(e.to_string()))?;
Expand Down
3 changes: 1 addition & 2 deletions components/controller/src/identifier/nontransferable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ impl NontransferableIdentifier {

let tel_res = self
.communication
.tel_transport
.send_query(signed_qry, witness_location)
.send_tel_query(signed_qry, witness_location)
.await
.map_err(|e| MechanicsError::OtherError(e.to_string()))?;
Ok(tel_res)
Expand Down
3 changes: 1 addition & 2 deletions components/controller/src/identifier/tel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ impl Identifier {
let location = self.known_events.get_loc_schemas(&watcher).unwrap()[0].clone();
let tel_res = self
.communication
.tel_transport
.send_query(query, location)
.send_tel_query(query, location)
.await
.map_err(|e| MechanicsError::OtherError(e.to_string()))?;
self.known_events
Expand Down
1 change: 1 addition & 0 deletions components/watcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ teliox = {path = "../../support/teliox"}
thiserror = "1.0.63"
regex = "1.10.6"
tokio = { version = "1", features = ["full"] }
reqwest = "0.12.22"

[dev-dependencies]
keri-controller = { path = "../controller" }
Expand Down
1 change: 1 addition & 0 deletions components/watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ pub use crate::{
mod http_routing;
#[cfg(test)]
mod test;
pub mod transport;
mod watcher;
pub mod watcher_listener;
5 changes: 2 additions & 3 deletions components/watcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ use keri_core::{
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use teliox::transport::TelTransport;
use url::Url;
use watcher::{WatcherConfig, WatcherListener};
use watcher::{transport::HttpTelTransport, WatcherConfig, WatcherListener};

#[derive(Deserialize)]
pub struct Config {
Expand Down Expand Up @@ -135,7 +134,7 @@ async fn main() -> anyhow::Result<()> {
db_path: cfg.db_path.clone(),
priv_key: cfg.seed,
transport: Box::new(DefaultTransport::new()),
tel_transport: Box::new(TelTransport),
tel_transport: Box::new(HttpTelTransport),
escrow_config: cfg.escrow_config,
tel_storage_path: cfg.tel_storage_path,
})?;
Expand Down
45 changes: 45 additions & 0 deletions components/watcher/src/transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use keri_core::oobi::{LocationScheme, Scheme};
use teliox::query::SignedTelQuery;

#[async_trait::async_trait]
pub trait WatcherTelTransport {
async fn send_query(
&self,
qry: SignedTelQuery,
location: LocationScheme,
) -> Result<String, TransportError>;
}

#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
pub enum TransportError {
#[error("network error")]
NetworkError,
#[error("invalid response")]
InvalidResponse,
}

pub struct HttpTelTransport;

#[async_trait::async_trait]
impl WatcherTelTransport for HttpTelTransport {
async fn send_query(
&self,
qry: SignedTelQuery,
location: LocationScheme,
) -> Result<String, TransportError> {
let url = match location.scheme {
Scheme::Http => location.url.join("query/tel").unwrap(),
Scheme::Tcp => todo!(),
};
let resp = reqwest::Client::new()
.post(url)
.body(qry.to_cesr().unwrap())
.send()
.await
.map_err(|_| TransportError::NetworkError)?;

resp.text()
.await
.map_err(|_| TransportError::InvalidResponse)
}
}
7 changes: 4 additions & 3 deletions components/watcher/src/watcher/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use keri_core::{
processor::escrow::EscrowConfig,
transport::{default::DefaultTransport, Transport},
};
use teliox::transport::{GeneralTelTransport, TelTransport};

use crate::transport::{HttpTelTransport, WatcherTelTransport};

pub struct WatcherConfig {
pub public_address: url::Url,
pub db_path: PathBuf,
pub priv_key: Option<String>,
pub transport: Box<dyn Transport + Send + Sync>,
pub tel_transport: Box<dyn GeneralTelTransport + Send + Sync>,
pub tel_transport: Box<dyn WatcherTelTransport + Send + Sync>,
pub tel_storage_path: PathBuf,
pub escrow_config: EscrowConfig,
}
Expand All @@ -23,7 +24,7 @@ impl Default for WatcherConfig {
db_path: PathBuf::from("db"),
priv_key: None,
transport: Box::new(DefaultTransport::new()),
tel_transport: Box::new(TelTransport),
tel_transport: Box::new(HttpTelTransport),
tel_storage_path: PathBuf::from("tel_storage"),
escrow_config: EscrowConfig::default(),
}
Expand Down
8 changes: 7 additions & 1 deletion components/watcher/src/watcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ use keri_core::{
actor::{
error::ActorError, parse_event_stream, parse_notice_stream, parse_query_stream,
parse_reply_stream, possible_response::PossibleResponse,
}, database::redb::RedbDatabase, error::Error, event_message::signed_event_message::Message, oobi::{error::OobiError, EndRole, LocationScheme}, prefix::{BasicPrefix, IdentifierPrefix}, query::reply_event::{ReplyRoute, SignedReply}
},
database::redb::RedbDatabase,
error::Error,
event_message::signed_event_message::Message,
oobi::{error::OobiError, EndRole, LocationScheme},
prefix::{BasicPrefix, IdentifierPrefix},
query::reply_event::{ReplyRoute, SignedReply},
};
use tel_providing::RegistryMapping;
use teliox::{database::sled_db::SledEventDatabase, event::parse_tel_query_stream};
Expand Down
7 changes: 4 additions & 3 deletions components/watcher/src/watcher/watcher_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ use keri_core::{
},
};
use teliox::query::{SignedTelQuery, TelQueryArgs, TelQueryRoute};
use teliox::transport::GeneralTelTransport;
use tokio::sync::mpsc::Sender;

use crate::transport::WatcherTelTransport;

use super::{config::WatcherConfig, tel_providing::TelToForward};

pub struct WatcherData {
Expand All @@ -56,7 +57,7 @@ pub struct WatcherData {
pub oobi_manager: OobiManager,
pub signer: Arc<Signer>,
pub transport: Box<dyn Transport + Send + Sync>,
pub tel_transport: Box<dyn GeneralTelTransport + Send + Sync>,
pub tel_transport: Box<dyn WatcherTelTransport + Send + Sync>,
/// Watcher will update KEL of the identifiers that have been sent to this channel.
tx: Sender<IdentifierPrefix>,
/// Watcher will update TEL of the identifiers (registry_id, vc_id) that have been sent to this channel.
Expand All @@ -76,9 +77,9 @@ impl WatcherData {
db_path,
priv_key,
transport,
tel_transport,
escrow_config,
tel_storage_path,
tel_transport,
} = config;
let mut tel_to_forward_path = tel_storage_path.clone();
tel_to_forward_path.push("to_forward");
Expand Down
1 change: 0 additions & 1 deletion components/witness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_with = "2.2.0"
url = { version = "2.2.2", features = ["serde"] }
keri-core = { path = "../../keriox_core", features = ["oobi", "mailbox"] }
env_logger = "0.9.0"
log = "0.4.17"
serde_json = "1.0"
teliox = {path = "../../support/teliox"}
Expand Down
4 changes: 2 additions & 2 deletions keriox_core/src/actor/simple_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::{
};

use crate::{
database::{redb::RedbDatabase, EventDatabase, EscrowCreator},
database::{redb::RedbDatabase, EscrowCreator, EventDatabase},
processor::escrow::{
maybe_out_of_order_escrow::MaybeOutOfOrderEscrow,
partially_witnessed_escrow::PartiallyWitnessedEscrow,
},
query::{mailbox::SignedMailboxQuery, query_event::LogsQueryArgs}
query::{mailbox::SignedMailboxQuery, query_event::LogsQueryArgs},
};
use cesrox::{cesr_proof::MaterialPath, parse, primitives::CesrPrimitive};
use said::derivation::{HashFunction, HashFunctionCode};
Expand Down
Loading
Loading