diff --git a/Cargo.toml b/Cargo.toml index 51f75819..2f039c47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "2" package.version = "0.10.0" [workspace.dependencies] -beam-lib = { path = "./beam-lib", features = [ "strict-ids" ] } +beam-lib = { path = "./beam-lib" } # Command Line Interface clap = { version = "4", features = ["env", "derive"] } diff --git a/beam-lib/Cargo.toml b/beam-lib/Cargo.toml index 922319dd..250be0e3 100644 --- a/beam-lib/Cargo.toml +++ b/beam-lib/Cargo.toml @@ -13,6 +13,5 @@ reqwest = { version = "0.12", features = ["json"], default-features = false, opt thiserror = { version = "2.0", optional = true } [features] -strict-ids = [] http-util = ["dep:reqwest", "dep:thiserror"] sockets = [] diff --git a/beam-lib/src/http_util.rs b/beam-lib/src/http_util.rs index 78b05dce..a99ccdd2 100644 --- a/beam-lib/src/http_util.rs +++ b/beam-lib/src/http_util.rs @@ -4,7 +4,7 @@ use reqwest::{Client, header::{self, HeaderValue, HeaderName}, Url, StatusCode, use serde::{de::DeserializeOwned, Serialize}; use thiserror::Error; -use crate::{AddressingId, TaskRequest, MsgId, TaskResult, ProxyId}; +use crate::{AppId, MsgId, ProxyId, TaskRequest, TaskResult}; #[cfg(feature = "sockets")] use crate::SocketTask; @@ -71,7 +71,7 @@ impl BlockingOptions { impl BeamClient { /// Create a beam client based on the Authorization credentials and the beam proxy url - pub fn new(app: &AddressingId, key: &str, beam_proxy_url: Url) -> Self { + pub fn new(app: &AppId, key: &str, beam_proxy_url: Url) -> Self { let default_headers = [ (header::AUTHORIZATION, HeaderValue::from_bytes(format!("ApiKey {app} {key}").as_bytes()).expect("This is a valid header value")) ].into_iter().collect(); @@ -187,13 +187,13 @@ impl BeamClient { /// Create a socket task for some other application to connect to /// For this to work both the beam proxy and beam broker need to have the sockets feature enabled. #[cfg(feature = "sockets")] - pub async fn create_socket(&self, destination: &AddressingId) -> Result { + pub async fn create_socket(&self, destination: &AppId) -> Result { self.create_socket_with_metadata(destination, serde_json::Value::Null).await } /// Same as `create_socket` but with associated (unencrypted) metadata. #[cfg(feature = "sockets")] - pub async fn create_socket_with_metadata(&self, destination: &AddressingId, metadata: impl Serialize) -> Result { + pub async fn create_socket_with_metadata(&self, destination: &AppId, metadata: impl Serialize) -> Result { const METADATA_HEADER: HeaderName = HeaderName::from_static("metadata"); let url = self.beam_proxy_url .join(&format!("/v1/sockets/{destination}")) diff --git a/beam-lib/src/ids.rs b/beam-lib/src/ids.rs index 60614691..4e421158 100644 --- a/beam-lib/src/ids.rs +++ b/beam-lib/src/ids.rs @@ -3,349 +3,164 @@ use std::fmt::Display; use serde::{Deserialize, Serialize}; -#[cfg(feature = "strict-ids")] -pub type AddressingId = crate::AppOrProxyId; - -#[cfg(not(feature = "strict-ids"))] -pub type AddressingId = crate::AppId; - -#[cfg(feature = "strict-ids")] -static BROKER_ID: std::sync::OnceLock = std::sync::OnceLock::new(); - -#[cfg(feature = "strict-ids")] -#[derive(Debug, Clone, Serialize, PartialEq, Eq, Hash)] -#[serde(untagged)] -pub enum AppOrProxyId { - App(AppId), - Proxy(ProxyId), -} - -#[cfg(feature = "strict-ids")] -impl PartialEq for AppOrProxyId { - fn eq(&self, other: &AppId) -> bool { - match self { - AppOrProxyId::App(app) => app == other, - AppOrProxyId::Proxy(_) => false, - } - } -} - -#[cfg(feature = "strict-ids")] -impl AsRef for AppOrProxyId { - fn as_ref(&self) -> &str { - match self { - AppOrProxyId::App(a) => a.as_ref(), - AppOrProxyId::Proxy(p) => p.as_ref(), - } - } -} - -#[cfg(feature = "strict-ids")] -impl Display for AppOrProxyId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(match self { - AppOrProxyId::App(app) => &app.0, - AppOrProxyId::Proxy(proxy) => &proxy.0, - }) - } -} - -#[cfg(feature = "strict-ids")] -impl AppOrProxyId { - pub fn new(id: &str) -> Result { - match get_id_type(id)? { - BeamIdType::AppId => Ok(Self::App(AppId(id.to_owned()))), - BeamIdType::ProxyId => Ok(Self::Proxy(ProxyId(id.to_owned()))), - BeamIdType::BrokerId => Err(BeamIdError::InvalidIdKind), - } - } - - pub fn proxy_id(&self) -> ProxyId { - match self { - AppOrProxyId::App(app) => app.proxy_id(), - AppOrProxyId::Proxy(proxy) => proxy.clone(), - } - } - - pub fn can_be_signed_by(&self, other: &impl AsRef) -> bool { - self.as_ref().ends_with(other.as_ref()) - } - - pub fn hide_broker(&self) -> &str { - match self { - AppOrProxyId::App(app) => app.hide_broker_name(), - AppOrProxyId::Proxy(proxy) => proxy.proxy_name(), - } - } -} - -#[cfg(feature = "strict-ids")] -impl From for AppOrProxyId { - fn from(app: AppId) -> Self { - AppOrProxyId::App(app) - } -} - -#[cfg(feature = "strict-ids")] -impl From for AppOrProxyId { - fn from(proxy: ProxyId) -> Self { - AppOrProxyId::Proxy(proxy) - } +fn check_valid_id_part(id: &str) -> bool { + id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-') && !id.is_empty() } +#[derive(Debug, Clone, Serialize, PartialEq, Eq, Hash)] +pub struct AppId(String); -#[cfg(feature = "strict-ids")] -pub fn set_broker_id(id: String) { - if let Err(value) = BROKER_ID.set(id) { - assert_eq!( - BROKER_ID.get().unwrap(), - &value, - "Tried to initialize broker_id with two different values" - ); - } -} - -#[cfg(feature = "strict-ids")] -pub fn get_broker_id() -> &'static String { - BROKER_ID - .get() - .expect("Global broker ID has not yet been set! This is required for feature strict-ids.") -} - -#[cfg(feature = "strict-ids")] -fn strip_broker_id(id: &str) -> Result<&str, BeamIdError> { - if let Some(rest) = id.strip_suffix(get_broker_id()) { - Ok(rest) - } else { - Err(BeamIdError::WrongBrokerId) - } -} - -#[cfg(feature = "strict-ids")] -#[derive(PartialEq, Debug)] -pub(crate) enum BeamIdType { - AppId, - ProxyId, - BrokerId, -} - -macro_rules! impl_id { - ($id:ident) => { - impl $id { - #[cfg(feature = "strict-ids")] - pub fn new(id: impl Into) -> Result { - { - let id = id.into(); - if get_id_type(&id)? == BeamIdType::$id { - Ok(Self(id)) - } else { - Err(BeamIdError::InvalidIdKind) - } - } - } - - pub fn new_unchecked(id: impl Into) -> Self { - Self(id.into()) - } - - #[cfg(feature = "strict-ids")] - pub fn can_be_signed_by(&self, other: &impl AsRef) -> bool { - self.as_ref().ends_with(other.as_ref()) - } - } - - impl AsRef for $id { - fn as_ref(&self) -> &str { - &self.0 - } - } - - impl Display for $id { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(&self.0) - } +impl AppId { + pub fn new(id: impl Into) -> Result { + let id: String = id.into(); + let mut iter = id.split('.'); + let Some(app_name) = iter.next() else { + return Err(BeamIdError::InvalidNumberOfIdFragments); + }; + if !check_valid_id_part(app_name) { + return Err(BeamIdError::InvalidIdFragment); } - }; -} - -impl_id!(AppId); -impl_id!(ProxyId); - -#[cfg(feature = "strict-ids")] -fn get_id_type(id: &str) -> Result { - let rest = strip_broker_id(id)?; - let Some(rest) = rest.strip_suffix('.') else { - return Ok(BeamIdType::BrokerId); - }; - let mut split = rest.split('.'); - let ret = match (split.next(), split.next()) { - (Some(proxy), None) => { - check_valid_id_part(proxy)?; - Ok(BeamIdType::ProxyId) + let Some(proxy_name) = iter.next() else { + return Err(BeamIdError::InvalidNumberOfIdFragments); + }; + if !check_valid_id_part(proxy_name) { + return Err(BeamIdError::InvalidIdFragment); } - (Some(app), Some(proxy)) => { - check_valid_id_part(app)?; - check_valid_id_part(proxy)?; - Ok(BeamIdType::AppId) + let Some(broker_id) = iter.next() else { + return Err(BeamIdError::InvalidNumberOfIdFragments); + }; + if broker_id.is_empty() { + return Err(BeamIdError::InvalidIdFragment); } - (None, _) => unreachable!(), - }; - if let Some(_) = split.next() { - Err(BeamIdError::InvalidNumberOfIdFragments) - } else { - ret + Ok(Self(id)) } -} - -#[derive(Debug, Clone, Serialize, PartialEq, Eq, Hash)] -pub struct AppId(String); -impl AppId { pub fn app_name(&self) -> &str { - let idx = self.0.find('.').unwrap_or(self.0.len() - 1); - &self.0[0..idx] + self.as_ref().split_once('.').expect("This is a valid app id").0 } pub fn proxy_id(&self) -> ProxyId { - let proxy_id = self - .0 - .get(self.app_name().len() + 1..) - .expect("AppId should be valid"); - ProxyId(proxy_id.to_string()) + ProxyId(self.proxy_id_str().to_owned()) + } + + pub fn proxy_id_str(&self) -> &str { + self.as_ref().split_once('.').expect("This is a valid app id").1 } /// Returns the AppId as a string slice without the broker part of the string /// ## Example /// app1.proxy1.broker => app1.proxy1 - #[cfg(feature = "strict-ids")] pub fn hide_broker_name(&self) -> &str { - let without_broker = strip_broker_id(&self.0).expect("Is valid id"); - &without_broker[..without_broker.len() - 1] + let mut found_first_dot = false; + self.as_ref().split_once(|c| match (c, found_first_dot) { + ('.', true) => true, + ('.', false) => { + found_first_dot = true; + false + }, + _ => false, + }).expect("This is a valid app id").0 } } - #[derive(Debug, Clone, Serialize, PartialEq, Eq, Hash)] pub struct ProxyId(String); impl ProxyId { + pub fn new(id: impl Into) -> Result { + let id: String = id.into(); + let Some((proxy_name, broker_id)) = id.split_once('.') else { + return Err(BeamIdError::InvalidNumberOfIdFragments); + }; + if !check_valid_id_part(proxy_name) { + return Err(BeamIdError::InvalidIdFragment); + } + if broker_id.is_empty() { + return Err(BeamIdError::InvalidIdFragment); + } + Ok(Self(id)) + } /// Returns the proxies name without the broker id /// ## Example /// proxy1.broker => proxy1 - #[cfg(feature = "strict-ids")] pub fn proxy_name(&self) -> &str { self.0 .split_once('.') .map(|(proxy, _broker)| proxy) .expect("This is a valid proxy id") } + + pub fn hide_broker_name(&self) -> &str { + self.0.split_once('.').expect("This is a valid proxy id").0 + } } -#[cfg(feature = "strict-ids")] #[derive(Debug, Clone, Copy, PartialEq)] pub enum BeamIdError { InvalidNumberOfIdFragments, - InvalidIdKind, InvalidIdFragment, - #[cfg(feature = "strict-ids")] - WrongBrokerId, } -#[cfg(feature = "strict-ids")] impl std::error::Error for BeamIdError {} -#[cfg(feature = "strict-ids")] impl Display for BeamIdError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let text = match self { BeamIdError::InvalidIdFragment => "Id fragment may only contain alphanumeric values.", BeamIdError::InvalidNumberOfIdFragments => "Id had an unexpected amount of fragments.", - BeamIdError::InvalidIdKind => "Id parsed as a different kind of id then specified.", - #[cfg(feature = "strict-ids")] - BeamIdError::WrongBrokerId => { - "The broker id part of the id did not match the global broker id." - } }; f.write_str(text) } } -#[cfg(feature = "strict-ids")] -fn check_valid_id_part(id: &str) -> Result<(), BeamIdError> { - for char in id.chars() { - if !(char.is_alphanumeric() || char == '-') { - return Err(BeamIdError::InvalidIdFragment); +macro_rules! impl_id { + ($id:ident) => { + impl $id { + pub fn new_unchecked(id: impl Into) -> Self { + Self(id.into()) + } + } + + impl AsRef for $id { + fn as_ref(&self) -> &str { + &self.0 + } + } + + impl Display for $id { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } } - } - Ok(()) -} -macro_rules! impl_deserialize { - ($idType:ident) => { - impl<'de> Deserialize<'de> for $idType { + impl<'de> Deserialize<'de> for $id { fn deserialize>(deserializer: D) -> Result { - #[cfg(feature = "strict-ids")] - return Self::new(&String::deserialize(deserializer)?) - .map_err(serde::de::Error::custom); - #[cfg(not(feature = "strict-ids"))] - return Ok(Self::new_unchecked(String::deserialize(deserializer)?)) + String::deserialize(deserializer).map(Self::new)?.map_err(serde::de::Error::custom) } } }; } -impl_deserialize!(AppId); -impl_deserialize!(ProxyId); -#[cfg(feature = "strict-ids")] -impl_deserialize!(AppOrProxyId); +impl_id!(AppId); +impl_id!(ProxyId); -#[cfg(all(test, feature = "strict-ids"))] +#[cfg(test)] mod tests { use super::*; #[test] - fn test_str_has_type() { - set_broker_id("broker.samply.de".to_string()); - assert_eq!( - get_id_type("broker.samply.de").unwrap(), - BeamIdType::BrokerId - ); - assert_eq!( - get_id_type("proxy23.broker.samply.de").unwrap(), - BeamIdType::ProxyId - ); - assert_eq!( - get_id_type("app12.proxy23.broker.samply.de").unwrap(), - BeamIdType::AppId - ); - assert!(get_id_type("roker.samply.de").is_err()); - assert!(get_id_type("moreString.app12.proxy23.broker.samply.de").is_err()); - } - - #[test] - fn test_appid_proxyid() { - let app_id_str = "app.proxy1.broker.samply.de"; - - set_broker_id("broker.samply.de".to_string()); - let app_id = AppId::new(app_id_str).unwrap(); - assert_eq!( - app_id.proxy_id(), - ProxyId::new("proxy1.broker.samply.de").unwrap() - ); + fn test_app_id() { + let app_id = AppId::new("app1.proxy1.broker").unwrap(); + assert_eq!(app_id.app_name(), "app1"); + assert_eq!(app_id.proxy_id().proxy_name(), "proxy1"); + assert_eq!(app_id.hide_broker_name(), "app1.proxy1"); } #[test] - fn test_app_or_proxy_id() { - let app_id_str = "app.proxy1.broker.samply.de"; - - set_broker_id("broker.samply.de".to_string()); - let app_id = AppId::new(app_id_str).unwrap(); - let app_id_app: AppOrProxyId = app_id.clone().into(); - assert_eq!(app_id_app, app_id); - - let proxy_id = app_id.proxy_id(); - let app_id_proxy: AppOrProxyId = proxy_id.clone().into(); - assert_eq!(proxy_id, app_id_proxy.proxy_id()); + fn test_invalid_app_id() { + assert_eq!(AppId::new("app1..broker"), Err(BeamIdError::InvalidIdFragment)); + assert!(AppId::new("app1.proxy1.").is_err()); + assert!(AppId::new("app_2.proxy1").is_err()); } -} +} \ No newline at end of file diff --git a/beam-lib/src/messages.rs b/beam-lib/src/messages.rs index e4bbe9fd..887aff1f 100644 --- a/beam-lib/src/messages.rs +++ b/beam-lib/src/messages.rs @@ -1,7 +1,8 @@ use serde::{Serialize, Deserialize, de::DeserializeOwned}; use serde_json::Value; use uuid::Uuid; -use crate::AddressingId; + +use crate::AppId; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)] pub struct MsgId(Uuid); @@ -21,8 +22,8 @@ impl std::fmt::Display for MsgId { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskRequest { pub id: MsgId, - pub from: AddressingId, - pub to: Vec, + pub from: AppId, + pub to: Vec, #[serde( with = "serde_string", bound(serialize = "T: Serialize + 'static", deserialize = "T: DeserializeOwned + 'static") @@ -35,8 +36,8 @@ pub struct TaskRequest { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskResult { - pub from: AddressingId, - pub to: Vec, + pub from: AppId, + pub to: Vec, pub task: MsgId, pub status: WorkStatus, #[serde( @@ -50,8 +51,8 @@ pub struct TaskResult { #[cfg(feature = "sockets")] #[derive(Debug, Serialize, Deserialize)] pub struct SocketTask { - pub from: AddressingId, - pub to: Vec, + pub from: AppId, + pub to: Vec, pub ttl: String, pub id: MsgId, #[serde(default)] @@ -79,7 +80,7 @@ pub enum WorkStatus { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct MsgEmpty { - pub from: AddressingId, + pub from: AppId, } mod serde_string { @@ -132,9 +133,6 @@ mod tests { use super::*; fn test_serialize_and_deserialize + PartialEq + Serialize + DeserializeOwned + std::fmt::Debug + 'static>() { - use crate::AppId; - #[cfg(feature = "strict-ids")] - crate::set_broker_id("broker.samply.de".to_string()); let from = AppId::new_unchecked("test.broker.samply.de").into(); let task = TaskRequest { id: MsgId::new(), diff --git a/broker/src/config.rs b/broker/src/config.rs index 35fed112..3180fafc 100644 --- a/broker/src/config.rs +++ b/broker/src/config.rs @@ -68,7 +68,6 @@ pub struct Config { impl Config { pub fn load() -> Result { let cli_args = CliArgs::parse(); - beam_lib::set_broker_id(cli_args.broker_url.host().unwrap().to_string()); let pki_token = read_to_string(&cli_args.pki_apikey_file) .map_err(|e| { SamplyBeamError::ConfigurationFailed(format!( diff --git a/broker/src/serve_health.rs b/broker/src/serve_health.rs index 4b37a49e..38e8699c 100644 --- a/broker/src/serve_health.rs +++ b/broker/src/serve_health.rs @@ -133,7 +133,7 @@ async fn get_control_tasks( State(state): State>>, proxy_auth: Authorized, ) -> Result, StatusCode> { - let proxy_id = proxy_auth.get_from().proxy_id(); + let proxy_id = proxy_auth.msg.from; // Once this is freed the connection will be removed from the map of connected proxies again // This ensures that when the connection is dropped and therefore this response future the status of this proxy will be updated let status_mutex = state diff --git a/broker/src/serve_sockets.rs b/broker/src/serve_sockets.rs index 9c2442ce..f3d3ae9d 100644 --- a/broker/src/serve_sockets.rs +++ b/broker/src/serve_sockets.rs @@ -4,7 +4,7 @@ use axum::{extract::{Path, Request, State}, http::{header, request::Parts, Heade use bytes::BufMut; use hyper_util::rt::TokioIo; use serde::{Serialize, Serializer, ser::SerializeSeq}; -use shared::{crypto_jwt::Authorized, expire_map::LazyExpireMap, serde_helpers::DerefSerializer, Encrypted, HasWaitId, HowLongToBlock, Msg, MsgEmpty, MsgId, MsgSigned, MsgSocketRequest}; +use shared::{expire_map::LazyExpireMap, serde_helpers::DerefSerializer, Encrypted, HasWaitId, HowLongToBlock, Msg, MsgEmpty, MsgId, MsgSigned, MsgSocketRequest}; use tokio::sync::{RwLock, broadcast::{Sender, self}, oneshot}; use tracing::{debug, log::error, warn}; @@ -85,7 +85,7 @@ async fn connect_socket( body: String, // This Result is just an Either type. An error value does not mean something went wrong ) -> Result { - let result = shared::crypto_jwt::verify_with_extended_header::(&mut parts, &body).await; + let result = shared::crypto_jwt::verify_with_extended_header::(&mut parts, body).await; let msg = match result { Ok(msg) => msg.msg, Err(e) => return Ok(e.into_response()), diff --git a/broker/src/serve_tasks.rs b/broker/src/serve_tasks.rs index 9c6a27d8..b7940944 100644 --- a/broker/src/serve_tasks.rs +++ b/broker/src/serve_tasks.rs @@ -11,10 +11,9 @@ use axum::{ routing::{get, post, put}, Json, Router, }; -use beam_lib::AppOrProxyId; use futures_core::{stream, Stream}; use serde::Deserialize; -use beam_lib::WorkStatus; +use beam_lib::{AppId, WorkStatus}; use shared::{ errors::SamplyBeamError, sse_event::SseEventType, EncryptedMsgTaskRequest, EncryptedMsgTaskResult, HasWaitId, HowLongToBlock, Msg, MsgEmpty, @@ -142,10 +141,12 @@ async fn get_results_for_task_stream( } +/// XXX: Breaking change find a way for the old behavior to still work. +/// Keeping the old behavior requires differentiating between app id and proxy id which is only possible with with knowlege of all connected brokers which is gonna be part of the next commit. #[derive(Deserialize)] struct TaskFilter { - from: Option, - to: Option, + from: Option, + to: Option, filter: Option, } @@ -214,8 +215,8 @@ async fn get_tasks( trait MsgFilterTrait { // fn new() -> Self; - fn from(&self) -> Option<&AppOrProxyId>; - fn to(&self) -> Option<&AppOrProxyId>; + fn from(&self) -> Option<&AppId>; + fn to(&self) -> Option<&AppId>; fn mode(&self) -> &MsgFilterMode; fn matches(&self, msg: &M) -> bool { @@ -268,14 +269,14 @@ enum MsgFilterMode { And, } struct MsgFilterNoTask { - from: Option, - to: Option, + from: Option, + to: Option, mode: MsgFilterMode, } struct MsgFilterForTask<'a> { normal: MsgFilterNoTask, - unanswered_by: Option<&'a AppOrProxyId>, + unanswered_by: Option<&'a AppId>, workstatus_is_not: Vec>, } @@ -302,11 +303,11 @@ impl<'a> MsgFilterForTask<'a> { } impl<'a> MsgFilterTrait for MsgFilterForTask<'a> { - fn from(&self) -> Option<&AppOrProxyId> { + fn from(&self) -> Option<&AppId> { self.normal.from.as_ref() } - fn to(&self) -> Option<&AppOrProxyId> { + fn to(&self) -> Option<&AppId> { self.normal.to.as_ref() } @@ -320,11 +321,11 @@ impl<'a> MsgFilterTrait for MsgFilterForTask<'a> { } impl<'a, M: Msg> MsgFilterTrait for MsgFilterNoTask { - fn from(&self) -> Option<&AppOrProxyId> { + fn from(&self) -> Option<&AppId> { self.from.as_ref() } - fn to(&self) -> Option<&AppOrProxyId> { + fn to(&self) -> Option<&AppId> { self.to.as_ref() } @@ -357,7 +358,7 @@ async fn post_task( // PUT /v1/tasks/{task_id}/results/{app_id} async fn put_result( ConnectInfo(addr): ConnectInfo, - Path((task_id, app_id)): Path<(MsgId, AppOrProxyId)>, + Path((task_id, app_id)): Path<(MsgId, AppId)>, State(state): State, result: MsgSigned, ) -> Result { diff --git a/broker/src/task_manager.rs b/broker/src/task_manager.rs index fded3089..4c57d796 100644 --- a/broker/src/task_manager.rs +++ b/broker/src/task_manager.rs @@ -8,7 +8,7 @@ use futures_core::Stream; use once_cell::sync::Lazy; use serde::Serialize; use serde_json::json; -use beam_lib::{AppOrProxyId, MsgEmpty, MsgId, WorkStatus}; +use beam_lib::{AppId, MsgEmpty, MsgId, WorkStatus}; use shared::{ HasWaitId, HowLongToBlock, Msg, MsgSigned, MsgState, MsgTaskRequest, MsgTaskResult, sse_event::SseEventType, @@ -19,7 +19,7 @@ use tracing::{warn, error}; pub trait Task { type Result; - fn get_results(&self) -> &HashMap; + fn get_results(&self) -> &HashMap; /// Returns true if the value as been updated and false if it was a result from a new app fn insert_result(&mut self, result: Self::Result) -> bool; fn is_expired(&self) -> bool; @@ -47,7 +47,7 @@ impl Task for MsgTaskRequest { } } - fn get_results(&self) -> &HashMap { + fn get_results(&self) -> &HashMap { &self.results } @@ -56,7 +56,7 @@ impl Task for MsgTaskRequest { } } -static EMPTY_MAP: Lazy> = Lazy::new(|| { +static EMPTY_MAP: Lazy> = Lazy::new(|| { HashMap::with_capacity(0) }); @@ -64,7 +64,7 @@ static EMPTY_MAP: Lazy> = Lazy::new(|| { impl Task for shared::MsgSocketRequest { type Result = (); - fn get_results(&self) -> &HashMap { + fn get_results(&self) -> &HashMap { &EMPTY_MAP } @@ -91,7 +91,7 @@ pub struct TaskManager + Task + Msg> { tasks: DashMap>, new_tasks: broadcast::Sender, /// Send the index at which the new result for the given Task was inserted - new_results: DashMap>, + new_results: DashMap>, } impl + Task + Msg + Send + Sync + 'static> TaskManager { diff --git a/proxy/src/auth.rs b/proxy/src/auth.rs index 7d7b0e77..d3702389 100644 --- a/proxy/src/auth.rs +++ b/proxy/src/auth.rs @@ -4,7 +4,7 @@ use axum::{ extract::{FromRef, FromRequest, FromRequestParts}, http::{header::{self, HeaderName}, request::Parts, Request, StatusCode}, }; -use beam_lib::{AppId, AppOrProxyId}; +use beam_lib::{AppId}; use tracing::{debug, Span, debug_span, warn}; diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 20ccb3be..8b674ee6 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -103,7 +103,6 @@ fn parse_apikeys(proxy_id: &ProxyId) -> Result, SamplyBea impl Config { pub fn load() -> Result { let cli_args = CliArgs::parse(); - beam_lib::set_broker_id(cli_args.broker_url.host().unwrap().to_string()); let proxy_id = ProxyId::new(&cli_args.proxy_id).map_err(|e| { SamplyBeamError::ConfigurationFailed(format!( "Invalid Beam ID \"{}\" supplied: {}", @@ -215,7 +214,6 @@ mod tests { std::env::set_var(format!("APP_{app}_KEY"), key); } const BROKER_ID: &str = "broker.samply.de"; - beam_lib::set_broker_id(BROKER_ID.to_string()); let parsed = parse_apikeys(&ProxyId::new(&format!("proxy.{BROKER_ID}")).unwrap()).unwrap(); assert_eq!(parsed.len(), apps.len() * 2); } diff --git a/proxy/src/crypto.rs b/proxy/src/crypto.rs index 3e8c7849..1cae5a31 100644 --- a/proxy/src/crypto.rs +++ b/proxy/src/crypto.rs @@ -1,10 +1,10 @@ use std::{fs, path::PathBuf}; use axum::{body::Bytes, http::{header, request, Method, Request, StatusCode, Uri}, response::Response, Json}; -use beam_lib::{AppOrProxyId, ProxyId}; +use beam_lib::AppId; use rsa::{pkcs1::{DecodeRsaPrivateKey, DecodeRsaPublicKey}, pkcs8::DecodePrivateKey, RsaPrivateKey, RsaPublicKey}; use shared::{ - async_trait, crypto::{self, asn_str_to_vault_str, get_all_certs_and_clients_by_cname_as_pemstr, get_best_own_certificate, x509_cert_to_x509_public_key, CryptoPublicPortion, GetCerts, ProxyCertInfo}, errors::{CertificateInvalidReason, SamplyBeamError}, http_client::SamplyHttpClient, jwt_simple::prelude::RS256KeyPair, openssl::x509::X509, reqwest, EncryptedMessage, MsgEmpty + async_trait, crypto::{self, asn_str_to_vault_str, get_all_certs_and_clients_by_cname_as_pemstr, get_best_own_certificate, x509_cert_to_x509_public_key, CryptoPublicPortion, GetCerts, ProxyCertInfo}, errors::{CertificateInvalidReason, SamplyBeamError}, http_client::SamplyHttpClient, jwt_simple::prelude::RS256KeyPair, openssl::x509::X509, reqwest, EncryptedMessage, MsgEmpty, MsgProto }; use tracing::{debug, info, warn, error}; @@ -24,9 +24,9 @@ impl GetCertsFromBroker { .build() .expect("To build request successfully"); - let body = EncryptedMessage::MsgEmpty(MsgEmpty { - from: AppOrProxyId::Proxy(self.config.proxy_id.clone()), - }); + let body = MsgProto { + from: self.config.proxy_id.clone(), + }; let (parts, body) = Request::builder() .method(Method::GET) .uri(&uri) diff --git a/proxy/src/main.rs b/proxy/src/main.rs index a1ff55b0..371fb38a 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -4,10 +4,10 @@ use std::future::Future; use std::time::Duration; use axum::http::{header, HeaderValue, StatusCode}; -use beam_lib::AppOrProxyId; +use beam_lib::AppId; use futures::future::Ready; use futures::{StreamExt, TryStreamExt}; -use shared::{reqwest, EncryptedMessage, MsgEmpty, PlainMessage}; +use shared::{reqwest, EncryptedMessage, MsgEmpty, MsgProto, PlainMessage}; use shared::crypto::{CryptoPublicPortion, ProxyCertInfo}; use shared::errors::SamplyBeamError; use shared::http_client::{self, SamplyHttpClient}; @@ -146,9 +146,9 @@ fn spawn_controller_polling(client: SamplyHttpClient, config: &'static Config) { retries_this_min = 0; reset_interval = Instant::now() + RETRY_INTERVAL; } - let body = EncryptedMessage::MsgEmpty(MsgEmpty { - from: AppOrProxyId::Proxy(config.proxy_id.clone()), - }); + let body = MsgProto { + from: config.proxy_id.clone(), + }; let (parts, body) = axum::http::Request::get(format!("{}v1/control", config.broker_uri)) .header(header::USER_AGENT, env!("SAMPLY_USER_AGENT")) .body(body) diff --git a/proxy/src/serve_sockets.rs b/proxy/src/serve_sockets.rs index 060e38bd..67a1ded5 100644 --- a/proxy/src/serve_sockets.rs +++ b/proxy/src/serve_sockets.rs @@ -5,6 +5,7 @@ use std::{ use axum::{ extract::{Path, Request, State}, http::{self, header, HeaderValue, StatusCode}, response::{IntoResponse, Response}, routing::{get, post}, Extension, Json, RequestPartsExt, Router }; +use beam_lib::AppId; use bytes::{Buf, BufMut, BytesMut}; use chacha20poly1305::{ aead::{ @@ -22,7 +23,6 @@ use hyper_util::rt::TokioIo; use rsa::rand_core::RngCore; use serde::{Deserialize, Serialize}; use serde_json::Value; -use beam_lib::AppOrProxyId; use shared::{ ct_codecs::{self, Base64UrlSafeNoPadding, Decoder as B64Decoder, Encoder as B64Encoder}, expire_map::LazyExpireMap, http_client::SamplyHttpClient, reqwest, MessageType, MsgEmpty, MsgId, MsgSocketRequest, Plain }; @@ -98,7 +98,7 @@ async fn get_tasks( async fn create_socket_con( AuthenticatedApp(sender): AuthenticatedApp, - Path(to): Path, + Path(to): Path, Extension(task_secret_map): Extension, state: State, mut req: Request, @@ -116,7 +116,7 @@ async fn create_socket_con( .and_then(|v| serde_json::from_slice(v.as_bytes()).ok()) .unwrap_or_default(); let socket_req = MsgSocketRequest { - from: AppOrProxyId::App(sender.clone()), + from: sender.clone(), to: vec![to], expire: SystemTime::now() + TTL, id: task_id, @@ -172,7 +172,7 @@ async fn connect_socket( }; let msg_empty = MsgEmpty { - from: AppOrProxyId::App(sender.clone()), + from: sender.clone(), }; let Ok(body) = serde_json::to_vec(&msg_empty) else { warn!("Failed to serialize MsgEmpty"); diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index d14a4158..1ead35e7 100644 --- a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -15,9 +15,9 @@ use httpdate::fmt_http_date; use rsa::{pkcs8::DecodePublicKey, RsaPublicKey}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Value; -use beam_lib::{AppId, AppOrProxyId, ProxyId}; +use beam_lib::{AppId, ProxyId}; use shared::{ - crypto::{self, CryptoPublicPortion}, crypto_jwt, errors::SamplyBeamError, http_client::SamplyHttpClient, reqwest, sse_event::SseEventType, DecryptableMsg, EncryptableMsg, EncryptedMessage, EncryptedMsgTaskRequest, EncryptedMsgTaskResult, MessageType, Msg, MsgEmpty, MsgId, MsgSigned, MsgTaskRequest, MsgTaskResult, PlainMessage + crypto::{self, CryptoPublicPortion}, crypto_jwt, errors::SamplyBeamError, http_client::SamplyHttpClient, reqwest, sse_event::SseEventType, DecryptableMsg, EncryptableMsg, EncryptedMessage, EncryptedMsgTaskRequest, EncryptedMsgTaskResult, IdHelper, MessageType, Msg, MsgEmpty, MsgId, MsgSigned, MsgTaskRequest, MsgTaskResult, PlainMessage }; use tokio::io::BufReader; use tracing::{debug, error, info, trace, warn}; @@ -318,12 +318,12 @@ pub(crate) fn to_server_error(res: Result) -> Result( + body: M, mut parts: Parts, config: &Config, ) -> Result { - let from = body.get_from(); + let from = body.app_or_proxy_id(); let token_without_extended_signature = crypto_jwt::sign_to_jwt(&body, &config.crypto.privkey_rs256) .await @@ -347,7 +347,7 @@ pub async fn sign_request( .expect("Internal error: Unable to format system time"), ); let digest = - crypto_jwt::make_extra_fields_digest(&parts.method, &parts.uri, &headers_mut, sig, &from) + crypto_jwt::make_extra_fields_digest(&parts.method, &parts.uri, &headers_mut, sig, from.to_owned()) .map_err(|_| ERR_INTERNALCRYPTO)?; let token_with_extended_signature = crypto_jwt::sign_to_jwt(&digest, &config.crypto.privkey_rs256) .await @@ -408,7 +408,7 @@ pub(crate) async fn validate_and_decrypt(json: Value, config: &Config) -> Result fn decrypt_msg(msg: M, config: &Config) -> Result { msg.decrypt( - &AppOrProxyId::Proxy(config.proxy_id.to_owned()), + &config.proxy_id, &config.crypto.privkey_rsa, ) } diff --git a/shared/src/crypto.rs b/shared/src/crypto.rs index 1140a0d2..60ef9b91 100644 --- a/shared/src/crypto.rs +++ b/shared/src/crypto.rs @@ -26,7 +26,7 @@ use std::{ use tokio::{sync::{mpsc, oneshot, RwLock}, time::Instant}; use tracing::{debug, error, info, warn}; -use beam_lib::{AppOrProxyId, ProxyId}; +use beam_lib::{AppId, ProxyId}; use crate::{ crypto, errors::{CertificateInvalidReason, SamplyBeamError}, @@ -908,7 +908,7 @@ pub fn get_best_other_certificate( } pub async fn get_proxy_public_keys( - receivers: impl IntoIterator, + receivers: impl IntoIterator, ) -> Result, SamplyBeamError> { let proxy_receivers: Vec = receivers .into_iter() diff --git a/shared/src/crypto_jwt.rs b/shared/src/crypto_jwt.rs index 25e8b139..13a75b9b 100644 --- a/shared/src/crypto_jwt.rs +++ b/shared/src/crypto_jwt.rs @@ -1,12 +1,10 @@ use std::net::{SocketAddr, IpAddr}; -use beam_lib::{AppOrProxyId, ProxyId}; use crate::{ - crypto::{self, CryptoPublicPortion}, - errors::{CertificateInvalidReason, SamplyBeamError}, - Msg, MsgEmpty, MsgId, MsgSigned, + crypto::{self, CryptoPublicPortion}, errors::{CertificateInvalidReason, SamplyBeamError}, Msg, MsgEmpty, MsgId, MsgProto, MsgSigned, IdHelper }; use axum::{body::HttpBody, extract::{{FromRequest, ConnectInfo, FromRequestParts}, Request}, http::{header, request::Parts, uri::PathAndQuery, HeaderMap, HeaderName, Method, StatusCode, Uri}, BoxError, RequestExt}; +use beam_lib::{AppId, ProxyId}; use jwt_simple::{ claims::JWTClaims, prelude::{ @@ -30,10 +28,7 @@ const ERR_FROM: (StatusCode, &str) = ( impl FromRequest for MsgSigned where - // these trait bounds are copied from `impl FromRequest for axum::Json` - // T: DeserializeOwned, - // B: axum::body::HttpBody + Send, - T: Serialize + DeserializeOwned + Msg, + T: DeserializeOwned + IdHelper + Serialize, { type Rejection = (StatusCode, &'static str); @@ -46,11 +41,11 @@ where ); ERR_SIG })?; - verify_with_extended_header(&mut parts, &token_without_extended_signature).await + verify_with_extended_header(&mut parts, token_without_extended_signature).await } } -pub type Authorized = MsgSigned; +pub type Authorized = MsgSigned; #[tracing::instrument] pub async fn extract_jwt( @@ -93,7 +88,8 @@ pub async fn extract_jwt( warn!("Failed to decode {data:?} to JwtClaims. Err: {e}"); SamplyBeamError::RequestValidationFailed("Invalid JWT body in header".to_string()) })?; - let proxy_id: ProxyId = json.custom.from.proxy_id(); + // In the case where the JWT does not have a serial the HeaderClaim will always contain a proxy id + let proxy_id = ProxyId::new(json.custom.from)?; let mut certs = crypto::get_all_certs_and_clients_by_cname_as_pemstr(&proxy_id) .await .into_iter() @@ -128,9 +124,9 @@ pub const JWT_VERIFICATION_OPTIONS: Lazy = Lazy::new(|| Ver /// The Message is encoded in the JWT Claims of the body which is a JWT. /// There is never really a [`MsgSigned`] involved in Deserializing the message as the signature is just copied from the body JWT. /// The token is verified by a key derived from the kid of the JWT in the Header which should also match the kid of the body JWT. -pub async fn verify_with_extended_header( +pub async fn verify_with_extended_header( req: &mut Parts, - token_without_extended_signature: &str, + token_without_extended_signature: String, ) -> Result, (StatusCode, &'static str)> { let ip = get_ip(req).await; let token_with_extended_signature = req.headers @@ -155,8 +151,6 @@ pub async fn verify_with_extended_header( ERR_SIG })?; - Span::current().record("from", header_claims.custom.from.hide_broker()); - // Check extra digest let custom = header_claims.custom; @@ -166,7 +160,7 @@ pub async fn verify_with_extended_header( // Check if short token matches the long token let msg = pubkey .verify_token::( - token_without_extended_signature, + &token_without_extended_signature, Some(JWT_VERIFICATION_OPTIONS.clone()), ) .map_err(|e| { @@ -182,11 +176,12 @@ pub async fn verify_with_extended_header( warn!("Cannot split signature from body token"); return Err(ERR_SIG); }; - let sender_actual = msg.get_from(); + let sender_actual = msg.app_or_proxy_id(); + Span::current().record("from", msg.hide_broker_name()); // Check if header claims is matching the body token let digest_actual = - make_extra_fields_digest(&req.method, &req.uri, &req.headers, &sig, &sender_actual) + make_extra_fields_digest(&req.method, &req.uri, &req.headers, &sig, sender_actual.to_owned()) .map_err(|e| { warn!("Got error in make_extra_fields_digest: {}", e); ERR_SIG @@ -201,7 +196,7 @@ pub async fn verify_with_extended_header( return Err(ERR_SIG); } - if sender_actual.to_owned() != sender_claimed { + if sender_actual != &sender_claimed { warn!( "Sender did not match: expected {}, received {}", sender_claimed, sender_actual @@ -210,7 +205,7 @@ pub async fn verify_with_extended_header( } // Check if Messages' "from" attribute can be signed by the proxy - if !msg.get_from().can_be_signed_by(&proxy_public_info.beam_id) { + if !sender_actual.ends_with(proxy_public_info.beam_id.as_ref()) { warn!( "Received messages' \"from\" attribute which should not have been signed by the proxy." ); @@ -220,7 +215,7 @@ pub async fn verify_with_extended_header( let msg_signed = MsgSigned { msg, - jwt: token_without_extended_signature.to_string(), + jwt: token_without_extended_signature, }; Ok(msg_signed) } @@ -245,7 +240,7 @@ pub struct HeaderClaim { #[serde(rename = "s")] //safes 2 bytes sig: String, #[serde(rename = "f")] //safes 3 bytes - from: AppOrProxyId, + from: String, } pub fn make_extra_fields_digest( @@ -253,7 +248,7 @@ pub fn make_extra_fields_digest( uri: &Uri, headers: &HeaderMap, sig: &str, - from: &AppOrProxyId, + from: String, ) -> Result { const HEADERS_TO_SIGN: [HeaderName; 1] = [ // header::HOST, // Host header differs from proxy to broker @@ -279,14 +274,14 @@ pub fn make_extra_fields_digest( } } buf.append(&mut sig.as_bytes().to_vec()); - buf.append(&mut from.to_string().as_bytes().to_vec()); + buf.append(&mut from.as_bytes().to_vec()); let digest = crypto::hash(&buf)?; let digest = base64::encode_block(&digest); Ok(HeaderClaim { sig: digest, - from: from.to_owned(), + from, }) } diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 2ce85383..82cef932 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -1,6 +1,6 @@ #![allow(unused_imports)] -use beam_lib::{AppId, AppOrProxyId, ProxyId, FailureStrategy, WorkStatus}; +use beam_lib::{AppId, ProxyId, FailureStrategy, WorkStatus}; use chacha20poly1305::{ aead::{Aead, AeadCore, KeyInit, OsRng}, XChaCha20Poly1305, XNonce, @@ -16,9 +16,7 @@ use sha2::Sha256; use tracing::debug; use std::{ - fmt::{Debug, Display}, - ops::Deref, - time::{Duration, Instant, SystemTime}, net::SocketAddr, error::Error, + borrow::Cow, error::Error, fmt::{Debug, Display}, net::SocketAddr, ops::Deref, time::{Duration, Instant, SystemTime} }; use rand::Rng; @@ -74,13 +72,13 @@ pub struct HowLongToBlock { } #[derive(Clone, Debug, PartialEq, Serialize)] -pub struct MsgSigned { +pub struct MsgSigned { #[serde(skip)] pub msg: M, pub jwt: String, } -impl MsgSigned { +impl MsgSigned { pub async fn verify(token: &str) -> Result { let msg = extract_jwt(token).await?.2.custom; @@ -92,20 +90,20 @@ impl MsgSigned { } } -pub static EMPTY_VEC_APPORPROXYID: Vec = Vec::new(); +pub static EMPTY_VEC_APPORPROXYID: Vec = Vec::new(); #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct MsgEmpty { - pub from: AppOrProxyId, + pub from: AppId, } impl Msg for MsgEmpty { - fn get_from(&self) -> &AppOrProxyId { + fn get_from(&self) -> &AppId { &self.from } - fn get_to(&self) -> &Vec { + fn get_to(&self) -> &Vec { &EMPTY_VEC_APPORPROXYID } @@ -114,6 +112,45 @@ impl Msg for MsgEmpty { } } +#[derive(Serialize, Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MsgProto { + pub from: ProxyId, +} + +pub trait IdHelper { + fn proxy_id_ref(&self) -> &str; + fn app_or_proxy_id(&self) -> &str; + fn hide_broker_name(&self) -> &str; +} + +impl IdHelper for M { + fn proxy_id_ref(&self) -> &str { + self.get_from().proxy_id_str() + } + + fn app_or_proxy_id(&self) -> &str { + self.get_from().as_ref() + } + + fn hide_broker_name(&self) -> &str { + self.get_from().hide_broker_name() + } +} + +impl IdHelper for MsgProto { + fn proxy_id_ref(&self) -> &str { + self.from.as_ref() + } + + fn app_or_proxy_id(&self) -> &str { + self.from.as_ref() + } + + fn hide_broker_name(&self) -> &str { + self.from.hide_broker_name() + } +} #[derive(Serialize, Deserialize)] #[serde(untagged)] @@ -181,7 +218,7 @@ impl DecryptableMsg for EncryptedMessage { } impl Msg for MessageType { - fn get_from(&self) -> &AppOrProxyId { + fn get_from(&self) -> &AppId { use MessageType::*; match self { MsgTaskRequest(m) => m.get_from(), @@ -192,7 +229,7 @@ impl Msg for MessageType { } } - fn get_to(&self) -> &Vec { + fn get_to(&self) -> &Vec { use MessageType::*; match self { MsgTaskRequest(m) => m.get_to(), @@ -225,7 +262,7 @@ pub trait DecryptableMsg: Msg + Serialize + Sized { #[allow(clippy::or_fun_call)] fn decrypt( self, - my_id: &AppOrProxyId, + my_id: &ProxyId, my_priv_key: &RsaPrivateKey, ) -> Result { let Some(Encrypted { @@ -240,10 +277,10 @@ pub trait DecryptableMsg: Msg + Serialize + Sized { .get_to() .iter() .position(|entry| { - let entry_str = entry.to_string(); + let entry_str = entry.as_ref(); - let mut matched = entry_str.ends_with(&my_id.to_string()); - matched &= match entry_str.find(&my_id.to_string()) { + let mut matched = entry_str.ends_with(my_id.as_ref()); + matched &= match entry_str.find(my_id.as_ref()) { Some(0) => true, // Begins with id Some(i) => entry_str.chars().nth(i - 1) == Some('.'), // Ends with id, but before is a separator (e.g. appId) None => false, @@ -346,17 +383,17 @@ pub trait EncryptableMsg: Msg + Serialize + Sized { } pub trait Msg: Serialize { - fn get_from(&self) -> &AppOrProxyId; - fn get_to(&self) -> &Vec; + fn get_from(&self) -> &AppId; + fn get_to(&self) -> &Vec; fn get_metadata(&self) -> &Value; } impl Msg for MsgSigned { - fn get_from(&self) -> &AppOrProxyId { + fn get_from(&self) -> &AppId { self.msg.get_from() } - fn get_to(&self) -> &Vec { + fn get_to(&self) -> &Vec { self.msg.get_to() } @@ -366,11 +403,11 @@ impl Msg for MsgSigned { } impl Msg for MsgTaskRequest { - fn get_from(&self) -> &AppOrProxyId { + fn get_from(&self) -> &AppId { &self.from } - fn get_to(&self) -> &Vec { + fn get_to(&self) -> &Vec { &self.to } @@ -380,11 +417,11 @@ impl Msg for MsgTaskRequest { } impl Msg for MsgTaskResult { - fn get_from(&self) -> &AppOrProxyId { + fn get_from(&self) -> &AppId { &self.from } - fn get_to(&self) -> &Vec { + fn get_to(&self) -> &Vec { &self.to } @@ -455,15 +492,15 @@ where State: MsgState, { pub id: MsgId, - pub from: AppOrProxyId, - pub to: Vec, + pub from: AppId, + pub to: Vec, #[serde(flatten)] pub body: State, #[serde(with = "serialize_time", rename = "ttl")] pub expire: SystemTime, pub failure_strategy: FailureStrategy, #[serde(skip)] - pub results: HashMap>>, + pub results: HashMap>>, pub metadata: Value, } @@ -536,8 +573,8 @@ pub struct MsgTaskResult where State: MsgState, { - pub from: AppOrProxyId, - pub to: Vec, + pub from: AppId, + pub to: Vec, pub task: MsgId, pub status: WorkStatus, #[serde(flatten)] @@ -644,8 +681,8 @@ impl MsgTaskRequest { } impl MsgTaskRequest { pub fn new( - from: AppOrProxyId, - to: Vec, + from: AppId, + to: Vec, body: String, failure_strategy: FailureStrategy, metadata: serde_json::Value, @@ -678,54 +715,6 @@ impl PartialEq for MsgTaskRequest { } impl Eq for MsgTaskRequest {} -#[derive(Debug, Serialize, Deserialize)] -pub struct MsgPing { - id: MsgId, - from: AppOrProxyId, - to: Vec, - nonce: [u8; 16], - metadata: Value, -} - -impl MsgPing { - pub fn new(from: AppOrProxyId, to: AppOrProxyId) -> Self { - let mut nonce = [0; 16]; - openssl::rand::rand_bytes(&mut nonce) - .expect("Critical Error: Failed to generate random byte array."); - MsgPing { - id: MsgId::new(), - from, - to: vec![to], - nonce, - metadata: json!(null), - } - } -} - -impl Msg for MsgPing { - fn get_from(&self) -> &AppOrProxyId { - &self.from - } - - fn get_to(&self) -> &Vec { - &self.to - } - - fn get_metadata(&self) -> &Value { - &self.metadata - } -} - -pub fn try_read(map: &HashMap, key: &str) -> Option -where - T: FromStr, -{ - map.get(key).and_then(|value| match value.parse() { - Ok(v) => Some(v), - Err(_) => None, - }) -} - #[cfg(test)] mod tests { @@ -734,9 +723,8 @@ mod tests { #[test] fn encrypt_decrypt_task() { //Create Task - beam_lib::set_broker_id("broker.samply.de".to_string()); - let p1_id = AppOrProxyId::App(AppId::new("app.proxy1.broker.samply.de").unwrap()); - let p2_id = AppOrProxyId::App(AppId::new("app.proxy2.broker.samply.de").unwrap()); + let p1_id = AppId::new("app.proxy1.broker.samply.de").unwrap(); + let p2_id = AppId::new("app.proxy2.broker.samply.de").unwrap(); let from = p1_id.clone(); let to = vec![p1_id.clone(), p2_id.clone()]; let expiry = SystemTime::now() + Duration::from_secs(60); @@ -771,10 +759,10 @@ mod tests { // Decrypt for both proxies let msg_p1_decr = msg_encr .clone() - .decrypt(&p1_id, &p1_private) + .decrypt(&p1_id.proxy_id(), &p1_private) .expect("Cannot decrypt message"); let msg_p2_decr = msg_encr - .decrypt(&p2_id, &p2_private) + .decrypt(&p2_id.proxy_id(), &p2_private) .expect("Cannot decrypt message"); assert_eq!(msg_p1_decr, msg_p2_decr); @@ -783,9 +771,8 @@ mod tests { #[test] fn encrypt_decrypt_result() { - beam_lib::set_broker_id("broker.samply.de".to_string()); - let p1_id = AppOrProxyId::App(AppId::new("app.proxy1.broker.samply.de").unwrap()); - let p2_id = AppOrProxyId::App(AppId::new("app.proxy2.broker.samply.de").unwrap()); + let p1_id = AppId::new("app.proxy1.broker.samply.de").unwrap(); + let p2_id = AppId::new("app.proxy2.broker.samply.de").unwrap(); let from = p1_id.clone(); let to = vec![p1_id.clone(), p2_id.clone()]; let status = WorkStatus::Succeeded; @@ -817,11 +804,11 @@ mod tests { // Decrypt for both proxies let msg_p1_decr = msg_encr .clone() - .decrypt(&p1_id, &p1_private) + .decrypt(&p1_id.proxy_id(), &p1_private) .expect("Cannot decrypt message"); let msg_p2_decr = msg_encr .clone() - .decrypt(&p2_id, &p2_private) + .decrypt(&p2_id.proxy_id(), &p2_private) .expect("Cannot decrypt message"); assert_eq!(msg_p1_decr, msg_p2_decr); diff --git a/shared/src/serializing_compatibility_test.rs b/shared/src/serializing_compatibility_test.rs index b0a6ff92..ce277b5e 100644 --- a/shared/src/serializing_compatibility_test.rs +++ b/shared/src/serializing_compatibility_test.rs @@ -3,7 +3,7 @@ use std::{ time::{Duration, SystemTime}, }; -use beam_lib::{set_broker_id, AppOrProxyId, MsgId}; +use beam_lib::{AppId, MsgId, ProxyId}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::json; @@ -23,26 +23,24 @@ where #[test] fn test_msg_empty() { - set_broker_id("broker.samply.de".to_string()); let internal = crate::MsgEmpty { - from: AppOrProxyId::new("app1.proxy1.broker.samply.de").unwrap(), + from: AppId::new("app1.proxy1.broker.samply.de").unwrap(), }; let lib = beam_lib::MsgEmpty { - from: AppOrProxyId::new("app1.proxy1.broker.samply.de").unwrap(), + from: AppId::new("app1.proxy1.broker.samply.de").unwrap(), }; assert_json_eq(internal, lib); } #[test] fn test_msg_task() { - set_broker_id("broker.samply.de".to_string()); let json_data = json!({ "foo": 1, "bar": true, }); let id = MsgId::new(); let internal = crate::MsgTaskRequest { - from: AppOrProxyId::new("app1.proxy1.broker.samply.de").unwrap(), + from: AppId::new("app1.proxy1.broker.samply.de").unwrap(), to: vec![], id, body: Plain::from(serde_json::to_string(&json_data).unwrap()), @@ -55,7 +53,7 @@ fn test_msg_task() { metadata: json_data.clone(), }; let lib = beam_lib::TaskRequest { - from: AppOrProxyId::new("app1.proxy1.broker.samply.de").unwrap(), + from: AppId::new("app1.proxy1.broker.samply.de").unwrap(), id, to: vec![], body: json_data.clone(), @@ -71,13 +69,12 @@ fn test_msg_task() { #[test] fn test_task_result() { - set_broker_id("broker.samply.de".to_string()); let json_data = json!({ "foo": 1, "bar": true, }); let task = MsgId::new(); - let from = AppOrProxyId::new("app1.proxy1.broker.samply.de").unwrap(); + let from = AppId::new("app1.proxy1.broker.samply.de").unwrap(); let internal = crate::MsgTaskResult { from: from.clone(), to: vec![], @@ -100,9 +97,8 @@ fn test_task_result() { #[cfg(feature = "sockets")] #[test] fn test_socket_task() { - set_broker_id("broker.samply.de".to_string()); let id = MsgId::new(); - let from = AppOrProxyId::new("app1.proxy1.broker.samply.de").unwrap(); + let from = AppId::new("app1.proxy1.broker.samply.de").unwrap(); let internal = crate::MsgSocketRequest { from: from.clone(), to: vec![], diff --git a/shared/src/sockets.rs b/shared/src/sockets.rs index d746a855..a5403309 100644 --- a/shared/src/sockets.rs +++ b/shared/src/sockets.rs @@ -1,18 +1,17 @@ use std::time::SystemTime; +use beam_lib::AppId; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::{MsgState, serialize_time, MsgId, Msg, DecryptableMsg, Plain, Encrypted, EncryptableMsg, HasWaitId}; -use beam_lib::AppOrProxyId; - +use crate::{serialize_time, DecryptableMsg, EncryptableMsg, Encrypted, HasWaitId, Msg, MsgId, MsgState, Plain}; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct MsgSocketRequest where State: MsgState { - pub from: AppOrProxyId, + pub from: AppId, // TODO: Tell serde to serialize only one - pub to: Vec, + pub to: Vec, #[serde(with="serialize_time", rename="ttl")] pub expire: SystemTime, pub id: MsgId, @@ -23,11 +22,11 @@ where State: MsgState { } impl Msg for MsgSocketRequest { - fn get_from(&self) -> &AppOrProxyId { + fn get_from(&self) -> &AppId { &self.from } - fn get_to(&self) -> &Vec { + fn get_to(&self) -> &Vec { &self.to } diff --git a/tests/src/lib.rs b/tests/src/lib.rs index 68c14973..91e04c37 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -1,6 +1,6 @@ use once_cell::sync::Lazy; -use beam_lib::{AddressingId, set_broker_id, AppOrProxyId, BeamClient}; +use beam_lib::{AppId, BeamClient}; #[cfg(all(feature = "sockets", test))] mod socket_test; @@ -11,13 +11,11 @@ mod task_test; #[cfg(test)] mod test_sse; -pub static APP1: Lazy = Lazy::new(|| { - set_broker_id("broker".into()); - AppOrProxyId::new(option_env!("APP1_P1").unwrap_or("app1.proxy1.broker")).unwrap() +pub static APP1: Lazy = Lazy::new(|| { + AppId::new(option_env!("APP1_P1").unwrap_or("app1.proxy1.broker")).unwrap() }); -pub static APP2: Lazy = Lazy::new(|| { - set_broker_id("broker".into()); - AppOrProxyId::new(option_env!("APP2_P2").unwrap_or("app2.proxy2.broker")).unwrap() +pub static APP2: Lazy = Lazy::new(|| { + AppId::new(option_env!("APP2_P2").unwrap_or("app2.proxy2.broker")).unwrap() }); // unwrap_or is not const yet