diff --git a/KustoSchemaTools.Tests/DefaultDatabaseWriterTests.cs b/KustoSchemaTools.Tests/DefaultDatabaseWriterTests.cs new file mode 100644 index 0000000..94d509c --- /dev/null +++ b/KustoSchemaTools.Tests/DefaultDatabaseWriterTests.cs @@ -0,0 +1,232 @@ +using Kusto.Data; +using KustoSchemaTools.Changes; +using KustoSchemaTools.Model; +using KustoSchemaTools.Parser; +using KustoSchemaTools.Parser.KustoWriter; +using Microsoft.Extensions.Logging; +using Moq; + +namespace KustoSchemaTools.Tests +{ + public class DefaultDatabaseWriterTests + { + [Fact] + public async Task UpdatePrimary_ShouldRetryUntilChangesSucceed() + { + // Arrange + var sourceDb = new Database { Name = "SourceDb" }; + var targetDb = new Database { Name = "TargetDb" }; + + var logger = new ConsoleLogger(); + var mockClient = new Mock(MockBehavior.Loose, "test"); + + var writer = new TestableDefaultDatabaseWriter(); + + // Create mock changes for the test + writer.Setup(true, false, false); + writer.Setup(true, true); + + // Act + await writer.WriteAsync(sourceDb, targetDb, mockClient.Object, logger); + Assert.Equal(2, writer.GenerateChangesCallCount); + Assert.Equal(2, writer.ApplyChangesToDatabaseCallCount); + Assert.Equal(0, writer.ExceptionCount); + } + + [Fact] + public async Task UpdatePrimary_NoRetryIfAllSuccess() + { + // Arrange + var sourceDb = new Database { Name = "SourceDb" }; + var targetDb = new Database { Name = "TargetDb" }; + var logger = new ConsoleLogger(); + var mockClient = new Mock(MockBehavior.Loose, "test"); + var writer = new TestableDefaultDatabaseWriter(); + + // Create mock changesets for the test + writer.Setup(true, true, true); + writer.Setup(true, true, true); + + // all success + await writer.WriteAsync(sourceDb, targetDb, mockClient.Object, logger); + Assert.Equal(1, writer.GenerateChangesCallCount); + Assert.Equal(1, writer.ApplyChangesToDatabaseCallCount); + Assert.Equal(0, writer.ExceptionCount); + } + + [Fact] + public async Task UpdatePrimary_NoRetryIfAllFail() + { + // Arrange + var sourceDb = new Database { Name = "SourceDb" }; + var targetDb = new Database { Name = "TargetDb" }; + var logger = new ConsoleLogger(); + var mockClient = new Mock(MockBehavior.Loose, "test"); + var writer = new TestableDefaultDatabaseWriter(); + + // Create mock changesets for the test + writer.Setup(false, false, false); + writer.Setup(false, false, false); + + await writer.WriteAsync(sourceDb, targetDb, mockClient.Object, logger); + Assert.Equal(1, writer.GenerateChangesCallCount); + Assert.Equal(1, writer.ApplyChangesToDatabaseCallCount); + Assert.Equal(1, writer.ExceptionCount); + } + + + [Fact] + public async Task UpdatePrimary_ShouldRetryUntilNoSuccessfulChanges() + { + // Arrange + var sourceDb = new Database { Name = "SourceDb" }; + var targetDb = new Database { Name = "TargetDb" }; + var logger = new ConsoleLogger(); + var mockClient = new Mock(MockBehavior.Loose, "test"); + var writer = new TestableDefaultDatabaseWriter(); + + // Create mock changes for the test + writer.Setup(true, false, false); + writer.Setup(true, false); + writer.Setup(false); + writer.Setup(false); // one extra + + // Act + await writer.WriteAsync(sourceDb, targetDb, mockClient.Object, logger); + Assert.Equal(3, writer.GenerateChangesCallCount); + Assert.Equal(3, writer.ApplyChangesToDatabaseCallCount); + Assert.Equal(1, writer.ExceptionCount); + } + + #region Helper Methods + + + static IChange CreateMockChange(string name) + { + var mockChange = new Mock(); + + var scripts = new List { + new DatabaseScriptContainer( + new DatabaseScript(name, 0), + "TestKind" + ) + }; + + mockChange.Setup(c => c.Scripts).Returns(scripts); + return mockChange.Object; + } + + static ScriptExecuteCommandResult CreateMockResult(IChange change, bool isSuccess) + { + return new ScriptExecuteCommandResult + { + OperationId = Guid.NewGuid(), + CommandType = "Script", + Result = isSuccess ? "Completed" : "Failed", + CommandText = change.Scripts.First().Script.Text, + Reason = isSuccess ? null : "Test failure" + }; + } + + #endregion + + #region Test Support Classes + + /// + /// A testable version of DefaultDatabaseWriter that we can use to track calls and control behavior + /// + private class TestableDefaultDatabaseWriter : DefaultDatabaseWriter + { + public Dictionary ResultsCache { get; } = new Dictionary(); + private readonly IList> _generateChangesResults = new List>(); + public int GenerateChangesCallCount { get; private set; } = 0; + public int ApplyChangesToDatabaseCallCount { get; private set; } = 0; + public int ExceptionCount { get; private set; } = 0; + + public void ResetCounts() + { + GenerateChangesCallCount = ApplyChangesToDatabaseCallCount = ExceptionCount = 0; + } + + public void Setup(params bool[] results) + { + var changeSet = new List { }; + foreach (var result in results) + { + var change = CreateMockChange($"change"); + var scriptExecuteResult = CreateMockResult(change, result); + ResultsCache.Add(change, scriptExecuteResult); + changeSet.Add(change); + } + _generateChangesResults.Add(changeSet); + } + + // Override the base method to use our predetermined results + internal override List GenerateChanges(Database targetDb, Database sourceDb, ILogger logger) + { + if (GenerateChangesCallCount >= _generateChangesResults.Count) + { + throw new InvalidOperationException("No more predefined change sets available."); + } + + var changes = _generateChangesResults.ElementAt(GenerateChangesCallCount); + GenerateChangesCallCount++; + return changes; + } + + // Override the protected method to return our predetermined results + internal override Task> ApplyChangesToDatabase(string databaseName, List changes, KustoClient client, ILogger logger) + { + ApplyChangesToDatabaseCallCount++; + var results = new List(); + foreach (var c in changes) + { + var result = ResultsCache[c]; + results.Add(result); + } + return Task.FromResult(results); + } + + // Expose the internal UpdatePrimary method for testing + public Task> TestUpdatePrimary(Database sourceDb, Database targetDb, KustoClient client, ILogger logger) + { + return UpdatePrimary(sourceDb, targetDb, client, logger); + } + + // Override WriteAsync to control exception throwing + public override async Task WriteAsync(Database sourceDb, Database targetDb, KustoClient client, ILogger logger) + { + try + { + await base.WriteAsync(sourceDb, targetDb, client, logger); + } + catch (Exception) + { + ExceptionCount++; + } + } + } + + // Custom console logger implementation + private class ConsoleLogger : ILogger + { + public IDisposable? BeginScope(TState state) where TState : notnull => new DummyDisposable(); + public bool IsEnabled(LogLevel logLevel) => true; + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + { + Console.WriteLine($"[{logLevel}] {formatter(state, exception)}"); + if (exception != null) + { + Console.WriteLine($"Exception: {exception.Message}"); + } + } + + private class DummyDisposable : IDisposable + { + public void Dispose() { } + } + } + #endregion + } +} diff --git a/KustoSchemaTools/Parser/KustoWriter/DefaultDatabaseWriter.cs b/KustoSchemaTools/Parser/KustoWriter/DefaultDatabaseWriter.cs index 52b8db6..125d1ef 100644 --- a/KustoSchemaTools/Parser/KustoWriter/DefaultDatabaseWriter.cs +++ b/KustoSchemaTools/Parser/KustoWriter/DefaultDatabaseWriter.cs @@ -1,4 +1,4 @@ -using Kusto.Data; +using Kusto.Data; using KustoSchemaTools.Changes; using KustoSchemaTools.Model; using KustoSchemaTools.Parser.KustoLoader; @@ -10,16 +10,84 @@ namespace KustoSchemaTools.Parser.KustoWriter { public class DefaultDatabaseWriter : IDBEntityWriter { - public async Task WriteAsync(Database sourceDb, Database targetDb, KustoClient client, ILogger logger) + public virtual async Task WriteAsync(Database sourceDb, Database targetDb, KustoClient client, ILogger logger) { - var changes = DatabaseChanges.GenerateChanges(targetDb, sourceDb, targetDb.Name, logger); - var results = await ApplyChangesToDatabase(targetDb.Name, changes, client, logger); + var allResults = await UpdatePrimary(sourceDb, targetDb, client, logger); + allResults.AddRange(await UpdateFollowers(sourceDb, logger)); - foreach (var result in results) + // Throw exception if there were any problems. + var exceptions = allResults + .Where(itm => itm.Result == "Failed") + .Select(itm => new Exception($"Execution failed for command:{itm.OperationId} with reason: {itm.Reason}")) + .ToList(); + + if (exceptions.Count == 1) + { + throw exceptions[0]; + } + if (exceptions.Count > 1) { - Console.WriteLine($"{result.CommandType} ({result.OperationId}): {result.Result} => {result.Reason} ({result.CommandText})"); - Console.WriteLine("---------------------------------------------------------------------------"); + throw new AggregateException(exceptions); } + } + + /// + /// Iteratively generates and applies changes to the primary database until it stops making forward progress. + /// + /// + /// + /// + /// + /// + internal virtual async Task> UpdatePrimary(Database sourceDb, Database targetDb, KustoClient client, ILogger logger) + { + // Some changes will be dependent upon each other, causing a race condition when attempting to apply them. + // As long as the write made some forward progress, keep looping until there are no more successes. + logger.LogInformation($"Updating primary database {targetDb.Name}"); + var keepGoing = false; + var iterationCount = 0; + var allResults = new List(); + + // Iteratively generate changes and apply them until we stop making forward progress. + do + { + iterationCount++; + var changes = GenerateChanges(targetDb, sourceDb, logger); + var results = await ApplyChangesToDatabase(targetDb.Name, changes, client, logger); + + // Save the successes + var successes = results.Where(r => r.Result != "Failed").ToList(); + allResults.AddRange(successes); + logger.LogInformation($"Iteration {iterationCount}: Successfully applied {successes.Count} out of {results.Count} changes."); + + // Decide whether to loop + keepGoing = successes.Count < results.Count && successes.Count > 0; + if (!keepGoing) + { + // if we're stopping, add remaining (failure) results to the list. + allResults.AddRange(results); + } + } while (keepGoing); + + // Final status + logger.LogInformation("---------------------------------------------------------------------------"); + logger.LogInformation($"Database update complete: Successfully applied {allResults.Count(r => r.Result != "Failed")} out of {allResults.Count} changes to {targetDb.Name} after {iterationCount} iterations."); + foreach (var result in allResults) + { + logger.LogInformation($"Successfully applied {allResults.Count(r => r.Result != "Failed")} out of {allResults.Count} changes to {targetDb.Name}"); + foreach (var result in allResults) + { + logger.LogInformation($"{result.CommandType} ({result.OperationId}): {result.Result} => {result.Reason} ({result.CommandText})"); + } + logger.LogInformation("---------------------------------------------------------------------------"); + + return allResults; + } + + internal virtual async Task> UpdateFollowers(Database sourceDb, ILogger logger) + { + // Update followers with the latest changes + var results = new List(); foreach (var follower in sourceDb.Followers) { @@ -31,66 +99,77 @@ public async Task WriteAsync(Database sourceDb, Database targetDb, KustoClient c var followerResults = await ApplyChangesToDatabase(follower.Value.DatabaseName, followerChanges, followerClient, logger); results.AddRange(followerResults); - Console.WriteLine(); - Console.WriteLine($"Follower: {follower.Key}"); - Console.WriteLine("---------------------------------------------------------------------------"); - Console.WriteLine(); + logger.LogInformation($"Follower: {follower.Key}"); + logger.LogInformation("---------------------------------------------------------------------------"); foreach (var result in followerResults) { - Console.WriteLine($"{result.CommandType} ({result.OperationId}): {result.Result} => {result.Reason} ({result.CommandText})"); - Console.WriteLine("---------------------------------------------------------------------------"); + logger.LogInformation($"{result.CommandType} ({result.OperationId}): {result.Result} => {result.Reason} ({result.CommandText})"); + logger.LogInformation("---------------------------------------------------------------------------"); } - - Console.WriteLine(); - Console.WriteLine(); - - } - var exs = results.Where(itm => itm.Result == "Failed").Select(itm => new Exception($"Execution failed for command \n{itm.CommandText} \n with reason\n{itm.Reason}")).ToList(); - if (exs.Count == 1) - { - throw exs[0]; - } - if (exs.Count > 1) - { - throw new AggregateException(exs); } + + return results; } + internal virtual List GenerateChanges(Database targetDb, Database sourceDb, ILogger logger) + { + return DatabaseChanges.GenerateChanges(targetDb, sourceDb, targetDb.Name, logger); + } - private async Task> ApplyChangesToDatabase(string databaseName, List changes, KustoClient client, ILogger logger) + internal virtual async Task> ApplyChangesToDatabase( + string databaseName, List changes, KustoClient client, ILogger logger) { + // Filter and sort scripts var scripts = changes .SelectMany(itm => itm.Scripts) - .Where(itm => itm.Order >= 0) .Where(itm => itm.IsValid == true) + .Where(itm => itm.Order >= 0) .OrderBy(itm => itm.Order) .ToList(); + logger.LogInformation($"Applying {scripts.Count} scripts to database '{databaseName}'"); + var results = new List(); - var batch = new List(); - foreach (var sc in scripts) + + // Process scripts in batches, separating synchronous and asynchronous scripts + var pendingBatch = new List(); + + foreach (var script in scripts) { - if (sc.IsAsync == false) + if (script.IsAsync) { - batch.Add(sc); - continue; + // If we encounter an async script, execute any pending sync scripts first + if (pendingBatch.Count != 0) + { + var batchResults = await ExecutePendingSync(databaseName, client, logger, pendingBatch); + results.AddRange(batchResults); + pendingBatch.Clear(); + } + + // Then execute and record the async script + logger.LogInformation($"Executing async script with order {script.Order}"); + var asyncResult = await ExecuteAsyncCommand(databaseName, client, logger, script); + results.Add(asyncResult); } else { - var batchResults = await ExecutePendingSync(databaseName, client, logger, batch); - results.AddRange(batchResults); - var asyncResult = await ExecuteAsyncCommand(databaseName, client, logger, sc); - results.Add(asyncResult); + // Collect synchronous scripts into a batch + pendingBatch.Add(script); } } - var finalBatchResults = await ExecutePendingSync(databaseName, client, logger, batch); - results.AddRange(finalBatchResults); - return results; + // Execute any remaining synchronous scripts + if (pendingBatch.Any()) + { + var finalBatchResults = await ExecutePendingSync(databaseName, client, logger, pendingBatch); + results.AddRange(finalBatchResults); + } + + return results; } - private async Task ExecuteAsyncCommand(string databaseName, KustoClient client, ILogger logger, DatabaseScriptContainer sc) + private static async Task ExecuteAsyncCommand(string databaseName, KustoClient client, ILogger logger, DatabaseScriptContainer sc) { var interval = TimeSpan.FromSeconds(5); var iterations = (int)(TimeSpan.FromHours(1) / interval); @@ -111,7 +190,7 @@ private async Task ExecuteAsyncCommand(string databa logger.LogInformation($"Waiting for operation {operationId} to complete... current iteration: {cnt}/{iterations}"); var monitoringResult = client.Client.ExecuteQuery(databaseName, monitoringCommand, new Kusto.Data.Common.ClientRequestProperties()); var operationState = monitoringResult.As().FirstOrDefault(); - + if (operationState != null && operationState?.IsFinal() == true) { operationState.CommandText = sc.Text; @@ -122,12 +201,18 @@ private async Task ExecuteAsyncCommand(string databa throw new Exception("Operation did not complete in a reasonable time"); } - private static async Task> ExecutePendingSync(string databaseName, KustoClient client, ILogger logger, List scripts) + /// + /// This function will build a single .execute script with(ContinueOnErrors = true) using all the scripts provided. + /// Execute script is defined here: https://learn.microsoft.com/en-us/kusto/management/execute-database-script?view=azure-data-explorer + /// + private static async Task> ExecutePendingSync( + string databaseName, KustoClient client, ILogger logger, List scripts) { - if(scripts.Any() == false) + if (scripts.Count == 0) { - return new List(); - } + return []; + } + var sb = new StringBuilder(); sb.AppendLine(".execute script with(ContinueOnErrors = true) <|"); foreach (var sc in scripts) @@ -136,7 +221,9 @@ private static async Task> ExecutePendingSync(s } var script = sb.ToString(); - logger.LogInformation($"Applying sript:\n{script}"); + logger.LogInformation($"Applying batch of {scripts.Count} scripts to database {databaseName}"); + logger.LogDebug($"Script content:\n{script}"); + var result = await client.AdminClient.ExecuteControlCommandAsync(databaseName, script); return result.As(); } diff --git a/KustoSchemaTools/Properties/AssemblyInfo.cs b/KustoSchemaTools/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..d649300 --- /dev/null +++ b/KustoSchemaTools/Properties/AssemblyInfo.cs @@ -0,0 +1,4 @@ +using System.Runtime.CompilerServices; + +// Make internals visible to the test project +[assembly: InternalsVisibleTo("KustoSchemaTools.Tests")]