Skip to content

Commit 0dc12e5

Browse files
committed
Pull updating commits hashes out of SnapshotWorker
1 parent 5986c56 commit 0dc12e5

File tree

4 files changed

+75
-50
lines changed

4 files changed

+75
-50
lines changed

src/SIL.Harmony.Core/QueryHelpers.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,22 @@ public static async IAsyncEnumerable<TCommit> GetMissingCommits<TCommit, TChange
6060
}
6161
}
6262

63+
private static readonly IComparer<CommitBase> CommitComparer =
64+
Comparer<CommitBase>.Create((a, b) => a.CompareKey.CompareTo(b.CompareKey));
65+
66+
public static SortedSet<T> ToSortedSet<T>(this IEnumerable<T> queryable) where T : CommitBase
67+
{
68+
return new SortedSet<T>(queryable, CommitComparer);
69+
}
70+
71+
public static async Task<SortedSet<T>> ToSortedSetAsync<T>(this IQueryable<T> queryable) where T : CommitBase
72+
{
73+
var set = new SortedSet<T>(CommitComparer);
74+
await foreach (var item in queryable.AsAsyncEnumerable())
75+
set.Add(item);
76+
return set;
77+
}
78+
6379
public static IQueryable<T> DefaultOrder<T>(this IQueryable<T> queryable) where T: CommitBase
6480
{
6581
return queryable

src/SIL.Harmony/DataModel.cs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ public async Task AddManyChanges(Guid clientId,
8181
repo.ClearChangeTracker();
8282

8383
await using var transaction = await repo.BeginTransactionAsync();
84-
await repo.AddCommits(commits);
85-
await UpdateSnapshots(repo, commits.First(), commits);
84+
var updatedCommits = await repo.AddCommits(commits);
85+
await UpdateSnapshots(repo, updatedCommits);
8686
await ValidateCommits(repo);
8787
await transaction.CommitAsync();
8888
}
@@ -119,8 +119,8 @@ private async Task Add(Commit commit)
119119
repo.ClearChangeTracker();
120120

121121
await using var transaction = repo.IsInTransaction ? null : await repo.BeginTransactionAsync();
122-
await repo.AddCommit(commit);
123-
await UpdateSnapshots(repo, commit, [commit]);
122+
var updatedCommits = await repo.AddCommit(commit);
123+
await UpdateSnapshots(repo, updatedCommits);
124124

125125
if (AlwaysValidate) await ValidateCommits(repo);
126126

@@ -156,8 +156,8 @@ async Task ISyncable.AddRangeFromSync(IEnumerable<Commit> commits)
156156

157157
await using var transaction = await repo.BeginTransactionAsync();
158158
//don't save since UpdateSnapshots will also modify newCommits with hashes, so changes will be saved once that's done
159-
await repo.AddCommits(newCommits, false);
160-
await UpdateSnapshots(repo, oldestChange, newCommits);
159+
var updatedCommits = await repo.AddCommits(newCommits, false);
160+
await UpdateSnapshots(repo, updatedCommits);
161161
await ValidateCommits(repo);
162162
await transaction.CommitAsync();
163163
}
@@ -194,13 +194,15 @@ ValueTask<bool> ISyncable.ShouldSync()
194194
return ValueTask.FromResult(true);
195195
}
196196

197-
private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit, Commit[] newCommits)
197+
private async Task UpdateSnapshots(CrdtRepository repo, SortedSet<Commit> commitsToApply)
198198
{
199+
if (commitsToApply.Count == 0) return;
200+
var oldestAddedCommit = commitsToApply.First();
199201
await repo.DeleteStaleSnapshots(oldestAddedCommit);
200202
Dictionary<Guid, Guid?> snapshotLookup;
201-
if (newCommits.Length > 10)
203+
if (commitsToApply.Count > 10)
202204
{
203-
var entityIds = newCommits.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId));
205+
var entityIds = commitsToApply.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId));
204206
snapshotLookup = await repo.CurrentSnapshots()
205207
.Where(s => entityIds.Contains(s.EntityId))
206208
.Select(s => new KeyValuePair<Guid, Guid?>(s.EntityId, s.Id))
@@ -212,7 +214,7 @@ private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit
212214
}
213215

214216
var snapshotWorker = new SnapshotWorker(snapshotLookup, repo, _crdtConfig.Value);
215-
await snapshotWorker.UpdateSnapshots(oldestAddedCommit, newCommits);
217+
await snapshotWorker.UpdateSnapshots(commitsToApply);
216218
}
217219

