Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

This is a Rust workspace containing two main crates for asynchronous SSH2 operations:
- `async-ssh2-lite`: Core async SSH2 client library with support for both async-io and tokio runtimes
- `bb8-async-ssh2-lite`: Connection pooling for async-ssh2-lite using bb8

## Development Commands

### Linting and Formatting
```bash
cargo clippy --all-features --tests -- -D clippy::all
cargo +nightly clippy --all-features --tests -- -D clippy::all
cargo fmt -- --check
```

### Building and Testing
```bash
cargo build-all-features
cargo test-all-features -- --nocapture
```

### Integration Tests
Run the automated integration test script:
```bash
./async-ssh2-lite/tests/run_integration_tests.sh
```

For manual integration testing with a specific SSH server:
```bash
SSH_SERVER_HOST=127.0.0.1 SSH_SERVER_PORT=22 SSH_USERNAME=xx SSH_PASSWORD=xxx SSH_PRIVATEKEY_PATH=~/.ssh/id_rsa cargo test -p async-ssh2-lite --features _integration_tests,async-io,tokio -- --nocapture
```

Run a specific integration test:
```bash
SSH_SERVER_HOST=127.0.0.1 SSH_SERVER_PORT=22 SSH_USERNAME=xx SSH_PRIVATEKEY_PATH=~/.ssh/id_rsa cargo test -p async-ssh2-lite --features _integration_tests,async-io,tokio -- integration_tests::session__scp_send_and_scp_recv --nocapture
```

## Architecture

### async-ssh2-lite
The core library provides async wrappers around ssh2 functionality:
- **Session management** (`session.rs`): Handles SSH connections and authentication
- **Channel operations** (`channel.rs`): Execute commands and manage data streams
- **SFTP support** (`sftp.rs`): File transfer operations
- **Agent support** (`agent.rs`): SSH agent authentication
- **Port forwarding** (`listener.rs`): Remote and local port forwarding

The library supports two async runtimes through feature flags:
- `async-io`: For async-std/smol runtime compatibility
- `tokio`: For tokio runtime compatibility

Session streams are implemented in `session_stream/` with runtime-specific implementations.

### bb8-async-ssh2-lite
Provides connection pooling for SSH sessions using the bb8 pool manager. Includes managers for both sessions and SFTP connections.

### Test Infrastructure
- Integration tests are in `async-ssh2-lite/tests/integration_tests/`
- Test SSH keys are provided in `keys/` directory
- Docker-based OpenSSH server configurations in `openssh_server_docker/` for testing

## Key Features and Capabilities
- Password, public key, and agent-based authentication
- Command execution via SSH channels
- SCP file upload/download
- SFTP operations
- SSH agent integration
- Remote port forwarding
- Proxy jump host support

## Important Notes
- The workspace uses Rust 2021 edition
- Integration tests require special feature flags (`_integration_tests`)
- The library wraps the synchronous ssh2 crate with async runtime support
- Both async-io and tokio runtime support can be enabled simultaneously
30 changes: 13 additions & 17 deletions async-ssh2-lite/src/session_stream/impl_async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use async_trait::async_trait;
use futures_util::{future, pin_mut, ready};
use ssh2::{BlockDirections, Error as Ssh2Error, Session};

use super::{AsyncSessionStream, BlockDirectionsExt as _};
use super::AsyncSessionStream;
use crate::{error::Error, util::ssh2_error_is_would_block};

//
Expand All @@ -22,7 +22,7 @@ where
&self,
mut op: impl FnMut() -> Result<R, Ssh2Error> + Send,
sess: &Session,
expected_block_directions: BlockDirections,
_expected_block_directions: BlockDirections,
sleep_dur: Option<Duration>,
) -> Result<R, Error> {
loop {
Expand All @@ -35,22 +35,20 @@ where
}
}

