Skip to content

Commit 1131218

Browse files
committed
#4 - split proto handler interfaces into publish and subscribe
Signed-off-by: Lance-Drane <Lance-Drane@users.noreply.github.com>
1 parent ac09517 commit 1131218

File tree

24 files changed

+698
-403
lines changed

24 files changed

+698
-403
lines changed

Cargo.lock

Lines changed: 110 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ amqprs = { version = "2", features = ["traces"] }
1212
config = { version = "0.14", default-features = false, features = ["yaml"] } # TODO check to see why this can't be upgraded to v15
1313
deadpool-amqprs = "0.3"
1414
futures = "0.3"
15+
rumqttc = "0.24.0"
1516
secrecy = { version = "0.10", features = ["serde"] }
1617
serde = { version = "1", features = ["derive"] }
1718
serde-aux = "4"

proxy-http-client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ name = "proxy-http-client"
1515
[dependencies]
1616
anyhow = { workspace = true }
1717
async-stream = { workspace = true }
18-
deadpool-amqprs = { workspace = true }
1918
futures = { workspace = true }
19+
rumqttc = { workspace = true }
2020
secrecy = { workspace = true }
2121
serde = { workspace = true }
2222
serde-aux = { workspace = true }

proxy-http-client/src/event_source.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ use reqwest_eventsource::{Event, EventSource};
33
use secrecy::ExposeSecret;
44

55
use intersect_ingress_proxy_common::intersect_messaging::extract_eventsource_data;
6-
use intersect_ingress_proxy_common::protocols::ProtoHandler;
6+
use intersect_ingress_proxy_common::protocols::interfaces::PublishProtoHandler;
77
use intersect_ingress_proxy_common::server_paths::SUBSCRIBE_URL;
88
use intersect_ingress_proxy_common::signals::wait_for_os_signal;
99

1010
use crate::configuration::ExternalProxy;
1111

1212
/// Return Err only if we weren't able to publish a correct message to the broker, invalid messages are ignored
13-
async fn send_message(message: String, proto_handler: &impl ProtoHandler) -> Result<(), &str> {
13+
async fn send_message(
14+
message: String,
15+
proto_handler: &impl PublishProtoHandler,
16+
) -> Result<(), &str> {
1417
let es_data_result = extract_eventsource_data(&message);
1518
if es_data_result.is_err() {
1619
return Ok(());
@@ -26,7 +29,7 @@ async fn send_message(message: String, proto_handler: &impl ProtoHandler) -> Res
2629
/// - Inner API could potentially panic but is currently not expected to do so
2730
pub async fn event_source_loop(
2831
other_proxy: ExternalProxy,
29-
connection_pool: &impl ProtoHandler,
32+
proto_handler: impl PublishProtoHandler,
3033
) -> i32 {
3134
let mut es = EventSource::new(
3235
reqwest::Client::new()
@@ -55,7 +58,7 @@ pub async fn event_source_loop(
5558
tracing::info!("connected to {}", &other_proxy.url);
5659
},
5760
Ok(Event::Message(message)) => {
58-
if let Err(e) = send_message(message.data, connection_pool).await {
61+
if let Err(e) = send_message(message.data, &proto_handler).await {
5962
tracing::error!(e);
6063
};
6164
},

0 commit comments

Comments
 (0)