Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ on:
options: ["20", "22"]
default: "20"

concurrency:
group: ci-${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
unit-tests:
name: Unit Tests
Expand Down
226 changes: 214 additions & 12 deletions backend/routers/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
"""
import logging
import os
import uuid
from urllib.parse import unquote

from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile, File
from fastapi.responses import Response
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
Expand Down Expand Up @@ -49,6 +50,9 @@
log = logging.getLogger(__name__)
router = APIRouter(prefix="/ftp", tags=["ftp"])

# Track active uploads for UI feedback
_upload_status: dict[str, dict] = {}


# -- Session management --------------------------------------------------------

Expand Down Expand Up @@ -184,11 +188,34 @@ async def list_dir(
_: str = Depends(get_current_user),
):
"""List directory contents at the given remote path."""
log.debug(
"FTP list directory request: path=%s, session=%s",
path,
session_id[:8],
)

try:
entries = await list_directory(session_id, path)
log.info(
"FTP list directory successful: path=%s, entries=%d, session=%s",
path,
len(entries),
session_id[:8],
)
except ValueError as exc:
log.error(
"FTP list directory failed (session not found): path=%s, session=%s",
path,
session_id[:8],
)
raise HTTPException(status_code=404, detail=str(exc))
except Exception as exc: # noqa: BLE001
log.error(
"FTP list directory failed: path=%s, error=%s, session=%s",
path,
exc,
session_id[:8],
)
raise HTTPException(status_code=500, detail=f"Directory listing failed: {exc}")
return {"path": path, "entries": entries}

Expand All @@ -202,11 +229,30 @@ async def download_file(
"""Download a remote file. ``path`` must be URL-encoded."""
remote_path = unquote(path)
filename = os.path.basename(remote_path)

log.debug(
"FTP download request: filename=%s, path=%s, session=%s",
filename,
remote_path,
session_id[:8],
)

try:
data = await read_file_bytes(session_id, remote_path)
except ValueError as exc:
log.error(
"FTP download failed (session not found): path=%s, session=%s",
remote_path,
session_id[:8],
)
raise HTTPException(status_code=404, detail=str(exc))
except Exception as exc: # noqa: BLE001
log.error(
"FTP download failed: path=%s, error=%s, session=%s",
remote_path,
exc,
session_id[:8],
)
raise HTTPException(status_code=500, detail=f"Download failed: {exc}")

return Response(
Expand All @@ -223,40 +269,141 @@ class UploadResponse(BaseModel):
size: int


@router.post("/{session_id}/upload", response_model=UploadResponse)
@router.post("/{session_id}/upload")
async def upload_file(
session_id: str,
path: str,
request: Request,
file: UploadFile = File(...),
background_tasks: BackgroundTasks = BackgroundTasks(),
_: str = Depends(get_current_user),
):
"""
Upload a file to the remote server.
Upload a file to the remote server (returns immediately, processes in background).

``path`` is the target directory; the remote file will be placed at
``{path}/{file.filename}``.

Returns upload_id for status tracking via GET /ftp/{session_id}/upload/{upload_id}
"""
target_dir = unquote(path)
if target_dir.endswith("/"):
remote_path = target_dir + (file.filename or "upload")
else:
remote_path = target_dir + "/" + (file.filename or "upload")

data = await file.read()
upload_id = str(uuid.uuid4())

log.debug(
"FTP upload request: filename=%s, target_path=%s, upload_id=%s, session=%s",
file.filename,
remote_path,
upload_id[:8],
session_id[:8],
)

# Try to read content length for progress reporting
try:
await write_file_bytes(session_id, remote_path, data)
except ValueError as exc:
raise HTTPException(status_code=404, detail=str(exc))
content_length = int(request.headers.get("content-length", "0") or 0)
except Exception:
content_length = 0

# Read the full request body before returning. This is required to
# avoid losing the upload when the request ends.
try:
file_data = await file.read()
except Exception as exc: # noqa: BLE001
raise HTTPException(status_code=500, detail=f"Upload failed: {exc}")
log.error("Failed to read upload file: %s", exc)
raise HTTPException(status_code=400, detail=f"Failed to read file: {exc}") from exc

file_size = len(file_data)
file_size_mb = file_size / (1024 * 1024)

# Initialize status entry
_upload_status[upload_id] = {
"status": "uploading",
"filename": file.filename or "upload",
"size_bytes": file_size,
"transferred_bytes": 0,
}

log.info(
"FTP uploaded %s bytes to %s (session %s)",
len(data),
remote_path,
"FTP upload buffered: filename=%s, size=%s bytes (%.2f MB), upload_id=%s, session=%s",
file.filename,
file_size,
file_size_mb,
upload_id[:8],
session_id[:8],
)
return UploadResponse(uploaded=remote_path, size=len(data))

def progress_callback(bytes_written: int) -> None:
"""Update progress status."""
sts = _upload_status.get(upload_id)
if sts is None:
return
sts["transferred_bytes"] = bytes_written
if sts.get("size_bytes") and sts["size_bytes"] > 0:
try:
sts["percent"] = round(bytes_written / sts["size_bytes"] * 100, 2)
except Exception:
sts["percent"] = None

async def uploader_task():
"""Upload buffered chunks to FTP (runs in background)."""
try:
await write_file_bytes(session_id, remote_path, file_data)
progress_callback(file_size)
_upload_status[upload_id]["status"] = "completed"
log.info(
"FTP upload completed: filename=%s, upload_id=%s, session=%s",
file.filename,
upload_id[:8],
session_id[:8],
)
except Exception as exc:
log.error(
"FTP upload failed: filename=%s, error=%s, upload_id=%s, session=%s",
file.filename,
exc,
upload_id[:8],
session_id[:8],
)
_upload_status[upload_id] = {"status": "failed", "error": str(exc)}

# Schedule the uploader task as a background task
background_tasks.add_task(uploader_task)

# Return immediately with upload_id
log.info(
"FTP upload scheduled in background: filename=%s, size=%.2f MB, upload_id=%s, session=%s",
file.filename,
file_size_mb,
upload_id[:8],
session_id[:8],
)

return {
"upload_id": upload_id,
"uploaded": remote_path,
"filename": file.filename or "upload",
"size": file_size,
"size_bytes": file_size,
"size_mb": file_size_mb,
"status": "queued",
}


@router.get("/{session_id}/upload/{upload_id}")
async def get_upload_status(
session_id: str,
upload_id: str,
_: str = Depends(get_current_user),
):
"""Check status of a background upload."""
status_info = _upload_status.get(upload_id)
if not status_info:
raise HTTPException(status_code=404, detail=f"Upload {upload_id[:8]} not found")
return status_info


class DeleteRequest(BaseModel):
Expand All @@ -273,11 +420,29 @@ async def delete_path(
_: str = Depends(get_current_user),
):
"""Delete a remote file or directory."""
log.debug(
"FTP delete request: path=%s, is_dir=%s, session=%s",
body.path,
body.is_dir,
session_id[:8],
)

try:
await delete_remote(session_id, body.path, body.is_dir)
except ValueError as exc:
log.error(
"FTP delete failed (session not found): path=%s, session=%s",
body.path,
session_id[:8],
)
raise HTTPException(status_code=404, detail=str(exc))
except Exception as exc: # noqa: BLE001
log.error(
"FTP delete failed: path=%s, error=%s, session=%s",
body.path,
exc,
session_id[:8],
)
raise HTTPException(status_code=500, detail=f"Delete failed: {exc}")


Expand All @@ -295,11 +460,31 @@ async def rename_path(
_: str = Depends(get_current_user),
):
"""Rename or move a remote path."""
log.debug(
"FTP rename request: old_path=%s, new_path=%s, session=%s",
body.old_path,
body.new_path,
session_id[:8],
)

try:
await rename_remote(session_id, body.old_path, body.new_path)
except ValueError as exc:
log.error(
"FTP rename failed (session not found): old_path=%s, new_path=%s, session=%s",
body.old_path,
body.new_path,
session_id[:8],
)
raise HTTPException(status_code=404, detail=str(exc))
except Exception as exc: # noqa: BLE001
log.error(
"FTP rename failed: old_path=%s, new_path=%s, error=%s, session=%s",
body.old_path,
body.new_path,
exc,
session_id[:8],
)
raise HTTPException(status_code=500, detail=f"Rename failed: {exc}")


Expand All @@ -316,9 +501,26 @@ async def make_directory(
_: str = Depends(get_current_user),
):
"""Create a remote directory."""
log.debug(
"FTP mkdir request: path=%s, session=%s",
body.path,
session_id[:8],
)

try:
await mkdir_remote(session_id, body.path)
except ValueError as exc:
log.error(
"FTP mkdir failed (session not found): path=%s, session=%s",
body.path,
session_id[:8],
)
raise HTTPException(status_code=404, detail=str(exc))
except Exception as exc: # noqa: BLE001
log.error(
"FTP mkdir failed: path=%s, error=%s, session=%s",
body.path,
exc,
session_id[:8],
)
raise HTTPException(status_code=500, detail=f"Mkdir failed: {exc}")
Loading
Loading