Skip to content

Commit d0b034f

Browse files
committed
#4 - refactor application to support multiple protocols
MQTT currently just a stub Signed-off-by: Lance-Drane <Lance-Drane@users.noreply.github.com>
1 parent 9284ad6 commit d0b034f

File tree

20 files changed

+641
-333
lines changed

20 files changed

+641
-333
lines changed

proxy-http-client/conf.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ broker:
99
host: "127.0.0.1"
1010
# note: differs from other config file (two separate brokers used)
1111
port: 5673
12+
protocol: "amqp"
1213
# use amqp topic notation, note that the system is different from the other configuration
1314
topic_prefix: "organization.facility.system2" # CHANGE THIS PER DEPLOYMENT!!!
1415
log_level: "debug"

proxy-http-client/src/event_source.rs

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,33 @@
1-
use deadpool_amqprs::Pool;
21
use futures::StreamExt;
32
use reqwest_eventsource::{Event, EventSource};
43
use secrecy::ExposeSecret;
54

65
use intersect_ingress_proxy_common::intersect_messaging::extract_eventsource_data;
7-
use intersect_ingress_proxy_common::protocols::amqp::{
8-
get_channel, is_routing_key_compliant, publish::amqp_publish_message,
9-
};
6+
use intersect_ingress_proxy_common::protocols::ProtoHandler;
107
use intersect_ingress_proxy_common::server_paths::SUBSCRIBE_URL;
118
use intersect_ingress_proxy_common::signals::wait_for_os_signal;
129

1310
use crate::configuration::ExternalProxy;
1411

