Skip to content

Commit 3540a60

Browse files
committed
Implements #95 and fixes #162
1 parent c050df0 commit 3540a60

File tree

3 files changed

+62
-7
lines changed

3 files changed

+62
-7
lines changed

src/client/mod.rs

+50-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use rabbitmq_stream_protocol::{
2525
delete::Delete,
2626
delete_publisher::DeletePublisherCommand,
2727
generic::GenericResponse,
28+
heart_beat::HeartBeatCommand,
2829
metadata::MetadataCommand,
2930
open::{OpenCommand, OpenResponse},
3031
peer_properties::{PeerPropertiesCommand, PeerPropertiesResponse},
@@ -54,10 +55,11 @@ use self::{
5455
use std::{
5556
collections::HashMap,
5657
sync::{atomic::AtomicU64, Arc},
58+
time::{Duration, Instant},
5759
};
5860
use std::{future::Future, sync::atomic::Ordering};
59-
use tokio::sync::RwLock;
6061
use tokio::{net::TcpStream, sync::Notify};
62+
use tokio::{sync::RwLock, task::JoinHandle};
6163
use tokio_util::codec::Framed;
6264

6365
type SinkConnection = SplitSink<Framed<TcpStream, RabbitMqStreamCodec>, Request>;
@@ -69,6 +71,8 @@ pub struct ClientState {
6971
handler: Option<Arc<dyn MessageHandler>>,
7072
heartbeat: u32,
7173
max_frame_size: u32,
74+
last_heatbeat: Instant,
75+
heartbeat_task: Option<JoinHandle<()>>,
7276
}
7377

7478
#[async_trait::async_trait]
@@ -77,6 +81,7 @@ impl MessageHandler for Client {
7781
match &item {
7882
Some(Ok(response)) => match response.kind_ref() {
7983
ResponseKind::Tunes(tune) => self.handle_tune_command(tune).await,
84+
ResponseKind::Heartbeat(_) => self.handle_heart_beat_command().await,
8085
_ => {
8186
if let Some(handler) = self.state.read().await.handler.as_ref() {
8287
let handler = handler.clone();
@@ -132,6 +137,8 @@ impl Client {
132137
handler: None,
133138
heartbeat: broker.heartbeat,
134139
max_frame_size: broker.max_frame_size,
140+
last_heatbeat: Instant::now(),
141+
heartbeat_task: None,
135142
};
136143
let mut client = Client {
137144
dispatcher,
@@ -172,6 +179,14 @@ impl Client {
172179
CloseRequest::new(correlation_id, ResponseCode::Ok, "Ok".to_owned())
173180
})
174181
.await?;
182+
183+
let mut state = self.state.write().await;
184+
185+
if let Some(heartbeat_task) = state.heartbeat_task.take() {
186+
heartbeat_task.abort();
187+
}
188+
189+
drop(state);
175190
self.channel.close().await
176191
}
177192
pub async fn subscribe(
@@ -378,10 +393,10 @@ impl Client {
378393
Ok(())
379394
}
380395

381-
fn max_value(&self, client: u32, server: u32) -> u32 {
396+
fn negotiate_value(&self, client: u32, server: u32) -> u32 {
382397
match (client, server) {
383398
(client, server) if client == 0 || server == 0 => client.max(server),
384-
(client, server) => client.max(server),
399+
(client, server) => client.min(server),
385400
}
386401
}
387402

@@ -470,11 +485,35 @@ impl Client {
470485

471486
async fn handle_tune_command(&self, tunes: &TunesCommand) {
472487
let mut state = self.state.write().await;
473-
state.heartbeat = self.max_value(self.opts.heartbeat, tunes.heartbeat);
474-
state.max_frame_size = self.max_value(self.opts.max_frame_size, tunes.max_frame_size);
488+
state.heartbeat = self.negotiate_value(self.opts.heartbeat, tunes.heartbeat);
489+
state.max_frame_size = self.negotiate_value(self.opts.max_frame_size, tunes.max_frame_size);
475490

476491
let heart_beat = state.heartbeat;
477492
let max_frame_size = state.max_frame_size;
493+
494+
trace!(
495+
"Handling tune with frame size {} and heartbeat {}",
496+
max_frame_size,
497+
heart_beat
498+
);
499+
500+
if let Some(task) = state.heartbeat_task.take() {
501+
task.abort();
502+
}
503+
504+
if heart_beat != 0 {
505+
let heartbeat_interval = (heart_beat / 2).max(1);
506+
let channel = self.channel.clone();
507+
let heartbeat_task = tokio::spawn(async move {
508+
loop {
509+
trace!("Sending heartbeat");
510+
let _ = channel.send(HeartBeatCommand::default().into()).await;
511+
tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await;
512+
}
513+
});
514+
state.heartbeat_task = Some(heartbeat_task);
515+
}
516+
478517
drop(state);
479518

480519
let _ = self
@@ -484,4 +523,10 @@ impl Client {
484523

485524
self.tune_notifier.notify_one();
486525
}
526+
527+
async fn handle_heart_beat_command(&self) {
528+
trace!("Received heartbeat");
529+
let mut state = self.state.write().await;
530+
state.last_heatbeat = Instant::now();
531+
}
487532
}

src/environment.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ impl Environment {
6060

6161
/// Delete a stream
6262
pub async fn delete_stream(&self, stream: &str) -> Result<(), StreamDeleteError> {
63-
let response = self.create_client().await?.delete_stream(stream).await?;
63+
let client = self.create_client().await?;
64+
let response = client.delete_stream(stream).await?;
65+
client.close().await?;
6466

6567
if response.is_ok() {
6668
Ok(())
@@ -105,6 +107,10 @@ impl EnvironmentBuilder {
105107
self
106108
}
107109

110+
pub fn heartbeat(mut self, heartbeat: u32) -> EnvironmentBuilder {
111+
self.0.client_options.heartbeat = heartbeat;
112+
self
113+
}
108114
pub fn metrics_collector(
109115
mut self,
110116
collector: impl MetricsCollector + Send + Sync + 'static,

src/producer.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ impl<T> ProducerBuilder<T> {
125125
metadata.leader,
126126
stream
127127
);
128+
client.close().await?;
128129
client = Client::connect(ClientOptions {
129130
host: metadata.leader.host.clone(),
130131
port: metadata.leader.port as u16,
@@ -553,7 +554,10 @@ impl MessageHandler for ProducerConfirmHandler {
553554
trace!(?error);
554555
// TODO clean all waiting for confirm
555556
}
556-
None => todo!(),
557+
None => {
558+
trace!("Connection closed");
559+
// TODO connection close clean all waiting
560+
}
557561
}
558562
Ok(())
559563
}

0 commit comments

Comments
 (0)