From ed8b11b39f4d58d408389c4d85c0aa8c7ebb88c7 Mon Sep 17 00:00:00 2001 From: Tim Haasdyk Date: Fri, 24 Oct 2025 14:59:15 +0200 Subject: [PATCH 1/7] Pull updating commits hashes out of SnapshotWorker --- src/SIL.Harmony.Core/QueryHelpers.cs | 16 ++++++++ src/SIL.Harmony/DataModel.cs | 32 ++++++++-------- src/SIL.Harmony/Db/CrdtRepository.cs | 57 ++++++++++++++++++---------- src/SIL.Harmony/SnapshotWorker.cs | 22 +++-------- 4 files changed, 75 insertions(+), 52 deletions(-) diff --git a/src/SIL.Harmony.Core/QueryHelpers.cs b/src/SIL.Harmony.Core/QueryHelpers.cs index db71c6a..7b49b65 100644 --- a/src/SIL.Harmony.Core/QueryHelpers.cs +++ b/src/SIL.Harmony.Core/QueryHelpers.cs @@ -60,6 +60,22 @@ public static async IAsyncEnumerable GetMissingCommits CommitComparer = + Comparer.Create((a, b) => a.CompareKey.CompareTo(b.CompareKey)); + + public static SortedSet ToSortedSet(this IEnumerable queryable) where T : CommitBase + { + return new SortedSet(queryable, CommitComparer); + } + + public static async Task> ToSortedSetAsync(this IQueryable queryable) where T : CommitBase + { + var set = new SortedSet(CommitComparer); + await foreach (var item in queryable.AsAsyncEnumerable()) + set.Add(item); + return set; + } + public static IQueryable DefaultOrder(this IQueryable queryable) where T: CommitBase { return queryable diff --git a/src/SIL.Harmony/DataModel.cs b/src/SIL.Harmony/DataModel.cs index 9235389..a8ae69c 100644 --- a/src/SIL.Harmony/DataModel.cs +++ b/src/SIL.Harmony/DataModel.cs @@ -81,8 +81,8 @@ public async Task AddManyChanges(Guid clientId, repo.ClearChangeTracker(); await using var transaction = await repo.BeginTransactionAsync(); - await repo.AddCommits(commits); - await UpdateSnapshots(repo, commits.First(), commits); + var updatedCommits = await repo.AddCommits(commits); + await UpdateSnapshots(repo, updatedCommits); await ValidateCommits(repo); await transaction.CommitAsync(); } @@ -119,8 +119,8 @@ private async Task Add(Commit commit) repo.ClearChangeTracker(); await using var transaction = repo.IsInTransaction ? null : await repo.BeginTransactionAsync(); - await repo.AddCommit(commit); - await UpdateSnapshots(repo, commit, [commit]); + var updatedCommits = await repo.AddCommit(commit); + await UpdateSnapshots(repo, updatedCommits); if (AlwaysValidate) await ValidateCommits(repo); @@ -156,8 +156,8 @@ async Task ISyncable.AddRangeFromSync(IEnumerable commits) await using var transaction = await repo.BeginTransactionAsync(); //don't save since UpdateSnapshots will also modify newCommits with hashes, so changes will be saved once that's done - await repo.AddCommits(newCommits, false); - await UpdateSnapshots(repo, oldestChange, newCommits); + var updatedCommits = await repo.AddCommits(newCommits, false); + await UpdateSnapshots(repo, updatedCommits); await ValidateCommits(repo); await transaction.CommitAsync(); } @@ -194,13 +194,15 @@ ValueTask ISyncable.ShouldSync() return ValueTask.FromResult(true); } - private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit, Commit[] newCommits) + private async Task UpdateSnapshots(CrdtRepository repo, SortedSet commitsToApply) { + if (commitsToApply.Count == 0) return; + var oldestAddedCommit = commitsToApply.First(); await repo.DeleteStaleSnapshots(oldestAddedCommit); Dictionary snapshotLookup; - if (newCommits.Length > 10) + if (commitsToApply.Count > 10) { - var entityIds = newCommits.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId)); + var entityIds = commitsToApply.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId)); snapshotLookup = await repo.CurrentSnapshots() .Where(s => entityIds.Contains(s.EntityId)) .Select(s => new KeyValuePair(s.EntityId, s.Id)) @@ -212,7 +214,7 @@ private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit } var snapshotWorker = new SnapshotWorker(snapshotLookup, repo, _crdtConfig.Value); - await snapshotWorker.UpdateSnapshots(oldestAddedCommit, newCommits); + await snapshotWorker.UpdateSnapshots(commitsToApply); } private async Task ValidateCommits(CrdtRepository repo) @@ -240,8 +242,8 @@ public async Task RegenerateSnapshots() await using var repo = await _crdtRepositoryFactory.CreateRepository(); await repo.DeleteSnapshotsAndProjectedTables(); repo.ClearChangeTracker(); - var allCommits = await repo.CurrentCommits().AsNoTracking().ToArrayAsync(); - await UpdateSnapshots(repo, allCommits.First(), allCommits); + var allCommits = await repo.CurrentCommits().AsNoTracking().ToSortedSetAsync(); + await UpdateSnapshots(repo, allCommits); } public async Task GetLatestSnapshotByObjectId(Guid entityId) @@ -296,7 +298,7 @@ public async Task> GetSnapshotsAtCommit(Commit var repository = repo.GetScopedRepository(commit); var (snapshots, pendingCommits) = await repository.GetCurrentSnapshotsAndPendingCommits(); - if (pendingCommits.Length != 0) + if (pendingCommits.Count != 0) { snapshots = await SnapshotWorker.ApplyCommitsToSnapshots(snapshots, repository, @@ -331,8 +333,8 @@ public async Task GetAtCommit(Commit commit, Guid entityId) var newCommits = await repository.CurrentCommits() .Include(c => c.ChangeEntities) .WhereAfter(snapshot.Commit) - .ToArrayAsync(); - if (newCommits.Length > 0) + .ToSortedSetAsync(); + if (newCommits.Count > 0) { var snapshots = await SnapshotWorker.ApplyCommitsToSnapshots( new Dictionary([ diff --git a/src/SIL.Harmony/Db/CrdtRepository.cs b/src/SIL.Harmony/Db/CrdtRepository.cs index 98d1210..8056173 100644 --- a/src/SIL.Harmony/Db/CrdtRepository.cs +++ b/src/SIL.Harmony/Db/CrdtRepository.cs @@ -207,7 +207,7 @@ public IAsyncEnumerable CurrenSimpleSnapshots(bool includeDelete return snapshots; } - public async Task<(Dictionary currentSnapshots, Commit[] pendingCommits)> GetCurrentSnapshotsAndPendingCommits() + public async Task<(Dictionary currentSnapshots, SortedSet pendingCommits)> GetCurrentSnapshotsAndPendingCommits() { var snapshots = await CurrentSnapshots().Include(s => s.Commit).ToDictionaryAsync(s => s.EntityId); @@ -217,7 +217,7 @@ public IAsyncEnumerable CurrenSimpleSnapshots(bool includeDelete var newCommits = await CurrentCommits() .Include(c => c.ChangeEntities) .WhereAfter(lastCommit) - .ToArrayAsync(); + .ToSortedSetAsync(); return (snapshots, newCommits); } @@ -326,25 +326,11 @@ public async Task AddSnapshots(IEnumerable snapshots) catch (DbUpdateException e) { var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString())); - var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}"; + var message = $"Error saving snapshots (deleted: {grouping.Key}): {e.Message}{Environment.NewLine}{entries}"; _logger.LogError(e, message); throw new DbUpdateException(message, e); } } - - // this extra try/catch was added as a quick way to get the NewEntityOnExistingEntityIsNoOp test to pass - // it will be removed again in a larger refactor in https://github.com/sillsdev/harmony/pull/56 - try - { - await _dbContext.SaveChangesAsync(); - } - catch (DbUpdateException e) - { - var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString())); - var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}"; - _logger.LogError(e, message); - throw new DbUpdateException(message, e); - } } private async ValueTask ProjectSnapshot(ObjectSnapshot objectSnapshot) @@ -389,16 +375,45 @@ public CrdtRepository GetScopedRepository(Commit excludeChangesAfterCommit) return new CrdtRepository(_dbContext, _crdtConfig, _logger, excludeChangesAfterCommit); } - public async Task AddCommit(Commit commit) + public async Task> AddCommit(Commit commit) { - _dbContext.Add(commit); + var updatedCommits = await AddNewCommits([commit]); await _dbContext.SaveChangesAsync(); + return updatedCommits; } - public async Task AddCommits(IEnumerable commits, bool save = true) + public async Task> AddCommits(IEnumerable commits, bool save = true) { - _dbContext.AddRange(commits); + var updatedCommits = await AddNewCommits(commits); if (save) await _dbContext.SaveChangesAsync(); + return updatedCommits; + } + + private async Task> AddNewCommits(IEnumerable newCommits) + { + if (newCommits is null || !newCommits.Any()) return []; + var oldestAddedCommit = newCommits.MinBy(c => c.CompareKey) + ?? throw new ArgumentException("Couldn't find oldest commit", nameof(newCommits)); + var parentCommit = await FindPreviousCommit(oldestAddedCommit); + var existingCommitsToUpdate = await GetCommitsAfter(parentCommit); + var commitsToApply = existingCommitsToUpdate + .UnionBy(newCommits, c => c.Id) + .ToSortedSet(); + //we're inserting commits in the past/rewriting history, so we need to update the previous commit hashes + await UpdateCommitHashes(commitsToApply, parentCommit); + _dbContext.AddRange(newCommits); + return commitsToApply; + } + + private async Task UpdateCommitHashes(SortedSet commits, Commit? parentCommit = null) + { + var previousCommitHash = parentCommit?.Hash ?? CommitBase.NullParentHash; + foreach (var commit in commits) + { + commit.SetParentHash(previousCommitHash); + previousCommitHash = commit.Hash; + } + await _dbContext.SaveChangesAsync(); } public HybridDateTime? GetLatestDateTime() diff --git a/src/SIL.Harmony/SnapshotWorker.cs b/src/SIL.Harmony/SnapshotWorker.cs index 0cd822a..c4fe70f 100644 --- a/src/SIL.Harmony/SnapshotWorker.cs +++ b/src/SIL.Harmony/SnapshotWorker.cs @@ -31,12 +31,12 @@ private SnapshotWorker(Dictionary snapshots, internal static async Task> ApplyCommitsToSnapshots( Dictionary snapshots, CrdtRepository crdtRepository, - ICollection commits, + SortedSet commits, CrdtConfig crdtConfig) { //we need to pass in the snapshots because we expect it to be modified, this is intended. //if the constructor makes a copy in the future this will need to be updated - await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits, false, null); + await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits); return snapshots; } @@ -49,12 +49,9 @@ internal SnapshotWorker(Dictionary snapshotLookup, { } - public async Task UpdateSnapshots(Commit oldestAddedCommit, Commit[] newCommits) + public async Task UpdateSnapshots(SortedSet commits) { - var previousCommit = await _crdtRepository.FindPreviousCommit(oldestAddedCommit); - var commits = await _crdtRepository.GetCommitsAfter(previousCommit); - await ApplyCommitChanges(commits.UnionBy(newCommits, c => c.Id), true, previousCommit?.Hash ?? CommitBase.NullParentHash); - + await ApplyCommitChanges(commits); await _crdtRepository.AddSnapshots([ .._rootSnapshots.Values, .._newIntermediateSnapshots, @@ -62,19 +59,12 @@ await _crdtRepository.AddSnapshots([ ]); } - private async ValueTask ApplyCommitChanges(IEnumerable commits, bool updateCommitHash, string? previousCommitHash) + private async ValueTask ApplyCommitChanges(SortedSet commits) { var intermediateSnapshots = new Dictionary(); var commitIndex = 0; - foreach (var commit in commits.DefaultOrder()) + foreach (var commit in commits) { - if (updateCommitHash && previousCommitHash is not null) - { - //we're rewriting history, so we need to update the previous commit hash - commit.SetParentHash(previousCommitHash); - } - - previousCommitHash = commit.Hash; commitIndex++; foreach (var commitChange in commit.ChangeEntities.OrderBy(c => c.Index)) { From a4cc58dcd3a3572fc958bfc537c52908548c7eed Mon Sep 17 00:00:00 2001 From: Tim Haasdyk Date: Tue, 28 Oct 2025 15:48:13 +0100 Subject: [PATCH 2/7] Implement IComparable in CommitBase --- src/SIL.Harmony.Core/CommitBase.cs | 8 +++++++- src/SIL.Harmony.Core/QueryHelpers.cs | 9 ++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/SIL.Harmony.Core/CommitBase.cs b/src/SIL.Harmony.Core/CommitBase.cs index e69812e..f60930b 100644 --- a/src/SIL.Harmony.Core/CommitBase.cs +++ b/src/SIL.Harmony.Core/CommitBase.cs @@ -7,7 +7,7 @@ namespace SIL.Harmony.Core; /// most basic commit, does not contain any change data, that's stored in /// this class is not meant to be inherited from directly, use or instead /// -public abstract class CommitBase +public abstract class CommitBase : IComparable { public const string NullParentHash = "0000"; [JsonConstructor] @@ -45,6 +45,12 @@ public override string ToString() { return $"{Id} [{DateTime}]"; } + + public int CompareTo(CommitBase? other) + { + if (other is null) return 1; + return CompareKey.CompareTo(other.CompareKey); + } } /// diff --git a/src/SIL.Harmony.Core/QueryHelpers.cs b/src/SIL.Harmony.Core/QueryHelpers.cs index 7b49b65..bfb56a0 100644 --- a/src/SIL.Harmony.Core/QueryHelpers.cs +++ b/src/SIL.Harmony.Core/QueryHelpers.cs @@ -60,19 +60,18 @@ public static async IAsyncEnumerable GetMissingCommits CommitComparer = - Comparer.Create((a, b) => a.CompareKey.CompareTo(b.CompareKey)); - public static SortedSet ToSortedSet(this IEnumerable queryable) where T : CommitBase { - return new SortedSet(queryable, CommitComparer); + return [.. queryable]; } public static async Task> ToSortedSetAsync(this IQueryable queryable) where T : CommitBase { - var set = new SortedSet(CommitComparer); + var set = new SortedSet(); await foreach (var item in queryable.AsAsyncEnumerable()) + { set.Add(item); + } return set; } From 26d53d898b17c7f5eef50ad2fa8bce68d8be09e8 Mon Sep 17 00:00:00 2001 From: Tim Haasdyk Date: Tue, 28 Oct 2025 16:22:26 +0100 Subject: [PATCH 3/7] Fix test --- src/SIL.Harmony/DataModel.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/SIL.Harmony/DataModel.cs b/src/SIL.Harmony/DataModel.cs index a8ae69c..79892b8 100644 --- a/src/SIL.Harmony/DataModel.cs +++ b/src/SIL.Harmony/DataModel.cs @@ -242,7 +242,9 @@ public async Task RegenerateSnapshots() await using var repo = await _crdtRepositoryFactory.CreateRepository(); await repo.DeleteSnapshotsAndProjectedTables(); repo.ClearChangeTracker(); - var allCommits = await repo.CurrentCommits().AsNoTracking().ToSortedSetAsync(); + var allCommits = await repo.CurrentCommits() + .Include(c => c.ChangeEntities) + .ToSortedSetAsync(); await UpdateSnapshots(repo, allCommits); } From 19b4255f6d064b4677542ad27367842f2f28bb13 Mon Sep 17 00:00:00 2001 From: Tim Haasdyk Date: Tue, 28 Oct 2025 16:50:10 +0100 Subject: [PATCH 4/7] Call SaveChanges only after new commits have been added --- src/SIL.Harmony/Db/CrdtRepository.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/SIL.Harmony/Db/CrdtRepository.cs b/src/SIL.Harmony/Db/CrdtRepository.cs index 8056173..61282e1 100644 --- a/src/SIL.Harmony/Db/CrdtRepository.cs +++ b/src/SIL.Harmony/Db/CrdtRepository.cs @@ -402,6 +402,7 @@ private async Task> AddNewCommits(IEnumerable newCommi //we're inserting commits in the past/rewriting history, so we need to update the previous commit hashes await UpdateCommitHashes(commitsToApply, parentCommit); _dbContext.AddRange(newCommits); + await _dbContext.SaveChangesAsync(); return commitsToApply; } @@ -413,7 +414,6 @@ private async Task UpdateCommitHashes(SortedSet commits, Commit? parentC commit.SetParentHash(previousCommitHash); previousCommitHash = commit.Hash; } - await _dbContext.SaveChangesAsync(); } public HybridDateTime? GetLatestDateTime() From 0320e6bbad3dfa6eaa08aebd940752fbfc49bdf8 Mon Sep 17 00:00:00 2001 From: Tim Haasdyk Date: Tue, 28 Oct 2025 17:09:29 +0100 Subject: [PATCH 5/7] Remove redundant SaveChanges which bypasses save parameter --- src/SIL.Harmony/Db/CrdtRepository.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/SIL.Harmony/Db/CrdtRepository.cs b/src/SIL.Harmony/Db/CrdtRepository.cs index 61282e1..ed43732 100644 --- a/src/SIL.Harmony/Db/CrdtRepository.cs +++ b/src/SIL.Harmony/Db/CrdtRepository.cs @@ -402,7 +402,6 @@ private async Task> AddNewCommits(IEnumerable newCommi //we're inserting commits in the past/rewriting history, so we need to update the previous commit hashes await UpdateCommitHashes(commitsToApply, parentCommit); _dbContext.AddRange(newCommits); - await _dbContext.SaveChangesAsync(); return commitsToApply; } From 5ce1f2c899a14ea212b46759d6861abd01f2e68b Mon Sep 17 00:00:00 2001 From: Tim Haasdyk Date: Tue, 28 Oct 2025 17:30:51 +0100 Subject: [PATCH 6/7] Remove unnecessary async/await syntax --- src/SIL.Harmony/Db/CrdtRepository.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/SIL.Harmony/Db/CrdtRepository.cs b/src/SIL.Harmony/Db/CrdtRepository.cs index ed43732..9d6353f 100644 --- a/src/SIL.Harmony/Db/CrdtRepository.cs +++ b/src/SIL.Harmony/Db/CrdtRepository.cs @@ -400,12 +400,12 @@ private async Task> AddNewCommits(IEnumerable newCommi .UnionBy(newCommits, c => c.Id) .ToSortedSet(); //we're inserting commits in the past/rewriting history, so we need to update the previous commit hashes - await UpdateCommitHashes(commitsToApply, parentCommit); + UpdateCommitHashes(commitsToApply, parentCommit); _dbContext.AddRange(newCommits); return commitsToApply; } - private async Task UpdateCommitHashes(SortedSet commits, Commit? parentCommit = null) + private void UpdateCommitHashes(SortedSet commits, Commit? parentCommit = null) { var previousCommitHash = parentCommit?.Hash ?? CommitBase.NullParentHash; foreach (var commit in commits) From fd370752b908bd8fbb1779bd510abd8b87c5a45a Mon Sep 17 00:00:00 2001 From: Tim Haasdyk Date: Tue, 28 Oct 2025 17:53:16 +0100 Subject: [PATCH 7/7] Remove outdated deferred SaveChanges --- src/SIL.Harmony/DataModel.cs | 3 +-- src/SIL.Harmony/Db/CrdtRepository.cs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/SIL.Harmony/DataModel.cs b/src/SIL.Harmony/DataModel.cs index 79892b8..2a446ad 100644 --- a/src/SIL.Harmony/DataModel.cs +++ b/src/SIL.Harmony/DataModel.cs @@ -155,8 +155,7 @@ async Task ISyncable.AddRangeFromSync(IEnumerable commits) if (oldestChange is null || newCommits is []) return; await using var transaction = await repo.BeginTransactionAsync(); - //don't save since UpdateSnapshots will also modify newCommits with hashes, so changes will be saved once that's done - var updatedCommits = await repo.AddCommits(newCommits, false); + var updatedCommits = await repo.AddCommits(newCommits); await UpdateSnapshots(repo, updatedCommits); await ValidateCommits(repo); await transaction.CommitAsync(); diff --git a/src/SIL.Harmony/Db/CrdtRepository.cs b/src/SIL.Harmony/Db/CrdtRepository.cs index 9d6353f..5b10dd8 100644 --- a/src/SIL.Harmony/Db/CrdtRepository.cs +++ b/src/SIL.Harmony/Db/CrdtRepository.cs @@ -382,10 +382,10 @@ public async Task> AddCommit(Commit commit) return updatedCommits; } - public async Task> AddCommits(IEnumerable commits, bool save = true) + public async Task> AddCommits(IEnumerable commits) { var updatedCommits = await AddNewCommits(commits); - if (save) await _dbContext.SaveChangesAsync(); + await _dbContext.SaveChangesAsync(); return updatedCommits; }