218220
private async Task ValidateCommits(CrdtRepository repo)
@@ -240,8 +242,8 @@ public async Task RegenerateSnapshots()
240242
await using var repo = await _crdtRepositoryFactory.CreateRepository();
241243
await repo.DeleteSnapshotsAndProjectedTables();
242244
repo.ClearChangeTracker();
243-
var allCommits = await repo.CurrentCommits().AsNoTracking().ToArrayAsync();
244-
await UpdateSnapshots(repo, allCommits.First(), allCommits);
245+
var allCommits = await repo.CurrentCommits().AsNoTracking().ToSortedSetAsync();
246+
await UpdateSnapshots(repo, allCommits);
245247
}
246248

247249
public async Task<ObjectSnapshot> GetLatestSnapshotByObjectId(Guid entityId)
@@ -296,7 +298,7 @@ public async Task<Dictionary<Guid, ObjectSnapshot>> GetSnapshotsAtCommit(Commit
296298
var repository = repo.GetScopedRepository(commit);
297299
var (snapshots, pendingCommits) = await repository.GetCurrentSnapshotsAndPendingCommits();
298300

299-
if (pendingCommits.Length != 0)
301+
if (pendingCommits.Count != 0)
300302
{
301303
snapshots = await SnapshotWorker.ApplyCommitsToSnapshots(snapshots,
302304
repository,
@@ -331,8 +333,8 @@ public async Task<T> GetAtCommit<T>(Commit commit, Guid entityId)
331333
var newCommits = await repository.CurrentCommits()
332334
.Include(c => c.ChangeEntities)
333335
.WhereAfter(snapshot.Commit)
334-
.ToArrayAsync();
335-
if (newCommits.Length > 0)
336+
.ToSortedSetAsync();
337+
if (newCommits.Count > 0)
336338
{
337339
var snapshots = await SnapshotWorker.ApplyCommitsToSnapshots(
338340
new Dictionary<Guid, ObjectSnapshot>([

src/SIL.Harmony/Db/CrdtRepository.cs

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public IAsyncEnumerable<SimpleSnapshot> CurrenSimpleSnapshots(bool includeDelete
207207
return snapshots;
208208
}
209209

210-
public async Task<(Dictionary<Guid, ObjectSnapshot> currentSnapshots, Commit[] pendingCommits)> GetCurrentSnapshotsAndPendingCommits()
210+
public async Task<(Dictionary<Guid, ObjectSnapshot> currentSnapshots, SortedSet<Commit> pendingCommits)> GetCurrentSnapshotsAndPendingCommits()
211211
{
212212
var snapshots = await CurrentSnapshots().Include(s => s.Commit).ToDictionaryAsync(s => s.EntityId);
213213

@@ -217,7 +217,7 @@ public IAsyncEnumerable<SimpleSnapshot> CurrenSimpleSnapshots(bool includeDelete
217217
var newCommits = await CurrentCommits()
218218
.Include(c => c.ChangeEntities)
219219
.WhereAfter(lastCommit)
220-
.ToArrayAsync();
220+
.ToSortedSetAsync();
221221
return (snapshots, newCommits);
222222
}
223223

@@ -326,23 +326,11 @@ public async Task AddSnapshots(IEnumerable<ObjectSnapshot> snapshots)
326326
catch (DbUpdateException e)
327327
{
328328
var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString()));
329-
var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}";
329+
var message = $"Error saving snapshots (deleted: {grouping.Key}): {e.Message}{Environment.NewLine}{entries}";
330330
_logger.LogError(e, message);
331331
throw new DbUpdateException(message, e);
332332
}
333333
}
334-
335-
try
336-
{
337-
await _dbContext.SaveChangesAsync();
338-
}
339-
catch (DbUpdateException e)
340-
{
341-
var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString()));
342-
var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}";
343-
_logger.LogError(e, message);
344-
throw new DbUpdateException(message, e);
345-
}
346334
}
347335

348336
private async ValueTask ProjectSnapshot(ObjectSnapshot objectSnapshot)
@@ -387,16 +375,45 @@ public CrdtRepository GetScopedRepository(Commit excludeChangesAfterCommit)
387375
return new CrdtRepository(_dbContext, _crdtConfig, _logger, excludeChangesAfterCommit);
388376
}
389377

390-
public async Task AddCommit(Commit commit)
378+
public async Task<SortedSet<Commit>> AddCommit(Commit commit)
391379
{
392-
_dbContext.Add(commit);
380+
var updatedCommits = await AddNewCommits([commit]);
393381
await _dbContext.SaveChangesAsync();
382+
return updatedCommits;
394383
}
395384

