From 34c0d242d3b521356c554a99750267e0e58a7719 Mon Sep 17 00:00:00 2001
From: "Thanh-Giang (River) Tan Nguyen"
Date: Mon, 5 Jan 2026 21:39:05 +0700
Subject: [PATCH 1/3] feat: handle server disconnect

---
 backend/app/service_job/models.py     |  2 +-
 backend/app/service_job/tasks.py      |  1 -
 backend/app/utils/executor/ssh.py     | 13 +++--
 .../models/5_20260105205918_update.py | 49 +++++++++++++++++++
 4 files changed, 59 insertions(+), 6 deletions(-)
 create mode 100644 backend/migrations/models/5_20260105205918_update.py

diff --git a/backend/app/service_job/models.py b/backend/app/service_job/models.py
index ef58399..28c566b 100644
--- a/backend/app/service_job/models.py
+++ b/backend/app/service_job/models.py
@@ -12,7 +12,7 @@ class JobStatus(str, Enum):
     PENDING = "PENDING"
     FAILED = "FAILED"
     COMPLETED = "COMPLETED"
-    SERVER_ERROR = "SERVER_ERROR"
+    HPC_DISCONNECTED = "HPC_DISCONNECTED"
     CANCELLED = "CANCELLED"
     CANCELLING = "CANCELLED+"
     TIMEOUT = "TIMEOUT"
diff --git a/backend/app/service_job/tasks.py b/backend/app/service_job/tasks.py
index 3be64d5..93de5cc 100644
--- a/backend/app/service_job/tasks.py
+++ b/backend/app/service_job/tasks.py
@@ -77,7 +77,6 @@ async def _handle():
             JobStatus.CANCELLED,
             JobStatus.CANCELLING,
             JobStatus.TIMEOUT,
-            JobStatus.SERVER_ERROR,
         ]:
             logger.info(f"Rescheduling monitoring for job {job.id}, current status: {job.status}")
             monitor_job.apply_async(args=[job.id, is_web_job], countdown=5)
diff --git a/backend/app/utils/executor/ssh.py b/backend/app/utils/executor/ssh.py
index 95280ef..aa72c24 100644
--- a/backend/app/utils/executor/ssh.py
+++ b/backend/app/utils/executor/ssh.py
@@ -207,7 +207,7 @@ def _delete_tunnel(job: WebJob):
                     logger.info(f"[INFO] Killing process {proc.pid} on port {job.local_port}")
                     proc.kill()
     except Exception as e:
-        logger.error(f"[ERROR] Failed to kill tunnel: {e}")
+        logger.error(f"Failed to kill tunnel, does not exist: {e}")
 
 def _create_traefik_config(job: WebJob):
     config_path = os.path.join(TRAEFIK_DYNAMIC_FOLDER, f"{job.id}.yaml")
@@ -306,9 +306,11 @@
         # Monitor job
         STAGING_STATUS = [
             JobStatus.SUBMITTED,
+            JobStatus.SUBMITTING,
             JobStatus.PENDING,
             JobStatus.RUNNING,
             JobStatus.CANCELLING,
+            JobStatus.HPC_DISCONNECTED,
         ]
         if job.status in STAGING_STATUS:
             update_info = {}
@@ -317,9 +319,12 @@
             out, error, exit_code = self._run_cmd(
                 f"squeue --job {job.external_id} --format='%.18i %.9T %.10M %.20V %R' --noheader | head -n1"
             )
-            logger.info(f"squeue output: {out.strip() if out else ''}, error: {error.strip()}, exit_code: {exit_code}")
-
-            if exit_code == 0 and out.strip():
+            logger.info(
+                f"squeue output: {out.strip() if out else ''}, error: {error.strip() if error else ''}, exit_code: {exit_code}"
+            )
+            if exit_code is None:
+                update_info["status"] = JobStatus.HPC_DISCONNECTED
+            elif exit_code == 0 and out.strip():
                 _get_update_info(update_info, out, job, method="squeue")
             else:
                 # Fallback to sacct (for completed, failed, or recently finished jobs)
diff --git a/backend/migrations/models/5_20260105205918_update.py b/backend/migrations/models/5_20260105205918_update.py
new file mode 100644
index 0000000..44afd44
--- /dev/null
+++ b/backend/migrations/models/5_20260105205918_update.py
@@ -0,0 +1,49 @@
+from tortoise import BaseDBAsyncClient
+
+
+async def upgrade(db: BaseDBAsyncClient) -> str:
+    return """
+        COMMENT ON COLUMN "batchjob"."status" IS 'SUBMITTING: SUBMITTING
+SUBMITTED: SUBMITTED
+RUNNING: RUNNING
+PENDING: PENDING
+FAILED: FAILED
+COMPLETED: COMPLETED
+HPC_DISCONNECTED: HPC_DISCONNECTED
+CANCELLED: CANCELLED
+CANCELLING: CANCELLED+
+TIMEOUT: TIMEOUT';
+        COMMENT ON COLUMN "webjob"."status" IS 'SUBMITTING: SUBMITTING
+SUBMITTED: SUBMITTED
+RUNNING: RUNNING
+PENDING: PENDING
+FAILED: FAILED
+COMPLETED: COMPLETED
+HPC_DISCONNECTED: HPC_DISCONNECTED
+CANCELLED: CANCELLED
+CANCELLING: CANCELLED+
+TIMEOUT: TIMEOUT';"""
+
+
+async def downgrade(db: BaseDBAsyncClient) -> str:
+    return """
+        COMMENT ON COLUMN "webjob"."status" IS 'SUBMITTING: SUBMITTING
+SUBMITTED: SUBMITTED
+RUNNING: RUNNING
+PENDING: PENDING
+FAILED: FAILED
+COMPLETED: COMPLETED
+SERVER_ERROR: SERVER_ERROR
+CANCELLED: CANCELLED
+CANCELLING: CANCELLED+
+TIMEOUT: TIMEOUT';
+        COMMENT ON COLUMN "batchjob"."status" IS 'SUBMITTING: SUBMITTING
+SUBMITTED: SUBMITTED
+RUNNING: RUNNING
+PENDING: PENDING
+FAILED: FAILED
+COMPLETED: COMPLETED
+SERVER_ERROR: SERVER_ERROR
+CANCELLED: CANCELLED
+CANCELLING: CANCELLED+
+TIMEOUT: TIMEOUT';"""
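Note (patch 1): the new "exit_code is None" branch assumes a _run_cmd contract that this series does not show: the helper must return (None, None, None) when the SSH transport itself drops, as opposed to a real exit code when squeue runs and fails. A minimal paramiko-based sketch of that assumed convention (helper name and timeout are illustrative, not the project's actual code):

    import logging
    import socket

    import paramiko

    logger = logging.getLogger(__name__)

    def run_cmd(client: paramiko.SSHClient, cmd: str):
        """Return (stdout, stderr, exit_code); (None, None, None) if the link drops."""
        try:
            _, stdout, stderr = client.exec_command(cmd, timeout=30)
            exit_code = stdout.channel.recv_exit_status()
            return stdout.read().decode(), stderr.read().decode(), exit_code
        except (paramiko.SSHException, socket.error, EOFError) as exc:
            # Transport-level failure: report no exit code so callers can map
            # this to HPC_DISCONNECTED instead of FAILED.
            logger.warning(f"SSH connection lost while running {cmd!r}: {exc}")
            return None, None, None

Under this contract, adding HPC_DISCONNECTED to STAGING_STATUS and dropping the old SERVER_ERROR terminal state from the rescheduling check in tasks.py means a transient network outage keeps the monitor polling instead of marking the job failed.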
From 4e78f7c0bb0f7ae22c54162e6d11bb900528d780 Mon Sep 17 00:00:00 2001
From: "Thanh-Giang (River) Tan Nguyen"
Date: Mon, 5 Jan 2026 22:48:32 +0700
Subject: [PATCH 2/3] fix: encrypt github token

---
 backend/app/service_credential/models/personal.py    |  2 +-
 backend/migrations/models/6_20260105224743_update.py | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)
 create mode 100644 backend/migrations/models/6_20260105224743_update.py

diff --git a/backend/app/service_credential/models/personal.py b/backend/app/service_credential/models/personal.py
index 226de32..9db68ec 100644
--- a/backend/app/service_credential/models/personal.py
+++ b/backend/app/service_credential/models/personal.py
@@ -25,4 +25,4 @@ class Github(models.Model):
     credential = fields.OneToOneField("models.Credential", related_name="github", on_delete=fields.CASCADE)
     id = fields.UUIDField(primary_key=True, default=uuid.uuid4)
     username = fields.CharField(max_length=50, null=False)
-    token = fields.CharField(max_length=200, null=False)
+    token = EncryptedTextField(max_length=200, null=False)
diff --git a/backend/migrations/models/6_20260105224743_update.py b/backend/migrations/models/6_20260105224743_update.py
new file mode 100644
index 0000000..53b0033
--- /dev/null
+++ b/backend/migrations/models/6_20260105224743_update.py
@@ -0,0 +1,11 @@
+from tortoise import BaseDBAsyncClient
+
+
+async def upgrade(db: BaseDBAsyncClient) -> str:
+    return """
+        ALTER TABLE "github" ALTER COLUMN "token" TYPE TEXT USING "token"::TEXT;"""
+
+
+async def downgrade(db: BaseDBAsyncClient) -> str:
+    return """
+        ALTER TABLE "github" ALTER COLUMN "token" TYPE VARCHAR(200) USING "token"::VARCHAR(200);"""
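Note (patch 2): EncryptedTextField is used here but defined elsewhere in the codebase; its implementation is not part of this series. For review context, a minimal sketch of such a field with Tortoise ORM and Fernet (key handling and names are assumptions, not the project's actual code):

    import os

    from cryptography.fernet import Fernet
    from tortoise import fields

    # Assumption: symmetric key supplied via the environment.
    _fernet = Fernet(os.environ["FIELD_ENCRYPTION_KEY"])

    class EncryptedTextField(fields.TextField):
        """TextField that stores Fernet ciphertext and decrypts on read."""

        def __init__(self, max_length=None, **kwargs):
            # max_length is accepted for signature compatibility only;
            # ciphertext is longer than the plaintext it encodes.
            super().__init__(**kwargs)

        def to_db_value(self, value, instance):
            return None if value is None else _fernet.encrypt(value.encode()).decode()

        def to_python_value(self, value):
            return None if value is None else _fernet.decrypt(value.encode()).decode()

The companion migration widens the column from VARCHAR(200) to TEXT for exactly this reason: the Fernet ciphertext of a 200-character token no longer fits in 200 characters.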
From f74337265a30e6320b41430c94257ed5b40754ff Mon Sep 17 00:00:00 2001
From: "Thanh-Giang (River) Tan Nguyen"
Date: Wed, 7 Jan 2026 22:33:06 +0700
Subject: [PATCH 3/3] feat: remove credentials from params in celery

---
 backend/app/service_job/controller.py | 16 +++++----
 backend/app/service_job/tasks.py      | 50 +++++++++++++++++++++++++--
 backend/app/utils/executor/ssh.py     | 17 ++++++---
 3 files changed, 70 insertions(+), 13 deletions(-)

diff --git a/backend/app/service_job/controller.py b/backend/app/service_job/controller.py
index 32e5d60..6bbc9ee 100644
--- a/backend/app/service_job/controller.py
+++ b/backend/app/service_job/controller.py
@@ -54,10 +54,11 @@ async def create_job(self, project_id: UUID, job: JobSerializerIn, request: Requ
         project = await Project.get_or_none(id=project_id)
         params["bucket_name"] = project.bucket_name
         await project.fetch_related("storage")
+
+        # Validate that storage credentials are configured (don't generate temp creds here)
         storage_manager = await get_storage_manager(project.storage)
-        temp_secrets = storage_manager.get_temp_secret(session_name=user.email, duration=job["time"])
-        if not temp_secrets:
-            return bad_request({"message": "The ARN failed to create"})
+        if not storage_manager.is_valid():
+            return bad_request({"message": "Storage credentials are not valid"})
 
         if analysis.allow_access:
             job = await WebJob.create(**job)
@@ -74,9 +75,12 @@
         job_out = job_out.model_dump()
         user = await request.identity.get_user()
         job_out["created_by"] = user.email
-        # Submit to monitor job
-        params.update(temp_secrets)
-        submit_job.apply_async((job.id, analysis.allow_access, params))
+
+        # SECURITY: Pass only storage_id and user_email, not credentials
+        # Worker will regenerate credentials just-in-time to avoid Redis exposure
+        submit_job.apply_async(
+            (str(job.id), analysis.allow_access, params, str(project.storage.pk), user.email, job.time)
+        )
         return created(job_out)
 
     @auth("viewer")
diff --git a/backend/app/service_job/tasks.py b/backend/app/service_job/tasks.py
index 93de5cc..cbed903 100644
--- a/backend/app/service_job/tasks.py
+++ b/backend/app/service_job/tasks.py
@@ -6,8 +6,24 @@
 
 @shared_task
-def submit_job(job_id: str, is_web_job: bool, params: dict):
+def submit_job(job_id: str, is_web_job: bool, params: dict, storage_id: str, user_email: str, duration: int):
+    """
+    Submit job to compute executor.
+
+    SECURITY: Regenerates AWS credentials just-in-time in the worker to avoid
+    passing credentials through Redis/Celery broker.
+
+    Args:
+        job_id: Job UUID
+        is_web_job: Whether this is a web job
+        params: Job parameters (without credentials)
+        storage_id: Storage credential ID to fetch from database
+        user_email: User email for STS session naming
+        duration: Duration in seconds for temporary credentials
+    """
     from app.utils.executor.manager import get_compute_executor
+    from app.utils.storage.manager import get_storage_manager
+    from app.service_credential.models.base import Credential
     import logging
 
     logger = logging.getLogger(__name__)
 
@@ -23,13 +39,36 @@ async def _handle():
            return
 
        try:
+            # SECURITY: Fetch storage credential from DB and generate temp credentials
+            # This happens in the worker, so credentials never pass through Redis
+            storage_credential = await Credential.get_or_none(id=storage_id)
+            if not storage_credential:
+                logger.error(f"Storage credential {storage_id} not found for job {job_id}")
+                job.status = JobStatus.FAILED
+                await job.save()
+                return
+
+            storage_manager = await get_storage_manager(storage_credential)
+            temp_secrets = storage_manager.get_temp_secret(session_name=user_email, duration=duration)
+
+            if not temp_secrets:
+                logger.error(f"Failed to generate temporary credentials for job {job_id}")
+                job.status = JobStatus.FAILED
+                await job.save()
+                return
+
+            # Add temp credentials to params
+            params.update(temp_secrets)
+            logger.info(f"Generated temporary credentials for job {job_id} (duration: {duration}s)")
+
+            # Submit job with credentials
            await job.fetch_related("compute")
            executor = await get_compute_executor(job.compute)
            result = executor.submit(job, params)
            job.external_id = result.get("job_id")
            if job.external_id is None:
-                job.status = JobStatus.SERVER_ERROR
+                job.status = JobStatus.HPC_DISCONNECTED
            else:
                job.status = JobStatus.SUBMITTED
            await job.save()
 
@@ -83,6 +122,13 @@ async def _handle():
            else:
                logger.info(f"Monitoring finished for job {job.id}, status: {job.status}")
 
+                # SECURITY: Cleanup params.json from HPC in case job failed before it could delete it
+                try:
+                    await executor.cleanup_credentials(job)
+                    logger.info(f"Cleaned up credential files for job {job.id}")
+                except Exception as e:
+                    logger.warning(f"Failed to cleanup credentials for job {job.id}: {e}")
+
        except Exception:
            logger.exception(f"Monitoring failed for WebJob {job_id}")
            job.status = JobStatus.FAILED
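Note (patch 3): monitoring now awaits executor.cleanup_credentials(job), but no hunk in this series adds that method, so it presumably ships separately. A rough sketch of what it might look like on the SSH executor — the params.json location and the _run_cmd return convention are assumptions:

    class SSHExecutor:
        remote_dir: str  # assumed per-job staging root on the HPC side

        async def cleanup_credentials(self, job) -> None:
            """Best-effort removal of the staged params.json holding temp credentials."""
            # Assumed layout: params (including temp secrets) are written
            # under <remote_dir>/<job_id>/ at submit time.
            remote_path = f"{self.remote_dir}/{job.id}/params.json"
            out, error, exit_code = self._run_cmd(f"rm -f {remote_path}")
            if exit_code is None:
                # Same convention as the monitor: no exit code means the SSH link dropped.
                raise ConnectionError(f"HPC disconnected while removing {remote_path}")

The caller in tasks.py already wraps this in try/except and only logs a warning, so a cleanup failure never flips a finished job's status.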
diff --git a/backend/app/utils/executor/ssh.py b/backend/app/utils/executor/ssh.py
index aa72c24..5e19fb1 100644
--- a/backend/app/utils/executor/ssh.py
+++ b/backend/app/utils/executor/ssh.py
@@ -201,13 +201,20 @@ def _create_tunnel(job: WebJob):
 
 def _delete_tunnel(job: WebJob):
     try:
+        killed = False
         for proc in psutil.process_iter(["pid", "name", "connections"]):
-            for conn in proc.info.get("connections", []):
-                if conn.laddr and conn.laddr.port == job.local_port:
-                    logger.info(f"[INFO] Killing process {proc.pid} on port {job.local_port}")
-                    proc.kill()
+            try:
+                for conn in proc.info.get("connections", []):
+                    if conn.laddr and conn.laddr.port == job.local_port:
+                        logger.info(f"[INFO] Killing process {proc.pid} on port {job.local_port}")
+                        proc.kill()
+                        killed = True
+            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
+                continue
+        if not killed:
+            logger.info(f"[INFO] No process found on port {job.local_port}")
     except Exception as e:
-        logger.error(f"Failed to kill tunnel, does not exist: {e}")
+        logger.error(f"Failed to kill tunnel: {e}")
 
 def _create_traefik_config(job: WebJob):
     config_path = os.path.join(TRAEFIK_DYNAMIC_FOLDER, f"{job.id}.yaml")
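Note (patch 3, continued): both the old controller path and the new worker path call storage_manager.get_temp_secret(session_name=..., duration=...), whose implementation is outside this series; the docstring's mention of STS session naming suggests it wraps AWS STS AssumeRole. A rough boto3 sketch under that assumption (role ARN plumbing and returned key names are illustrative):

    import boto3
    from botocore.exceptions import ClientError

    def get_temp_secret(role_arn: str, session_name: str, duration: int):
        """Mint short-lived credentials via STS AssumeRole; return None on failure."""
        try:
            response = boto3.client("sts").assume_role(
                RoleArn=role_arn,
                RoleSessionName=session_name[:64],  # STS caps session names at 64 chars
                DurationSeconds=max(900, min(duration, 43200)),  # STS bounds: 15 min to 12 h
            )
        except ClientError:
            return None
        creds = response["Credentials"]
        return {
            "aws_access_key_id": creds["AccessKeyId"],
            "aws_secret_access_key": creds["SecretAccessKey"],
            "aws_session_token": creds["SessionToken"],
        }

Generating these inside the Celery worker, rather than in the request handler, means the secrets never sit in the Redis broker alongside the task arguments and expire on their own after the job's duration.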