Skip to content

Commit 301ba13

Browse files
committed
Try without buffer
1 parent 3044e06 commit 301ba13

File tree

1 file changed

+43
-33
lines changed

1 file changed

+43
-33
lines changed

src/lib.rs

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ extern crate log;
33

44
use futures::future::join_all;
55
use serde::Deserialize;
6-
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
6+
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
77
use tokio::net::{TcpListener, TcpStream};
88
use tokio::sync::{mpsc, oneshot};
99

@@ -28,25 +28,11 @@ type ChannelTx = mpsc::Sender<Message>;
2828
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
2929
type Result<T> = std::result::Result<T, Error>;
3030

31-
type TcpReader = BufReader<tokio::net::tcp::OwnedReadHalf>;
32-
type TcpWriter = BufWriter<tokio::net::tcp::OwnedWriteHalf>;
33-
3431
fn frame_size(frame: &[u8]) -> Result<usize> {
3532
Ok(u16::from_be_bytes(frame[4..6].try_into()?) as usize)
3633
}
3734

38-
fn split_connection(stream: TcpStream) -> (TcpReader, TcpWriter) {
39-
let (reader, writer) = stream.into_split();
40-
(BufReader::new(reader), BufWriter::new(writer))
41-
}
42-
43-
async fn create_connection(url: &str) -> Result<(TcpReader, TcpWriter)> {
44-
let stream = TcpStream::connect(url).await?;
45-
stream.set_nodelay(true)?;
46-
Ok(split_connection(stream))
47-
}
48-
49-
async fn read_frame(stream: &mut TcpReader) -> Result<Frame> {
35+
async fn read_frame(stream: &mut (impl AsyncRead + Unpin)) -> Result<Frame> {
5036
let mut buf = vec![0u8; 6];
5137
// Read header
5238
stream.read_exact(&mut buf).await?;
@@ -69,7 +55,7 @@ struct Modbus {
6955

7056
struct Device {
7157
url: String,
72-
stream: Option<(TcpReader, TcpWriter)>,
58+
stream: Option<TcpStream>,
7359
}
7460

7561
impl Device {
@@ -81,16 +67,17 @@ impl Device {
8167
}
8268

8369
async fn connect(&mut self) -> Result<()> {
84-
match create_connection(&self.url).await {
85-
Ok(connection) => {
70+
match TcpStream::connect(&self.url).await {
71+
Ok(stream) => {
72+
stream.set_nodelay(true)?;
8673
info!("modbus connection to {} sucessfull", self.url);
87-
self.stream = Some(connection);
74+
self.stream = Some(stream);
8875
Ok(())
8976
}
9077
Err(error) => {
9178
self.stream = None;
9279
info!("modbus connection to {} error: {} ", self.url, error);
93-
Err(error)
80+
Err(Box::new(error))
9481
}
9582
}
9683
}
@@ -104,10 +91,10 @@ impl Device {
10491
}
10592

10693
async fn raw_write_read(&mut self, frame: &Frame) -> Result<Frame> {
107-
let (reader, writer) = self.stream.as_mut().ok_or("no modbus connection")?;
108-
writer.write_all(&frame).await?;
109-
writer.flush().await?;
110-
read_frame(reader).await
94+
let stream = self.stream.as_mut().ok_or("no modbus connection")?;
95+
stream.write_all(&frame).await?;
96+
stream.flush().await?;
97+
read_frame(stream).await
11198
}
11299

113100
async fn write_read(&mut self, frame: &Frame) -> Result<Frame> {
@@ -189,25 +176,27 @@ impl Bridge {
189176
&self.listen.bind, &self.modbus.url
190177
);
191178
loop {
192-
let (client, _) = listener.accept().await.unwrap();
179+
let (mut client, _) = listener.accept().await.unwrap();
180+
client.set_nodelay(true).unwrap();
193181
let tx = tx.clone();
194182
tokio::spawn(async move {
195-
if let Err(err) = Self::handle_client(client, tx).await {
183+
if let Err(err) = Self::handle_client(&mut client, tx).await {
196184
error!("Client error: {:?}", err);
197185
}
198186
});
199187
}
200188
}
201189

202-
async fn handle_client(client: TcpStream, channel: ChannelTx) -> Result<()> {
203-
client.set_nodelay(true)?;
190+
async fn handle_client(
191+
client: &mut (impl AsyncRead + AsyncWrite + Unpin),
192+
channel: ChannelTx,
193+
) -> Result<()> {
204194
channel.send(Message::Connection).await?;
205-
let (mut reader, mut writer) = split_connection(client);
206-
while let Ok(buf) = read_frame(&mut reader).await {
195+
while let Ok(buf) = read_frame(client).await {
207196
let (tx, rx) = oneshot::channel();
208197
channel.send(Message::Packet(buf, tx)).await?;
209-
writer.write_all(&rx.await?).await?;
210-
writer.flush().await?;
198+
client.write_all(&rx.await?).await?;
199+
client.flush().await?;
211200
}
212201
channel.send(Message::Disconnection).await?;
213202
Ok(())
@@ -238,3 +227,24 @@ impl Server {
238227
Ok(Self::new(config_file)?.run().await)
239228
}
240229
}
230+
231+
#[test]
232+
fn test_device_not_connected() {
233+
let device = Device::new("some url");
234+
assert_eq!(device.url, "some url");
235+
assert!(device.stream.is_none());
236+
assert_eq!(device.is_connected(), false);
237+
}
238+
239+
#[test]
240+
fn test_device_not_connected_disconnects() {
241+
let mut device = Device::new("some url");
242+
assert_eq!(device.is_connected(), false);
243+
device.disconnect();
244+
assert_eq!(device.is_connected(), false);
245+
}
246+
247+
#[tokio::test]
248+
async fn test_device_connection() {
249+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
250+
}

0 commit comments

Comments
 (0)