diff --git a/components/controller/Cargo.toml b/components/controller/Cargo.toml index b83a4f60..0f0450ad 100644 --- a/components/controller/Cargo.toml +++ b/components/controller/Cargo.toml @@ -24,6 +24,8 @@ cesrox = { version = "0.1.4" } itertools = "0.11.0" rusqlite = { version = "0.32.1", features = ["bundled"], optional = true} serde = "1.0.219" +reqwest = "0.12.22" +async-trait = "0.1.57" [dev-dependencies] witness = { path = "../witness" } diff --git a/components/controller/src/communication.rs b/components/controller/src/communication.rs index 8c83b609..fe938f0e 100644 --- a/components/controller/src/communication.rs +++ b/components/controller/src/communication.rs @@ -12,7 +12,7 @@ use keri_core::{ }, transport::{Transport, TransportError}, }; -use teliox::transport::GeneralTelTransport; +use teliox::{event::verifiable_event::VerifiableEvent, query::SignedTelQuery}; use crate::{ error::ControllerError, @@ -31,8 +31,14 @@ pub enum SendingError { #[error("Transport error: {0}")] TransportError(keri_core::transport::TransportError), + #[error("Http request error: {0}")] + HTTPError(#[from] reqwest::Error), + #[error(transparent)] OobiError(#[from] OobiRetrieveError), + + #[error("Invalid url: {0}")] + InvalidUrl(#[from] url::ParseError), } impl From for SendingError { @@ -50,14 +56,14 @@ impl From for SendingError { pub struct Communication { pub events: Arc, pub transport: Box, - pub tel_transport: Box, + pub tel_transport: Box, } impl Communication { pub fn new( known_events: Arc, transport: Box + Send + Sync>, - tel_transport: Box, + tel_transport: Box, ) -> Self { Communication { events: known_events, @@ -223,4 +229,74 @@ impl Communication { Ok(()) } + + pub async fn send_tel_query( + &self, + qry: SignedTelQuery, + location: LocationScheme, + ) -> Result { + self.tel_transport.send_query(qry, location).await + } + + pub async fn send_tel_event( + &self, + event: VerifiableEvent, + location: LocationScheme, + ) -> Result<(), SendingError> { + self.tel_transport.send_tel_event(event, location).await + } +} + +#[async_trait::async_trait] +pub trait IdentifierTelTransport { + async fn send_query( + &self, + qry: SignedTelQuery, + location: LocationScheme, + ) -> Result; + async fn send_tel_event( + &self, + qry: VerifiableEvent, + location: LocationScheme, + ) -> Result<(), SendingError>; +} + +pub struct HTTPTelTransport; + +#[async_trait::async_trait] +impl IdentifierTelTransport for HTTPTelTransport { + async fn send_query( + &self, + qry: SignedTelQuery, + location: LocationScheme, + ) -> Result { + let url = match location.scheme { + Scheme::Http => location.url.join("query/tel")?, + Scheme::Tcp => todo!(), + }; + let resp = reqwest::Client::new() + .post(url) + .body(qry.to_cesr().unwrap()) + .send() + .await?; + + Ok(resp.text().await?) + } + + async fn send_tel_event( + &self, + event: VerifiableEvent, + location: LocationScheme, + ) -> Result<(), SendingError> { + let url = match location.scheme { + Scheme::Http => location.url.join("process/tel")?, + Scheme::Tcp => todo!(), + }; + let client = reqwest::Client::new(); + let query = event.serialize().unwrap(); + let resp = client.post(url).body(query).send().await?; + resp.text().await?; + + Ok(()) + } } diff --git a/components/controller/src/config.rs b/components/controller/src/config.rs index ee0215bd..6cf45047 100644 --- a/components/controller/src/config.rs +++ b/components/controller/src/config.rs @@ -5,14 +5,15 @@ use keri_core::{ processor::escrow::EscrowConfig, transport::{default::DefaultTransport, Transport}, }; -use teliox::transport::{GeneralTelTransport, TelTransport}; + +use crate::communication::{HTTPTelTransport, IdentifierTelTransport}; pub struct ControllerConfig { pub db_path: PathBuf, pub initial_oobis: Vec, pub escrow_config: EscrowConfig, pub transport: Box, - pub tel_transport: Box, + pub tel_transport: Box, } impl Default for ControllerConfig { @@ -22,7 +23,7 @@ impl Default for ControllerConfig { initial_oobis: vec![], escrow_config: EscrowConfig::default(), transport: Box::new(DefaultTransport::new()), - tel_transport: Box::new(TelTransport), + tel_transport: Box::new(HTTPTelTransport), } } } diff --git a/components/controller/src/identifier/mechanics/tel_managing.rs b/components/controller/src/identifier/mechanics/tel_managing.rs index d925f20d..ec2259a8 100644 --- a/components/controller/src/identifier/mechanics/tel_managing.rs +++ b/components/controller/src/identifier/mechanics/tel_managing.rs @@ -73,7 +73,6 @@ impl Identifier { .clone(); for event in &to_notify { self.communication - .tel_transport .send_tel_event(event.clone(), location.clone()) .await .map_err(|e| MechanicsError::OtherError(e.to_string()))?; diff --git a/components/controller/src/identifier/nontransferable.rs b/components/controller/src/identifier/nontransferable.rs index b8812cef..15a572b1 100644 --- a/components/controller/src/identifier/nontransferable.rs +++ b/components/controller/src/identifier/nontransferable.rs @@ -130,8 +130,7 @@ impl NontransferableIdentifier { let tel_res = self .communication - .tel_transport - .send_query(signed_qry, witness_location) + .send_tel_query(signed_qry, witness_location) .await .map_err(|e| MechanicsError::OtherError(e.to_string()))?; Ok(tel_res) diff --git a/components/controller/src/identifier/tel.rs b/components/controller/src/identifier/tel.rs index 1fd49da1..d73f135c 100644 --- a/components/controller/src/identifier/tel.rs +++ b/components/controller/src/identifier/tel.rs @@ -143,8 +143,7 @@ impl Identifier { let location = self.known_events.get_loc_schemas(&watcher).unwrap()[0].clone(); let tel_res = self .communication - .tel_transport - .send_query(query, location) + .send_tel_query(query, location) .await .map_err(|e| MechanicsError::OtherError(e.to_string()))?; self.known_events diff --git a/components/watcher/Cargo.toml b/components/watcher/Cargo.toml index 72e068be..362cccbf 100644 --- a/components/watcher/Cargo.toml +++ b/components/watcher/Cargo.toml @@ -28,6 +28,7 @@ teliox = {path = "../../support/teliox"} thiserror = "1.0.63" regex = "1.10.6" tokio = { version = "1", features = ["full"] } +reqwest = "0.12.22" [dev-dependencies] keri-controller = { path = "../controller" } diff --git a/components/watcher/src/lib.rs b/components/watcher/src/lib.rs index c0761b1c..c14aa7ba 100644 --- a/components/watcher/src/lib.rs +++ b/components/watcher/src/lib.rs @@ -6,5 +6,6 @@ pub use crate::{ mod http_routing; #[cfg(test)] mod test; +pub mod transport; mod watcher; pub mod watcher_listener; diff --git a/components/watcher/src/main.rs b/components/watcher/src/main.rs index ebfe1b13..ac6205a7 100644 --- a/components/watcher/src/main.rs +++ b/components/watcher/src/main.rs @@ -14,9 +14,8 @@ use keri_core::{ }; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; -use teliox::transport::TelTransport; use url::Url; -use watcher::{WatcherConfig, WatcherListener}; +use watcher::{transport::HttpTelTransport, WatcherConfig, WatcherListener}; #[derive(Deserialize)] pub struct Config { @@ -135,7 +134,7 @@ async fn main() -> anyhow::Result<()> { db_path: cfg.db_path.clone(), priv_key: cfg.seed, transport: Box::new(DefaultTransport::new()), - tel_transport: Box::new(TelTransport), + tel_transport: Box::new(HttpTelTransport), escrow_config: cfg.escrow_config, tel_storage_path: cfg.tel_storage_path, })?; diff --git a/components/watcher/src/transport.rs b/components/watcher/src/transport.rs new file mode 100644 index 00000000..44cf4ee2 --- /dev/null +++ b/components/watcher/src/transport.rs @@ -0,0 +1,45 @@ +use keri_core::oobi::{LocationScheme, Scheme}; +use teliox::query::SignedTelQuery; + +#[async_trait::async_trait] +pub trait WatcherTelTransport { + async fn send_query( + &self, + qry: SignedTelQuery, + location: LocationScheme, + ) -> Result; +} + +#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] +pub enum TransportError { + #[error("network error")] + NetworkError, + #[error("invalid response")] + InvalidResponse, +} + +pub struct HttpTelTransport; + +#[async_trait::async_trait] +impl WatcherTelTransport for HttpTelTransport { + async fn send_query( + &self, + qry: SignedTelQuery, + location: LocationScheme, + ) -> Result { + let url = match location.scheme { + Scheme::Http => location.url.join("query/tel").unwrap(), + Scheme::Tcp => todo!(), + }; + let resp = reqwest::Client::new() + .post(url) + .body(qry.to_cesr().unwrap()) + .send() + .await + .map_err(|_| TransportError::NetworkError)?; + + resp.text() + .await + .map_err(|_| TransportError::InvalidResponse) + } +} diff --git a/components/watcher/src/watcher/config.rs b/components/watcher/src/watcher/config.rs index 990835f9..e074f1c5 100644 --- a/components/watcher/src/watcher/config.rs +++ b/components/watcher/src/watcher/config.rs @@ -4,14 +4,15 @@ use keri_core::{ processor::escrow::EscrowConfig, transport::{default::DefaultTransport, Transport}, }; -use teliox::transport::{GeneralTelTransport, TelTransport}; + +use crate::transport::{HttpTelTransport, WatcherTelTransport}; pub struct WatcherConfig { pub public_address: url::Url, pub db_path: PathBuf, pub priv_key: Option, pub transport: Box, - pub tel_transport: Box, + pub tel_transport: Box, pub tel_storage_path: PathBuf, pub escrow_config: EscrowConfig, } @@ -23,7 +24,7 @@ impl Default for WatcherConfig { db_path: PathBuf::from("db"), priv_key: None, transport: Box::new(DefaultTransport::new()), - tel_transport: Box::new(TelTransport), + tel_transport: Box::new(HttpTelTransport), tel_storage_path: PathBuf::from("tel_storage"), escrow_config: EscrowConfig::default(), } diff --git a/components/watcher/src/watcher/mod.rs b/components/watcher/src/watcher/mod.rs index 2c491f87..fb75fb47 100644 --- a/components/watcher/src/watcher/mod.rs +++ b/components/watcher/src/watcher/mod.rs @@ -11,7 +11,13 @@ use keri_core::{ actor::{ error::ActorError, parse_event_stream, parse_notice_stream, parse_query_stream, parse_reply_stream, possible_response::PossibleResponse, - }, database::redb::RedbDatabase, error::Error, event_message::signed_event_message::Message, oobi::{error::OobiError, EndRole, LocationScheme}, prefix::{BasicPrefix, IdentifierPrefix}, query::reply_event::{ReplyRoute, SignedReply} + }, + database::redb::RedbDatabase, + error::Error, + event_message::signed_event_message::Message, + oobi::{error::OobiError, EndRole, LocationScheme}, + prefix::{BasicPrefix, IdentifierPrefix}, + query::reply_event::{ReplyRoute, SignedReply}, }; use tel_providing::RegistryMapping; use teliox::{database::sled_db::SledEventDatabase, event::parse_tel_query_stream}; diff --git a/components/watcher/src/watcher/watcher_data.rs b/components/watcher/src/watcher/watcher_data.rs index b89c2a3a..74471f0a 100644 --- a/components/watcher/src/watcher/watcher_data.rs +++ b/components/watcher/src/watcher/watcher_data.rs @@ -43,9 +43,10 @@ use keri_core::{ }, }; use teliox::query::{SignedTelQuery, TelQueryArgs, TelQueryRoute}; -use teliox::transport::GeneralTelTransport; use tokio::sync::mpsc::Sender; +use crate::transport::WatcherTelTransport; + use super::{config::WatcherConfig, tel_providing::TelToForward}; pub struct WatcherData { @@ -56,7 +57,7 @@ pub struct WatcherData { pub oobi_manager: OobiManager, pub signer: Arc, pub transport: Box, - pub tel_transport: Box, + pub tel_transport: Box, /// Watcher will update KEL of the identifiers that have been sent to this channel. tx: Sender, /// Watcher will update TEL of the identifiers (registry_id, vc_id) that have been sent to this channel. @@ -76,9 +77,9 @@ impl WatcherData { db_path, priv_key, transport, - tel_transport, escrow_config, tel_storage_path, + tel_transport, } = config; let mut tel_to_forward_path = tel_storage_path.clone(); tel_to_forward_path.push("to_forward"); diff --git a/components/witness/Cargo.toml b/components/witness/Cargo.toml index 8f452b74..deee3806 100644 --- a/components/witness/Cargo.toml +++ b/components/witness/Cargo.toml @@ -21,7 +21,6 @@ serde = { version = "1.0", features = ["derive"] } serde_with = "2.2.0" url = { version = "2.2.2", features = ["serde"] } keri-core = { path = "../../keriox_core", features = ["oobi", "mailbox"] } -env_logger = "0.9.0" log = "0.4.17" serde_json = "1.0" teliox = {path = "../../support/teliox"} diff --git a/keriox_core/src/actor/simple_controller.rs b/keriox_core/src/actor/simple_controller.rs index c6cc88f3..bfa3f10f 100644 --- a/keriox_core/src/actor/simple_controller.rs +++ b/keriox_core/src/actor/simple_controller.rs @@ -4,12 +4,12 @@ use std::{ }; use crate::{ - database::{redb::RedbDatabase, EventDatabase, EscrowCreator}, + database::{redb::RedbDatabase, EscrowCreator, EventDatabase}, processor::escrow::{ maybe_out_of_order_escrow::MaybeOutOfOrderEscrow, partially_witnessed_escrow::PartiallyWitnessedEscrow, }, - query::{mailbox::SignedMailboxQuery, query_event::LogsQueryArgs} + query::{mailbox::SignedMailboxQuery, query_event::LogsQueryArgs}, }; use cesrox::{cesr_proof::MaterialPath, parse, primitives::CesrPrimitive}; use said::derivation::{HashFunction, HashFunctionCode}; diff --git a/keriox_core/src/database/mod.rs b/keriox_core/src/database/mod.rs index b21d01cf..b667d134 100644 --- a/keriox_core/src/database/mod.rs +++ b/keriox_core/src/database/mod.rs @@ -10,8 +10,7 @@ use crate::{ msg::KeriEvent, signature::{Nontransferable, Transferable}, signed_event_message::{ - SignedEventMessage, SignedNontransferableReceipt, - SignedTransferableReceipt, + SignedEventMessage, SignedNontransferableReceipt, SignedTransferableReceipt, }, }, prefix::{IdentifierPrefix, IndexedSignature}, @@ -79,19 +78,12 @@ pub trait EventDatabase { params: QueryParameters, ) -> Option>; - fn accept_to_kel( - &self, - event: &KeriEvent, - ) -> Result<(), Self::Error>; + fn accept_to_kel(&self, event: &KeriEvent) -> Result<(), Self::Error>; #[cfg(feature = "query")] fn save_reply(&self, reply: SignedReply) -> Result<(), Self::Error>; #[cfg(feature = "query")] - fn get_reply( - &self, - id: &IdentifierPrefix, - from_who: &IdentifierPrefix, - ) -> Option; + fn get_reply(&self, id: &IdentifierPrefix, from_who: &IdentifierPrefix) -> Option; } pub trait LogDatabase<'db>: Send + Sync { @@ -169,10 +161,7 @@ pub trait SequencedEventDatabase: Send + Sync { type Error; type DigestIter: Iterator; - fn new( - db: Arc, - table_name: &'static str, - ) -> Result + fn new(db: Arc, table_name: &'static str) -> Result where Self: Sized; @@ -183,11 +172,7 @@ pub trait SequencedEventDatabase: Send + Sync { digest: &said::SelfAddressingIdentifier, ) -> Result<(), Self::Error>; - fn get( - &self, - identifier: &IdentifierPrefix, - sn: u64, - ) -> Result; + fn get(&self, identifier: &IdentifierPrefix, sn: u64) -> Result; fn get_greater_than( &self, @@ -219,9 +204,7 @@ pub trait EscrowDatabase: Send + Sync { dyn SequencedEventDatabase< DatabaseType = Self::EscrowDatabaseType, Error = Self::Error, - DigestIter = Box< - dyn Iterator, - >, + DigestIter = Box>, >, >, log: Arc, @@ -245,11 +228,7 @@ pub trait EscrowDatabase: Send + Sync { event: &SignedEventMessage, ) -> Result<(), Self::Error>; - fn get( - &self, - identifier: &IdentifierPrefix, - sn: u64, - ) -> Result; + fn get(&self, identifier: &IdentifierPrefix, sn: u64) -> Result; fn get_from_sn( &self, diff --git a/keriox_core/src/database/redb/escrow_database.rs b/keriox_core/src/database/redb/escrow_database.rs index 99132b40..4caaca49 100644 --- a/keriox_core/src/database/redb/escrow_database.rs +++ b/keriox_core/src/database/redb/escrow_database.rs @@ -6,19 +6,21 @@ use std::{ use redb::{Database, MultimapTableDefinition, TableDefinition}; use said::SelfAddressingIdentifier; -use crate::{database::{EscrowCreator, EscrowDatabase, SequencedEventDatabase, LogDatabase as _}, event::KeyEvent, event_message::{msg::KeriEvent, signed_event_message::SignedEventMessage}, prefix::IdentifierPrefix}; +use crate::{ + database::{EscrowCreator, EscrowDatabase, LogDatabase as _, SequencedEventDatabase}, + event::KeyEvent, + event_message::{msg::KeriEvent, signed_event_message::SignedEventMessage}, + prefix::IdentifierPrefix, +}; -use super::{rkyv_adapter, RedbError, RedbDatabase, LogDatabase}; +use super::{rkyv_adapter, LogDatabase, RedbDatabase, RedbError}; impl EscrowCreator for RedbDatabase { type EscrowDatabaseType = SnKeyEscrow; fn create_escrow_db(&self, table_name: &'static str) -> Self::EscrowDatabaseType { SnKeyEscrow::new( - Arc::new( - SnKeyDatabase::new(self.db.clone(), table_name) - .unwrap(), - ), + Arc::new(SnKeyDatabase::new(self.db.clone(), table_name).unwrap()), self.log_db.clone(), ) } @@ -29,9 +31,7 @@ pub struct SnKeyEscrow { dyn SequencedEventDatabase< DatabaseType = redb::Database, Error = RedbError, - DigestIter = Box< - dyn Iterator, - >, + DigestIter = Box>, >, >, log: Arc, @@ -48,9 +48,7 @@ impl crate::database::EscrowDatabase for SnKeyEscrow { dyn SequencedEventDatabase< DatabaseType = Self::EscrowDatabaseType, Error = Self::Error, - DigestIter = Box< - dyn Iterator, - >, + DigestIter = Box>, >, >, log: Arc, @@ -73,10 +71,8 @@ impl crate::database::EscrowDatabase for SnKeyEscrow { } fn insert(&self, event: &SignedEventMessage) -> Result<(), RedbError> { - self.log.log_event( - &crate::database::redb::WriteTxnMode::CreateNew, - &event, - )?; + self.log + .log_event(&crate::database::redb::WriteTxnMode::CreateNew, &event)?; let said = event.event_message.digest().unwrap(); let id = event.event_message.data.get_prefix(); let sn = event.event_message.data.sn; @@ -91,10 +87,8 @@ impl crate::database::EscrowDatabase for SnKeyEscrow { sn: u64, event: &SignedEventMessage, ) -> Result<(), RedbError> { - self.log.log_event( - &crate::database::redb::WriteTxnMode::CreateNew, - &event, - )?; + self.log + .log_event(&crate::database::redb::WriteTxnMode::CreateNew, &event)?; let said = event.event_message.digest().unwrap(); self.escrow.insert(&id, sn, &said)?; @@ -102,11 +96,7 @@ impl crate::database::EscrowDatabase for SnKeyEscrow { Ok(()) } - fn get( - &self, - identifier: &IdentifierPrefix, - sn: u64, - ) -> Result { + fn get(&self, identifier: &IdentifierPrefix, sn: u64) -> Result { let saids = self.escrow.get(identifier, sn)?; let saids_vec: Vec<_> = saids.collect(); @@ -163,7 +153,6 @@ impl crate::database::EscrowDatabase for SnKeyEscrow { } } - /// Storage for digests of escrowed events. /// The digest of an escrowed event can be used to retrieve the full event from the `LogDatabase`. /// The storage is indexed by a tuple of (identifier, sn), with the value being the event's digest. @@ -172,8 +161,7 @@ pub struct SnKeyDatabase { /// Escrowed events. (identifier, sn) -> event digest /// Table links an identifier and sequence number to the digest of an event, /// referencing the actual event stored in the `EVENTS` table in EventDatabase. - sn_key_table: - MultimapTableDefinition<'static, (&'static str, u64), &'static [u8]>, + sn_key_table: MultimapTableDefinition<'static, (&'static str, u64), &'static [u8]>, /// Timestamps. digest -> timestamp /// Table links digest of an event witch time when an event was saved in the database. dts_table: TableDefinition<'static, &'static [u8], u64>, @@ -184,10 +172,7 @@ impl SequencedEventDatabase for SnKeyDatabase { type Error = RedbError; type DigestIter = Box>; - fn new( - db: Arc, - table_name: &'static str, - ) -> Result { + fn new(db: Arc, table_name: &'static str) -> Result { // Create tables let pse = MultimapTableDefinition::new(table_name); let dts = TableDefinition::new("timestamps_escrow"); @@ -213,13 +198,9 @@ impl SequencedEventDatabase for SnKeyDatabase { ) -> Result<(), RedbError> { let write_txn = self.db.begin_write()?; { - let mut table = - (&write_txn).open_multimap_table(self.sn_key_table)?; + let mut table = (&write_txn).open_multimap_table(self.sn_key_table)?; let value = rkyv_adapter::serialize_said(&digest)?; - table.insert( - (identifier.to_string().as_str(), sn), - value.as_ref(), - )?; + table.insert((identifier.to_string().as_str(), sn), value.as_ref())?; let mut table = (&write_txn).open_table(self.dts_table)?; let value = get_current_timestamp(); @@ -230,18 +211,13 @@ impl SequencedEventDatabase for SnKeyDatabase { Ok(()) } - fn get( - &self, - identifier: &IdentifierPrefix, - sn: u64, - ) -> Result { + fn get(&self, identifier: &IdentifierPrefix, sn: u64) -> Result { let read_txn = self.db.begin_read()?; let table = read_txn.open_multimap_table(self.sn_key_table)?; let value = table.get((identifier.to_string().as_str(), sn))?; let out = value.filter_map(|value| match value { Ok(value) => { - let said = - rkyv_adapter::deserialize_said(value.value()).unwrap(); + let said = rkyv_adapter::deserialize_said(value.value()).unwrap(); Some(said) } _ => None, @@ -267,17 +243,13 @@ impl SequencedEventDatabase for SnKeyDatabase { let out = table .range((lower_bound.as_str(), sn)..(upper_bound.as_str(), 0))? .filter_map(|range| match range { - Ok((_key, value)) => { - Some(value.filter_map(|value| match value { - Ok(value) => { - let said = - rkyv_adapter::deserialize_said(value.value()) - .unwrap(); - Some(said) - } - Err(_) => None, - })) - } + Ok((_key, value)) => Some(value.filter_map(|value| match value { + Ok(value) => { + let said = rkyv_adapter::deserialize_said(value.value()).unwrap(); + Some(said) + } + Err(_) => None, + })), _ => None, }) .flatten(); @@ -295,10 +267,7 @@ impl SequencedEventDatabase for SnKeyDatabase { { let mut table = write_txn.open_multimap_table(self.sn_key_table)?; let said = rkyv_adapter::serialize_said(said).unwrap(); - table.remove( - (identifier.to_string().as_str(), sn), - said.as_slice(), - )?; + table.remove((identifier.to_string().as_str(), sn), said.as_slice())?; let mut table = write_txn.open_table(self.dts_table)?; table.remove(said.as_slice())?; diff --git a/keriox_core/src/database/redb/loging.rs b/keriox_core/src/database/redb/loging.rs index c18f0939..3044d263 100644 --- a/keriox_core/src/database/redb/loging.rs +++ b/keriox_core/src/database/redb/loging.rs @@ -31,7 +31,7 @@ use rkyv::{ use said::SelfAddressingIdentifier; use crate::{ - database::{timestamped::TimestampedSignedEventMessage}, + database::timestamped::TimestampedSignedEventMessage, event::{sections::seal::SourceSeal, KeyEvent}, event_message::{ msg::KeriEvent, @@ -383,9 +383,9 @@ impl LogDatabase { #[test] fn test_retrieve_by_digest() { use crate::actor::parse_event_stream; + use crate::database::LogDatabase as LogDb; use crate::event_message::signed_event_message::{Message, Notice}; use tempfile::NamedTempFile; - use crate::database::LogDatabase as LogDb; // Create test db path. let file_path = NamedTempFile::new().unwrap(); diff --git a/keriox_core/src/database/redb/mod.rs b/keriox_core/src/database/redb/mod.rs index 0d9769de..2d36b2b1 100644 --- a/keriox_core/src/database/redb/mod.rs +++ b/keriox_core/src/database/redb/mod.rs @@ -232,10 +232,7 @@ impl EventDatabase for RedbDatabase { } } - fn accept_to_kel( - &self, - event: &KeriEvent, - ) -> Result<(), RedbError> { + fn accept_to_kel(&self, event: &KeriEvent) -> Result<(), RedbError> { let txn_mode = WriteTxnMode::CreateNew; self.save_to_kel(&txn_mode, event)?; self.update_key_state(&txn_mode, event)?; diff --git a/keriox_core/src/processor/escrow/delegation_escrow.rs b/keriox_core/src/processor/escrow/delegation_escrow.rs index 39de5829..ae24ac58 100644 --- a/keriox_core/src/processor/escrow/delegation_escrow.rs +++ b/keriox_core/src/processor/escrow/delegation_escrow.rs @@ -4,9 +4,7 @@ use said::SelfAddressingIdentifier; use crate::{ actor::prelude::EventStorage, - database::{ - SequencedEventDatabase, EventDatabase, EscrowDatabase, EscrowCreator - }, + database::{EscrowCreator, EscrowDatabase, EventDatabase, SequencedEventDatabase}, error::Error, event::{ event_data::EventData, @@ -27,8 +25,7 @@ pub struct DelegationEscrow { pub delegation_escrow: D::EscrowDatabaseType, } - -impl DelegationEscrow { +impl DelegationEscrow { pub fn new(db: Arc, _duration: Duration) -> Self { let escrow_db = db.create_escrow_db("delegation_escrow"); Self { @@ -108,7 +105,7 @@ impl DelegationEscrow { } } -impl Notifier for DelegationEscrow { +impl Notifier for DelegationEscrow { fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> { match notification { Notification::KeyEventAdded(ev_message) => { diff --git a/keriox_core/src/processor/escrow/duplicitous_events.rs b/keriox_core/src/processor/escrow/duplicitous_events.rs index 1ea99d9e..33921e9e 100644 --- a/keriox_core/src/processor/escrow/duplicitous_events.rs +++ b/keriox_core/src/processor/escrow/duplicitous_events.rs @@ -1,8 +1,8 @@ use std::sync::Arc; use crate::{ - error::Error, database::{EscrowCreator, EscrowDatabase}, + error::Error, event_message::signed_event_message::SignedEventMessage, prefix::IdentifierPrefix, processor::notification::{Notification, NotificationBus, Notifier}, @@ -38,4 +38,3 @@ impl Notifier for DuplicitousEvents { Ok(()) } } - diff --git a/keriox_core/src/processor/escrow/maybe_out_of_order_escrow.rs b/keriox_core/src/processor/escrow/maybe_out_of_order_escrow.rs index 66c933f8..3724780f 100644 --- a/keriox_core/src/processor/escrow/maybe_out_of_order_escrow.rs +++ b/keriox_core/src/processor/escrow/maybe_out_of_order_escrow.rs @@ -1,9 +1,7 @@ use std::{sync::Arc, time::Duration}; use crate::{ - database::{ - EscrowCreator, EscrowDatabase, EventDatabase, SequencedEventDatabase, - }, + database::{EscrowCreator, EscrowDatabase, EventDatabase, SequencedEventDatabase}, error::Error, prefix::IdentifierPrefix, }; @@ -34,7 +32,11 @@ impl MaybeOutOfOrderEscrow { id: &IdentifierPrefix, sn: u64, ) -> Result<(), Error> { - for event in self.escrowed_out_of_order.get_from_sn(id, sn).map_err(|_| Error::DbError)? { + for event in self + .escrowed_out_of_order + .get_from_sn(id, sn) + .map_err(|_| Error::DbError)? + { let validator = EventValidator::new(self.db.clone()); match validator.validate_event(&event) { Ok(_) => { @@ -61,11 +63,7 @@ impl MaybeOutOfOrderEscrow { } impl Notifier for MaybeOutOfOrderEscrow { - fn notify( - &self, - notification: &Notification, - bus: &NotificationBus, - ) -> Result<(), Error> { + fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> { match notification { Notification::KeyEventAdded(ev_message) => { let id = ev_message.event_message.data.get_prefix(); @@ -75,7 +73,9 @@ impl Notifier for MaybeOutOfOrderEsc Notification::OutOfOrder(signed_event) => { // ignore events with no signatures if !signed_event.signatures.is_empty() { - self.escrowed_out_of_order.insert(signed_event).map_err(|_| Error::DbError)?; + self.escrowed_out_of_order + .insert(signed_event) + .map_err(|_| Error::DbError)?; } } _ => return Err(Error::SemanticError("Wrong notification".into())), @@ -87,12 +87,12 @@ impl Notifier for MaybeOutOfOrderEsc #[test] fn test_out_of_order() -> Result<(), Error> { + use crate::database::redb::RedbDatabase; use crate::event_message::signed_event_message::{Message, Notice}; use crate::processor::JustNotification; use crate::processor::{ basic_processor::BasicProcessor, event_storage::EventStorage, Processor, }; - use crate::database::redb::RedbDatabase; use cesrox::parse_many; use tempfile::NamedTempFile; let kel = br#"{"v":"KERI10JSON000159_","t":"icp","d":"EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL","i":"EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL","s":"0","kt":"1","k":["DODv7KGqEEhAP7-VYXzZvNi5wmgEB8w5y6HLUQL08PNh"],"nt":"1","n":["ECo41Mn5wku-tQd7L4Hp65KhaX1KkdTtSY_NXx4rQphS"],"bt":"0","b":["DPOIlcZk_GLVCVtG7KLbDQa2a5drXGt09wpaeY93G--1"],"c":[],"a":[]}-AABAADtEDd5x0DRfSlGl99G2V3aiJQlILTMG8LHNbG6V3ticL8r1vMK8-nmhZBhZglI06mVChxc-EkgqWPzPlI2rAwD{"v":"KERI10JSON000160_","t":"rot","d":"EDBBxc3_cczsEld6szaFdmhR3JyOhnYaDCCdo_wDe95p","i":"EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL","s":"1","p":"EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL","kt":"1","k":["DIgRd-GK29iB-G7tao3-BCdMbUCATveeMrzivmmmM_Nf"],"nt":"1","n":["EBrEok_A-yJGpR9GH_ktdd11x3UR0cHaCg0nzAnYLgGj"],"bt":"0","br":[],"ba":[],"a":[]}-AABAADLgLBVFeCOP8t-sxOWKif-JbQ-PnOz0W7aZCuLPOUEri-OdGXjOV2d3y6-R_SsS2U3toE3TNVJ9UyO5NhBSkkO{"v":"KERI10JSON000160_","t":"rot","d":"ENtkE-NChURiXS5j8ES9GeX9VCqr5PLxilygqUJQ5Wr9","i":"EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL","s":"2","p":"EDBBxc3_cczsEld6szaFdmhR3JyOhnYaDCCdo_wDe95p","kt":"1","k":["DGx72gYpAdz0N3br4blkVRRoIASdcBTJaqtLnGI6PXHV"],"nt":"1","n":["EMEVqKOHmF9juqQSmphqjnP24tT__JILJJ2Z4u9QKSUn"],"bt":"0","br":[],"ba":[],"a":[]}-AABAAAHF__vhEKj4kn1uW0fdBRS75nyG3uvJuEfcOdnx4sfy2vNirkDLkm6WGluUVDfQ7y9_b2TIaIHLfAoBefjNBkF{"v":"KERI10JSON000160_","t":"rot","d":"EP0HwW561f8fXuZdau8FyVoQxYTqADGfp12EnI6-Wl6T","i":"EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL","s":"3","p":"ENtkE-NChURiXS5j8ES9GeX9VCqr5PLxilygqUJQ5Wr9","kt":"1","k":["DFXuPGU9uFziSr3uQuDo7yKJFmcyURvTq8YOfLfNHf6r"],"nt":"1","n":["EO3OeLeP4Ux570nxE0cuK76Bn0I2NAyA1artuMiyASJf"],"bt":"0","br":[],"ba":[],"a":[]}-AABAAAXiKK5er1d8dlAorz6SVhp6xs33eoEKSn2JZrrUHTFZz4xjIa_Ectg9Jyvs12JkdjkNf3VUQ2GMsnfgBpIkXMB{"v":"KERI10JSON000160_","t":"rot","d":"EGzDR2bgvFESAlpZ_BiiVrefq6S_Ea7navqFyB8EOu6Q","i":"EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL","s":"4","p":"EP0HwW561f8fXuZdau8FyVoQxYTqADGfp12EnI6-Wl6T","kt":"1","k":["DHkJs10SLaBPMBsPx8X6x4TozQMM8OuAzgj681jYSckq"],"nt":"1","n":["ELRF262pZpt8-UiEX5TSsCFiZ1NmRHkvHIq-M6mFKDw_"],"bt":"0","br":[],"ba":[],"a":[]}-AABAACx23xFm12mxnmA413AJCGK67SF5OHb6hlz6qbZjyWbkAqtmqmo2_SRFHtbSFpZ5yIVObSf_F9yr8sRQ-_pJg0F{"v":"KERI10JSON000160_","t":"rot","d":"EKlpPRdR6NmMHhJ3XuDt7cuPVkfUy11leY6US9bP3jVx","i":"EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL","s":"5","p":"EGzDR2bgvFESAlpZ_BiiVrefq6S_Ea7navqFyB8EOu6Q","kt":"1","k":["DOFD9XUnKnAUyn0QjYq0BouHyYjvmHN7T2nnVaxr7VHz"],"nt":"1","n":["EFz-ndoE5OXjvD0-UdQAzepB8zpnfk44HN2h8aWmdnKB"],"bt":"0","br":[],"ba":[],"a":[]}-AABAABKlwj4nLkk8q-1YhxA-NjTJCw6AiqyopKvp-MJgx-FKzgZecMmtGm3q5SLImR8P0evrVGL8-DvI-kF9FzYN5YP{"v":"KERI10JSON000160_","t":"rot","d":"ELQRtBD0vqZOQRTc_uQ0_WebeSM-xLcIog7QPyCDtANg","i":"EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL","s":"6","p":"EKlpPRdR6NmMHhJ3XuDt7cuPVkfUy11leY6US9bP3jVx","kt":"1","k":["DMrq2ktTKWxE5jjhDKDOz1T8a4R0ZGsikc7M-p5k-Rzp"],"nt":"1","n":["EKw6XLOELmjxU-N_EDuUQ7v1XfodiBVyf2nU2zaSIe05"],"bt":"0","br":[],"ba":[],"a":[]}-AABAABzuuhSMYnxQVJ-K2lJP2WOfUP-oiQAp1Dm2685U-s-91bQovUHAoMoVFWcq0FnxC8W7rQHLXw-Wgt_-lo34u4H{"v":"KERI10JSON000160_","t":"rot","d":"EBOeYHB245lnMJY4or8FvfCaoYlwMVwE5Hr49VE6uXK8","i":"EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL","s":"7","p":"ELQRtBD0vqZOQRTc_uQ0_WebeSM-xLcIog7QPyCDtANg","kt":"1","k":["DApxTJjlbWOgHIMXR_qrryjCIlLFPqnaSRo2M1FFmp4I"],"nt":"1","n":["EOdAKz4CYF6RFZzs_Chyih7QRgcfcZaJ_G02Y-4lrfHg"],"bt":"0","br":[],"ba":[],"a":[]}-AABAAAmR-tO3N1b7b2ZCZmlaSYmQbgHE0T9wZANzXdezQ2b9XPS0RWJcMfHCtpn3qj0Jxhhij1OfMGPSqtshVtEXsYC"#; @@ -127,8 +127,7 @@ fn test_out_of_order() -> Result<(), Error> { )?; (processor, EventStorage::new(events_db.clone()), new_ooo) }; - let id: IdentifierPrefix = - "EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL".parse()?; + let id: IdentifierPrefix = "EO8cED9H5XPqBdoVatgBkEuSP8yXic7HtWpkex-9e0sL".parse()?; processor.process(&ev1)?; assert_eq!(storage.get_state(&id).unwrap().sn, 0); diff --git a/keriox_core/src/processor/escrow/mod.rs b/keriox_core/src/processor/escrow/mod.rs index 03d6f138..f6b9bd2d 100644 --- a/keriox_core/src/processor/escrow/mod.rs +++ b/keriox_core/src/processor/escrow/mod.rs @@ -50,7 +50,10 @@ pub fn default_escrow_bus( Arc>, Arc>, ), -) where D: EventDatabase + EscrowCreator + Sync + Send + 'static { +) +where + D: EventDatabase + EscrowCreator + Sync + Send + 'static, +{ let mut bus = NotificationBus::new(); // Register out of order escrow, to save and reprocess out of order events @@ -58,7 +61,10 @@ pub fn default_escrow_bus( event_db.clone(), escrow_config.out_of_order_timeout, )); - println!("Registering out of order escrow with timeout: {:?}", escrow_config.out_of_order_timeout); + println!( + "Registering out of order escrow with timeout: {:?}", + escrow_config.out_of_order_timeout + ); bus.register_observer( ooo_escrow.clone(), vec![ diff --git a/keriox_core/src/processor/escrow/partially_signed_escrow.rs b/keriox_core/src/processor/escrow/partially_signed_escrow.rs index ebc11cad..66909e62 100644 --- a/keriox_core/src/processor/escrow/partially_signed_escrow.rs +++ b/keriox_core/src/processor/escrow/partially_signed_escrow.rs @@ -1,11 +1,7 @@ use std::{sync::Arc, time::Duration}; use crate::{ - database::{ - EventDatabase, - EscrowDatabase, - EscrowCreator, - }, + database::{EscrowCreator, EscrowDatabase, EventDatabase}, error::Error, event::KeyEvent, event_message::{msg::KeriEvent, signed_event_message::SignedEventMessage}, @@ -31,8 +27,11 @@ impl PartiallySignedEscrow { pub fn get_partially_signed_for_event( &self, - event: KeriEvent - ) -> Option where ::Error: std::fmt::Debug { + event: KeriEvent, + ) -> Option + where + ::Error: std::fmt::Debug, + { let id = event.data.get_prefix(); let sn = event.data.sn; self.escrowed_partially_signed @@ -56,7 +55,8 @@ impl PartiallySignedEscrow { let sn = signed_event.event_message.data.sn; if let Some(esc) = self .escrowed_partially_signed - .get(&id, sn).map_err(|_| Error::DbError)? + .get(&id, sn) + .map_err(|_| Error::DbError)? .find(|event| event.event_message == signed_event.event_message) { let mut signatures = esc.signatures; @@ -104,14 +104,18 @@ impl PartiallySignedEscrow { signatures: without_duplicates, ..signed_event.to_owned() }; - self.escrowed_partially_signed.insert(&to_add).map_err(|_| Error::DbError)?; + self.escrowed_partially_signed + .insert(&to_add) + .map_err(|_| Error::DbError)?; } Err(_e) => { // keep in escrow } } } else { - self.escrowed_partially_signed.insert(signed_event).map_err(|_| Error::DbError)?; + self.escrowed_partially_signed + .insert(signed_event) + .map_err(|_| Error::DbError)?; }; Ok(()) diff --git a/keriox_core/src/processor/escrow/partially_witnessed_escrow.rs b/keriox_core/src/processor/escrow/partially_witnessed_escrow.rs index 515d043e..124abe96 100644 --- a/keriox_core/src/processor/escrow/partially_witnessed_escrow.rs +++ b/keriox_core/src/processor/escrow/partially_witnessed_escrow.rs @@ -4,7 +4,7 @@ use said::SelfAddressingIdentifier; use crate::{ actor::prelude::EventStorage, - database::{EscrowDatabase, EventDatabase, EscrowCreator,LogDatabase}, + database::{EscrowCreator, EscrowDatabase, EventDatabase, LogDatabase}, error::Error, event_message::{ signature::Nontransferable, @@ -37,7 +37,9 @@ impl PartiallyWitnessedEscrow { &'a self, id: &IdentifierPrefix, ) -> Result + 'a, Error> { - self.escrowed_partially_witnessed.get_from_sn(id, 0).map_err(|_| Error::DbError ) + self.escrowed_partially_witnessed + .get_from_sn(id, 0) + .map_err(|_| Error::DbError) } /// Returns escrowed partially witness events of given identifier, sn and @@ -71,8 +73,14 @@ impl PartiallyWitnessedEscrow { sn: u64, digest: &'a SelfAddressingIdentifier, ) -> Result + 'a>, Error> { - if self.escrowed_partially_witnessed.contains(id, sn, digest).map_err(|_| Error::DbError)? { - self.log.get_nontrans_couplets(digest).map_err(|_| Error::DbError) + if self + .escrowed_partially_witnessed + .contains(id, sn, digest) + .map_err(|_| Error::DbError)? + { + self.log + .get_nontrans_couplets(digest) + .map_err(|_| Error::DbError) } else { Ok(None) } @@ -91,10 +99,12 @@ impl PartiallyWitnessedEscrow { let id = &receipt.body.prefix; let sn = receipt.body.sn; let digest = &receipt.body.receipted_event_digest; - self.log.log_receipt_with_new_transaction(&receipt) - .map_err(|_| Error::DbError)?; + self.log + .log_receipt_with_new_transaction(&receipt) + .map_err(|_| Error::DbError)?; self.escrowed_partially_witnessed - .save_digest(id, sn, digest).map_err(|_| Error::DbError)?; + .save_digest(id, sn, digest) + .map_err(|_| Error::DbError)?; bus.notify(&Notification::ReceiptEscrowed) } @@ -224,7 +234,8 @@ impl PartiallyWitnessedEscrow { }); self.log - .remove_nontrans_receipt_with_new_transaction(event_digest, wrong_non).map_err(|_| Error::DbError)?; + .remove_nontrans_receipt_with_new_transaction(event_digest, wrong_non) + .map_err(|_| Error::DbError)?; Ok(()) } @@ -267,13 +278,12 @@ impl Notifier for PartiallyWitnessed .validate_partially_witnessed(&receipted_event, Some(ooo.to_owned())) { Ok(_) => { - self.log.log_receipt_with_new_transaction(&ooo) - .map_err(|_| Error::DbError)?; + self.log + .log_receipt_with_new_transaction(&ooo) + .map_err(|_| Error::DbError)?; // accept event and remove receipts self.db - .accept_to_kel( - &receipted_event.event_message, - ) + .accept_to_kel(&receipted_event.event_message) .map_err(|_| Error::DbError)?; // accept receipts and remove them from escrow self.accept_receipts_for(&receipted_event)?; @@ -319,9 +329,7 @@ impl Notifier for PartiallyWitnessed .map_err(|_| Error::DbError)?; // accept event and remove receipts self.db - .accept_to_kel( - &signed_event.event_message, - ) + .accept_to_kel(&signed_event.event_message) .map_err(|_| Error::DbError)?; // accept receipts and remove them from escrow self.accept_receipts_for(&signed_event)?; @@ -330,7 +338,9 @@ impl Notifier for PartiallyWitnessed } Err(Error::SignatureVerificationError) => (), Err(_) => { - self.escrowed_partially_witnessed.insert(&signed_event).map_err(|_| Error::DbError)?; + self.escrowed_partially_witnessed + .insert(&signed_event) + .map_err(|_| Error::DbError)?; } }; Ok(()) diff --git a/keriox_core/src/processor/escrow/reply_escrow.rs b/keriox_core/src/processor/escrow/reply_escrow.rs index 18760c43..2dc9be4b 100644 --- a/keriox_core/src/processor/escrow/reply_escrow.rs +++ b/keriox_core/src/processor/escrow/reply_escrow.rs @@ -9,8 +9,7 @@ use crate::{ ksn_log::{AcceptedKsn, KsnLogDatabase}, RedbDatabase, RedbError, }, - EventDatabase, - SequencedEventDatabase, + EventDatabase, SequencedEventDatabase, }, error::Error, prefix::IdentifierPrefix, diff --git a/keriox_core/src/processor/event_storage.rs b/keriox_core/src/processor/event_storage.rs index 1ac79b06..849b52c3 100644 --- a/keriox_core/src/processor/event_storage.rs +++ b/keriox_core/src/processor/event_storage.rs @@ -44,7 +44,9 @@ impl EventStorage { pub fn new(events_db: Arc) -> Self { #[cfg(feature = "mailbox")] { - if let Some(redb_db) = (events_db.as_ref() as &dyn std::any::Any).downcast_ref::() { + if let Some(redb_db) = + (events_db.as_ref() as &dyn std::any::Any).downcast_ref::() + { let mailbox_data = MailboxData::new(redb_db.db.clone()).unwrap(); Self { events_db, diff --git a/keriox_core/src/processor/validator.rs b/keriox_core/src/processor/validator.rs index 17e3cfa3..5982ee1a 100644 --- a/keriox_core/src/processor/validator.rs +++ b/keriox_core/src/processor/validator.rs @@ -93,7 +93,7 @@ impl EventValidator { )?; } new_state - }, + } None => signed_event .event_message .apply_to(IdentifierState::default())?, diff --git a/keriox_tests/src/transport.rs b/keriox_tests/src/transport.rs index c1b96f7d..adbcf6e8 100644 --- a/keriox_tests/src/transport.rs +++ b/keriox_tests/src/transport.rs @@ -1,13 +1,13 @@ use std::{collections::HashMap, sync::Arc}; use futures::lock::Mutex; -use keri_controller::LocationScheme; -use teliox::{ - event::verifiable_event::VerifiableEvent, - query::SignedTelQuery, - transport::{GeneralTelTransport, TransportError}, +use keri_controller::{ + communication::{IdentifierTelTransport, SendingError}, + LocationScheme, }; -use watcher::Watcher; +use keri_core::transport::TransportError; +use teliox::{event::verifiable_event::VerifiableEvent, query::SignedTelQuery}; +use watcher::{transport::WatcherTelTransport, Watcher}; use witness::Witness; pub enum TelTestActor { @@ -50,7 +50,7 @@ impl TelTestActor { match self { TelTestActor::Witness(wit) => wit .parse_and_process_tel_events(&input_stream) - .map_err(|_err| TransportError::NetworkError)?, + .map_err(|_err| TransportError::NetworkError("Wrong payload".to_string()))?, TelTestActor::Watcher(_wat) => todo!(), }; Ok(()) @@ -86,24 +86,23 @@ impl Clone for TelTestTransport { } #[async_trait::async_trait] -impl GeneralTelTransport for TelTestTransport { +impl IdentifierTelTransport for TelTestTransport { async fn send_query( &self, qry: SignedTelQuery, loc: LocationScheme, - ) -> Result { + ) -> Result { let (host, port) = match loc.url.origin() { url::Origin::Tuple(_scheme, host, port) => (host, port), - _ => return Err(TransportError::NetworkError), + _ => return Err(TransportError::NetworkError("Wrong address".to_string()).into()), }; - let actors = self.actors.lock().await; //.map_err(|_e| TransportError::InvalidResponse)?; + let actors = self.actors.lock().await; let actor = actors .get(&(host, port)) - .ok_or(TransportError::NetworkError)?; - let resp = actor - .send_query(qry, loc) - .await - .map_err(|_err| TransportError::InvalidResponse)?; + .ok_or(TransportError::NetworkError( + "Address not found".to_string(), + ))?; + let resp = actor.send_query(qry, loc).await?; Ok(resp) } @@ -111,20 +110,33 @@ impl GeneralTelTransport for TelTestTransport { &self, qry: VerifiableEvent, loc: LocationScheme, - ) -> Result<(), TransportError> { + ) -> Result<(), SendingError> { let (host, port) = match loc.url.origin() { url::Origin::Tuple(_scheme, host, port) => (host, port), - _ => return Err(TransportError::NetworkError), + _ => return Err(TransportError::NetworkError("Wrong address".to_string()).into()), }; let actors = self.actors.lock().await; actors .get(&(host, port)) - .ok_or(TransportError::NetworkError)? + .ok_or(TransportError::NetworkError( + "Address not found".to_string(), + ))? .send_tel_event(qry, loc) - .await - .map_err(|_err| TransportError::InvalidResponse)?; - + .await?; Ok(()) } } + +#[async_trait::async_trait] +impl WatcherTelTransport for TelTestTransport { + async fn send_query( + &self, + qry: SignedTelQuery, + location: LocationScheme, + ) -> Result { + IdentifierTelTransport::send_query(self, qry, location) + .await + .map_err(|_e| watcher::transport::TransportError::NetworkError) + } +} diff --git a/keriox_tests/tests/tel_from_watcher.rs b/keriox_tests/tests/tel_from_watcher.rs index 3d9cb3f1..41e6fa5a 100644 --- a/keriox_tests/tests/tel_from_watcher.rs +++ b/keriox_tests/tests/tel_from_watcher.rs @@ -7,10 +7,9 @@ use keri_controller::{EndRole, Oobi}; use keri_controller::{IdentifierPrefix, KeyManager, LocationScheme, SelfSigningPrefix}; use keri_core::actor::prelude::{HashFunction, HashFunctionCode}; use keri_core::transport::test::TestTransport; -use keri_tests::{ - setup_identifier, - transport::{TelTestActor, TelTestTransport}, -}; +use keri_tests::setup_identifier; +use keri_tests::transport::TelTestActor; +use keri_tests::transport::TelTestTransport; use teliox::state::vc_state::TelState; use tempfile::Builder; use url::{Host, Url}; diff --git a/support/teliox/Cargo.toml b/support/teliox/Cargo.toml index c92c947c..7981e53f 100644 --- a/support/teliox/Cargo.toml +++ b/support/teliox/Cargo.toml @@ -10,7 +10,7 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -keri-core = {path = "../../keriox_core", version = "0.17.2", features = ["oobi", "mailbox"]} +keri-core = {path = "../../keriox_core", features = ["query"]} said = { version = "0.4.0" } cesrox = { version = "0.1.4" } base64 = "0.13.0" @@ -23,9 +23,6 @@ arrayref = "0.3.6" sled = { version = "0.34.6"} serde_cbor = "0.11.1" sled-tables = "0.2.0" -reqwest = { version = "0.11"} -async-trait = "0.1.57" - [dev-dependencies] tempfile = "3.1" diff --git a/support/teliox/src/lib.rs b/support/teliox/src/lib.rs index d6b9b0c6..2014682e 100644 --- a/support/teliox/src/lib.rs +++ b/support/teliox/src/lib.rs @@ -6,4 +6,3 @@ pub mod query; pub mod seal; pub mod state; pub mod tel; -pub mod transport; diff --git a/support/teliox/src/transport/mod.rs b/support/teliox/src/transport/mod.rs deleted file mode 100644 index 4a07a6bf..00000000 --- a/support/teliox/src/transport/mod.rs +++ /dev/null @@ -1,81 +0,0 @@ -use keri_core::oobi::{LocationScheme, Scheme}; - -use crate::{event::verifiable_event::VerifiableEvent, query::SignedTelQuery}; - -pub struct TelTransport; - -#[async_trait::async_trait] -impl GeneralTelTransport for TelTransport { - async fn send_query( - &self, - qry: SignedTelQuery, - location: LocationScheme, - ) -> Result { - let url = match location.scheme { - Scheme::Http => location.url.join("query/tel").unwrap(), - Scheme::Tcp => todo!(), - }; - let resp = reqwest::Client::new() - .post(url) - .body(qry.to_cesr().unwrap()) - .send() - .await - .map_err(|_| TransportError::NetworkError)?; - - resp.text() - .await - .map_err(|_| TransportError::InvalidResponse) - } - - async fn send_tel_event( - &self, - qry: VerifiableEvent, - location: LocationScheme, - ) -> Result<(), TransportError> { - let url = match location.scheme { - Scheme::Http => location.url.join("process/tel").unwrap(), - Scheme::Tcp => todo!(), - }; - let client = reqwest::Client::new(); - let query = qry - .serialize() - .map_err(|_e| TransportError::InvalidResponse)?; - let resp = client - .post(url) - .body(query) - .send() - .await - .map_err(|_| TransportError::NetworkError)?; - resp.text() - .await - .map_err(|_| TransportError::InvalidResponse)?; - - Ok(()) - } -} - -pub enum HttpTransportError { - NetworkError, -} - -#[async_trait::async_trait] -pub trait GeneralTelTransport { - async fn send_query( - &self, - qry: SignedTelQuery, - location: LocationScheme, - ) -> Result; - async fn send_tel_event( - &self, - qry: VerifiableEvent, - location: LocationScheme, - ) -> Result<(), TransportError>; -} - -#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] -pub enum TransportError { - #[error("network error")] - NetworkError, - #[error("invalid response")] - InvalidResponse, -}