// Wait for whatever I/O the session needs, not what we expect.
// The session knows the aggregate state of all channels.
match sess.block_directions() {
BlockDirections::None => continue,
BlockDirections::Inbound => {
assert!(expected_block_directions.is_readable());

// Session needs to read data (could be for any channel)
self.readable().await?
}
BlockDirections::Outbound => {
assert!(expected_block_directions.is_writable());

// Session needs to write data (could be for any channel)
self.writable().await?
}
BlockDirections::Both => {
assert!(expected_block_directions.is_readable());
assert!(expected_block_directions.is_writable());

// Session needs both read and write
let (ret, _) = future::select(self.readable(), self.writable())
.await
.factor_first();
Expand All @@ -69,30 +67,28 @@ where
cx: &mut Context,
mut op: impl FnMut() -> Result<R, IoError> + Send,
sess: &Session,
expected_block_directions: BlockDirections,
_expected_block_directions: BlockDirections,
sleep_dur: Option<Duration>,
) -> Poll<Result<R, IoError>> {
match op() {
Err(err) if err.kind() == IoErrorKind::WouldBlock => {}
ret => return Poll::Ready(ret),
}

// Wait for whatever I/O the session needs, not what we expect.
// The session knows the aggregate state of all channels.
match sess.block_directions() {
BlockDirections::None => return Poll::Pending,
BlockDirections::Inbound => {
assert!(expected_block_directions.is_readable());

// Session needs to read data (could be for any channel)
ready!(self.poll_readable(cx))?;
}
BlockDirections::Outbound => {
assert!(expected_block_directions.is_writable());

// Session needs to write data (could be for any channel)
ready!(self.poll_writable(cx))?;
}
BlockDirections::Both => {
assert!(expected_block_directions.is_readable());
assert!(expected_block_directions.is_writable());

// Session needs both read and write
// Must first poll_writable, because session__scp_send_and_scp_recv.rs
ready!(self.poll_writable(cx))?;
ready!(self.poll_readable(cx))?;
Expand Down
58 changes: 25 additions & 33 deletions async-ssh2-lite/src/session_stream/impl_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::net::TcpStream;
#[cfg(unix)]
use tokio::net::UnixStream;

use super::{AsyncSessionStream, BlockDirectionsExt as _};
use super::AsyncSessionStream;
use crate::{error::Error, util::ssh2_error_is_would_block};

//
Expand All @@ -21,7 +21,7 @@ impl AsyncSessionStream for TcpStream {
&self,
mut op: impl FnMut() -> Result<R, Ssh2Error> + Send,
sess: &Session,
expected_block_directions: BlockDirections,
_expected_block_directions: BlockDirections,
sleep_dur: Option<Duration>,
) -> Result<R, Error> {
loop {
Expand All @@ -34,22 +34,20 @@ impl AsyncSessionStream for TcpStream {
}
}

// Wait for whatever I/O the session needs, not what we expect.
// The session knows the aggregate state of all channels.
match sess.block_directions() {
BlockDirections::None => continue,
BlockDirections::Inbound => {
assert!(expected_block_directions.is_readable());

// Session needs to read data (could be for any channel)
self.readable().await?
}
BlockDirections::Outbound => {
assert!(expected_block_directions.is_writable());

// Session needs to write data (could be for any channel)
self.writable().await?
}
BlockDirections::Both => {
assert!(expected_block_directions.is_readable());
assert!(expected_block_directions.is_writable());

// Session needs both read and write
self.ready(tokio::io::Interest::READABLE | tokio::io::Interest::WRITABLE)
.await?;
}
Expand All @@ -66,30 +64,28 @@ impl AsyncSessionStream for TcpStream {
cx: &mut Context,
mut op: impl FnMut() -> Result<R, IoError> + Send,
sess: &Session,
expected_block_directions: BlockDirections,
_expected_block_directions: BlockDirections,
sleep_dur: Option<Duration>,
) -> Poll<Result<R, IoError>> {
match op() {
Err(err) if err.kind() == IoErrorKind::WouldBlock => {}
ret => return Poll::Ready(ret),
}

// Wait for whatever I/O the session needs, not what we expect.
// The session knows the aggregate state of all channels.
match sess.block_directions() {
BlockDirections::None => return Poll::Pending,
BlockDirections::Inbound => {
assert!(expected_block_directions.is_readable());

// Session needs to read data (could be for any channel)
ready!(self.poll_read_ready(cx))?;
}
BlockDirections::Outbound => {
assert!(expected_block_directions.is_writable());

// Session needs to write data (could be for any channel)
ready!(self.poll_write_ready(cx))?;
}
BlockDirections::Both => {
assert!(expected_block_directions.is_readable());
assert!(expected_block_directions.is_writable());

// Session needs both read and write
ready!(self.poll_write_ready(cx))?;
ready!(self.poll_read_ready(cx))?;
}
Expand Down Expand Up @@ -117,7 +113,7 @@ impl AsyncSessionStream for UnixStream {
&self,
mut op: impl FnMut() -> Result<R, Ssh2Error> + Send,
sess: &Session,
expected_block_directions: BlockDirections,
_expected_block_directions: BlockDirections,
sleep_dur: Option<Duration>,
) -> Result<R, Error> {
loop {
Expand All @@ -130,22 +126,20 @@ impl AsyncSessionStream for UnixStream {
}
}

// Wait for whatever I/O the session needs, not what we expect.
// The session knows the aggregate state of all channels.
match sess.block_directions() {
BlockDirections::None => continue,
BlockDirections::Inbound => {
assert!(expected_block_directions.is_readable());

// Session needs to read data (could be for any channel)
self.readable().await?
}
BlockDirections::Outbound => {
assert!(expected_block_directions.is_writable());

// Session needs to write data (could be for any channel)
self.writable().await?
}
BlockDirections::Both => {
assert!(expected_block_directions.is_readable());
assert!(expected_block_directions.is_writable());

// Session needs both read and write
self.ready(tokio::io::Interest::READABLE | tokio::io::Interest::WRITABLE)
.await?;
}
Expand All @@ -162,30 +156,28 @@ impl AsyncSessionStream for UnixStream {
cx: &mut Context,
mut op: impl FnMut() -> Result<R, IoError> + Send,
sess: &Session,
expected_block_directions: BlockDirections,
_expected_block_directions: BlockDirections,
sleep_dur: Option<Duration>,
) -> Poll<Result<R, IoError>> {
match op() {
Err(err) if err.kind() == IoErrorKind::WouldBlock => {}
ret => return Poll::Ready(ret),
}

// Wait for whatever I/O the session needs, not what we expect.
// The session knows the aggregate state of all channels.
match sess.block_directions() {
BlockDirections::None => return Poll::Pending,
BlockDirections::Inbound => {
assert!(expected_block_directions.is_readable());

// Session needs to read data (could be for any channel)
ready!(self.poll_read_ready(cx))?;
}
BlockDirections::Outbound => {
assert!(expected_block_directions.is_writable());

// Session needs to write data (could be for any channel)
ready!(self.poll_write_ready(cx))?;
}
BlockDirections::Both => {
assert!(expected_block_directions.is_readable());
assert!(expected_block_directions.is_writable());

// Session needs both read and write
ready!(self.poll_write_ready(cx))?;
ready!(self.poll_read_ready(cx))?;
}
Expand Down
Loading