|
1 | | -use lapin::{Channel, Connection, ConnectionProperties}; |
2 | | -use std::sync::Arc; |
3 | | -use tokio::sync::Mutex; |
| 1 | +use lapin::Connection; |
4 | 2 |
|
5 | | -pub type FlowQueue = Arc<Mutex<Box<Connection>>>; |
6 | | - |
7 | | -pub type FlowChannel = Arc<Mutex<Box<Channel>>>; |
8 | | - |
9 | | -async fn build_connection(rabbitmq_url: &str) -> Connection { |
10 | | - match Connection::connect(rabbitmq_url, ConnectionProperties::default()).await { |
| 3 | +pub async fn build_connection(rabbitmq_url: &str) -> Connection { |
| 4 | + match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await { |
11 | 5 | Ok(env) => env, |
12 | | - Err(error) => panic!("Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}", error), |
13 | | - } |
14 | | -} |
15 | | - |
16 | | -pub async fn create_flow_channel_connection(uri: &str) -> FlowChannel { |
17 | | - let connection = build_connection(uri).await; |
18 | | - |
19 | | - match connection.create_channel().await { |
20 | | - Ok(channel) => Arc::new(Mutex::new(Box::new(channel))), |
21 | | - Err(error) => panic!("Cannot create channel {:?}", error), |
| 6 | + Err(error) => panic!( |
| 7 | + "Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}", |
| 8 | + error |
| 9 | + ), |
22 | 10 | } |
23 | 11 | } |
24 | 12 |
|
25 | 13 | #[cfg(test)] |
26 | 14 | mod tests { |
| 15 | + use crate::flow_queue::connection::build_connection; |
27 | 16 | use testcontainers::core::{IntoContainerPort, WaitFor}; |
28 | 17 | use testcontainers::runners::AsyncRunner; |
29 | 18 | use testcontainers::GenericImage; |
30 | | - use crate::flow_queue::connection::build_connection; |
31 | 19 |
|
32 | 20 | macro_rules! rabbitmq_container_test { |
33 | 21 | ($test_name:ident, $consumer:expr) => { |
34 | | - |
35 | 22 | #[tokio::test] |
36 | 23 | async fn $test_name() { |
37 | 24 | let port: u16 = 5672; |
38 | 25 | let image_name = "rabbitmq"; |
39 | 26 | let wait_message = "Server startup complete"; |
40 | | - |
| 27 | + |
41 | 28 | let container = GenericImage::new(image_name, "latest") |
42 | 29 | .with_exposed_port(port.tcp()) |
43 | 30 | .with_wait_for(WaitFor::message_on_stdout(wait_message)) |
44 | 31 | .start() |
45 | 32 | .await |
46 | 33 | .unwrap(); |
47 | | - |
| 34 | + |
48 | 35 | let host_port = container.get_host_port_ipv4(port).await.unwrap(); |
49 | 36 | let url = format!("amqp://guest:guest@localhost:{}", host_port); |
50 | | - |
| 37 | + |
51 | 38 | $consumer(url).await; |
52 | 39 | } |
53 | 40 | }; |
54 | 41 | } |
55 | 42 |
|
56 | | - rabbitmq_container_test!(test_rabbitmq_startup, (|url: String| async move { |
57 | | - println!("RabbitMQ started with the url: {}", url); |
58 | | - })); |
59 | | - |
60 | | - rabbitmq_container_test!(test_rabbitmq_connection, (|url: String| async move { |
61 | | - build_connection(&*url).await; |
62 | | - })); |
63 | | - |
| 43 | + rabbitmq_container_test!( |
| 44 | + test_rabbitmq_startup, |
| 45 | + (|url: String| async move { |
| 46 | + println!("RabbitMQ started with the url: {}", url); |
| 47 | + }) |
| 48 | + ); |
| 49 | + |
| 50 | + rabbitmq_container_test!( |
| 51 | + test_rabbitmq_connection, |
| 52 | + (|url: String| async move { |
| 53 | + build_connection(&*url).await; |
| 54 | + }) |
| 55 | + ); |
64 | 56 | } |
0 commit comments