From 259722d77f13f20a9af11dd246ff0abf3d83b74b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cosmin=20Cre=C8=9Bu?= Date: Thu, 4 Sep 2025 12:41:09 +0100 Subject: [PATCH 1/3] Cleanup --- .../SmartDataProcessor/SmartDataProcessor.cs | 78 ++++++++++++------- 1 file changed, 49 insertions(+), 29 deletions(-) diff --git a/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs index 252a0f9..75c9ba8 100644 --- a/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs +++ b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs @@ -39,10 +39,9 @@ public class SmartDataProcessor : IDisposable private readonly double _maxCpuUsage; private readonly ConcurrentQueue<(T Data, Action Action)> _jobs = new(); private readonly ConcurrentDictionary _runningTasks = new(); - private readonly Thread _managerThread; private readonly CancellationTokenSource _cts = new(); private readonly ICpuMonitor _cpuMonitor; - internal readonly ManualResetEvent ManagerLoopCycle = new(false); + private readonly Task _managerTask; private double _smoothedCpu = 0; private int _targetConcurrency = 1; @@ -50,29 +49,21 @@ public class SmartDataProcessor : IDisposable public ProcessingMetrics Metrics { get; } = new(); + /// + /// Creates a new SmartDataProcessor with the specified maximum CPU usage. + /// + /// The maximum CPU usage percentage (0-100) to target. public SmartDataProcessor(double maxCpuUsage = 100) { _maxCpuUsage = Math.Max(maxCpuUsage - CpuHeadroomBuffer, CpuHeadroomBuffer); - if (OperatingSystem.IsWindows()) - { - _cpuMonitor = new WindowsCpuMonitor(); - } - else if (OperatingSystem.IsLinux()) - { - _cpuMonitor = new LinuxCpuMonitor(); - } - else if (OperatingSystem.IsMacOS()) - { - _cpuMonitor = new MacCpuMonitor(); - } - else - { - _cpuMonitor = new NullCpuMonitor(); - } + // OS-specific monitor logic is unchanged... + if (OperatingSystem.IsWindows()) _cpuMonitor = new WindowsCpuMonitor(); + else if (OperatingSystem.IsLinux()) _cpuMonitor = new LinuxCpuMonitor(); + else if (OperatingSystem.IsMacOS()) _cpuMonitor = new MacCpuMonitor(); + else _cpuMonitor = new NullCpuMonitor(); - _managerThread = new Thread(ManagerLoop) { IsBackground = true }; - _managerThread.Start(); + _managerTask = Task.Run(ManagerLoopAsync); } internal SmartDataProcessor(double maxCpuUsage, ICpuMonitor cpuMonitor) @@ -80,10 +71,15 @@ internal SmartDataProcessor(double maxCpuUsage, ICpuMonitor cpuMonitor) _maxCpuUsage = maxCpuUsage; _cpuMonitor = cpuMonitor; - _managerThread = new Thread(ManagerLoop) { IsBackground = true }; - _managerThread.Start(); + _managerTask = Task.Run(ManagerLoopAsync); } + /// + /// Enqueues a data item for processing. If the CPU is saturated or the queue is overloaded, + /// this method will block until it is safe to enqueue the item. + /// + /// + /// public void EnqueueOrWait(T data, Action action) { while (true) @@ -102,6 +98,10 @@ public void EnqueueOrWait(T data, Action action) _jobs.Enqueue((data, action)); } + /// + /// Waits for all currently queued and running jobs to complete. + /// + /// public async Task WaitForAllAsync() { while (!_jobs.IsEmpty) @@ -112,16 +112,33 @@ public async Task WaitForAllAsync() await Task.WhenAll(_runningTasks.Keys.ToArray()); } + /// + /// Disposes the processor, stopping all management and worker tasks. + /// Note: This does not cancel running jobs; it only stops accepting new ones and waits + /// for current jobs to finish. + /// public void Dispose() { _cts.Cancel(); - _managerThread.Join(); + + try + { + _managerTask.Wait(); + } + catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is TaskCanceledException)) + { + // This is expected when the task is cancelled. We can ignore it. + } + finally + { + _cts.Dispose(); + } } /// /// The main management loop that coordinates concurrency adjustments and task launching. /// - private void ManagerLoop() + private async Task ManagerLoopAsync() { while (!_cts.IsCancellationRequested) { @@ -129,14 +146,17 @@ private void ManagerLoop() { UpdateConcurrency(); LaunchWorkerTasks(); + + await Task.Delay(ManagerLoopIntervalMs, _cts.Token); + } + catch (TaskCanceledException) + { + break; } catch (Exception ex) { - Console.WriteLine($"Error in ManagerLoop: {ex.Message}"); + Console.WriteLine($"Error in ManagerLoopAsync: {ex.Message}"); } - - ManagerLoopCycle.Set(); - Thread.Sleep(ManagerLoopIntervalMs); } } @@ -188,7 +208,6 @@ private void UpdateConcurrency() _lastAverageDuration = Metrics.AvgTaskTime; } - /// /// Launches new tasks from the queue to meet the target concurrency level. /// @@ -222,6 +241,7 @@ private void LaunchWorkerTasks() Metrics.AddJobDuration(sw.Elapsed.TotalMilliseconds); } }); + _runningTasks.TryAdd(task, true); } else From cc6ad45d263165156ba2f0eea746aff975f01709 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cosmin=20Cre=C8=9Bu?= Date: Thu, 4 Sep 2025 19:05:08 +0100 Subject: [PATCH 2/3] main --- README.md | 40 +++++- samples/SimpliSharp.Demo/Program.cs | 16 ++- .../SmartDataProcessor/SmartDataProcessor.cs | 115 +++++++++++++----- .../SmartDataProcessorSettings.cs | 26 ++++ 4 files changed, 159 insertions(+), 38 deletions(-) create mode 100644 src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessorSettings.cs diff --git a/README.md b/README.md index 7159e4b..34dcfe0 100644 --- a/README.md +++ b/README.md @@ -22,18 +22,48 @@ The `SmartDataProcessor` is designed to process a queue of items in parallel, while automatically adjusting the level of concurrency to stay within a specified CPU usage limit. -Usage example: +### Features +- **Dynamic Concurrency**: Automatically adjusts the number of worker threads based on real-time CPU load. +- **CPU Throttling**: Ensures that CPU usage does not exceed a configurable maximum limit. +- **Backpressure**: The `EnqueueOrWait` method blocks when the queue is full or the CPU is saturated, preventing memory overload. +- **Lazy Initialization**: The processing thread pool is only created when the first item is enqueued. +- **Configurable**: Fine-tune performance with the `SmartDataProcessorSettings` class. +- **Event-driven**: Subscribe to events for CPU usage changes and exceptions. +- **Runtime Control**: Pause and resume the processor on the fly. + +### Usage Example + +You can now configure the processor using the `SmartDataProcessorSettings` class: ```csharp -using var processor = new SmartDataProcessor(maxCpuUsage: 75); +var settings = new SmartDataProcessorSettings +{ + MaxCpuUsage = 80, // Target 80% CPU usage + MaxDegreeOfParallelism = 4, // Use a maximum of 4 threads + QueueBufferMultiplier = 8 // Set a larger queue buffer +}; + +using var processor = new SmartDataProcessor(settings); -for (...) +// Subscribe to events +processor.OnCpuUsageChange += (cpuLoad) => Console.WriteLine($"CPU Load: {cpuLoad:F1}%"); +processor.OnException += (ex) => Console.WriteLine($"An error occurred: {ex.Message}"); + +// Enqueue items +for (int i = 0; i < 100; i++) { - processor.EnqueueOrWait(dtIn, data => + processor.EnqueueOrWait(i, data => { - ... + // Your processing logic here... }); } + +// Pause and resume processing +processor.Pause(); +Thread.Sleep(5000); +processor.Resume(); + +processor.WaitForAllAsync().Wait(); ``` ![Alt text for your image](https://raw.githubusercontent.com/cretucosmin3/SimpliSharp/refs/heads/main/assets/75-cpu-usage.png) diff --git a/samples/SimpliSharp.Demo/Program.cs b/samples/SimpliSharp.Demo/Program.cs index 331180f..c6607e2 100644 --- a/samples/SimpliSharp.Demo/Program.cs +++ b/samples/SimpliSharp.Demo/Program.cs @@ -32,14 +32,24 @@ return; } +Console.WriteLine(); +Console.WriteLine("Press any key to exit..."); +Console.ReadKey(); + static void SmartDataProcessor_Example() { Console.Clear(); Console.WriteLine("Starting data processing..."); - using var processor = new SmartDataProcessor(maxCpuUsage: 90); + var settings = new SmartDataProcessorSettings + { + MaxDegreeOfParallelism = 1 + }; + + using var processor = new SmartDataProcessor(settings); + var stopwatch = System.Diagnostics.Stopwatch.StartNew(); - var tasksCount = 2000; + var tasksCount = 200; for (int i = 0; i < tasksCount; i++) { @@ -50,7 +60,7 @@ static void SmartDataProcessor_Example() int simMax = Random.Shared.Next(5_000_000, 20_000_000); double sum = 0; - for (int j = 0; j < 10_000_000; j++) + for (int j = 0; j < simMax; j++) { double value = Math.Sqrt(j) * Math.Sin(j % 360) + Math.Log(j + 1); if (value > 1000) diff --git a/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs index 75c9ba8..8c5dd0e 100644 --- a/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs +++ b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs @@ -1,7 +1,10 @@ + using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace SimpliSharp.Utilities.Process; @@ -29,51 +32,82 @@ public class SmartDataProcessor : IDisposable /// private const double SmoothingFactor = 0.3; - /// - /// A multiplier to determine the queue size limit based on the current number of workers. - /// This creates back-pressure to prevent the queue from growing too quickly. - /// - private const int QueueBufferMultiplier = 2; - // --- State --- + private readonly SmartDataProcessorSettings _settings; private readonly double _maxCpuUsage; private readonly ConcurrentQueue<(T Data, Action Action)> _jobs = new(); private readonly ConcurrentDictionary _runningTasks = new(); private readonly CancellationTokenSource _cts = new(); private readonly ICpuMonitor _cpuMonitor; - private readonly Task _managerTask; + private object _managerLock = new(); + private Task _managerTask; private double _smoothedCpu = 0; private int _targetConcurrency = 1; private double _lastAverageDuration; public ProcessingMetrics Metrics { get; } = new(); + public bool IsPaused { get; private set; } + + // --- Events --- + public event Action OnException; + public event Action OnCpuUsageChange; + + /// + /// Creates a new SmartDataProcessor with default settings. + /// + public SmartDataProcessor() : this(new SmartDataProcessorSettings()) + { + } /// /// Creates a new SmartDataProcessor with the specified maximum CPU usage. /// /// The maximum CPU usage percentage (0-100) to target. - public SmartDataProcessor(double maxCpuUsage = 100) + public SmartDataProcessor(double maxCpuUsage) : this(new SmartDataProcessorSettings { MaxCpuUsage = maxCpuUsage }) { - _maxCpuUsage = Math.Max(maxCpuUsage - CpuHeadroomBuffer, CpuHeadroomBuffer); - - // OS-specific monitor logic is unchanged... - if (OperatingSystem.IsWindows()) _cpuMonitor = new WindowsCpuMonitor(); - else if (OperatingSystem.IsLinux()) _cpuMonitor = new LinuxCpuMonitor(); - else if (OperatingSystem.IsMacOS()) _cpuMonitor = new MacCpuMonitor(); - else _cpuMonitor = new NullCpuMonitor(); + } + + /// + /// Creates a new SmartDataProcessor with the specified settings. + /// + /// The settings to use for this processor. + public SmartDataProcessor(SmartDataProcessorSettings settings) + { + _settings = settings; + _maxCpuUsage = Math.Max(_settings.MaxCpuUsage - CpuHeadroomBuffer, CpuHeadroomBuffer); - _managerTask = Task.Run(ManagerLoopAsync); + bool useCpuMonitoring = _settings.MaxCpuUsage < 100; + if (useCpuMonitoring) + { + if (OperatingSystem.IsWindows()) _cpuMonitor = new WindowsCpuMonitor(); + else if (OperatingSystem.IsLinux()) _cpuMonitor = new LinuxCpuMonitor(); + else if (OperatingSystem.IsMacOS()) _cpuMonitor = new MacCpuMonitor(); + else _cpuMonitor = new NullCpuMonitor(); + } + else + { + _cpuMonitor = new NullCpuMonitor(); + } } - internal SmartDataProcessor(double maxCpuUsage, ICpuMonitor cpuMonitor) + internal SmartDataProcessor(SmartDataProcessorSettings settings, ICpuMonitor cpuMonitor) { - _maxCpuUsage = maxCpuUsage; + _settings = settings; + _maxCpuUsage = Math.Max(_settings.MaxCpuUsage - CpuHeadroomBuffer, CpuHeadroomBuffer); _cpuMonitor = cpuMonitor; - - _managerTask = Task.Run(ManagerLoopAsync); } + /// + /// Pauses the processing of new items. + /// + public void Pause() => IsPaused = true; + + /// + /// Resumes the processing of new items. + /// + public void Resume() => IsPaused = false; + /// /// Enqueues a data item for processing. If the CPU is saturated or the queue is overloaded, /// this method will block until it is safe to enqueue the item. @@ -82,10 +116,12 @@ internal SmartDataProcessor(double maxCpuUsage, ICpuMonitor cpuMonitor) /// public void EnqueueOrWait(T data, Action action) { + LazyInitializer.EnsureInitialized(ref _managerTask, ref _managerLock, () => Task.Run(ManagerLoopAsync)); + while (true) { bool isCpuSaturated = _smoothedCpu > _maxCpuUsage && _cpuMonitor is not NullCpuMonitor; - bool isQueueOverloaded = _jobs.Count > _targetConcurrency * QueueBufferMultiplier; + bool isQueueOverloaded = _jobs.Count > _targetConcurrency * _settings.QueueBufferMultiplier; if (!isCpuSaturated && !isQueueOverloaded) { @@ -109,7 +145,10 @@ public async Task WaitForAllAsync() await Task.Delay(50); } - await Task.WhenAll(_runningTasks.Keys.ToArray()); + if (_managerTask != null) + { + await Task.WhenAll(_runningTasks.Keys.ToArray()); + } } /// @@ -123,7 +162,7 @@ public void Dispose() try { - _managerTask.Wait(); + _managerTask?.Wait(); } catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is TaskCanceledException)) { @@ -144,8 +183,11 @@ private async Task ManagerLoopAsync() { try { - UpdateConcurrency(); - LaunchWorkerTasks(); + if (!IsPaused) + { + UpdateConcurrency(); + LaunchWorkerTasks(); + } await Task.Delay(ManagerLoopIntervalMs, _cts.Token); } @@ -155,7 +197,7 @@ private async Task ManagerLoopAsync() } catch (Exception ex) { - Console.WriteLine($"Error in ManagerLoopAsync: {ex.Message}"); + OnException?.Invoke(ex); } } } @@ -168,6 +210,16 @@ private void UpdateConcurrency() double cpuUsage = _cpuMonitor.GetCpuUsage(); _smoothedCpu = (SmoothingFactor * cpuUsage) + (1 - SmoothingFactor) * _smoothedCpu; Metrics.UpdateSmoothedCpu(_smoothedCpu); + OnCpuUsageChange?.Invoke(_smoothedCpu); + + int maxConcurrency = _settings.MaxDegreeOfParallelism ?? Environment.ProcessorCount; + + // If CPU monitoring is disabled, just max out the concurrency. + if (_cpuMonitor is NullCpuMonitor) + { + _targetConcurrency = maxConcurrency; + return; + } // --- Concurrency Decrease Logic --- bool isCpuAboveMax = _smoothedCpu > _maxCpuUsage; @@ -182,10 +234,9 @@ private void UpdateConcurrency() // --- Concurrency Increase Logic --- bool hasCpuHeadroom = _smoothedCpu < _maxCpuUsage - CpuHeadroomBuffer; - bool isMonitorDisabled = _cpuMonitor is NullCpuMonitor; - bool canIncreaseConcurrency = _targetConcurrency < Environment.ProcessorCount; + bool canIncreaseConcurrency = _targetConcurrency < maxConcurrency; - if ((hasCpuHeadroom || isMonitorDisabled) && canIncreaseConcurrency) + if (hasCpuHeadroom && canIncreaseConcurrency) { // Check if adding threads is causing performance to degrade (contention). bool isJobDurationIncreasing = Metrics.AvgTaskTime > _lastAverageDuration; @@ -201,7 +252,7 @@ private void UpdateConcurrency() // If jobs are very short, we can be more aggressive in scaling up. bool areJobsShort = Metrics.AvgTaskTime > 0 && Metrics.AvgTaskTime < ShortJobThresholdMs; int increaseAmount = areJobsShort ? 2 : 1; - _targetConcurrency = Math.Min(_targetConcurrency + increaseAmount, Environment.ProcessorCount); + _targetConcurrency = Math.Min(_targetConcurrency + increaseAmount, maxConcurrency); } } @@ -235,6 +286,10 @@ private void LaunchWorkerTasks() { job.Action(job.Data); } + catch (Exception ex) + { + OnException?.Invoke(ex); + } finally { sw.Stop(); diff --git a/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessorSettings.cs b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessorSettings.cs new file mode 100644 index 0000000..477243f --- /dev/null +++ b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessorSettings.cs @@ -0,0 +1,26 @@ +namespace SimpliSharp.Utilities.Process; + +/// +/// Settings for the SmartDataProcessor. +/// +public class SmartDataProcessorSettings +{ + /// + /// The target maximum CPU usage percentage (0-100). + /// If set to 100 or more, CPU monitoring will be disabled, + /// and the processor will scale to the maximum number of threads. + /// + public double MaxCpuUsage { get; set; } = 100; + + /// + /// An optional value to manually set the maximum number of concurrent threads. + /// If not provided, it defaults to Environment.ProcessorCount. + /// + public int? MaxDegreeOfParallelism { get; set; } + + /// + /// A multiplier to determine the queue size limit for backpressure, + /// based on the current number of workers. + /// + public int QueueBufferMultiplier { get; set; } = 2; +} From 12a844f398d667d7fc99687c5187cf8eda8d06e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cosmin=20Cre=C8=9Bu?= Date: Thu, 4 Sep 2025 19:43:06 +0100 Subject: [PATCH 3/3] Updating unit tests --- .../SmartDataProcessor/MockCpuMonitor.cs | 3 +- .../SmartDataProcessorTests.cs | 180 +++++++++++------- 2 files changed, 109 insertions(+), 74 deletions(-) diff --git a/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/MockCpuMonitor.cs b/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/MockCpuMonitor.cs index 1556ecd..5becd71 100644 --- a/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/MockCpuMonitor.cs +++ b/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/MockCpuMonitor.cs @@ -1,8 +1,7 @@ +namespace SimpliSharp.Tests.Utilities.Process.SmartDataProcessor; using SimpliSharp.Utilities.Process; -namespace SimpliSharp.Tests.Utilities.Process.SmartDataProcessor; - public class MockCpuMonitor : ICpuMonitor { private double _cpuUsage; diff --git a/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs b/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs index e9727af..ddcff2d 100644 --- a/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs +++ b/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs @@ -6,23 +6,7 @@ namespace SimpliSharp.Tests.Utilities.Process.SmartDataProcessor; public class SmartDataProcessorTests { [TestMethod] - public async Task SmartDataProcessor_ProcessesSingleItem() - { - // Arrange - var processor = new SmartDataProcessor(100); - var processedData = 0; - Action action = (int data) => processedData = data; - - // Act - processor.EnqueueOrWait(1, action); - await processor.WaitForAllAsync(); - - // Assert - Assert.AreEqual(1, processedData); - } - - [TestMethod] - public async Task SmartDataProcessor_ProcessesMultipleItems() + public async Task SmartDataProcessor_ProcessesItems_InOrder() { // Arrange var processor = new SmartDataProcessor(100); @@ -43,23 +27,29 @@ public async Task SmartDataProcessor_ProcessesMultipleItems() await processor.WaitForAllAsync(); // Assert - Assert.AreEqual(10, processedData.Count); - for (int i = 0; i < 10; i++) - { - Assert.IsTrue(processedData.Contains(i)); - } + Assert.AreEqual(10, processedData.Count, "The number of processed items should be 10."); + CollectionAssert.AreEquivalent(Enumerable.Range(0, 10).ToList(), processedData, "The processed items should be equivalent to the original items."); } [TestMethod] - public async Task SmartDataProcessor_WaitForAllAsync_WaitsForAllJobs() + public async Task SmartDataProcessor_Honors_MaxDegreeOfParallelism() { // Arrange - var processor = new SmartDataProcessor(100); - var processedCount = 0; + var settings = new SmartDataProcessorSettings + { + MaxDegreeOfParallelism = 2 + }; + var processor = new SmartDataProcessor(settings); + var mre = new ManualResetEventSlim(false); + var runningTasks = 0; + var maxRunningTasks = 0; + var action = (int data) => { - Interlocked.Increment(ref processedCount); - Thread.Sleep(10); + Interlocked.Increment(ref runningTasks); + maxRunningTasks = Math.Max(maxRunningTasks, runningTasks); + mre.Wait(); + Interlocked.Decrement(ref runningTasks); }; // Act @@ -67,100 +57,146 @@ public async Task SmartDataProcessor_WaitForAllAsync_WaitsForAllJobs() { processor.EnqueueOrWait(i, action); } - await processor.WaitForAllAsync(); + + await Task.Delay(100); // Give time for tasks to start // Assert - Assert.AreEqual(5, processedCount); + Assert.AreEqual(2, maxRunningTasks, "The maximum number of running tasks should not exceed the specified MaxDegreeOfParallelism."); + + // Cleanup + mre.Set(); + await processor.WaitForAllAsync(); } [TestMethod] - public void SmartDataProcessor_Dispose_StopsManagerThread() + public async Task SmartDataProcessor_Blocks_When_QueueIsFull() { // Arrange - var processor = new SmartDataProcessor(100); + var settings = new SmartDataProcessorSettings + { + MaxDegreeOfParallelism = 1, + QueueBufferMultiplier = 1 + }; + var processor = new SmartDataProcessor(settings); + var mre = new ManualResetEventSlim(false); + var tasksStarted = 0; + + var action = (int data) => + { + Interlocked.Increment(ref tasksStarted); + mre.Wait(); + }; // Act - processor.Dispose(); + processor.EnqueueOrWait(1, action); + await Task.Delay(100); // Give the manager loop time to start + processor.EnqueueOrWait(2, action); + processor.EnqueueOrWait(3, action); + + var blockedTask = Task.Run(() => processor.EnqueueOrWait(4, action)); + + await Task.Delay(100); // Assert - // We can't directly check if the thread is stopped, - // but we can check if the CancellationToken is cancelled. - // A better approach would be to inject the thread and mock it, - // but for now, we'll just check if dispose doesn't throw. + Assert.AreEqual(1, tasksStarted, "Only one task should have started because the processor is blocked."); + Assert.IsFalse(blockedTask.IsCompleted, "The enqueue task should be blocked because the queue is full."); + + // Cleanup + mre.Set(); + await blockedTask; + await processor.WaitForAllAsync(); } [TestMethod] - public async Task SmartDataProcessor_EnqueueOrWait_BlocksWhenCpuUsageIsHigh() + public async Task SmartDataProcessor_Blocks_When_CpuIsHigh() { // Arrange + var settings = new SmartDataProcessorSettings + { + MaxCpuUsage = 50 + }; var cpuMonitor = new MockCpuMonitor(); - cpuMonitor.SetCpuUsage(100); - var processor = new SmartDataProcessor(80, cpuMonitor); - var processedData = new List(); + var processor = new SmartDataProcessor(settings, cpuMonitor); + var mre = new ManualResetEventSlim(false); + var action = (int data) => { - lock (processedData) - { - processedData.Add(data); - } + mre.Wait(); }; // Act - // Let the manager loop run a few times to ramp up the smoothed CPU value - for (int i = 0; i < 10; i++) - { - processor.ManagerLoopCycle.WaitOne(100); - processor.ManagerLoopCycle.Reset(); - } - - var enqueueTask = Task.Run(() => processor.EnqueueOrWait(1, action)); - await Task.Delay(50); // Give the task a chance to block inside EnqueueOrWait + cpuMonitor.SetCpuUsage(100); + processor.EnqueueOrWait(1, action); // This will start the manager loop + await Task.Delay(100); // Give time for the manager to update the smoothed CPU + + var blockedTask = Task.Run(() => processor.EnqueueOrWait(2, action)); + + await Task.Delay(100); // Assert - Assert.IsFalse(enqueueTask.IsCompleted); + Assert.IsFalse(blockedTask.IsCompleted, "The enqueue task should be blocked because the CPU usage is high."); // Act - cpuMonitor.SetCpuUsage(50); - processor.ManagerLoopCycle.WaitOne(100); // Let the manager loop run to detect the change - await enqueueTask; + cpuMonitor.SetCpuUsage(20); + await Task.Delay(200); // Give time for the manager to update the smoothed CPU // Assert - Assert.IsTrue(enqueueTask.IsCompleted); + Assert.IsTrue(blockedTask.IsCompleted, "The enqueue task should be completed after the CPU usage is lowered."); + + // Cleanup + mre.Set(); + await processor.WaitForAllAsync(); } [TestMethod] - public async Task SmartDataProcessor_EnqueueOrWait_BlocksWhenQueueIsTooLarge() + public async Task SmartDataProcessor_PauseAndResume_Works() { // Arrange - var cpuMonitor = new MockCpuMonitor(); - cpuMonitor.SetCpuUsage(10); - var processor = new SmartDataProcessor(80, cpuMonitor); + var processor = new SmartDataProcessor(100); var processedData = new List(); var action = (int data) => { lock (processedData) { processedData.Add(data); - Thread.Sleep(100); } }; // Act - for (int i = 0; i < 20; i++) - { - processor.EnqueueOrWait(i, action); - } - var enqueueTask = Task.Run(() => processor.EnqueueOrWait(20, action)); - processor.ManagerLoopCycle.WaitOne(); + processor.Pause(); + processor.EnqueueOrWait(1, action); + await Task.Delay(100); // Assert - Assert.IsFalse(enqueueTask.IsCompleted); + Assert.AreEqual(0, processedData.Count, "No items should be processed while the processor is paused."); + + // Act + processor.Resume(); + await processor.WaitForAllAsync(); + + // Assert + Assert.AreEqual(1, processedData.Count, "Items should be processed after the processor is resumed."); + } + + [TestMethod] + public async Task SmartDataProcessor_OnException_EventIsFired() + { + // Arrange + var processor = new SmartDataProcessor(100); + Exception? caughtException = null; + processor.OnException += (ex) => caughtException = ex; + + var action = (int data) => + { + throw new InvalidOperationException("Test Exception"); + }; // Act + processor.EnqueueOrWait(1, action); await processor.WaitForAllAsync(); - await enqueueTask; // Assert - Assert.IsTrue(enqueueTask.IsCompleted); + Assert.IsNotNull(caughtException, "The OnException event should be fired."); + Assert.IsInstanceOfType(caughtException, typeof(InvalidOperationException), "The exception should be of the correct type."); } } \ No newline at end of file