From fbdff796b0629247ad76484c02e85ba0d92231bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cosmin=20Cre=C8=9Bu?= Date: Sun, 7 Sep 2025 11:23:12 +0100 Subject: [PATCH 1/2] main --- NUGET.md | 7 +- README.md | 2 +- samples/SimpliSharp.Demo/Program.cs | 80 +++++++++++++++---- .../SmartDataProcessor/SmartDataProcessor.cs | 14 ++-- 4 files changed, 74 insertions(+), 29 deletions(-) diff --git a/NUGET.md b/NUGET.md index 51cd8b3..d1c0e32 100644 --- a/NUGET.md +++ b/NUGET.md @@ -1,6 +1,5 @@ -![SimpliSharp](https://github.com/cretucosmin3/SimpliSharp/blob/main/assets/simpli-sharp-dark.png?raw=true) - -**SimpliSharp is a C# utility library designed to streamline development with useful extensions, helpers, data processing tools, and logging helpers.** +## SimpliSharp +**A utility library designed to streamline development with useful extensions, helpers, data processing tools, and logging helpers.** [![.NET](https://github.com/cretucosmin3/SimpliSharp/actions/workflows/dotnet.yml/badge.svg)](https://github.com/cretucosmin3/SimpliSharp/actions/workflows/dotnet.yml)[![GitHub last commit](https://img.shields.io/github/last-commit/cretucosmin3/SimpliSharp.svg)](https://github.com/cretucosmin3/SimpliSharp/commits/main) [![GitHub stars](https://img.shields.io/github/stars/cretucosmin3/SimpliSharp.svg)](https://github.com/cretucosmin3/SimpliSharp/stargazers) @@ -40,7 +39,7 @@ processor.OnException += (ex) => Console.WriteLine($"An error occurred: {ex.Mess // Enqueue items for (int i = 0; i < 100; i++) { - processor.EnqueueOrWait(i, data => + processor.EnqueueOrWaitAsync(i, data => { // Your processing logic here... }); diff --git a/README.md b/README.md index 34dcfe0..14a2a5c 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ processor.OnException += (ex) => Console.WriteLine($"An error occurred: {ex.Mess // Enqueue items for (int i = 0; i < 100; i++) { - processor.EnqueueOrWait(i, data => + processor.EnqueueOrWaitAsync(i, data => { // Your processing logic here... }); diff --git a/samples/SimpliSharp.Demo/Program.cs b/samples/SimpliSharp.Demo/Program.cs index c6607e2..ae6c666 100644 --- a/samples/SimpliSharp.Demo/Program.cs +++ b/samples/SimpliSharp.Demo/Program.cs @@ -1,4 +1,5 @@ -using SimpliSharp.Extensions.Batch; +using System.Threading.Tasks.Dataflow; +using SimpliSharp.Extensions.Batch; using SimpliSharp.Utilities.Process; Console.WriteLine("SimpliSharp Demo Application"); @@ -6,8 +7,9 @@ Console.WriteLine("Available Demos:"); Console.WriteLine("1. SmartDataProcessor Example"); -Console.WriteLine("2. Enumerable.Batch"); -Console.WriteLine("3. Enumerable.BatchSliding"); +Console.WriteLine("2. ActionBlock Example (from TPL Dataflow)"); +Console.WriteLine("3. Enumerable.Batch"); +Console.WriteLine("4. Enumerable.BatchSliding"); Console.WriteLine("[Enter] to exit"); @@ -17,14 +19,18 @@ { case ConsoleKey.D1: case ConsoleKey.NumPad1: - SmartDataProcessor_Example(); + SmartDataProcessor_Example().Wait(); break; case ConsoleKey.D2: case ConsoleKey.NumPad2: - EnumerableBatch_Example(); + ActionBlock_Example().Wait(); break; case ConsoleKey.D3: case ConsoleKey.NumPad3: + EnumerableBatch_Example(); + break; + case ConsoleKey.D4: + case ConsoleKey.NumPad4: EnumerableBatchSliding_Example(); break; default: @@ -36,31 +42,29 @@ Console.WriteLine("Press any key to exit..."); Console.ReadKey(); -static void SmartDataProcessor_Example() +static async Task SmartDataProcessor_Example() { - Console.Clear(); Console.WriteLine("Starting data processing..."); var settings = new SmartDataProcessorSettings { - MaxDegreeOfParallelism = 1 + MaxCpuUsage = 95 }; using var processor = new SmartDataProcessor(settings); var stopwatch = System.Diagnostics.Stopwatch.StartNew(); - var tasksCount = 200; + var tasksCount = 1000; for (int i = 0; i < tasksCount; i++) { int line = i; - processor.EnqueueOrWait(line, data => + await processor.EnqueueOrWaitAsync(line, data => { - int simMax = Random.Shared.Next(5_000_000, 20_000_000); double sum = 0; - for (int j = 0; j < simMax; j++) + for (int j = 0; j < 10_000_000; j++) { double value = Math.Sqrt(j) * Math.Sin(j % 360) + Math.Log(j + 1); if (value > 1000) @@ -91,13 +95,56 @@ static void SmartDataProcessor_Example() Console.WriteLine("All processing done"); } -static void EnumerableBatch_Example() +static async Task ActionBlock_Example() { - Console.Clear(); - Console.WriteLine("Starting Enumerable.Batch demo..."); + Console.WriteLine("Starting data processing with ActionBlock..."); + + var tasksCount = 1000; + var stopwatch = System.Diagnostics.Stopwatch.StartNew(); + + Action processAction = data => + { + double sum = 0; + for (int j = 0; j < 10_000_000; j++) + { + double value = Math.Sqrt(j) * Math.Sin(j % 360) + Math.Log(j + 1); + if (value > 1000) + sum -= value / 3.0; + else + sum += value * 2.5; + } + }; + + var executionOptions = new ExecutionDataflowBlockOptions + { + MaxDegreeOfParallelism = Environment.ProcessorCount + }; + + var actionBlock = new ActionBlock(processAction, executionOptions); - // yields: [ ["Red", "Blue"], ["Purple", "Black"], ["Yellow", "Pink"] ] + for (int i = 0; i < tasksCount; i++) + { + await actionBlock.SendAsync(i); + + Console.SetCursorPosition(0, Console.CursorTop); + Console.Write($"Posting item {i + 1} of {tasksCount}"); + } + + Console.WriteLine("\nAll items have been posted. Waiting for processing to complete..."); + + actionBlock.Complete(); + + await actionBlock.Completion; + stopwatch.Stop(); + + Console.WriteLine($"\nProcessing completed in {stopwatch.Elapsed.TotalSeconds:F2} seconds"); + Console.WriteLine("All processing done."); +} + +static void EnumerableBatch_Example() +{ + Console.WriteLine("Starting Enumerable.Batch demo..."); string[] sample = ["Red", "Blue", "Purple", "Black", "Yellow", "Pink"]; int batchSize = 3; @@ -117,7 +164,6 @@ static void EnumerableBatch_Example() static void EnumerableBatchSliding_Example() { - Console.Clear(); Console.WriteLine("Starting Enumerable.BatchSliding demo..."); var numbers = Enumerable.Range(1, 3); diff --git a/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs index 8c5dd0e..25a4704 100644 --- a/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs +++ b/src/SimpliSharp/Utilities/Process/SmartDataProcessor/SmartDataProcessor.cs @@ -20,7 +20,7 @@ public class SmartDataProcessor : IDisposable /// /// A buffer to keep CPU usage below the absolute maximum, allowing for scaling. /// - private const double CpuHeadroomBuffer = 5; + private const double CpuHeadroomBuffer = 2; /// /// Threshold in milliseconds to consider a job "short" for faster concurrency scaling. @@ -41,7 +41,7 @@ public class SmartDataProcessor : IDisposable private readonly ICpuMonitor _cpuMonitor; private object _managerLock = new(); - private Task _managerTask; + private Task? _managerTask; private double _smoothedCpu = 0; private int _targetConcurrency = 1; private double _lastAverageDuration; @@ -110,11 +110,11 @@ internal SmartDataProcessor(SmartDataProcessorSettings settings, ICpuMonitor cpu /// /// Enqueues a data item for processing. If the CPU is saturated or the queue is overloaded, - /// this method will block until it is safe to enqueue the item. + /// this method will wait until it is safe to enqueue the item. /// - /// - /// - public void EnqueueOrWait(T data, Action action) + /// Data to be processed + /// Action to process the data with + public async Task EnqueueOrWaitAsync(T data, Action action) { LazyInitializer.EnsureInitialized(ref _managerTask, ref _managerLock, () => Task.Run(ManagerLoopAsync)); @@ -128,7 +128,7 @@ public void EnqueueOrWait(T data, Action action) break; } - Thread.Sleep(10); + await Task.Delay(5); } _jobs.Enqueue((data, action)); From 6b12622f3e28811db344dd04f0409737f3bcbd3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cosmin=20Cre=C8=9Bu?= Date: Sun, 7 Sep 2025 11:28:07 +0100 Subject: [PATCH 2/2] Updating unit tests --- .../SmartDataProcessorTests.cs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs b/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs index ddcff2d..11dec00 100644 --- a/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs +++ b/tests/SimpliSharp.Tests/Utilities/Process/SmartDataProcessor/SmartDataProcessorTests.cs @@ -22,7 +22,7 @@ public async Task SmartDataProcessor_ProcessesItems_InOrder() // Act for (int i = 0; i < 10; i++) { - processor.EnqueueOrWait(i, action); + processor.EnqueueOrWaitAsync(i, action).Wait(); } await processor.WaitForAllAsync(); @@ -55,7 +55,7 @@ public async Task SmartDataProcessor_Honors_MaxDegreeOfParallelism() // Act for (int i = 0; i < 5; i++) { - processor.EnqueueOrWait(i, action); + processor.EnqueueOrWaitAsync(i, action).Wait(); } await Task.Delay(100); // Give time for tasks to start @@ -88,12 +88,12 @@ public async Task SmartDataProcessor_Blocks_When_QueueIsFull() }; // Act - processor.EnqueueOrWait(1, action); + processor.EnqueueOrWaitAsync(1, action).Wait(); await Task.Delay(100); // Give the manager loop time to start - processor.EnqueueOrWait(2, action); - processor.EnqueueOrWait(3, action); + processor.EnqueueOrWaitAsync(2, action).Wait(); + processor.EnqueueOrWaitAsync(3, action).Wait(); - var blockedTask = Task.Run(() => processor.EnqueueOrWait(4, action)); + var blockedTask = Task.Run(async () => await processor.EnqueueOrWaitAsync(4, action)); await Task.Delay(100); @@ -126,10 +126,10 @@ public async Task SmartDataProcessor_Blocks_When_CpuIsHigh() // Act cpuMonitor.SetCpuUsage(100); - processor.EnqueueOrWait(1, action); // This will start the manager loop + processor.EnqueueOrWaitAsync(1, action).Wait(); // This will start the manager loop await Task.Delay(100); // Give time for the manager to update the smoothed CPU - var blockedTask = Task.Run(() => processor.EnqueueOrWait(2, action)); + var blockedTask = Task.Run(async () => await processor.EnqueueOrWaitAsync(2, action)); await Task.Delay(100); @@ -164,7 +164,7 @@ public async Task SmartDataProcessor_PauseAndResume_Works() // Act processor.Pause(); - processor.EnqueueOrWait(1, action); + await processor.EnqueueOrWaitAsync(1, action); await Task.Delay(100); // Assert @@ -192,7 +192,7 @@ public async Task SmartDataProcessor_OnException_EventIsFired() }; // Act - processor.EnqueueOrWait(1, action); + processor.EnqueueOrWaitAsync(1, action).Wait(); await processor.WaitForAllAsync(); // Assert