Skip to content

Commit 11028ae

Browse files
authored
Implements #95 and fixes #162 (#163)
1 parent 634adb6 commit 11028ae

File tree

3 files changed

+63
-8
lines changed

3 files changed

+63
-8
lines changed

src/client/mod.rs

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use rabbitmq_stream_protocol::{
2525
delete::Delete,
2626
delete_publisher::DeletePublisherCommand,
2727
generic::GenericResponse,
28+
heart_beat::HeartBeatCommand,
2829
metadata::MetadataCommand,
2930
open::{OpenCommand, OpenResponse},
3031
peer_properties::{PeerPropertiesCommand, PeerPropertiesResponse},
@@ -41,6 +42,7 @@ use rabbitmq_stream_protocol::{
4142
types::PublishedMessage,
4243
FromResponse, Request, Response, ResponseCode, ResponseKind,
4344
};
45+
use tokio_native_tls::TlsStream;
4446
use tracing::trace;
4547

4648
pub use self::handler::{MessageHandler, MessageResult};
@@ -58,14 +60,14 @@ use std::{
5860
pin::Pin,
5961
sync::{atomic::AtomicU64, Arc},
6062
task::{Context, Poll},
63+
time::{Duration, Instant},
6164
};
6265
use std::{future::Future, sync::atomic::Ordering};
6366
use tokio::io::AsyncRead;
6467
use tokio::io::AsyncWrite;
6568
use tokio::io::ReadBuf;
66-
use tokio::sync::RwLock;
6769
use tokio::{net::TcpStream, sync::Notify};
68-
use tokio_native_tls::TlsStream;
70+
use tokio::{sync::RwLock, task::JoinHandle};
6971
use tokio_util::codec::Framed;
7072

7173
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stream")))]
@@ -125,6 +127,8 @@ pub struct ClientState {
125127
handler: Option<Arc<dyn MessageHandler>>,
126128
heartbeat: u32,
127129
max_frame_size: u32,
130+
last_heatbeat: Instant,
131+
heartbeat_task: Option<JoinHandle<()>>,
128132
}
129133

130134
#[async_trait::async_trait]
@@ -133,6 +137,7 @@ impl MessageHandler for Client {
133137
match &item {
134138
Some(Ok(response)) => match response.kind_ref() {
135139
ResponseKind::Tunes(tune) => self.handle_tune_command(tune).await,
140+
ResponseKind::Heartbeat(_) => self.handle_heart_beat_command().await,
136141
_ => {
137142
if let Some(handler) = self.state.read().await.handler.as_ref() {
138143
let handler = handler.clone();
@@ -188,6 +193,8 @@ impl Client {
188193
handler: None,
189194
heartbeat: broker.heartbeat,
190195
max_frame_size: broker.max_frame_size,
196+
last_heatbeat: Instant::now(),
197+
heartbeat_task: None,
191198
};
192199
let mut client = Client {
193200
dispatcher,
@@ -228,6 +235,14 @@ impl Client {
228235
CloseRequest::new(correlation_id, ResponseCode::Ok, "Ok".to_owned())
229236
})
230237
.await?;
238+
239+
let mut state = self.state.write().await;
240+
241+
if let Some(heartbeat_task) = state.heartbeat_task.take() {
242+
heartbeat_task.abort();
243+
}
244+
245+
drop(state);
231246
self.channel.close().await
232247
}
233248
pub async fn subscribe(
@@ -451,10 +466,10 @@ impl Client {
451466
Ok(())
452467
}
453468

454-
fn max_value(&self, client: u32, server: u32) -> u32 {
469+
fn negotiate_value(&self, client: u32, server: u32) -> u32 {
455470
match (client, server) {
456471
(client, server) if client == 0 || server == 0 => client.max(server),
457-
(client, server) => client.max(server),
472+
(client, server) => client.min(server),
458473
}
459474
}
460475

@@ -543,11 +558,35 @@ impl Client {
543558

544559
async fn handle_tune_command(&self, tunes: &TunesCommand) {
545560
let mut state = self.state.write().await;
546-
state.heartbeat = self.max_value(self.opts.heartbeat, tunes.heartbeat);
547-
state.max_frame_size = self.max_value(self.opts.max_frame_size, tunes.max_frame_size);
561+
state.heartbeat = self.negotiate_value(self.opts.heartbeat, tunes.heartbeat);
562+
state.max_frame_size = self.negotiate_value(self.opts.max_frame_size, tunes.max_frame_size);
548563

549564
let heart_beat = state.heartbeat;
550565
let max_frame_size = state.max_frame_size;
566+
567+
trace!(
568+
"Handling tune with frame size {} and heartbeat {}",
569+
max_frame_size,
570+
heart_beat
571+
);
572+
573+
if let Some(task) = state.heartbeat_task.take() {
574+
task.abort();
575+
}
576+
577+
if heart_beat != 0 {
578+
let heartbeat_interval = (heart_beat / 2).max(1);
579+
let channel = self.channel.clone();
580+
let heartbeat_task = tokio::spawn(async move {
581+
loop {
582+
trace!("Sending heartbeat");
583+
let _ = channel.send(HeartBeatCommand::default().into()).await;
584+
tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await;
585+
}
586+
});
587+
state.heartbeat_task = Some(heartbeat_task);
588+
}
589+
551590
drop(state);
552591

553592
let _ = self
@@ -557,4 +596,10 @@ impl Client {
557596

558597
self.tune_notifier.notify_one();
559598
}
599+
600+
async fn handle_heart_beat_command(&self) {
601+
trace!("Received heartbeat");
602+
let mut state = self.state.write().await;
603+
state.last_heatbeat = Instant::now();
604+
}
560605
}

src/environment.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ impl Environment {
6060

6161
/// Delete a stream
6262
pub async fn delete_stream(&self, stream: &str) -> Result<(), StreamDeleteError> {
63-
let response = self.create_client().await?.delete_stream(stream).await?;
63+
let client = self.create_client().await?;
64+
let response = client.delete_stream(stream).await?;
65+
client.close().await?;
6466

6567
if response.is_ok() {
6668
Ok(())
@@ -122,6 +124,10 @@ impl EnvironmentBuilder {
122124
self
123125
}
124126

127+
pub fn heartbeat(mut self, heartbeat: u32) -> EnvironmentBuilder {
128+
self.0.client_options.heartbeat = heartbeat;
129+
self
130+
}
125131
pub fn metrics_collector(
126132
mut self,
127133
collector: impl MetricsCollector + Send + Sync + 'static,

src/producer.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ impl<T> ProducerBuilder<T> {
125125
metadata.leader,
126126
stream
127127
);
128+
client.close().await?;
128129
client = Client::connect(ClientOptions {
129130
host: metadata.leader.host.clone(),
130131
port: metadata.leader.port as u16,
@@ -553,7 +554,10 @@ impl MessageHandler for ProducerConfirmHandler {
553554
trace!(?error);
554555
// TODO clean all waiting for confirm
555556
}
556-
None => todo!(),
557+
None => {
558+
trace!("Connection closed");
559+
// TODO connection close clean all waiting
560+
}
557561
}
558562
Ok(())
559563
}

0 commit comments

Comments
 (0)