diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a9279bc..666feac 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,6 +36,10 @@ on: options: ["20", "22"] default: "20" +concurrency: + group: ci-${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: unit-tests: name: Unit Tests diff --git a/backend/routers/ftp.py b/backend/routers/ftp.py index 19f576c..4f45372 100644 --- a/backend/routers/ftp.py +++ b/backend/routers/ftp.py @@ -14,9 +14,10 @@ """ import logging import os +import uuid from urllib.parse import unquote -from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile, File from fastapi.responses import Response from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession @@ -49,6 +50,9 @@ log = logging.getLogger(__name__) router = APIRouter(prefix="/ftp", tags=["ftp"]) +# Track active uploads for UI feedback +_upload_status: dict[str, dict] = {} + # -- Session management -------------------------------------------------------- @@ -184,11 +188,34 @@ async def list_dir( _: str = Depends(get_current_user), ): """List directory contents at the given remote path.""" + log.debug( + "FTP list directory request: path=%s, session=%s", + path, + session_id[:8], + ) + try: entries = await list_directory(session_id, path) + log.info( + "FTP list directory successful: path=%s, entries=%d, session=%s", + path, + len(entries), + session_id[:8], + ) except ValueError as exc: + log.error( + "FTP list directory failed (session not found): path=%s, session=%s", + path, + session_id[:8], + ) raise HTTPException(status_code=404, detail=str(exc)) except Exception as exc: # noqa: BLE001 + log.error( + "FTP list directory failed: path=%s, error=%s, session=%s", + path, + exc, + session_id[:8], + ) raise HTTPException(status_code=500, detail=f"Directory listing failed: {exc}") return {"path": path, "entries": entries} @@ -202,11 +229,30 @@ async def download_file( """Download a remote file. ``path`` must be URL-encoded.""" remote_path = unquote(path) filename = os.path.basename(remote_path) + + log.debug( + "FTP download request: filename=%s, path=%s, session=%s", + filename, + remote_path, + session_id[:8], + ) + try: data = await read_file_bytes(session_id, remote_path) except ValueError as exc: + log.error( + "FTP download failed (session not found): path=%s, session=%s", + remote_path, + session_id[:8], + ) raise HTTPException(status_code=404, detail=str(exc)) except Exception as exc: # noqa: BLE001 + log.error( + "FTP download failed: path=%s, error=%s, session=%s", + remote_path, + exc, + session_id[:8], + ) raise HTTPException(status_code=500, detail=f"Download failed: {exc}") return Response( @@ -223,18 +269,22 @@ class UploadResponse(BaseModel): size: int -@router.post("/{session_id}/upload", response_model=UploadResponse) +@router.post("/{session_id}/upload") async def upload_file( session_id: str, path: str, + request: Request, file: UploadFile = File(...), + background_tasks: BackgroundTasks = BackgroundTasks(), _: str = Depends(get_current_user), ): """ - Upload a file to the remote server. + Upload a file to the remote server (returns immediately, processes in background). ``path`` is the target directory; the remote file will be placed at ``{path}/{file.filename}``. + + Returns upload_id for status tracking via GET /ftp/{session_id}/upload/{upload_id} """ target_dir = unquote(path) if target_dir.endswith("/"): @@ -242,21 +292,118 @@ async def upload_file( else: remote_path = target_dir + "/" + (file.filename or "upload") - data = await file.read() + upload_id = str(uuid.uuid4()) + + log.debug( + "FTP upload request: filename=%s, target_path=%s, upload_id=%s, session=%s", + file.filename, + remote_path, + upload_id[:8], + session_id[:8], + ) + + # Try to read content length for progress reporting try: - await write_file_bytes(session_id, remote_path, data) - except ValueError as exc: - raise HTTPException(status_code=404, detail=str(exc)) + content_length = int(request.headers.get("content-length", "0") or 0) + except Exception: + content_length = 0 + + # Read the full request body before returning. This is required to + # avoid losing the upload when the request ends. + try: + file_data = await file.read() except Exception as exc: # noqa: BLE001 - raise HTTPException(status_code=500, detail=f"Upload failed: {exc}") + log.error("Failed to read upload file: %s", exc) + raise HTTPException(status_code=400, detail=f"Failed to read file: {exc}") from exc + + file_size = len(file_data) + file_size_mb = file_size / (1024 * 1024) + + # Initialize status entry + _upload_status[upload_id] = { + "status": "uploading", + "filename": file.filename or "upload", + "size_bytes": file_size, + "transferred_bytes": 0, + } log.info( - "FTP uploaded %s bytes to %s (session %s)", - len(data), - remote_path, + "FTP upload buffered: filename=%s, size=%s bytes (%.2f MB), upload_id=%s, session=%s", + file.filename, + file_size, + file_size_mb, + upload_id[:8], session_id[:8], ) - return UploadResponse(uploaded=remote_path, size=len(data)) + + def progress_callback(bytes_written: int) -> None: + """Update progress status.""" + sts = _upload_status.get(upload_id) + if sts is None: + return + sts["transferred_bytes"] = bytes_written + if sts.get("size_bytes") and sts["size_bytes"] > 0: + try: + sts["percent"] = round(bytes_written / sts["size_bytes"] * 100, 2) + except Exception: + sts["percent"] = None + + async def uploader_task(): + """Upload buffered chunks to FTP (runs in background).""" + try: + await write_file_bytes(session_id, remote_path, file_data) + progress_callback(file_size) + _upload_status[upload_id]["status"] = "completed" + log.info( + "FTP upload completed: filename=%s, upload_id=%s, session=%s", + file.filename, + upload_id[:8], + session_id[:8], + ) + except Exception as exc: + log.error( + "FTP upload failed: filename=%s, error=%s, upload_id=%s, session=%s", + file.filename, + exc, + upload_id[:8], + session_id[:8], + ) + _upload_status[upload_id] = {"status": "failed", "error": str(exc)} + + # Schedule the uploader task as a background task + background_tasks.add_task(uploader_task) + + # Return immediately with upload_id + log.info( + "FTP upload scheduled in background: filename=%s, size=%.2f MB, upload_id=%s, session=%s", + file.filename, + file_size_mb, + upload_id[:8], + session_id[:8], + ) + + return { + "upload_id": upload_id, + "uploaded": remote_path, + "filename": file.filename or "upload", + "size": file_size, + "size_bytes": file_size, + "size_mb": file_size_mb, + "status": "queued", + } + + +@router.get("/{session_id}/upload/{upload_id}") +async def get_upload_status( + session_id: str, + upload_id: str, + _: str = Depends(get_current_user), +): + """Check status of a background upload.""" + status_info = _upload_status.get(upload_id) + if not status_info: + raise HTTPException(status_code=404, detail=f"Upload {upload_id[:8]} not found") + return status_info class DeleteRequest(BaseModel): @@ -273,11 +420,29 @@ async def delete_path( _: str = Depends(get_current_user), ): """Delete a remote file or directory.""" + log.debug( + "FTP delete request: path=%s, is_dir=%s, session=%s", + body.path, + body.is_dir, + session_id[:8], + ) + try: await delete_remote(session_id, body.path, body.is_dir) except ValueError as exc: + log.error( + "FTP delete failed (session not found): path=%s, session=%s", + body.path, + session_id[:8], + ) raise HTTPException(status_code=404, detail=str(exc)) except Exception as exc: # noqa: BLE001 + log.error( + "FTP delete failed: path=%s, error=%s, session=%s", + body.path, + exc, + session_id[:8], + ) raise HTTPException(status_code=500, detail=f"Delete failed: {exc}") @@ -295,11 +460,31 @@ async def rename_path( _: str = Depends(get_current_user), ): """Rename or move a remote path.""" + log.debug( + "FTP rename request: old_path=%s, new_path=%s, session=%s", + body.old_path, + body.new_path, + session_id[:8], + ) + try: await rename_remote(session_id, body.old_path, body.new_path) except ValueError as exc: + log.error( + "FTP rename failed (session not found): old_path=%s, new_path=%s, session=%s", + body.old_path, + body.new_path, + session_id[:8], + ) raise HTTPException(status_code=404, detail=str(exc)) except Exception as exc: # noqa: BLE001 + log.error( + "FTP rename failed: old_path=%s, new_path=%s, error=%s, session=%s", + body.old_path, + body.new_path, + exc, + session_id[:8], + ) raise HTTPException(status_code=500, detail=f"Rename failed: {exc}") @@ -316,9 +501,26 @@ async def make_directory( _: str = Depends(get_current_user), ): """Create a remote directory.""" + log.debug( + "FTP mkdir request: path=%s, session=%s", + body.path, + session_id[:8], + ) + try: await mkdir_remote(session_id, body.path) except ValueError as exc: + log.error( + "FTP mkdir failed (session not found): path=%s, session=%s", + body.path, + session_id[:8], + ) raise HTTPException(status_code=404, detail=str(exc)) except Exception as exc: # noqa: BLE001 + log.error( + "FTP mkdir failed: path=%s, error=%s, session=%s", + body.path, + exc, + session_id[:8], + ) raise HTTPException(status_code=500, detail=f"Mkdir failed: {exc}") diff --git a/backend/services/ftp.py b/backend/services/ftp.py index 2655f33..e4db581 100644 --- a/backend/services/ftp.py +++ b/backend/services/ftp.py @@ -24,10 +24,12 @@ We default to ``latin-1`` because it is a strict superset of ASCII and never raises a decode error (every byte 0x00–0xFF is valid Latin-1). """ +import asyncio import logging import ssl import uuid import hashlib +from typing import Callable, Optional, AsyncIterable from dataclasses import dataclass, field import aioftp @@ -47,6 +49,9 @@ class _FtpSession: source_ip: str | None = None use_tls: bool = False _cwd: str = field(default="/", init=False) + # Per-session lock to serialize control/data channel operations. Lazily + # created to avoid creating an asyncio.Lock at import time. + lock: asyncio.Lock | None = field(default=None, init=False) _ftp_sessions: dict[str, _FtpSession] = {} @@ -233,28 +238,32 @@ async def list_directory(session_id: str, remote_path: str) -> list[dict]: raise ValueError("FTP session not found") result = [] - async for path_obj, info in entry.client.list(remote_path, recursive=False): - name = path_obj.name - if name in (".", ".."): - continue - is_dir = info.get("type") == "dir" - # Build a clean joined path without double-slashes - parent = remote_path.rstrip("/") - full_path = f"{parent}/{name}" if parent else f"/{name}" - size = int(info.get("size", 0) or 0) - modify = info.get("modify", "") - # modify is a 14-char timestamp: YYYYMMDDHHMMSS - modified_ts = _parse_ftp_mtime(modify) - result.append( - { - "name": name, - "path": full_path, - "size": size, - "is_dir": is_dir, - "permissions": info.get("unix.mode", None), - "modified": modified_ts, - } - ) + # Ensure a per-session lock exists and serialize the listing operation. + if entry.lock is None: + entry.lock = asyncio.Lock() + async with entry.lock: + async for path_obj, info in entry.client.list(remote_path, recursive=False): + name = path_obj.name + if name in (".", ".."): + continue + is_dir = info.get("type") == "dir" + # Build a clean joined path without double-slashes + parent = remote_path.rstrip("/") + full_path = f"{parent}/{name}" if parent else f"/{name}" + size = int(info.get("size", 0) or 0) + modify = info.get("modify", "") + # modify is a 14-char timestamp: YYYYMMDDHHMMSS + modified_ts = _parse_ftp_mtime(modify) + result.append( + { + "name": name, + "path": full_path, + "size": size, + "is_dir": is_dir, + "permissions": info.get("unix.mode", None), + "modified": modified_ts, + } + ) result.sort(key=lambda x: (not x["is_dir"], x["name"].lower())) return result @@ -285,11 +294,43 @@ async def read_file_bytes(session_id: str, remote_path: str) -> bytes: entry = _ftp_sessions.get(session_id) if entry is None: raise ValueError("FTP session not found") - chunks: list[bytes] = [] - async with entry.client.download_stream(remote_path) as stream: - async for chunk in stream.iter_by_block(): - chunks.append(chunk) - return b"".join(chunks) + + log.debug( + "FTP download starting: path=%s, session=%s", + remote_path, + session_id[:8], + ) + + try: + chunks: list[bytes] = [] + if entry.lock is None: + entry.lock = asyncio.Lock() + async with entry.lock: + async with entry.client.download_stream(remote_path) as stream: + chunk_count = 0 + async for chunk in stream.iter_by_block(): + chunks.append(chunk) + chunk_count += 1 + + total_size = len(b"".join(chunks)) + file_size_mb = total_size / (1024 * 1024) + log.info( + "FTP download completed: path=%s, size=%s bytes (%.2f MB), chunks=%d, session=%s", + remote_path, + total_size, + file_size_mb, + chunk_count, + session_id[:8], + ) + return b"".join(chunks) + except Exception as exc: + log.error( + "FTP download failed for %s: %s (session %s)", + remote_path, + exc, + session_id[:8], + ) + raise async def write_file_bytes(session_id: str, remote_path: str, data: bytes) -> None: @@ -297,8 +338,146 @@ async def write_file_bytes(session_id: str, remote_path: str, data: bytes) -> No entry = _ftp_sessions.get(session_id) if entry is None: raise ValueError("FTP session not found") - async with entry.client.upload_stream(remote_path) as stream: - await stream.write(data) + + file_size = len(data) + file_size_mb = file_size / (1024 * 1024) + log.debug( + "FTP upload starting: path=%s, size=%s bytes (%.2f MB), session=%s", + remote_path, + file_size, + file_size_mb, + session_id[:8], + ) + + try: + if entry.lock is None: + entry.lock = asyncio.Lock() + async with entry.lock: + async with entry.client.upload_stream(remote_path) as stream: + log.debug( + "FTP upload stream opened for %s (session %s)", + remote_path, + session_id[:8], + ) + + # Write in chunks to avoid timeout and allow progress monitoring + chunk_size = 1024 * 1024 # 1 MB chunks + bytes_written = 0 + chunk_num = 0 + + while bytes_written < file_size: + chunk_num += 1 + end = min(bytes_written + chunk_size, file_size) + chunk = data[bytes_written:end] + + try: + # Each chunk has 10-minute timeout + await asyncio.wait_for(stream.write(chunk), timeout=600) + except asyncio.TimeoutError as exc: + log.error( + "FTP upload timeout at chunk %d (%.2f MB of %.2f MB), session=%s", + chunk_num, + bytes_written / (1024 * 1024), + file_size_mb, + session_id[:8], + ) + raise TimeoutError( + f"FTP upload timed out at chunk {chunk_num} after {bytes_written} bytes" + ) from exc + + bytes_written = end + + progress_mb = bytes_written / (1024 * 1024) + log.debug( + "FTP upload progress: path=%s, chunk=%d, progress=%.2f MB / %.2f MB, session=%s", + remote_path, + chunk_num, + progress_mb, + file_size_mb, + session_id[:8], + ) + + log.debug( + "FTP upload stream write completed for %s (session %s)", + remote_path, + session_id[:8], + ) + except Exception as exc: + log.error( + "FTP upload failed for %s (%.2f MB): %s (session %s)", + remote_path, + file_size_mb, + exc, + session_id[:8], + ) + raise + + +async def upload_stream_from_async_iter( + session_id: str, + remote_path: str, + async_iterable: AsyncIterable[bytes], + progress_callback: Optional[Callable[[int], None]] = None, +) -> None: + """Upload data provided by an async iterable of bytes to `remote_path`. + + The `async_iterable` should yield bytes objects. The function writes each + chunk to the FTP upload stream and calls `progress_callback(bytes_written)` + (if provided) after each successful write. + """ + entry = _ftp_sessions.get(session_id) + if entry is None: + raise ValueError("FTP session not found") + + log.debug( + "FTP streaming upload starting: path=%s, session=%s", + remote_path, + session_id[:8], + ) + + try: + if entry.lock is None: + entry.lock = asyncio.Lock() + async with entry.lock: + async with entry.client.upload_stream(remote_path) as stream: + bytes_written = 0 + chunk_num = 0 + async for chunk in async_iterable: + chunk_num += 1 + # Per-chunk timeout to avoid hangs + try: + await asyncio.wait_for(stream.write(chunk), timeout=600) + except asyncio.TimeoutError as exc: + log.error( + "FTP streaming upload timeout at chunk %d (session=%s)", + chunk_num, + session_id[:8], + ) + raise TimeoutError("FTP upload timed out") from exc + + bytes_written += len(chunk) + if progress_callback: + try: + progress_callback(bytes_written) + except Exception: # don't let callback break upload + log.debug("FTP progress callback raised, ignoring") + + log.debug( + "FTP streaming upload progress: path=%s, chunk=%d, bytes=%d, session=%s", + remote_path, + chunk_num, + bytes_written, + session_id[:8], + ) + + except Exception as exc: + log.error( + "FTP streaming upload failed for %s: %s (session %s)", + remote_path, + exc, + session_id[:8], + ) + raise async def delete_remote(session_id: str, remote_path: str, is_dir: bool) -> None: @@ -306,10 +485,40 @@ async def delete_remote(session_id: str, remote_path: str, is_dir: bool) -> None entry = _ftp_sessions.get(session_id) if entry is None: raise ValueError("FTP session not found") - if is_dir: - await entry.client.remove_directory(remote_path) - else: - await entry.client.remove_file(remote_path) + + try: + if entry.lock is None: + entry.lock = asyncio.Lock() + async with entry.lock: + if is_dir: + log.debug( + "FTP delete directory: path=%s, session=%s", + remote_path, + session_id[:8], + ) + await entry.client.remove_directory(remote_path) + else: + log.debug( + "FTP delete file: path=%s, session=%s", + remote_path, + session_id[:8], + ) + await entry.client.remove_file(remote_path) + + log.info( + "FTP delete successful: %s (%s), session=%s", + remote_path, + "directory" if is_dir else "file", + session_id[:8], + ) + except Exception as exc: + log.error( + "FTP delete failed for %s: %s (session %s)", + remote_path, + exc, + session_id[:8], + ) + raise async def rename_remote(session_id: str, old_path: str, new_path: str) -> None: @@ -317,7 +526,34 @@ async def rename_remote(session_id: str, old_path: str, new_path: str) -> None: entry = _ftp_sessions.get(session_id) if entry is None: raise ValueError("FTP session not found") - await entry.client.rename(old_path, new_path) + + try: + if entry.lock is None: + entry.lock = asyncio.Lock() + async with entry.lock: + log.debug( + "FTP rename: %s -> %s, session=%s", + old_path, + new_path, + session_id[:8], + ) + await entry.client.rename(old_path, new_path) + + log.info( + "FTP rename successful: %s -> %s, session=%s", + old_path, + new_path, + session_id[:8], + ) + except Exception as exc: + log.error( + "FTP rename failed: %s -> %s, error=%s (session %s)", + old_path, + new_path, + exc, + session_id[:8], + ) + raise async def mkdir_remote(session_id: str, remote_path: str) -> None: @@ -325,4 +561,28 @@ async def mkdir_remote(session_id: str, remote_path: str) -> None: entry = _ftp_sessions.get(session_id) if entry is None: raise ValueError("FTP session not found") - await entry.client.make_directory(remote_path) + + try: + if entry.lock is None: + entry.lock = asyncio.Lock() + async with entry.lock: + log.debug( + "FTP mkdir: path=%s, session=%s", + remote_path, + session_id[:8], + ) + await entry.client.make_directory(remote_path) + + log.info( + "FTP mkdir successful: path=%s, session=%s", + remote_path, + session_id[:8], + ) + except Exception as exc: + log.error( + "FTP mkdir failed for %s: %s (session %s)", + remote_path, + exc, + session_id[:8], + ) + raise diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index ef8e59f..8935ddc 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -577,15 +577,73 @@ export async function ftpUpload( const xhr = new XMLHttpRequest(); xhr.open("POST", `${BASE}/ftp/${sessionId}/upload?path=${encoded}`); + let uploadId: string | null = null; + if (onProgress) { xhr.upload.onprogress = (e) => { - if (e.lengthComputable) onProgress(Math.round((e.loaded / e.total) * 100)); + if (e.lengthComputable) { + // Client->server progress (0-50%) + const clientPct = Math.round((e.loaded / e.total) * 50); + onProgress(clientPct); + } }; } - xhr.onload = () => { + xhr.onload = async () => { if (xhr.status >= 200 && xhr.status < 300) { - resolve(); + try { + const response = JSON.parse(xhr.responseText); + uploadId = response.upload_id; + + // Now poll the server->FTP progress + if (uploadId && onProgress) { + let pollCount = 0; + const maxPolls = 1800; // 30 minutes (1 poll per second) + + while (pollCount < maxPolls) { + try { + const statusRes = await fetch(`${BASE}/ftp/${sessionId}/upload/${uploadId}`); + if (statusRes.ok) { + const status = await statusRes.json(); + + if (status.percent !== undefined && status.percent !== null) { + // Server->FTP progress (50-100%) + const serverPct = 50 + Math.round(status.percent * 0.5); + onProgress(Math.min(serverPct, 100)); + } + + if (status.status === "completed") { + onProgress(100); + resolve(); + return; + } else if (status.status === "failed") { + reject(new Error(status.error ?? "Server-side upload failed")); + return; + } + } else if (statusRes.status === 404) { + // Status not found (may have been cleaned up) + resolve(); + return; + } + } catch (pollErr) { + // If polling fails, just resolve anyway (upload likely succeeded) + console.warn("Poll error, resolving:", pollErr); + resolve(); + return; + } + + pollCount++; + await new Promise(r => setTimeout(r, 1000)); // Poll every 1 second + } + + // Timeout after 30 min + reject(new Error("Upload progress polling timeout")); + } else { + resolve(); + } + } catch (parseErr) { + reject(new Error("Failed to parse upload response")); + } } else if (xhr.status === 401) { _forceLogout(); reject(new Error("Session expired")); diff --git a/frontend/src/components/DeviceListWithFolders.tsx b/frontend/src/components/DeviceListWithFolders.tsx index 18d16d6..d3ad525 100644 --- a/frontend/src/components/DeviceListWithFolders.tsx +++ b/frontend/src/components/DeviceListWithFolders.tsx @@ -54,7 +54,6 @@ export function DeviceListWithFolders({ const loadFolders = async () => { try { const data = await listFolders(); - console.log("Folders data from API:", JSON.stringify(data, null, 2)); setFolders(data); } catch (err) { // Folders might not exist yet, which is fine diff --git a/frontend/src/components/FolderTreeItem.tsx b/frontend/src/components/FolderTreeItem.tsx index d4c0c7e..1ca4fca 100644 --- a/frontend/src/components/FolderTreeItem.tsx +++ b/frontend/src/components/FolderTreeItem.tsx @@ -78,10 +78,6 @@ export function FolderTreeItem({ const hasChildren = (folder.children && Array.isArray(folder.children) && folder.children.length > 0) || folderIdsWithDevices.has(folder.id); - - if (level > 0) { - console.log(`Nested folder "${folder.name}": children=${folder.children?.length ?? 0}, device_count=${folder.device_count}, hasChildren=${hasChildren}`); - } return (