From d0767e0b6221369189e890a90c6ea04a17d1d466 Mon Sep 17 00:00:00 2001 From: Peter Date: Wed, 3 Apr 2019 21:35:53 +0200 Subject: [PATCH 1/4] Grouped store operations by partition key --- .../StorageContext.cs | 67 +++++++++++-------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs b/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs index bd6fdac..98d9d61 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs @@ -323,39 +323,48 @@ private string GetTableName(Type entityType) // define the modelcounter int modelCounter = 0; + // batch operations must be in the same partition + var partitions = models.Select(m => new DynamicTableEntity(m, entityMapper)).GroupBy(m => m.PartitionKey); + // Add all items - foreach (var model in models) - { - switch (storaeOperationType) - { - case nStoreOperation.insertOperation: - currentBatch.Insert(new DynamicTableEntity(model, entityMapper)); - break; - case nStoreOperation.insertOrReplaceOperation: - currentBatch.InsertOrReplace(new DynamicTableEntity(model, entityMapper)); - break; - case nStoreOperation.mergeOperation: - currentBatch.Merge(new DynamicTableEntity(model, entityMapper)); - break; - case nStoreOperation.mergeOrInserOperation: - currentBatch.InsertOrMerge(new DynamicTableEntity(model, entityMapper)); - break; - case nStoreOperation.delete: - currentBatch.Delete(new DynamicTableEntity(model, entityMapper)); - break; - } + foreach (var partition in partitions) + { + currentBatch = new TableBatchOperation(); + batchOperations.Add(currentBatch); - modelCounter++; + foreach (var dynamicEntity in partition) + { + switch (storaeOperationType) + { + case nStoreOperation.insertOperation: + currentBatch.Insert(dynamicEntity); + break; + case nStoreOperation.insertOrReplaceOperation: + currentBatch.InsertOrReplace(dynamicEntity); + break; + case nStoreOperation.mergeOperation: + currentBatch.Merge(dynamicEntity); + break; + case nStoreOperation.mergeOrInserOperation: + currentBatch.InsertOrMerge(dynamicEntity); + break; + case nStoreOperation.delete: + currentBatch.Delete(dynamicEntity); + break; + } - if (modelCounter % 100 == 0) - { - currentBatch = new TableBatchOperation(); - batchOperations.Add(currentBatch); - } - } + modelCounter++; + + if (modelCounter % 100 == 0) + { + currentBatch = new TableBatchOperation(); + batchOperations.Add(currentBatch); + } + } + } - // execute - foreach (var createdBatch in batchOperations) + // execute + foreach (var createdBatch in batchOperations) { if (createdBatch.Count() > 0) { From a4af89ae9ed2c79f69989373ddb1fd55132570c6 Mon Sep 17 00:00:00 2001 From: Peter Date: Sat, 15 Jun 2019 11:01:06 +0200 Subject: [PATCH 2/4] Added parallel connections in StoreAsync --- .../UC10CreateHugeAmountOfDemoEntries.cs | 2 +- .../Program.cs | 15 +-- ...rs.WindowsAzure.Storage.Table.Net45.csproj | 3 + .../Models/ParallelConnectionsOptions.cs | 21 ++++ .../StorageContext.cs | 98 ++++++++++++++----- 5 files changed, 107 insertions(+), 32 deletions(-) create mode 100644 CoreHelpers.WindowsAzure.Storage.Table/Models/ParallelConnectionsOptions.cs diff --git a/CoreHelpers.WindowsAzure.Storage.Table.Demo/DemoCases/UC10CreateHugeAmountOfDemoEntries.cs b/CoreHelpers.WindowsAzure.Storage.Table.Demo/DemoCases/UC10CreateHugeAmountOfDemoEntries.cs index 8cbd322..4380df2 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table.Demo/DemoCases/UC10CreateHugeAmountOfDemoEntries.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table.Demo/DemoCases/UC10CreateHugeAmountOfDemoEntries.cs @@ -25,7 +25,7 @@ public async Task Execute(string storageKey, string storageSecret, string endpoi // create 2000 items var data = new List(); - for (int i = 0; i < 2000; i++) + for (int i = 0; i < 20000; i++) data.Add(new HugeDemoEntry()); await storageContext.EnableAutoCreateTable().MergeOrInsertAsync(data); diff --git a/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs b/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs index 0fc11ac..2b41fc8 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs @@ -24,7 +24,7 @@ static async Task Main(string[] args) // register all demo cases var cases = new List { - /new UC01StoreWithStaticEntityMapper(), + //new UC01StoreWithStaticEntityMapper(), // new UC02StoreWithAttributeMapper(), // new UC03StoreWithAttributeMapperManualRegistration(), // new UC04GetVirtualArray(), @@ -33,7 +33,7 @@ static async Task Main(string[] args) // new UC07CreateModelsPaged(), // new UC08CheckMaxItems(), // new UC09ReadInterfaceValues(), - // new UC10CreateHugeAmountOfDemoEntries(), + new UC10CreateHugeAmountOfDemoEntries(), // new UC11ReadPageByPage(), // new UC12PartialUpdateMergeModel(), // new UC13DynamicallyCreateList(), @@ -42,7 +42,7 @@ static async Task Main(string[] args) // new UC16Backup() // new UC17Restore() // new UC18DateTime() - new UC19QueryFilter() + // new UC19QueryFilter() }; // register demo cases for Ger Cloud @@ -54,10 +54,11 @@ static async Task Main(string[] args) // execute in WW cloud Console.WriteLine("Executing Demo Cases (WW Cloud)"); foreach (var useCase in cases) - await useCase.Execute(config.GetValue("key").ToString(), config.GetValue("secret").ToString()); - - // execute in GER cloud - /*Console.WriteLine("Executing Demo Cases (GER Cloud)"); + await useCase.Execute(config.GetValue("key").ToString(), config.GetValue("secret").ToString()); + + Console.ReadKey(); + // execute in GER cloud + /*Console.WriteLine("Executing Demo Cases (GER Cloud)"); foreach (var useCase in casesGer) await useCase.Execute(config.GetValue("keyde").ToString(), config.GetValue("secretde").ToString(), "core.cloudapi.de"); */ } diff --git a/CoreHelpers.WindowsAzure.Storage.Table.Net45/CoreHelpers.WindowsAzure.Storage.Table.Net45.csproj b/CoreHelpers.WindowsAzure.Storage.Table.Net45/CoreHelpers.WindowsAzure.Storage.Table.Net45.csproj index 4201084..3b0e95e 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table.Net45/CoreHelpers.WindowsAzure.Storage.Table.Net45.csproj +++ b/CoreHelpers.WindowsAzure.Storage.Table.Net45/CoreHelpers.WindowsAzure.Storage.Table.Net45.csproj @@ -116,6 +116,9 @@ Extensions\PropertyInfoSetValueFromEntityProperty.cs + + Models\ParallelConnectionsOptions.cs + PagedTableEntityWriter.cs diff --git a/CoreHelpers.WindowsAzure.Storage.Table/Models/ParallelConnectionsOptions.cs b/CoreHelpers.WindowsAzure.Storage.Table/Models/ParallelConnectionsOptions.cs new file mode 100644 index 0000000..ced5c17 --- /dev/null +++ b/CoreHelpers.WindowsAzure.Storage.Table/Models/ParallelConnectionsOptions.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CoreHelpers.WindowsAzure.Storage.Table.Models +{ + public class ParallelConnectionsOptions + { + + public static ParallelConnectionsOptions Default => new ParallelConnectionsOptions() + { + RunInParallel = true, + MaxDegreeOfParallelism = 20 + }; + + public bool RunInParallel { get; set; } + + public int MaxDegreeOfParallelism { get; set; } + + } +} diff --git a/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs b/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs index 98d9d61..756f391 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs @@ -299,7 +299,7 @@ private string GetTableName(Type entityType) return entityMapper.TableName; } - public async Task StoreAsync(nStoreOperation storaeOperationType, IEnumerable models) where T : new() + public async Task StoreAsync(nStoreOperation storaeOperationType, IEnumerable models, ParallelConnectionsOptions parallelOptions = null) where T : new() { try { @@ -319,18 +319,34 @@ private string GetTableName(Type entityType) // lookup the entitymapper var entityMapper = _entityMapperRegistry[typeof(T)]; - - // define the modelcounter - int modelCounter = 0; - + // batch operations must be in the same partition var partitions = models.Select(m => new DynamicTableEntity(m, entityMapper)).GroupBy(m => m.PartitionKey); - // Add all items + var batchTasks = new List>>(); +#if DEBUG + var stopWatch = new System.Diagnostics.Stopwatch(); + stopWatch.Start(); +#endif + if (parallelOptions == null) + parallelOptions = ParallelConnectionsOptions.Default; + + if (parallelOptions.RunInParallel && _autoCreateTable) + { + // try to create the table if we are parallel processing the catch/retry mechanism fails + await CreateTableAsync(true); + } + + + // Add all items foreach (var partition in partitions) { + if (parallelOptions.RunInParallel && currentBatch != null) + batchTasks.Add(table.ExecuteBatchAsync(currentBatch)); + currentBatch = new TableBatchOperation(); - batchOperations.Add(currentBatch); + if (!parallelOptions.RunInParallel) + batchOperations.Add(currentBatch); foreach (var dynamicEntity in partition) { @@ -353,30 +369,64 @@ private string GetTableName(Type entityType) break; } - modelCounter++; - if (modelCounter % 100 == 0) + if (currentBatch.Count == 100) { + if (parallelOptions.RunInParallel && currentBatch != null) + batchTasks.Add(table.ExecuteBatchAsync(currentBatch)); + currentBatch = new TableBatchOperation(); - batchOperations.Add(currentBatch); + if (!parallelOptions.RunInParallel) + batchOperations.Add(currentBatch); + } + + if (parallelOptions.RunInParallel && batchTasks.Count >= parallelOptions.MaxDegreeOfParallelism) + { + var taskResults = await Task.WhenAll(batchTasks); + if (_delegate != null) + foreach (var taskResult in taskResults) + _delegate.OnStored(typeof(T), storaeOperationType, taskResult.Count, null); + + batchTasks.Clear(); } } } + if (parallelOptions.RunInParallel) + { + var taskResults = await Task.WhenAll(batchTasks); + if (_delegate != null) + foreach (var taskResult in taskResults) + _delegate.OnStored(typeof(T), storaeOperationType, taskResult.Count, null); + } + else + { + // execute + foreach (var createdBatch in batchOperations) + { + if (createdBatch.Count() > 0) + { + await table.ExecuteBatchAsync(createdBatch); - // execute - foreach (var createdBatch in batchOperations) - { - if (createdBatch.Count() > 0) - { - await table.ExecuteBatchAsync(createdBatch); - - // notify delegate - if (_delegate != null) - _delegate.OnStored(typeof(T), storaeOperationType, createdBatch.Count(), null); - } - } - } - catch (StorageException ex) + // notify delegate + if (_delegate != null) + _delegate.OnStored(typeof(T), storaeOperationType, createdBatch.Count, null); + } + } + } + +#if DEBUG + stopWatch.Stop(); + // Get the elapsed time as a TimeSpan value. + TimeSpan ts = stopWatch.Elapsed; + + // Format and display the TimeSpan value. + string elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}", + ts.Hours, ts.Minutes, ts.Seconds, + ts.Milliseconds / 10); + Console.WriteLine("RunTime " + elapsedTime); +#endif + } + catch (StorageException ex) { // check the exception if (!_autoCreateTable || !ex.Message.StartsWith("0:The table specified does not exist", StringComparison.CurrentCulture)) From b8a4f1de09f5c1f791a3f60bf847a8cfacf6f344 Mon Sep 17 00:00:00 2001 From: Peter Date: Sat, 15 Jun 2019 11:52:40 +0200 Subject: [PATCH 3/4] missing space for auto merge --- CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs b/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs index 2b41fc8..4b9fdac 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs @@ -24,7 +24,7 @@ static async Task Main(string[] args) // register all demo cases var cases = new List { - //new UC01StoreWithStaticEntityMapper(), + // new UC01StoreWithStaticEntityMapper(), // new UC02StoreWithAttributeMapper(), // new UC03StoreWithAttributeMapperManualRegistration(), // new UC04GetVirtualArray(), From c5c950889e6c6d60c414f0a8e6fb520eea1a7b55 Mon Sep 17 00:00:00 2001 From: Peter Date: Mon, 24 Jun 2019 14:16:42 +0200 Subject: [PATCH 4/4] Small bug fixes and tiny performance and logical fixes (when to add to lists) --- .../StorageContext.cs | 58 ++++++++----------- 1 file changed, 24 insertions(+), 34 deletions(-) diff --git a/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs b/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs index 756f391..52e15c3 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs @@ -308,14 +308,13 @@ private string GetTableName(Type entityType) _delegate.OnStoring(typeof(T), storaeOperationType); // Retrieve a reference to the table. - CloudTable table = GetTableReference(GetTableName()); + var table = GetTableReference(GetTableName()); // Create the batch operation. - List batchOperations = new List(); - - // Create the first batch - var currentBatch = new TableBatchOperation(); - batchOperations.Add(currentBatch); + var batchOperations = new List(); + + // Allocate batch variable + var currentBatch = default(TableBatchOperation); // lookup the entitymapper var entityMapper = _entityMapperRegistry[typeof(T)]; @@ -324,10 +323,7 @@ private string GetTableName(Type entityType) var partitions = models.Select(m => new DynamicTableEntity(m, entityMapper)).GroupBy(m => m.PartitionKey); var batchTasks = new List>>(); -#if DEBUG - var stopWatch = new System.Diagnostics.Stopwatch(); - stopWatch.Start(); -#endif + if (parallelOptions == null) parallelOptions = ParallelConnectionsOptions.Default; @@ -341,8 +337,6 @@ private string GetTableName(Type entityType) // Add all items foreach (var partition in partitions) { - if (parallelOptions.RunInParallel && currentBatch != null) - batchTasks.Add(table.ExecuteBatchAsync(currentBatch)); currentBatch = new TableBatchOperation(); if (!parallelOptions.RunInParallel) @@ -350,6 +344,17 @@ private string GetTableName(Type entityType) foreach (var dynamicEntity in partition) { + if (currentBatch.Count == 100) + { + if (parallelOptions.RunInParallel) + batchTasks.Add(table.ExecuteBatchAsync(currentBatch)); + + currentBatch = new TableBatchOperation(); + + if (!parallelOptions.RunInParallel) + batchOperations.Add(currentBatch); + } + switch (storaeOperationType) { case nStoreOperation.insertOperation: @@ -370,16 +375,6 @@ private string GetTableName(Type entityType) } - if (currentBatch.Count == 100) - { - if (parallelOptions.RunInParallel && currentBatch != null) - batchTasks.Add(table.ExecuteBatchAsync(currentBatch)); - - currentBatch = new TableBatchOperation(); - if (!parallelOptions.RunInParallel) - batchOperations.Add(currentBatch); - } - if (parallelOptions.RunInParallel && batchTasks.Count >= parallelOptions.MaxDegreeOfParallelism) { var taskResults = await Task.WhenAll(batchTasks); @@ -390,9 +385,15 @@ private string GetTableName(Type entityType) batchTasks.Clear(); } } + + if (parallelOptions.RunInParallel && currentBatch != null && currentBatch.Any()) + batchTasks.Add(table.ExecuteBatchAsync(currentBatch)); + } + if (parallelOptions.RunInParallel) { + var taskResults = await Task.WhenAll(batchTasks); if (_delegate != null) foreach (var taskResult in taskResults) @@ -413,18 +414,7 @@ private string GetTableName(Type entityType) } } } - -#if DEBUG - stopWatch.Stop(); - // Get the elapsed time as a TimeSpan value. - TimeSpan ts = stopWatch.Elapsed; - - // Format and display the TimeSpan value. - string elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}", - ts.Hours, ts.Minutes, ts.Seconds, - ts.Milliseconds / 10); - Console.WriteLine("RunTime " + elapsedTime); -#endif + } catch (StorageException ex) {