Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions crates/navigator-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,7 @@ async fn main() -> Result<()> {
mod tests {
use super::*;
use std::ffi::OsString;
use std::fs;

#[test]
fn cli_debug_assert() {
Expand Down Expand Up @@ -1462,4 +1463,50 @@ mod tests {
);
}
}

/// The `--up` flag of `sandbox sync` must advertise `ValueHint::AnyPath`
/// so generated shell completions offer filesystem paths for its value.
#[test]
fn sandbox_sync_up_uses_path_value_hint() {
    let root = Cli::command();

    // Walk the command tree: root -> sandbox -> sync, then locate --up.
    let sandbox_cmd = root
        .get_subcommands()
        .find(|sub| sub.get_name() == "sandbox")
        .expect("missing sandbox subcommand");
    let sync_cmd = sandbox_cmd
        .get_subcommands()
        .find(|sub| sub.get_name() == "sync")
        .expect("missing sandbox sync subcommand");
    let up_arg = sync_cmd
        .get_arguments()
        .find(|candidate| candidate.get_id() == "up")
        .expect("missing --up argument");

    assert_eq!(up_arg.get_value_hint(), ValueHint::AnyPath);
}

/// Completing a partial value for `--up` should surface files from the
/// working directory, proving the path hint actually reaches the
/// completion engine end to end.
#[test]
fn sandbox_sync_up_completion_suggests_local_paths() {
    let workdir = tempfile::tempdir().expect("failed to create tempdir");
    fs::write(workdir.path().join("sample.txt"), "x").expect("failed to create sample file");

    let mut cmd = Cli::command();
    // Simulated command line with the cursor on the 5th word ("sa").
    let argv: Vec<OsString> = ["nemoclaw", "sandbox", "sync", "demo", "--up", "sa"]
        .iter()
        .map(OsString::from)
        .collect();
    let candidates = clap_complete::engine::complete(&mut cmd, argv, 5, Some(workdir.path()))
        .expect("completion engine failed");

    let names: Vec<String> = candidates
        .iter()
        .map(|candidate| candidate.get_value().to_string_lossy().into_owned())
        .collect();
    assert!(
        names.iter().any(|name| name.contains("sample.txt")),
        "expected path completion for --up, got: {names:?}"
    );
}
}
139 changes: 127 additions & 12 deletions crates/navigator-sandbox/src/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,19 @@ impl russh::server::Handler for SshHandler {
}
Ok(())
}

/// The client signalled EOF on this channel: no further stdin bytes
/// will arrive.
async fn channel_eof(
    &mut self,
    _channel: ChannelId,
    _session: &mut Session,
) -> Result<(), Self::Error> {
    // Taking — and thereby dropping — the sender disconnects the stdin
    // writer thread's receiver; its loop then exits and closes the
    // child's stdin pipe. Commands such as `cat | tar xf -` depend on
    // that EOF to know the input stream is complete.
    drop(self.input_sender.take());
    Ok(())
}
}

