From 63e27967f81a85afcbd050e363325753ab0939d5 Mon Sep 17 00:00:00 2001 From: vkill Date: Mon, 21 Nov 2022 12:00:30 +0800 Subject: [PATCH] Try fix block --- .../src/session_stream/impl_async_io.rs | 33 +++++++- .../src/session_stream/impl_tokio.rs | 83 +++++++++++++++++-- 2 files changed, 108 insertions(+), 8 deletions(-) diff --git a/async-ssh2-lite/src/session_stream/impl_async_io.rs b/async-ssh2-lite/src/session_stream/impl_async_io.rs index 1e1b8bf..6a90d16 100644 --- a/async-ssh2-lite/src/session_stream/impl_async_io.rs +++ b/async-ssh2-lite/src/session_stream/impl_async_io.rs @@ -6,7 +6,10 @@ use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use async_io::{Async, Timer}; use async_trait::async_trait; -use futures_util::{future, pin_mut, ready}; +use futures_util::{ + future::{self, Either}, + pin_mut, ready, +}; use ssh2::{BlockDirections, Error as Ssh2Error, Session}; use super::{AsyncSessionStream, BlockDirectionsExt as _}; @@ -51,10 +54,34 @@ where assert!(expected_block_directions.is_readable()); assert!(expected_block_directions.is_writable()); - let (ret, _) = future::select(self.readable(), self.writable()) + let (ret, either) = future::select(self.readable(), self.writable()) .await .factor_first(); - ret? + ret?; + match either { + Either::Left(_) => { + let either = future::select( + self.writable(), + Box::pin(sleep(Duration::from_millis(1000))), + ) + .await; + match either { + Either::Left((x, _)) => x?, + Either::Right(_) => {} + } + } + Either::Right(_) => { + let either = future::select( + self.readable(), + Box::pin(sleep(Duration::from_millis(1000))), + ) + .await; + match either { + Either::Left((x, _)) => x?, + Either::Right(_) => {} + } + } + } } } diff --git a/async-ssh2-lite/src/session_stream/impl_tokio.rs b/async-ssh2-lite/src/session_stream/impl_tokio.rs index 129c5d6..d2ff85c 100644 --- a/async-ssh2-lite/src/session_stream/impl_tokio.rs +++ b/async-ssh2-lite/src/session_stream/impl_tokio.rs @@ -5,7 +5,10 @@ use core::{ use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use async_trait::async_trait; -use futures_util::ready; +use futures_util::{ + future::{self, Either}, + ready, +}; use ssh2::{BlockDirections, Error as Ssh2Error, Session}; use tokio::net::TcpStream; #[cfg(unix)] @@ -50,8 +53,43 @@ impl AsyncSessionStream for TcpStream { assert!(expected_block_directions.is_readable()); assert!(expected_block_directions.is_writable()); - self.ready(tokio::io::Interest::READABLE | tokio::io::Interest::WRITABLE) - .await?; + let mut n_retry = 0; + loop { + let ready = self + .ready(tokio::io::Interest::READABLE | tokio::io::Interest::WRITABLE) + .await?; + if ready.is_readable() { + let either = future::select( + Box::pin(self.writable()), + Box::pin(sleep(Duration::from_millis(1000))), + ) + .await; + match either { + Either::Left((x, _)) => x?, + Either::Right(_) => {} + } + break; + } else if ready.is_writable() { + let either = future::select( + Box::pin(self.readable()), + Box::pin(sleep(Duration::from_millis(1000))), + ) + .await; + match either { + Either::Left((x, _)) => x?, + Either::Right(_) => {} + } + break; + } else if ready.is_empty() { + n_retry += 1; + if n_retry > 3 { + break; + } + continue; + } else { + break; + } + } } } @@ -146,8 +184,43 @@ impl AsyncSessionStream for UnixStream { assert!(expected_block_directions.is_readable()); assert!(expected_block_directions.is_writable()); - self.ready(tokio::io::Interest::READABLE | tokio::io::Interest::WRITABLE) - .await?; + let mut n_retry = 0; + loop { + let ready = self + .ready(tokio::io::Interest::READABLE | tokio::io::Interest::WRITABLE) + .await?; + if ready.is_readable() { + let either = future::select( + Box::pin(self.writable()), + Box::pin(sleep(Duration::from_millis(1000))), + ) + .await; + match either { + Either::Left((x, _)) => x?, + Either::Right(_) => {} + } + break; + } else if ready.is_writable() { + let either = future::select( + Box::pin(self.readable()), + Box::pin(sleep(Duration::from_millis(1000))), + ) + .await; + match either { + Either::Left((x, _)) => x?, + Either::Right(_) => {} + } + break; + } else if ready.is_empty() { + n_retry += 1; + if n_retry > 3 { + break; + } + continue; + } else { + break; + } + } } }