Skip to content

Commit f3108be

Browse files
committed
Revert "Try without buffer"
This reverts commit 301ba13.
1 parent 301ba13 commit f3108be

File tree

1 file changed

+33
-43
lines changed

1 file changed

+33
-43
lines changed

src/lib.rs

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ extern crate log;
33

44
use futures::future::join_all;
55
use serde::Deserialize;
6-
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
6+
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
77
use tokio::net::{TcpListener, TcpStream};
88
use tokio::sync::{mpsc, oneshot};
99

@@ -28,11 +28,25 @@ type ChannelTx = mpsc::Sender<Message>;
2828
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
2929
type Result<T> = std::result::Result<T, Error>;
3030

31+
type TcpReader = BufReader<tokio::net::tcp::OwnedReadHalf>;
32+
type TcpWriter = BufWriter<tokio::net::tcp::OwnedWriteHalf>;
33+
3134
fn frame_size(frame: &[u8]) -> Result<usize> {
3235
Ok(u16::from_be_bytes(frame[4..6].try_into()?) as usize)
3336
}
3437

35-
async fn read_frame(stream: &mut (impl AsyncRead + Unpin)) -> Result<Frame> {
38+
fn split_connection(stream: TcpStream) -> (TcpReader, TcpWriter) {
39+
let (reader, writer) = stream.into_split();
40+
(BufReader::new(reader), BufWriter::new(writer))
41+
}
42+
43+
async fn create_connection(url: &str) -> Result<(TcpReader, TcpWriter)> {
44+
let stream = TcpStream::connect(url).await?;
45+
stream.set_nodelay(true)?;
46+
Ok(split_connection(stream))
47+
}
48+
49+
async fn read_frame(stream: &mut TcpReader) -> Result<Frame> {
3650
let mut buf = vec![0u8; 6];
3751
// Read header
3852
stream.read_exact(&mut buf).await?;
@@ -55,7 +69,7 @@ struct Modbus {
5569

5670
struct Device {
5771
url: String,
58-
stream: Option<TcpStream>,
72+
stream: Option<(TcpReader, TcpWriter)>,
5973
}
6074

6175
impl Device {
@@ -67,17 +81,16 @@ impl Device {
6781
}
6882

6983
async fn connect(&mut self) -> Result<()> {
70-
match TcpStream::connect(&self.url).await {
71-
Ok(stream) => {
72-
stream.set_nodelay(true)?;
84+
match create_connection(&self.url).await {
85+
Ok(connection) => {
7386
info!("modbus connection to {} sucessfull", self.url);
74-
self.stream = Some(stream);
87+
self.stream = Some(connection);
7588
Ok(())
7689
}
7790
Err(error) => {
7891
self.stream = None;
7992
info!("modbus connection to {} error: {} ", self.url, error);
80-
Err(Box::new(error))
93+
Err(error)
8194
}
8295
}
8396
}
@@ -91,10 +104,10 @@ impl Device {
91104
}
92105

93106
async fn raw_write_read(&mut self, frame: &Frame) -> Result<Frame> {
94-
let stream = self.stream.as_mut().ok_or("no modbus connection")?;
95-
stream.write_all(&frame).await?;
96-
stream.flush().await?;
97-
read_frame(stream).await
107+
let (reader, writer) = self.stream.as_mut().ok_or("no modbus connection")?;
108+
writer.write_all(&frame).await?;
109+
writer.flush().await?;
110+
read_frame(reader).await
98111
}
99112

100113
async fn write_read(&mut self, frame: &Frame) -> Result<Frame> {
@@ -176,27 +189,25 @@ impl Bridge {
176189
&self.listen.bind, &self.modbus.url
177190
);
178191
loop {
179-
let (mut client, _) = listener.accept().await.unwrap();
180-
client.set_nodelay(true).unwrap();
192+
let (client, _) = listener.accept().await.unwrap();
181193
let tx = tx.clone();
182194
tokio::spawn(async move {
183-
if let Err(err) = Self::handle_client(&mut client, tx).await {
195+
if let Err(err) = Self::handle_client(client, tx).await {
184196
error!("Client error: {:?}", err);
185197
}
186198
});
187199
}
188200
}
189201

190-
async fn handle_client(
191-
client: &mut (impl AsyncRead + AsyncWrite + Unpin),
192-
channel: ChannelTx,
193-
) -> Result<()> {
202+
async fn handle_client(client: TcpStream, channel: ChannelTx) -> Result<()> {
203+
client.set_nodelay(true)?;
194204
channel.send(Message::Connection).await?;
195-
while let Ok(buf) = read_frame(client).await {
205+
let (mut reader, mut writer) = split_connection(client);
206+
while let Ok(buf) = read_frame(&mut reader).await {
196207
let (tx, rx) = oneshot::channel();
197208
channel.send(Message::Packet(buf, tx)).await?;
198-
client.write_all(&rx.await?).await?;
199-
client.flush().await?;
209+
writer.write_all(&rx.await?).await?;
210+
writer.flush().await?;
200211
}
201212
channel.send(Message::Disconnection).await?;
202213
Ok(())
@@ -227,24 +238,3 @@ impl Server {
227238
Ok(Self::new(config_file)?.run().await)
228239
}
229240
}
230-
231-
#[test]
232-
fn test_device_not_connected() {
233-
let device = Device::new("some url");
234-
assert_eq!(device.url, "some url");
235-
assert!(device.stream.is_none());
236-
assert_eq!(device.is_connected(), false);
237-
}
238-
239-
#[test]
240-
fn test_device_not_connected_disconnects() {
241-
let mut device = Device::new("some url");
242-
assert_eq!(device.is_connected(), false);
243-
device.disconnect();
244-
assert_eq!(device.is_connected(), false);
245-
}
246-
247-
#[tokio::test]
248-
async fn test_device_connection() {
249-
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
250-
}

0 commit comments

Comments
 (0)