impl SshHandler {
Expand Down Expand Up @@ -670,17 +683,14 @@ fn spawn_pty_shell(
Ok(n) => {
let data = CryptoVec::from_slice(&buf[..n]);
let handle_clone = handle_clone.clone();
drop(runtime_reader.spawn(async move {
let _ = handle_clone.data(channel, data).await;
}));
let _ = runtime_reader
.block_on(async move { handle_clone.data(channel, data).await });
}
}
}
// Send EOF to indicate no more data will be sent on this channel.
let eof_handle = handle_clone.clone();
drop(runtime_reader.spawn(async move {
let _ = eof_handle.eof(channel).await;
}));
let _ = runtime_reader.block_on(async move { eof_handle.eof(channel).await });
// Notify the exit thread that all output has been forwarded.
let _ = reader_done_tx.send(());
});
Expand Down Expand Up @@ -828,9 +838,7 @@ fn spawn_pipe_exec(
Ok(n) => {
let data = CryptoVec::from_slice(&buf[..n]);
let h = stdout_handle.clone();
drop(stdout_runtime.spawn(async move {
let _ = h.data(channel, data).await;
}));
let _ = stdout_runtime.block_on(async move { h.data(channel, data).await });
}
}
}
Expand All @@ -849,9 +857,8 @@ fn spawn_pipe_exec(
Ok(n) => {
let data = CryptoVec::from_slice(&buf[..n]);
let h = stderr_handle.clone();
drop(stderr_runtime.spawn(async move {
let _ = h.extended_data(channel, 1, data).await;
}));
let _ = stderr_runtime
.block_on(async move { h.extended_data(channel, 1, data).await });
}
}
}
Expand Down Expand Up @@ -968,3 +975,111 @@ mod unsafe_pty {
/// Saturating conversion from `u32` to `u16`: values above `u16::MAX`
/// clamp to `u16::MAX` instead of wrapping or panicking.
fn to_u16(value: u32) -> u16 {
    // `try_from` fails exactly when the value exceeds `u16::MAX`, so a
    // single `unwrap_or` saturates; the original pre-clamp via `min`
    // made the `unwrap_or` unreachable and obscured the intent.
    u16::try_from(value).unwrap_or(u16::MAX)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Dropping the input sender — exactly the operation `channel_eof`
    /// performs — must make the stdin writer loop exit and close the
    /// child's stdin pipe. Without that, commands like `cat | tar xf -`
    /// used by `sync --up` hang forever waiting for EOF on stdin.
    #[test]
    fn dropping_input_sender_closes_child_stdin() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>();

        let mut child = Command::new("cat")
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .spawn()
            .expect("failed to spawn cat");

        let mut pipe = child.stdin.take().expect("stdin must be piped");

        // Mirror of the stdin writer loop in spawn_pipe_exec.
        std::thread::spawn(move || {
            while let Ok(chunk) = rx.recv() {
                if pipe.write_all(&chunk).is_err() {
                    break;
                }
                let _ = pipe.flush();
            }
        });

        tx.send(b"hello".to_vec()).unwrap();

        // Simulate channel_eof: drop the sender.
        drop(tx);

        // cat must observe EOF on stdin and exit. Bound the wait so a
        // regression fails fast instead of wedging the suite.
        let (done_tx, done_rx) = mpsc::channel();
        std::thread::spawn(move || {
            let _ = done_tx.send(child.wait_with_output());
        });
        let output = done_rx
            .recv_timeout(Duration::from_secs(5))
            .expect("cat hung for 5s — stdin was not closed (channel_eof bug)")
            .expect("failed to wait for cat");

        assert!(
            output.status.success(),
            "cat exited with {:?}",
            output.status
        );
        assert_eq!(output.stdout, b"hello");
    }

    /// Every chunk queued before the sender is dropped must still be
    /// delivered: channel_eof means "no more data after this", never
    /// "discard what is buffered".
    #[test]
    fn stdin_writer_delivers_buffered_data_before_eof() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>();

        let mut child = Command::new("wc")
            .arg("-c")
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .spawn()
            .expect("failed to spawn wc");

        let mut pipe = child.stdin.take().expect("stdin must be piped");

        // Same writer loop shape as the test above / spawn_pipe_exec.
        std::thread::spawn(move || {
            while let Ok(chunk) = rx.recv() {
                if pipe.write_all(&chunk).is_err() {
                    break;
                }
                let _ = pipe.flush();
            }
        });

        // Queue 100 × 1 KiB chunks, then drop the sender.
        for _ in 0..100 {
            tx.send(vec![0u8; 1024]).unwrap();
        }
        drop(tx);

        let (done_tx, done_rx) = mpsc::channel();
        std::thread::spawn(move || {
            let _ = done_tx.send(child.wait_with_output());
        });
        let output = done_rx
            .recv_timeout(Duration::from_secs(5))
            .expect("wc hung for 5s — stdin was not closed")
            .expect("failed to wait for wc");

        let byte_count: usize = String::from_utf8_lossy(&output.stdout)
            .trim()
            .parse()
            .expect("wc output was not a number");
        assert_eq!(
            byte_count,
            100 * 1024,
            "expected all 100 KiB delivered before EOF"
        );
    }
}
53 changes: 52 additions & 1 deletion e2e/bash/test_sandbox_sync.sh
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,58 @@ fi
info "subdir/nested.txt verified"

###############################################################################
# Step 4 — Sync up a single file and round-trip it.
# Step 4 — Large-file round-trip to exercise multi-chunk SSH transport.
#
# The tar archive is streamed through the SSH channel in 4096-byte chunks.
# Historically, a fire-and-forget tokio::spawn per chunk caused out-of-order
# delivery that corrupted the tar stream. A ~512 KiB file spans many chunks
# and makes such ordering bugs much more likely to surface.
###############################################################################

info "Generating large test file (~512 KiB)"

LARGE_DIR="${TMPDIR_ROOT}/large_upload"
mkdir -p "${LARGE_DIR}"

# Random bytes (NOT reproducible between runs — /dev/urandom); the SHA-256
# captured immediately after generation is what makes the round-trip
# verifiable, so the content itself need not be deterministic.
dd if=/dev/urandom bs=1024 count=512 2>/dev/null > "${LARGE_DIR}/large.bin"
EXPECTED_HASH=$(shasum -a 256 "${LARGE_DIR}/large.bin" | awk '{print $1}')

info "Syncing large file up to sandbox"
if ! "${NAV}" sandbox sync "${SANDBOX_NAME}" --up "${LARGE_DIR}" /sandbox/large_test \
> /dev/null 2>&1; then
error "sync --up large file failed"
exit 1
fi

info "Syncing large file back down"
LARGE_DOWN="${TMPDIR_ROOT}/large_download"
mkdir -p "${LARGE_DOWN}"

if ! "${NAV}" sandbox sync "${SANDBOX_NAME}" --down /sandbox/large_test "${LARGE_DOWN}" \
> /dev/null 2>&1; then
error "sync --down large file failed"
exit 1
fi

# Content check: the downloaded copy must hash identically to the upload.
ACTUAL_HASH=$(shasum -a 256 "${LARGE_DOWN}/large.bin" | awk '{print $1}')
if [[ "${EXPECTED_HASH}" != "${ACTUAL_HASH}" ]]; then
error "large.bin checksum mismatch after round-trip"
error " expected: ${EXPECTED_HASH}"
error " actual: ${ACTUAL_HASH}"
exit 1
fi

# Size check: a truncation would also fail the hash, but this reports a
# clearer diagnostic (expected vs. actual byte count).
ACTUAL_SIZE=$(wc -c < "${LARGE_DOWN}/large.bin" | tr -d ' ')
if [[ "${ACTUAL_SIZE}" -ne 524288 ]]; then
error "large.bin size mismatch: expected 524288, got ${ACTUAL_SIZE}"
exit 1
fi

info "Large file round-trip verified (SHA-256 match, ${ACTUAL_SIZE} bytes)"

###############################################################################
# Step 5 — Sync up a single file and round-trip it.
###############################################################################

info "Testing single-file sync"
Expand Down
Loading