396-
public async Task AddCommits(IEnumerable<Commit> commits, bool save = true)
385+
public async Task<SortedSet<Commit>> AddCommits(IEnumerable<Commit> commits, bool save = true)
397386
{
398-
_dbContext.AddRange(commits);
387+
var updatedCommits = await AddNewCommits(commits);
399388
if (save) await _dbContext.SaveChangesAsync();
389+
return updatedCommits;
390+
}
391+
392+
private async Task<SortedSet<Commit>> AddNewCommits(IEnumerable<Commit> newCommits)
393+
{
394+
if (newCommits is null || !newCommits.Any()) return [];
395+
var oldestAddedCommit = newCommits.MinBy(c => c.CompareKey)
396+
?? throw new ArgumentException("Couldn't find oldest commit", nameof(newCommits));
397+
var parentCommit = await FindPreviousCommit(oldestAddedCommit);
398+
var existingCommitsToUpdate = await GetCommitsAfter(parentCommit);
399+
var commitsToApply = existingCommitsToUpdate
400+
.UnionBy(newCommits, c => c.Id)
401+
.ToSortedSet();
402+
//we're inserting commits in the past/rewriting history, so we need to update the previous commit hashes
403+
await UpdateCommitHashes(commitsToApply, parentCommit);
404+
_dbContext.AddRange(newCommits);
405+
return commitsToApply;
406+
}
407+
408+
private async Task UpdateCommitHashes(SortedSet<Commit> commits, Commit? parentCommit = null)
409+
{
410+
var previousCommitHash = parentCommit?.Hash ?? CommitBase.NullParentHash;
411+
foreach (var commit in commits)
412+
{
413+
commit.SetParentHash(previousCommitHash);
414+
previousCommitHash = commit.Hash;
415+
}
416+
await _dbContext.SaveChangesAsync();
400417
}
401418

402419
public HybridDateTime? GetLatestDateTime()

src/SIL.Harmony/SnapshotWorker.cs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ private SnapshotWorker(Dictionary<Guid, ObjectSnapshot> snapshots,
3131
internal static async Task<Dictionary<Guid, ObjectSnapshot>> ApplyCommitsToSnapshots(
3232
Dictionary<Guid, ObjectSnapshot> snapshots,
3333
CrdtRepository crdtRepository,
34-
ICollection<Commit> commits,
34+
SortedSet<Commit> commits,
3535
CrdtConfig crdtConfig)
3636
{
3737
//we need to pass in the snapshots because we expect it to be modified, this is intended.
3838
//if the constructor makes a copy in the future this will need to be updated
39-
await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits, false, null);
39+
await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits);
4040
return snapshots;
4141
}
4242

@@ -49,32 +49,22 @@ internal SnapshotWorker(Dictionary<Guid, Guid?> snapshotLookup,
4949
{
5050
}
5151

52-
public async Task UpdateSnapshots(Commit oldestAddedCommit, Commit[] newCommits)
52+
public async Task UpdateSnapshots(SortedSet<Commit> commits)
5353
{
54-
var previousCommit = await _crdtRepository.FindPreviousCommit(oldestAddedCommit);
55-
var commits = await _crdtRepository.GetCommitsAfter(previousCommit);
56-
await ApplyCommitChanges(commits.UnionBy(newCommits, c => c.Id), true, previousCommit?.Hash ?? CommitBase.NullParentHash);
57-
54+
await ApplyCommitChanges(commits);
5855
await _crdtRepository.AddSnapshots([
5956
.._rootSnapshots.Values,
6057
.._newIntermediateSnapshots,
6158
.._pendingSnapshots.Values
6259
]);
6360
}
6461

65-
private async ValueTask ApplyCommitChanges(IEnumerable<Commit> commits, bool updateCommitHash, string? previousCommitHash)
62+
private async ValueTask ApplyCommitChanges(SortedSet<Commit> commits)
6663
{
6764
var intermediateSnapshots = new Dictionary<Guid, ObjectSnapshot>();
6865
var commitIndex = 0;
69-
foreach (var commit in commits.DefaultOrder())
66+
foreach (var commit in commits)
7067
{
71-
if (updateCommitHash && previousCommitHash is not null)
72-
{
73-
//we're rewriting history, so we need to update the previous commit hash
74-
commit.SetParentHash(previousCommitHash);
75-
}
76-
77-
previousCommitHash = commit.Hash;
7868
commitIndex++;
7969
foreach (var commitChange in commit.ChangeEntities.OrderBy(c => c.Index))
8070
{

0 commit comments

Comments
 (0)