From ad8cd4456e7503e4acb7fb54a74646be8d8e507f Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 13 Jun 2023 08:22:57 +0000 Subject: [PATCH 01/21] Parsing for allow and block list --- dev/docker-compose.yml | 2 ++ shared/src/config_proxy.rs | 32 +++++++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 249094f3..01b32fe2 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -58,6 +58,8 @@ services: BIND_ADDR: 0.0.0.0:8081 RUST_LOG: ${RUST_LOG} ALL_PROXY: http://mitmproxy:8080 + ALLOW_LIST: '["app1.proxy1.broker", "proxy1.broker"]' + BLOCK_LIST: '["app1.proxy1.broker", "proxy1.broker"]' secrets: - proxy1.pem - root.crt.pem diff --git a/shared/src/config_proxy.rs b/shared/src/config_proxy.rs index bd164bc9..58252167 100644 --- a/shared/src/config_proxy.rs +++ b/shared/src/config_proxy.rs @@ -16,7 +16,7 @@ use serde::Deserialize; use tracing::{debug, info}; use crate::{ - beam_id::{self, AppId, BeamId, BrokerId, ProxyId}, + beam_id::{self, AppId, BeamId, BrokerId, ProxyId, AppOrProxyId}, errors::SamplyBeamError, }; @@ -28,6 +28,8 @@ pub struct Config { pub proxy_id: ProxyId, pub api_keys: HashMap, pub tls_ca_certificates: Vec, + pub allow_list: Vec, + pub block_list: Vec, } pub type ApiKey = String; @@ -63,6 +65,14 @@ pub struct CliArgs { /// samply.pki: Path to CA Root certificate #[clap(long, env, value_parser, default_value = "/run/secrets/root.crt.pem")] rootcert_file: PathBuf, + + /// A whitelist of apps or proxies that may connect, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + #[clap(long, env, value_parser)] + pub allow_list: Option, + + /// A blacklist of apps or proxies that may not connect, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + #[clap(long, env, value_parser)] + pub block_list: Option, /// (included for technical reasons) #[clap(long, hide(true))] @@ -96,6 +106,7 @@ fn parse_apikeys(proxy_id: &ProxyId) -> Result, SamplyBea Ok(api_keys) } + impl crate::config::Config for Config { fn load() -> Result { let cli_args = CliArgs::parse(); @@ -119,6 +130,7 @@ impl crate::config::Config for Config { e )) })?; + let config = Config { broker_host_header: uri_to_host_header(&cli_args.broker_url)?, broker_uri: cli_args.broker_url, @@ -126,12 +138,30 @@ impl crate::config::Config for Config { proxy_id, api_keys, tls_ca_certificates, + allow_list: parse_to_list_of_ids(cli_args.allow_list)?, + block_list: parse_to_list_of_ids(cli_args.block_list)? }; info!("Successfully read config and API keys from CLI and secrets file."); + if !config.allow_list.is_empty() { + info!("Allow list set: {:?}", config.allow_list); + } + if !config.block_list.is_empty() { + info!("Block list set: {:?}", config.block_list); + } Ok(config) } } +fn parse_to_list_of_ids(input: Option) -> Result, SamplyBeamError> { + if let Some(app_list_str) = input { + serde_json::from_str(&app_list_str).map_err(|e| SamplyBeamError::ConfigurationFailed( + format!("Failed to parse: {app_list_str} to a list of beam ids: {e}.\n The requiered format is a json array of strings that match the beam id spec see the system architecture section in the readme.") + )) + } else { + Ok(Vec::with_capacity(0)) + } +} + fn uri_to_host_header(uri: &Uri) -> Result { let hostname: String = uri .host() From 8b007b73b18fb830db5759ba7227c4308ee0b657 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 13 Jun 2023 08:37:20 +0000 Subject: [PATCH 02/21] Add tests --- shared/src/config_proxy.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/shared/src/config_proxy.rs b/shared/src/config_proxy.rs index 58252167..cbd1e67d 100644 --- a/shared/src/config_proxy.rs +++ b/shared/src/config_proxy.rs @@ -198,4 +198,15 @@ mod tests { let parsed = parse_apikeys(&ProxyId::new(&format!("proxy.{BROKER_ID}")).unwrap()).unwrap(); assert_eq!(parsed.len(), apps.len()); } + + #[test] + fn test_parse_app_list() { + const BROKER_ID: &str = "broker"; + BrokerId::set_broker_id(BROKER_ID.to_string()); + assert_eq!( + parse_to_list_of_ids(Some(r#"["app1.proxy1.broker", "proxy1.broker"]"#.to_string())).unwrap(), + vec![AppOrProxyId::AppId(AppId::new("app1.proxy1.broker").unwrap()), AppOrProxyId::ProxyId(ProxyId::new("proxy1.broker").unwrap())] + ); + assert_eq!(parse_to_list_of_ids(None).unwrap(), Vec::::new()); + } } From 53c547203596643ad25c689f16fbeea49c8ff9a9 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 13 Jun 2023 13:01:41 +0000 Subject: [PATCH 03/21] Create and use a permission manager to handle io --- proxy/src/serve_tasks.rs | 30 +++++++++++++-- shared/src/config_proxy.rs | 78 ++++++++++++++++++++++++++++---------- shared/src/errors.rs | 4 ++ 3 files changed, 89 insertions(+), 23 deletions(-) diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index ca021a68..cf450472 100644 --- a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -422,7 +422,13 @@ async fn validate_and_decrypt(json: Value) -> Result { if let Value::Array(arr) = json { let mut results = Vec::with_capacity(arr.len()); for value in arr { - results.push(validate_and_decrypt(value).await?); + match validate_and_decrypt(value).await { + // If we get an array of results we just filter out the ones we are not allowed to receive + Err(SamplyBeamError::DisallowedReceiver(app)) => { + debug!("Filtered receiving message from {app} as it was blocked due to proxy permissions"); + }, + other => results.push(other?) + } } Ok(Value::Array(results)) } else if json.is_object() { @@ -431,7 +437,13 @@ async fn validate_and_decrypt(json: Value) -> Result { let msg = MsgSigned::::verify(&signed.jwt) .await? .msg; - Ok(serde_json::to_value(decrypt_msg(msg)?).expect("Should serialize fine")) + let decrypted = decrypt_msg(msg)?; + + if CONFIG_PROXY.permission_manager.allowed_to_recieve(decrypted.get_from()) { + Ok(serde_json::to_value(decrypted).expect("Should serialize fine")) + } else { + Err(SamplyBeamError::DisallowedReceiver(decrypted.get_from().clone())) + } } Err(e) => Err(SamplyBeamError::JsonParseError(format!( "Failed to parse broker response as a signed encrypted message. Err is {e}" @@ -468,8 +480,20 @@ async fn encrypt_request( }) } else { match serde_json::from_slice(&body) { - Ok(val) => { + Ok(mut val) => { debug!("Body is valid json"); + // If Msg was a struct generic over its other contents this would be so much nicer but this would take a lot of refactoring + match val { + MessageType::MsgTaskRequest(ref mut m) => m.to.retain(|to| CONFIG_PROXY.permission_manager.allowed_to_send(to)), + MessageType::MsgTaskResult(ref mut m) => m.to.retain(|to| CONFIG_PROXY.permission_manager.allowed_to_send(to)), + MessageType::MsgEmpty(..) => (), + }; + if !matches!(val, MessageType::MsgEmpty(..)) { + if val.get_to().is_empty() { + return Err((StatusCode::UNPROCESSABLE_ENTITY, "Message needs to have at least one `to` entry that is not blocked by this proxies recipient policy")); + } + } + val } Err(e) => { diff --git a/shared/src/config_proxy.rs b/shared/src/config_proxy.rs index cbd1e67d..ddde5d52 100644 --- a/shared/src/config_proxy.rs +++ b/shared/src/config_proxy.rs @@ -28,8 +28,7 @@ pub struct Config { pub proxy_id: ProxyId, pub api_keys: HashMap, pub tls_ca_certificates: Vec, - pub allow_list: Vec, - pub block_list: Vec, + pub permission_manager: PermissionManager, } pub type ApiKey = String; @@ -66,13 +65,21 @@ pub struct CliArgs { #[clap(long, env, value_parser, default_value = "/run/secrets/root.crt.pem")] rootcert_file: PathBuf, - /// A whitelist of apps or proxies that may connect, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + /// A whitelist of apps or proxies that messages may be sent to, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] #[clap(long, env, value_parser)] - pub allow_list: Option, + pub allowed_receivers: Option, - /// A blacklist of apps or proxies that may not connect, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + /// A blacklist of apps or proxies that messages may not be sent to, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] #[clap(long, env, value_parser)] - pub block_list: Option, + pub blocked_receivers: Option, + + /// A whitelist of apps or proxies that may send messages to this proxy, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + #[clap(long, env, value_parser)] + pub allowed_remotes: Option, + + /// A blacklist of apps or proxies that may notsend messages to this proxy, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + #[clap(long, env, value_parser)] + pub blocked_remotes: Option, /// (included for technical reasons) #[clap(long, hide(true))] @@ -106,6 +113,39 @@ fn parse_apikeys(proxy_id: &ProxyId) -> Result, SamplyBea Ok(api_keys) } +#[derive(Debug, Clone)] +pub struct PermissionManager { + pub recv_allow_list: Option>, + pub recv_block_list: Option>, + pub send_allow_list: Option>, + pub send_block_list: Option>, +} + +impl PermissionManager { + pub fn allowed_to_recieve(&self, from: &AppOrProxyId) -> bool { + Self::check_permissions_with(&self.recv_allow_list,&self.recv_block_list, from) + } + + pub fn allowed_to_send(&self, to: &AppOrProxyId) -> bool { + Self::check_permissions_with(&self.send_allow_list, &self.send_block_list, to) + } + + fn check_permissions_with(allow: &Option>, deny: &Option>, beam_id: &AppOrProxyId) -> bool { + match (allow, deny) { + (None, None) => true, + (None, Some(block_list)) => !Self::contains(&block_list, beam_id), + (Some(allow_list), None) => Self::contains(&allow_list, beam_id), + (Some(allow_list), Some(block_list)) => !Self::contains(&block_list, beam_id) || Self::contains(&allow_list, beam_id) + } + } + + fn contains(ids: &Vec, needle: &AppOrProxyId) -> bool { + ids.iter().find(|id| match id { + AppOrProxyId::AppId(app) => needle == app, + AppOrProxyId::ProxyId(proxy) => proxy == &needle.get_proxy_id(), + }).is_some() + } +} impl crate::config::Config for Config { fn load() -> Result { @@ -138,27 +178,25 @@ impl crate::config::Config for Config { proxy_id, api_keys, tls_ca_certificates, - allow_list: parse_to_list_of_ids(cli_args.allow_list)?, - block_list: parse_to_list_of_ids(cli_args.block_list)? + permission_manager: PermissionManager { + recv_allow_list: parse_to_list_of_ids(cli_args.allowed_remotes)?, + recv_block_list: parse_to_list_of_ids(cli_args.blocked_remotes)?, + send_allow_list: parse_to_list_of_ids(cli_args.allowed_receivers)?, + send_block_list: parse_to_list_of_ids(cli_args.blocked_receivers)? + } }; info!("Successfully read config and API keys from CLI and secrets file."); - if !config.allow_list.is_empty() { - info!("Allow list set: {:?}", config.allow_list); - } - if !config.block_list.is_empty() { - info!("Block list set: {:?}", config.block_list); - } Ok(config) } } -fn parse_to_list_of_ids(input: Option) -> Result, SamplyBeamError> { +fn parse_to_list_of_ids(input: Option) -> Result>, SamplyBeamError> { if let Some(app_list_str) = input { - serde_json::from_str(&app_list_str).map_err(|e| SamplyBeamError::ConfigurationFailed( + Ok(Some(serde_json::from_str(&app_list_str).map_err(|e| SamplyBeamError::ConfigurationFailed( format!("Failed to parse: {app_list_str} to a list of beam ids: {e}.\n The requiered format is a json array of strings that match the beam id spec see the system architecture section in the readme.") - )) + ))?)) } else { - Ok(Vec::with_capacity(0)) + Ok(None) } } @@ -205,8 +243,8 @@ mod tests { BrokerId::set_broker_id(BROKER_ID.to_string()); assert_eq!( parse_to_list_of_ids(Some(r#"["app1.proxy1.broker", "proxy1.broker"]"#.to_string())).unwrap(), - vec![AppOrProxyId::AppId(AppId::new("app1.proxy1.broker").unwrap()), AppOrProxyId::ProxyId(ProxyId::new("proxy1.broker").unwrap())] + Some(vec![AppOrProxyId::AppId(AppId::new("app1.proxy1.broker").unwrap()),AppOrProxyId::ProxyId(ProxyId::new("proxy1.broker").unwrap())]) ); - assert_eq!(parse_to_list_of_ids(None).unwrap(), Vec::::new()); + assert_eq!(parse_to_list_of_ids(None).unwrap(), None); } } diff --git a/shared/src/errors.rs b/shared/src/errors.rs index 046f726c..d55accf1 100644 --- a/shared/src/errors.rs +++ b/shared/src/errors.rs @@ -4,6 +4,8 @@ use http::StatusCode; use openssl::error::ErrorStack; use tokio::time::error::Elapsed; +use crate::beam_id::AppOrProxyId; + #[derive(thiserror::Error, Debug)] pub enum SamplyBeamError { #[error("Invalid bind address supplied: {0}")] @@ -50,6 +52,8 @@ pub enum SamplyBeamError { CertificateError(#[from] CertificateInvalidReason), #[error("Timeout executing HTTP request: {0}")] HttpTimeoutError(Elapsed), + #[error("Not allowed to receive messge from: {0}")] + DisallowedReceiver(AppOrProxyId), } impl From for SamplyBeamError { From 7ea7ba6407d9a2fcf8788d9c673c60f947844d1e Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 13 Jun 2023 13:13:50 +0000 Subject: [PATCH 04/21] Update names in compose --- dev/docker-compose.yml | 4 ++-- shared/src/config_proxy.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 01b32fe2..12b240da 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -58,8 +58,8 @@ services: BIND_ADDR: 0.0.0.0:8081 RUST_LOG: ${RUST_LOG} ALL_PROXY: http://mitmproxy:8080 - ALLOW_LIST: '["app1.proxy1.broker", "proxy1.broker"]' - BLOCK_LIST: '["app1.proxy1.broker", "proxy1.broker"]' + ALLOWED_REMOTES: '["app1.proxy1.broker", "proxy1.broker"]' + BLOCKED_REMOTES: '["app1.proxy1.broker", "proxy1.broker"]' secrets: - proxy1.pem - root.crt.pem diff --git a/shared/src/config_proxy.rs b/shared/src/config_proxy.rs index ddde5d52..38b530ee 100644 --- a/shared/src/config_proxy.rs +++ b/shared/src/config_proxy.rs @@ -77,7 +77,7 @@ pub struct CliArgs { #[clap(long, env, value_parser)] pub allowed_remotes: Option, - /// A blacklist of apps or proxies that may notsend messages to this proxy, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + /// A blacklist of apps or proxies that may not send messages to this proxy, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] #[clap(long, env, value_parser)] pub blocked_remotes: Option, From 02ab398b0cdb8d4a72f198b36a6d8f7459a6554c Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 19 Jun 2023 08:56:27 +0000 Subject: [PATCH 05/21] Change parsing of ids to be able to omit broker id --- shared/src/beam_id.rs | 18 ++++++++++++------ shared/src/config_proxy.rs | 22 +++++++++++++++++----- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/shared/src/beam_id.rs b/shared/src/beam_id.rs index 2b419cb8..0b7a0bfe 100644 --- a/shared/src/beam_id.rs +++ b/shared/src/beam_id.rs @@ -6,7 +6,7 @@ use serde::{de::Visitor, Deserialize, Serialize}; use crate::{config, errors::SamplyBeamError}; -static BROKER_ID: OnceCell = OnceCell::new(); +pub(crate) static BROKER_ID: OnceCell = OnceCell::new(); #[derive(PartialEq, Debug)] pub enum BeamIdType { @@ -379,13 +379,19 @@ impl<'de> Visitor<'de> for AppOrProxyIdVisitor { where E: serde::de::Error, { - let t = AppId::str_has_type(v) - .map_err(|e| serde::de::Error::custom(format!("Invalid Beam ID \"{v}\": {e}")))?; - match t { + v.parse().map_err(serde::de::Error::custom) + } +} + +impl FromStr for AppOrProxyId { + type Err = SamplyBeamError; + + fn from_str(v: &str) -> Result { + match AppId::str_has_type(v)? { BeamIdType::AppId => Ok(AppOrProxyId::AppId(AppId::new(v).unwrap())), BeamIdType::ProxyId => Ok(AppOrProxyId::ProxyId(ProxyId::new(v).unwrap())), - BeamIdType::BrokerId => Err(serde::de::Error::custom( - "Expected AppOrProxyId, got BrokerId.", + BeamIdType::BrokerId => Err(SamplyBeamError::InvalidBeamId( + "Expected AppOrProxyId, got BrokerId.".to_string() )), } } diff --git a/shared/src/config_proxy.rs b/shared/src/config_proxy.rs index 38b530ee..395ade12 100644 --- a/shared/src/config_proxy.rs +++ b/shared/src/config_proxy.rs @@ -16,7 +16,7 @@ use serde::Deserialize; use tracing::{debug, info}; use crate::{ - beam_id::{self, AppId, BeamId, BrokerId, ProxyId, AppOrProxyId}, + beam_id::{self, AppId, BeamId, BrokerId, ProxyId, AppOrProxyId, BROKER_ID}, errors::SamplyBeamError, }; @@ -191,10 +191,22 @@ impl crate::config::Config for Config { } fn parse_to_list_of_ids(input: Option) -> Result>, SamplyBeamError> { + let broker_id = BROKER_ID.get().expect("Should be set before parsing beam ids"); if let Some(app_list_str) = input { - Ok(Some(serde_json::from_str(&app_list_str).map_err(|e| SamplyBeamError::ConfigurationFailed( - format!("Failed to parse: {app_list_str} to a list of beam ids: {e}.\n The requiered format is a json array of strings that match the beam id spec see the system architecture section in the readme.") - ))?)) + let Ok(strings): Result, _> = serde_json::from_str(&app_list_str) else { + return Err(SamplyBeamError::ConfigurationFailed(format!("Failed to parse {app_list_str} as a json array."))); + }; + Ok(Some(strings.into_iter() + .map(|mut id| { + id.push('.'); + id.push_str(broker_id); + id.parse() + }) + .collect::>() + .map_err(|e| SamplyBeamError::ConfigurationFailed( + format!("Failed to parse {app_list_str} to a list of beam ids: {e}") + ))? + )) } else { Ok(None) } @@ -242,7 +254,7 @@ mod tests { const BROKER_ID: &str = "broker"; BrokerId::set_broker_id(BROKER_ID.to_string()); assert_eq!( - parse_to_list_of_ids(Some(r#"["app1.proxy1.broker", "proxy1.broker"]"#.to_string())).unwrap(), + parse_to_list_of_ids(Some(r#"["app1.proxy1", "proxy1"]"#.to_string())).unwrap(), Some(vec![AppOrProxyId::AppId(AppId::new("app1.proxy1.broker").unwrap()),AppOrProxyId::ProxyId(ProxyId::new("proxy1.broker").unwrap())]) ); assert_eq!(parse_to_list_of_ids(None).unwrap(), None); From e3f12c815bdbb8d0f4e9e141c6c3346b72686c6c Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 19 Jun 2023 09:16:31 +0000 Subject: [PATCH 06/21] Add entry to readme --- README.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/README.md b/README.md index ec54cd4c..ed582044 100644 --- a/README.md +++ b/README.md @@ -459,6 +459,33 @@ Next, send the CSR to the central CA's administrator for signing and enrolling t Both the Broker and the Proxy respect the log level in the `RUST_LOG` environment variable. E.g., `RUST_LOG=debug` enables debug outputs. Warning: the `trace` log level is *very* noisy. +## Restricting accesses +We have a black/whitelisting option that can be used to restrict traeffic to and from a given proxy. The lists contains a json array of app or proxy ids (see [system architecture](#system-architecture)). Setting a proxy id will permit or allow, dpending on the kind of list, all apps from the given proxy while the app id will only do that for the specific app. + +Restrinction logic will follow these rules: +- Only a whitelist is set -> Traeffic is permitted according to the whitelist +- Only a blacklist is set -> Traeffic is denied according to the blacklist +- Neither white nor blacklist -> All traeffic is permitted +- A white and blacklist is set -> The whitelist will be used as an **exception** to the blacklist + +> Note: All config options may also be set via environment variables matching the cli argument name in uppercase with `-` repplaced by `_`. +### Deny receiving messages from other proxies +This is done by setting `allowed-remotes` and/or `blocked-remotes` options accordingly as described in [Restricting access](#restricting-accesses). + +Example: +```bash +./proxy ... --allowed-remotes='["app1.proxy1", "proxy2"]' +``` + +### Deny sending messages to specific proxies +This is done by setting `allowed-receivers` and/or `blocked-receivers` options accordingly as described in [Restricting access](#restricting-accesses). +> Note: When all receivers of a message are being blocked due to these restrictions the proxy will return a status code of 422 Unprocessable Entity. + +Example: +```bash +./proxy ... --allowed-receivers='["app1.proxy1", "proxy2"]' +``` + ## Technical Background Information ### End-to-End Encryption From 57c3ffe4fda3a5be54a04472202ac31a2c80e6f8 Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 19 Jun 2023 09:25:39 +0000 Subject: [PATCH 07/21] Include sockets in match --- proxy/src/serve_tasks.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index f45e7f0a..3c4b625b 100644 --- a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -486,6 +486,8 @@ async fn encrypt_request( match val { MessageType::MsgTaskRequest(ref mut m) => m.to.retain(|to| CONFIG_PROXY.permission_manager.allowed_to_send(to)), MessageType::MsgTaskResult(ref mut m) => m.to.retain(|to| CONFIG_PROXY.permission_manager.allowed_to_send(to)), + #[cfg(feature = "sockets")] + MessageType::MsgSocketRequest(ref mut m) => m.to.retain(|to| CONFIG_PROXY.permission_manager.allowed_to_send(to)), MessageType::MsgEmpty(..) => (), }; if !matches!(val, MessageType::MsgEmpty(..)) { From 97b9871dcaecafaad945715f55897dfaa937fb36 Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 19 Jun 2023 11:07:54 +0000 Subject: [PATCH 08/21] Fix app ids in compose --- dev/docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 1f6c96f3..6aeace73 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -63,8 +63,8 @@ services: BIND_ADDR: 0.0.0.0:8081 RUST_LOG: ${RUST_LOG} ALL_PROXY: http://mitmproxy:8080 - ALLOWED_REMOTES: '["app1.proxy1.broker", "proxy1.broker"]' - BLOCKED_REMOTES: '["app1.proxy1.broker", "proxy1.broker"]' + ALLOWED_REMOTES: '["app1.proxy1", "proxy1"]' + BLOCKED_REMOTES: '["app1.proxy1", "proxy1"]' secrets: - proxy1.pem - root.crt.pem From cbc5d18cd2d9f2b292472d827ff0b0543fa847a5 Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 19 Jun 2023 12:06:46 +0000 Subject: [PATCH 09/21] Fix test race condition --- shared/src/config_proxy.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/shared/src/config_proxy.rs b/shared/src/config_proxy.rs index 395ade12..6158adfa 100644 --- a/shared/src/config_proxy.rs +++ b/shared/src/config_proxy.rs @@ -251,11 +251,14 @@ mod tests { #[test] fn test_parse_app_list() { - const BROKER_ID: &str = "broker"; + const BROKER_ID: &str = "broker.samply.de"; BrokerId::set_broker_id(BROKER_ID.to_string()); assert_eq!( parse_to_list_of_ids(Some(r#"["app1.proxy1", "proxy1"]"#.to_string())).unwrap(), - Some(vec![AppOrProxyId::AppId(AppId::new("app1.proxy1.broker").unwrap()),AppOrProxyId::ProxyId(ProxyId::new("proxy1.broker").unwrap())]) + Some(vec![ + AppOrProxyId::AppId(AppId::new(&format!("app1.proxy1.{BROKER_ID}")).unwrap()), + AppOrProxyId::ProxyId(ProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap()) + ]) ); assert_eq!(parse_to_list_of_ids(None).unwrap(), None); } From e5c9ac65af259046c291891f105a7b29273722ef Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 19 Jun 2023 13:24:41 +0000 Subject: [PATCH 10/21] Add unit tests --- shared/src/config_proxy.rs | 51 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/shared/src/config_proxy.rs b/shared/src/config_proxy.rs index 6158adfa..2397aa4c 100644 --- a/shared/src/config_proxy.rs +++ b/shared/src/config_proxy.rs @@ -256,10 +256,57 @@ mod tests { assert_eq!( parse_to_list_of_ids(Some(r#"["app1.proxy1", "proxy1"]"#.to_string())).unwrap(), Some(vec![ - AppOrProxyId::AppId(AppId::new(&format!("app1.proxy1.{BROKER_ID}")).unwrap()), - AppOrProxyId::ProxyId(ProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap()) + AppOrProxyId::new(&format!("app1.proxy1.{BROKER_ID}")).unwrap(), + AppOrProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap() ]) ); assert_eq!(parse_to_list_of_ids(None).unwrap(), None); } + + #[test] + fn test_contains() { + const BROKER_ID: &str = "broker.samply.de"; + BrokerId::set_broker_id(BROKER_ID.to_string()); + let app_ids = vec![AppOrProxyId::new(&format!("app1.proxy1.{BROKER_ID}")).unwrap()]; + let proxy_id = vec![AppOrProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap()]; + assert!(PermissionManager::contains(&app_ids, &AppOrProxyId::new(&format!("app1.proxy1.{BROKER_ID}")).unwrap())); + assert!(!PermissionManager::contains(&app_ids, &AppOrProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap())); + assert!(PermissionManager::contains(&proxy_id, &AppOrProxyId::new(&format!("app2.proxy1.{BROKER_ID}")).unwrap())); + assert!(PermissionManager::contains(&proxy_id, &AppOrProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap())); + assert!(!PermissionManager::contains(&proxy_id, &AppOrProxyId::new(&format!("proxy2.{BROKER_ID}")).unwrap())); + } + + #[test] + fn test_check_permissions_with() { + const BROKER_ID: &str = "broker.samply.de"; + BrokerId::set_broker_id(BROKER_ID.to_string()); + let proxy1 = AppOrProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap(); + let app_id1 = AppOrProxyId::new(&format!("app1.proxy1.{BROKER_ID}")).unwrap(); + let app_id2 = AppOrProxyId::new(&format!("app2.proxy1.{BROKER_ID}")).unwrap(); + + // Both allow and deny lists empty + assert!(PermissionManager::check_permissions_with(&None, &None, &app_id1)); + + // Deny list empty, allow list contains ID + let allow_list = vec![app_id1.clone()]; + assert!(PermissionManager::check_permissions_with(&Some(allow_list.clone()), &None, &app_id1)); + assert!(!PermissionManager::check_permissions_with(&Some(allow_list), &None, &app_id2)); + + // Deny list contains ID, allow list empty + let block_list = vec![app_id1.clone()]; + assert!(!PermissionManager::check_permissions_with(&None, &Some(block_list.clone()), &app_id1)); + assert!(PermissionManager::check_permissions_with(&None, &Some(block_list), &app_id2)); + + // Both lists contain ID + let allow_list = vec![app_id1.clone()]; + let block_list = vec![app_id1.clone(), app_id2.clone()]; + assert!(PermissionManager::check_permissions_with(&Some(allow_list.clone()), &Some(block_list.clone()), &app_id1)); + assert!(!PermissionManager::check_permissions_with(&Some(allow_list), &Some(block_list), &app_id2)); + + // Both lists with proxy + let allow_list = vec![app_id1.clone()]; + assert!(PermissionManager::check_permissions_with(&Some(allow_list.clone()), &Some(vec![proxy1.clone()]), &app_id1)); + assert!(!PermissionManager::check_permissions_with(&Some(allow_list), &Some(vec![proxy1]), &app_id2)); + } + } From bfb8fc96475d1ad5539d6e67e63628feb90c0cd4 Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 19 Jun 2023 13:25:36 +0000 Subject: [PATCH 11/21] Add integration tests --- dev/docker-compose.yml | 3 +++ tests/src/lib.rs | 39 ++++++++++++++++++++++++++-- tests/src/permission_test.rs | 49 ++++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 tests/src/permission_test.rs diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 6aeace73..696a2177 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -65,6 +65,9 @@ services: ALL_PROXY: http://mitmproxy:8080 ALLOWED_REMOTES: '["app1.proxy1", "proxy1"]' BLOCKED_REMOTES: '["app1.proxy1", "proxy1"]' + # does not exist only used for testing + BLOCKED_RECEIVERS: '["proxy3"]' + ALLOWED_RECEIVERS: '["app2.proxy3"]' secrets: - proxy1.pem - root.crt.pem diff --git a/tests/src/lib.rs b/tests/src/lib.rs index 2f8758d9..8d2ec885 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -1,4 +1,6 @@ +use http::{Request, request, header}; +use hyper::{Body, Client, client::HttpConnector}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use shared::{beam_id::{AppOrProxyId, BeamId, AppId}, MsgId}; @@ -6,11 +8,13 @@ use shared::{beam_id::{AppOrProxyId, BeamId, AppId}, MsgId}; #[cfg(all(feature = "sockets", test))] mod socket_test; -pub const APP1: Lazy = Lazy::new(|| { +mod permission_test; + +pub static APP1: Lazy = Lazy::new(|| { AppId::set_broker_id("broker".to_string()); AppOrProxyId::new(option_env!("APP1_P1").unwrap_or("app1.proxy1.broker")).unwrap() }); -pub const APP2: Lazy = Lazy::new(|| { +pub static APP2: Lazy = Lazy::new(|| { AppId::set_broker_id("broker".to_string()); AppOrProxyId::new(option_env!("APP2_P2").unwrap_or("app2.proxy2.broker")).unwrap() }); @@ -31,6 +35,37 @@ pub const APP_KEY: &str = match option_env!("APP_KEY") { }; +pub fn beam_request(r#as: &AppOrProxyId, path: &str) -> request::Builder { + let proxy = match r#as { + app if app == &*APP1 => PROXY1, + app if app == &*APP2 => PROXY2, + _ => panic!("Failed to find matching proxy for app") + }; + Request::builder() + .as_app(r#as, APP_KEY) + .uri(format!("{proxy}{path}")) +} + +pub static CLIENT: Lazy> = Lazy::new(|| Client::new()); + +// This could be in a beam lib as well maybe +trait BeamRequestBuilder { + fn as_app(self, app: &AppOrProxyId, key: &str) -> Self; + // We do a generic B here to not require hyper as a dependency + fn with_json>, T: Serialize>(self, json: &T) -> Result, http::Error>; +} + +impl BeamRequestBuilder for request::Builder { + fn as_app(self, app: &AppOrProxyId, key: &str) -> Self { + self.header(header::AUTHORIZATION, format!("ApiKey {app} {key}")) + } + + fn with_json>, T: Serialize>(self, json: &T) -> Result, http::Error> { + self.body(B::from(serde_json::to_vec(json).unwrap())) + } +} + +// Move to beam lib when I get to write it #[derive(Debug, Serialize, Deserialize)] pub struct SocketTask { pub to: Vec, diff --git a/tests/src/permission_test.rs b/tests/src/permission_test.rs new file mode 100644 index 00000000..3a6f1523 --- /dev/null +++ b/tests/src/permission_test.rs @@ -0,0 +1,49 @@ +use std::time::{SystemTime, Duration}; + +use http::{Request, StatusCode, Method}; +use shared::{MsgId, Plain, beam_id::{AppOrProxyId, BeamId}}; + +use crate::{BeamRequestBuilder, APP1, APP_KEY, CLIENT, PROXY1}; + + +#[tokio::test] +async fn test_no_senders() { + let req = Request::builder() + .uri(format!("{PROXY1}/v1/tasks")) + .method(Method::POST) + .as_app(&APP1, APP_KEY) + .with_json(&shared::MsgTaskRequest { + id: MsgId::new(), + from: APP1.clone(), + to: vec![AppOrProxyId::new("app1.proxy3.broker").unwrap()], + body: Plain::from(""), + expire: SystemTime::now() + Duration::from_secs(60), + failure_strategy: shared::FailureStrategy::Discard, + results: Default::default(), + metadata: serde_json::Value::Null + }) + .unwrap(); + let res = CLIENT.request(req).await.unwrap(); + assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY) +} + +#[tokio::test] +async fn test_allowed_sender() { + let req = Request::builder() + .uri(format!("{PROXY1}/v1/tasks")) + .method(Method::POST) + .as_app(&APP1, APP_KEY) + .with_json(&shared::MsgTaskRequest { + id: MsgId::new(), + from: APP1.clone(), + to: vec![AppOrProxyId::new("app2.proxy3.broker").unwrap()], + body: Plain::from(""), + expire: SystemTime::now() + Duration::from_secs(60), + failure_strategy: shared::FailureStrategy::Discard, + results: Default::default(), + metadata: serde_json::Value::Null + }) + .unwrap(); + let res = CLIENT.request(req).await.unwrap(); + assert_eq!(res.status(), StatusCode::CREATED) +} From 2e2ae6bcf67afad3aacec8d1a4bc8c11716cefeb Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 22 Jun 2023 13:02:45 +0000 Subject: [PATCH 12/21] Keep track of expired proxies --- shared/src/crypto.rs | 41 ++++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/shared/src/crypto.rs b/shared/src/crypto.rs index 30e0c89c..6b4d5488 100644 --- a/shared/src/crypto.rs +++ b/shared/src/crypto.rs @@ -213,7 +213,7 @@ impl CertificateCache { pub async fn get_all_certs_by_cname(cname: &ProxyId) -> Vec { // TODO: What if multiple certs are found? let mut result = get_all_certs_from_cache_by_cname(cname).await; // Drop Read Locks - if result.is_empty() { + if result.iter().filter(|cert| matches!(cert, CertificateCacheEntry::Valid(_))).count() == 0 { // requires write lock. Self::update_certificates().await.unwrap_or_else(|e| { @@ -596,20 +596,22 @@ pub async fn get_cert_and_client_by_serial_as_pemstr( } pub async fn get_newest_certs_for_cnames_as_pemstr( - cnames: impl IntoIterator, -) -> Option> { - let mut result: Vec = Vec::new(); // No fancy map/iter, bc of async + cnames: Vec, +) -> Vec> { + let mut result = Vec::with_capacity(cnames.len()); // No fancy map/iter, bc of async for id in cnames { - let certs = get_all_certs_and_clients_by_cname_as_pemstr(id) + let certs = get_all_certs_and_clients_by_cname_as_pemstr(&id) .await .into_iter() .flatten() .collect(); if let Some(best_candidate) = get_best_other_certificate(&certs) { - result.push(best_candidate); + result.push(Ok(best_candidate)); + } else { + result.push(Err(id)) } } - (!result.is_empty()).then_some(result) + result } fn extract_x509(cert: &X509) -> Result { @@ -861,18 +863,19 @@ pub async fn get_proxy_public_keys( }) .collect(); let receivers_crypto_bundle = - crypto::get_newest_certs_for_cnames_as_pemstr(proxy_receivers.iter()).await; - let receivers_keys = match receivers_crypto_bundle { - Some(vec) => vec - .iter() - .map(|crypt_publ| { - rsa::RsaPublicKey::from_public_key_pem(&crypt_publ.pubkey) - .expect("Cannot collect recipients' public keys") - }) - .collect::>(), // TODO Expect - None => Vec::new(), - }; - Ok(receivers_keys) + crypto::get_newest_certs_for_cnames_as_pemstr(proxy_receivers).await; + let (receivers_keys, proxies_with_invalid_certs): (Vec<_>, Vec<_>) = receivers_crypto_bundle + .into_iter() + .map(|crypt_publ_res| { + crypt_publ_res.and_then(|crypto| + rsa::RsaPublicKey::from_public_key_pem(&crypto.pubkey).map_err(|_| crypto.beam_id)) + }) + .partition_result(); + if proxies_with_invalid_certs.is_empty() { + Ok(receivers_keys) + } else { + Err(SamplyBeamError::ExpiredProxyCerts(proxies_with_invalid_certs)) + } } #[tokio::test] From f7c0b6b594d6e0552a66fb71d2fd0b6c24427bb5 Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 22 Jun 2023 13:04:37 +0000 Subject: [PATCH 13/21] Add error type for this case --- shared/src/errors.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/shared/src/errors.rs b/shared/src/errors.rs index d4ec0a23..742ecb29 100644 --- a/shared/src/errors.rs +++ b/shared/src/errors.rs @@ -4,6 +4,8 @@ use http::StatusCode; use openssl::error::ErrorStack; use tokio::time::error::Elapsed; +use crate::beam_id::ProxyId; + #[derive(thiserror::Error, Debug)] pub enum SamplyBeamError { #[error("Invalid bind address supplied: {0}")] @@ -50,6 +52,8 @@ pub enum SamplyBeamError { CertificateError(#[from] CertificateInvalidReason), #[error("Timeout executing HTTP request: {0}")] HttpTimeoutError(Elapsed), + #[error("Expired receiver public keys: {0:?}")] + ExpiredProxyCerts(Vec) } impl From for SamplyBeamError { From c471d5ebb4b9d7710b2fad330906e0380b545fa3 Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 22 Jun 2023 13:05:14 +0000 Subject: [PATCH 14/21] Use collect:: instead of partion result --- shared/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/shared/src/lib.rs b/shared/src/lib.rs index b7ea4471..2d62f8ae 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -381,7 +381,7 @@ pub trait EncryptableMsg: Msg + Serialize + Sized { let nonce = XChaCha20Poly1305::generate_nonce(&mut rng); // Encrypt symmetric key with receivers' public keys - let (encrypted_keys, err): (Vec<_>, Vec<_>) = receivers_public_keys + let Ok(encrypted_keys) = receivers_public_keys .iter() .map(|key| { key.encrypt( @@ -390,12 +390,12 @@ pub trait EncryptableMsg: Msg + Serialize + Sized { symmetric_key.as_slice(), ) }) - .partition_result(); - if !err.is_empty() { + .collect() + else { return Err(SamplyBeamError::SignEncryptError( "Encryption error: Cannot encrypt symmetric key".into(), )); - } + }; // Encrypt fields content let cipher = XChaCha20Poly1305::new(&symmetric_key); From 193465e8f9f5bf1aedec37964482c532b1f12433 Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 22 Jun 2023 13:06:14 +0000 Subject: [PATCH 15/21] Change error part of results to Response --- proxy/src/serve_sockets.rs | 22 +++++++++------ proxy/src/serve_tasks.rs | 56 ++++++++++++++++++++------------------ 2 files changed, 42 insertions(+), 36 deletions(-) diff --git a/proxy/src/serve_sockets.rs b/proxy/src/serve_sockets.rs index 41b25104..592a2413 100644 --- a/proxy/src/serve_sockets.rs +++ b/proxy/src/serve_sockets.rs @@ -78,19 +78,23 @@ async fn get_tasks( state: State, Extension(task_secret_map): Extension, req: Request -) -> Result>>, StatusCode> { - let mut res = forward_request(req, &state.config, &sender, &state.client).await.map_err(|e| e.0)?; +) -> Result>>, Response> { + let mut res = forward_request(req, &state.config, &sender, &state.client).await?; if res.status() != StatusCode::OK { - return Err(res.status()); + return Err(res.into_response()); } - let body = hyper::body::to_bytes(res.body_mut()).await.map_err(|_| StatusCode::BAD_GATEWAY)?; - let enc_json = serde_json::from_slice(&body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let plain_json = to_server_error(validate_and_decrypt(enc_json).await).map_err(|e| e.0)?; - let tasks: Vec> = serde_json::from_value(plain_json).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let body = hyper::body::to_bytes(res.body_mut()).await.map_err(|_| StatusCode::BAD_GATEWAY.into_response())?; + let enc_json = serde_json::from_slice(&body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())?; + let plain_json = to_server_error(validate_and_decrypt(enc_json).await).map_err(IntoResponse::into_response)?; + let tasks: Vec> = serde_json::from_value(plain_json).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())?; let mut out = Vec::with_capacity(tasks.len()); for task in tasks { if let MessageType::MsgSocketRequest(mut socket_task) = task { - let key = serde_json::from_value(Value::String(socket_task.secret.body.as_ref().ok_or(StatusCode::INTERNAL_SERVER_ERROR)?.to_string())).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let key = serde_json::from_value(Value::String(socket_task.secret.body + .as_ref() + .ok_or(StatusCode::INTERNAL_SERVER_ERROR.into_response())? + .to_string() + )).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())?; let Ok(ttl) = socket_task.expire.duration_since(SystemTime::now()) else { continue; }; @@ -98,7 +102,7 @@ async fn get_tasks( socket_task.secret.body = None; out.push(socket_task); } else { - return Err(StatusCode::INTERNAL_SERVER_ERROR); + return Err(StatusCode::INTERNAL_SERVER_ERROR.into_response()); } } diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index 91005894..be54f2af 100644 --- a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -10,7 +10,7 @@ use axum::{ http::{request::Parts, HeaderValue}, response::{sse::Event, IntoResponse, Response, Sse}, routing::{any, get, put}, - Router, + Router, Json, }; use futures::{ stream::{StreamExt, TryStreamExt}, @@ -90,7 +90,7 @@ pub(crate) async fn forward_request( config: &config_proxy::Config, sender: &AppId, client: &SamplyHttpClient, -) -> Result, (StatusCode, &'static str)> { +) -> Result, Response> { // Create uri to contact broker let path = req.uri().path(); let path_query = req @@ -100,7 +100,7 @@ pub(crate) async fn forward_request( .unwrap_or(path); let target_uri = Uri::try_from(config.broker_uri.to_string() + path_query.trim_start_matches('/')) - .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid path queried."))?; + .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid path queried.").into_response())?; *req.uri_mut() = target_uri; req.headers_mut().append( @@ -108,7 +108,7 @@ pub(crate) async fn forward_request( HeaderValue::from_static(env!("SAMPLY_USER_AGENT")), ); let (encrypted_msg, parts) = encrypt_request(req, &sender).await?; - let req = sign_request(encrypted_msg, parts, &config, None).await?; + let req = sign_request(encrypted_msg, parts, &config, None).await.map_err(IntoResponse::into_response)?; trace!("Requesting: {:?}", req); let resp = client.request(req).await.map_err(|e| { if is_actually_hyper_timeout(&e) { @@ -117,7 +117,7 @@ pub(crate) async fn forward_request( } else { warn!("Request to broker failed: {}", e.to_string()); (StatusCode::BAD_GATEWAY, "Upstream error; see server logs.") - } + }.into_response() })?; Ok(resp) } @@ -128,7 +128,7 @@ pub(crate) async fn handler_task( AuthenticatedApp(sender): AuthenticatedApp, headers: HeaderMap, req: Request, -) -> Result { +) -> Response { let found = &headers .get(header::ACCEPT) .unwrap_or(&HeaderValue::from_static("")) @@ -139,18 +139,15 @@ pub(crate) async fn handler_task( .find(|part| *part == "text/event-stream") .is_some(); - let result = if *found { + if *found { handler_tasks_stream(client, config, sender, req) - .await? + .await .into_response() } else { handler_tasks_nostream(client, config, sender, req) .await - .map_err(|e| (e.0, e.1.to_string()))? .into_response() - }; - - return Ok(result); + } } async fn handler_tasks_nostream( @@ -158,7 +155,7 @@ async fn handler_tasks_nostream( config: config_proxy::Config, sender: AppId, req: Request, -) -> Result, (StatusCode, &'static str)> { +) -> Result, Response> { // Validate Query, forward to server, get response. let resp = forward_request(req, &config, &sender, &client).await?; @@ -168,7 +165,7 @@ async fn handler_tasks_nostream( let (mut parts, body) = resp.into_parts(); let mut bytes = body::to_bytes(body).await.map_err(|e| { error!("Error receiving reply from the broker: {}", e); - ERR_UPSTREAM + ERR_UPSTREAM.into_response() })?; // TODO: Always return application/jwt from server. @@ -210,12 +207,10 @@ async fn handler_tasks_stream( config: config_proxy::Config, sender: AppId, req: Request, -) -> Result>>, (StatusCode, String)> { +) -> Result>>, Response> { // Validate Query, forward to server, get response. - let mut resp = forward_request(req, &config, &sender, &client) - .await - .map_err(|err| (err.0, err.1.into()))?; + let mut resp = forward_request(req, &config, &sender, &client).await?; let code = resp.status(); if !code.is_success() { @@ -224,7 +219,7 @@ async fn handler_tasks_stream( .and_then(|v| String::from_utf8(v.into()).ok()) .unwrap_or("(unable to parse reply)".into()); warn!("Got unexpected response code from server: {code}. Returning error message as-is: \"{error_msg}\""); - return Err((code, error_msg)); + return Err((code, error_msg).into_response()); } let outgoing = async_stream::stream! { @@ -325,7 +320,7 @@ async fn handler_tasks_stream( Ok(sse) } -pub(crate) fn to_server_error(res: Result) -> Result { +pub(crate) fn to_server_error(res: Result) -> Result { res.map_err(|e| match e { SamplyBeamError::JsonParseError(e) => { warn!("{e}"); @@ -340,7 +335,7 @@ pub(crate) fn to_server_error(res: Result) -> Result(msg: M) -> Result async fn encrypt_request( req: Request, sender: &AppId, -) -> Result<(EncryptedMessage, Parts), (StatusCode, &'static str)> { +) -> Result<(EncryptedMessage, Parts), Response> { let (parts, body) = req.into_parts(); let body = body::to_bytes(body).await.map_err(|e| { warn!("Unable to read message body: {e}"); - ERR_BODY + ERR_BODY.into_response() })?; let msg = if body.is_empty() { @@ -478,17 +473,24 @@ async fn encrypt_request( e, std::str::from_utf8(&body).unwrap_or("(not valid UTF-8)") ); - return Err(ERR_BODY); + return Err(ERR_BODY.into_response()); } } }; // Sanity/security checks: From address sane? if msg.get_from() != sender { - return Err(ERR_FAKED_FROM); + return Err(ERR_FAKED_FROM.into_response()); } let body = encrypt_msg(msg).await.map_err(|e| { - warn!("Encryption failed with: {e}"); - ERR_INTERNALCRYPTO + match e { + SamplyBeamError::ExpiredProxyCerts(proxies) => { + (StatusCode::FAILED_DEPENDENCY, Json(proxies)).into_response() + } + e => { + warn!("Encryption failed with: {e}"); + ERR_INTERNALCRYPTO.into_response() + } + } })?; Ok((body, parts)) } From a759e7415690d5dba7901f82037728a1283dae06 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 4 Jul 2023 11:23:32 +0000 Subject: [PATCH 16/21] Rename Error --- proxy/src/serve_tasks.rs | 2 +- shared/src/crypto.rs | 2 +- shared/src/errors.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index be54f2af..93750e3c 100644 --- a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -483,7 +483,7 @@ async fn encrypt_request( } let body = encrypt_msg(msg).await.map_err(|e| { match e { - SamplyBeamError::ExpiredProxyCerts(proxies) => { + SamplyBeamError::InvalidReceivers(proxies) => { (StatusCode::FAILED_DEPENDENCY, Json(proxies)).into_response() } e => { diff --git a/shared/src/crypto.rs b/shared/src/crypto.rs index 6b4d5488..027d2b6f 100644 --- a/shared/src/crypto.rs +++ b/shared/src/crypto.rs @@ -874,7 +874,7 @@ pub async fn get_proxy_public_keys( if proxies_with_invalid_certs.is_empty() { Ok(receivers_keys) } else { - Err(SamplyBeamError::ExpiredProxyCerts(proxies_with_invalid_certs)) + Err(SamplyBeamError::InvalidReceivers(proxies_with_invalid_certs)) } } diff --git a/shared/src/errors.rs b/shared/src/errors.rs index 742ecb29..fd10a449 100644 --- a/shared/src/errors.rs +++ b/shared/src/errors.rs @@ -52,8 +52,8 @@ pub enum SamplyBeamError { CertificateError(#[from] CertificateInvalidReason), #[error("Timeout executing HTTP request: {0}")] HttpTimeoutError(Elapsed), - #[error("Expired receiver public keys: {0:?}")] - ExpiredProxyCerts(Vec) + #[error("Invalid receivers: {0:?}")] + InvalidReceivers(Vec) } impl From for SamplyBeamError { From 3d53f50bb6fc3a9c9c30ed4808405f0f7ea6a97f Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 4 Jul 2023 13:03:16 +0000 Subject: [PATCH 17/21] Log when a sse result was blocked --- proxy/src/serve_tasks.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index 3c4b625b..c0780437 100644 --- a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -299,6 +299,10 @@ async fn handler_tasks_stream( }; let json = match validate_and_decrypt(json).await { Ok(json) => json, + Err(SamplyBeamError::DisallowedReceiver(rec)) => { + info!("Did not send result from {rec} as it was forbidden by the config"); + continue; + }, Err(err) => { warn!("Got an error decrypting Broker's reply: {err}"); continue; From 345ac3ac04b36a919b353cc96842d82935098d9e Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 4 Jul 2023 13:58:26 +0000 Subject: [PATCH 18/21] Adapt encrypt request to return json --- proxy/src/serve_tasks.rs | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index c0780437..41f8ee08 100644 --- a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -10,7 +10,7 @@ use axum::{ http::{request::Parts, HeaderValue}, response::{sse::Event, IntoResponse, Response, Sse}, routing::{any, get, put}, - Router, + Router, Json, }; use futures::{ stream::{StreamExt, TryStreamExt}, @@ -470,11 +470,11 @@ fn decrypt_msg(msg: M) -> Result async fn encrypt_request( req: Request, sender: &AppId, -) -> Result<(EncryptedMessage, Parts), (StatusCode, &'static str)> { +) -> Result<(EncryptedMessage, Parts), Response> { let (parts, body) = req.into_parts(); let body = body::to_bytes(body).await.map_err(|e| { warn!("Unable to read message body: {e}"); - ERR_BODY + ERR_BODY.into_response() })?; let msg = if body.is_empty() { @@ -483,21 +483,12 @@ async fn encrypt_request( from: sender.into(), }) } else { - match serde_json::from_slice(&body) { - Ok(mut val) => { + match serde_json::from_slice::(&body) { + Ok(val) => { debug!("Body is valid json"); - // If Msg was a struct generic over its other contents this would be so much nicer but this would take a lot of refactoring - match val { - MessageType::MsgTaskRequest(ref mut m) => m.to.retain(|to| CONFIG_PROXY.permission_manager.allowed_to_send(to)), - MessageType::MsgTaskResult(ref mut m) => m.to.retain(|to| CONFIG_PROXY.permission_manager.allowed_to_send(to)), - #[cfg(feature = "sockets")] - MessageType::MsgSocketRequest(ref mut m) => m.to.retain(|to| CONFIG_PROXY.permission_manager.allowed_to_send(to)), - MessageType::MsgEmpty(..) => (), - }; - if !matches!(val, MessageType::MsgEmpty(..)) { - if val.get_to().is_empty() { - return Err((StatusCode::UNPROCESSABLE_ENTITY, "Message needs to have at least one `to` entry that is not blocked by this proxies recipient policy")); - } + let filtered = val.get_to().iter().filter(|app| !CONFIG_PROXY.permission_manager.allowed_to_send(app)).cloned().collect::>(); + if !filtered.is_empty() { + return Err((StatusCode::UNPROCESSABLE_ENTITY, Json(filtered)).into_response()); } val @@ -508,17 +499,17 @@ async fn encrypt_request( e, std::str::from_utf8(&body).unwrap_or("(not valid UTF-8)") ); - return Err(ERR_BODY); + return Err(ERR_BODY.into_response()); } } }; // Sanity/security checks: From address sane? if msg.get_from() != sender { - return Err(ERR_FAKED_FROM); + return Err(ERR_FAKED_FROM.into_response()); } let body = encrypt_msg(msg).await.map_err(|e| { warn!("Encryption failed with: {e}"); - ERR_INTERNALCRYPTO + ERR_INTERNALCRYPTO.into_response() })?; Ok((body, parts)) } From b3b52629dcf33f041ba804a11c3447846d499495 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 4 Jul 2023 13:58:36 +0000 Subject: [PATCH 19/21] Update Readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fa445c11..712571a0 100644 --- a/README.md +++ b/README.md @@ -581,7 +581,7 @@ Example: ### Deny sending messages to specific proxies This is done by setting `allowed-receivers` and/or `blocked-receivers` options accordingly as described in [Restricting access](#restricting-accesses). -> Note: When all receivers of a message are being blocked due to these restrictions the proxy will return a status code of 422 Unprocessable Entity. +> Note: When a receiver of a message is being blocked due to these restrictions the proxy will return a status code of 422 Unprocessable Entity and a json list containing the offending entries. Example: ```bash From 03857f6dac4f8f8d7b25cc937a20b6b9598f5709 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 4 Jul 2023 14:12:56 +0000 Subject: [PATCH 20/21] Adapt tests --- shared/src/errors.rs | 2 +- tests/src/permission_test.rs | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/shared/src/errors.rs b/shared/src/errors.rs index 71c0bf06..8bcb550b 100644 --- a/shared/src/errors.rs +++ b/shared/src/errors.rs @@ -53,7 +53,7 @@ pub enum SamplyBeamError { #[error("Timeout executing HTTP request: {0}")] HttpTimeoutError(Elapsed), #[error("Invalid receivers: {0:?}")] - InvalidReceivers(Vec) + InvalidReceivers(Vec), #[error("Not allowed to receive messge from: {0}")] DisallowedReceiver(AppOrProxyId), } diff --git a/tests/src/permission_test.rs b/tests/src/permission_test.rs index 3a6f1523..c86a0051 100644 --- a/tests/src/permission_test.rs +++ b/tests/src/permission_test.rs @@ -8,6 +8,7 @@ use crate::{BeamRequestBuilder, APP1, APP_KEY, CLIENT, PROXY1}; #[tokio::test] async fn test_no_senders() { + let to = vec![AppOrProxyId::new("app1.proxy3.broker").unwrap()]; let req = Request::builder() .uri(format!("{PROXY1}/v1/tasks")) .method(Method::POST) @@ -15,7 +16,7 @@ async fn test_no_senders() { .with_json(&shared::MsgTaskRequest { id: MsgId::new(), from: APP1.clone(), - to: vec![AppOrProxyId::new("app1.proxy3.broker").unwrap()], + to: to.clone(), body: Plain::from(""), expire: SystemTime::now() + Duration::from_secs(60), failure_strategy: shared::FailureStrategy::Discard, @@ -24,11 +25,12 @@ async fn test_no_senders() { }) .unwrap(); let res = CLIENT.request(req).await.unwrap(); - assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY) + assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY); + assert_eq!(serde_json::from_slice::>(&hyper::body::to_bytes(res.into_body()).await.unwrap()).unwrap(), to); } #[tokio::test] -async fn test_allowed_sender() { +async fn test_allowed_sender_but_invalid_proxy() { let req = Request::builder() .uri(format!("{PROXY1}/v1/tasks")) .method(Method::POST) @@ -45,5 +47,5 @@ async fn test_allowed_sender() { }) .unwrap(); let res = CLIENT.request(req).await.unwrap(); - assert_eq!(res.status(), StatusCode::CREATED) + assert_eq!(res.status(), StatusCode::FAILED_DEPENDENCY) } From 14287252555a3a1e2d4e396c8ca47444be21909c Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 5 Jul 2023 11:16:18 +0000 Subject: [PATCH 21/21] Fix tests --- tests/src/permission_test.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/src/permission_test.rs b/tests/src/permission_test.rs index c86a0051..51fc7348 100644 --- a/tests/src/permission_test.rs +++ b/tests/src/permission_test.rs @@ -1,13 +1,14 @@ use std::time::{SystemTime, Duration}; use http::{Request, StatusCode, Method}; -use shared::{MsgId, Plain, beam_id::{AppOrProxyId, BeamId}}; +use shared::{MsgId, Plain, beam_id::{AppOrProxyId, BeamId, BrokerId}}; use crate::{BeamRequestBuilder, APP1, APP_KEY, CLIENT, PROXY1}; #[tokio::test] async fn test_no_senders() { + BrokerId::set_broker_id("broker".to_string()); let to = vec![AppOrProxyId::new("app1.proxy3.broker").unwrap()]; let req = Request::builder() .uri(format!("{PROXY1}/v1/tasks")) @@ -31,6 +32,7 @@ async fn test_no_senders() { #[tokio::test] async fn test_allowed_sender_but_invalid_proxy() { + BrokerId::set_broker_id("broker".to_string()); let req = Request::builder() .uri(format!("{PROXY1}/v1/tasks")) .method(Method::POST)