Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,48 @@

The `SmartDataProcessor<T>` is designed to process a queue of items in parallel, while automatically adjusting the level of concurrency to stay within a specified CPU usage limit.

Usage example:
### Features
- **Dynamic Concurrency**: Automatically adjusts the number of worker threads based on real-time CPU load.
- **CPU Throttling**: Ensures that CPU usage does not exceed a configurable maximum limit.
- **Backpressure**: The `EnqueueOrWait` method blocks when the queue is full or the CPU is saturated, preventing memory overload.
- **Lazy Initialization**: The background manager task that schedules work is only started when the first item is enqueued.
- **Configurable**: Fine-tune performance with the `SmartDataProcessorSettings` class.
- **Event-driven**: Subscribe to events for CPU usage changes and exceptions.
- **Runtime Control**: Pause and resume the processor on the fly.

### Usage Example

Configure the processor with the `SmartDataProcessorSettings` class:

```csharp
using var processor = new SmartDataProcessor<int>(maxCpuUsage: 75);
var settings = new SmartDataProcessorSettings
{
MaxCpuUsage = 80, // Target 80% CPU usage
MaxDegreeOfParallelism = 4, // Use a maximum of 4 threads
QueueBufferMultiplier = 8 // Set a larger queue buffer
};

using var processor = new SmartDataProcessor<int>(settings);

for (...)
// Subscribe to events
processor.OnCpuUsageChange += (cpuLoad) => Console.WriteLine($"CPU Load: {cpuLoad:F1}%");
processor.OnException += (ex) => Console.WriteLine($"An error occurred: {ex.Message}");

// Enqueue items
for (int i = 0; i < 100; i++)
{
processor.EnqueueOrWait(dtIn, data =>
processor.EnqueueOrWait(i, data =>
{
...
// Your processing logic here...
});
}

// Pause and resume processing
processor.Pause();
Thread.Sleep(5000);
processor.Resume();

processor.WaitForAllAsync().Wait();
```

![CPU usage graph with a 75% CPU usage limit](https://raw.githubusercontent.com/cretucosmin3/SimpliSharp/refs/heads/main/assets/75-cpu-usage.png)
Expand Down
16 changes: 13 additions & 3 deletions samples/SimpliSharp.Demo/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,24 @@
return;
}

Console.WriteLine();
Console.WriteLine("Press any key to exit...");
Console.ReadKey();

