diff --git a/README.md b/README.md index 7159e4b..34dcfe0 100644 --- a/README.md +++ b/README.md @@ -22,18 +22,48 @@ The `SmartDataProcessor` is designed to process a queue of items in parallel, while automatically adjusting the level of concurrency to stay within a specified CPU usage limit. -Usage example: +### Features +- **Dynamic Concurrency**: Automatically adjusts the number of worker threads based on real-time CPU load. +- **CPU Throttling**: Ensures that CPU usage does not exceed a configurable maximum limit. +- **Backpressure**: The `EnqueueOrWait` method blocks when the queue is full or the CPU is saturated, preventing memory overload. +- **Lazy Initialization**: The processing thread pool is only created when the first item is enqueued. +- **Configurable**: Fine-tune performance with the `SmartDataProcessorSettings` class. +- **Event-driven**: Subscribe to events for CPU usage changes and exceptions. +- **Runtime Control**: Pause and resume the processor on the fly. + +### Usage Example + +You can now configure the processor using the `SmartDataProcessorSettings` class: ```csharp -using var processor = new SmartDataProcessor(maxCpuUsage: 75); +var settings = new SmartDataProcessorSettings +{ + MaxCpuUsage = 80, // Target 80% CPU usage + MaxDegreeOfParallelism = 4, // Use a maximum of 4 threads + QueueBufferMultiplier = 8 // Set a larger queue buffer +}; + +using var processor = new SmartDataProcessor(settings); -for (...) +// Subscribe to events +processor.OnCpuUsageChange += (cpuLoad) => Console.WriteLine($"CPU Load: {cpuLoad:F1}%"); +processor.OnException += (ex) => Console.WriteLine($"An error occurred: {ex.Message}"); + +// Enqueue items +for (int i = 0; i < 100; i++) { - processor.EnqueueOrWait(dtIn, data => + processor.EnqueueOrWait(i, data => { - ... + // Your processing logic here... }); } + +// Pause and resume processing +processor.Pause(); +Thread.Sleep(5000); +processor.Resume(); + +processor.WaitForAllAsync().Wait(); ``` ![Alt text for your image](https://raw.githubusercontent.com/cretucosmin3/SimpliSharp/refs/heads/main/assets/75-cpu-usage.png) diff --git a/samples/SimpliSharp.Demo/Program.cs b/samples/SimpliSharp.Demo/Program.cs index 331180f..c6607e2 100644 --- a/samples/SimpliSharp.Demo/Program.cs +++ b/samples/SimpliSharp.Demo/Program.cs @@ -32,14 +32,24 @@ return; } +Console.WriteLine(); +Console.WriteLine("Press any key to exit..."); +Console.ReadKey(); + static void SmartDataProcessor_Example() { Console.Clear(); Console.WriteLine("Starting data processing..."); - using var processor = new SmartDataProcessor(maxCpuUsage: 90); + var settings = new SmartDataProcessorSettings + { + MaxDegreeOfParallelism = 1 + }; + + using var processor = new SmartDataProcessor(settings); + var stopwatch = System.Diagnostics.Stopwatch.StartNew(); - var tasksCount = 2000; + var tasksCount = 200; for (int i = 0; i < tasksCount; i++) { @@ -50,7 +60,7 @@ static void SmartDataProcessor_Example() int simMax = Random.Shared.Next(5_000_000, 20_000_000); double sum = 0; - for (int j = 0; j < 10_000_000; j++) + for (int j = 0; j < simMax; j++) { double value = Math.Sqrt(j) * Math.Sin(j % 360) + Math.Log(j + 1); if (value > 1000) diff --git a/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs index 252a0f9..8c5dd0e 100644 --- a/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs +++ b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs @@ -1,7 +1,10 @@ + using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace SimpliSharp.Utilities.Process; @@ -29,67 +32,96 @@ public class SmartDataProcessor : IDisposable /// private const double SmoothingFactor = 0.3; - /// - /// A multiplier to determine the queue size limit based on the current number of workers. - /// This creates back-pressure to prevent the queue from growing too quickly. - /// - private const int QueueBufferMultiplier = 2; - // --- State --- + private readonly SmartDataProcessorSettings _settings; private readonly double _maxCpuUsage; private readonly ConcurrentQueue<(T Data, Action Action)> _jobs = new(); private readonly ConcurrentDictionary _runningTasks = new(); - private readonly Thread _managerThread; private readonly CancellationTokenSource _cts = new(); private readonly ICpuMonitor _cpuMonitor; - internal readonly ManualResetEvent ManagerLoopCycle = new(false); + private object _managerLock = new(); + private Task _managerTask; private double _smoothedCpu = 0; private int _targetConcurrency = 1; private double _lastAverageDuration; public ProcessingMetrics Metrics { get; } = new(); + public bool IsPaused { get; private set; } - public SmartDataProcessor(double maxCpuUsage = 100) + // --- Events --- + public event Action OnException; + public event Action OnCpuUsageChange; + + /// + /// Creates a new SmartDataProcessor with default settings. + /// + public SmartDataProcessor() : this(new SmartDataProcessorSettings()) { - _maxCpuUsage = Math.Max(maxCpuUsage - CpuHeadroomBuffer, CpuHeadroomBuffer); + } - if (OperatingSystem.IsWindows()) - { - _cpuMonitor = new WindowsCpuMonitor(); - } - else if (OperatingSystem.IsLinux()) - { - _cpuMonitor = new LinuxCpuMonitor(); - } - else if (OperatingSystem.IsMacOS()) + /// + /// Creates a new SmartDataProcessor with the specified maximum CPU usage. + /// + /// The maximum CPU usage percentage (0-100) to target. + public SmartDataProcessor(double maxCpuUsage) : this(new SmartDataProcessorSettings { MaxCpuUsage = maxCpuUsage }) + { + } + + /// + /// Creates a new SmartDataProcessor with the specified settings. + /// + /// The settings to use for this processor. + public SmartDataProcessor(SmartDataProcessorSettings settings) + { + _settings = settings; + _maxCpuUsage = Math.Max(_settings.MaxCpuUsage - CpuHeadroomBuffer, CpuHeadroomBuffer); + + bool useCpuMonitoring = _settings.MaxCpuUsage < 100; + if (useCpuMonitoring) { - _cpuMonitor = new MacCpuMonitor(); + if (OperatingSystem.IsWindows()) _cpuMonitor = new WindowsCpuMonitor(); + else if (OperatingSystem.IsLinux()) _cpuMonitor = new LinuxCpuMonitor(); + else if (OperatingSystem.IsMacOS()) _cpuMonitor = new MacCpuMonitor(); + else _cpuMonitor = new NullCpuMonitor(); } else { _cpuMonitor = new NullCpuMonitor(); } - - _managerThread = new Thread(ManagerLoop) { IsBackground = true }; - _managerThread.Start(); } - internal SmartDataProcessor(double maxCpuUsage, ICpuMonitor cpuMonitor) + internal SmartDataProcessor(SmartDataProcessorSettings settings, ICpuMonitor cpuMonitor) { - _maxCpuUsage = maxCpuUsage; + _settings = settings; + _maxCpuUsage = Math.Max(_settings.MaxCpuUsage - CpuHeadroomBuffer, CpuHeadroomBuffer); _cpuMonitor = cpuMonitor; - - _managerThread = new Thread(ManagerLoop) { IsBackground = true }; - _managerThread.Start(); } + /// + /// Pauses the processing of new items. + /// + public void Pause() => IsPaused = true; + + /// + /// Resumes the processing of new items. + /// + public void Resume() => IsPaused = false; + + /// + /// Enqueues a data item for processing. If the CPU is saturated or the queue is overloaded, + /// this method will block until it is safe to enqueue the item. + /// + /// + /// public void EnqueueOrWait(T data, Action action) { + LazyInitializer.EnsureInitialized(ref _managerTask, ref _managerLock, () => Task.Run(ManagerLoopAsync)); + while (true) { bool isCpuSaturated = _smoothedCpu > _maxCpuUsage && _cpuMonitor is not NullCpuMonitor; - bool isQueueOverloaded = _jobs.Count > _targetConcurrency * QueueBufferMultiplier; + bool isQueueOverloaded = _jobs.Count > _targetConcurrency * _settings.QueueBufferMultiplier; if (!isCpuSaturated && !isQueueOverloaded) { @@ -102,6 +134,10 @@ public void EnqueueOrWait(T data, Action action) _jobs.Enqueue((data, action)); } + /// + /// Waits for all currently queued and running jobs to complete. + /// + /// public async Task WaitForAllAsync() { while (!_jobs.IsEmpty) @@ -109,34 +145,60 @@ public async Task WaitForAllAsync() await Task.Delay(50); } - await Task.WhenAll(_runningTasks.Keys.ToArray()); + if (_managerTask != null) + { + await Task.WhenAll(_runningTasks.Keys.ToArray()); + } } + /// + /// Disposes the processor, stopping all management and worker tasks. + /// Note: This does not cancel running jobs; it only stops accepting new ones and waits + /// for current jobs to finish. + /// public void Dispose() { _cts.Cancel(); - _managerThread.Join(); + + try + { + _managerTask?.Wait(); + } + catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is TaskCanceledException)) + { + // This is expected when the task is cancelled. We can ignore it. + } + finally + { + _cts.Dispose(); + } } /// /// The main management loop that coordinates concurrency adjustments and task launching. /// - private void ManagerLoop() + private async Task ManagerLoopAsync() { while (!_cts.IsCancellationRequested) { try { - UpdateConcurrency(); - LaunchWorkerTasks(); + if (!IsPaused) + { + UpdateConcurrency(); + LaunchWorkerTasks(); + } + + await Task.Delay(ManagerLoopIntervalMs, _cts.Token); + } + catch (TaskCanceledException) + { + break; } catch (Exception ex) { - Console.WriteLine($"Error in ManagerLoop: {ex.Message}"); + OnException?.Invoke(ex); } - - ManagerLoopCycle.Set(); - Thread.Sleep(ManagerLoopIntervalMs); } } @@ -148,6 +210,16 @@ private void UpdateConcurrency() double cpuUsage = _cpuMonitor.GetCpuUsage(); _smoothedCpu = (SmoothingFactor * cpuUsage) + (1 - SmoothingFactor) * _smoothedCpu; Metrics.UpdateSmoothedCpu(_smoothedCpu); + OnCpuUsageChange?.Invoke(_smoothedCpu); + + int maxConcurrency = _settings.MaxDegreeOfParallelism ?? Environment.ProcessorCount; + + // If CPU monitoring is disabled, just max out the concurrency. + if (_cpuMonitor is NullCpuMonitor) + { + _targetConcurrency = maxConcurrency; + return; + } // --- Concurrency Decrease Logic --- bool isCpuAboveMax = _smoothedCpu > _maxCpuUsage; @@ -162,10 +234,9 @@ private void UpdateConcurrency() // --- Concurrency Increase Logic --- bool hasCpuHeadroom = _smoothedCpu < _maxCpuUsage - CpuHeadroomBuffer; - bool isMonitorDisabled = _cpuMonitor is NullCpuMonitor; - bool canIncreaseConcurrency = _targetConcurrency < Environment.ProcessorCount; + bool canIncreaseConcurrency = _targetConcurrency < maxConcurrency; - if ((hasCpuHeadroom || isMonitorDisabled) && canIncreaseConcurrency) + if (hasCpuHeadroom && canIncreaseConcurrency) { // Check if adding threads is causing performance to degrade (contention). bool isJobDurationIncreasing = Metrics.AvgTaskTime > _lastAverageDuration; @@ -181,14 +252,13 @@ private void UpdateConcurrency() // If jobs are very short, we can be more aggressive in scaling up. bool areJobsShort = Metrics.AvgTaskTime > 0 && Metrics.AvgTaskTime < ShortJobThresholdMs; int increaseAmount = areJobsShort ? 2 : 1; - _targetConcurrency = Math.Min(_targetConcurrency + increaseAmount, Environment.ProcessorCount); + _targetConcurrency = Math.Min(_targetConcurrency + increaseAmount, maxConcurrency); } } _lastAverageDuration = Metrics.AvgTaskTime; } - /// /// Launches new tasks from the queue to meet the target concurrency level. /// @@ -216,12 +286,17 @@ private void LaunchWorkerTasks() { job.Action(job.Data); } + catch (Exception ex) + { + OnException?.Invoke(ex); + } finally { sw.Stop(); Metrics.AddJobDuration(sw.Elapsed.TotalMilliseconds); } }); + _runningTasks.TryAdd(task, true); } else diff --git a/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessorSettings.cs b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessorSettings.cs new file mode 100644 index 0000000..477243f --- /dev/null +++ b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessorSettings.cs @@ -0,0 +1,26 @@ +namespace SimpliSharp.Utilities.Process; + +/// +/// Settings for the SmartDataProcessor. +/// +public class SmartDataProcessorSettings +{ + /// + /// The target maximum CPU usage percentage (0-100). + /// If set to 100 or more, CPU monitoring will be disabled, + /// and the processor will scale to the maximum number of threads. + /// + public double MaxCpuUsage { get; set; } = 100; + + /// + /// An optional value to manually set the maximum number of concurrent threads. + /// If not provided, it defaults to Environment.ProcessorCount. + /// + public int? MaxDegreeOfParallelism { get; set; } + + /// + /// A multiplier to determine the queue size limit for backpressure, + /// based on the current number of workers. + /// + public int QueueBufferMultiplier { get; set; } = 2; +} diff --git a/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/MockCpuMonitor.cs b/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/MockCpuMonitor.cs index 1556ecd..5becd71 100644 --- a/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/MockCpuMonitor.cs +++ b/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/MockCpuMonitor.cs @@ -1,8 +1,7 @@ +namespace SimpliSharp.Tests.Utilities.Process.SmartDataProcessor; using SimpliSharp.Utilities.Process; -namespace SimpliSharp.Tests.Utilities.Process.SmartDataProcessor; - public class MockCpuMonitor : ICpuMonitor { private double _cpuUsage; diff --git a/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs b/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs index e9727af..ddcff2d 100644 --- a/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs +++ b/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs @@ -6,23 +6,7 @@ namespace SimpliSharp.Tests.Utilities.Process.SmartDataProcessor; public class SmartDataProcessorTests { [TestMethod] - public async Task SmartDataProcessor_ProcessesSingleItem() - { - // Arrange - var processor = new SmartDataProcessor(100); - var processedData = 0; - Action action = (int data) => processedData = data; - - // Act - processor.EnqueueOrWait(1, action); - await processor.WaitForAllAsync(); - - // Assert - Assert.AreEqual(1, processedData); - } - - [TestMethod] - public async Task SmartDataProcessor_ProcessesMultipleItems() + public async Task SmartDataProcessor_ProcessesItems_InOrder() { // Arrange var processor = new SmartDataProcessor(100); @@ -43,23 +27,29 @@ public async Task SmartDataProcessor_ProcessesMultipleItems() await processor.WaitForAllAsync(); // Assert - Assert.AreEqual(10, processedData.Count); - for (int i = 0; i < 10; i++) - { - Assert.IsTrue(processedData.Contains(i)); - } + Assert.AreEqual(10, processedData.Count, "The number of processed items should be 10."); + CollectionAssert.AreEquivalent(Enumerable.Range(0, 10).ToList(), processedData, "The processed items should be equivalent to the original items."); } [TestMethod] - public async Task SmartDataProcessor_WaitForAllAsync_WaitsForAllJobs() + public async Task SmartDataProcessor_Honors_MaxDegreeOfParallelism() { // Arrange - var processor = new SmartDataProcessor(100); - var processedCount = 0; + var settings = new SmartDataProcessorSettings + { + MaxDegreeOfParallelism = 2 + }; + var processor = new SmartDataProcessor(settings); + var mre = new ManualResetEventSlim(false); + var runningTasks = 0; + var maxRunningTasks = 0; + var action = (int data) => { - Interlocked.Increment(ref processedCount); - Thread.Sleep(10); + Interlocked.Increment(ref runningTasks); + maxRunningTasks = Math.Max(maxRunningTasks, runningTasks); + mre.Wait(); + Interlocked.Decrement(ref runningTasks); }; // Act @@ -67,100 +57,146 @@ public async Task SmartDataProcessor_WaitForAllAsync_WaitsForAllJobs() { processor.EnqueueOrWait(i, action); } - await processor.WaitForAllAsync(); + + await Task.Delay(100); // Give time for tasks to start // Assert - Assert.AreEqual(5, processedCount); + Assert.AreEqual(2, maxRunningTasks, "The maximum number of running tasks should not exceed the specified MaxDegreeOfParallelism."); + + // Cleanup + mre.Set(); + await processor.WaitForAllAsync(); } [TestMethod] - public void SmartDataProcessor_Dispose_StopsManagerThread() + public async Task SmartDataProcessor_Blocks_When_QueueIsFull() { // Arrange - var processor = new SmartDataProcessor(100); + var settings = new SmartDataProcessorSettings + { + MaxDegreeOfParallelism = 1, + QueueBufferMultiplier = 1 + }; + var processor = new SmartDataProcessor(settings); + var mre = new ManualResetEventSlim(false); + var tasksStarted = 0; + + var action = (int data) => + { + Interlocked.Increment(ref tasksStarted); + mre.Wait(); + }; // Act - processor.Dispose(); + processor.EnqueueOrWait(1, action); + await Task.Delay(100); // Give the manager loop time to start + processor.EnqueueOrWait(2, action); + processor.EnqueueOrWait(3, action); + + var blockedTask = Task.Run(() => processor.EnqueueOrWait(4, action)); + + await Task.Delay(100); // Assert - // We can't directly check if the thread is stopped, - // but we can check if the CancellationToken is cancelled. - // A better approach would be to inject the thread and mock it, - // but for now, we'll just check if dispose doesn't throw. + Assert.AreEqual(1, tasksStarted, "Only one task should have started because the processor is blocked."); + Assert.IsFalse(blockedTask.IsCompleted, "The enqueue task should be blocked because the queue is full."); + + // Cleanup + mre.Set(); + await blockedTask; + await processor.WaitForAllAsync(); } [TestMethod] - public async Task SmartDataProcessor_EnqueueOrWait_BlocksWhenCpuUsageIsHigh() + public async Task SmartDataProcessor_Blocks_When_CpuIsHigh() { // Arrange + var settings = new SmartDataProcessorSettings + { + MaxCpuUsage = 50 + }; var cpuMonitor = new MockCpuMonitor(); - cpuMonitor.SetCpuUsage(100); - var processor = new SmartDataProcessor(80, cpuMonitor); - var processedData = new List(); + var processor = new SmartDataProcessor(settings, cpuMonitor); + var mre = new ManualResetEventSlim(false); + var action = (int data) => { - lock (processedData) - { - processedData.Add(data); - } + mre.Wait(); }; // Act - // Let the manager loop run a few times to ramp up the smoothed CPU value - for (int i = 0; i < 10; i++) - { - processor.ManagerLoopCycle.WaitOne(100); - processor.ManagerLoopCycle.Reset(); - } - - var enqueueTask = Task.Run(() => processor.EnqueueOrWait(1, action)); - await Task.Delay(50); // Give the task a chance to block inside EnqueueOrWait + cpuMonitor.SetCpuUsage(100); + processor.EnqueueOrWait(1, action); // This will start the manager loop + await Task.Delay(100); // Give time for the manager to update the smoothed CPU + + var blockedTask = Task.Run(() => processor.EnqueueOrWait(2, action)); + + await Task.Delay(100); // Assert - Assert.IsFalse(enqueueTask.IsCompleted); + Assert.IsFalse(blockedTask.IsCompleted, "The enqueue task should be blocked because the CPU usage is high."); // Act - cpuMonitor.SetCpuUsage(50); - processor.ManagerLoopCycle.WaitOne(100); // Let the manager loop run to detect the change - await enqueueTask; + cpuMonitor.SetCpuUsage(20); + await Task.Delay(200); // Give time for the manager to update the smoothed CPU // Assert - Assert.IsTrue(enqueueTask.IsCompleted); + Assert.IsTrue(blockedTask.IsCompleted, "The enqueue task should be completed after the CPU usage is lowered."); + + // Cleanup + mre.Set(); + await processor.WaitForAllAsync(); } [TestMethod] - public async Task SmartDataProcessor_EnqueueOrWait_BlocksWhenQueueIsTooLarge() + public async Task SmartDataProcessor_PauseAndResume_Works() { // Arrange - var cpuMonitor = new MockCpuMonitor(); - cpuMonitor.SetCpuUsage(10); - var processor = new SmartDataProcessor(80, cpuMonitor); + var processor = new SmartDataProcessor(100); var processedData = new List(); var action = (int data) => { lock (processedData) { processedData.Add(data); - Thread.Sleep(100); } }; // Act - for (int i = 0; i < 20; i++) - { - processor.EnqueueOrWait(i, action); - } - var enqueueTask = Task.Run(() => processor.EnqueueOrWait(20, action)); - processor.ManagerLoopCycle.WaitOne(); + processor.Pause(); + processor.EnqueueOrWait(1, action); + await Task.Delay(100); // Assert - Assert.IsFalse(enqueueTask.IsCompleted); + Assert.AreEqual(0, processedData.Count, "No items should be processed while the processor is paused."); + + // Act + processor.Resume(); + await processor.WaitForAllAsync(); + + // Assert + Assert.AreEqual(1, processedData.Count, "Items should be processed after the processor is resumed."); + } + + [TestMethod] + public async Task SmartDataProcessor_OnException_EventIsFired() + { + // Arrange + var processor = new SmartDataProcessor(100); + Exception? caughtException = null; + processor.OnException += (ex) => caughtException = ex; + + var action = (int data) => + { + throw new InvalidOperationException("Test Exception"); + }; // Act + processor.EnqueueOrWait(1, action); await processor.WaitForAllAsync(); - await enqueueTask; // Assert - Assert.IsTrue(enqueueTask.IsCompleted); + Assert.IsNotNull(caughtException, "The OnException event should be fired."); + Assert.IsInstanceOfType(caughtException, typeof(InvalidOperationException), "The exception should be of the correct type."); } } \ No newline at end of file