diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..184cdbb --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,79 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This is a Rust workspace containing two main crates for asynchronous SSH2 operations: +- `async-ssh2-lite`: Core async SSH2 client library with support for both async-io and tokio runtimes +- `bb8-async-ssh2-lite`: Connection pooling for async-ssh2-lite using bb8 + +## Development Commands + +### Linting and Formatting +```bash +cargo clippy --all-features --tests -- -D clippy::all +cargo +nightly clippy --all-features --tests -- -D clippy::all +cargo fmt -- --check +``` + +### Building and Testing +```bash +cargo build-all-features +cargo test-all-features -- --nocapture +``` + +### Integration Tests +Run the automated integration test script: +```bash +./async-ssh2-lite/tests/run_integration_tests.sh +``` + +For manual integration testing with a specific SSH server: +```bash +SSH_SERVER_HOST=127.0.0.1 SSH_SERVER_PORT=22 SSH_USERNAME=xx SSH_PASSWORD=xxx SSH_PRIVATEKEY_PATH=~/.ssh/id_rsa cargo test -p async-ssh2-lite --features _integration_tests,async-io,tokio -- --nocapture +``` + +Run a specific integration test: +```bash +SSH_SERVER_HOST=127.0.0.1 SSH_SERVER_PORT=22 SSH_USERNAME=xx SSH_PRIVATEKEY_PATH=~/.ssh/id_rsa cargo test -p async-ssh2-lite --features _integration_tests,async-io,tokio -- integration_tests::session__scp_send_and_scp_recv --nocapture +``` + +## Architecture + +### async-ssh2-lite +The core library provides async wrappers around ssh2 functionality: +- **Session management** (`session.rs`): Handles SSH connections and authentication +- **Channel operations** (`channel.rs`): Execute commands and manage data streams +- **SFTP support** (`sftp.rs`): File transfer operations +- **Agent support** (`agent.rs`): SSH agent authentication +- **Port forwarding** (`listener.rs`): Remote and local port forwarding + +The library supports two async runtimes through feature flags: +- `async-io`: For async-std/smol runtime compatibility +- `tokio`: For tokio runtime compatibility + +Session streams are implemented in `session_stream/` with runtime-specific implementations. + +### bb8-async-ssh2-lite +Provides connection pooling for SSH sessions using the bb8 pool manager. Includes managers for both sessions and SFTP connections. + +### Test Infrastructure +- Integration tests are in `async-ssh2-lite/tests/integration_tests/` +- Test SSH keys are provided in `keys/` directory +- Docker-based OpenSSH server configurations in `openssh_server_docker/` for testing + +## Key Features and Capabilities +- Password, public key, and agent-based authentication +- Command execution via SSH channels +- SCP file upload/download +- SFTP operations +- SSH agent integration +- Remote port forwarding +- Proxy jump host support + +## Important Notes +- The workspace uses Rust 2021 edition +- Integration tests require special feature flags (`_integration_tests`) +- The library wraps the synchronous ssh2 crate with async runtime support +- Both async-io and tokio runtime support can be enabled simultaneously \ No newline at end of file diff --git a/async-ssh2-lite/src/session_stream/impl_async_io.rs b/async-ssh2-lite/src/session_stream/impl_async_io.rs index 1e1b8bf..d3bd3fd 100644 --- a/async-ssh2-lite/src/session_stream/impl_async_io.rs +++ b/async-ssh2-lite/src/session_stream/impl_async_io.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use futures_util::{future, pin_mut, ready}; use ssh2::{BlockDirections, Error as Ssh2Error, Session}; -use super::{AsyncSessionStream, BlockDirectionsExt as _}; +use super::AsyncSessionStream; use crate::{error::Error, util::ssh2_error_is_would_block}; // @@ -22,7 +22,7 @@ where &self, mut op: impl FnMut() -> Result + Send, sess: &Session, - expected_block_directions: BlockDirections, + _expected_block_directions: BlockDirections, sleep_dur: Option, ) -> Result { loop { @@ -35,22 +35,20 @@ where } } + // Wait for whatever I/O the session needs, not what we expect. + // The session knows the aggregate state of all channels. match sess.block_directions() { BlockDirections::None => continue, BlockDirections::Inbound => { - assert!(expected_block_directions.is_readable()); - + // Session needs to read data (could be for any channel) self.readable().await? } BlockDirections::Outbound => { - assert!(expected_block_directions.is_writable()); - + // Session needs to write data (could be for any channel) self.writable().await? } BlockDirections::Both => { - assert!(expected_block_directions.is_readable()); - assert!(expected_block_directions.is_writable()); - + // Session needs both read and write let (ret, _) = future::select(self.readable(), self.writable()) .await .factor_first(); @@ -69,7 +67,7 @@ where cx: &mut Context, mut op: impl FnMut() -> Result + Send, sess: &Session, - expected_block_directions: BlockDirections, + _expected_block_directions: BlockDirections, sleep_dur: Option, ) -> Poll> { match op() { @@ -77,22 +75,20 @@ where ret => return Poll::Ready(ret), } + // Wait for whatever I/O the session needs, not what we expect. + // The session knows the aggregate state of all channels. match sess.block_directions() { BlockDirections::None => return Poll::Pending, BlockDirections::Inbound => { - assert!(expected_block_directions.is_readable()); - + // Session needs to read data (could be for any channel) ready!(self.poll_readable(cx))?; } BlockDirections::Outbound => { - assert!(expected_block_directions.is_writable()); - + // Session needs to write data (could be for any channel) ready!(self.poll_writable(cx))?; } BlockDirections::Both => { - assert!(expected_block_directions.is_readable()); - assert!(expected_block_directions.is_writable()); - + // Session needs both read and write // Must first poll_writable, because session__scp_send_and_scp_recv.rs ready!(self.poll_writable(cx))?; ready!(self.poll_readable(cx))?; diff --git a/async-ssh2-lite/src/session_stream/impl_tokio.rs b/async-ssh2-lite/src/session_stream/impl_tokio.rs index 129c5d6..08c0ea5 100644 --- a/async-ssh2-lite/src/session_stream/impl_tokio.rs +++ b/async-ssh2-lite/src/session_stream/impl_tokio.rs @@ -11,7 +11,7 @@ use tokio::net::TcpStream; #[cfg(unix)] use tokio::net::UnixStream; -use super::{AsyncSessionStream, BlockDirectionsExt as _}; +use super::AsyncSessionStream; use crate::{error::Error, util::ssh2_error_is_would_block}; // @@ -21,7 +21,7 @@ impl AsyncSessionStream for TcpStream { &self, mut op: impl FnMut() -> Result + Send, sess: &Session, - expected_block_directions: BlockDirections, + _expected_block_directions: BlockDirections, sleep_dur: Option, ) -> Result { loop { @@ -34,22 +34,20 @@ impl AsyncSessionStream for TcpStream { } } + // Wait for whatever I/O the session needs, not what we expect. + // The session knows the aggregate state of all channels. match sess.block_directions() { BlockDirections::None => continue, BlockDirections::Inbound => { - assert!(expected_block_directions.is_readable()); - + // Session needs to read data (could be for any channel) self.readable().await? } BlockDirections::Outbound => { - assert!(expected_block_directions.is_writable()); - + // Session needs to write data (could be for any channel) self.writable().await? } BlockDirections::Both => { - assert!(expected_block_directions.is_readable()); - assert!(expected_block_directions.is_writable()); - + // Session needs both read and write self.ready(tokio::io::Interest::READABLE | tokio::io::Interest::WRITABLE) .await?; } @@ -66,7 +64,7 @@ impl AsyncSessionStream for TcpStream { cx: &mut Context, mut op: impl FnMut() -> Result + Send, sess: &Session, - expected_block_directions: BlockDirections, + _expected_block_directions: BlockDirections, sleep_dur: Option, ) -> Poll> { match op() { @@ -74,22 +72,20 @@ impl AsyncSessionStream for TcpStream { ret => return Poll::Ready(ret), } + // Wait for whatever I/O the session needs, not what we expect. + // The session knows the aggregate state of all channels. match sess.block_directions() { BlockDirections::None => return Poll::Pending, BlockDirections::Inbound => { - assert!(expected_block_directions.is_readable()); - + // Session needs to read data (could be for any channel) ready!(self.poll_read_ready(cx))?; } BlockDirections::Outbound => { - assert!(expected_block_directions.is_writable()); - + // Session needs to write data (could be for any channel) ready!(self.poll_write_ready(cx))?; } BlockDirections::Both => { - assert!(expected_block_directions.is_readable()); - assert!(expected_block_directions.is_writable()); - + // Session needs both read and write ready!(self.poll_write_ready(cx))?; ready!(self.poll_read_ready(cx))?; } @@ -117,7 +113,7 @@ impl AsyncSessionStream for UnixStream { &self, mut op: impl FnMut() -> Result + Send, sess: &Session, - expected_block_directions: BlockDirections, + _expected_block_directions: BlockDirections, sleep_dur: Option, ) -> Result { loop { @@ -130,22 +126,20 @@ impl AsyncSessionStream for UnixStream { } } + // Wait for whatever I/O the session needs, not what we expect. + // The session knows the aggregate state of all channels. match sess.block_directions() { BlockDirections::None => continue, BlockDirections::Inbound => { - assert!(expected_block_directions.is_readable()); - + // Session needs to read data (could be for any channel) self.readable().await? } BlockDirections::Outbound => { - assert!(expected_block_directions.is_writable()); - + // Session needs to write data (could be for any channel) self.writable().await? } BlockDirections::Both => { - assert!(expected_block_directions.is_readable()); - assert!(expected_block_directions.is_writable()); - + // Session needs both read and write self.ready(tokio::io::Interest::READABLE | tokio::io::Interest::WRITABLE) .await?; } @@ -162,7 +156,7 @@ impl AsyncSessionStream for UnixStream { cx: &mut Context, mut op: impl FnMut() -> Result + Send, sess: &Session, - expected_block_directions: BlockDirections, + _expected_block_directions: BlockDirections, sleep_dur: Option, ) -> Poll> { match op() { @@ -170,22 +164,20 @@ impl AsyncSessionStream for UnixStream { ret => return Poll::Ready(ret), } + // Wait for whatever I/O the session needs, not what we expect. + // The session knows the aggregate state of all channels. match sess.block_directions() { BlockDirections::None => return Poll::Pending, BlockDirections::Inbound => { - assert!(expected_block_directions.is_readable()); - + // Session needs to read data (could be for any channel) ready!(self.poll_read_ready(cx))?; } BlockDirections::Outbound => { - assert!(expected_block_directions.is_writable()); - + // Session needs to write data (could be for any channel) ready!(self.poll_write_ready(cx))?; } BlockDirections::Both => { - assert!(expected_block_directions.is_readable()); - assert!(expected_block_directions.is_writable()); - + // Session needs both read and write ready!(self.poll_write_ready(cx))?; ready!(self.poll_read_ready(cx))?; } diff --git a/async-ssh2-lite/tests/test_audit_multichannel.rs b/async-ssh2-lite/tests/test_audit_multichannel.rs new file mode 100644 index 0000000..f2f7ce6 --- /dev/null +++ b/async-ssh2-lite/tests/test_audit_multichannel.rs @@ -0,0 +1,221 @@ +#![cfg(all(feature = "_integration_tests", feature = "tokio"))] + +use std::sync::Arc; +use async_ssh2_lite::{AsyncSession, TokioTcpStream}; +use tokio::io::AsyncReadExt; +use std::time::Duration; + +async fn get_test_session() -> Result, Box> { + let host = std::env::var("SSH_SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); + let port: u16 = std::env::var("SSH_SERVER_PORT") + .unwrap_or_else(|_| "22".to_string()) + .parse()?; + + let addr: std::net::SocketAddr = format!("{}:{}", host, port).parse()?; + let mut session = AsyncSession::::connect(addr, None).await?; + session.handshake().await?; + + if let Ok(password) = std::env::var("SSH_PASSWORD") { + let username = std::env::var("SSH_USERNAME").unwrap_or_else(|_| "test".to_string()); + session.userauth_password(&username, &password).await?; + } else if let Ok(key_path) = std::env::var("SSH_PRIVATEKEY_PATH") { + let username = std::env::var("SSH_USERNAME").unwrap_or_else(|_| "test".to_string()); + session.userauth_pubkey_file(&username, None, std::path::Path::new(&key_path), None).await?; + } else { + return Err("No authentication method available".into()); + } + + Ok(session) +} + +#[tokio::test] +async fn test_single_channel_baseline() -> Result<(), Box> { + // Baseline test - single channel should always work + let session = get_test_session().await?; + + let mut channel = session.channel_session().await?; + channel.exec("echo baseline").await?; + + let mut output = String::new(); + channel.read_to_string(&mut output).await?; + channel.close().await?; + + println!("Baseline output: {}", output.trim()); + assert!(output.contains("baseline")); + + Ok(()) +} + +#[tokio::test(flavor = "current_thread")] +async fn test_two_channels_sequential() -> Result<(), Box> { + // Two channels used one after another - should work + let session = Arc::new(get_test_session().await?); + + // First channel + { + let mut channel1 = session.channel_session().await?; + channel1.exec("echo first").await?; + + let mut output1 = String::new(); + channel1.read_to_string(&mut output1).await?; + channel1.close().await?; + + println!("First channel: {}", output1.trim()); + assert!(output1.contains("first")); + } + + // Second channel + { + let mut channel2 = session.channel_session().await?; + channel2.exec("echo second").await?; + + let mut output2 = String::new(); + channel2.read_to_string(&mut output2).await?; + channel2.close().await?; + + println!("Second channel: {}", output2.trim()); + assert!(output2.contains("second")); + } + + Ok(()) +} + +#[tokio::test(flavor = "current_thread")] +async fn test_two_channels_interleaved() -> Result<(), Box> { + // Two channels with interleaved operations - this tests our fix + let session = Arc::new(get_test_session().await?); + + let mut channel1 = session.channel_session().await?; + let mut channel2 = session.channel_session().await?; + + // Start both commands + channel1.exec("echo first && sleep 0.1 && echo done1").await?; + channel2.exec("echo second && sleep 0.1 && echo done2").await?; + + // Read from both channels + let mut output1 = String::new(); + let mut output2 = String::new(); + + // This should work with our fix - the async runtime will serialize properly + let result1 = channel1.read_to_string(&mut output1); + let result2 = channel2.read_to_string(&mut output2); + + // Wait for both to complete + let (r1, r2) = tokio::try_join!(result1, result2)?; + + channel1.close().await?; + channel2.close().await?; + + println!("Channel 1 output: {}", output1.trim()); + println!("Channel 2 output: {}", output2.trim()); + + assert!(output1.contains("first") && output1.contains("done1")); + assert!(output2.contains("second") && output2.contains("done2")); + + Ok(()) +} + +#[tokio::test(flavor = "current_thread")] +async fn test_stress_many_channels() -> Result<(), Box> { + // Stress test with many channels + let session = Arc::new(get_test_session().await?); + + let mut channels = Vec::new(); + + // Create multiple channels + for i in 0..5 { + let mut channel = session.channel_session().await?; + channel.exec(&format!("echo channel{}", i)).await?; + channels.push(channel); + } + + // Read from all channels + let mut outputs = Vec::new(); + for (i, channel) in channels.iter_mut().enumerate() { + let mut output = String::new(); + channel.read_to_string(&mut output).await?; + println!("Channel {} output: {}", i, output.trim()); + assert!(output.contains(&format!("channel{}", i))); + outputs.push(output); + } + + // Close all channels + for channel in channels.iter_mut() { + channel.close().await?; + } + + Ok(()) +} + +#[tokio::test(flavor = "current_thread")] +async fn test_data_integrity() -> Result<(), Box> { + // Test that data doesn't get corrupted between channels + let session = Arc::new(get_test_session().await?); + + let mut channel1 = session.channel_session().await?; + let mut channel2 = session.channel_session().await?; + + // Send different data patterns to each channel + let data1 = "AAAAAAAAAAAAAAAAAAAA"; + let data2 = "BBBBBBBBBBBBBBBBBBBB"; + + channel1.exec(&format!("echo {}", data1)).await?; + channel2.exec(&format!("echo {}", data2)).await?; + + let mut output1 = String::new(); + let mut output2 = String::new(); + + // Read both outputs + let (r1, r2) = tokio::try_join!( + channel1.read_to_string(&mut output1), + channel2.read_to_string(&mut output2) + )?; + + channel1.close().await?; + channel2.close().await?; + + println!("Channel 1: {}", output1.trim()); + println!("Channel 2: {}", output2.trim()); + + // Verify data integrity - no cross-contamination + assert!(output1.contains(data1)); + assert!(output2.contains(data2)); + assert!(!output1.contains(data2), "Data contamination detected!"); + assert!(!output2.contains(data1), "Data contamination detected!"); + + Ok(()) +} + +#[tokio::test(flavor = "current_thread")] +async fn test_error_handling() -> Result<(), Box> { + // Test error conditions with multiple channels + let session = Arc::new(get_test_session().await?); + + let mut channel1 = session.channel_session().await?; + let mut channel2 = session.channel_session().await?; + + // One good command, one bad command + channel1.exec("echo good").await?; + channel2.exec("this_command_does_not_exist").await?; + + let mut output1 = String::new(); + let mut output2 = String::new(); + + let result1 = channel1.read_to_string(&mut output1).await; + let result2 = channel2.read_to_string(&mut output2).await; + + // Both should complete (even if one has error output) + assert!(result1.is_ok()); + assert!(result2.is_ok()); + + println!("Good command output: {}", output1.trim()); + println!("Bad command output: {}", output2.trim()); + + assert!(output1.contains("good")); + // Bad command might produce stderr or empty output, both are okay + + channel1.close().await?; + channel2.close().await?; + + Ok(()) +} \ No newline at end of file