1512
/// Return Err only if we weren't able to publish a correct message to the broker, invalid messages are ignored
16-
async fn send_message(message: String, connection_pool: &Pool) -> Result<(), String> {
13+
async fn send_message(message: String, proto_handler: &impl ProtoHandler) -> Result<(), &str> {
1714
let es_data_result = extract_eventsource_data(&message);
1815
if es_data_result.is_err() {
1916
return Ok(());
2017
}
2118
let (topic, data) = es_data_result.unwrap();
22-
if !is_routing_key_compliant(&topic) {
23-
tracing::warn!(
24-
"{} is not a valid AMQP topic name, will not attempt publish",
25-
topic
26-
);
27-
return Ok(());
28-
}
29-
tracing::debug!("Publishing message with topic: {}", &topic);
30-
31-
let connection = connection_pool.get().await.map_err(|_| {
32-
"WARNING: Couldn't get connection, message received from other proxy was NOT published on our own broker."
33-
.to_string()
34-
})?;
3519

36-
let channel = get_channel(&connection).await.map_err(|_| {
37-
"WARNING: Couldn't get channel, message received from other proxy was NOT published on our own broker."
38-
.to_string()
39-
})?;
40-
41-
match amqp_publish_message(channel, &topic, data).await {
42-
Ok(()) => Ok(()),
43-
Err(()) => Err(
44-
"WARNING: message received from other proxy was NOT published on our own broker."
45-
.into(),
46-
),
47-
}
20+
proto_handler.publish_message(&topic, data).await
4821
}
4922

5023
/// Return value - exit code to use
5124
///
5225
/// # Panics
5326
/// - Inner API could potentially panic but is currently not expected to do so
54-
pub async fn event_source_loop(other_proxy: ExternalProxy, connection_pool: Pool) -> i32 {
27+
pub async fn event_source_loop(
28+
other_proxy: ExternalProxy,
29+
connection_pool: &impl ProtoHandler,
30+
) -> i32 {
5531
let mut es = EventSource::new(
5632
reqwest::Client::new()
5733
.get(format!("{}{}", &other_proxy.url, SUBSCRIBE_URL))
@@ -79,7 +55,7 @@ pub async fn event_source_loop(other_proxy: ExternalProxy, connection_pool: Pool
7955
tracing::info!("connected to {}", &other_proxy.url);
8056
},
8157
Ok(Event::Message(message)) => {
82-
if let Err(e) = send_message(message.data, &connection_pool).await {
58+
if let Err(e) = send_message(message.data, connection_pool).await {
8359
tracing::error!(e);
8460
};
8561
},

proxy-http-client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
pub mod configuration;
22
pub mod event_source;
33
pub mod poster;
4+
5+
pub const APPLICATION_NAME: &str = "proxy-http-client";

proxy-http-client/src/main.rs

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,46 +3,26 @@ use std::sync::Arc;
33
use tokio::sync::oneshot;
44

55
use intersect_ingress_proxy_common::configuration::get_configuration;
6-
use intersect_ingress_proxy_common::protocols::amqp::{
7-
get_connection_pool, subscribe::broker_consumer_loop, verify_connection_pool,
6+
use intersect_ingress_proxy_common::protocols::{
7+
amqp::AmqpProtoHandler, mqtt::MqttProtoHandler, ProtoHandler,
88
};
99
use intersect_ingress_proxy_common::telemetry::{
1010
get_json_subscriber, get_pretty_subscriber, init_subscriber,
1111
};
1212

13-
use proxy_http_client::{configuration::Settings, event_source::event_source_loop, poster::Poster};
14-
15-
const APPLICATION_NAME: &str = "proxy-http-client";
13+
use proxy_http_client::{
14+
configuration::Settings, event_source::event_source_loop, poster::Poster, APPLICATION_NAME,
15+
};
1616

1717
// Muslc has a slow allocator, but we can only use jemalloc on 64-bit systems since jemalloc doesn't support i686.
1818
#[cfg(all(target_env = "musl", target_pointer_width = "64"))]
1919
#[global_allocator]
2020
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
2121

22-
#[tokio::main]
23-
pub async fn main() -> anyhow::Result<()> {
24-
let configuration = get_configuration::<Settings>().expect("Failed to read configuration");
25-
26-
// Start logging
27-
if configuration.production {
28-
let subscriber = get_json_subscriber(
29-
APPLICATION_NAME.into(),
30-
configuration.log_level.to_string(),
31-
std::io::stderr,
32-
);
33-
init_subscriber(subscriber);
34-
} else {
35-
let subscriber = get_pretty_subscriber(configuration.log_level.to_string());
36-
init_subscriber(subscriber);
37-
}
38-
39-
// set up broker connection pool
40-
let pool = get_connection_pool(&configuration.broker);
41-
if let Err(msg) = verify_connection_pool(&pool, APPLICATION_NAME).await {
42-
tracing::error!(msg);
43-
std::process::exit(1);
44-
}
45-
22+
async fn begin_execution(
23+
configuration: Settings,
24+
proto_handler: impl ProtoHandler,
25+
) -> anyhow::Result<()> {
4626
// How this works:
4727
// - Pass in the receiver to the broker consumer loop
4828
// - In the broker consumer loop, use tokio::select! to wait for rx.recv() at key points
@@ -51,10 +31,8 @@ pub async fn main() -> anyhow::Result<()> {
5131
// - This allows us to "finish up" publishing a message to our broker before killing the application.
5232
let (tx, rx) = oneshot::channel();
5333

54-
let broker_join_handle = broker_consumer_loop(
55-
pool.clone(),
34+
let broker_join_handle = proto_handler.begin_subscribe_loop(
5635
configuration.topic_prefix.clone(),
57-
APPLICATION_NAME.into(),
5836
Arc::new(Poster::new(&configuration.other_proxy)),
5937
rx,
6038
);
@@ -63,11 +41,50 @@ pub async fn main() -> anyhow::Result<()> {
6341
drop(configuration);
6442

6543
// this will run until we get an event source error or we catch an OS signal
66-
let rc = event_source_loop(other_proxy, pool).await;
44+
let rc = event_source_loop(other_proxy, &proto_handler).await;
6745

6846
tracing::info!("Attempting graceful shutdown: No longer listening for events over HTTP");
6947
drop(tx);
7048
broker_join_handle.await?;
7149

7250
std::process::exit(rc);
7351
}
52+
53+
#[tokio::main]
54+
async fn main() -> anyhow::Result<()> {
55+
let configuration = get_configuration::<Settings>().expect("Failed to read configuration");
56+
57+
// Start logging
58+
if configuration.production {
59+
let subscriber = get_json_subscriber(
60+
APPLICATION_NAME.into(),
61+
configuration.log_level.to_string(),
62+
std::io::stderr,
63+
);
64+
init_subscriber(subscriber);
65+
} else {
66+
let subscriber = get_pretty_subscriber(configuration.log_level.to_string());
67+
init_subscriber(subscriber);
68+
}
69+
70+
match configuration.broker.protocol {
71+
intersect_ingress_proxy_common::configuration::Protocol::Amqp => {
72+
let proto_handler =
73+
AmqpProtoHandler::new(&configuration.broker, APPLICATION_NAME).await;
74+
if proto_handler.is_err() {
75+
tracing::error!("{}", proto_handler.unwrap_err());
76+
std::process::exit(1);
77+
}
78+
begin_execution(configuration, proto_handler.unwrap()).await
79+
}
80+
intersect_ingress_proxy_common::configuration::Protocol::Mqtt => {
81+
let proto_handler =
82+
MqttProtoHandler::new(&configuration.broker, APPLICATION_NAME).await;
83+
if proto_handler.is_err() {
84+
tracing::error!("{}", proto_handler.unwrap_err());
85+
std::process::exit(1);
86+
}
87+
begin_execution(configuration, proto_handler.unwrap()).await
88+
}
89+
}
90+
}

proxy-http-client/src/poster.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use secrecy::ExposeSecret;
22

3-
use intersect_ingress_proxy_common::protocols::amqp::subscribe::HttpBroadcast;
3+
use intersect_ingress_proxy_common::protocols::HttpBroadcast;
44
use intersect_ingress_proxy_common::server_paths::PUBLISH_URL;
55

66
use crate::configuration::ExternalProxy;
@@ -29,10 +29,9 @@ impl HttpBroadcast for Poster {
2929
.body(event)
3030
.send()
3131
.await;
32-
if result.is_ok() {
33-
let result = result.unwrap();
34-
let status = result.status().as_u16();
35-
match result.bytes().await {
32+
if let Ok(response) = result {
33+
let status = response.status().as_u16();
34+
match response.bytes().await {
3635
Ok(bytes) => tracing::debug!("{:?}", bytes),
3736
Err(err) => tracing::debug!("ERROR: {}", err.to_string()),
3837
}

proxy-http-server/conf.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ broker:
66
host: "127.0.0.1"
77
# note: differs from other config file (two separate brokers used)
88
port: 5672
9+
protocol: "amqp"
910
# use amqp topic notation
1011
topic_prefix: "organization.facility.system" # CHANGE THIS PER DEPLOYMENT!!!
1112
log_level: "debug"

proxy-http-server/src/broadcaster.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use axum::response::sse::Event;
22
use std::sync::Arc;
33
use tokio::sync::broadcast;
44

5-
use intersect_ingress_proxy_common::protocols::amqp::subscribe::HttpBroadcast;
5+
use intersect_ingress_proxy_common::protocols::HttpBroadcast;
66

77
/// This broadcaster is an optimized implementation of a single-producer, multi-consumer channel.
88
/// The Broadcaster is effectively the "link" between the broker and the HTTP gateway.

proxy-http-server/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
pub mod broadcaster;
22
pub mod configuration;
33
pub mod routes;
4-
pub mod webapp;
4+
pub mod webapp_server;
5+
pub mod webapp_state;
6+
7+
pub const APPLICATION_NAME: &str = "proxy-http-server";

proxy-http-server/src/main.rs

Lines changed: 69 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,54 @@
1+
use std::sync::Arc;
2+
13
use tokio::sync::oneshot;
24

35
use intersect_ingress_proxy_common::configuration::get_configuration;
4-
use intersect_ingress_proxy_common::protocols::amqp::subscribe::broker_consumer_loop;
5-
use intersect_ingress_proxy_common::protocols::amqp::{
6-
get_connection_pool, verify_connection_pool,
6+
use intersect_ingress_proxy_common::protocols::{
7+
amqp::AmqpProtoHandler, mqtt::MqttProtoHandler, ProtoHandler,
78
};
89
use intersect_ingress_proxy_common::telemetry::{
910
get_json_subscriber, get_pretty_subscriber, init_subscriber,
1011
};
1112

1213
use proxy_http_server::{
13-
broadcaster::Broadcaster, configuration::Settings, webapp::WebApplication,
14+
broadcaster::Broadcaster,
15+
configuration::Settings,
16+
webapp_server::{AmqpWebApplication, MqttWebApplication, WebApplication},
17+
APPLICATION_NAME,
1418
};
1519

16-
const APPLICATION_NAME: &str = "proxy-http-server";
17-
1820
// Muslc has a slow allocator, but we can only use jemalloc on 64-bit systems since jemalloc doesn't support i686.
1921
#[cfg(all(target_env = "musl", target_pointer_width = "64"))]
2022
#[global_allocator]
2123
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
2224

25+
async fn begin_execution(
26+
configuration: Settings,
27+
proto_handler: impl ProtoHandler,
28+
application: impl WebApplication,
29+
broadcaster: Arc<Broadcaster>,
30+
) -> anyhow::Result<()> {
31+
// How this works:
32+
// - Pass in the receiver to the broker consumer loop
33+
// - In the broker consumer loop, use tokio::select! to wait for rx.recv() at key points
34+
// - After the HTTP server has been shut down, drop the sender from memory, which will trigger an rx.recv() command
35+
// - This allows us to "finish up" publishing a message to our broker before killing the application.
36+
let (tx, rx) = oneshot::channel();
37+
38+
let broker_join_handle =
39+
proto_handler.begin_subscribe_loop(configuration.topic_prefix.clone(), broadcaster, rx);
40+
41+
drop(configuration);
42+
43+
application.run_until_stopped().await?;
44+
45+
tracing::info!("Application shutting down, please wait for cleanups...");
46+
drop(tx);
47+
broker_join_handle.await?;
48+
49+
Ok(())
50+
}
51+
2352
#[tokio::main]
2453
async fn main() -> anyhow::Result<()> {
2554
let configuration = get_configuration::<Settings>().expect("Failed to read configuration");
@@ -37,39 +66,39 @@ async fn main() -> anyhow::Result<()> {
3766
init_subscriber(subscriber);
3867
}
3968

40-
// set up broker connection pool
41-
let pool = get_connection_pool(&configuration.broker);
42-
if let Err(msg) = verify_connection_pool(&pool, APPLICATION_NAME).await {
43-
tracing::error!(msg);
44-
std::process::exit(1);
45-
}
46-
4769
let broadcaster = Broadcaster::new();
48-
let application =
49-
WebApplication::build(&configuration, broadcaster.clone(), pool.clone()).await?;
50-
51-
// How this works:
52-
// - Pass in the receiver to the broker consumer loop
53-
// - In the broker consumer loop, use tokio::select! to wait for rx.recv() at key points
54-
// - After the HTTP server has been shut down, drop the sender from memory, which will trigger an rx.recv() command
55-
// - This allows us to "finish up" publishing a message to our broker before killing the application.
56-
let (tx, rx) = oneshot::channel();
57-
58-
let broker_join_handle = broker_consumer_loop(
59-
pool,
60-
configuration.topic_prefix.clone(),
61-
APPLICATION_NAME.into(),
62-
broadcaster,
63-
rx,
64-
);
65-
66-
drop(configuration);
67-
68-
application.run_until_stopped().await?;
69-
70-
tracing::info!("Application shutting down, please wait for cleanups...");
71-
drop(tx);
72-
broker_join_handle.await?;
73-
74-
Ok(())
70+
match configuration.broker.protocol {
71+
intersect_ingress_proxy_common::configuration::Protocol::Amqp => {
72+
let proto_handler =
73+
AmqpProtoHandler::new(&configuration.broker, APPLICATION_NAME).await;
74+
if proto_handler.is_err() {
75+
tracing::error!("{}", proto_handler.unwrap_err());
76+
std::process::exit(1);
77+
}
78+
let proto_handler = proto_handler.unwrap();
79+
let web_server = AmqpWebApplication::build(
80+
&configuration,
81+
broadcaster.clone(),
82+
proto_handler.clone(),
83+
)
84+
.await?;
85+
begin_execution(configuration, proto_handler, web_server, broadcaster).await
86+
}
87+
intersect_ingress_proxy_common::configuration::Protocol::Mqtt => {
88+
let proto_handler =
89+
MqttProtoHandler::new(&configuration.broker, APPLICATION_NAME).await;
90+
if proto_handler.is_err() {
91+
tracing::error!("{}", proto_handler.unwrap_err());
92+
std::process::exit(1);
93+
}
94+
let proto_handler = proto_handler.unwrap();
95+
let web_server = MqttWebApplication::build(
96+
&configuration,
97+
broadcaster.clone(),
98+
proto_handler.clone(),
99+
)
100+
.await?;
101+
begin_execution(configuration, proto_handler, web_server, broadcaster).await
102+
}
103+
}
75104
}

0 commit comments

Comments
 (0)