diff --git a/rust/pygpukit-python/src/errors.rs b/rust/pygpukit-python/src/errors.rs new file mode 100644 index 0000000..25668dc --- /dev/null +++ b/rust/pygpukit-python/src/errors.rs @@ -0,0 +1,72 @@ +//! Unified error handling for PyGPUkit Python bindings +//! +//! This module provides helper functions for consistent error conversion +//! from Rust errors to Python exceptions. +//! +//! Error convention: +//! - MemoryError: Resource exhaustion (quota exceeded, allocation failures) +//! - ValueError: Invalid arguments (bad IDs, not found) +//! - RuntimeError: Operation failures (eviction, state errors) + +use pyo3::exceptions::{PyRuntimeError, PyValueError, PyMemoryError}; +use pyo3::PyErr; +use pygpukit_core::memory::MemoryError; +use pygpukit_core::scheduler::PartitionError; +use pygpukit_core::transfer::PinnedError; + +/// Convert MemoryError to PyErr +pub fn memory_error_to_py(err: MemoryError) -> PyErr { + match err { + MemoryError::QuotaExceeded { requested, used, quota } => { + PyMemoryError::new_err(format!( + "Memory quota exceeded: requested {} bytes, {} used, {} quota", + requested, used, quota + )) + } + MemoryError::InvalidBlock(id) => { + PyValueError::new_err(format!("Invalid memory block ID: {}", id)) + } + MemoryError::BlockEvicted(id) => { + PyRuntimeError::new_err(format!("Memory block {} was evicted", id)) + } + } +} + +/// Convert PartitionError to PyErr +pub fn partition_error_to_py(err: PartitionError) -> PyErr { + match err { + PartitionError::NotFound { id } => { + PyValueError::new_err(format!("Partition not found: {}", id)) + } + PartitionError::AlreadyExists { id } => { + PyValueError::new_err(format!("Partition already exists: {}", id)) + } + PartitionError::InsufficientResources { resource, requested, available } => { + PyRuntimeError::new_err(format!( + "Insufficient {} for partition: requested {}, {} available", + resource, requested, available + )) + } + PartitionError::NotAllowed { reason } => { + PyRuntimeError::new_err(format!("Operation not allowed: {}", reason)) + } + } +} + +/// Convert PinnedError to PyErr +pub fn pinned_error_to_py(err: PinnedError) -> PyErr { + match err { + PinnedError::QuotaExceeded { requested, available } => { + PyMemoryError::new_err(format!( + "Pinned memory quota exceeded: requested {} bytes, {} available", + requested, available + )) + } + PinnedError::InvalidBlock { id } => { + PyValueError::new_err(format!("Pinned memory block not found: {}", id)) + } + PinnedError::AllocationFailed { reason } => { + PyMemoryError::new_err(format!("Pinned memory allocation failed: {}", reason)) + } + } +} diff --git a/rust/pygpukit-python/src/lib.rs b/rust/pygpukit-python/src/lib.rs index cfb225a..8ea56e2 100644 --- a/rust/pygpukit-python/src/lib.rs +++ b/rust/pygpukit-python/src/lib.rs @@ -4,6 +4,7 @@ use pyo3::prelude::*; +mod errors; mod memory; mod scheduler; mod transfer; @@ -52,9 +53,12 @@ fn _pygpukit_rust(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; // Admission control + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; + m.add_class::()?; + m.add_class::()?; // QoS policy m.add_class::()?; m.add_class::()?; diff --git a/rust/pygpukit-python/src/memory.rs b/rust/pygpukit-python/src/memory.rs index ff44909..d39852f 100644 --- a/rust/pygpukit-python/src/memory.rs +++ b/rust/pygpukit-python/src/memory.rs @@ -1,10 +1,11 @@ //! Memory module Python bindings use pyo3::prelude::*; -use pyo3::exceptions::PyRuntimeError; use std::sync::Arc; use pygpukit_core::memory::{MemoryPool, MemoryBlock, PoolStats}; +use crate::errors::memory_error_to_py; + /// Python wrapper for MemoryBlock #[pyclass(name = "MemoryBlock")] #[derive(Clone)] @@ -207,11 +208,9 @@ impl PyMemoryPool { /// Block ID for the allocated block /// /// Raises: - /// RuntimeError: If quota exceeded and cannot evict + /// MemoryError: If quota exceeded and cannot evict fn allocate(&self, size: usize) -> PyResult { - self.inner.allocate(size).map_err(|e| { - PyRuntimeError::new_err(e.to_string()) - }) + self.inner.allocate(size).map_err(memory_error_to_py) } /// Free a memory block (return to free list). diff --git a/rust/pygpukit-python/src/scheduler.rs b/rust/pygpukit-python/src/scheduler.rs index 38e8b6f..36db1b6 100644 --- a/rust/pygpukit-python/src/scheduler.rs +++ b/rust/pygpukit-python/src/scheduler.rs @@ -2,9 +2,13 @@ use pyo3::prelude::*; use std::sync::Arc; +use std::collections::HashMap; + +use crate::errors::partition_error_to_py; + use pygpukit_core::scheduler::{ Scheduler, SchedulerStats, TaskMeta, TaskState, TaskPolicy, TaskStats, - AdmissionDecision, AdmissionStats, RejectReason, + AdmissionController, AdmissionConfig, AdmissionDecision, AdmissionStats, RejectReason, QosClass, QosPolicy, QosTaskMeta, QosEvaluation, QosPolicyEvaluator, QosStats, ResourceRequirements, PartitionManager, PartitionConfig, Partition, PartitionLimits, PartitionUsage, PartitionStats, @@ -321,15 +325,15 @@ impl PyTaskStats { } } -/// Rejection reason enum for Python -#[pyclass(name = "RejectReason")] +/// Rejection reason details for Python (provides detailed info about rejection) +#[pyclass(name = "RejectReasonDetails")] #[derive(Clone)] -pub struct PyRejectReason { +pub struct PyRejectReasonDetails { inner: RejectReason, } #[pymethods] -impl PyRejectReason { +impl PyRejectReasonDetails { /// Get rejection type as string #[getter] fn reason_type(&self) -> String { @@ -380,10 +384,21 @@ impl PyRejectReason { } fn __repr__(&self) -> String { - format!("RejectReason({})", self.message()) + format!("RejectReasonDetails({})", self.message()) } } +/// Rejection reason enum for comparison in Python +#[pyclass(name = "RejectReason", eq, eq_int)] +#[derive(Clone, Copy, PartialEq, Eq)] +pub enum PyRejectReasonEnum { + InsufficientMemory = 0, + InsufficientBandwidth = 1, + QueueFull = 2, + UnsatisfiableDependencies = 3, + Custom = 4, +} + /// Admission decision for Python #[pyclass(name = "AdmissionDecision")] #[derive(Clone)] @@ -393,6 +408,30 @@ pub struct PyAdmissionDecision { #[pymethods] impl PyAdmissionDecision { + /// Create an Admitted decision (for testing) + #[staticmethod] + #[pyo3(name = "Admitted")] + fn admitted() -> Self { + Self { + inner: AdmissionDecision::Admit { + reserved_memory: 0, + reserved_bandwidth: 0.0, + }, + } + } + + /// Create a Queued decision (for testing) + #[staticmethod] + #[pyo3(name = "Queued")] + fn queued() -> Self { + Self { + inner: AdmissionDecision::Queue { + position: 0, + estimated_wait_ms: 0.0, + }, + } + } + /// Check if admitted fn is_admitted(&self) -> bool { self.inner.is_admitted() @@ -408,6 +447,22 @@ impl PyAdmissionDecision { self.inner.is_queued() } + /// Get rejection reason type (for comparison) + fn reject_reason(&self) -> Option { + match &self.inner { + AdmissionDecision::Reject { reason } => { + Some(match reason { + RejectReason::InsufficientMemory { .. } => PyRejectReasonEnum::InsufficientMemory, + RejectReason::BandwidthExceeded { .. } => PyRejectReasonEnum::InsufficientBandwidth, + RejectReason::QueueFull { .. } => PyRejectReasonEnum::QueueFull, + RejectReason::UnsatisfiableDependencies { .. } => PyRejectReasonEnum::UnsatisfiableDependencies, + RejectReason::Custom(_) => PyRejectReasonEnum::Custom, + }) + } + _ => None, + } + } + /// Get decision type as string #[getter] fn decision_type(&self) -> String { @@ -454,11 +509,11 @@ impl PyAdmissionDecision { } } - /// Get rejection reason (if rejected) + /// Get rejection reason details (if rejected) #[getter] - fn rejection_reason(&self) -> Option { + fn rejection_reason(&self) -> Option { match &self.inner { - AdmissionDecision::Reject { reason } => Some(PyRejectReason { inner: reason.clone() }), + AdmissionDecision::Reject { reason } => Some(PyRejectReasonDetails { inner: reason.clone() }), _ => None, } } @@ -469,7 +524,7 @@ impl PyAdmissionDecision { format!("AdmissionDecision(Admit, memory={}, bandwidth={:.4})", reserved_memory, reserved_bandwidth) } AdmissionDecision::Reject { reason } => { - format!("AdmissionDecision(Reject, reason={})", PyRejectReason { inner: reason.clone() }.message()) + format!("AdmissionDecision(Reject, reason={})", PyRejectReasonDetails { inner: reason.clone() }.message()) } AdmissionDecision::Queue { position, estimated_wait_ms } => { format!("AdmissionDecision(Queue, position={}, wait={:.1}ms)", position, estimated_wait_ms) @@ -517,12 +572,24 @@ impl PyAdmissionStats { self.inner.reserved_memory } + /// Currently used memory (alias for reserved_memory) + #[getter] + fn used_memory(&self) -> usize { + self.inner.reserved_memory + } + /// Currently reserved bandwidth #[getter] fn reserved_bandwidth(&self) -> f64 { self.inner.reserved_bandwidth } + /// Currently used bandwidth (alias for reserved_bandwidth) + #[getter] + fn used_bandwidth(&self) -> f64 { + self.inner.reserved_bandwidth + } + /// Available memory #[getter] fn available_memory(&self) -> usize { @@ -544,6 +611,115 @@ impl PyAdmissionStats { } } +/// Admission configuration for Python +#[pyclass(name = "AdmissionConfig")] +#[derive(Clone)] +pub struct PyAdmissionConfig { + inner: AdmissionConfig, +} + +#[pymethods] +impl PyAdmissionConfig { + /// Create a new AdmissionConfig + #[new] + #[pyo3(signature = (max_memory, max_bandwidth, max_pending_tasks=1000, enable_best_effort=true))] + fn new(max_memory: usize, max_bandwidth: f64, max_pending_tasks: usize, enable_best_effort: bool) -> Self { + Self { + inner: AdmissionConfig { + total_memory: max_memory, + max_reserved_memory: max_memory, + max_pending_tasks, + total_bandwidth: max_bandwidth, + enable_best_effort, + memory_overcommit_ratio: 1.0, + }, + } + } + + #[getter] + fn max_memory(&self) -> usize { + self.inner.total_memory + } + + #[getter] + fn max_bandwidth(&self) -> f64 { + self.inner.total_bandwidth + } + + #[getter] + fn max_pending_tasks(&self) -> usize { + self.inner.max_pending_tasks + } + + #[getter] + fn enable_best_effort(&self) -> bool { + self.inner.enable_best_effort + } + + fn __repr__(&self) -> String { + format!( + "AdmissionConfig(max_memory={}, max_bandwidth={:.2}, max_pending={})", + self.inner.total_memory, self.inner.total_bandwidth, self.inner.max_pending_tasks + ) + } +} + +/// Admission controller for Python with task tracking +#[pyclass(name = "AdmissionController")] +pub struct PyAdmissionController { + inner: AdmissionController, + // Track allocations by task_id for release() + allocations: HashMap, +} + +#[pymethods] +impl PyAdmissionController { + /// Create a new AdmissionController + #[new] + fn new(config: PyAdmissionConfig) -> Self { + Self { + inner: AdmissionController::new(config.inner), + allocations: HashMap::new(), + } + } + + /// Try to admit a task with given resource requirements + fn try_admit(&mut self, task_id: &str, memory: usize, bandwidth: f64) -> PyAdmissionDecision { + // Create a temporary TaskMeta for admission + let task = TaskMeta::with_memory(task_id.to_string(), task_id.to_string(), memory); + let decision = self.inner.admit(&task); + + // Track allocation if admitted + if decision.is_admitted() { + self.allocations.insert(task_id.to_string(), (memory, bandwidth)); + } + + PyAdmissionDecision { inner: decision } + } + + /// Release resources for a task + fn release(&mut self, task_id: &str) { + if let Some((memory, bandwidth)) = self.allocations.remove(task_id) { + self.inner.release(memory, bandwidth); + } + } + + /// Get admission statistics + fn stats(&self) -> PyAdmissionStats { + PyAdmissionStats { + inner: self.inner.stats(), + } + } + + fn __repr__(&self) -> String { + let stats = self.inner.stats(); + format!( + "AdmissionController(admitted={}, rejected={}, pending={})", + stats.admitted_count, stats.rejected_count, stats.pending_count + ) + } +} + // ============================================================================= // QoS Policy Types // ============================================================================= @@ -782,6 +958,34 @@ impl PyQosTaskMeta { self.inner.qos.class.into() } + /// Get memory request (bytes) + #[getter] + fn memory_request(&self) -> usize { + self.inner.qos.resources.memory_request + } + + /// Get memory limit (bytes) + #[getter] + fn memory_limit(&self) -> usize { + self.inner.qos.resources.memory_limit + } + + /// Get burst ratio (memory_limit / memory_request) + #[getter] + fn burst_ratio(&self) -> f64 { + if self.inner.qos.resources.memory_request > 0 { + self.inner.qos.resources.memory_limit as f64 / self.inner.qos.resources.memory_request as f64 + } else { + 1.0 + } + } + + /// Get bandwidth request (0.0 - 1.0) + #[getter] + fn bandwidth_request(&self) -> f64 { + self.inner.qos.resources.bandwidth_request + } + /// Get effective priority fn effective_priority(&self) -> i32 { self.inner.effective_priority() @@ -789,8 +993,8 @@ impl PyQosTaskMeta { fn __repr__(&self) -> String { format!( - "QosTaskMeta(id='{}', class={:?}, priority={})", - self.inner.task.id, self.inner.qos.class, self.inner.effective_priority() + "QosTaskMeta(id='{}', class={:?}, memory={}, priority={})", + self.inner.task.id, self.inner.qos.class, self.inner.qos.resources.memory_request, self.inner.effective_priority() ) } } @@ -1460,16 +1664,14 @@ impl PyPartitionManager { /// Create a new partition fn create_partition(&mut self, id: &str, name: &str, limits: PyPartitionLimits) -> PyResult<()> { - self.inner.create_partition(id, name, limits.inner).map_err(|e| { - pyo3::exceptions::PyValueError::new_err(e.to_string()) - }) + self.inner.create_partition(id, name, limits.inner).map_err(partition_error_to_py) } /// Delete a partition fn delete_partition(&mut self, id: &str) -> PyResult { self.inner.delete_partition(id) .map(|p| PyPartition { inner: p }) - .map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string())) + .map_err(partition_error_to_py) } /// Get a partition @@ -1479,9 +1681,7 @@ impl PyPartitionManager { /// Assign a task to a partition fn assign_task(&mut self, task_id: &str, partition_id: &str) -> PyResult<()> { - self.inner.assign_task(task_id, partition_id).map_err(|e| { - pyo3::exceptions::PyValueError::new_err(e.to_string()) - }) + self.inner.assign_task(task_id, partition_id).map_err(partition_error_to_py) } /// Get partition for a task @@ -1496,9 +1696,7 @@ impl PyPartitionManager { /// Set default partition fn set_default(&mut self, id: &str) -> PyResult<()> { - self.inner.set_default(id).map_err(|e| { - pyo3::exceptions::PyValueError::new_err(e.to_string()) - }) + self.inner.set_default(id).map_err(partition_error_to_py) } /// Get default partition @@ -1541,9 +1739,12 @@ pub fn register(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; // Admission control + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; + m.add_class::()?; + m.add_class::()?; // QoS policy m.add_class::()?; m.add_class::()?; diff --git a/rust/pygpukit-python/src/transfer.rs b/rust/pygpukit-python/src/transfer.rs index a6db9fb..e37f7b5 100644 --- a/rust/pygpukit-python/src/transfer.rs +++ b/rust/pygpukit-python/src/transfer.rs @@ -3,9 +3,11 @@ use pyo3::prelude::*; use pygpukit_core::transfer::{ AsyncTransferEngine, TransferOp, TransferState, TransferStats, TransferType, StreamType, - PinnedMemoryManager, PinnedPoolConfig, PinnedBlock, PinnedStats, PinnedError, + PinnedMemoryManager, PinnedPoolConfig, PinnedBlock, PinnedStats, }; +use crate::errors::pinned_error_to_py; + /// Python wrapper for TransferType enum #[pyclass(name = "TransferType")] #[derive(Clone)] @@ -667,9 +669,7 @@ impl PyPinnedMemoryManager { /// If reused=False, caller must perform cudaHostAlloc /// and then call register(). fn allocate(&mut self, size: usize) -> PyResult<(u64, usize, bool)> { - self.inner.allocate(size).map_err(|e| { - pyo3::exceptions::PyMemoryError::new_err(e.to_string()) - }) + self.inner.allocate(size).map_err(pinned_error_to_py) } /// Register an allocated block @@ -684,16 +684,12 @@ impl PyPinnedMemoryManager { /// Returns (should_free, host_ptr) tuple. /// If should_free=True, caller should call cudaFreeHost. fn free(&mut self, id: u64) -> PyResult<(bool, u64)> { - self.inner.free(id).map_err(|e| { - pyo3::exceptions::PyValueError::new_err(e.to_string()) - }) + self.inner.free(id).map_err(pinned_error_to_py) } /// Associate a block with a task fn associate_task(&mut self, id: u64, task_id: String) -> PyResult<()> { - self.inner.associate_task(id, task_id).map_err(|e| { - pyo3::exceptions::PyValueError::new_err(e.to_string()) - }) + self.inner.associate_task(id, task_id).map_err(pinned_error_to_py) } /// Get a block by ID @@ -703,9 +699,7 @@ impl PyPinnedMemoryManager { /// Touch a block to update access time fn touch(&mut self, id: u64) -> PyResult<()> { - self.inner.touch(id).map_err(|e| { - pyo3::exceptions::PyValueError::new_err(e.to_string()) - }) + self.inner.touch(id).map_err(pinned_error_to_py) } /// Get blocks for a task diff --git a/tests/stress_test.py b/tests/stress_test.py new file mode 100644 index 0000000..411518f --- /dev/null +++ b/tests/stress_test.py @@ -0,0 +1,340 @@ +""" +PyGPUkit Stress Test Script for v0.2.1 +Tests Rust backend components under sustained load. +Default: 5 minutes runtime. +""" +import argparse +import random +import threading +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + +# Skip if Rust module not available +try: + import _pygpukit_rust as rust +except ImportError: + print("Rust module not available. Skipping stress test.") + exit(0) + + +class StressTestStats: + """Thread-safe statistics collector.""" + def __init__(self): + self.lock = threading.Lock() + self.operations = 0 + self.errors = 0 + self.memory_ops = 0 + self.scheduler_ops = 0 + self.admission_ops = 0 + self.qos_ops = 0 + self.partition_ops = 0 + + def inc(self, op_type: str): + with self.lock: + self.operations += 1 + if op_type == "memory": + self.memory_ops += 1 + elif op_type == "scheduler": + self.scheduler_ops += 1 + elif op_type == "admission": + self.admission_ops += 1 + elif op_type == "qos": + self.qos_ops += 1 + elif op_type == "partition": + self.partition_ops += 1 + + def inc_error(self): + with self.lock: + self.errors += 1 + + def get_stats(self): + with self.lock: + return { + "operations": self.operations, + "errors": self.errors, + "memory_ops": self.memory_ops, + "scheduler_ops": self.scheduler_ops, + "admission_ops": self.admission_ops, + "qos_ops": self.qos_ops, + "partition_ops": self.partition_ops, + } + + +def stress_memory_pool(stats: StressTestStats, duration_sec: float): + """Stress test memory pool with allocations and frees.""" + pool = rust.MemoryPool(quota=100 * 1024 * 1024, enable_eviction=True) + end_time = time.time() + duration_sec + blocks = [] + + while time.time() < end_time: + try: + # Random allocation size (1KB - 1MB) + size = random.randint(1024, 1024 * 1024) + block = pool.allocate(size) + blocks.append(block) + stats.inc("memory") + + # Randomly free some blocks to keep memory bounded + if len(blocks) > 50: + idx = random.randint(0, len(blocks) - 1) + b = blocks.pop(idx) + pool.free(b.id) + stats.inc("memory") + + except Exception: + stats.inc_error() + + # Cleanup + for b in blocks: + try: + pool.free(b.id) + except Exception: + pass + + +def stress_scheduler(stats: StressTestStats, duration_sec: float): + """Stress test scheduler with task submissions.""" + scheduler = rust.Scheduler(total_memory=1024 * 1024 * 1024) + end_time = time.time() + duration_sec + task_counter = 0 + + while time.time() < end_time: + try: + # Submit task + task_id = f"task-{task_counter}" + task = rust.TaskMeta( + id=task_id, + name=f"Stress Task {task_counter}", + memory_estimate=random.randint(1024, 10 * 1024 * 1024), + priority=random.randint(0, 10), + ) + scheduler.submit(task) + stats.inc("scheduler") + task_counter += 1 + + # Randomly complete some tasks + if task_counter % 10 == 0: + runnable = scheduler.get_runnable_tasks(5) + for tid in runnable: + scheduler.start_task(tid) + scheduler.complete_task(tid) + stats.inc("scheduler") + + except Exception: + stats.inc_error() + + +def stress_admission_controller(stats: StressTestStats, duration_sec: float): + """Stress test admission controller.""" + config = rust.AdmissionConfig( + max_memory=100 * 1024 * 1024, + max_bandwidth=1.0, + ) + controller = rust.AdmissionController(config) + end_time = time.time() + duration_sec + task_counter = 0 + admitted_tasks = [] + + while time.time() < end_time: + try: + task_id = f"admit-task-{task_counter}" + memory = random.randint(1024, 20 * 1024 * 1024) + bandwidth = random.uniform(0.01, 0.3) + + decision = controller.try_admit(task_id, memory, bandwidth) + stats.inc("admission") + task_counter += 1 + + if decision.is_admitted(): + admitted_tasks.append(task_id) + + # Release some tasks to free resources + if len(admitted_tasks) > 20: + release_id = admitted_tasks.pop(0) + controller.release(release_id) + stats.inc("admission") + + except Exception: + stats.inc_error() + + +def stress_qos_evaluator(stats: StressTestStats, duration_sec: float): + """Stress test QoS policy evaluator.""" + evaluator = rust.QosPolicyEvaluator( + total_memory=1024 * 1024 * 1024, + total_bandwidth=1.0, + ) + end_time = time.time() + duration_sec + task_counter = 0 + reservations = [] + + while time.time() < end_time: + try: + # Create task with random QoS class + qos_type = random.choice(["guaranteed", "burstable", "best_effort"]) + task_id = f"qos-task-{task_counter}" + + if qos_type == "guaranteed": + task = rust.QosTaskMeta.guaranteed( + task_id, f"Guaranteed {task_counter}", + random.randint(1024, 50 * 1024 * 1024) + ) + elif qos_type == "burstable": + task = rust.QosTaskMeta.burstable( + task_id, f"Burstable {task_counter}", + random.randint(1024, 30 * 1024 * 1024), + random.uniform(1.5, 3.0) + ) + else: + task = rust.QosTaskMeta.best_effort(task_id, f"BestEffort {task_counter}") + + result = evaluator.evaluate(task) + stats.inc("qos") + task_counter += 1 + + if result.is_admitted(): + evaluator.reserve(result) + reservations.append((task.qos_class, task.memory_request, 0.0)) + stats.inc("qos") + + # Release some reservations + if len(reservations) > 30: + qos_class, mem, bw = reservations.pop(0) + evaluator.release(qos_class, mem, bw) + stats.inc("qos") + + except Exception: + stats.inc_error() + + +def stress_partition_manager(stats: StressTestStats, duration_sec: float): + """Stress test partition manager.""" + config = rust.PartitionConfig(total_memory=8 * 1024 * 1024 * 1024) + manager = rust.PartitionManager(config) + end_time = time.time() + duration_sec + partition_counter = 0 + task_counter = 0 + partitions = [] + + while time.time() < end_time: + try: + action = random.choice(["create", "assign", "stats", "delete"]) + + if action == "create" and len(partitions) < 10: + pid = f"partition-{partition_counter}" + limits = rust.PartitionLimits().memory( + random.randint(100 * 1024 * 1024, 500 * 1024 * 1024) + ).compute(random.uniform(0.05, 0.3)) + manager.create_partition(pid, f"Partition {partition_counter}", limits) + partitions.append(pid) + partition_counter += 1 + stats.inc("partition") + + elif action == "assign" and partitions: + pid = random.choice(partitions) + task_id = f"p-task-{task_counter}" + try: + manager.assign_task(task_id, pid) + task_counter += 1 + stats.inc("partition") + except Exception: + pass # Partition might not exist + + elif action == "stats": + manager.stats() + stats.inc("partition") + + elif action == "delete" and len(partitions) > 3: + pid = partitions.pop(random.randint(0, len(partitions) - 1)) + try: + manager.delete_partition(pid) + stats.inc("partition") + except Exception: + pass + + except Exception: + stats.inc_error() + + +def run_stress_test(duration_minutes: float = 5.0, workers: int = 4): + """Run all stress tests concurrently.""" + duration_sec = duration_minutes * 60 + stats = StressTestStats() + + print("Starting PyGPUkit Stress Test") + print(f"Duration: {duration_minutes} minutes") + print(f"Workers per component: {workers}") + print("-" * 50) + + start_time = time.time() + + test_functions = [ + stress_memory_pool, + stress_scheduler, + stress_admission_controller, + stress_qos_evaluator, + stress_partition_manager, + ] + + # Start all stress tests in parallel + with ThreadPoolExecutor(max_workers=len(test_functions) * workers) as executor: + futures = [] + for test_func in test_functions: + for _ in range(workers): + futures.append(executor.submit(test_func, stats, duration_sec)) + + # Progress reporting + last_report = start_time + while not all(f.done() for f in futures): + time.sleep(1) + now = time.time() + if now - last_report >= 10: # Report every 10 seconds + elapsed = now - start_time + current_stats = stats.get_stats() + ops_per_sec = current_stats["operations"] / elapsed if elapsed > 0 else 0 + print(f"[{elapsed:.0f}s] Ops: {current_stats['operations']:,} " + f"({ops_per_sec:.0f}/s) | Errors: {current_stats['errors']}") + last_report = now + + # Wait for all to complete + for f in as_completed(futures): + try: + f.result() + except Exception as e: + print(f"Worker error: {e}") + + elapsed = time.time() - start_time + final_stats = stats.get_stats() + + print("-" * 50) + print("Stress Test Complete") + print(f"Duration: {elapsed:.1f}s") + print(f"Total Operations: {final_stats['operations']:,}") + print(f"Operations/sec: {final_stats['operations'] / elapsed:.0f}") + print(f"Errors: {final_stats['errors']}") + print("-" * 50) + print("Breakdown:") + print(f" Memory Pool: {final_stats['memory_ops']:,}") + print(f" Scheduler: {final_stats['scheduler_ops']:,}") + print(f" Admission: {final_stats['admission_ops']:,}") + print(f" QoS: {final_stats['qos_ops']:,}") + print(f" Partitioning: {final_stats['partition_ops']:,}") + print("-" * 50) + + if final_stats['errors'] > 0: + print(f"WARNING: {final_stats['errors']} errors occurred during test") + return 1 + else: + print("SUCCESS: No errors detected") + return 0 + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="PyGPUkit Stress Test") + parser.add_argument("--duration", type=float, default=5.0, + help="Test duration in minutes (default: 5)") + parser.add_argument("--workers", type=int, default=4, + help="Workers per component (default: 4)") + args = parser.parse_args() + + exit(run_stress_test(args.duration, args.workers)) diff --git a/tests/test_rust_admission_qos.py b/tests/test_rust_admission_qos.py new file mode 100644 index 0000000..89a257a --- /dev/null +++ b/tests/test_rust_admission_qos.py @@ -0,0 +1,298 @@ +""" +TDD Tests for v0.2.1 - Admission Control & QoS Policy Spec +Tests written FIRST before implementation fixes. +""" +import pytest + +# Skip all tests if Rust module not available +pytest.importorskip("_pygpukit_rust") +import _pygpukit_rust as rust + + +class TestAdmissionControlSpec: + """Admission Control specification tests.""" + + def test_admission_config_defaults(self): + """AdmissionConfig should have sensible defaults.""" + config = rust.AdmissionConfig( + max_memory=1024 * 1024 * 1024, # 1GB + max_bandwidth=1.0, + ) + assert config.max_memory == 1024 * 1024 * 1024 + assert config.max_bandwidth == 1.0 + + def test_admission_controller_creation(self): + """AdmissionController should be creatable with config.""" + config = rust.AdmissionConfig( + max_memory=1024 * 1024 * 1024, + max_bandwidth=1.0, + ) + controller = rust.AdmissionController(config) + assert controller is not None + + def test_admission_decision_types(self): + """AdmissionDecision should have Admitted, Queued, Rejected states.""" + # Test that we can create decisions + admitted = rust.AdmissionDecision.Admitted() + queued = rust.AdmissionDecision.Queued() + + assert admitted.is_admitted() + assert not admitted.is_queued() + assert not admitted.is_rejected() + + assert queued.is_queued() + assert not queued.is_admitted() + + def test_admission_with_memory_request(self): + """Admission should consider memory requests.""" + config = rust.AdmissionConfig( + max_memory=100 * 1024 * 1024, # 100MB + max_bandwidth=1.0, + enable_best_effort=False, # Strict mode - reject over-quota requests + ) + controller = rust.AdmissionController(config) + + # Small request should be admitted + decision1 = controller.try_admit("task1", 10 * 1024 * 1024, 0.1) + assert decision1.is_admitted() + + # Request exceeding quota should be rejected + decision2 = controller.try_admit("task2", 200 * 1024 * 1024, 0.1) + assert decision2.is_rejected() + + def test_admission_release(self): + """Released resources should be available again.""" + config = rust.AdmissionConfig( + max_memory=100 * 1024 * 1024, + max_bandwidth=1.0, + ) + controller = rust.AdmissionController(config) + + # Admit first task + decision1 = controller.try_admit("task1", 80 * 1024 * 1024, 0.5) + assert decision1.is_admitted() + + # Second task should be queued/rejected (not enough memory) + decision2 = controller.try_admit("task2", 50 * 1024 * 1024, 0.3) + assert not decision2.is_admitted() + + # Release first task + controller.release("task1") + + # Now second task should be admittable + decision3 = controller.try_admit("task2", 50 * 1024 * 1024, 0.3) + assert decision3.is_admitted() + + def test_admission_stats(self): + """AdmissionController should provide stats.""" + config = rust.AdmissionConfig( + max_memory=100 * 1024 * 1024, + max_bandwidth=1.0, + ) + controller = rust.AdmissionController(config) + + stats = controller.stats() + assert hasattr(stats, 'used_memory') + assert hasattr(stats, 'used_bandwidth') + assert hasattr(stats, 'admitted_count') + assert hasattr(stats, 'rejected_count') + + +class TestQoSPolicySpec: + """QoS Policy specification tests.""" + + def test_qos_class_enum(self): + """QoS classes should be Guaranteed, Burstable, BestEffort.""" + assert hasattr(rust, 'QosClass') + + # Should be able to get class values + guaranteed = rust.QosClass.Guaranteed + burstable = rust.QosClass.Burstable + best_effort = rust.QosClass.BestEffort + + # They should be distinct + assert guaranteed != burstable + assert burstable != best_effort + + def test_qos_task_meta_guaranteed(self): + """Guaranteed tasks should have strict resource requirements.""" + task = rust.QosTaskMeta.guaranteed("task1", "Test Task", 256 * 1024 * 1024) + + assert task.id == "task1" + assert task.name == "Test Task" + assert task.qos_class == rust.QosClass.Guaranteed + assert task.memory_request == 256 * 1024 * 1024 + + def test_qos_task_meta_burstable(self): + """Burstable tasks should have base + burst ratio.""" + task = rust.QosTaskMeta.burstable("task2", "Burstable Task", 128 * 1024 * 1024, 2.0) + + assert task.id == "task2" + assert task.qos_class == rust.QosClass.Burstable + assert task.memory_request == 128 * 1024 * 1024 + assert task.burst_ratio == 2.0 + + def test_qos_task_meta_best_effort(self): + """BestEffort tasks should have minimal requirements.""" + task = rust.QosTaskMeta.best_effort("task3", "Background Task") + + assert task.id == "task3" + assert task.qos_class == rust.QosClass.BestEffort + + def test_qos_evaluator_creation(self): + """QosPolicyEvaluator should be creatable with resource limits.""" + evaluator = rust.QosPolicyEvaluator( + total_memory=8 * 1024 * 1024 * 1024, # 8GB + total_bandwidth=1.0, + ) + assert evaluator is not None + + def test_qos_evaluation_guaranteed_priority(self): + """Guaranteed tasks should have highest priority.""" + evaluator = rust.QosPolicyEvaluator( + total_memory=8 * 1024 * 1024 * 1024, + total_bandwidth=1.0, + ) + + guaranteed = rust.QosTaskMeta.guaranteed("g1", "Guaranteed", 1024 * 1024 * 1024) + burstable = rust.QosTaskMeta.burstable("b1", "Burstable", 512 * 1024 * 1024, 1.5) + best_effort = rust.QosTaskMeta.best_effort("be1", "BestEffort") + + _eval_g = evaluator.evaluate(guaranteed) + _eval_b = evaluator.evaluate(burstable) + _eval_be = evaluator.evaluate(best_effort) + + # Guaranteed should have highest effective priority + assert guaranteed.effective_priority() > burstable.effective_priority() + assert burstable.effective_priority() > best_effort.effective_priority() + + def test_qos_resource_reservation(self): + """QoS should track reserved resources correctly.""" + evaluator = rust.QosPolicyEvaluator( + total_memory=1024 * 1024 * 1024, # 1GB + total_bandwidth=1.0, + ) + + task = rust.QosTaskMeta.guaranteed("task1", "Test", 512 * 1024 * 1024) + eval_result = evaluator.evaluate(task) + + assert eval_result.is_admitted() + + # Reserve the resources + evaluator.reserve(eval_result) + + # Check stats + stats = evaluator.stats() + assert stats.guaranteed_memory == 512 * 1024 * 1024 + + def test_qos_throttling(self): + """Burstable tasks should be throttled when exceeding base allocation.""" + evaluator = rust.QosPolicyEvaluator( + total_memory=1024 * 1024 * 1024, + total_bandwidth=1.0, + ) + + # Fill up guaranteed capacity + for i in range(4): + task = rust.QosTaskMeta.guaranteed(f"g{i}", f"Guaranteed {i}", 200 * 1024 * 1024) + eval_result = evaluator.evaluate(task) + if eval_result.is_admitted(): + evaluator.reserve(eval_result) + + # New burstable task should potentially be throttled + burstable = rust.QosTaskMeta.burstable("b1", "Burstable", 100 * 1024 * 1024, 2.0) + eval_b = evaluator.evaluate(burstable) + + # Should either be admitted with throttling or queued + assert eval_b.is_admitted() or eval_b.is_throttled() or eval_b.is_queued() + + +class TestQoSPolicyIntegration: + """Integration tests for QoS with other components.""" + + def test_qos_with_partitioning(self): + """QoS should work with GPU partitioning.""" + # Create partition manager + pm = rust.PartitionManager(rust.PartitionConfig(total_memory=8 * 1024 * 1024 * 1024)) + + # Create inference partition + pm.create_partition( + "inference", "Inference Partition", + rust.PartitionLimits().memory(4 * 1024 * 1024 * 1024).compute(0.5) + ) + + # Create QoS evaluator for the partition + evaluator = rust.QosPolicyEvaluator( + total_memory=4 * 1024 * 1024 * 1024, # Partition's quota + total_bandwidth=0.5, # Partition's bandwidth share + ) + + # Submit task to partition + task = rust.QosTaskMeta.guaranteed("inf1", "Inference Task", 1024 * 1024 * 1024) + eval_result = evaluator.evaluate(task) + + assert eval_result.is_admitted() + + def test_qos_with_admission_control(self): + """QoS decisions should align with admission control.""" + # Admission control with strict limits + admission_config = rust.AdmissionConfig( + max_memory=1024 * 1024 * 1024, + max_bandwidth=1.0, + ) + admission = rust.AdmissionController(admission_config) + + # QoS evaluator with same limits + qos = rust.QosPolicyEvaluator( + total_memory=1024 * 1024 * 1024, + total_bandwidth=1.0, + ) + + # Create task + task = rust.QosTaskMeta.guaranteed("task1", "Test", 512 * 1024 * 1024) + + # Both should agree on admission + qos_result = qos.evaluate(task) + admission_result = admission.try_admit("task1", 512 * 1024 * 1024, 0.5) + + assert qos_result.is_admitted() == admission_result.is_admitted() + + +class TestRejectReason: + """Tests for rejection reason reporting.""" + + def test_reject_reason_memory(self): + """Should report memory as rejection reason.""" + config = rust.AdmissionConfig( + max_memory=100 * 1024 * 1024, + max_bandwidth=1.0, + enable_best_effort=False, # Strict mode for rejection + ) + controller = rust.AdmissionController(config) + + # Request more than available + decision = controller.try_admit("task1", 200 * 1024 * 1024, 0.1) + + assert decision.is_rejected() + assert decision.reject_reason() == rust.RejectReason.InsufficientMemory + + def test_reject_reason_bandwidth(self): + """Should report bandwidth as rejection reason. + + Note: Bandwidth is calculated from memory request (memory/total_memory). + To trigger bandwidth rejection, request memory that results in + bandwidth_estimate > max_bandwidth. + """ + config = rust.AdmissionConfig( + max_memory=1024 * 1024 * 1024, # 1GB + max_bandwidth=0.5, # 50% bandwidth limit + enable_best_effort=False, # Strict mode for rejection + ) + controller = rust.AdmissionController(config) + + # Request 600MB which results in bandwidth_estimate = 600/1024 = 0.586 > 0.5 + # This should trigger bandwidth rejection (memory is available but bandwidth exceeded) + decision = controller.try_admit("task1", 600 * 1024 * 1024, 0.0) + + assert decision.is_rejected() + assert decision.reject_reason() == rust.RejectReason.InsufficientBandwidth