diff --git a/src/SIL.Harmony.Core/CommitBase.cs b/src/SIL.Harmony.Core/CommitBase.cs index e69812e..f60930b 100644 --- a/src/SIL.Harmony.Core/CommitBase.cs +++ b/src/SIL.Harmony.Core/CommitBase.cs @@ -7,7 +7,7 @@ namespace SIL.Harmony.Core; /// most basic commit, does not contain any change data, that's stored in /// this class is not meant to be inherited from directly, use or instead /// -public abstract class CommitBase +public abstract class CommitBase : IComparable { public const string NullParentHash = "0000"; [JsonConstructor] @@ -45,6 +45,12 @@ public override string ToString() { return $"{Id} [{DateTime}]"; } + + public int CompareTo(CommitBase? other) + { + if (other is null) return 1; + return CompareKey.CompareTo(other.CompareKey); + } } /// diff --git a/src/SIL.Harmony.Core/QueryHelpers.cs b/src/SIL.Harmony.Core/QueryHelpers.cs index db71c6a..bfb56a0 100644 --- a/src/SIL.Harmony.Core/QueryHelpers.cs +++ b/src/SIL.Harmony.Core/QueryHelpers.cs @@ -60,6 +60,21 @@ public static async IAsyncEnumerable GetMissingCommits ToSortedSet(this IEnumerable queryable) where T : CommitBase + { + return [.. queryable]; + } + + public static async Task> ToSortedSetAsync(this IQueryable queryable) where T : CommitBase + { + var set = new SortedSet(); + await foreach (var item in queryable.AsAsyncEnumerable()) + { + set.Add(item); + } + return set; + } + public static IQueryable DefaultOrder(this IQueryable queryable) where T: CommitBase { return queryable diff --git a/src/SIL.Harmony/DataModel.cs b/src/SIL.Harmony/DataModel.cs index 9235389..2a446ad 100644 --- a/src/SIL.Harmony/DataModel.cs +++ b/src/SIL.Harmony/DataModel.cs @@ -81,8 +81,8 @@ public async Task AddManyChanges(Guid clientId, repo.ClearChangeTracker(); await using var transaction = await repo.BeginTransactionAsync(); - await repo.AddCommits(commits); - await UpdateSnapshots(repo, commits.First(), commits); + var updatedCommits = await repo.AddCommits(commits); + await UpdateSnapshots(repo, updatedCommits); await ValidateCommits(repo); await transaction.CommitAsync(); } @@ -119,8 +119,8 @@ private async Task Add(Commit commit) repo.ClearChangeTracker(); await using var transaction = repo.IsInTransaction ? null : await repo.BeginTransactionAsync(); - await repo.AddCommit(commit); - await UpdateSnapshots(repo, commit, [commit]); + var updatedCommits = await repo.AddCommit(commit); + await UpdateSnapshots(repo, updatedCommits); if (AlwaysValidate) await ValidateCommits(repo); @@ -155,9 +155,8 @@ async Task ISyncable.AddRangeFromSync(IEnumerable commits) if (oldestChange is null || newCommits is []) return; await using var transaction = await repo.BeginTransactionAsync(); - //don't save since UpdateSnapshots will also modify newCommits with hashes, so changes will be saved once that's done - await repo.AddCommits(newCommits, false); - await UpdateSnapshots(repo, oldestChange, newCommits); + var updatedCommits = await repo.AddCommits(newCommits); + await UpdateSnapshots(repo, updatedCommits); await ValidateCommits(repo); await transaction.CommitAsync(); } @@ -194,13 +193,15 @@ ValueTask ISyncable.ShouldSync() return ValueTask.FromResult(true); } - private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit, Commit[] newCommits) + private async Task UpdateSnapshots(CrdtRepository repo, SortedSet commitsToApply) { + if (commitsToApply.Count == 0) return; + var oldestAddedCommit = commitsToApply.First(); await repo.DeleteStaleSnapshots(oldestAddedCommit); Dictionary snapshotLookup; - if (newCommits.Length > 10) + if (commitsToApply.Count > 10) { - var entityIds = newCommits.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId)); + var entityIds = commitsToApply.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId)); snapshotLookup = await repo.CurrentSnapshots() .Where(s => entityIds.Contains(s.EntityId)) .Select(s => new KeyValuePair(s.EntityId, s.Id)) @@ -212,7 +213,7 @@ private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit } var snapshotWorker = new SnapshotWorker(snapshotLookup, repo, _crdtConfig.Value); - await snapshotWorker.UpdateSnapshots(oldestAddedCommit, newCommits); + await snapshotWorker.UpdateSnapshots(commitsToApply); } private async Task ValidateCommits(CrdtRepository repo) @@ -240,8 +241,10 @@ public async Task RegenerateSnapshots() await using var repo = await _crdtRepositoryFactory.CreateRepository(); await repo.DeleteSnapshotsAndProjectedTables(); repo.ClearChangeTracker(); - var allCommits = await repo.CurrentCommits().AsNoTracking().ToArrayAsync(); - await UpdateSnapshots(repo, allCommits.First(), allCommits); + var allCommits = await repo.CurrentCommits() + .Include(c => c.ChangeEntities) + .ToSortedSetAsync(); + await UpdateSnapshots(repo, allCommits); } public async Task GetLatestSnapshotByObjectId(Guid entityId) @@ -296,7 +299,7 @@ public async Task> GetSnapshotsAtCommit(Commit var repository = repo.GetScopedRepository(commit); var (snapshots, pendingCommits) = await repository.GetCurrentSnapshotsAndPendingCommits(); - if (pendingCommits.Length != 0) + if (pendingCommits.Count != 0) { snapshots = await SnapshotWorker.ApplyCommitsToSnapshots(snapshots, repository, @@ -331,8 +334,8 @@ public async Task GetAtCommit(Commit commit, Guid entityId) var newCommits = await repository.CurrentCommits() .Include(c => c.ChangeEntities) .WhereAfter(snapshot.Commit) - .ToArrayAsync(); - if (newCommits.Length > 0) + .ToSortedSetAsync(); + if (newCommits.Count > 0) { var snapshots = await SnapshotWorker.ApplyCommitsToSnapshots( new Dictionary([ diff --git a/src/SIL.Harmony/Db/CrdtRepository.cs b/src/SIL.Harmony/Db/CrdtRepository.cs index 98d1210..5b10dd8 100644 --- a/src/SIL.Harmony/Db/CrdtRepository.cs +++ b/src/SIL.Harmony/Db/CrdtRepository.cs @@ -207,7 +207,7 @@ public IAsyncEnumerable CurrenSimpleSnapshots(bool includeDelete return snapshots; } - public async Task<(Dictionary currentSnapshots, Commit[] pendingCommits)> GetCurrentSnapshotsAndPendingCommits() + public async Task<(Dictionary currentSnapshots, SortedSet pendingCommits)> GetCurrentSnapshotsAndPendingCommits() { var snapshots = await CurrentSnapshots().Include(s => s.Commit).ToDictionaryAsync(s => s.EntityId); @@ -217,7 +217,7 @@ public IAsyncEnumerable CurrenSimpleSnapshots(bool includeDelete var newCommits = await CurrentCommits() .Include(c => c.ChangeEntities) .WhereAfter(lastCommit) - .ToArrayAsync(); + .ToSortedSetAsync(); return (snapshots, newCommits); } @@ -326,25 +326,11 @@ public async Task AddSnapshots(IEnumerable snapshots) catch (DbUpdateException e) { var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString())); - var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}"; + var message = $"Error saving snapshots (deleted: {grouping.Key}): {e.Message}{Environment.NewLine}{entries}"; _logger.LogError(e, message); throw new DbUpdateException(message, e); } } - - // this extra try/catch was added as a quick way to get the NewEntityOnExistingEntityIsNoOp test to pass - // it will be removed again in a larger refactor in https://github.com/sillsdev/harmony/pull/56 - try - { - await _dbContext.SaveChangesAsync(); - } - catch (DbUpdateException e) - { - var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString())); - var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}"; - _logger.LogError(e, message); - throw new DbUpdateException(message, e); - } } private async ValueTask ProjectSnapshot(ObjectSnapshot objectSnapshot) @@ -389,16 +375,44 @@ public CrdtRepository GetScopedRepository(Commit excludeChangesAfterCommit) return new CrdtRepository(_dbContext, _crdtConfig, _logger, excludeChangesAfterCommit); } - public async Task AddCommit(Commit commit) + public async Task> AddCommit(Commit commit) { - _dbContext.Add(commit); + var updatedCommits = await AddNewCommits([commit]); await _dbContext.SaveChangesAsync(); + return updatedCommits; } - public async Task AddCommits(IEnumerable commits, bool save = true) + public async Task> AddCommits(IEnumerable commits) { - _dbContext.AddRange(commits); - if (save) await _dbContext.SaveChangesAsync(); + var updatedCommits = await AddNewCommits(commits); + await _dbContext.SaveChangesAsync(); + return updatedCommits; + } + + private async Task> AddNewCommits(IEnumerable newCommits) + { + if (newCommits is null || !newCommits.Any()) return []; + var oldestAddedCommit = newCommits.MinBy(c => c.CompareKey) + ?? throw new ArgumentException("Couldn't find oldest commit", nameof(newCommits)); + var parentCommit = await FindPreviousCommit(oldestAddedCommit); + var existingCommitsToUpdate = await GetCommitsAfter(parentCommit); + var commitsToApply = existingCommitsToUpdate + .UnionBy(newCommits, c => c.Id) + .ToSortedSet(); + //we're inserting commits in the past/rewriting history, so we need to update the previous commit hashes + UpdateCommitHashes(commitsToApply, parentCommit); + _dbContext.AddRange(newCommits); + return commitsToApply; + } + + private void UpdateCommitHashes(SortedSet commits, Commit? parentCommit = null) + { + var previousCommitHash = parentCommit?.Hash ?? CommitBase.NullParentHash; + foreach (var commit in commits) + { + commit.SetParentHash(previousCommitHash); + previousCommitHash = commit.Hash; + } } public HybridDateTime? GetLatestDateTime() diff --git a/src/SIL.Harmony/SnapshotWorker.cs b/src/SIL.Harmony/SnapshotWorker.cs index 0cd822a..c4fe70f 100644 --- a/src/SIL.Harmony/SnapshotWorker.cs +++ b/src/SIL.Harmony/SnapshotWorker.cs @@ -31,12 +31,12 @@ private SnapshotWorker(Dictionary snapshots, internal static async Task> ApplyCommitsToSnapshots( Dictionary snapshots, CrdtRepository crdtRepository, - ICollection commits, + SortedSet commits, CrdtConfig crdtConfig) { //we need to pass in the snapshots because we expect it to be modified, this is intended. //if the constructor makes a copy in the future this will need to be updated - await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits, false, null); + await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits); return snapshots; } @@ -49,12 +49,9 @@ internal SnapshotWorker(Dictionary snapshotLookup, { } - public async Task UpdateSnapshots(Commit oldestAddedCommit, Commit[] newCommits) + public async Task UpdateSnapshots(SortedSet commits) { - var previousCommit = await _crdtRepository.FindPreviousCommit(oldestAddedCommit); - var commits = await _crdtRepository.GetCommitsAfter(previousCommit); - await ApplyCommitChanges(commits.UnionBy(newCommits, c => c.Id), true, previousCommit?.Hash ?? CommitBase.NullParentHash); - + await ApplyCommitChanges(commits); await _crdtRepository.AddSnapshots([ .._rootSnapshots.Values, .._newIntermediateSnapshots, @@ -62,19 +59,12 @@ await _crdtRepository.AddSnapshots([ ]); } - private async ValueTask ApplyCommitChanges(IEnumerable commits, bool updateCommitHash, string? previousCommitHash) + private async ValueTask ApplyCommitChanges(SortedSet commits) { var intermediateSnapshots = new Dictionary(); var commitIndex = 0; - foreach (var commit in commits.DefaultOrder()) + foreach (var commit in commits) { - if (updateCommitHash && previousCommitHash is not null) - { - //we're rewriting history, so we need to update the previous commit hash - commit.SetParentHash(previousCommitHash); - } - - previousCommitHash = commit.Hash; commitIndex++; foreach (var commitChange in commit.ChangeEntities.OrderBy(c => c.Index)) {