From b197a4cd5aeec9dff01f98010bb216f763990846 Mon Sep 17 00:00:00 2001 From: Johann Wagner Date: Mon, 16 Feb 2026 09:26:23 +0100 Subject: [PATCH 1/4] fix: Implemented cleanup locks --- dashboard/src/components/task.tsx | 16 +++- dashboard/src/services/api.tsx | 2 +- .../down.sql | 17 +++++ .../2026-02-16-100033-0000_lock_clean/up.sql | 4 + vicky/src/lib/database/entities/lock.rs | 9 +++ vicky/src/lib/database/entities/task.rs | 16 ++++ vicky/src/lib/vicky/scheduler.rs | 75 +++++++++++++++++-- 7 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 vicky/migrations/2026-02-16-100033-0000_lock_clean/down.sql create mode 100644 vicky/migrations/2026-02-16-100033-0000_lock_clean/up.sql diff --git a/dashboard/src/components/task.tsx b/dashboard/src/components/task.tsx index 1b37e35..e27c62c 100644 --- a/dashboard/src/components/task.tsx +++ b/dashboard/src/components/task.tsx @@ -51,6 +51,18 @@ const Task = (props: TaskProps) => { } } + const BADGE_LOCK_COLOR = { + "WRITE": "red", + "READ": "green", + "CLEAN": "red", + } + + const BADGE_LOCK_CONTENT = { + "WRITE": "W", + "READ": "R", + "CLEAN": "C", + } + return ( @@ -75,8 +87,8 @@ const Task = (props: TaskProps) => { return ( {lock.name} diff --git a/dashboard/src/services/api.tsx b/dashboard/src/services/api.tsx index 0026713..a3eeb58 100644 --- a/dashboard/src/services/api.tsx +++ b/dashboard/src/services/api.tsx @@ -5,7 +5,7 @@ type ITask = { id: string, display_name: string, locks: { - type: "WRITE" | "READ" + type: "WRITE" | "READ" | "CLEAN" name: string, poisoned: string, }[] diff --git a/vicky/migrations/2026-02-16-100033-0000_lock_clean/down.sql b/vicky/migrations/2026-02-16-100033-0000_lock_clean/down.sql new file mode 100644 index 0000000..f27be20 --- /dev/null +++ b/vicky/migrations/2026-02-16-100033-0000_lock_clean/down.sql @@ -0,0 +1,17 @@ +-- This file should undo anything in `up.sql` + +-- can't drop enum values from an enum. +CREATE TYPE "LockKind_Type_New" AS ENUM ( + 'READ', + 'WRITE', +); + +DELETE FROM locks WHERE type = 'CLEAN'; + +ALTER TABLE tasks + ALTER COLUMN status TYPE "LockKind_Type_New" + USING (status::text::"LockKind_Type_New"); + +DROP TYPE "LockKind_Type"; + +ALTER TYPE "LockKind_Type_New" RENAME TO "LockKind_Type"; \ No newline at end of file diff --git a/vicky/migrations/2026-02-16-100033-0000_lock_clean/up.sql b/vicky/migrations/2026-02-16-100033-0000_lock_clean/up.sql new file mode 100644 index 0000000..b562674 --- /dev/null +++ b/vicky/migrations/2026-02-16-100033-0000_lock_clean/up.sql @@ -0,0 +1,4 @@ +-- Your SQL goes here + +ALTER TYPE "LockKind_Type" ADD VALUE 'CLEAN'; + diff --git a/vicky/src/lib/database/entities/lock.rs b/vicky/src/lib/database/entities/lock.rs index 6e515a1..ad242db 100644 --- a/vicky/src/lib/database/entities/lock.rs +++ b/vicky/src/lib/database/entities/lock.rs @@ -26,12 +26,16 @@ use crate::database::entities::task::db_impl::DbTask; pub enum LockKind { Read, Write, + Clean } impl LockKind { pub fn is_write(&self) -> bool { matches!(self, LockKind::Write) } + pub fn is_cleanup(&self) -> bool { + matches!(self, LockKind::Clean) + } } impl TryFrom<&str> for LockKind { @@ -41,6 +45,7 @@ impl TryFrom<&str> for LockKind { match value { "READ" => Ok(Self::Read), "WRITE" => Ok(Self::Write), + "CLEAN" => Ok(Self::Clean), _ => Err("Unexpected lock type received."), } } @@ -64,6 +69,10 @@ impl Lock { Self::new(name, LockKind::Write) } + pub fn clean>(name: S) -> Self { + Self::new(name, LockKind::Clean) + } + pub fn is_conflicting(&self, other: &Lock) -> bool { if self.name() != other.name() { return false; diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index b83ffa5..fda26a3 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -125,6 +125,11 @@ impl TaskBuilder { self } + pub fn clean_lock>(mut self, name: S) -> Self { + self.locks.push(Lock::clean(name)); + self + } + pub fn locks(mut self, locks: Vec) -> Self { self.locks = locks; self @@ -221,6 +226,17 @@ impl TaskStatus { } } } + + pub fn is_finished(&self) -> bool { + match self { + TaskStatus::NeedsUserValidation + | TaskStatus::New + | TaskStatus::Running => false, + TaskStatus::Finished(_) => { + true + } + } + } } // this was on purpose because these macro-generated entity types diff --git a/vicky/src/lib/vicky/scheduler.rs b/vicky/src/lib/vicky/scheduler.rs index 98f686e..9c7ce8f 100644 --- a/vicky/src/lib/vicky/scheduler.rs +++ b/vicky/src/lib/vicky/scheduler.rs @@ -32,11 +32,11 @@ impl<'a> ConstraintMgmt<'a> for Constraints<'a> { if !self.contains_key(lock.name()) { return true; // lock wasn't used yet } - let Some(lock) = self.get(lock.name()) else { - return false; // block execution if missing lock entry + if let Some(existing_lock) = self.get(lock.name()) { + return !lock.is_conflicting(existing_lock) }; - !lock.is_conflicting(lock) + false } fn from_tasks(tasks: &'a [Task]) -> Result { @@ -81,6 +81,13 @@ impl<'a> Scheduler<'a> { Ok(s) } + fn is_cleanup_and_conflicts(&self, lock: &Lock) -> bool { + // If there is any other lock, we conflict with them and we do not want to be added to the queue. + // We cannot use constraints here, because constraints only contains locks with the status running. + let other_task_with_same_lock_exists = self.tasks.iter().any(|task| !task.status.is_finished() && task.locks.iter().any(|lock| !lock.kind.is_cleanup() && lock.name() == lock.name())); + lock.kind.is_cleanup() && other_task_with_same_lock_exists + } + fn is_poisoned(&self, lock: &Lock) -> bool { self.poisoned_locks .iter() @@ -90,7 +97,7 @@ impl<'a> Scheduler<'a> { fn is_unconstrained(&self, task: &Task) -> bool { task.locks .iter() - .all(|lock| self.constraints.can_get_lock(lock) && !self.is_poisoned(lock)) + .all(|lock| self.constraints.can_get_lock(lock) && !self.is_poisoned(lock) && !self.is_cleanup_and_conflicts(lock)) } fn supports_all_features(&self, task: &Task) -> bool { @@ -117,7 +124,7 @@ impl<'a> Scheduler<'a> { mod tests { use uuid::Uuid; - use crate::database::entities::task::TaskStatus; + use crate::database::entities::task::{TaskResult, TaskStatus}; use crate::database::entities::{Lock, Task}; use super::Scheduler; @@ -314,6 +321,64 @@ mod tests { assert_eq!(res.get_next_task(), None) } + #[test] + fn scheduler_new_task_cleanup_single() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::New) + .clean_lock("foo1") + .build_expect(), + ]; + + let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + // Test 1 is currently running and has the write lock + assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") + } + + #[test] + fn scheduler_new_task_cleanup_with_finished() { + let tasks = vec![ + Task::builder() + .display_name("Test 5") + .status(TaskStatus::Finished(TaskResult::Success)) + .write_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Test 1") + .status(TaskStatus::New) + .clean_lock("foo1") + .build_expect(), + ]; + + let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + // Test 1 is currently running and has the write lock + assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") + } + + + #[test] + fn scheduler_new_task_cleanup() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::New) + .clean_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::New) + .read_lock("foo1") + .build_expect(), + ]; + + let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + // Test 1 is currently running and has the write lock + assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") + } + + + #[test] fn schedule_with_poisoned_lock() { let tasks = vec![ From d0ae31251fbf5159a00464137afd8eec3641940b Mon Sep 17 00:00:00 2001 From: willow <52585984+Kek5chen@users.noreply.github.com> Date: Tue, 17 Feb 2026 11:55:58 +0100 Subject: [PATCH 2/4] chore: cargo fmt --- fairy/src/main.rs | 5 +---- vicky/src/bin/vicky/main.rs | 6 +++--- vicky/src/lib/database/entities/lock.rs | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/fairy/src/main.rs b/fairy/src/main.rs index c52151f..f7feefd 100644 --- a/fairy/src/main.rs +++ b/fairy/src/main.rs @@ -147,10 +147,7 @@ fn log_sink(cfg: Arc, task_id: Uuid) -> impl Sink, Error } async fn try_run_task(cfg: Arc, task: &Task) -> Result<()> { - let mut args = vec![ - "run".into(), - "--refresh".into() - ]; + let mut args = vec!["run".into(), "--refresh".into()]; if !&cfg.verbose_nix_logs { args.push("--quiet".into()); diff --git a/vicky/src/bin/vicky/main.rs b/vicky/src/bin/vicky/main.rs index 21edb9c..964874a 100644 --- a/vicky/src/bin/vicky/main.rs +++ b/vicky/src/bin/vicky/main.rs @@ -5,15 +5,15 @@ use crate::locks::{ }; use crate::startup::Result; use crate::tasks::{ - tasks_add, tasks_claim, tasks_confirm, tasks_count, tasks_download_logs, tasks_finish, - tasks_get, tasks_get_logs, tasks_get_specific, tasks_heartbeat, tasks_put_logs, tasks_cancel + tasks_add, tasks_cancel, tasks_claim, tasks_confirm, tasks_count, tasks_download_logs, + tasks_finish, tasks_get, tasks_get_logs, tasks_get_specific, tasks_heartbeat, tasks_put_logs, }; use crate::user::get_user; use crate::webconfig::get_web_config; use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; use errors::AppError; use jwtk::jwk::RemoteJwksVerifier; -use log::{LevelFilter, error, trace, warn, info}; +use log::{LevelFilter, error, info, trace, warn}; use rocket::fairing::AdHoc; use rocket::{Build, Ignite, Rocket, routes}; use snafu::ResultExt; diff --git a/vicky/src/lib/database/entities/lock.rs b/vicky/src/lib/database/entities/lock.rs index ad242db..6c2db5c 100644 --- a/vicky/src/lib/database/entities/lock.rs +++ b/vicky/src/lib/database/entities/lock.rs @@ -26,7 +26,7 @@ use crate::database::entities::task::db_impl::DbTask; pub enum LockKind { Read, Write, - Clean + Clean, } impl LockKind { From 94b1a4fe1a70e842aa0e51dd8ad9c150dce8e166 Mon Sep 17 00:00:00 2001 From: willow <52585984+Kek5chen@users.noreply.github.com> Date: Tue, 17 Feb 2026 13:23:32 +0100 Subject: [PATCH 3/4] fix: scheduling and constraint system --- vicky/src/lib/database/entities/lock.rs | 24 +- vicky/src/lib/database/entities/task.rs | 34 ++- vicky/src/lib/vicky/constraints.rs | 185 ++++++++++++++++ vicky/src/lib/vicky/mod.rs | 1 + vicky/src/lib/vicky/scheduler.rs | 279 +++++++++++++++++------- 5 files changed, 426 insertions(+), 97 deletions(-) create mode 100644 vicky/src/lib/vicky/constraints.rs diff --git a/vicky/src/lib/database/entities/lock.rs b/vicky/src/lib/database/entities/lock.rs index 6c2db5c..f16000d 100644 --- a/vicky/src/lib/database/entities/lock.rs +++ b/vicky/src/lib/database/entities/lock.rs @@ -1,3 +1,14 @@ +//! The (presumably) intended locking/scheduling behavior: +//! +//! - Task that needs confirmation: locks can't be acquired. +//! - (Task A with Write Lock A and Read Lock B needs confirmation, then Task A2 in Validation with Read Lock A can't be scheduled, Task B with Read Lock B can be scheduled) +//! - (Task A with Read Lock A, then Task A2 in Validation with Read Lock A can be scheduled) +//! - Task in validation: locks can be freely acquired as long as they're not locked by a running task, or they are poisoned locks (by same name). +//! - Task is running: used locks are blocked from acquiring. +//! - Task failed: locks are poisoned until manually cleared. +//! - Task succeeded: locks are free again. +//! - Tasks are scheduled in queue order where FI is also FO + use diesel::{AsExpression, FromSqlRow}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -30,10 +41,11 @@ pub enum LockKind { } impl LockKind { - pub fn is_write(&self) -> bool { + pub const fn is_write(&self) -> bool { matches!(self, LockKind::Write) } - pub fn is_cleanup(&self) -> bool { + + pub const fn is_cleanup(&self) -> bool { matches!(self, LockKind::Clean) } } @@ -85,19 +97,19 @@ impl Lock { self.kind.is_write() || other.kind.is_write() } - pub fn poison(&mut self, by_task: &Uuid) { + pub const fn poison(&mut self, by_task: &Uuid) { self.poisoned_by = Some(*by_task); } - pub fn clear_poison(&mut self) { + pub const fn clear_poison(&mut self) { self.poisoned_by = None; } - pub fn name(&self) -> &str { + pub const fn name(&self) -> &str { self.name.as_str() } - pub fn is_poisoned(&self) -> bool { + pub const fn is_poisoned(&self) -> bool { self.poisoned_by.is_some() } diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index fda26a3..30fdc98 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -106,6 +106,18 @@ impl Task { pub fn clear_poisoned_locks(&mut self) { self.locks.iter_mut().for_each(|lock| lock.clear_poison()); } + + pub fn is_running(&self) -> bool { + self.status == TaskStatus::Running + } + + pub fn is_waiting_confirmation(&self) -> bool { + self.status.is_waiting_confirmation() + } + + pub fn is_new(&self) -> bool { + self.status == TaskStatus::New + } } impl AsRef for Task { @@ -215,7 +227,8 @@ impl From<(DbTask, Vec)> for Task { } impl TaskStatus { - pub fn is_failed(&self) -> bool { + pub fn is_failed(self) -> bool { + // explicitly state all variants so that rust makes us add new ones match self { TaskStatus::NeedsUserValidation | TaskStatus::New @@ -227,15 +240,16 @@ impl TaskStatus { } } - pub fn is_finished(&self) -> bool { - match self { - TaskStatus::NeedsUserValidation - | TaskStatus::New - | TaskStatus::Running => false, - TaskStatus::Finished(_) => { - true - } - } + pub fn is_finished(self) -> bool { + matches!(self, TaskStatus::Finished(_)) + } + + pub fn is_pending(self) -> bool { + !self.is_finished() + } + + pub fn is_waiting_confirmation(self) -> bool { + matches!(self, TaskStatus::NeedsUserValidation) } } diff --git a/vicky/src/lib/vicky/constraints.rs b/vicky/src/lib/vicky/constraints.rs new file mode 100644 index 0000000..098d91a --- /dev/null +++ b/vicky/src/lib/vicky/constraints.rs @@ -0,0 +1,185 @@ +use crate::database::entities::{Lock, Task}; +use crate::errors::SchedulerError; +use std::collections::HashMap; + +#[derive(Clone, Debug)] +#[allow(unused)] +pub enum ConstraintFail<'a> { + UnsupportedFeature(String), + ActiveLockCollision(&'a Lock), + PassiveLockCollision(&'a Lock), + PoisonedBy(&'a Lock), +} + +#[derive(Clone, Debug)] +#[allow(unused)] +pub enum ConstraintEvaluation<'a> { + Ready, + NotReady, + Constrained(ConstraintFail<'a>), +} + +#[derive(Clone, Debug, Default)] +pub struct Constraints<'a> { + /// Takes all active locks and validates running ownership + active_locks: HashMap<&'a str, &'a Lock>, + /// Takes all non-running locks that need to be considered for future acquires + passive_locks: HashMap<&'a str, Vec<&'a Lock>>, + /// Takes all non-running locks that need to be considered for cleanup order + waiting_locks: HashMap<&'a str, Vec<&'a Lock>>, + poisoned_locks: &'a [Lock], +} + +impl<'a> Constraints<'a> { + pub fn insert_task_locks(&mut self, task: &'a Task) -> Result<(), SchedulerError> { + for lock in &task.locks { + if task.is_running() { + self.insert_active_lock(lock)?; + } else if task.is_waiting_confirmation() { + self.insert_passive_lock(lock); + } else if task.is_new() { + self.insert_waiting_lock(lock); + } + } + + Ok(()) + } + + fn insert_active_lock(&mut self, lock: &'a Lock) -> Result<(), SchedulerError> { + if self.is_actively_locked(lock) { + return Err(SchedulerError::LockAlreadyOwnedError); + } + + self.active_locks.insert(lock.name(), lock); + + Ok(()) + } + + fn insert_passive_lock(&mut self, lock: &'a Lock) { + self.passive_locks + .entry(lock.name()) + .or_default() + .push(lock); + } + + fn insert_waiting_lock(&mut self, lock: &'a Lock) { + if lock.kind.is_cleanup() { + return; + } + + self.waiting_locks + .entry(lock.name()) + .or_default() + .push(lock); + } + + pub fn try_acquire(&'a self, lock: &Lock) -> Option> { + if let Some(conflict) = self.find_active_conflict(lock) { + return Some(ConstraintFail::ActiveLockCollision(conflict)); + } + + if let Some(conflict) = self.find_passive_conflict(lock) { + return Some(ConstraintFail::PassiveLockCollision(conflict)); + } + + if let Some(conflict) = self.find_cleanup_conflict(lock) { + return Some(ConstraintFail::PassiveLockCollision(conflict)); + } + + if let Some(poison) = self.find_poisoner(lock) { + return Some(ConstraintFail::PoisonedBy(poison)); + } + + None + } + + fn is_actively_locked(&self, lock: &Lock) -> bool { + self.find_active_conflict(lock).is_some() + } + + #[allow(unused)] + fn is_poisoned(&self, lock: &Lock) -> bool { + self.find_poisoner(lock).is_some() + } + + fn find_active_conflict(&self, lock: &Lock) -> Option<&'a Lock> { + let existing_lock = self.active_locks.get(lock.name())?; + + if lock.kind.is_cleanup() || existing_lock.kind.is_cleanup() { + return Some(existing_lock); + } + + lock.is_conflicting(existing_lock).then_some(existing_lock) + } + + fn find_passive_conflict(&self, lock: &Lock) -> Option<&'a Lock> { + let existing_locks = self.passive_locks.get(lock.name())?; + + if lock.kind.is_cleanup() { + let existing_lock = *existing_locks.first()?; + return Some(existing_lock); + } + + existing_locks + .iter() + .copied() + .find(|existing_lock| lock.is_conflicting(existing_lock)) + } + + fn find_poisoner(&self, lock: &Lock) -> Option<&'a Lock> { + self.poisoned_locks + .iter() + .find(|plock| lock.is_conflicting(plock)) + } + + fn find_cleanup_conflict(&self, lock: &Lock) -> Option<&'a Lock> { + if !lock.kind.is_cleanup() { + return None; + } + + let existing_locks = self.waiting_locks.get(lock.name())?; + existing_locks.first().copied() + } + + pub fn from_tasks( + tasks: &'a [Task], + poisoned_locks: &'a [Lock], + ) -> Result { + let mut constraints = Self { + poisoned_locks, + ..Default::default() + }; + + for task in tasks { + constraints.insert_task_locks(task)?; + } + + Ok(constraints) + } +} + +impl ConstraintEvaluation<'_> { + pub fn missing_feature(feature: String) -> Self { + ConstraintEvaluation::Constrained(ConstraintFail::UnsupportedFeature(feature)) + } + + pub fn is_ready(&self) -> bool { + matches!(self, ConstraintEvaluation::Ready) + } + + #[allow(unused)] + pub fn is_active_collision(&self) -> bool { + matches!( + self, + ConstraintEvaluation::Constrained(ConstraintFail::ActiveLockCollision(_)) + ) + } + + #[allow(unused)] + pub fn is_passive_collision(&self) -> bool { + matches!( + self, + ConstraintEvaluation::Constrained(ConstraintFail::PassiveLockCollision(_)) + ) + } +} diff --git a/vicky/src/lib/vicky/mod.rs b/vicky/src/lib/vicky/mod.rs index 81b3546..801968f 100644 --- a/vicky/src/lib/vicky/mod.rs +++ b/vicky/src/lib/vicky/mod.rs @@ -1 +1,2 @@ +mod constraints; pub mod scheduler; diff --git a/vicky/src/lib/vicky/scheduler.rs b/vicky/src/lib/vicky/scheduler.rs index 9c7ce8f..3500f70 100644 --- a/vicky/src/lib/vicky/scheduler.rs +++ b/vicky/src/lib/vicky/scheduler.rs @@ -1,65 +1,13 @@ -use std::collections::HashMap; - use crate::database::entities::task::TaskStatus; +use crate::vicky::constraints::{ConstraintEvaluation, ConstraintFail, Constraints}; use crate::{ database::entities::{Lock, Task}, errors::SchedulerError, }; -type Constraints<'a> = HashMap<&'a str, &'a Lock>; - -trait ConstraintMgmt<'a> { - type Type; - - fn insert_lock(&mut self, lock: &'a Lock) -> Result<(), SchedulerError>; - fn can_get_lock(&self, lock: &Lock) -> bool; - fn from_tasks(tasks: &'a [Task]) -> Result; -} - -impl<'a> ConstraintMgmt<'a> for Constraints<'a> { - type Type = Constraints<'a>; - - fn insert_lock(&mut self, lock: &'a Lock) -> Result<(), SchedulerError> { - if !self.can_get_lock(lock) { - return Err(SchedulerError::LockAlreadyOwnedError); - } - self.insert(lock.name(), lock); - - Ok(()) - } - - fn can_get_lock(&self, lock: &Lock) -> bool { - if !self.contains_key(lock.name()) { - return true; // lock wasn't used yet - } - if let Some(existing_lock) = self.get(lock.name()) { - return !lock.is_conflicting(existing_lock) - }; - - false - } - - fn from_tasks(tasks: &'a [Task]) -> Result { - let mut constraints = Self::new(); - - for task in tasks { - if task.status != TaskStatus::Running { - continue; - } - - for lock in &task.locks { - constraints.insert_lock(lock)?; - } - } - - Ok(constraints) - } -} - pub struct Scheduler<'a> { constraints: Constraints<'a>, tasks: &'a Vec, - poisoned_locks: &'a [Lock], machine_features: &'a [String], } @@ -69,66 +17,73 @@ impl<'a> Scheduler<'a> { poisoned_locks: &'a [Lock], machine_features: &'a [String], ) -> Result { - let constraints: Constraints = Constraints::from_tasks(tasks)?; + let constraints: Constraints = Constraints::from_tasks(tasks, poisoned_locks)?; let s = Scheduler { constraints, tasks, - poisoned_locks, machine_features, }; - Ok(s) - } - - fn is_cleanup_and_conflicts(&self, lock: &Lock) -> bool { - // If there is any other lock, we conflict with them and we do not want to be added to the queue. - // We cannot use constraints here, because constraints only contains locks with the status running. - let other_task_with_same_lock_exists = self.tasks.iter().any(|task| !task.status.is_finished() && task.locks.iter().any(|lock| !lock.kind.is_cleanup() && lock.name() == lock.name())); - lock.kind.is_cleanup() && other_task_with_same_lock_exists - } + #[cfg(test)] + s.print_debug_evaluation(); - fn is_poisoned(&self, lock: &Lock) -> bool { - self.poisoned_locks - .iter() - .any(|plock| lock.is_conflicting(plock)) + Ok(s) } - fn is_unconstrained(&self, task: &Task) -> bool { + fn is_unconstrained(&'a self, task: &Task) -> Option> { task.locks .iter() - .all(|lock| self.constraints.can_get_lock(lock) && !self.is_poisoned(lock) && !self.is_cleanup_and_conflicts(lock)) + .find_map(|lock| self.constraints.try_acquire(lock)) } - fn supports_all_features(&self, task: &Task) -> bool { + fn find_unsupported_features(&self, task: &Task) -> Option { task.features .iter() - .all(|feat| self.machine_features.contains(feat)) + .find(|feat| !self.machine_features.contains(feat)) + .cloned() } - fn should_pick_task(&self, task: &Task) -> bool { - task.status == TaskStatus::New - && self.supports_all_features(task) - && self.is_unconstrained(task) + fn evaluate_task_readiness(&'a self, task: &Task) -> ConstraintEvaluation<'a> { + if task.status != TaskStatus::New { + return ConstraintEvaluation::NotReady; + } + + if let Some(feature) = self.find_unsupported_features(task) { + return ConstraintEvaluation::missing_feature(feature); + } + + if let Some(constraint) = self.is_unconstrained(task) { + return ConstraintEvaluation::Constrained(constraint); + } + + ConstraintEvaluation::Ready } pub fn get_next_task(self) -> Option { self.tasks .iter() - .find(|task| self.should_pick_task(task)) + .find(|task| self.evaluate_task_readiness(task).is_ready()) .cloned() } + + #[allow(unused)] + pub fn print_debug_evaluation(&self) { + for task in self.tasks { + let eval = self.evaluate_task_readiness(task); + println!("Readiness of {} ({}): {eval:?}", task.id, task.display_name); + } + } } #[cfg(test)] mod tests { use uuid::Uuid; + use super::Scheduler; use crate::database::entities::task::{TaskResult, TaskStatus}; use crate::database::entities::{Lock, Task}; - use super::Scheduler; - #[test] fn scheduler_creation_no_constraints() { let tasks = vec![ @@ -181,6 +136,31 @@ mod tests { Scheduler::new(&tasks, &[], &[]).unwrap(); } + #[test] + fn scheduler_creation_read_and_cleanup_constraints_is_order_independent() { + let mut tasks_read_then_clean = vec![ + Task::builder() + .display_name("Read lock") + .status(TaskStatus::Running) + .read_lock("shared") + .build_expect(), + Task::builder() + .display_name("Cleanup lock") + .status(TaskStatus::New) + .clean_lock("shared") + .build_expect(), + ]; + + Scheduler::new(&tasks_read_then_clean, &[], &[]) + .expect("read->cleanup lock order must not fail scheduler creation"); + + tasks_read_then_clean.reverse(); + let tasks_clean_then_read = tasks_read_then_clean; + + Scheduler::new(&tasks_clean_then_read, &[], &[]) + .expect("cleanup->read lock order must not fail scheduler creation"); + } + #[test] fn scheduler_creation_multiple_write_constraints() { let tasks = vec![ @@ -356,8 +336,32 @@ mod tests { assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") } + #[test] + fn scheduler_cleanup_waits_for_running_task_using_same_lock() { + let tasks = vec![ + Task::builder() + .display_name("Im doing something") + .status(TaskStatus::Running) + .read_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Cleanup after") + .status(TaskStatus::New) + .clean_lock("foo1") + .build_expect(), + ]; - #[test] + let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let eval = res.evaluate_task_readiness(&res.tasks[1]); + + assert!( + !eval.is_ready(), + "Expected cleanup to wait while same lock is currently in use. Got {eval:?}" + ); + assert_eq!(res.get_next_task(), None); + } + + #[test] fn scheduler_new_task_cleanup() { let tasks = vec![ Task::builder() @@ -373,11 +377,124 @@ mod tests { ]; let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - // Test 1 is currently running and has the write lock + let eval = res.evaluate_task_readiness(&res.tasks[0]); + assert!( + eval.is_passive_collision(), + "Expected evaluation to actively collide. But received {eval:?}" + ); + let eval = res.evaluate_task_readiness(&res.tasks[1]); + assert!( + eval.is_ready(), + "Expected evaluation to succeed. But received {eval:?}" + ); + + // Run tasks before cleanup assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") } + #[test] + fn scheduler_new_task_cleanup_unrelated_pending_lock() { + let tasks = vec![ + Task::builder() + .display_name("Cleanup lock A") + .status(TaskStatus::New) + .clean_lock("lock_a") + .build_expect(), + Task::builder() + .display_name("Pending lock B") + .status(TaskStatus::New) + .read_lock("lock_b") + .build_expect(), + ]; + + let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + + assert_eq!(res.get_next_task().unwrap().display_name, "Cleanup lock A") + } + + #[test] + fn scheduler_needs_validation_locks_block_conflicts_only() { + let tasks = vec![ + Task::builder() + .display_name("Task A") + .status(TaskStatus::NeedsUserValidation) + .write_lock("lock_a") + .read_lock("lock_b") + .build_expect(), + Task::builder() + .display_name("Task A2") + .status(TaskStatus::New) + .read_lock("lock_a") + .build_expect(), + Task::builder() + .display_name("Task B") + .status(TaskStatus::New) + .read_lock("lock_b") + .build_expect(), + ]; + + let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + + assert_eq!(res.get_next_task().unwrap().display_name, "Task B"); + } + + #[test] + fn scheduler_needs_validation_keeps_strongest_lock_when_names_collide() { + let tasks = vec![ + Task::builder() + .display_name("Validation writer") + .status(TaskStatus::NeedsUserValidation) + .write_lock("shared_lock") + .build_expect(), + Task::builder() + .display_name("Validation reader") + .status(TaskStatus::NeedsUserValidation) + .read_lock("shared_lock") + .build_expect(), + Task::builder() + .display_name("New reader") + .status(TaskStatus::New) + .read_lock("shared_lock") + .build_expect(), + ]; + + let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let eval = res.evaluate_task_readiness(&res.tasks[2]); + assert!( + eval.is_passive_collision(), + "Expected passive collision from validation writer, got {eval:?}" + ); + assert_eq!(res.get_next_task(), None); + } + + #[test] + fn scheduler_cleanup_waits_for_non_cleanup_even_with_later_cleanup() { + let tasks = vec![ + Task::builder() + .display_name("Cleanup 1") + .status(TaskStatus::New) + .clean_lock("shared_lock") + .build_expect(), + Task::builder() + .display_name("Reader") + .status(TaskStatus::New) + .read_lock("shared_lock") + .build_expect(), + Task::builder() + .display_name("Cleanup 2") + .status(TaskStatus::New) + .clean_lock("shared_lock") + .build_expect(), + ]; + let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let eval = res.evaluate_task_readiness(&res.tasks[0]); + assert!( + eval.is_passive_collision(), + "Expected cleanup to wait for pending non-clean lock, got {eval:?}" + ); + assert_eq!(res.get_next_task().unwrap().display_name, "Reader"); + } #[test] fn schedule_with_poisoned_lock() { From e94e8b45053d5923414133ba291925b9b45ac4da Mon Sep 17 00:00:00 2001 From: willow <52585984+Kek5chen@users.noreply.github.com> Date: Tue, 17 Feb 2026 13:28:32 +0100 Subject: [PATCH 4/4] refactor: rename function --- vicky/src/lib/vicky/scheduler.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vicky/src/lib/vicky/scheduler.rs b/vicky/src/lib/vicky/scheduler.rs index 3500f70..dba48f3 100644 --- a/vicky/src/lib/vicky/scheduler.rs +++ b/vicky/src/lib/vicky/scheduler.rs @@ -31,7 +31,7 @@ impl<'a> Scheduler<'a> { Ok(s) } - fn is_unconstrained(&'a self, task: &Task) -> Option> { + fn find_constraint(&'a self, task: &Task) -> Option> { task.locks .iter() .find_map(|lock| self.constraints.try_acquire(lock)) @@ -53,7 +53,7 @@ impl<'a> Scheduler<'a> { return ConstraintEvaluation::missing_feature(feature); } - if let Some(constraint) = self.is_unconstrained(task) { + if let Some(constraint) = self.find_constraint(task) { return ConstraintEvaluation::Constrained(constraint); }