static void SmartDataProcessor_Example()
{
Console.Clear();
Console.WriteLine("Starting data processing...");

using var processor = new SmartDataProcessor<int>(maxCpuUsage: 90);
var settings = new SmartDataProcessorSettings
{
MaxDegreeOfParallelism = 1
};

using var processor = new SmartDataProcessor<int>(settings);

var stopwatch = System.Diagnostics.Stopwatch.StartNew();
var tasksCount = 2000;
var tasksCount = 200;

for (int i = 0; i < tasksCount; i++)
{
Expand All @@ -50,7 +60,7 @@ static void SmartDataProcessor_Example()
int simMax = Random.Shared.Next(5_000_000, 20_000_000);
double sum = 0;

for (int j = 0; j < 10_000_000; j++)
for (int j = 0; j < simMax; j++)
{
double value = Math.Sqrt(j) * Math.Sin(j % 360) + Math.Log(j + 1);
if (value > 1000)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace SimpliSharp.Utilities.Process;

Expand Down Expand Up @@ -29,67 +32,96 @@
/// </summary>
private const double SmoothingFactor = 0.3;

/// <summary>
/// A multiplier to determine the queue size limit based on the current number of workers.
/// This creates back-pressure to prevent the queue from growing too quickly.
/// </summary>
private const int QueueBufferMultiplier = 2;

// --- State ---
private readonly SmartDataProcessorSettings _settings;
private readonly double _maxCpuUsage;
private readonly ConcurrentQueue<(T Data, Action<T> Action)> _jobs = new();
private readonly ConcurrentDictionary<Task, bool> _runningTasks = new();
private readonly Thread _managerThread;
private readonly CancellationTokenSource _cts = new();
private readonly ICpuMonitor _cpuMonitor;
internal readonly ManualResetEvent ManagerLoopCycle = new(false);
private object _managerLock = new();

private Task _managerTask;
private double _smoothedCpu = 0;
private int _targetConcurrency = 1;
private double _lastAverageDuration;

public ProcessingMetrics Metrics { get; } = new();
public bool IsPaused { get; private set; }

public SmartDataProcessor(double maxCpuUsage = 100)
// --- Events ---
public event Action<Exception> OnException;
public event Action<double> OnCpuUsageChange;

/// <summary>
/// Creates a new SmartDataProcessor with default settings.
/// </summary>
public SmartDataProcessor() : this(new SmartDataProcessorSettings())
{
_maxCpuUsage = Math.Max(maxCpuUsage - CpuHeadroomBuffer, CpuHeadroomBuffer);
}

if (OperatingSystem.IsWindows())
{
_cpuMonitor = new WindowsCpuMonitor();
}
else if (OperatingSystem.IsLinux())
{
_cpuMonitor = new LinuxCpuMonitor();
}
else if (OperatingSystem.IsMacOS())
/// <summary>
/// Creates a new SmartDataProcessor with the specified maximum CPU usage.
/// </summary>
/// <param name="maxCpuUsage"> The maximum CPU usage percentage (0-100) to target.</param>
public SmartDataProcessor(double maxCpuUsage) : this(new SmartDataProcessorSettings { MaxCpuUsage = maxCpuUsage })
{
}

/// <summary>
/// Creates a new SmartDataProcessor with the specified settings.
/// </summary>
/// <param name="settings">The settings to use for this processor.</param>
public SmartDataProcessor(SmartDataProcessorSettings settings)

Check warning on line 75 in src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable event 'OnCpuUsageChange' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the event as nullable.

Check warning on line 75 in src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable event 'OnException' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the event as nullable.

Check warning on line 75 in src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable field '_managerTask' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the field as nullable.

Check warning on line 75 in src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable event 'OnCpuUsageChange' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the event as nullable.

Check warning on line 75 in src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable event 'OnException' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the event as nullable.

Check warning on line 75 in src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable field '_managerTask' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the field as nullable.
{
_settings = settings;
_maxCpuUsage = Math.Max(_settings.MaxCpuUsage - CpuHeadroomBuffer, CpuHeadroomBuffer);

bool useCpuMonitoring = _settings.MaxCpuUsage < 100;
if (useCpuMonitoring)
{
_cpuMonitor = new MacCpuMonitor();
if (OperatingSystem.IsWindows()) _cpuMonitor = new WindowsCpuMonitor();
else if (OperatingSystem.IsLinux()) _cpuMonitor = new LinuxCpuMonitor();
else if (OperatingSystem.IsMacOS()) _cpuMonitor = new MacCpuMonitor();
else _cpuMonitor = new NullCpuMonitor();
}
else
{
_cpuMonitor = new NullCpuMonitor();
}

_managerThread = new Thread(ManagerLoop) { IsBackground = true };
_managerThread.Start();
}

internal SmartDataProcessor(double maxCpuUsage, ICpuMonitor cpuMonitor)
internal SmartDataProcessor(SmartDataProcessorSettings settings, ICpuMonitor cpuMonitor)

Check warning on line 94 in src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable field '_managerTask' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the field as nullable.

Check warning on line 94 in src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable event 'OnCpuUsageChange' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the event as nullable.

Check warning on line 94 in src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable event 'OnException' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the event as nullable.

Check warning on line 94 in src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable field '_managerTask' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the field as nullable.
{
_maxCpuUsage = maxCpuUsage;
_settings = settings;
_maxCpuUsage = Math.Max(_settings.MaxCpuUsage - CpuHeadroomBuffer, CpuHeadroomBuffer);
_cpuMonitor = cpuMonitor;

_managerThread = new Thread(ManagerLoop) { IsBackground = true };
_managerThread.Start();
}

/// <summary>
/// Puts the processor into the paused state by setting <see cref="IsPaused"/> to <c>true</c>.
/// While paused, the manager loop skips concurrency updates and does not launch new worker tasks.
/// </summary>
public void Pause()
{
    IsPaused = true;
}

/// <summary>
/// Leaves the paused state by setting <see cref="IsPaused"/> to <c>false</c>,
/// so the manager loop goes back to updating concurrency and launching worker tasks.
/// </summary>
public void Resume()
{
    IsPaused = false;
}

/// <summary>
/// Enqueues a data item for processing. If the CPU is saturated or the queue is overloaded,
/// this method will block until it is safe to enqueue the item.
/// </summary>
/// <param name="data"></param>
/// <param name="action"></param>
public void EnqueueOrWait(T data, Action<T> action)
{
LazyInitializer.EnsureInitialized(ref _managerTask, ref _managerLock, () => Task.Run(ManagerLoopAsync));

while (true)
{
bool isCpuSaturated = _smoothedCpu > _maxCpuUsage && _cpuMonitor is not NullCpuMonitor;
bool isQueueOverloaded = _jobs.Count > _targetConcurrency * QueueBufferMultiplier;
bool isQueueOverloaded = _jobs.Count > _targetConcurrency * _settings.QueueBufferMultiplier;

if (!isCpuSaturated && !isQueueOverloaded)
{
Expand All @@ -102,41 +134,71 @@
_jobs.Enqueue((data, action));
}

/// <summary>
/// Waits for all currently queued and running jobs to complete.
/// </summary>
/// <returns></returns>
public async Task WaitForAllAsync()
{
while (!_jobs.IsEmpty)
{
await Task.Delay(50);
}

await Task.WhenAll(_runningTasks.Keys.ToArray());
if (_managerTask != null)
{
await Task.WhenAll(_runningTasks.Keys.ToArray());
}
}

/// <summary>
/// Disposes the processor: signals cancellation and waits for the manager loop to stop.
/// Note: This does not cancel jobs that are already running, and it does not wait for them;
/// call <see cref="WaitForAllAsync"/> before disposing if they must finish first.
/// </summary>
public void Dispose()
{
_cts.Cancel();
_managerThread.Join();

try
{
_managerTask?.Wait();
}
catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is TaskCanceledException))
{
// This is expected when the task is cancelled. We can ignore it.
}
finally
{
_cts.Dispose();
}
}

/// <summary>
/// The main management loop that coordinates concurrency adjustments and task launching.
/// </summary>
private void ManagerLoop()
private async Task ManagerLoopAsync()
{
while (!_cts.IsCancellationRequested)
{
try
{
UpdateConcurrency();
LaunchWorkerTasks();
if (!IsPaused)
{
UpdateConcurrency();
LaunchWorkerTasks();
}

await Task.Delay(ManagerLoopIntervalMs, _cts.Token);
}
catch (TaskCanceledException)
{
break;
}
catch (Exception ex)
{
Console.WriteLine($"Error in ManagerLoop: {ex.Message}");
OnException?.Invoke(ex);
}

ManagerLoopCycle.Set();
Thread.Sleep(ManagerLoopIntervalMs);
}
}

Expand All @@ -148,6 +210,16 @@
double cpuUsage = _cpuMonitor.GetCpuUsage();
_smoothedCpu = (SmoothingFactor * cpuUsage) + (1 - SmoothingFactor) * _smoothedCpu;
Metrics.UpdateSmoothedCpu(_smoothedCpu);
OnCpuUsageChange?.Invoke(_smoothedCpu);

int maxConcurrency = _settings.MaxDegreeOfParallelism ?? Environment.ProcessorCount;

// If CPU monitoring is disabled, just max out the concurrency.
if (_cpuMonitor is NullCpuMonitor)
{
_targetConcurrency = maxConcurrency;
return;
}

// --- Concurrency Decrease Logic ---
bool isCpuAboveMax = _smoothedCpu > _maxCpuUsage;
Expand All @@ -162,10 +234,9 @@

// --- Concurrency Increase Logic ---
bool hasCpuHeadroom = _smoothedCpu < _maxCpuUsage - CpuHeadroomBuffer;
bool isMonitorDisabled = _cpuMonitor is NullCpuMonitor;
bool canIncreaseConcurrency = _targetConcurrency < Environment.ProcessorCount;
bool canIncreaseConcurrency = _targetConcurrency < maxConcurrency;

if ((hasCpuHeadroom || isMonitorDisabled) && canIncreaseConcurrency)
if (hasCpuHeadroom && canIncreaseConcurrency)
{
// Check if adding threads is causing performance to degrade (contention).
bool isJobDurationIncreasing = Metrics.AvgTaskTime > _lastAverageDuration;
Expand All @@ -181,14 +252,13 @@
// If jobs are very short, we can be more aggressive in scaling up.
bool areJobsShort = Metrics.AvgTaskTime > 0 && Metrics.AvgTaskTime < ShortJobThresholdMs;
int increaseAmount = areJobsShort ? 2 : 1;
_targetConcurrency = Math.Min(_targetConcurrency + increaseAmount, Environment.ProcessorCount);
_targetConcurrency = Math.Min(_targetConcurrency + increaseAmount, maxConcurrency);
}
}

_lastAverageDuration = Metrics.AvgTaskTime;
}


/// <summary>
/// Launches new tasks from the queue to meet the target concurrency level.
/// </summary>
Expand Down Expand Up @@ -216,12 +286,17 @@
{
job.Action(job.Data);
}
catch (Exception ex)
{
OnException?.Invoke(ex);
}
finally
{
sw.Stop();
Metrics.AddJobDuration(sw.Elapsed.TotalMilliseconds);
}
});

