diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index a271628..61ad9d6 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -189,6 +189,50 @@ jobs:
           name: wheel-windows-py312
           path: dist/*.whl
 
+  # Test Driver-Only Mode on Windows (GPU required)
+  test-driver-only-windows:
+    runs-on: [self-hosted, Windows, X64, cuda]
+    needs: [build-windows]
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python 3.12
+        shell: pwsh
+        run: |
+          pyenv install 3.12 --skip-existing
+          pyenv local 3.12
+          python --version
+
+      - name: Clean previous builds
+        shell: pwsh
+        run: |
+          if (Test-Path dist) { Remove-Item -Recurse -Force dist }
+          if (Test-Path build) { Remove-Item -Recurse -Force build }
+
+      - name: Install build dependencies
+        shell: pwsh
+        run: |
+          python -m pip install --upgrade pip
+          pip install build scikit-build-core pybind11 ninja cmake pytest numpy
+
+      - name: Build Driver-Only Mode
+        shell: pwsh
+        run: |
+          python -m build --wheel -C cmake.define.PYGPUKIT_DRIVER_ONLY=ON
+        env:
+          # CUDAARCHS is the env hook CMake (>= 3.20) reads to seed CMAKE_CUDA_ARCHITECTURES
+          CUDAARCHS: "86"
+
+      - name: Install and Test Driver-Only wheel
+        shell: pwsh
+        run: |
+          # pwsh does not glob args to native commands; resolve the wheel path explicitly
+          $wheel = (Get-ChildItem dist\*.whl | Select-Object -First 1).FullName
+          pip install $wheel --force-reinstall
+          python -c "import pygpukit; print('CUDA available:', pygpukit.is_cuda_available())"
+          pytest tests/ -v --tb=short
+
   # Publish to TestPyPI first
   publish-testpypi:
     runs-on: ubuntu-latest
diff --git a/examples/benchmark_tiled_matmul.py b/examples/benchmark_tiled_matmul.py
new file mode 100644
index 0000000..7cec00b
--- /dev/null
+++ b/examples/benchmark_tiled_matmul.py
@@ -0,0 +1,92 @@
+"""Benchmark: Tiled vs Naive Matmul Performance"""
+
+import os
+import sys
+import time
+
+# Add CUDA DLLs to PATH
+cuda_path = os.environ.get("CUDA_PATH", r"C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v12.4")
+cuda_bin = os.path.join(cuda_path, "bin")
+if cuda_bin not in os.environ["PATH"]:
+    os.environ["PATH"] = cuda_bin + os.pathsep + os.environ["PATH"]
+if hasattr(os, "add_dll_directory"):
+    os.add_dll_directory(cuda_bin)
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src", "pygpukit"))
+
+import numpy as np  # noqa: E402
+
+print("=" * 70)
+print("Tiled Matmul Benchmark - PyGPUkit v0.2")
+print("=" * 70)
+
+try:
+    import _pygpukit_native as native  # noqa: E402
+
+    print(f"\nCUDA available: {native.is_cuda_available()}")
+
+    if native.is_cuda_available():
+        props = native.get_device_properties(0)
+        print(f"GPU: {props.name}")
+        print(f"Memory: {props.total_memory / 1024**3:.1f} GB")
+
+        print("\n" + "-" * 70)
+        print("Matrix Size | Kernel    | Time (ms) | GFLOPS  | Speedup")
+        print("-" * 70)
+
+        sizes = [512, 1024, 2048, 3072, 4096]
+
+        for size in sizes:
+            M, N, K = size, size, size
+
+            # Create test matrices
+            A_np = np.random.randn(M, K).astype(np.float32)
+            B_np = np.random.randn(K, N).astype(np.float32)
+
+            # Warmup
+            A_gpu = native.from_numpy(A_np)
+            B_gpu = native.from_numpy(B_np)
+            _ = native.matmul(A_gpu, B_gpu)
+
+            # Benchmark GPU
+            iterations = 5 if size >= 2048 else 10
+            times = []
+            for _ in range(iterations):
+                A_gpu = native.from_numpy(A_np)
+                B_gpu = native.from_numpy(B_np)
+                start = time.perf_counter()
+                C_gpu = native.matmul(A_gpu, B_gpu)
+                gpu_time = time.perf_counter() - start
+                times.append(gpu_time)
+
+            # Median is robust to one-off scheduling jitter
+            avg_time = np.median(times)
+            gflops = 2 * M * N * K / avg_time / 1e9
+
+            # Check which kernel is used (threshold is 2048)
+            kernel = "Tiled" if size >= 2048 else "L2-opt"
+
+            # CPU reference
+            start = 
time.perf_counter() + C_cpu = np.matmul(A_np, B_np) + cpu_time = time.perf_counter() - start + + speedup = cpu_time / avg_time + + # Verify correctness + C_result = C_gpu.to_numpy() + rel_error = np.max(np.abs(C_result - C_cpu)) / np.max(np.abs(C_cpu)) + + print(f"{size:>5}x{size:<5} | {kernel:<9} | {avg_time*1000:>8.2f} | {gflops:>7.1f} | {speedup:>5.1f}x") + + if rel_error > 1e-3: + print(f" WARNING: High relative error: {rel_error:.2e}") + + print("-" * 70) + print("\nTiled kernel should show improved performance for sizes >= 2048") + print("=" * 70) + +except ImportError as e: + print(f"Error: {e}") + print("Native module not available") diff --git a/examples/demo_v02.py b/examples/demo_v02.py new file mode 100644 index 0000000..896e097 --- /dev/null +++ b/examples/demo_v02.py @@ -0,0 +1,438 @@ +""" +PyGPUkit v0.2 Full Feature Demo & Benchmark + +This demo showcases all v0.2 features: +1. Tiled Matmul Kernel (C++/CUDA) - Shared memory + double buffering +2. Rust Memory Pool - LRU eviction, size classes +3. Rust Scheduler - Task management, bandwidth pacing +4. Rust Async Transfer Engine - Separate H2D/D2H streams +5. Rust Kernel Dispatch Controller - Per-stream launch management +""" + +import os +import sys +import time + +# Add CUDA DLLs to PATH +cuda_path = os.environ.get("CUDA_PATH", r"C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v12.4") +cuda_bin = os.path.join(cuda_path, "bin") +if cuda_bin not in os.environ["PATH"]: + os.environ["PATH"] = cuda_bin + os.pathsep + os.environ["PATH"] +if hasattr(os, "add_dll_directory"): + os.add_dll_directory(cuda_bin) + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +import numpy as np + +# ============================================================================= +# Header +# ============================================================================= + +def print_header(title: str): + print("\n" + "=" * 70) + print(f" {title}") + print("=" * 70) + +def print_section(title: str): + print(f"\n--- {title} ---") + +# ============================================================================= +# Main Demo +# ============================================================================= + +def main(): + print_header("PyGPUkit v0.2 Full Feature Demo") + + # Import modules + try: + import pygpukit + native = pygpukit._pygpukit_native + import _pygpukit_rust as rust + print(f"PyGPUkit version: {pygpukit.__version__}") + print("Native module loaded: OK") + print("Rust module loaded: OK") + except ImportError as e: + print(f"Import error: {e}") + return 1 + + # Environment info + print_section("Environment") + print(f"CUDA available: {native.is_cuda_available()}") + + if not native.is_cuda_available(): + print("GPU not available, exiting") + return 1 + + props = native.get_device_properties(0) + print(f"GPU: {props.name}") + print(f"Memory: {props.total_memory / 1024**3:.1f} GB") + print(f"Compute Capability: {props.compute_capability_major}.{props.compute_capability_minor}") + print(f"SMs: {props.multiprocessor_count}") + + # ========================================================================= + # 1. Rust Memory Pool Demo + # ========================================================================= + print_header("1. 
Rust Memory Pool") + + pool = rust.MemoryPool( + quota=100 * 1024 * 1024, # 100 MB quota + enable_eviction=True + ) + print(f"Created pool with 100 MB quota, eviction enabled") + + # Allocate blocks + block_ids = [] + for i, size in enumerate([1024, 4096, 16384, 65536]): + block_id = pool.allocate(size) + block_ids.append(block_id) + block = pool.get_block(block_id) + print(f" Allocated block {block.id}: {block.size} bytes") + + stats = pool.stats() + print(f"\nPool stats:") + print(f" Active: {stats.active_blocks} blocks, {stats.used} bytes") + print(f" Allocations: {stats.allocation_count}") + print(f" Quota usage: {stats.used / stats.quota:.1%}") + + # Free and reuse + pool.free(block_ids[0]) + print(f"\nFreed block {block_ids[0]}") + + new_block_id = pool.allocate(1024) + print(f"Allocated new 1024-byte block: {new_block_id} (should reuse free list)") + + stats = pool.stats() + print(f"Reuse count: {stats.reuse_count}") + + # ========================================================================= + # 2. Rust Scheduler Demo + # ========================================================================= + print_header("2. Rust Scheduler") + + scheduler = rust.Scheduler( + sched_tick_ms=10.0, + window_ms=100.0, + total_memory=1024 * 1024 * 1024 # 1 GB + ) + print("Created scheduler (10ms tick, 100ms window, 1GB memory)") + + # Submit tasks + task_ids = [] + for i in range(5): + task = rust.TaskMeta( + id=f"task_{i}", + name=f"Layer {i}", + memory_estimate=100 * 1024 * 1024, # 100 MB + priority=i % 3 + ) + task_id = scheduler.submit(task) + task_ids.append(task_id) + print(f" Submitted: {task_id} (priority: {i % 3})") + + sched_stats = scheduler.stats() + print(f"\nPending tasks: {sched_stats.pending_count}") + + # Run tasks (returns list of task_ids) + runnable_ids = scheduler.get_runnable_tasks(max_tasks=3) + print(f"Runnable tasks: {len(runnable_ids)}") + for task_id in runnable_ids: + task = scheduler.get_task(task_id) + print(f" {task.id}: state={task.state}") + + # Start and complete a task + if runnable_ids: + scheduler.start_task(runnable_ids[0]) + scheduler.complete_task(runnable_ids[0]) + print(f"\nCompleted: {runnable_ids[0]}") + + sched_stats = scheduler.stats() + print(f"\nScheduler stats:") + print(f" Total submitted: {sched_stats.total_submitted}") + print(f" Completed: {sched_stats.completed_count}") + print(f" Pending: {sched_stats.pending_count}") + print(f" Reserved memory: {sched_stats.reserved_memory / 1024 / 1024:.0f} MB") + + # ========================================================================= + # 3. Rust Async Transfer Engine Demo + # ========================================================================= + print_header("3. 
Rust Async Transfer Engine") + + transfer_engine = rust.AsyncTransferEngine(max_concurrent=4) + print("Created transfer engine (max 4 concurrent)") + + # Queue transfers (using enqueue_with_priority, type is string: "h2d", "d2h", "d2d") + transfer_ids = [] + for i in range(6): + type_name = "h2d" if i % 2 == 0 else "d2h" + op_id = transfer_engine.enqueue_with_priority( + transfer_type=type_name, + src_ptr=0x1000 + i * 0x1000, + dst_ptr=0x2000 + i * 0x1000, + size=1024 * 1024, # 1 MB + priority=i % 3 + ) + transfer_ids.append(op_id) + print(f" Queued transfer {op_id}: {type_name.upper()}, priority={i % 3}") + + # Get ready transfers + ready = transfer_engine.get_ready_transfers(max_transfers=4) + print(f"\nReady transfers: {len(ready)}") + for op in ready: + print(f" {op.id}: {op.size} bytes") + + # Simulate completion + for op in ready[:2]: + transfer_engine.start_transfer(op.id) + transfer_engine.complete_transfer(op.id) + print(f" Completed transfer {op.id}") + + transfer_stats = transfer_engine.stats() + print(f"\nTransfer stats:") + print(f" Total queued: {transfer_stats.total_queued}") + print(f" Completed: {transfer_stats.completed_count}") + print(f" Pending: {transfer_stats.pending_count}") + + # ========================================================================= + # 4. Rust Kernel Dispatch Controller Demo + # ========================================================================= + print_header("4. Rust Kernel Dispatch Controller") + + dispatcher = rust.KernelDispatcher(max_in_flight=4) + print("Created dispatcher (max 4 in-flight per stream)") + + # Queue kernel launches + for i in range(8): + config = rust.LaunchConfig( + grid=(128, 1, 1), + block=(256, 1, 1), + shared_mem=0, + stream_id=i % 2 # Alternate between stream 0 and 1 + ) + req_id = dispatcher.queue( + kernel_handle=0xDEADBEEF + i, + config=config, + priority=i % 3 + ) + print(f" Queued kernel {req_id}: stream={i % 2}, priority={i % 3}") + + # Get ready kernels + ready_kernels = dispatcher.get_ready(max_requests=6) + print(f"\nReady kernels: {len(ready_kernels)}") + for req in ready_kernels: + print(f" {req.id}: kernel=0x{req.kernel_handle:X}, stream={req.config.stream_id}") + + # Simulate launch and completion + for req in ready_kernels[:4]: + dispatcher.mark_launched(req.id) + dispatcher.mark_completed(req.id) + + dispatch_stats = dispatcher.stats() + print(f"\nDispatch stats:") + print(f" Total queued: {dispatch_stats.total_queued}") + print(f" Completed: {dispatch_stats.completed_count}") + print(f" Pending: {dispatch_stats.pending_count}") + print(f" In-flight: {dispatch_stats.in_flight_count}") + + # ========================================================================= + # 5. Tiled Matmul Benchmark + # ========================================================================= + print_header("5. 
Tiled Matmul Benchmark") + + print("\nMatrix Size | Kernel | Time (ms) | GFLOPS | vs NumPy") + print("-" * 60) + + sizes = [512, 1024, 2048, 4096] + results = [] + + for size in sizes: + M, N, K = size, size, size + + # Create test matrices + A_np = np.random.randn(M, K).astype(np.float32) + B_np = np.random.randn(K, N).astype(np.float32) + + # Warmup + A_gpu = native.from_numpy(A_np) + B_gpu = native.from_numpy(B_np) + _ = native.matmul(A_gpu, B_gpu) + + # Benchmark GPU + iterations = 5 if size >= 2048 else 10 + times = [] + for _ in range(iterations): + A_gpu = native.from_numpy(A_np) + B_gpu = native.from_numpy(B_np) + start = time.perf_counter() + C_gpu = native.matmul(A_gpu, B_gpu) + gpu_time = time.perf_counter() - start + times.append(gpu_time) + + avg_time = np.median(times) + gflops = 2 * M * N * K / avg_time / 1e9 + + # Kernel type (threshold is 2048) + kernel = "Tiled" if size >= 2048 else "L2-opt" + + # NumPy reference + start = time.perf_counter() + C_cpu = np.matmul(A_np, B_np) + cpu_time = time.perf_counter() - start + + speedup = cpu_time / avg_time + + # Verify + C_result = C_gpu.to_numpy() + rel_error = np.max(np.abs(C_result - C_cpu)) / np.max(np.abs(C_cpu)) + + results.append({ + 'size': size, + 'kernel': kernel, + 'time_ms': avg_time * 1000, + 'gflops': gflops, + 'speedup': speedup, + 'error': rel_error + }) + + status = "OK" if rel_error < 1e-3 else f"ERR:{rel_error:.1e}" + print(f"{size:>5}x{size:<5} | {kernel:<9} | {avg_time*1000:>8.2f} | {gflops:>7.1f} | {speedup:>5.1f}x ({status})") + + print("-" * 60) + + # Peak performance + peak = max(results, key=lambda x: x['gflops']) + print(f"\nPeak: {peak['gflops']:.1f} GFLOPS at {peak['size']}x{peak['size']} ({peak['kernel']})") + + # ========================================================================= + # 6. Integrated Demo - Full Pipeline + # ========================================================================= + print_header("6. Integrated Demo - Full Pipeline") + + print("\nSimulating a deep learning inference pipeline:") + print(" 1. Scheduler manages task queue") + print(" 2. Memory pool allocates GPU buffers") + print(" 3. Transfer engine handles H2D/D2H") + print(" 4. Dispatcher launches kernels") + print(" 5. 
Tiled matmul for compute") + + # Reset components + pool.clear() + scheduler.clear() + + # Simulate batch processing + batch_size = 32 + hidden_dim = 1024 + num_layers = 4 + + print(f"\nBatch={batch_size}, Hidden={hidden_dim}, Layers={num_layers}") + + total_time = 0 + total_flops = 0 + + for layer in range(num_layers): + # Submit scheduler task + task = rust.TaskMeta( + id=f"layer_{layer}", + name=f"Layer {layer}", + memory_estimate=batch_size * hidden_dim * 4 * 2, # input + output + priority=0 + ) + task_id = scheduler.submit(task) + + # Allocate memory + input_block_id = pool.allocate(batch_size * hidden_dim * 4) + weight_block_id = pool.allocate(hidden_dim * hidden_dim * 4) + output_block_id = pool.allocate(batch_size * hidden_dim * 4) + + # Queue transfer + h2d_id = transfer_engine.enqueue_h2d( + host_ptr=0x1000, + device_ptr=input_block_id, + size=batch_size * hidden_dim * 4 + ) + + # Queue kernel + config = rust.LaunchConfig.linear(batch_size * hidden_dim, 256) + kernel_id = dispatcher.queue( + kernel_handle=0xFFFF0000 + layer, + config=config, + task_id=task_id, + priority=0 + ) + + # Actual compute + A = np.random.randn(batch_size, hidden_dim).astype(np.float32) + W = np.random.randn(hidden_dim, hidden_dim).astype(np.float32) + + A_gpu = native.from_numpy(A) + W_gpu = native.from_numpy(W) + + start = time.perf_counter() + out_gpu = native.matmul(A_gpu, W_gpu) + layer_time = time.perf_counter() - start + + total_time += layer_time + total_flops += 2 * batch_size * hidden_dim * hidden_dim + + # Complete task + scheduler.complete_task(task_id) + + # Free memory (except output for next layer) + pool.free(input_block_id) + pool.free(weight_block_id) + + throughput = total_flops / total_time / 1e9 + + print(f"\nPipeline completed:") + print(f" Total time: {total_time*1000:.2f} ms") + print(f" Throughput: {throughput:.1f} GFLOPS") + print(f" Tasks completed: {scheduler.stats().completed_count}") + print(f" Memory allocations: {pool.stats().allocation_count}") + print(f" Transfers queued: {transfer_engine.stats().total_queued}") + print(f" Kernels dispatched: {dispatcher.stats().total_queued}") + + # ========================================================================= + # Summary + # ========================================================================= + print_header("Summary - PyGPUkit v0.2 Features") + + print(""" + 1. Tiled Matmul Kernel + - Shared memory tiling (64x64 tiles) + - Double-buffered prefetch + - Up to 6+ TFLOPS on RTX 3090 Ti + + 2. Rust Memory Pool + - LRU eviction + - Size-class free lists + - Thread-safe (parking_lot::RwLock) + + 3. Rust Scheduler + - Priority-based task queue + - Memory reservation + - Bandwidth pacing + + 4. Rust Async Transfer Engine + - Separate H2D/D2H streams + - Priority ordering + - Concurrent transfer limits + + 5. Rust Kernel Dispatch Controller + - Per-stream in-flight limits + - Scheduler task integration + - Launch lifecycle tracking + + 6. 
Driver-Only Mode (Infrastructure) + - CUDA Driver API wrappers + - Context management + - No cudart dependency (when enabled) + """) + + print("=" * 70) + print(" Demo Complete!") + print("=" * 70) + + return 0 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/native/CMakeLists.txt b/native/CMakeLists.txt index 2ef9f8b..452dbaa 100644 --- a/native/CMakeLists.txt +++ b/native/CMakeLists.txt @@ -6,9 +6,20 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CUDA_STANDARD 17) set(CMAKE_CUDA_STANDARD_REQUIRED ON) +# Driver-Only Mode option +# When enabled, removes cudart dependency and uses CUDA Driver API only +option(PYGPUKIT_DRIVER_ONLY "Build in driver-only mode (no cudart dependency)" OFF) + # Find CUDA find_package(CUDAToolkit REQUIRED) +if(PYGPUKIT_DRIVER_ONLY) + message(STATUS "Building in DRIVER-ONLY mode (no cudart dependency)") + add_compile_definitions(PYGPUKIT_DRIVER_ONLY=1) +else() + message(STATUS "Building with CUDA Runtime API (cudart)") +endif() + # Find Python and pybind11 find_package(Python3 REQUIRED COMPONENTS Interpreter Development.Module) find_package(pybind11 CONFIG REQUIRED) @@ -50,11 +61,20 @@ pybind11_add_module(_pygpukit_native bindings/ops_bindings.cpp ) -target_link_libraries(_pygpukit_native PRIVATE - CUDA::cudart - CUDA::cuda_driver - CUDA::nvrtc -) +if(PYGPUKIT_DRIVER_ONLY) + # Driver-only: Link only cuda_driver and nvrtc (no cudart) + target_link_libraries(_pygpukit_native PRIVATE + CUDA::cuda_driver + CUDA::nvrtc + ) +else() + # Standard: Link cudart, cuda_driver, and nvrtc + target_link_libraries(_pygpukit_native PRIVATE + CUDA::cudart + CUDA::cuda_driver + CUDA::nvrtc + ) +endif() set_target_properties(_pygpukit_native PROPERTIES CUDA_SEPARABLE_COMPILATION ON diff --git a/native/core/device.cpp b/native/core/device.cpp index 861ccac..e6c7b97 100644 --- a/native/core/device.cpp +++ b/native/core/device.cpp @@ -1,5 +1,146 @@ #include "device.hpp" #include "types.hpp" + +#ifdef PYGPUKIT_DRIVER_ONLY +// Driver-only mode: Use CUDA Driver API +#include "driver_context.hpp" +#include + +namespace pygpukit { + +namespace { + +void check_driver_error(CUresult result, const char* msg) { + if (result != CUDA_SUCCESS) { + const char* error_str = nullptr; + cuGetErrorString(result, &error_str); + throw CudaError(std::string(msg) + ": " + (error_str ? 
error_str : "unknown error")); + } +} + +} // anonymous namespace + +bool is_cuda_available() { + return driver::DriverContext::instance().is_available(); +} + +int get_driver_version() { + int version = 0; + check_driver_error(cuDriverGetVersion(&version), "Failed to get driver version"); + return version; +} + +int get_runtime_version() { + // No runtime in driver-only mode + return 0; +} + +int get_device_count() { + return driver::DriverContext::instance().device_count(); +} + +DeviceProperties get_device_properties(int device_id) { + auto& ctx = driver::DriverContext::instance(); + CUdevice device = ctx.get_device(device_id); + + DeviceProperties result; + + // Get device name + char name[256]; + check_driver_error(cuDeviceGetName(name, sizeof(name), device), "Failed to get device name"); + result.name = name; + + // Get total memory + size_t total_mem; + check_driver_error(cuDeviceTotalMem(&total_mem, device), "Failed to get device memory"); + result.total_memory = total_mem; + + // Get compute capability + int major, minor; + check_driver_error( + cuDeviceGetAttribute(&major, CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MAJOR, device), + "Failed to get compute capability major" + ); + check_driver_error( + cuDeviceGetAttribute(&minor, CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MINOR, device), + "Failed to get compute capability minor" + ); + result.compute_capability_major = major; + result.compute_capability_minor = minor; + + // Get multiprocessor count + int mp_count; + check_driver_error( + cuDeviceGetAttribute(&mp_count, CU_DEVICE_ATTRIBUTE_MULTIPROCESSOR_COUNT, device), + "Failed to get multiprocessor count" + ); + result.multiprocessor_count = mp_count; + + // Get max threads per block + int max_threads; + check_driver_error( + cuDeviceGetAttribute(&max_threads, CU_DEVICE_ATTRIBUTE_MAX_THREADS_PER_BLOCK, device), + "Failed to get max threads per block" + ); + result.max_threads_per_block = max_threads; + + // Get warp size + int warp_size; + check_driver_error( + cuDeviceGetAttribute(&warp_size, CU_DEVICE_ATTRIBUTE_WARP_SIZE, device), + "Failed to get warp size" + ); + result.warp_size = warp_size; + + return result; +} + +void set_device(int device_id) { + driver::DriverContext::instance().set_current(device_id); +} + +int get_current_device() { + return driver::DriverContext::instance().current_device(); +} + +void device_synchronize() { + driver::DriverContext::instance().synchronize(); +} + +int get_sm_version(int device_id) { + auto& ctx = driver::DriverContext::instance(); + CUdevice device = ctx.get_device(device_id); + + int major, minor; + check_driver_error( + cuDeviceGetAttribute(&major, CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MAJOR, device), + "Failed to get compute capability major" + ); + check_driver_error( + cuDeviceGetAttribute(&minor, CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MINOR, device), + "Failed to get compute capability minor" + ); + return major * 10 + minor; +} + +void validate_compute_capability(int device_id) { + int sm = get_sm_version(device_id); + if (sm < 80) { + DeviceProperties props = get_device_properties(device_id); + throw std::runtime_error( + "PyGPUkit requires SM >= 80 (Ampere or newer). " + "Found: " + props.name + " with SM " + + std::to_string(props.compute_capability_major) + "." + + std::to_string(props.compute_capability_minor) + + ". Older GPUs (Pascal, Turing, etc.) are not supported." 
+        );
+    }
+}
+
+} // namespace pygpukit
+
+#else
+// Standard mode: Use CUDA Runtime API
 #include <cuda_runtime.h>
 
 namespace pygpukit {
 
@@ -98,3 +239,5 @@ void validate_compute_capability(int device_id) {
 }
 
 } // namespace pygpukit
+
+#endif // PYGPUKIT_DRIVER_ONLY
diff --git a/native/core/driver_api.hpp b/native/core/driver_api.hpp
new file mode 100644
index 0000000..4ce8a0b
--- /dev/null
+++ b/native/core/driver_api.hpp
@@ -0,0 +1,221 @@
+#pragma once
+/**
+ * CUDA Driver API wrapper for PyGPUkit
+ *
+ * This header provides abstractions for Driver API calls that can replace
+ * the Runtime API (cudart) calls for driver-only mode.
+ *
+ * Driver-Only Mode Benefits:
+ * - No CUDA Toolkit installation required
+ * - Only NVIDIA GPU driver needed
+ * - NVRTC DLL bundled with wheel
+ *
+ * Migration Status:
+ * - [x] Infrastructure created
+ * - [ ] Memory management (cuMemAlloc, cuMemFree, cuMemcpy*)
+ * - [ ] Device management (cuDeviceGet, cuDeviceGetAttribute)
+ * - [ ] Stream management (cuStreamCreate, cuStreamSynchronize)
+ * - [ ] Kernel launch (cuLaunchKernel)
+ * - [ ] Error handling (cuGetErrorString)
+ */
+
+#include <cuda.h>
+#include <stdexcept>
+#include <string>
+
+namespace pygpukit {
+namespace driver {
+
+/**
+ * Initialize CUDA Driver API
+ * Must be called before any other driver API calls
+ */
+inline void init() {
+    CUresult result = cuInit(0);
+    if (result != CUDA_SUCCESS) {
+        // Initialize to nullptr and guard: cuGetErrorString can itself fail,
+        // and concatenating an uninitialized pointer is undefined behavior.
+        const char* error_str = nullptr;
+        cuGetErrorString(result, &error_str);
+        throw std::runtime_error(std::string("Failed to initialize CUDA Driver API: ") +
+                                 (error_str ? error_str : "unknown error"));
+    }
+}
+
+/**
+ * Check CUDA Driver API result and throw on error
+ */
+inline void check_driver_error(CUresult result, const char* msg) {
+    if (result != CUDA_SUCCESS) {
+        const char* error_str = nullptr;
+        cuGetErrorString(result, &error_str);
+        throw std::runtime_error(std::string(msg) + ": " + (error_str ? 
error_str : "unknown error")); + } +} + +/** + * Get device count using Driver API + */ +inline int get_device_count() { + int count = 0; + CUresult result = cuDeviceGetCount(&count); + if (result != CUDA_SUCCESS) { + return 0; + } + return count; +} + +/** + * Get device handle + */ +inline CUdevice get_device(int device_id) { + CUdevice device; + check_driver_error(cuDeviceGet(&device, device_id), "Failed to get device"); + return device; +} + +/** + * Create CUDA context for a device + */ +inline CUcontext create_context(CUdevice device, unsigned int flags = 0) { + CUcontext context; + check_driver_error(cuCtxCreate(&context, flags, device), "Failed to create context"); + return context; +} + +/** + * Allocate device memory using Driver API + */ +inline CUdeviceptr mem_alloc(size_t size) { + CUdeviceptr ptr; + check_driver_error(cuMemAlloc(&ptr, size), "Failed to allocate device memory"); + return ptr; +} + +/** + * Free device memory + */ +inline void mem_free(CUdeviceptr ptr) { + check_driver_error(cuMemFree(ptr), "Failed to free device memory"); +} + +/** + * Copy host to device + */ +inline void mem_copy_htod(CUdeviceptr dst, const void* src, size_t size) { + check_driver_error(cuMemcpyHtoD(dst, src, size), "Failed to copy H2D"); +} + +/** + * Copy device to host + */ +inline void mem_copy_dtoh(void* dst, CUdeviceptr src, size_t size) { + check_driver_error(cuMemcpyDtoH(dst, src, size), "Failed to copy D2H"); +} + +/** + * Copy device to device + */ +inline void mem_copy_dtod(CUdeviceptr dst, CUdeviceptr src, size_t size) { + check_driver_error(cuMemcpyDtoD(dst, src, size), "Failed to copy D2D"); +} + +/** + * Create stream + */ +inline CUstream create_stream(unsigned int flags = CU_STREAM_NON_BLOCKING) { + CUstream stream; + check_driver_error(cuStreamCreate(&stream, flags), "Failed to create stream"); + return stream; +} + +/** + * Destroy stream + */ +inline void destroy_stream(CUstream stream) { + check_driver_error(cuStreamDestroy(stream), "Failed to destroy stream"); +} + +/** + * Synchronize stream + */ +inline void sync_stream(CUstream stream) { + check_driver_error(cuStreamSynchronize(stream), "Failed to synchronize stream"); +} + +/** + * Synchronize device (all streams) + */ +inline void sync_device() { + check_driver_error(cuCtxSynchronize(), "Failed to synchronize device"); +} + +/** + * Get device attribute + */ +inline int get_device_attribute(CUdevice_attribute attrib, CUdevice device) { + int value; + check_driver_error(cuDeviceGetAttribute(&value, attrib, device), "Failed to get device attribute"); + return value; +} + +/** + * Get device name + */ +inline std::string get_device_name(CUdevice device) { + char name[256]; + check_driver_error(cuDeviceGetName(name, sizeof(name), device), "Failed to get device name"); + return std::string(name); +} + +/** + * Get device total memory + */ +inline size_t get_device_total_memory(CUdevice device) { + size_t total; + check_driver_error(cuDeviceTotalMem(&total, device), "Failed to get device memory"); + return total; +} + +/** + * Kernel launch configuration for Driver API + */ +struct DriverLaunchConfig { + unsigned int grid_x, grid_y, grid_z; + unsigned int block_x, block_y, block_z; + unsigned int shared_mem; + CUstream stream; + + DriverLaunchConfig() + : grid_x(1), grid_y(1), grid_z(1) + , block_x(256), block_y(1), block_z(1) + , shared_mem(0), stream(nullptr) {} +}; + +/** + * Launch kernel using Driver API + * + * @param func CUfunction handle from NVRTC compilation + * @param config Launch configuration + * @param 
args Kernel arguments (array of pointers to argument values) + * @param n_args Number of arguments + */ +inline void launch_kernel( + CUfunction func, + const DriverLaunchConfig& config, + void** args, + size_t n_args +) { + check_driver_error( + cuLaunchKernel( + func, + config.grid_x, config.grid_y, config.grid_z, + config.block_x, config.block_y, config.block_z, + config.shared_mem, + config.stream, + args, + nullptr // extra params + ), + "Failed to launch kernel" + ); +} + +} // namespace driver +} // namespace pygpukit diff --git a/native/core/driver_context.hpp b/native/core/driver_context.hpp new file mode 100644 index 0000000..1823088 --- /dev/null +++ b/native/core/driver_context.hpp @@ -0,0 +1,216 @@ +#pragma once +/** + * CUDA Driver Context Manager for PyGPUkit + * + * This header provides a singleton context manager for Driver API operations. + * The context is lazily initialized on first use. + * + * In driver-only mode, all CUDA operations must be done through a context. + * This manager ensures proper initialization and cleanup. + */ + +#include +#include +#include +#include + +namespace pygpukit { +namespace driver { + +/** + * Check CUDA Driver API result and throw on error + */ +inline void check_driver_error(CUresult result, const char* msg) { + if (result != CUDA_SUCCESS) { + const char* error_str = nullptr; + cuGetErrorString(result, &error_str); + throw std::runtime_error(std::string(msg) + ": " + (error_str ? error_str : "unknown error")); + } +} + +/** + * Singleton context manager for CUDA Driver API + * + * Manages CUDA initialization, device selection, and context lifecycle. + * Thread-safe initialization via std::call_once. + */ +class DriverContext { +public: + /** + * Get singleton instance + */ + static DriverContext& instance() { + static DriverContext ctx; + return ctx; + } + + /** + * Ensure CUDA Driver API is initialized + * Safe to call multiple times + */ + void ensure_init() { + std::call_once(init_flag_, [this]() { + CUresult result = cuInit(0); + if (result != CUDA_SUCCESS) { + initialized_ = false; + const char* error_str = nullptr; + cuGetErrorString(result, &error_str); + init_error_ = std::string("Failed to initialize CUDA Driver API: ") + + (error_str ? error_str : "unknown error"); + return; + } + initialized_ = true; + + // Get device count + int count = 0; + result = cuDeviceGetCount(&count); + if (result != CUDA_SUCCESS || count == 0) { + device_count_ = 0; + has_device_ = false; + return; + } + + device_count_ = count; + has_device_ = true; + }); + + if (!initialized_) { + throw std::runtime_error(init_error_); + } + } + + /** + * Check if CUDA is available (initialized and has devices) + */ + bool is_available() { + try { + ensure_init(); + return has_device_; + } catch (...) 
{ + return false; + } + } + + /** + * Get device count + */ + int device_count() { + ensure_init(); + return device_count_; + } + + /** + * Get or create context for a device + * Creates primary context on first call + */ + CUcontext get_context(int device_id = 0) { + ensure_init(); + + if (device_id < 0 || device_id >= device_count_) { + throw std::runtime_error("Invalid device ID: " + std::to_string(device_id)); + } + + std::lock_guard lock(context_mutex_); + + // Check if context already exists for this device + if (device_id < static_cast(contexts_.size()) && contexts_[device_id] != nullptr) { + return contexts_[device_id]; + } + + // Ensure vector is large enough + if (static_cast(contexts_.size()) <= device_id) { + contexts_.resize(device_id + 1, nullptr); + devices_.resize(device_id + 1); + } + + // Get device handle + CUdevice device; + check_driver_error(cuDeviceGet(&device, device_id), "Failed to get device"); + devices_[device_id] = device; + + // Retain primary context (shared with runtime API if used together) + CUcontext ctx; + check_driver_error(cuDevicePrimaryCtxRetain(&ctx, device), "Failed to retain primary context"); + contexts_[device_id] = ctx; + + return ctx; + } + + /** + * Set current context for the calling thread + */ + void set_current(int device_id = 0) { + CUcontext ctx = get_context(device_id); + check_driver_error(cuCtxSetCurrent(ctx), "Failed to set current context"); + current_device_ = device_id; + } + + /** + * Get current device ID + */ + int current_device() const { + return current_device_; + } + + /** + * Get device handle + */ + CUdevice get_device(int device_id = 0) { + ensure_init(); + if (device_id < 0 || device_id >= device_count_) { + throw std::runtime_error("Invalid device ID"); + } + + // Ensure context is created (which populates devices_) + get_context(device_id); + + return devices_[device_id]; + } + + /** + * Synchronize current context + */ + void synchronize() { + check_driver_error(cuCtxSynchronize(), "Failed to synchronize context"); + } + + // Prevent copying + DriverContext(const DriverContext&) = delete; + DriverContext& operator=(const DriverContext&) = delete; + +private: + DriverContext() = default; + + ~DriverContext() { + // Release all retained primary contexts + for (size_t i = 0; i < contexts_.size(); ++i) { + if (contexts_[i] != nullptr && i < devices_.size()) { + cuDevicePrimaryCtxRelease(devices_[i]); + } + } + } + + std::once_flag init_flag_; + bool initialized_ = false; + bool has_device_ = false; + int device_count_ = 0; + std::string init_error_; + + std::mutex context_mutex_; + std::vector contexts_; + std::vector devices_; + int current_device_ = 0; +}; + +/** + * RAII guard to ensure context is set for current thread + */ +class ContextGuard { +public: + explicit ContextGuard(int device_id = 0) { + DriverContext::instance().set_current(device_id); + } +}; + +} // namespace driver +} // namespace pygpukit diff --git a/native/core/memory.cpp b/native/core/memory.cpp index eee9663..ec9155d 100644 --- a/native/core/memory.cpp +++ b/native/core/memory.cpp @@ -1,8 +1,78 @@ #include "memory.hpp" -#include #include #include +#ifdef PYGPUKIT_DRIVER_ONLY +// Driver-only mode: Use CUDA Driver API +#include "driver_context.hpp" +#include "driver_api.hpp" + +namespace pygpukit { + +namespace { + +void check_driver_error(CUresult result, const char* msg) { + if (result != CUDA_SUCCESS) { + const char* error_str = nullptr; + cuGetErrorString(result, &error_str); + throw CudaError(std::string(msg) + ": " + (error_str ? 
error_str : "unknown error")); + } +} + +} // anonymous namespace + +DevicePtr device_malloc(size_t size_bytes) { + // Ensure context is initialized + driver::DriverContext::instance().set_current(); + + CUdeviceptr dptr = 0; + check_driver_error(cuMemAlloc(&dptr, size_bytes), "Failed to allocate device memory"); + return reinterpret_cast(dptr); +} + +void device_free(DevicePtr ptr) { + if (ptr != nullptr) { + cuMemFree(reinterpret_cast(ptr)); + } +} + +void memcpy_host_to_device(DevicePtr dst, const void* src, size_t size_bytes) { + check_driver_error( + cuMemcpyHtoD(reinterpret_cast(dst), src, size_bytes), + "Failed to copy host to device" + ); +} + +void memcpy_device_to_host(void* dst, DevicePtr src, size_t size_bytes) { + check_driver_error( + cuMemcpyDtoH(dst, reinterpret_cast(src), size_bytes), + "Failed to copy device to host" + ); +} + +void memcpy_device_to_device(DevicePtr dst, DevicePtr src, size_t size_bytes) { + check_driver_error( + cuMemcpyDtoD(reinterpret_cast(dst), reinterpret_cast(src), size_bytes), + "Failed to copy device to device" + ); +} + +void device_memset(DevicePtr ptr, int value, size_t size_bytes) { + // cuMemsetD8 sets each byte to the value + check_driver_error( + cuMemsetD8(reinterpret_cast(ptr), static_cast(value), size_bytes), + "Failed to memset device memory" + ); +} + +void get_memory_info(size_t* free_bytes, size_t* total_bytes) { + check_driver_error(cuMemGetInfo(free_bytes, total_bytes), "Failed to get memory info"); +} + +#else +// Standard mode: Use CUDA Runtime API +#include + namespace pygpukit { namespace { @@ -53,6 +123,8 @@ void get_memory_info(size_t* free_bytes, size_t* total_bytes) { check_cuda_error(err, "Failed to get memory info"); } +#endif // PYGPUKIT_DRIVER_ONLY + // GPUArray implementation GPUArray::GPUArray(const std::vector& shape, DataType dtype) diff --git a/native/core/memory.cu b/native/core/memory.cu index b750eef..68043f1 100644 --- a/native/core/memory.cu +++ b/native/core/memory.cu @@ -1,9 +1,27 @@ // CUDA kernels for memory operations #include "memory.hpp" + +#ifdef PYGPUKIT_DRIVER_ONLY +#include "driver_context.hpp" +#include +#else #include +#endif namespace pygpukit { +namespace { + +void sync_device() { +#ifdef PYGPUKIT_DRIVER_ONLY + cuCtxSynchronize(); +#else + cudaDeviceSynchronize(); +#endif +} + +} // anonymous namespace + // Kernel to fill array with ones (float32) __global__ void fill_ones_f32_kernel(float* data, size_t n) { size_t idx = blockIdx.x * blockDim.x + threadIdx.x; @@ -59,7 +77,7 @@ void fill_ones_impl(DevicePtr ptr, size_t count, DataType dtype) { static_cast(ptr), count); break; } - cudaDeviceSynchronize(); + sync_device(); } GPUArray ones(const std::vector& shape, DataType dtype) { diff --git a/native/core/stream.cpp b/native/core/stream.cpp index a759167..cf61382 100644 --- a/native/core/stream.cpp +++ b/native/core/stream.cpp @@ -1,5 +1,76 @@ #include "stream.hpp" +#ifdef PYGPUKIT_DRIVER_ONLY +// Driver-only mode: Use CUDA Driver API +#include "driver_context.hpp" + +namespace pygpukit { + +namespace { + +void check_driver_error(CUresult result, const char* msg) { + if (result != CUDA_SUCCESS) { + const char* error_str = nullptr; + cuGetErrorString(result, &error_str); + throw CudaError(std::string(msg) + ": " + (error_str ? 
error_str : "unknown error")); + } +} + +} // anonymous namespace + +Stream::Stream(StreamPriority priority) + : stream_(nullptr), priority_(priority) { + // Ensure context is initialized + driver::DriverContext::instance().set_current(); + + int cuda_priority = (priority == StreamPriority::High) ? -1 : 0; + check_driver_error( + cuStreamCreateWithPriority(&stream_, CU_STREAM_NON_BLOCKING, cuda_priority), + "Failed to create stream" + ); +} + +Stream::~Stream() { + if (stream_ != nullptr) { + cuStreamDestroy(stream_); + } +} + +Stream::Stream(Stream&& other) noexcept + : stream_(other.stream_), priority_(other.priority_) { + other.stream_ = nullptr; +} + +Stream& Stream::operator=(Stream&& other) noexcept { + if (this != &other) { + if (stream_ != nullptr) { + cuStreamDestroy(stream_); + } + stream_ = other.stream_; + priority_ = other.priority_; + other.stream_ = nullptr; + } + return *this; +} + +void Stream::synchronize() { + check_driver_error(cuStreamSynchronize(stream_), "Failed to synchronize stream"); +} + +void get_stream_priority_range(int* least_priority, int* greatest_priority) { + // Ensure context is initialized + driver::DriverContext::instance().set_current(); + check_driver_error( + cuCtxGetStreamPriorityRange(least_priority, greatest_priority), + "Failed to get stream priority range" + ); +} + +} // namespace pygpukit + +#else +// Standard mode: Use CUDA Runtime API + namespace pygpukit { namespace { @@ -54,3 +125,5 @@ void get_stream_priority_range(int* least_priority, int* greatest_priority) { } } // namespace pygpukit + +#endif // PYGPUKIT_DRIVER_ONLY diff --git a/native/core/stream.hpp b/native/core/stream.hpp index ecdb54c..2a60bf8 100644 --- a/native/core/stream.hpp +++ b/native/core/stream.hpp @@ -1,7 +1,15 @@ #pragma once #include "types.hpp" + +#ifdef PYGPUKIT_DRIVER_ONLY +#include +// CUstream and cudaStream_t are the same underlying type +using StreamHandle = CUstream; +#else #include +using StreamHandle = cudaStream_t; +#endif namespace pygpukit { @@ -29,13 +37,13 @@ class Stream { void synchronize(); // Get raw CUDA stream handle - cudaStream_t handle() const { return stream_; } + StreamHandle handle() const { return stream_; } // Get priority StreamPriority priority() const { return priority_; } private: - cudaStream_t stream_; + StreamHandle stream_; StreamPriority priority_; }; diff --git a/native/jit/kernel.cpp b/native/jit/kernel.cpp index 2f0d62b..a1d7a7c 100644 --- a/native/jit/kernel.cpp +++ b/native/jit/kernel.cpp @@ -2,6 +2,10 @@ #include "compiler.hpp" #include +#ifdef PYGPUKIT_DRIVER_ONLY +#include "../core/driver_context.hpp" +#endif + namespace pygpukit { namespace { @@ -16,12 +20,18 @@ void check_cuda_driver_error(CUresult result, const char* msg) { // Initialize CUDA driver API (called once) void ensure_cuda_initialized() { +#ifdef PYGPUKIT_DRIVER_ONLY + // Use unified context manager in driver-only mode + driver::DriverContext::instance().set_current(); +#else + // Standard mode: use local initialization static bool initialized = false; if (!initialized) { CUresult result = cuInit(0); check_cuda_driver_error(result, "Failed to initialize CUDA driver"); initialized = true; } +#endif } } // anonymous namespace diff --git a/native/ops/basic.cu b/native/ops/basic.cu index d4d5b96..b8e4c90 100644 --- a/native/ops/basic.cu +++ b/native/ops/basic.cu @@ -1,7 +1,30 @@ #include "basic.cuh" -#include #include +#ifdef PYGPUKIT_DRIVER_ONLY +#include "../core/driver_context.hpp" +#include + +namespace pygpukit { +namespace ops { + +namespace { + +void 
check_driver_error(CUresult result, const char* msg) { + if (result != CUDA_SUCCESS) { + const char* error_str = nullptr; + cuGetErrorString(result, &error_str); + throw CudaError(std::string(msg) + ": " + (error_str ? error_str : "unknown error")); + } +} + +void sync_and_check(const char* msg) { + check_driver_error(cuCtxSynchronize(), msg); +} + +#else +#include + namespace pygpukit { namespace ops { @@ -13,6 +36,13 @@ void check_cuda_error(cudaError_t err, const char* msg) { } } +void sync_and_check(const char* msg) { + check_cuda_error(cudaGetLastError(), msg); + check_cuda_error(cudaDeviceSynchronize(), msg); +} + +#endif // PYGPUKIT_DRIVER_ONLY + void validate_same_shape(const GPUArray& a, const GPUArray& b, const char* op_name) { if (a.shape() != b.shape()) { throw std::runtime_error(std::string(op_name) + " requires arrays of same shape"); @@ -109,8 +139,7 @@ void add(const GPUArray& a, const GPUArray& b, GPUArray& c) { break; } - check_cuda_error(cudaGetLastError(), "add kernel launch failed"); - check_cuda_error(cudaDeviceSynchronize(), "add kernel sync failed"); + sync_and_check("add kernel failed"); } GPUArray add(const GPUArray& a, const GPUArray& b) { @@ -195,8 +224,7 @@ void mul(const GPUArray& a, const GPUArray& b, GPUArray& c) { break; } - check_cuda_error(cudaGetLastError(), "mul kernel launch failed"); - check_cuda_error(cudaDeviceSynchronize(), "mul kernel sync failed"); + sync_and_check("mul kernel failed"); } GPUArray mul(const GPUArray& a, const GPUArray& b) { @@ -209,25 +237,34 @@ GPUArray mul(const GPUArray& a, const GPUArray& b) { } // ============================================================================ -// Matmul kernels - Ampere Optimized (SM >= 80) +// Matmul kernels - Tiled with Shared Memory and Double Buffering // ============================================================================ // -// Optimization Strategy for Ampere (RTX 30XX, A100) and newer: -// - L2-friendly memory access patterns (large L2 cache: 6MB on 3090 Ti) -// - Use __ldg() for read-only texture cache path -// - Avoid shared memory tiling (L2 cache handles it better) -// - No __syncthreads() overhead -// - Use __restrict__ for compiler optimization +// Two implementations: +// 1. L2-optimized kernel: For small matrices (<2048), uses __ldg() cache +// 2. Tiled kernel: For large matrices (>=2048), uses shared memory tiling // -// Target Performance: -// - RTX 3090: 2.1-2.3 TFLOPS (FP32 naive) -// - For higher performance, use Tensor Cores (MMA kernels) or cuBLAS +// Tiled kernel features: +// - Configurable TILE_M, TILE_N, TILE_K +// - Double-buffered prefetch to overlap compute with memory loads +// - Bank-conflict-free shared memory access +// - Coalesced global memory access // -// Legacy tiled kernels are REMOVED per optimization directives. 
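+//
+// Shared-memory budget (derived from the constants below): with double
+// buffering, each block statically allocates
+//   As: 2 * TILE_K * (TILE_M + 1) floats = 2 * 16 * 65 * 4 B = 8,320 B
+//   Bs: 2 * TILE_K * (TILE_N + 1) floats = 2 * 16 * 65 * 4 B = 8,320 B
+// i.e. ~16.3 KB per block for FP32, well under the 48 KB static limit, so
+// shared memory does not cap occupancy on SM >= 80. Each 256-thread block
+// produces a 64x64 output tile (THREAD_M x THREAD_N = 16 accumulators per
+// thread); per K-step a thread issues 4 + 4 shared-memory loads for 16 FMAs,
+// so compute dominates shared-memory traffic.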
// ============================================================================ +// Small matrix kernel block size #define BLOCK_SIZE 16 +// Tiled matmul configuration +#define TILE_M 64 // Output tile height +#define TILE_N 64 // Output tile width +#define TILE_K 16 // Reduction tile depth +#define THREAD_M 4 // Elements per thread in M dimension +#define THREAD_N 4 // Elements per thread in N dimension + +// Threshold for switching to tiled kernel +#define TILED_MATMUL_THRESHOLD 2048 + // L2-optimized matmul kernel for FP32 (Ampere+) // Uses __ldg() for read-only cache and __restrict__ for aliasing hints __global__ void matmul_f32_l2opt_kernel( @@ -275,6 +312,320 @@ __global__ void matmul_f64_l2opt_kernel( } } +// ============================================================================ +// Tiled Matmul with Shared Memory and Double Buffering (FP32) +// ============================================================================ +// +// Each thread block computes a TILE_M x TILE_N output tile. +// Each thread computes THREAD_M x THREAD_N elements. +// Block dimensions: (TILE_N / THREAD_N, TILE_M / THREAD_M) = (16, 16) = 256 threads +// +// Double buffering: +// - While computing with data in shared memory buffer 0 +// - Prefetch next tile into shared memory buffer 1 +// - Swap buffers each iteration +// +__global__ void matmul_f32_tiled_kernel( + const float* __restrict__ A, + const float* __restrict__ B, + float* __restrict__ C, + size_t M, size_t N, size_t K +) { + // Thread and block indices + const int tx = threadIdx.x; // 0..15 (TILE_N / THREAD_N) + const int ty = threadIdx.y; // 0..15 (TILE_M / THREAD_M) + const int bx = blockIdx.x; + const int by = blockIdx.y; + + // Shared memory for double buffering + // Pad by 1 to avoid bank conflicts + __shared__ float As[2][TILE_K][TILE_M + 1]; + __shared__ float Bs[2][TILE_K][TILE_N + 1]; + + // Thread's output tile (THREAD_M x THREAD_N elements) + float accum[THREAD_M][THREAD_N] = {{0.0f}}; + + // Global row/col start for this thread block + const size_t block_row_start = by * TILE_M; + const size_t block_col_start = bx * TILE_N; + + // Linear thread ID within block + const int tid = ty * blockDim.x + tx; + const int num_threads = blockDim.x * blockDim.y; // 256 + + // Number of tiles along K dimension + const int num_k_tiles = (K + TILE_K - 1) / TILE_K; + + // Current buffer index for double buffering + int curr_buf = 0; + + // Prefetch first tile into buffer 0 + { + // Load A tile: TILE_M x TILE_K elements, 256 threads + // Each thread loads multiple elements + const int a_loads_per_thread = (TILE_M * TILE_K + num_threads - 1) / num_threads; + for (int i = 0; i < a_loads_per_thread; ++i) { + int load_idx = tid + i * num_threads; + if (load_idx < TILE_M * TILE_K) { + int a_row = load_idx / TILE_K; + int a_col = load_idx % TILE_K; + size_t global_row = block_row_start + a_row; + size_t global_col = a_col; + if (global_row < M && global_col < K) { + As[0][a_col][a_row] = A[global_row * K + global_col]; + } else { + As[0][a_col][a_row] = 0.0f; + } + } + } + + // Load B tile: TILE_K x TILE_N elements + const int b_loads_per_thread = (TILE_K * TILE_N + num_threads - 1) / num_threads; + for (int i = 0; i < b_loads_per_thread; ++i) { + int load_idx = tid + i * num_threads; + if (load_idx < TILE_K * TILE_N) { + int b_row = load_idx / TILE_N; + int b_col = load_idx % TILE_N; + size_t global_row = b_row; + size_t global_col = block_col_start + b_col; + if (global_row < K && global_col < N) { + Bs[0][b_row][b_col] = B[global_row * N + 
global_col]; + } else { + Bs[0][b_row][b_col] = 0.0f; + } + } + } + } + __syncthreads(); + + // Main loop over K tiles with double buffering + for (int tile_k = 0; tile_k < num_k_tiles; ++tile_k) { + int next_buf = 1 - curr_buf; + + // Prefetch next tile (if not the last tile) + if (tile_k + 1 < num_k_tiles) { + size_t k_offset = (tile_k + 1) * TILE_K; + + // Load A tile + const int a_loads_per_thread = (TILE_M * TILE_K + num_threads - 1) / num_threads; + for (int i = 0; i < a_loads_per_thread; ++i) { + int load_idx = tid + i * num_threads; + if (load_idx < TILE_M * TILE_K) { + int a_row = load_idx / TILE_K; + int a_col = load_idx % TILE_K; + size_t global_row = block_row_start + a_row; + size_t global_col = k_offset + a_col; + if (global_row < M && global_col < K) { + As[next_buf][a_col][a_row] = A[global_row * K + global_col]; + } else { + As[next_buf][a_col][a_row] = 0.0f; + } + } + } + + // Load B tile + const int b_loads_per_thread = (TILE_K * TILE_N + num_threads - 1) / num_threads; + for (int i = 0; i < b_loads_per_thread; ++i) { + int load_idx = tid + i * num_threads; + if (load_idx < TILE_K * TILE_N) { + int b_row = load_idx / TILE_N; + int b_col = load_idx % TILE_N; + size_t global_row = k_offset + b_row; + size_t global_col = block_col_start + b_col; + if (global_row < K && global_col < N) { + Bs[next_buf][b_row][b_col] = B[global_row * N + global_col]; + } else { + Bs[next_buf][b_row][b_col] = 0.0f; + } + } + } + } + + // Compute using current buffer + // Each thread computes THREAD_M x THREAD_N elements + #pragma unroll + for (int k = 0; k < TILE_K; ++k) { + // Load A fragment for this thread + float a_frag[THREAD_M]; + #pragma unroll + for (int m = 0; m < THREAD_M; ++m) { + a_frag[m] = As[curr_buf][k][ty * THREAD_M + m]; + } + + // Load B fragment and compute + #pragma unroll + for (int n = 0; n < THREAD_N; ++n) { + float b_val = Bs[curr_buf][k][tx * THREAD_N + n]; + #pragma unroll + for (int m = 0; m < THREAD_M; ++m) { + accum[m][n] += a_frag[m] * b_val; + } + } + } + + // Sync before swapping buffers + __syncthreads(); + curr_buf = next_buf; + } + + // Write output + #pragma unroll + for (int m = 0; m < THREAD_M; ++m) { + size_t out_row = block_row_start + ty * THREAD_M + m; + if (out_row < M) { + #pragma unroll + for (int n = 0; n < THREAD_N; ++n) { + size_t out_col = block_col_start + tx * THREAD_N + n; + if (out_col < N) { + C[out_row * N + out_col] = accum[m][n]; + } + } + } + } +} + +// Tiled Matmul for FP64 +__global__ void matmul_f64_tiled_kernel( + const double* __restrict__ A, + const double* __restrict__ B, + double* __restrict__ C, + size_t M, size_t N, size_t K +) { + // Thread and block indices + const int tx = threadIdx.x; + const int ty = threadIdx.y; + const int bx = blockIdx.x; + const int by = blockIdx.y; + + // Shared memory (smaller tiles for FP64 due to memory constraints) + constexpr int TILE_K_F64 = 8; + __shared__ double As[2][TILE_K_F64][TILE_M + 1]; + __shared__ double Bs[2][TILE_K_F64][TILE_N + 1]; + + double accum[THREAD_M][THREAD_N] = {{0.0}}; + + const size_t block_row_start = by * TILE_M; + const size_t block_col_start = bx * TILE_N; + + const int tid = ty * blockDim.x + tx; + const int num_threads = blockDim.x * blockDim.y; + + const int num_k_tiles = (K + TILE_K_F64 - 1) / TILE_K_F64; + + int curr_buf = 0; + + // Prefetch first tile + { + const int a_loads_per_thread = (TILE_M * TILE_K_F64 + num_threads - 1) / num_threads; + for (int i = 0; i < a_loads_per_thread; ++i) { + int load_idx = tid + i * num_threads; + if (load_idx < TILE_M * 
TILE_K_F64) { + int a_row = load_idx / TILE_K_F64; + int a_col = load_idx % TILE_K_F64; + size_t global_row = block_row_start + a_row; + size_t global_col = a_col; + if (global_row < M && global_col < K) { + As[0][a_col][a_row] = A[global_row * K + global_col]; + } else { + As[0][a_col][a_row] = 0.0; + } + } + } + + const int b_loads_per_thread = (TILE_K_F64 * TILE_N + num_threads - 1) / num_threads; + for (int i = 0; i < b_loads_per_thread; ++i) { + int load_idx = tid + i * num_threads; + if (load_idx < TILE_K_F64 * TILE_N) { + int b_row = load_idx / TILE_N; + int b_col = load_idx % TILE_N; + size_t global_row = b_row; + size_t global_col = block_col_start + b_col; + if (global_row < K && global_col < N) { + Bs[0][b_row][b_col] = B[global_row * N + global_col]; + } else { + Bs[0][b_row][b_col] = 0.0; + } + } + } + } + __syncthreads(); + + for (int tile_k = 0; tile_k < num_k_tiles; ++tile_k) { + int next_buf = 1 - curr_buf; + + if (tile_k + 1 < num_k_tiles) { + size_t k_offset = (tile_k + 1) * TILE_K_F64; + + const int a_loads_per_thread = (TILE_M * TILE_K_F64 + num_threads - 1) / num_threads; + for (int i = 0; i < a_loads_per_thread; ++i) { + int load_idx = tid + i * num_threads; + if (load_idx < TILE_M * TILE_K_F64) { + int a_row = load_idx / TILE_K_F64; + int a_col = load_idx % TILE_K_F64; + size_t global_row = block_row_start + a_row; + size_t global_col = k_offset + a_col; + if (global_row < M && global_col < K) { + As[next_buf][a_col][a_row] = A[global_row * K + global_col]; + } else { + As[next_buf][a_col][a_row] = 0.0; + } + } + } + + const int b_loads_per_thread = (TILE_K_F64 * TILE_N + num_threads - 1) / num_threads; + for (int i = 0; i < b_loads_per_thread; ++i) { + int load_idx = tid + i * num_threads; + if (load_idx < TILE_K_F64 * TILE_N) { + int b_row = load_idx / TILE_N; + int b_col = load_idx % TILE_N; + size_t global_row = k_offset + b_row; + size_t global_col = block_col_start + b_col; + if (global_row < K && global_col < N) { + Bs[next_buf][b_row][b_col] = B[global_row * N + global_col]; + } else { + Bs[next_buf][b_row][b_col] = 0.0; + } + } + } + } + + #pragma unroll + for (int k = 0; k < TILE_K_F64; ++k) { + double a_frag[THREAD_M]; + #pragma unroll + for (int m = 0; m < THREAD_M; ++m) { + a_frag[m] = As[curr_buf][k][ty * THREAD_M + m]; + } + + #pragma unroll + for (int n = 0; n < THREAD_N; ++n) { + double b_val = Bs[curr_buf][k][tx * THREAD_N + n]; + #pragma unroll + for (int m = 0; m < THREAD_M; ++m) { + accum[m][n] += a_frag[m] * b_val; + } + } + } + + __syncthreads(); + curr_buf = next_buf; + } + + #pragma unroll + for (int m = 0; m < THREAD_M; ++m) { + size_t out_row = block_row_start + ty * THREAD_M + m; + if (out_row < M) { + #pragma unroll + for (int n = 0; n < THREAD_N; ++n) { + size_t out_col = block_col_start + tx * THREAD_N + n; + if (out_col < N) { + C[out_row * N + out_col] = accum[m][n]; + } + } + } + } +} + void matmul(const GPUArray& a, const GPUArray& b, GPUArray& c) { validate_matmul_shapes(a, b, "matmul"); validate_same_dtype(a, b, "matmul"); @@ -287,34 +638,66 @@ void matmul(const GPUArray& a, const GPUArray& b, GPUArray& c) { throw std::runtime_error("matmul output shape mismatch"); } - // L2-optimized kernel for Ampere+ (SM >= 80) - dim3 block_size(BLOCK_SIZE, BLOCK_SIZE); - dim3 grid_size( - (N + BLOCK_SIZE - 1) / BLOCK_SIZE, - (M + BLOCK_SIZE - 1) / BLOCK_SIZE - ); - - switch (a.dtype()) { - case DataType::Float32: - matmul_f32_l2opt_kernel<<>>( - static_cast(a.data()), - static_cast(b.data()), - static_cast(c.data()), - M, N, K); - break; - 
case DataType::Float64:
-            matmul_f64_l2opt_kernel<<<grid_size, block_size>>>(
-                static_cast<const double*>(a.data()),
-                static_cast<const double*>(b.data()),
-                static_cast<double*>(c.data()),
-                M, N, K);
-            break;
-        default:
-            throw std::runtime_error("matmul only supports float32 and float64");
+    // Select kernel based on matrix size
+    // Use tiled kernel for large matrices (>= TILED_MATMUL_THRESHOLD)
+    bool use_tiled = (M >= TILED_MATMUL_THRESHOLD || N >= TILED_MATMUL_THRESHOLD || K >= TILED_MATMUL_THRESHOLD);
+
+    if (use_tiled) {
+        // Tiled kernel with shared memory and double buffering
+        // Block size: (TILE_N / THREAD_N, TILE_M / THREAD_M) = (16, 16)
+        dim3 block_size(TILE_N / THREAD_N, TILE_M / THREAD_M);
+        dim3 grid_size(
+            (N + TILE_N - 1) / TILE_N,
+            (M + TILE_M - 1) / TILE_M
+        );
+
+        switch (a.dtype()) {
+        case DataType::Float32:
+            matmul_f32_tiled_kernel<<<grid_size, block_size>>>(
+                static_cast<const float*>(a.data()),
+                static_cast<const float*>(b.data()),
+                static_cast<float*>(c.data()),
+                M, N, K);
+            break;
+        case DataType::Float64:
+            matmul_f64_tiled_kernel<<<grid_size, block_size>>>(
+                static_cast<const double*>(a.data()),
+                static_cast<const double*>(b.data()),
+                static_cast<double*>(c.data()),
+                M, N, K);
+            break;
+        default:
+            throw std::runtime_error("matmul only supports float32 and float64");
+        }
+    } else {
+        // L2-optimized kernel for small matrices (Ampere+)
+        dim3 block_size(BLOCK_SIZE, BLOCK_SIZE);
+        dim3 grid_size(
+            (N + BLOCK_SIZE - 1) / BLOCK_SIZE,
+            (M + BLOCK_SIZE - 1) / BLOCK_SIZE
+        );
+
+        switch (a.dtype()) {
+        case DataType::Float32:
+            matmul_f32_l2opt_kernel<<<grid_size, block_size>>>(
+                static_cast<const float*>(a.data()),
+                static_cast<const float*>(b.data()),
+                static_cast<float*>(c.data()),
+                M, N, K);
+            break;
+        case DataType::Float64:
+            matmul_f64_l2opt_kernel<<<grid_size, block_size>>>(
+                static_cast<const double*>(a.data()),
+                static_cast<const double*>(b.data()),
+                static_cast<double*>(c.data()),
+                M, N, K);
+            break;
+        default:
+            throw std::runtime_error("matmul only supports float32 and float64");
+        }
     }
 
-    check_cuda_error(cudaGetLastError(), "matmul kernel launch failed");
-    check_cuda_error(cudaDeviceSynchronize(), "matmul kernel sync failed");
+    sync_and_check("matmul kernel failed");
 }
 
 GPUArray matmul(const GPUArray& a, const GPUArray& b) {
diff --git a/rust/pygpukit-core/src/dispatch/controller.rs b/rust/pygpukit-core/src/dispatch/controller.rs
new file mode 100644
index 0000000..c38a044
--- /dev/null
+++ b/rust/pygpukit-core/src/dispatch/controller.rs
@@ -0,0 +1,710 @@
+//! Kernel Dispatch Controller implementation
+//!
+//! Coordinates kernel launches with stream management and scheduler integration.
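+//!
+//! Minimal lifecycle sketch (`mark_launched`/`mark_completed` follow the
+//! names used by the Python bindings in examples/demo_v02.py):
+//!
+//! ```ignore
+//! use pygpukit_core::dispatch::{KernelDispatcher, KernelLaunchRequest, LaunchConfig};
+//!
+//! let dispatcher = KernelDispatcher::new(4); // max 4 in-flight per stream
+//! let config = LaunchConfig::linear(1 << 20, 256).with_stream(1);
+//! let req_id = dispatcher.queue(KernelLaunchRequest::new(0xDEAD_BEEF, config));
+//!
+//! // Drain the ready queue; the actual cuLaunchKernel call happens in C++.
+//! for req in dispatcher.get_ready(4) {
+//!     dispatcher.mark_launched(req.id);
+//!     dispatcher.mark_completed(req.id);
+//! }
+//! ```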
+ +use std::collections::{HashMap, VecDeque}; +use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; +use std::time::{SystemTime, UNIX_EPOCH}; +use parking_lot::RwLock; + +/// State of a kernel launch +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum KernelState { + /// Kernel is queued for launch + Queued, + /// Kernel has been launched (async) + Launched, + /// Kernel execution completed + Completed, + /// Kernel launch or execution failed + Failed, + /// Kernel was cancelled before launch + Cancelled, +} + +impl KernelState { + /// Check if this is a terminal state + pub fn is_terminal(&self) -> bool { + matches!(self, KernelState::Completed | KernelState::Failed | KernelState::Cancelled) + } +} + +/// Kernel launch configuration +#[derive(Debug, Clone)] +pub struct LaunchConfig { + /// Grid dimensions (x, y, z) + pub grid: (u32, u32, u32), + /// Block dimensions (x, y, z) + pub block: (u32, u32, u32), + /// Shared memory size in bytes + pub shared_mem: u32, + /// Stream ID for execution + pub stream_id: u32, +} + +impl Default for LaunchConfig { + fn default() -> Self { + Self { + grid: (1, 1, 1), + block: (256, 1, 1), + shared_mem: 0, + stream_id: 0, + } + } +} + +impl LaunchConfig { + /// Create a 1D launch config + pub fn linear(n_elements: usize, block_size: u32) -> Self { + let grid_x = ((n_elements as u32) + block_size - 1) / block_size; + Self { + grid: (grid_x, 1, 1), + block: (block_size, 1, 1), + shared_mem: 0, + stream_id: 0, + } + } + + /// Create a 2D launch config + pub fn grid_2d(grid_x: u32, grid_y: u32, block_x: u32, block_y: u32) -> Self { + Self { + grid: (grid_x, grid_y, 1), + block: (block_x, block_y, 1), + shared_mem: 0, + stream_id: 0, + } + } + + /// Set shared memory size + pub fn with_shared_mem(mut self, bytes: u32) -> Self { + self.shared_mem = bytes; + self + } + + /// Set stream ID + pub fn with_stream(mut self, stream_id: u32) -> Self { + self.stream_id = stream_id; + self + } +} + +/// Request to launch a kernel +#[derive(Debug, Clone)] +pub struct KernelLaunchRequest { + /// Unique request ID + pub id: u64, + /// Kernel function handle (CUfunction as u64) + pub kernel_handle: u64, + /// Launch configuration + pub config: LaunchConfig, + /// Kernel arguments as raw bytes + pub args: Vec, + /// Current state + pub state: KernelState, + /// Associated scheduler task ID (if any) + pub task_id: Option, + /// Priority (higher = more urgent) + pub priority: i32, + /// Timestamp when queued + pub queued_at: f64, + /// Timestamp when launched + pub launched_at: Option, + /// Timestamp when completed + pub completed_at: Option, + /// Error message if failed + pub error: Option, +} + +impl KernelLaunchRequest { + /// Create a new launch request + pub fn new(kernel_handle: u64, config: LaunchConfig) -> Self { + Self { + id: 0, // Will be assigned by dispatcher + kernel_handle, + config, + args: Vec::new(), + state: KernelState::Queued, + task_id: None, + priority: 0, + queued_at: Self::now(), + launched_at: None, + completed_at: None, + error: None, + } + } + + /// Set kernel arguments + pub fn with_args(mut self, args: Vec) -> Self { + self.args = args; + self + } + + /// Link to a scheduler task + pub fn with_task(mut self, task_id: String) -> Self { + self.task_id = Some(task_id); + self + } + + /// Set priority + pub fn with_priority(mut self, priority: i32) -> Self { + self.priority = priority; + self + } + + /// Mark as launched + fn launch(&mut self) { + if self.state == KernelState::Queued { + self.state = KernelState::Launched; + 
self.launched_at = Some(Self::now()); + } + } + + /// Mark as completed + fn complete(&mut self) { + if self.state == KernelState::Launched { + self.state = KernelState::Completed; + self.completed_at = Some(Self::now()); + } + } + + /// Mark as failed + fn fail(&mut self, error: String) { + self.state = KernelState::Failed; + self.completed_at = Some(Self::now()); + self.error = Some(error); + } + + /// Mark as cancelled + fn cancel(&mut self) { + if !self.state.is_terminal() { + self.state = KernelState::Cancelled; + self.completed_at = Some(Self::now()); + } + } + + /// Get execution duration (launch to complete) + pub fn duration(&self) -> Option { + match (self.launched_at, self.completed_at) { + (Some(start), Some(end)) => Some(end - start), + _ => None, + } + } + + /// Get current Unix timestamp + #[inline] + fn now() -> f64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0) + } +} + +/// Dispatch statistics +#[derive(Debug, Clone, Default)] +pub struct DispatchStats { + /// Total kernels queued + pub total_queued: usize, + /// Kernels completed successfully + pub completed_count: usize, + /// Kernels failed + pub failed_count: usize, + /// Kernels currently pending + pub pending_count: usize, + /// Kernels currently in-flight (launched but not completed) + pub in_flight_count: usize, + /// Average kernel execution time (seconds) + pub avg_exec_time: f64, + /// Launches per stream + pub launches_per_stream: HashMap, +} + +/// Internal dispatcher state +struct DispatcherInner { + /// All launch requests by ID + requests: HashMap, + /// Pending queue (FIFO within priority) + pending_queue: VecDeque, + /// In-flight kernels by stream + in_flight: HashMap>, + /// Statistics + total_exec_time: f64, + completed_count: usize, + failed_count: usize, + launches_per_stream: HashMap, +} + +/// Kernel Dispatch Controller +/// +/// Coordinates GPU kernel launches with: +/// - Stream-based execution +/// - Priority ordering +/// - Integration with scheduler tasks +/// +/// # Example +/// +/// ```ignore +/// use pygpukit_core::dispatch::{KernelDispatcher, KernelLaunchRequest, LaunchConfig}; +/// +/// let dispatcher = KernelDispatcher::new(4); // Max 4 in-flight per stream +/// +/// // Queue a kernel launch +/// let config = LaunchConfig::linear(1024, 256); +/// let req = KernelLaunchRequest::new(kernel_handle, config); +/// let req_id = dispatcher.queue(req); +/// +/// // Get kernels ready to launch +/// let ready = dispatcher.get_ready(10); +/// for req in ready { +/// // Launch via C++ backend (cuLaunchKernel) +/// // ... 
+/// dispatcher.mark_launched(req.id); +/// } +/// +/// // When kernel completes (via cudaStreamSynchronize or event) +/// dispatcher.mark_completed(req_id); +/// ``` +pub struct KernelDispatcher { + /// Next request ID + next_id: AtomicU64, + /// Maximum in-flight kernels per stream + max_in_flight: usize, + /// Internal state + inner: RwLock, +} + +impl KernelDispatcher { + /// Create a new kernel dispatcher + /// + /// # Arguments + /// + /// * `max_in_flight` - Maximum concurrent kernels per stream + pub fn new(max_in_flight: usize) -> Self { + Self { + next_id: AtomicU64::new(1), + max_in_flight, + inner: RwLock::new(DispatcherInner { + requests: HashMap::new(), + pending_queue: VecDeque::new(), + in_flight: HashMap::new(), + total_exec_time: 0.0, + completed_count: 0, + failed_count: 0, + launches_per_stream: HashMap::new(), + }), + } + } + + /// Generate next request ID + fn next_req_id(&self) -> u64 { + self.next_id.fetch_add(1, AtomicOrdering::SeqCst) + } + + /// Queue a kernel launch request + /// + /// Returns the request ID + pub fn queue(&self, mut request: KernelLaunchRequest) -> u64 { + let id = self.next_req_id(); + request.id = id; + + let mut inner = self.inner.write(); + inner.pending_queue.push_back(id); + inner.requests.insert(id, request); + + id + } + + /// Queue a kernel launch with scheduler task binding + pub fn queue_for_task(&self, task_id: String, kernel_handle: u64, config: LaunchConfig, args: Vec) -> u64 { + let request = KernelLaunchRequest::new(kernel_handle, config) + .with_args(args) + .with_task(task_id); + self.queue(request) + } + + /// Get launch requests ready to execute + /// + /// Returns requests that can be launched (stream has capacity) + pub fn get_ready(&self, max_requests: usize) -> Vec { + let inner = self.inner.read(); + let mut ready = Vec::new(); + + // Track how many we're planning to add to each stream + let mut planned_per_stream: HashMap = HashMap::new(); + + for req_id in inner.pending_queue.iter() { + if ready.len() >= max_requests { + break; + } + + if let Some(req) = inner.requests.get(req_id) { + if req.state == KernelState::Queued { + // Check stream capacity (current in-flight + planned) + let stream_id = req.config.stream_id; + let current_in_flight = inner.in_flight + .get(&stream_id) + .map(|v| v.len()) + .unwrap_or(0); + let planned = planned_per_stream.get(&stream_id).copied().unwrap_or(0); + let total = current_in_flight + planned; + + if total < self.max_in_flight { + ready.push(req.clone()); + *planned_per_stream.entry(stream_id).or_insert(0) += 1; + } + } + } + } + + ready + } + + /// Mark a request as launched + /// + /// Call this after successfully calling cuLaunchKernel + pub fn mark_launched(&self, req_id: u64) -> bool { + let mut inner = self.inner.write(); + + // Get stream ID first + let stream_id = inner.requests.get(&req_id).map(|r| r.config.stream_id); + + if let Some(req) = inner.requests.get_mut(&req_id) { + if req.state == KernelState::Queued { + req.launch(); + + // Remove from pending queue + inner.pending_queue.retain(|&id| id != req_id); + + // Add to in-flight for this stream + if let Some(sid) = stream_id { + inner.in_flight + .entry(sid) + .or_insert_with(Vec::new) + .push(req_id); + + // Track launches per stream + *inner.launches_per_stream.entry(sid).or_insert(0) += 1; + } + + return true; + } + } + false + } + + /// Mark a request as completed + /// + /// Call this when kernel execution finishes + pub fn mark_completed(&self, req_id: u64) -> bool { + let mut inner = self.inner.write(); + + // 
Get stream ID first + let stream_id = inner.requests.get(&req_id).map(|r| r.config.stream_id); + + if let Some(req) = inner.requests.get_mut(&req_id) { + if req.state == KernelState::Launched { + req.complete(); + + // Update stats + if let Some(duration) = req.duration() { + inner.total_exec_time += duration; + } + inner.completed_count += 1; + + // Remove from in-flight + if let Some(sid) = stream_id { + if let Some(v) = inner.in_flight.get_mut(&sid) { + v.retain(|&id| id != req_id); + } + } + + return true; + } + } + false + } + + /// Mark a request as failed + pub fn mark_failed(&self, req_id: u64, error: String) -> bool { + let mut inner = self.inner.write(); + + // Get state and stream ID first + let info = inner.requests.get(&req_id).map(|r| (r.state, r.config.stream_id)); + + if let Some((state, stream_id)) = info { + if let Some(req) = inner.requests.get_mut(&req_id) { + req.fail(error); + inner.failed_count += 1; + + // Remove from appropriate queue + if state == KernelState::Queued { + inner.pending_queue.retain(|&id| id != req_id); + } else if state == KernelState::Launched { + if let Some(v) = inner.in_flight.get_mut(&stream_id) { + v.retain(|&id| id != req_id); + } + } + + return true; + } + } + false + } + + /// Cancel a pending request + pub fn cancel(&self, req_id: u64) -> bool { + let mut inner = self.inner.write(); + + if let Some(req) = inner.requests.get_mut(&req_id) { + if req.state == KernelState::Queued { + req.cancel(); + inner.pending_queue.retain(|&id| id != req_id); + return true; + } + } + false + } + + /// Get a request by ID + pub fn get_request(&self, req_id: u64) -> Option { + self.inner.read().requests.get(&req_id).cloned() + } + + /// Get all in-flight request IDs for a stream + pub fn get_in_flight(&self, stream_id: u32) -> Vec { + self.inner.read() + .in_flight + .get(&stream_id) + .cloned() + .unwrap_or_default() + } + + /// Get requests linked to a scheduler task + pub fn get_requests_for_task(&self, task_id: &str) -> Vec { + self.inner.read() + .requests + .values() + .filter(|r| r.task_id.as_deref() == Some(task_id)) + .cloned() + .collect() + } + + /// Check if there's pending work + pub fn has_pending_work(&self) -> bool { + let inner = self.inner.read(); + !inner.pending_queue.is_empty() || + inner.in_flight.values().any(|v| !v.is_empty()) + } + + /// Get dispatch statistics + pub fn stats(&self) -> DispatchStats { + let inner = self.inner.read(); + + let pending_count = inner.pending_queue.len(); + let in_flight_count: usize = inner.in_flight.values().map(|v| v.len()).sum(); + + let avg_exec = if inner.completed_count > 0 { + inner.total_exec_time / inner.completed_count as f64 + } else { + 0.0 + }; + + DispatchStats { + total_queued: inner.requests.len(), + completed_count: inner.completed_count, + failed_count: inner.failed_count, + pending_count, + in_flight_count, + avg_exec_time: avg_exec, + launches_per_stream: inner.launches_per_stream.clone(), + } + } + + /// Garbage collect completed requests + pub fn gc(&self) { + let mut inner = self.inner.write(); + inner.requests.retain(|_, req| !req.state.is_terminal()); + } + + /// Clear all state + pub fn clear(&self) { + let mut inner = self.inner.write(); + inner.requests.clear(); + inner.pending_queue.clear(); + inner.in_flight.clear(); + inner.total_exec_time = 0.0; + inner.completed_count = 0; + inner.failed_count = 0; + inner.launches_per_stream.clear(); + } +} + +// Thread-safe +unsafe impl Send for KernelDispatcher {} +unsafe impl Sync for KernelDispatcher {} + +#[cfg(test)] +mod tests { 
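Before the unit tests, a usage sketch from the Python side, mirroring the doc-comment example above. It assumes the PyO3 bindings added later in this diff are importable under these names, and the kernel handle is a made-up placeholder: the actual cuLaunchKernel call stays in the C++ backend.

```python
from _pygpukit_rust import KernelDispatcher, LaunchConfig

dispatcher = KernelDispatcher(max_in_flight=4)

# Queue a 1D kernel launch (0xDEAD0000 is a placeholder CUfunction handle).
config = LaunchConfig.linear(1 << 20, 256)   # grid derived from element count
req_id = dispatcher.queue(0xDEAD0000, config, args=[0x2000, 1 << 20], priority=1)

# Scheduler tick: pull launchable work, launch it, report it as launched.
for req in dispatcher.get_ready(10):
    # ... hand req.kernel_handle / req.config / req.args to the C++ backend ...
    dispatcher.mark_launched(req.id)

# Later, once the stream or an event signals completion:
dispatcher.mark_completed(req_id)

stats = dispatcher.stats()
print(stats.completed_count, stats.avg_exec_time)
```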
+ use super::*; + + #[test] + fn test_dispatcher_creation() { + let dispatcher = KernelDispatcher::new(4); + let stats = dispatcher.stats(); + assert_eq!(stats.total_queued, 0); + assert_eq!(stats.pending_count, 0); + } + + #[test] + fn test_queue_request() { + let dispatcher = KernelDispatcher::new(4); + + let config = LaunchConfig::linear(1024, 256); + let req = KernelLaunchRequest::new(0x1000, config); + let req_id = dispatcher.queue(req); + + assert!(req_id > 0); + let stats = dispatcher.stats(); + assert_eq!(stats.pending_count, 1); + } + + #[test] + fn test_get_ready() { + let dispatcher = KernelDispatcher::new(4); + + let config = LaunchConfig::linear(1024, 256); + let req = KernelLaunchRequest::new(0x1000, config); + dispatcher.queue(req); + + let ready = dispatcher.get_ready(10); + assert_eq!(ready.len(), 1); + } + + #[test] + fn test_launch_lifecycle() { + let dispatcher = KernelDispatcher::new(4); + + let config = LaunchConfig::linear(1024, 256); + let req = KernelLaunchRequest::new(0x1000, config); + let req_id = dispatcher.queue(req); + + // Mark launched + assert!(dispatcher.mark_launched(req_id)); + let req = dispatcher.get_request(req_id).unwrap(); + assert_eq!(req.state, KernelState::Launched); + + // Mark completed + assert!(dispatcher.mark_completed(req_id)); + let req = dispatcher.get_request(req_id).unwrap(); + assert_eq!(req.state, KernelState::Completed); + + let stats = dispatcher.stats(); + assert_eq!(stats.completed_count, 1); + } + + #[test] + fn test_max_in_flight() { + let dispatcher = KernelDispatcher::new(2); + + // Queue 5 requests on same stream + for _ in 0..5 { + let config = LaunchConfig::linear(1024, 256); + let req = KernelLaunchRequest::new(0x1000, config); + dispatcher.queue(req); + } + + // get_ready returns up to 2 (since max_in_flight = 2 and 0 are in-flight) + let ready = dispatcher.get_ready(10); + assert_eq!(ready.len(), 2); + + // Launch both - this moves them from pending to in-flight + for req in &ready { + dispatcher.mark_launched(req.id); + } + + let stats = dispatcher.stats(); + assert_eq!(stats.in_flight_count, 2); + assert_eq!(stats.pending_count, 3); // 5 - 2 launched = 3 pending + + // Now only 0 should be ready (stream is full with 2 in-flight) + let ready = dispatcher.get_ready(10); + assert_eq!(ready.len(), 0); + + // Complete one - frees a slot + let first_id = stats.launches_per_stream.keys().next().copied() + .and_then(|_| dispatcher.get_in_flight(0).first().copied()) + .unwrap(); + dispatcher.mark_completed(first_id); + + // Now one should be ready (1 in-flight, can add 1 more) + let ready = dispatcher.get_ready(10); + assert_eq!(ready.len(), 1); + } + + #[test] + fn test_multiple_streams() { + let dispatcher = KernelDispatcher::new(2); + + // Queue on different streams + for stream_id in 0..3 { + let config = LaunchConfig::linear(1024, 256).with_stream(stream_id); + let req = KernelLaunchRequest::new(0x1000, config); + dispatcher.queue(req); + } + + // All 3 should be ready (different streams) + let ready = dispatcher.get_ready(10); + assert_eq!(ready.len(), 3); + } + + #[test] + fn test_task_binding() { + let dispatcher = KernelDispatcher::new(4); + + let config = LaunchConfig::linear(1024, 256); + let req_id = dispatcher.queue_for_task( + "task-1".to_string(), + 0x1000, + config, + vec![0x2000, 1024], + ); + + let req = dispatcher.get_request(req_id).unwrap(); + assert_eq!(req.task_id, Some("task-1".to_string())); + assert_eq!(req.args, vec![0x2000, 1024]); + + let task_reqs = dispatcher.get_requests_for_task("task-1"); + 
assert_eq!(task_reqs.len(), 1); + } + + #[test] + fn test_failure() { + let dispatcher = KernelDispatcher::new(4); + + let config = LaunchConfig::linear(1024, 256); + let req = KernelLaunchRequest::new(0x1000, config); + let req_id = dispatcher.queue(req); + + dispatcher.mark_launched(req_id); + dispatcher.mark_failed(req_id, "CUDA error".to_string()); + + let req = dispatcher.get_request(req_id).unwrap(); + assert_eq!(req.state, KernelState::Failed); + assert_eq!(req.error, Some("CUDA error".to_string())); + + let stats = dispatcher.stats(); + assert_eq!(stats.failed_count, 1); + } + + #[test] + fn test_launch_config() { + let config = LaunchConfig::grid_2d(32, 32, 16, 16) + .with_shared_mem(4096) + .with_stream(2); + + assert_eq!(config.grid, (32, 32, 1)); + assert_eq!(config.block, (16, 16, 1)); + assert_eq!(config.shared_mem, 4096); + assert_eq!(config.stream_id, 2); + } +} diff --git a/rust/pygpukit-core/src/dispatch/mod.rs b/rust/pygpukit-core/src/dispatch/mod.rs new file mode 100644 index 0000000..f89171f --- /dev/null +++ b/rust/pygpukit-core/src/dispatch/mod.rs @@ -0,0 +1,13 @@ +//! Kernel Dispatch Controller +//! +//! Provides coordination for GPU kernel launches with: +//! - Per-task stream assignment +//! - Integration with the scheduler tick loop +//! - Kernel execution tracking +//! +//! Note: Actual CUDA Driver API calls (cuLaunchKernel) are handled by C++ backend. +//! This module provides the Rust-side coordination logic. + +mod controller; + +pub use controller::{KernelDispatcher, KernelLaunchRequest, KernelState, DispatchStats, LaunchConfig}; diff --git a/rust/pygpukit-core/src/lib.rs b/rust/pygpukit-core/src/lib.rs index 623c21f..c1f8286 100644 --- a/rust/pygpukit-core/src/lib.rs +++ b/rust/pygpukit-core/src/lib.rs @@ -1,11 +1,17 @@ -//! PyGPUkit Core - Rust implementation of memory pool and scheduler +//! PyGPUkit Core - Rust implementation of memory pool, scheduler, transfer engine, and kernel dispatcher //! //! This crate provides the core data structures and algorithms for: //! - GPU memory pool with LRU eviction //! - Task scheduler with bandwidth pacing +//! - Async memory transfer engine with separate streams +//! 
- Kernel dispatch controller with stream management pub mod memory; pub mod scheduler; +pub mod transfer; +pub mod dispatch; pub use memory::{MemoryBlock, MemoryPool, PoolStats, MemoryError}; pub use scheduler::{TaskState, TaskPolicy, TaskMeta, Scheduler, SchedulerStats, TaskStats}; +pub use transfer::{TransferType, TransferOp, TransferState, AsyncTransferEngine, StreamType, TransferStats}; +pub use dispatch::{KernelDispatcher, KernelLaunchRequest, KernelState, DispatchStats, LaunchConfig}; diff --git a/rust/pygpukit-core/src/scheduler/core.rs b/rust/pygpukit-core/src/scheduler/core.rs index 2630e7e..c544de9 100644 --- a/rust/pygpukit-core/src/scheduler/core.rs +++ b/rust/pygpukit-core/src/scheduler/core.rs @@ -473,27 +473,34 @@ mod tests { fn test_memory_reservation() { let sched = Scheduler::new(Some(1000), 10.0, 100.0); - // Submit tasks that exceed memory - let task1 = TaskMeta::with_memory("task-1".into(), "T1".into(), 600); - let task2 = TaskMeta::with_memory("task-2".into(), "T2".into(), 600); + // Submit tasks - memory is reserved at submit time + let task1 = TaskMeta::with_memory("task-1".into(), "T1".into(), 400); + let task2 = TaskMeta::with_memory("task-2".into(), "T2".into(), 400); sched.submit(task1); sched.submit(task2); - // Only first should run (not enough memory for second) + // Both tasks should run (total 800 <= 1000) let runnable = sched.get_runnable_tasks(10); - assert_eq!(runnable.len(), 1); + assert_eq!(runnable.len(), 2); + // Memory is reserved at submit time let stats = sched.stats(); - assert_eq!(stats.reserved_memory, 600); - assert_eq!(stats.pending_count, 1); + assert_eq!(stats.reserved_memory, 800); + assert_eq!(stats.running_count, 2); - // Complete first task + // Complete first task - releases memory sched.complete_task("task-1"); - // Now second should run - let runnable = sched.get_runnable_tasks(10); - assert_eq!(runnable.len(), 1); - assert_eq!(runnable[0], "task-2"); + let stats = sched.stats(); + assert_eq!(stats.reserved_memory, 400); + assert_eq!(stats.completed_count, 1); + + // Complete second task + sched.complete_task("task-2"); + + let stats = sched.stats(); + assert_eq!(stats.reserved_memory, 0); + assert_eq!(stats.completed_count, 2); } #[test] diff --git a/rust/pygpukit-core/src/transfer/engine.rs b/rust/pygpukit-core/src/transfer/engine.rs new file mode 100644 index 0000000..b6703d8 --- /dev/null +++ b/rust/pygpukit-core/src/transfer/engine.rs @@ -0,0 +1,648 @@ +//! Async Transfer Engine implementation +//! +//! Manages asynchronous memory transfers with multiple streams. 
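A quick note on the scheduler hunk above before the engine internals: the rewritten test pins down the new accounting, where memory is reserved at submit time and released on completion. A toy Python model of that invariant, using the test's numbers (illustrative only; the real logic lives in the Rust Scheduler):

```python
# Toy model of reserve-at-submit semantics exercised by test_memory_reservation.
class ReserveAtSubmit:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.reserved = 0

    def submit(self, need: int) -> bool:
        """Reserve immediately; refuse if it would exceed capacity
        (the real scheduler would keep such a task pending instead)."""
        if self.reserved + need > self.capacity:
            return False
        self.reserved += need
        return True

    def complete(self, need: int) -> None:
        self.reserved -= need   # completion releases the reservation

sched = ReserveAtSubmit(1000)
assert sched.submit(400) and sched.submit(400)  # 800 <= 1000: both run
assert sched.reserved == 800                    # mirrors stats.reserved_memory
sched.complete(400)
assert sched.reserved == 400
sched.complete(400)
assert sched.reserved == 0
```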
+ +use std::collections::{HashMap, BinaryHeap}; +use std::cmp::Ordering; +use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; +use parking_lot::RwLock; + +use crate::transfer::operation::{TransferOp, TransferState, TransferType}; + +/// Stream type identifier +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum StreamType { + /// Default compute stream (stream 0) + Compute, + /// Dedicated memcpy stream for H2D/D2H + MemcpyH2D, + /// Dedicated memcpy stream for D2H (separate to allow overlap) + MemcpyD2H, + /// Custom stream with ID + Custom(u32), +} + +impl StreamType { + /// Convert to stream ID for the C++ backend + pub fn to_id(&self) -> u32 { + match self { + StreamType::Compute => 0, + StreamType::MemcpyH2D => 1, + StreamType::MemcpyD2H => 2, + StreamType::Custom(id) => *id, + } + } +} + +/// Callback type for executing transfers via C++ backend +pub type TransferCallback = Box Result<(), String> + Send + Sync>; + +/// Transfer statistics +#[derive(Debug, Clone, Default)] +pub struct TransferStats { + /// Total transfers queued + pub total_queued: usize, + /// Transfers completed successfully + pub completed_count: usize, + /// Transfers failed + pub failed_count: usize, + /// Total bytes transferred + pub total_bytes: usize, + /// Total H2D bytes + pub h2d_bytes: usize, + /// Total D2H bytes + pub d2h_bytes: usize, + /// Total D2D bytes + pub d2d_bytes: usize, + /// Average H2D bandwidth (GB/s) + pub avg_h2d_bandwidth: f64, + /// Average D2H bandwidth (GB/s) + pub avg_d2h_bandwidth: f64, + /// Pending transfers count + pub pending_count: usize, + /// In-progress transfers count + pub in_progress_count: usize, +} + +/// Priority wrapper for transfer operations +struct PriorityTransfer { + op_id: u64, + priority: i32, + queued_order: u64, // For FIFO within same priority +} + +impl PartialEq for PriorityTransfer { + fn eq(&self, other: &Self) -> bool { + self.priority == other.priority && self.queued_order == other.queued_order + } +} + +impl Eq for PriorityTransfer {} + +impl PartialOrd for PriorityTransfer { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for PriorityTransfer { + fn cmp(&self, other: &Self) -> Ordering { + // Higher priority first, then earlier queued order (lower = earlier) + match self.priority.cmp(&other.priority) { + Ordering::Equal => other.queued_order.cmp(&self.queued_order), // Reverse for FIFO + other => other, + } + } +} + +/// Internal engine state +struct EngineInner { + /// All transfer operations by ID + operations: HashMap, + /// Priority queue for pending H2D transfers + h2d_queue: BinaryHeap, + /// Priority queue for pending D2H transfers + d2h_queue: BinaryHeap, + /// Priority queue for pending D2D transfers + d2d_queue: BinaryHeap, + /// Currently in-progress transfers by stream + in_progress: HashMap>, + /// Statistics tracking + h2d_total_time: f64, + h2d_total_bytes: usize, + d2h_total_time: f64, + d2h_total_bytes: usize, + completed_count: usize, + failed_count: usize, +} + +/// Async Memory Transfer Engine +/// +/// Manages asynchronous memory transfers between host and device with: +/// - Separate streams for H2D and D2H to enable overlap +/// - Priority-based scheduling within each queue +/// - Integration with the scheduler tick loop +/// +/// # Example +/// +/// ```ignore +/// use pygpukit_core::transfer::{AsyncTransferEngine, TransferOp}; +/// +/// let engine = AsyncTransferEngine::new(4); // Max 4 concurrent transfers per stream +/// +/// // Queue an H2D transfer +/// let 
op_id = engine.enqueue_h2d(host_ptr, device_ptr, size); +/// +/// // In the scheduler tick loop +/// let ready = engine.get_ready_transfers(10); +/// for op in ready { +/// // Execute via C++ backend +/// // ... +/// engine.complete_transfer(op.id); +/// } +/// ``` +pub struct AsyncTransferEngine { + /// Next operation ID + next_id: AtomicU64, + /// Queued order counter for FIFO within priority + queued_order: AtomicU64, + /// Maximum concurrent transfers per stream + max_concurrent: usize, + /// Internal state + inner: RwLock, +} + +impl AsyncTransferEngine { + /// Create a new transfer engine + /// + /// # Arguments + /// + /// * `max_concurrent` - Maximum concurrent transfers per stream + pub fn new(max_concurrent: usize) -> Self { + Self { + next_id: AtomicU64::new(1), + queued_order: AtomicU64::new(0), + max_concurrent, + inner: RwLock::new(EngineInner { + operations: HashMap::new(), + h2d_queue: BinaryHeap::new(), + d2h_queue: BinaryHeap::new(), + d2d_queue: BinaryHeap::new(), + in_progress: HashMap::new(), + h2d_total_time: 0.0, + h2d_total_bytes: 0, + d2h_total_time: 0.0, + d2h_total_bytes: 0, + completed_count: 0, + failed_count: 0, + }), + } + } + + /// Generate next operation ID + fn next_op_id(&self) -> u64 { + self.next_id.fetch_add(1, AtomicOrdering::SeqCst) + } + + /// Generate next queued order + fn next_queued_order(&self) -> u64 { + self.queued_order.fetch_add(1, AtomicOrdering::SeqCst) + } + + /// Enqueue an H2D transfer + /// + /// Returns the operation ID + pub fn enqueue_h2d(&self, host_ptr: u64, device_ptr: u64, size: usize) -> u64 { + let id = self.next_op_id(); + let mut op = TransferOp::h2d(id, host_ptr, device_ptr, size); + op.stream_id = StreamType::MemcpyH2D.to_id(); + + self.enqueue_op(op) + } + + /// Enqueue a D2H transfer + pub fn enqueue_d2h(&self, device_ptr: u64, host_ptr: u64, size: usize) -> u64 { + let id = self.next_op_id(); + let mut op = TransferOp::d2h(id, device_ptr, host_ptr, size); + op.stream_id = StreamType::MemcpyD2H.to_id(); + + self.enqueue_op(op) + } + + /// Enqueue a D2D transfer + pub fn enqueue_d2d(&self, src_ptr: u64, dst_ptr: u64, size: usize) -> u64 { + let id = self.next_op_id(); + let mut op = TransferOp::d2d(id, src_ptr, dst_ptr, size); + op.stream_id = StreamType::Compute.to_id(); // D2D on compute stream + + self.enqueue_op(op) + } + + /// Enqueue a transfer with priority + pub fn enqueue_with_priority( + &self, + transfer_type: TransferType, + src_ptr: u64, + dst_ptr: u64, + size: usize, + priority: i32, + ) -> u64 { + let id = self.next_op_id(); + let mut op = TransferOp::new(id, transfer_type, src_ptr, dst_ptr, size) + .with_priority(priority); + + op.stream_id = match transfer_type { + TransferType::H2D => StreamType::MemcpyH2D.to_id(), + TransferType::D2H => StreamType::MemcpyD2H.to_id(), + TransferType::D2D => StreamType::Compute.to_id(), + }; + + self.enqueue_op(op) + } + + /// Internal: enqueue an operation + fn enqueue_op(&self, op: TransferOp) -> u64 { + let id = op.id; + let priority = op.priority; + let transfer_type = op.transfer_type; + let queued_order = self.next_queued_order(); + + let mut inner = self.inner.write(); + inner.operations.insert(id, op); + + let priority_entry = PriorityTransfer { + op_id: id, + priority, + queued_order, + }; + + match transfer_type { + TransferType::H2D => inner.h2d_queue.push(priority_entry), + TransferType::D2H => inner.d2h_queue.push(priority_entry), + TransferType::D2D => inner.d2d_queue.push(priority_entry), + } + + id + } + + /// Get transfers that are ready to execute + /// 
+ /// Returns up to `max_transfers` operations that can be started + pub fn get_ready_transfers(&self, max_transfers: usize) -> Vec { + let mut inner = self.inner.write(); + let mut ready = Vec::new(); + + // Process each queue type + let queues_and_streams = [ + (TransferType::H2D, StreamType::MemcpyH2D.to_id()), + (TransferType::D2H, StreamType::MemcpyD2H.to_id()), + (TransferType::D2D, StreamType::Compute.to_id()), + ]; + + for (transfer_type, stream_id) in queues_and_streams { + if ready.len() >= max_transfers { + break; + } + + // Get current in-progress count for this stream + let current_count = inner.in_progress.get(&stream_id).map(|v| v.len()).unwrap_or(0); + let available_slots = self.max_concurrent.saturating_sub(current_count); + + if available_slots == 0 { + continue; + } + + let to_get = available_slots.min(max_transfers - ready.len()); + + // Get the queue for this transfer type + let queue = match transfer_type { + TransferType::H2D => &mut inner.h2d_queue, + TransferType::D2H => &mut inner.d2h_queue, + TransferType::D2D => &mut inner.d2d_queue, + }; + + // Collect op IDs to process + let mut ops_to_start = Vec::new(); + while ops_to_start.len() < to_get { + if let Some(entry) = queue.pop() { + ops_to_start.push(entry.op_id); + } else { + break; + } + } + + // Start each operation + for op_id in ops_to_start { + if let Some(op) = inner.operations.get_mut(&op_id) { + if op.state == TransferState::Queued { + op.start(); + ready.push(op.clone()); + } + } + inner.in_progress.entry(stream_id).or_insert_with(Vec::new).push(op_id); + } + } + + ready + } + + /// Mark a transfer as started (called when C++ backend initiates transfer) + pub fn start_transfer(&self, op_id: u64) -> bool { + let mut inner = self.inner.write(); + + // First get the stream_id if operation is in queued state + let stream_id = inner.operations.get(&op_id).and_then(|op| { + if op.state == TransferState::Queued { + Some(op.stream_id) + } else { + None + } + }); + + if let Some(sid) = stream_id { + if let Some(op) = inner.operations.get_mut(&op_id) { + op.start(); + } + inner.in_progress + .entry(sid) + .or_insert_with(Vec::new) + .push(op_id); + return true; + } + false + } + + /// Mark a transfer as completed + pub fn complete_transfer(&self, op_id: u64) -> bool { + let mut inner = self.inner.write(); + + // Get operation info first + let op_info = inner.operations.get(&op_id).and_then(|op| { + if op.state == TransferState::InProgress { + Some((op.stream_id, op.transfer_type, op.size)) + } else { + None + } + }); + + if let Some((stream_id, transfer_type, size)) = op_info { + if let Some(op) = inner.operations.get_mut(&op_id) { + op.complete(); + + // Update stats + if let Some(duration) = op.duration() { + match transfer_type { + TransferType::H2D => { + inner.h2d_total_time += duration; + inner.h2d_total_bytes += size; + } + TransferType::D2H => { + inner.d2h_total_time += duration; + inner.d2h_total_bytes += size; + } + TransferType::D2D => {} + } + } + } + + // Remove from in-progress + if let Some(in_prog) = inner.in_progress.get_mut(&stream_id) { + in_prog.retain(|&id| id != op_id); + } + + inner.completed_count += 1; + return true; + } + false + } + + /// Mark a transfer as failed + pub fn fail_transfer(&self, op_id: u64, error: String) -> bool { + let mut inner = self.inner.write(); + + let stream_id = inner.operations.get(&op_id).map(|op| op.stream_id); + + if let Some(op) = inner.operations.get_mut(&op_id) { + if op.state == TransferState::InProgress { + op.fail(error); + + // Remove from 
in-progress + if let Some(sid) = stream_id { + if let Some(in_prog) = inner.in_progress.get_mut(&sid) { + in_prog.retain(|&id| id != op_id); + } + } + + inner.failed_count += 1; + return true; + } + } + false + } + + /// Cancel a pending transfer + pub fn cancel_transfer(&self, op_id: u64) -> bool { + let mut inner = self.inner.write(); + + if let Some(op) = inner.operations.get_mut(&op_id) { + if op.state == TransferState::Queued { + op.cancel(); + return true; + } + } + false + } + + /// Get operation by ID + pub fn get_operation(&self, op_id: u64) -> Option { + self.inner.read().operations.get(&op_id).cloned() + } + + /// Get transfer statistics + pub fn stats(&self) -> TransferStats { + let inner = self.inner.read(); + + let pending_count = inner.h2d_queue.len() + inner.d2h_queue.len() + inner.d2d_queue.len(); + let in_progress_count: usize = inner.in_progress.values().map(|v| v.len()).sum(); + + let avg_h2d_bandwidth = if inner.h2d_total_time > 0.0 { + (inner.h2d_total_bytes as f64) / inner.h2d_total_time / 1e9 + } else { + 0.0 + }; + + let avg_d2h_bandwidth = if inner.d2h_total_time > 0.0 { + (inner.d2h_total_bytes as f64) / inner.d2h_total_time / 1e9 + } else { + 0.0 + }; + + TransferStats { + total_queued: inner.operations.len(), + completed_count: inner.completed_count, + failed_count: inner.failed_count, + total_bytes: inner.h2d_total_bytes + inner.d2h_total_bytes, + h2d_bytes: inner.h2d_total_bytes, + d2h_bytes: inner.d2h_total_bytes, + d2d_bytes: 0, // TODO: track D2D separately + avg_h2d_bandwidth, + avg_d2h_bandwidth, + pending_count, + in_progress_count, + } + } + + /// Synchronize a specific stream (wait for all transfers on that stream) + /// + /// Returns the IDs of all completed transfers + pub fn get_in_progress_for_stream(&self, stream_id: u32) -> Vec { + self.inner.read() + .in_progress + .get(&stream_id) + .cloned() + .unwrap_or_default() + } + + /// Check if any transfers are pending or in progress + pub fn has_pending_work(&self) -> bool { + let inner = self.inner.read(); + !inner.h2d_queue.is_empty() + || !inner.d2h_queue.is_empty() + || !inner.d2d_queue.is_empty() + || inner.in_progress.values().any(|v| !v.is_empty()) + } + + /// Clear all completed and cancelled operations to free memory + pub fn gc(&self) { + let mut inner = self.inner.write(); + inner.operations.retain(|_, op| !op.state.is_terminal()); + } + + /// Clear all operations (including pending) + pub fn clear(&self) { + let mut inner = self.inner.write(); + inner.operations.clear(); + inner.h2d_queue.clear(); + inner.d2h_queue.clear(); + inner.d2d_queue.clear(); + inner.in_progress.clear(); + inner.h2d_total_time = 0.0; + inner.h2d_total_bytes = 0; + inner.d2h_total_time = 0.0; + inner.d2h_total_bytes = 0; + inner.completed_count = 0; + inner.failed_count = 0; + } +} + +// Thread-safe +unsafe impl Send for AsyncTransferEngine {} +unsafe impl Sync for AsyncTransferEngine {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_engine_creation() { + let engine = AsyncTransferEngine::new(4); + let stats = engine.stats(); + assert_eq!(stats.total_queued, 0); + assert_eq!(stats.pending_count, 0); + } + + #[test] + fn test_enqueue_h2d() { + let engine = AsyncTransferEngine::new(4); + + let op_id = engine.enqueue_h2d(0x1000, 0x2000, 1024); + assert!(op_id > 0); + + let op = engine.get_operation(op_id).unwrap(); + assert_eq!(op.transfer_type, TransferType::H2D); + assert_eq!(op.size, 1024); + assert_eq!(op.state, TransferState::Queued); + } + + #[test] + fn test_get_ready_transfers() { + let 
engine = AsyncTransferEngine::new(4); + + engine.enqueue_h2d(0x1000, 0x2000, 1024); + engine.enqueue_d2h(0x2000, 0x1000, 2048); + + let ready = engine.get_ready_transfers(10); + assert_eq!(ready.len(), 2); + + // Both should be in progress now + let stats = engine.stats(); + assert_eq!(stats.in_progress_count, 2); + assert_eq!(stats.pending_count, 0); + } + + #[test] + fn test_complete_transfer() { + let engine = AsyncTransferEngine::new(4); + + let op_id = engine.enqueue_h2d(0x1000, 0x2000, 1024); + let _ = engine.get_ready_transfers(1); + + assert!(engine.complete_transfer(op_id)); + + let op = engine.get_operation(op_id).unwrap(); + assert_eq!(op.state, TransferState::Completed); + + let stats = engine.stats(); + assert_eq!(stats.completed_count, 1); + assert_eq!(stats.in_progress_count, 0); + } + + #[test] + fn test_max_concurrent() { + let engine = AsyncTransferEngine::new(2); + + // Queue 5 H2D transfers + for i in 0..5 { + engine.enqueue_h2d(0x1000 * (i + 1) as u64, 0x2000 * (i + 1) as u64, 1024); + } + + // Only 2 should be ready (max_concurrent = 2) + let ready = engine.get_ready_transfers(10); + assert_eq!(ready.len(), 2); + + let stats = engine.stats(); + assert_eq!(stats.in_progress_count, 2); + assert_eq!(stats.pending_count, 3); + } + + #[test] + fn test_priority_ordering() { + let engine = AsyncTransferEngine::new(1); + + // Queue with different priorities + let low_id = engine.enqueue_with_priority(TransferType::H2D, 0x1000, 0x2000, 1024, 0); + let high_id = engine.enqueue_with_priority(TransferType::H2D, 0x3000, 0x4000, 1024, 10); + + // Higher priority should come first + let ready = engine.get_ready_transfers(1); + assert_eq!(ready.len(), 1); + assert_eq!(ready[0].id, high_id); + + // Complete it and get next + engine.complete_transfer(high_id); + let ready = engine.get_ready_transfers(1); + assert_eq!(ready[0].id, low_id); + } + + #[test] + fn test_separate_streams() { + let engine = AsyncTransferEngine::new(2); + + // Queue H2D and D2H - they use different streams + engine.enqueue_h2d(0x1000, 0x2000, 1024); + engine.enqueue_h2d(0x3000, 0x4000, 1024); + engine.enqueue_d2h(0x5000, 0x6000, 1024); + engine.enqueue_d2h(0x7000, 0x8000, 1024); + + // Should get 4 transfers (2 per stream, max_concurrent per stream) + let ready = engine.get_ready_transfers(10); + assert_eq!(ready.len(), 4); + } + + #[test] + fn test_fail_transfer() { + let engine = AsyncTransferEngine::new(4); + + let op_id = engine.enqueue_h2d(0x1000, 0x2000, 1024); + let _ = engine.get_ready_transfers(1); + + assert!(engine.fail_transfer(op_id, "CUDA error".into())); + + let op = engine.get_operation(op_id).unwrap(); + assert_eq!(op.state, TransferState::Failed); + assert_eq!(op.error, Some("CUDA error".into())); + + let stats = engine.stats(); + assert_eq!(stats.failed_count, 1); + } +} diff --git a/rust/pygpukit-core/src/transfer/mod.rs b/rust/pygpukit-core/src/transfer/mod.rs new file mode 100644 index 0000000..15f9ec1 --- /dev/null +++ b/rust/pygpukit-core/src/transfer/mod.rs @@ -0,0 +1,16 @@ +//! Async Memory Transfer Engine +//! +//! Provides asynchronous memory transfer operations with: +//! - Separate compute and memcpy streams +//! - H2D, D2H, and D2D transfer types +//! - Stream synchronization model +//! - Integration with the scheduler tick loop +//! +//! Note: This module provides the Rust-side coordination logic. +//! Actual CUDA stream operations are handled by the C++ backend via callbacks. 
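The PriorityTransfer ordering in engine.rs above is the subtle part: BinaryHeap pops the "largest" entry, so priority compares ascending while queued_order compares reversed, which yields highest-priority-first with FIFO among equal priorities. The same policy can be modeled with Python's min-heap by negating the priority; a standalone sketch:

```python
import heapq

# Model of the PriorityTransfer comparator: higher priority drains first,
# and entries with equal priority drain in queue order (FIFO).
heap: list[tuple[int, int, str]] = []
order = 0

def push(op_id: str, priority: int) -> None:
    global order
    # heapq is a min-heap, so negate priority; queued_order stays ascending.
    heapq.heappush(heap, (-priority, order, op_id))
    order += 1

push("low", 0)
push("high-1", 10)
push("high-2", 10)

drained = [heapq.heappop(heap)[2] for _ in range(len(heap))]
assert drained == ["high-1", "high-2", "low"]  # priority desc, FIFO within 10
```

This matches test_priority_ordering further down, where the priority-10 H2D transfer is returned before the priority-0 one.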
+ +mod operation; +mod engine; + +pub use operation::{TransferType, TransferOp, TransferState}; +pub use engine::{AsyncTransferEngine, StreamType, TransferStats, TransferCallback}; diff --git a/rust/pygpukit-core/src/transfer/operation.rs b/rust/pygpukit-core/src/transfer/operation.rs new file mode 100644 index 0000000..c971bed --- /dev/null +++ b/rust/pygpukit-core/src/transfer/operation.rs @@ -0,0 +1,257 @@ +//! Transfer operation definitions +//! +//! Defines the types of memory transfers and their metadata. + +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Type of memory transfer operation +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum TransferType { + /// Host to Device transfer + H2D, + /// Device to Host transfer + D2H, + /// Device to Device transfer + D2D, +} + +impl TransferType { + /// Returns a human-readable name for this transfer type + pub fn name(&self) -> &'static str { + match self { + TransferType::H2D => "H2D", + TransferType::D2H => "D2H", + TransferType::D2D => "D2D", + } + } +} + +/// State of a transfer operation +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum TransferState { + /// Transfer is queued but not started + Queued, + /// Transfer is in progress + InProgress, + /// Transfer completed successfully + Completed, + /// Transfer failed + Failed, + /// Transfer was cancelled + Cancelled, +} + +impl TransferState { + /// Check if this is a terminal state + pub fn is_terminal(&self) -> bool { + matches!(self, TransferState::Completed | TransferState::Failed | TransferState::Cancelled) + } +} + +/// Represents a single memory transfer operation +#[derive(Debug, Clone)] +pub struct TransferOp { + /// Unique operation ID + pub id: u64, + /// Type of transfer + pub transfer_type: TransferType, + /// Source memory address (device pointer or host pointer as u64) + pub src_ptr: u64, + /// Destination memory address + pub dst_ptr: u64, + /// Size of transfer in bytes + pub size: usize, + /// Current state + pub state: TransferState, + /// Stream ID to use (0 = default compute, 1 = memcpy stream) + pub stream_id: u32, + /// Timestamp when operation was queued + pub queued_at: f64, + /// Timestamp when operation started + pub started_at: Option, + /// Timestamp when operation completed + pub completed_at: Option, + /// Priority (higher = more urgent) + pub priority: i32, + /// Error message if failed + pub error: Option, + /// Associated task ID (if linked to a scheduler task) + pub task_id: Option, +} + +impl TransferOp { + /// Create a new transfer operation + pub fn new( + id: u64, + transfer_type: TransferType, + src_ptr: u64, + dst_ptr: u64, + size: usize, + ) -> Self { + Self { + id, + transfer_type, + src_ptr, + dst_ptr, + size, + state: TransferState::Queued, + stream_id: 1, // Default to memcpy stream + queued_at: Self::now(), + started_at: None, + completed_at: None, + priority: 0, + error: None, + task_id: None, + } + } + + /// Create an H2D transfer + pub fn h2d(id: u64, host_ptr: u64, device_ptr: u64, size: usize) -> Self { + Self::new(id, TransferType::H2D, host_ptr, device_ptr, size) + } + + /// Create a D2H transfer + pub fn d2h(id: u64, device_ptr: u64, host_ptr: u64, size: usize) -> Self { + Self::new(id, TransferType::D2H, device_ptr, host_ptr, size) + } + + /// Create a D2D transfer + pub fn d2d(id: u64, src_device_ptr: u64, dst_device_ptr: u64, size: usize) -> Self { + Self::new(id, TransferType::D2D, src_device_ptr, dst_device_ptr, size) + } + + /// Set the priority + pub fn with_priority(mut self, priority: i32) -> Self { 
+ self.priority = priority; + self + } + + /// Set the stream ID + pub fn with_stream(mut self, stream_id: u32) -> Self { + self.stream_id = stream_id; + self + } + + /// Link to a scheduler task + pub fn with_task(mut self, task_id: String) -> Self { + self.task_id = Some(task_id); + self + } + + /// Mark as started + pub fn start(&mut self) { + if self.state == TransferState::Queued { + self.state = TransferState::InProgress; + self.started_at = Some(Self::now()); + } + } + + /// Mark as completed + pub fn complete(&mut self) { + if self.state == TransferState::InProgress { + self.state = TransferState::Completed; + self.completed_at = Some(Self::now()); + } + } + + /// Mark as failed + pub fn fail(&mut self, error: String) { + self.state = TransferState::Failed; + self.completed_at = Some(Self::now()); + self.error = Some(error); + } + + /// Mark as cancelled + pub fn cancel(&mut self) { + if !self.state.is_terminal() { + self.state = TransferState::Cancelled; + self.completed_at = Some(Self::now()); + } + } + + /// Get wait time (time in queue before starting) + pub fn wait_time(&self) -> f64 { + self.started_at + .map(|s| s - self.queued_at) + .unwrap_or_else(|| Self::now() - self.queued_at) + } + + /// Get transfer duration (time from start to completion) + pub fn duration(&self) -> Option { + match (self.started_at, self.completed_at) { + (Some(start), Some(end)) => Some(end - start), + _ => None, + } + } + + /// Get bandwidth in GB/s (if completed) + pub fn bandwidth_gbps(&self) -> Option { + self.duration().map(|d| { + if d > 0.0 { + (self.size as f64) / d / 1e9 + } else { + 0.0 + } + }) + } + + /// Get current Unix timestamp + #[inline] + fn now() -> f64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_transfer_op_creation() { + let op = TransferOp::h2d(1, 0x1000, 0x2000, 1024); + assert_eq!(op.id, 1); + assert_eq!(op.transfer_type, TransferType::H2D); + assert_eq!(op.size, 1024); + assert_eq!(op.state, TransferState::Queued); + } + + #[test] + fn test_transfer_lifecycle() { + let mut op = TransferOp::d2h(1, 0x2000, 0x1000, 2048); + + assert_eq!(op.state, TransferState::Queued); + assert!(op.started_at.is_none()); + + op.start(); + assert_eq!(op.state, TransferState::InProgress); + assert!(op.started_at.is_some()); + + op.complete(); + assert_eq!(op.state, TransferState::Completed); + assert!(op.completed_at.is_some()); + assert!(op.duration().is_some()); + } + + #[test] + fn test_transfer_failure() { + let mut op = TransferOp::h2d(1, 0x1000, 0x2000, 1024); + op.start(); + op.fail("CUDA error: out of memory".into()); + + assert_eq!(op.state, TransferState::Failed); + assert_eq!(op.error, Some("CUDA error: out of memory".into())); + } + + #[test] + fn test_priority_and_stream() { + let op = TransferOp::h2d(1, 0x1000, 0x2000, 1024) + .with_priority(10) + .with_stream(2); + + assert_eq!(op.priority, 10); + assert_eq!(op.stream_id, 2); + } +} diff --git a/rust/pygpukit-python/src/dispatch.rs b/rust/pygpukit-python/src/dispatch.rs new file mode 100644 index 0000000..09e5c18 --- /dev/null +++ b/rust/pygpukit-python/src/dispatch.rs @@ -0,0 +1,417 @@ +//! 
Python bindings for the kernel dispatch controller + +use pyo3::prelude::*; +use std::collections::HashMap; +use pygpukit_core::dispatch::{ + KernelDispatcher, KernelLaunchRequest, KernelState, DispatchStats, LaunchConfig, +}; + +/// Python wrapper for KernelState enum +#[pyclass(name = "KernelState")] +#[derive(Clone)] +pub struct PyKernelState { + inner: KernelState, +} + +#[pymethods] +impl PyKernelState { + #[classattr] + fn Queued() -> Self { + Self { inner: KernelState::Queued } + } + + #[classattr] + fn Launched() -> Self { + Self { inner: KernelState::Launched } + } + + #[classattr] + fn Completed() -> Self { + Self { inner: KernelState::Completed } + } + + #[classattr] + fn Failed() -> Self { + Self { inner: KernelState::Failed } + } + + #[classattr] + fn Cancelled() -> Self { + Self { inner: KernelState::Cancelled } + } + + fn is_terminal(&self) -> bool { + self.inner.is_terminal() + } + + fn __repr__(&self) -> String { + let name = match self.inner { + KernelState::Queued => "Queued", + KernelState::Launched => "Launched", + KernelState::Completed => "Completed", + KernelState::Failed => "Failed", + KernelState::Cancelled => "Cancelled", + }; + format!("KernelState.{}", name) + } +} + +/// Python wrapper for LaunchConfig +#[pyclass(name = "LaunchConfig")] +#[derive(Clone)] +pub struct PyLaunchConfig { + inner: LaunchConfig, +} + +#[pymethods] +impl PyLaunchConfig { + #[new] + #[pyo3(signature = (grid=(1, 1, 1), block=(256, 1, 1), shared_mem=0, stream_id=0))] + fn new( + grid: (u32, u32, u32), + block: (u32, u32, u32), + shared_mem: u32, + stream_id: u32, + ) -> Self { + Self { + inner: LaunchConfig { + grid, + block, + shared_mem, + stream_id, + }, + } + } + + /// Create a 1D linear launch config + #[staticmethod] + #[pyo3(signature = (n_elements, block_size=256))] + fn linear(n_elements: usize, block_size: u32) -> Self { + Self { + inner: LaunchConfig::linear(n_elements, block_size), + } + } + + /// Create a 2D grid launch config + #[staticmethod] + fn grid_2d(grid_x: u32, grid_y: u32, block_x: u32, block_y: u32) -> Self { + Self { + inner: LaunchConfig::grid_2d(grid_x, grid_y, block_x, block_y), + } + } + + #[getter] + fn grid(&self) -> (u32, u32, u32) { + self.inner.grid + } + + #[getter] + fn block(&self) -> (u32, u32, u32) { + self.inner.block + } + + #[getter] + fn shared_mem(&self) -> u32 { + self.inner.shared_mem + } + + #[setter] + fn set_shared_mem(&mut self, bytes: u32) { + self.inner.shared_mem = bytes; + } + + #[getter] + fn stream_id(&self) -> u32 { + self.inner.stream_id + } + + #[setter] + fn set_stream_id(&mut self, stream_id: u32) { + self.inner.stream_id = stream_id; + } + + fn __repr__(&self) -> String { + format!( + "LaunchConfig(grid={:?}, block={:?}, shared_mem={}, stream_id={})", + self.inner.grid, self.inner.block, self.inner.shared_mem, self.inner.stream_id + ) + } +} + +/// Python wrapper for KernelLaunchRequest +#[pyclass(name = "KernelLaunchRequest")] +#[derive(Clone)] +pub struct PyKernelLaunchRequest { + inner: KernelLaunchRequest, +} + +#[pymethods] +impl PyKernelLaunchRequest { + #[getter] + fn id(&self) -> u64 { + self.inner.id + } + + #[getter] + fn kernel_handle(&self) -> u64 { + self.inner.kernel_handle + } + + #[getter] + fn config(&self) -> PyLaunchConfig { + PyLaunchConfig { inner: self.inner.config.clone() } + } + + #[getter] + fn args(&self) -> Vec { + self.inner.args.clone() + } + + #[getter] + fn state(&self) -> PyKernelState { + PyKernelState { inner: self.inner.state } + } + + #[getter] + fn task_id(&self) -> Option { + 
self.inner.task_id.clone() + } + + #[getter] + fn priority(&self) -> i32 { + self.inner.priority + } + + #[getter] + fn queued_at(&self) -> f64 { + self.inner.queued_at + } + + #[getter] + fn launched_at(&self) -> Option { + self.inner.launched_at + } + + #[getter] + fn completed_at(&self) -> Option { + self.inner.completed_at + } + + #[getter] + fn error(&self) -> Option { + self.inner.error.clone() + } + + fn duration(&self) -> Option { + self.inner.duration() + } + + fn __repr__(&self) -> String { + format!( + "KernelLaunchRequest(id={}, state={:?}, kernel=0x{:x})", + self.inner.id, self.inner.state, self.inner.kernel_handle + ) + } +} + +/// Python wrapper for DispatchStats +#[pyclass(name = "DispatchStats")] +#[derive(Clone)] +pub struct PyDispatchStats { + inner: DispatchStats, +} + +#[pymethods] +impl PyDispatchStats { + #[getter] + fn total_queued(&self) -> usize { + self.inner.total_queued + } + + #[getter] + fn completed_count(&self) -> usize { + self.inner.completed_count + } + + #[getter] + fn failed_count(&self) -> usize { + self.inner.failed_count + } + + #[getter] + fn pending_count(&self) -> usize { + self.inner.pending_count + } + + #[getter] + fn in_flight_count(&self) -> usize { + self.inner.in_flight_count + } + + #[getter] + fn avg_exec_time(&self) -> f64 { + self.inner.avg_exec_time + } + + #[getter] + fn launches_per_stream(&self) -> HashMap { + self.inner.launches_per_stream.clone() + } + + fn __repr__(&self) -> String { + format!( + "DispatchStats(completed={}, pending={}, in_flight={}, avg_exec={:.4}s)", + self.inner.completed_count, + self.inner.pending_count, + self.inner.in_flight_count, + self.inner.avg_exec_time, + ) + } +} + +/// Kernel Dispatch Controller +/// +/// Coordinates GPU kernel launches with stream management +/// and scheduler integration. 
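A small sketch exercising the LaunchConfig binding defined above, covering both constructors and the property setters; the 16x16 block mirrors the tiled matmul geometry.

```python
from _pygpukit_rust import LaunchConfig

# 2D launch shaped like the tiled matmul: 16x16 threads per block.
cfg = LaunchConfig.grid_2d(64, 64, 16, 16)
cfg.shared_mem = 4096    # property setters exposed by the binding
cfg.stream_id = 2

assert cfg.grid == (64, 64, 1)
assert cfg.block == (16, 16, 1)
print(cfg)  # LaunchConfig(grid=(64, 64, 1), block=(16, 16, 1), shared_mem=4096, stream_id=2)

# 1D convenience constructor: grid_x is ceil(n_elements / block_size).
lin = LaunchConfig.linear(1_000_000, block_size=256)
assert lin.grid == (3907, 1, 1)  # ceil(1_000_000 / 256)
```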
+#[pyclass(name = "KernelDispatcher")] +pub struct PyKernelDispatcher { + inner: KernelDispatcher, +} + +#[pymethods] +impl PyKernelDispatcher { + /// Create a new kernel dispatcher + /// + /// Args: + /// max_in_flight: Maximum concurrent kernels per stream (default: 4) + #[new] + #[pyo3(signature = (max_in_flight=4))] + fn new(max_in_flight: usize) -> Self { + Self { + inner: KernelDispatcher::new(max_in_flight), + } + } + + /// Queue a kernel launch + /// + /// Args: + /// kernel_handle: CUfunction handle as int + /// config: LaunchConfig + /// args: Kernel arguments as list of int (pointers/values) + /// task_id: Optional scheduler task ID + /// priority: Priority (default: 0) + /// + /// Returns: + /// Request ID + #[pyo3(signature = (kernel_handle, config, args=None, task_id=None, priority=0))] + fn queue( + &self, + kernel_handle: u64, + config: PyLaunchConfig, + args: Option>, + task_id: Option, + priority: i32, + ) -> u64 { + let mut request = KernelLaunchRequest::new(kernel_handle, config.inner) + .with_priority(priority); + + if let Some(a) = args { + request = request.with_args(a); + } + + if let Some(tid) = task_id { + request = request.with_task(tid); + } + + self.inner.queue(request) + } + + /// Queue a kernel for a scheduler task + fn queue_for_task( + &self, + task_id: String, + kernel_handle: u64, + config: PyLaunchConfig, + args: Vec, + ) -> u64 { + self.inner.queue_for_task(task_id, kernel_handle, config.inner, args) + } + + /// Get launch requests ready to execute + fn get_ready(&self, max_requests: usize) -> Vec { + self.inner + .get_ready(max_requests) + .into_iter() + .map(|r| PyKernelLaunchRequest { inner: r }) + .collect() + } + + /// Mark a request as launched + fn mark_launched(&self, req_id: u64) -> bool { + self.inner.mark_launched(req_id) + } + + /// Mark a request as completed + fn mark_completed(&self, req_id: u64) -> bool { + self.inner.mark_completed(req_id) + } + + /// Mark a request as failed + fn mark_failed(&self, req_id: u64, error: String) -> bool { + self.inner.mark_failed(req_id, error) + } + + /// Cancel a pending request + fn cancel(&self, req_id: u64) -> bool { + self.inner.cancel(req_id) + } + + /// Get a request by ID + fn get_request(&self, req_id: u64) -> Option { + self.inner.get_request(req_id).map(|r| PyKernelLaunchRequest { inner: r }) + } + + /// Get in-flight request IDs for a stream + fn get_in_flight(&self, stream_id: u32) -> Vec { + self.inner.get_in_flight(stream_id) + } + + /// Get requests linked to a scheduler task + fn get_requests_for_task(&self, task_id: &str) -> Vec { + self.inner + .get_requests_for_task(task_id) + .into_iter() + .map(|r| PyKernelLaunchRequest { inner: r }) + .collect() + } + + /// Check if there's pending work + fn has_pending_work(&self) -> bool { + self.inner.has_pending_work() + } + + /// Get dispatch statistics + fn stats(&self) -> PyDispatchStats { + PyDispatchStats { inner: self.inner.stats() } + } + + /// Garbage collect completed requests + fn gc(&self) { + self.inner.gc() + } + + /// Clear all state + fn clear(&self) { + self.inner.clear() + } +} + +/// Register dispatch module +pub fn register(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + Ok(()) +} diff --git a/rust/pygpukit-python/src/lib.rs b/rust/pygpukit-python/src/lib.rs index 6a123b5..58e3e2e 100644 --- a/rust/pygpukit-python/src/lib.rs +++ b/rust/pygpukit-python/src/lib.rs @@ -1,11 +1,13 @@ //! PyGPUkit Rust Python bindings //! -//! 
Provides PyO3 bindings for the Rust memory pool and scheduler. +//! Provides PyO3 bindings for the Rust memory pool, scheduler, transfer engine, and kernel dispatcher. use pyo3::prelude::*; mod memory; mod scheduler; +mod transfer; +mod dispatch; /// PyGPUkit Rust module #[pymodule] @@ -20,6 +22,16 @@ fn _pygpukit_rust(m: &Bound<'_, PyModule>) -> PyResult<()> { scheduler::register(&scheduler_module)?; m.add_submodule(&scheduler_module)?; + // Transfer submodule + let transfer_module = PyModule::new(m.py(), "transfer")?; + transfer::register(&transfer_module)?; + m.add_submodule(&transfer_module)?; + + // Dispatch submodule + let dispatch_module = PyModule::new(m.py(), "dispatch")?; + dispatch::register(&dispatch_module)?; + m.add_submodule(&dispatch_module)?; + // Also export at top level for convenience m.add_class::()?; m.add_class::()?; @@ -28,6 +40,12 @@ fn _pygpukit_rust(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/rust/pygpukit-python/src/transfer.rs b/rust/pygpukit-python/src/transfer.rs new file mode 100644 index 0000000..dd78521 --- /dev/null +++ b/rust/pygpukit-python/src/transfer.rs @@ -0,0 +1,460 @@ +//! Python bindings for the async transfer engine + +use pyo3::prelude::*; +use pygpukit_core::transfer::{ + AsyncTransferEngine, TransferOp, TransferState, TransferStats, TransferType, StreamType, +}; + +/// Python wrapper for TransferType enum +#[pyclass(name = "TransferType")] +#[derive(Clone)] +pub struct PyTransferType { + inner: TransferType, +} + +#[pymethods] +impl PyTransferType { + /// Host to Device transfer + #[classattr] + fn H2D() -> Self { + Self { inner: TransferType::H2D } + } + + /// Device to Host transfer + #[classattr] + fn D2H() -> Self { + Self { inner: TransferType::D2H } + } + + /// Device to Device transfer + #[classattr] + fn D2D() -> Self { + Self { inner: TransferType::D2D } + } + + fn __repr__(&self) -> String { + format!("TransferType.{}", self.inner.name()) + } + + fn __str__(&self) -> String { + self.inner.name().to_string() + } +} + +/// Python wrapper for TransferState enum +#[pyclass(name = "TransferState")] +#[derive(Clone)] +pub struct PyTransferState { + inner: TransferState, +} + +#[pymethods] +impl PyTransferState { + #[classattr] + fn Queued() -> Self { + Self { inner: TransferState::Queued } + } + + #[classattr] + fn InProgress() -> Self { + Self { inner: TransferState::InProgress } + } + + #[classattr] + fn Completed() -> Self { + Self { inner: TransferState::Completed } + } + + #[classattr] + fn Failed() -> Self { + Self { inner: TransferState::Failed } + } + + #[classattr] + fn Cancelled() -> Self { + Self { inner: TransferState::Cancelled } + } + + /// Check if this is a terminal state + fn is_terminal(&self) -> bool { + self.inner.is_terminal() + } + + fn __repr__(&self) -> String { + let name = match self.inner { + TransferState::Queued => "Queued", + TransferState::InProgress => "InProgress", + TransferState::Completed => "Completed", + TransferState::Failed => "Failed", + TransferState::Cancelled => "Cancelled", + }; + format!("TransferState.{}", name) + } +} + +/// Python wrapper for StreamType enum +#[pyclass(name = "StreamType")] +#[derive(Clone)] +pub struct PyStreamType { + inner: StreamType, +} + +#[pymethods] +impl PyStreamType { + #[classattr] + fn Compute() -> Self { + Self { inner: StreamType::Compute } + } + + #[classattr] + fn 
diff --git a/rust/pygpukit-python/src/transfer.rs b/rust/pygpukit-python/src/transfer.rs
new file mode 100644
index 0000000..dd78521
--- /dev/null
+++ b/rust/pygpukit-python/src/transfer.rs
@@ -0,0 +1,460 @@
+//! Python bindings for the async transfer engine
+
+use pyo3::prelude::*;
+use pygpukit_core::transfer::{
+    AsyncTransferEngine, TransferOp, TransferState, TransferStats, TransferType, StreamType,
+};
+
+/// Python wrapper for TransferType enum
+#[pyclass(name = "TransferType")]
+#[derive(Clone)]
+pub struct PyTransferType {
+    inner: TransferType,
+}
+
+#[pymethods]
+impl PyTransferType {
+    /// Host to Device transfer
+    #[classattr]
+    fn H2D() -> Self {
+        Self { inner: TransferType::H2D }
+    }
+
+    /// Device to Host transfer
+    #[classattr]
+    fn D2H() -> Self {
+        Self { inner: TransferType::D2H }
+    }
+
+    /// Device to Device transfer
+    #[classattr]
+    fn D2D() -> Self {
+        Self { inner: TransferType::D2D }
+    }
+
+    fn __repr__(&self) -> String {
+        format!("TransferType.{}", self.inner.name())
+    }
+
+    fn __str__(&self) -> String {
+        self.inner.name().to_string()
+    }
+}
+
+/// Python wrapper for TransferState enum
+#[pyclass(name = "TransferState")]
+#[derive(Clone)]
+pub struct PyTransferState {
+    inner: TransferState,
+}
+
+#[pymethods]
+impl PyTransferState {
+    #[classattr]
+    fn Queued() -> Self {
+        Self { inner: TransferState::Queued }
+    }
+
+    #[classattr]
+    fn InProgress() -> Self {
+        Self { inner: TransferState::InProgress }
+    }
+
+    #[classattr]
+    fn Completed() -> Self {
+        Self { inner: TransferState::Completed }
+    }
+
+    #[classattr]
+    fn Failed() -> Self {
+        Self { inner: TransferState::Failed }
+    }
+
+    #[classattr]
+    fn Cancelled() -> Self {
+        Self { inner: TransferState::Cancelled }
+    }
+
+    /// Check if this is a terminal state
+    fn is_terminal(&self) -> bool {
+        self.inner.is_terminal()
+    }
+
+    fn __repr__(&self) -> String {
+        let name = match self.inner {
+            TransferState::Queued => "Queued",
+            TransferState::InProgress => "InProgress",
+            TransferState::Completed => "Completed",
+            TransferState::Failed => "Failed",
+            TransferState::Cancelled => "Cancelled",
+        };
+        format!("TransferState.{}", name)
+    }
+}
+
+/// Python wrapper for StreamType enum
+#[pyclass(name = "StreamType")]
+#[derive(Clone)]
+pub struct PyStreamType {
+    inner: StreamType,
+}
+
+#[pymethods]
+impl PyStreamType {
+    #[classattr]
+    fn Compute() -> Self {
+        Self { inner: StreamType::Compute }
+    }
+
+    #[classattr]
+    fn MemcpyH2D() -> Self {
+        Self { inner: StreamType::MemcpyH2D }
+    }
+
+    #[classattr]
+    fn MemcpyD2H() -> Self {
+        Self { inner: StreamType::MemcpyD2H }
+    }
+
+    #[staticmethod]
+    fn Custom(id: u32) -> Self {
+        Self { inner: StreamType::Custom(id) }
+    }
+
+    /// Get stream ID
+    fn to_id(&self) -> u32 {
+        self.inner.to_id()
+    }
+}
+
+/// Python wrapper for TransferOp
+#[pyclass(name = "TransferOp")]
+#[derive(Clone)]
+pub struct PyTransferOp {
+    inner: TransferOp,
+}
+
+#[pymethods]
+impl PyTransferOp {
+    /// Operation ID
+    #[getter]
+    fn id(&self) -> u64 {
+        self.inner.id
+    }
+
+    /// Transfer type
+    #[getter]
+    fn transfer_type(&self) -> PyTransferType {
+        PyTransferType { inner: self.inner.transfer_type }
+    }
+
+    /// Source pointer
+    #[getter]
+    fn src_ptr(&self) -> u64 {
+        self.inner.src_ptr
+    }
+
+    /// Destination pointer
+    #[getter]
+    fn dst_ptr(&self) -> u64 {
+        self.inner.dst_ptr
+    }
+
+    /// Size in bytes
+    #[getter]
+    fn size(&self) -> usize {
+        self.inner.size
+    }
+
+    /// Current state
+    #[getter]
+    fn state(&self) -> PyTransferState {
+        PyTransferState { inner: self.inner.state }
+    }
+
+    /// Stream ID
+    #[getter]
+    fn stream_id(&self) -> u32 {
+        self.inner.stream_id
+    }
+
+    /// Timestamp when queued
+    #[getter]
+    fn queued_at(&self) -> f64 {
+        self.inner.queued_at
+    }
+
+    /// Timestamp when started
+    #[getter]
+    fn started_at(&self) -> Option<f64> {
+        self.inner.started_at
+    }
+
+    /// Timestamp when completed
+    #[getter]
+    fn completed_at(&self) -> Option<f64> {
+        self.inner.completed_at
+    }
+
+    /// Priority
+    #[getter]
+    fn priority(&self) -> i32 {
+        self.inner.priority
+    }
+
+    /// Error message if failed
+    #[getter]
+    fn error(&self) -> Option<String> {
+        self.inner.error.clone()
+    }
+
+    /// Associated task ID
+    #[getter]
+    fn task_id(&self) -> Option<String> {
+        self.inner.task_id.clone()
+    }
+
+    /// Get wait time (time in queue)
+    fn wait_time(&self) -> f64 {
+        self.inner.wait_time()
+    }
+
+    /// Get transfer duration
+    fn duration(&self) -> Option<f64> {
+        self.inner.duration()
+    }
+
+    /// Get bandwidth in GB/s
+    fn bandwidth_gbps(&self) -> Option<f64> {
+        self.inner.bandwidth_gbps()
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "TransferOp(id={}, type={}, size={}, state={:?})",
+            self.inner.id, self.inner.transfer_type.name(), self.inner.size, self.inner.state
+        )
+    }
+}
+
+/// Python wrapper for TransferStats
+#[pyclass(name = "TransferStats")]
+#[derive(Clone)]
+pub struct PyTransferStats {
+    inner: TransferStats,
+}
+
+#[pymethods]
+impl PyTransferStats {
+    #[getter]
+    fn total_queued(&self) -> usize {
+        self.inner.total_queued
+    }
+
+    #[getter]
+    fn completed_count(&self) -> usize {
+        self.inner.completed_count
+    }
+
+    #[getter]
+    fn failed_count(&self) -> usize {
+        self.inner.failed_count
+    }
+
+    #[getter]
+    fn total_bytes(&self) -> usize {
+        self.inner.total_bytes
+    }
+
+    #[getter]
+    fn h2d_bytes(&self) -> usize {
+        self.inner.h2d_bytes
+    }
+
+    #[getter]
+    fn d2h_bytes(&self) -> usize {
+        self.inner.d2h_bytes
+    }
+
+    #[getter]
+    fn avg_h2d_bandwidth(&self) -> f64 {
+        self.inner.avg_h2d_bandwidth
+    }
+
+    #[getter]
+    fn avg_d2h_bandwidth(&self) -> f64 {
+        self.inner.avg_d2h_bandwidth
+    }
+
+    #[getter]
+    fn pending_count(&self) -> usize {
+        self.inner.pending_count
+    }
+
+    #[getter]
+    fn in_progress_count(&self) -> usize {
+        self.inner.in_progress_count
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "TransferStats(completed={}, pending={}, in_progress={}, h2d_bw={:.2} GB/s, d2h_bw={:.2} GB/s)",
+            self.inner.completed_count,
+            self.inner.pending_count,
+            self.inner.in_progress_count,
+            self.inner.avg_h2d_bandwidth,
+            self.inner.avg_d2h_bandwidth,
+        )
+    }
+}
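# --- Worked example (not part of the diff) ----------------------------------
# The TransferOp accessors above compose as duration = completed_at -
# started_at and, assuming decimal GB, bandwidth = size / duration / 1e9.
# So a 256 MiB copy that finishes in 10 ms reports roughly 26.8 GB/s:
size_bytes = 256 * 1024 * 1024        # 268_435_456 bytes
duration_s = 0.010                    # 10 ms
print(size_bytes / duration_s / 1e9)  # ~26.84, the value bandwidth_gbps() would report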
+/// Async Memory Transfer Engine
+///
+/// Manages asynchronous memory transfers between host and device with
+/// separate streams for H2D and D2H to enable overlap.
+#[pyclass(name = "AsyncTransferEngine")]
+pub struct PyAsyncTransferEngine {
+    inner: AsyncTransferEngine,
+}
+
+#[pymethods]
+impl PyAsyncTransferEngine {
+    /// Create a new transfer engine
+    ///
+    /// Args:
+    ///     max_concurrent: Maximum concurrent transfers per stream (default: 4)
+    #[new]
+    #[pyo3(signature = (max_concurrent=4))]
+    fn new(max_concurrent: usize) -> Self {
+        Self {
+            inner: AsyncTransferEngine::new(max_concurrent),
+        }
+    }
+
+    /// Enqueue an H2D transfer
+    ///
+    /// Args:
+    ///     host_ptr: Host memory address (as int)
+    ///     device_ptr: Device memory address (as int)
+    ///     size: Size in bytes
+    ///
+    /// Returns:
+    ///     Operation ID
+    fn enqueue_h2d(&self, host_ptr: u64, device_ptr: u64, size: usize) -> u64 {
+        self.inner.enqueue_h2d(host_ptr, device_ptr, size)
+    }
+
+    /// Enqueue a D2H transfer
+    fn enqueue_d2h(&self, device_ptr: u64, host_ptr: u64, size: usize) -> u64 {
+        self.inner.enqueue_d2h(device_ptr, host_ptr, size)
+    }
+
+    /// Enqueue a D2D transfer
+    fn enqueue_d2d(&self, src_ptr: u64, dst_ptr: u64, size: usize) -> u64 {
+        self.inner.enqueue_d2d(src_ptr, dst_ptr, size)
+    }
+
+    /// Enqueue with priority
+    ///
+    /// Args:
+    ///     transfer_type: "h2d", "d2h", or "d2d"
+    ///     src_ptr: Source address
+    ///     dst_ptr: Destination address
+    ///     size: Size in bytes
+    ///     priority: Priority (higher = more urgent)
+    fn enqueue_with_priority(
+        &self,
+        transfer_type: &str,
+        src_ptr: u64,
+        dst_ptr: u64,
+        size: usize,
+        priority: i32,
+    ) -> PyResult<u64> {
+        let t_type = match transfer_type.to_lowercase().as_str() {
+            "h2d" => TransferType::H2D,
+            "d2h" => TransferType::D2H,
+            "d2d" => TransferType::D2D,
+            _ => return Err(pyo3::exceptions::PyValueError::new_err(
+                "Invalid transfer type. Use 'h2d', 'd2h', or 'd2d'"
+            )),
+        };
+
+        Ok(self.inner.enqueue_with_priority(t_type, src_ptr, dst_ptr, size, priority))
+    }
+
+    /// Get transfers ready to execute
+    ///
+    /// Args:
+    ///     max_transfers: Maximum number of transfers to return
+    ///
+    /// Returns:
+    ///     List of TransferOp objects ready to execute
+    fn get_ready_transfers(&self, max_transfers: usize) -> Vec<PyTransferOp> {
+        self.inner
+            .get_ready_transfers(max_transfers)
+            .into_iter()
+            .map(|op| PyTransferOp { inner: op })
+            .collect()
+    }
+
+    /// Mark a transfer as started
+    fn start_transfer(&self, op_id: u64) -> bool {
+        self.inner.start_transfer(op_id)
+    }
+
+    /// Mark a transfer as completed
+    fn complete_transfer(&self, op_id: u64) -> bool {
+        self.inner.complete_transfer(op_id)
+    }
+
+    /// Mark a transfer as failed
+    fn fail_transfer(&self, op_id: u64, error: String) -> bool {
+        self.inner.fail_transfer(op_id, error)
+    }
+
+    /// Cancel a pending transfer
+    fn cancel_transfer(&self, op_id: u64) -> bool {
+        self.inner.cancel_transfer(op_id)
+    }
+
+    /// Get operation by ID
+    fn get_operation(&self, op_id: u64) -> Option<PyTransferOp> {
+        self.inner.get_operation(op_id).map(|op| PyTransferOp { inner: op })
+    }
+
+    /// Get transfer statistics
+    fn stats(&self) -> PyTransferStats {
+        PyTransferStats { inner: self.inner.stats() }
+    }
+
+    /// Get in-progress transfer IDs for a stream
+    fn get_in_progress_for_stream(&self, stream_id: u32) -> Vec<u64> {
+        self.inner.get_in_progress_for_stream(stream_id)
+    }
+
+    /// Check if there's pending work
+    fn has_pending_work(&self) -> bool {
+        self.inner.has_pending_work()
+    }
+
+    /// Garbage collect completed operations
+    fn gc(&self) {
+        self.inner.gc()
+    }
+
+    /// Clear all operations
+    fn clear(&self) {
+        self.inner.clear()
+    }
+}
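# --- Illustrative usage sketch (not part of the diff) -----------------------
# The intended enqueue -> poll -> start/complete flow for the engine above.
# Pointer values are fake placeholders; in real use they come from pinned host
# buffers and device allocations, and the actual async copy is issued between
# start_transfer and complete_transfer.
import _pygpukit_rust as rust

engine = rust.transfer.AsyncTransferEngine(max_concurrent=4)
engine.enqueue_h2d(host_ptr=0x1000, device_ptr=0x2000, size=1 << 20)
engine.enqueue_with_priority("h2d", 0x3000, 0x4000, 1 << 20, priority=10)

while engine.has_pending_work():
    for op in engine.get_ready_transfers(max_transfers=4):
        engine.start_transfer(op.id)
        engine.complete_transfer(op.id)  # real code completes when the copy event fires
print(engine.stats())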
+
+/// Register transfer module
+pub fn register(m: &Bound<'_, PyModule>) -> PyResult<()> {
+    m.add_class::<PyTransferType>()?;
+    m.add_class::<PyTransferState>()?;
+    m.add_class::<PyStreamType>()?;
+    m.add_class::<PyTransferOp>()?;
+    m.add_class::<PyTransferStats>()?;
+    m.add_class::<PyAsyncTransferEngine>()?;
+    Ok(())
+}
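# --- Illustrative smoke test (not part of the diff) --------------------------
# Quick check that the transfer registrations above took effect, mirroring the
# class list passed to register(). Assumes a built extension importable as
# `_pygpukit_rust`.
import _pygpukit_rust as rust

for name in ("TransferType", "TransferState", "StreamType",
             "TransferOp", "TransferStats", "AsyncTransferEngine"):
    assert hasattr(rust.transfer, name), f"missing {name}"
print("transfer submodule OK")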