Skip to content

Commit faa14e0

Browse files
committed
feat: reworked rabbit-mq handler & added messages
1 parent c720b7a commit faa14e0

File tree

7 files changed

+231
-271
lines changed

7 files changed

+231
-271
lines changed

src/flow_queue/connection.rs

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,63 +2,57 @@ use lapin::{Channel, Connection, ConnectionProperties};
22
use std::sync::Arc;
33
use tokio::sync::Mutex;
44

5-
pub type FlowQueue = Arc<Mutex<Box<Connection>>>;
6-
7-
pub type FlowChannel = Arc<Mutex<Box<Channel>>>;
8-
9-
async fn build_connection(rabbitmq_url: &str) -> Connection {
10-
match Connection::connect(rabbitmq_url, ConnectionProperties::default()).await {
5+
pub async fn build_connection(rabbitmq_url: &str) -> Connection {
6+
match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await {
117
Ok(env) => env,
12-
Err(error) => panic!("Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}", error),
13-
}
14-
}
15-
16-
pub async fn create_flow_channel_connection(uri: &str) -> FlowChannel {
17-
let connection = build_connection(uri).await;
18-
19-
match connection.create_channel().await {
20-
Ok(channel) => Arc::new(Mutex::new(Box::new(channel))),
21-
Err(error) => panic!("Cannot create channel {:?}", error),
8+
Err(error) => panic!(
9+
"Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
10+
error
11+
),
2212
}
2313
}
2414

2515
#[cfg(test)]
2616
mod tests {
17+
use crate::flow_queue::connection::build_connection;
2718
use testcontainers::core::{IntoContainerPort, WaitFor};
2819
use testcontainers::runners::AsyncRunner;
2920
use testcontainers::GenericImage;
30-
use crate::flow_queue::connection::build_connection;
3121

3222
macro_rules! rabbitmq_container_test {
3323
($test_name:ident, $consumer:expr) => {
34-
3524
#[tokio::test]
3625
async fn $test_name() {
3726
let port: u16 = 5672;
3827
let image_name = "rabbitmq";
3928
let wait_message = "Server startup complete";
40-
29+
4130
let container = GenericImage::new(image_name, "latest")
4231
.with_exposed_port(port.tcp())
4332
.with_wait_for(WaitFor::message_on_stdout(wait_message))
4433
.start()
4534
.await
4635
.unwrap();
47-
36+
4837
let host_port = container.get_host_port_ipv4(port).await.unwrap();
4938
let url = format!("amqp://guest:guest@localhost:{}", host_port);
50-
39+
5140
$consumer(url).await;
5241
}
5342
};
5443
}
5544

56-
rabbitmq_container_test!(test_rabbitmq_startup, (|url: String| async move {
57-
println!("RabbitMQ started with the url: {}", url);
58-
}));
59-
60-
rabbitmq_container_test!(test_rabbitmq_connection, (|url: String| async move {
61-
build_connection(&*url).await;
62-
}));
63-
45+
rabbitmq_container_test!(
46+
test_rabbitmq_startup,
47+
(|url: String| async move {
48+
println!("RabbitMQ started with the url: {}", url);
49+
})
50+
);
51+
52+
rabbitmq_container_test!(
53+
test_rabbitmq_connection,
54+
(|url: String| async move {
55+
build_connection(&*url).await;
56+
})
57+
);
6458
}

src/flow_queue/delegate.rs

Lines changed: 0 additions & 55 deletions
This file was deleted.

src/flow_queue/handler.rs

Lines changed: 0 additions & 119 deletions
This file was deleted.

src/flow_queue/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,2 @@
11
pub mod connection;
2-
pub mod name;
3-
pub mod handler;
4-
pub mod delegate;
2+
pub mod service;

src/flow_queue/name.rs

Lines changed: 0 additions & 62 deletions
This file was deleted.

0 commit comments

Comments
 (0)