_runningTasks.TryAdd(task, true);
}
else
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace SimpliSharp.Utilities.Process;

/// <summary>
/// Settings for the SmartDataProcessor.
/// Each setter validates its value so that a misconfiguration fails immediately
/// instead of silently stalling the processor later.
/// </summary>
public class SmartDataProcessorSettings
{
    private double _maxCpuUsage = 100;
    private int? _maxDegreeOfParallelism;
    private int _queueBufferMultiplier = 2;

    /// <summary>
    /// The target maximum CPU usage percentage (0-100).
    /// If set to 100 or more, CPU monitoring will be disabled,
    /// and the processor will scale to the maximum number of threads.
    /// </summary>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// The value is NaN or negative.
    /// </exception>
    public double MaxCpuUsage
    {
        get => _maxCpuUsage;
        set
        {
            // NaN compares false against every threshold, which would silently
            // turn throttling off; a negative percentage is never meaningful.
            if (double.IsNaN(value) || value < 0)
            {
                throw new System.ArgumentOutOfRangeException(
                    nameof(value), value, "MaxCpuUsage must be a non-negative number.");
            }

            _maxCpuUsage = value;
        }
    }

    /// <summary>
    /// An optional value to manually set the maximum number of concurrent threads.
    /// If not provided, it defaults to Environment.ProcessorCount.
    /// </summary>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// A value smaller than 1 was supplied.
    /// </exception>
    public int? MaxDegreeOfParallelism
    {
        get => _maxDegreeOfParallelism;
        set
        {
            // A limit of zero (or less) would leave the processor with no
            // workers, so queued items would never be processed.
            if (value.HasValue && value.Value < 1)
            {
                throw new System.ArgumentOutOfRangeException(
                    nameof(value), value, "MaxDegreeOfParallelism must be at least 1 when specified.");
            }

            _maxDegreeOfParallelism = value;
        }
    }

    /// <summary>
    /// A multiplier to determine the queue size limit for backpressure,
    /// based on the current number of workers.
    /// </summary>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// The value is negative.
    /// </exception>
    public int QueueBufferMultiplier
    {
        get => _queueBufferMultiplier;
        set
        {
            // A negative multiplier makes the queue limit negative, so the
            // queue would always look overloaded and enqueueing would block forever.
            if (value < 0)
            {
                throw new System.ArgumentOutOfRangeException(
                    nameof(value), value, "QueueBufferMultiplier must not be negative.");
            }

            _queueBufferMultiplier = value;
        }
    }
}
Loading