Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/SIL.Harmony.Core/CommitBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace SIL.Harmony.Core;
/// most basic commit, does not contain any change data, that's stored in <see cref="CommitBase{TChange}"/>
/// this class is not meant to be inherited from directly, use <see cref="ServerCommit"/> or <see cref="SIL.Harmony.Commit"/> instead
/// </summary>
public abstract class CommitBase
public abstract class CommitBase : IComparable<CommitBase>
{
public const string NullParentHash = "0000";
[JsonConstructor]
Expand Down Expand Up @@ -45,6 +45,12 @@ public override string ToString()
{
return $"{Id} [{DateTime}]";
}

/// <summary>
/// Orders this commit relative to <paramref name="other"/> by comparing their <c>CompareKey</c> values.
/// </summary>
/// <param name="other">The commit to compare against; may be <c>null</c>.</param>
/// <returns>
/// <c>1</c> when <paramref name="other"/> is <c>null</c>; otherwise whatever
/// <c>CompareKey.CompareTo</c> returns for the two keys.
/// </returns>
public int CompareTo(CommitBase? other) =>
    other is null
        ? 1
        : CompareKey.CompareTo(other.CompareKey);
}

/// <inheritdoc cref="CommitBase"/>
Expand Down
15 changes: 15 additions & 0 deletions src/SIL.Harmony.Core/QueryHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ public static async IAsyncEnumerable<TCommit> GetMissingCommits<TCommit, TChange
}
}

/// <summary>
/// Copies every element of <paramref name="queryable"/> into a new <see cref="SortedSet{T}"/>
/// that uses the default comparer for <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The commit type being collected.</typeparam>
/// <param name="queryable">The sequence to copy; it is enumerated exactly once.</param>
/// <returns>A new set holding the elements; items the comparer treats as equal are stored once.</returns>
public static SortedSet<T> ToSortedSet<T>(this IEnumerable<T> queryable) where T : CommitBase
{
    var sorted = new SortedSet<T>();
    foreach (var commit in queryable)
    {
        sorted.Add(commit);
    }

    return sorted;
}

/// <summary>
/// Asynchronously enumerates <paramref name="queryable"/> and collects the results into a new
/// <see cref="SortedSet{T}"/> that uses the default comparer for <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The commit type being collected.</typeparam>
/// <param name="queryable">The query to execute; it is streamed via <c>AsAsyncEnumerable()</c>.</param>
/// <param name="cancellationToken">
/// Optional token passed to the asynchronous enumeration so a long-running query can be abandoned.
/// Defaults to <see cref="System.Threading.CancellationToken.None"/>, which preserves the behavior
/// of callers that do not supply a token.
/// </param>
/// <returns>A new set holding the query results; items the comparer treats as equal are stored once.</returns>
public static async Task<SortedSet<T>> ToSortedSetAsync<T>(
    this IQueryable<T> queryable,
    System.Threading.CancellationToken cancellationToken = default) where T : CommitBase
{
    var set = new SortedSet<T>();
    // WithCancellation hands the token to GetAsyncEnumerator, so the underlying query can observe it.
    await foreach (var item in queryable.AsAsyncEnumerable().WithCancellation(cancellationToken))
    {
        set.Add(item);
    }

    return set;
}

public static IQueryable<T> DefaultOrder<T>(this IQueryable<T> queryable) where T: CommitBase
{
return queryable
Expand Down
35 changes: 19 additions & 16 deletions src/SIL.Harmony/DataModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public async Task AddManyChanges(Guid clientId,
repo.ClearChangeTracker();

await using var transaction = await repo.BeginTransactionAsync();
await repo.AddCommits(commits);
await UpdateSnapshots(repo, commits.First(), commits);
var updatedCommits = await repo.AddCommits(commits);
await UpdateSnapshots(repo, updatedCommits);
await ValidateCommits(repo);
await transaction.CommitAsync();
}
Expand Down Expand Up @@ -119,8 +119,8 @@ private async Task Add(Commit commit)
repo.ClearChangeTracker();

await using var transaction = repo.IsInTransaction ? null : await repo.BeginTransactionAsync();
await repo.AddCommit(commit);
await UpdateSnapshots(repo, commit, [commit]);
var updatedCommits = await repo.AddCommit(commit);
await UpdateSnapshots(repo, updatedCommits);

if (AlwaysValidate) await ValidateCommits(repo);

Expand Down Expand Up @@ -155,9 +155,8 @@ async Task ISyncable.AddRangeFromSync(IEnumerable<Commit> commits)
if (oldestChange is null || newCommits is []) return;

await using var transaction = await repo.BeginTransactionAsync();
//don't save since UpdateSnapshots will also modify newCommits with hashes, so changes will be saved once that's done
await repo.AddCommits(newCommits, false);
await UpdateSnapshots(repo, oldestChange, newCommits);
var updatedCommits = await repo.AddCommits(newCommits);
await UpdateSnapshots(repo, updatedCommits);
await ValidateCommits(repo);
await transaction.CommitAsync();
}
Expand Down Expand Up @@ -194,13 +193,15 @@ ValueTask<bool> ISyncable.ShouldSync()
return ValueTask.FromResult(true);
}

private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit, Commit[] newCommits)
private async Task UpdateSnapshots(CrdtRepository repo, SortedSet<Commit> commitsToApply)
{
if (commitsToApply.Count == 0) return;
var oldestAddedCommit = commitsToApply.First();
await repo.DeleteStaleSnapshots(oldestAddedCommit);
Dictionary<Guid, Guid?> snapshotLookup;
if (newCommits.Length > 10)
if (commitsToApply.Count > 10)
{
var entityIds = newCommits.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId));
var entityIds = commitsToApply.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId));
snapshotLookup = await repo.CurrentSnapshots()
.Where(s => entityIds.Contains(s.EntityId))
.Select(s => new KeyValuePair<Guid, Guid?>(s.EntityId, s.Id))
Expand All @@ -212,7 +213,7 @@ private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit
}

var snapshotWorker = new SnapshotWorker(snapshotLookup, repo, _crdtConfig.Value);
await snapshotWorker.UpdateSnapshots(oldestAddedCommit, newCommits);
await snapshotWorker.UpdateSnapshots(commitsToApply);
}

private async Task ValidateCommits(CrdtRepository repo)
Expand Down Expand Up @@ -240,8 +241,10 @@ public async Task RegenerateSnapshots()
await using var repo = await _crdtRepositoryFactory.CreateRepository();
await repo.DeleteSnapshotsAndProjectedTables();
repo.ClearChangeTracker();
var allCommits = await repo.CurrentCommits().AsNoTracking().ToArrayAsync();
await UpdateSnapshots(repo, allCommits.First(), allCommits);
var allCommits = await repo.CurrentCommits()
.Include(c => c.ChangeEntities)
.ToSortedSetAsync();
await UpdateSnapshots(repo, allCommits);
}

public async Task<ObjectSnapshot> GetLatestSnapshotByObjectId(Guid entityId)
Expand Down Expand Up @@ -296,7 +299,7 @@ public async Task<Dictionary<Guid, ObjectSnapshot>> GetSnapshotsAtCommit(Commit
var repository = repo.GetScopedRepository(commit);
var (snapshots, pendingCommits) = await repository.GetCurrentSnapshotsAndPendingCommits();

if (pendingCommits.Length != 0)
if (pendingCommits.Count != 0)
{
snapshots = await SnapshotWorker.ApplyCommitsToSnapshots(snapshots,
repository,
Expand Down Expand Up @@ -331,8 +334,8 @@ public async Task<T> GetAtCommit<T>(Commit commit, Guid entityId)
var newCommits = await repository.CurrentCommits()
.Include(c => c.ChangeEntities)
.WhereAfter(snapshot.Commit)
.ToArrayAsync();
if (newCommits.Length > 0)
.ToSortedSetAsync();
if (newCommits.Count > 0)
{
var snapshots = await SnapshotWorker.ApplyCommitsToSnapshots(
new Dictionary<Guid, ObjectSnapshot>([
Expand Down
58 changes: 36 additions & 22 deletions src/SIL.Harmony/Db/CrdtRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public IAsyncEnumerable<SimpleSnapshot> CurrenSimpleSnapshots(bool includeDelete
return snapshots;
}

public async Task<(Dictionary<Guid, ObjectSnapshot> currentSnapshots, Commit[] pendingCommits)> GetCurrentSnapshotsAndPendingCommits()
public async Task<(Dictionary<Guid, ObjectSnapshot> currentSnapshots, SortedSet<Commit> pendingCommits)> GetCurrentSnapshotsAndPendingCommits()
{
var snapshots = await CurrentSnapshots().Include(s => s.Commit).ToDictionaryAsync(s => s.EntityId);

Expand All @@ -217,7 +217,7 @@ public IAsyncEnumerable<SimpleSnapshot> CurrenSimpleSnapshots(bool includeDelete
var newCommits = await CurrentCommits()
.Include(c => c.ChangeEntities)
.WhereAfter(lastCommit)
.ToArrayAsync();
.ToSortedSetAsync();
return (snapshots, newCommits);
}

Expand Down Expand Up @@ -326,25 +326,11 @@ public async Task AddSnapshots(IEnumerable<ObjectSnapshot> snapshots)
catch (DbUpdateException e)
{
var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString()));
var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}";
var message = $"Error saving snapshots (deleted: {grouping.Key}): {e.Message}{Environment.NewLine}{entries}";
_logger.LogError(e, message);
throw new DbUpdateException(message, e);
}
}

// this extra try/catch was added as a quick way to get the NewEntityOnExistingEntityIsNoOp test to pass
// it will be removed again in a larger refactor in https://github.com/sillsdev/harmony/pull/56
try
{
await _dbContext.SaveChangesAsync();
}
catch (DbUpdateException e)
{
var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString()));
var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}";
_logger.LogError(e, message);
throw new DbUpdateException(message, e);
}
}

private async ValueTask ProjectSnapshot(ObjectSnapshot objectSnapshot)
Expand Down Expand Up @@ -389,16 +375,44 @@ public CrdtRepository GetScopedRepository(Commit excludeChangesAfterCommit)
return new CrdtRepository(_dbContext, _crdtConfig, _logger, excludeChangesAfterCommit);
}

public async Task AddCommit(Commit commit)
public async Task<SortedSet<Commit>> AddCommit(Commit commit)
{
_dbContext.Add(commit);
var updatedCommits = await AddNewCommits([commit]);
await _dbContext.SaveChangesAsync();
return updatedCommits;
}

public async Task AddCommits(IEnumerable<Commit> commits, bool save = true)
public async Task<SortedSet<Commit>> AddCommits(IEnumerable<Commit> commits)
{
_dbContext.AddRange(commits);
if (save) await _dbContext.SaveChangesAsync();
var updatedCommits = await AddNewCommits(commits);
await _dbContext.SaveChangesAsync();
return updatedCommits;
}

/// <summary>
/// Registers <paramref name="newCommits"/> with the db context and re-chains the parent hashes of
/// every commit from the oldest new commit onwards.
/// </summary>
/// <remarks>
/// This method does not call <c>SaveChangesAsync</c>; saving is left to the caller.
/// </remarks>
/// <param name="newCommits">The commits being added. Enumerated once and buffered if it is not already a collection.</param>
/// <returns>
/// The commits returned by <c>GetCommitsAfter</c> for the oldest new commit's predecessor, merged with
/// the new commits (by <c>Id</c>) into a sorted set. Empty when there is nothing to add.
/// </returns>
/// <exception cref="ArgumentException">No oldest commit could be determined from <paramref name="newCommits"/>.</exception>
private async Task<SortedSet<Commit>> AddNewCommits(IEnumerable<Commit> newCommits)
{
    if (newCommits is null) return [];

    // Materialize once: the commits are read several times below (MinBy, UnionBy, AddRange).
    // Re-running a lazy sequence each time would be wasteful and could yield different instances,
    // so the hashed commits would not be the ones handed to the db context.
    var commitsToAdd = newCommits as IReadOnlyCollection<Commit> ?? newCommits.ToArray();
    if (commitsToAdd.Count == 0) return [];

    var oldestAddedCommit = commitsToAdd.MinBy(c => c.CompareKey)
                            ?? throw new ArgumentException("Couldn't find oldest commit", nameof(newCommits));
    var parentCommit = await FindPreviousCommit(oldestAddedCommit);
    var existingCommitsToUpdate = await GetCommitsAfter(parentCommit);
    // Existing commits come first, so UnionBy keeps the existing instance when an Id appears in both.
    var commitsToApply = existingCommitsToUpdate
        .UnionBy(commitsToAdd, c => c.Id)
        .ToSortedSet();
    //we're inserting commits in the past/rewriting history, so we need to update the previous commit hashes
    UpdateCommitHashes(commitsToApply, parentCommit);
    _dbContext.AddRange(commitsToAdd);
    return commitsToApply;
}

/// <summary>
/// Walks <paramref name="commits"/> in set order and chains them together: each commit is given the
/// hash of the commit before it as its parent hash.
/// </summary>
/// <param name="commits">The commits to re-chain, visited in the set's sort order.</param>
/// <param name="parentCommit">
/// The commit preceding the first entry of <paramref name="commits"/>. When it is <c>null</c>
/// (or has no hash) the chain starts from <see cref="CommitBase.NullParentHash"/>.
/// </param>
private void UpdateCommitHashes(SortedSet<Commit> commits, Commit? parentCommit = null)
{
    var runningParentHash = CommitBase.NullParentHash;
    if (parentCommit?.Hash is { } startingHash)
    {
        runningParentHash = startingHash;
    }

    foreach (var current in commits)
    {
        current.SetParentHash(runningParentHash);
        // Read Hash only after SetParentHash so the next commit links to the value current now reports.
        runningParentHash = current.Hash;
    }
}

public HybridDateTime? GetLatestDateTime()
Expand Down
22 changes: 6 additions & 16 deletions src/SIL.Harmony/SnapshotWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ private SnapshotWorker(Dictionary<Guid, ObjectSnapshot> snapshots,
internal static async Task<Dictionary<Guid, ObjectSnapshot>> ApplyCommitsToSnapshots(
Dictionary<Guid, ObjectSnapshot> snapshots,
CrdtRepository crdtRepository,
ICollection<Commit> commits,
SortedSet<Commit> commits,
CrdtConfig crdtConfig)
{
//we need to pass in the snapshots because we expect it to be modified, this is intended.
//if the constructor makes a copy in the future this will need to be updated
await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits, false, null);
await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits);
return snapshots;
}

Expand All @@ -49,32 +49,22 @@ internal SnapshotWorker(Dictionary<Guid, Guid?> snapshotLookup,
{
}

public async Task UpdateSnapshots(Commit oldestAddedCommit, Commit[] newCommits)
public async Task UpdateSnapshots(SortedSet<Commit> commits)
{
var previousCommit = await _crdtRepository.FindPreviousCommit(oldestAddedCommit);
var commits = await _crdtRepository.GetCommitsAfter(previousCommit);
await ApplyCommitChanges(commits.UnionBy(newCommits, c => c.Id), true, previousCommit?.Hash ?? CommitBase.NullParentHash);

await ApplyCommitChanges(commits);
await _crdtRepository.AddSnapshots([
.._rootSnapshots.Values,
.._newIntermediateSnapshots,
.._pendingSnapshots.Values
]);
}

private async ValueTask ApplyCommitChanges(IEnumerable<Commit> commits, bool updateCommitHash, string? previousCommitHash)
private async ValueTask ApplyCommitChanges(SortedSet<Commit> commits)
{
var intermediateSnapshots = new Dictionary<Guid, ObjectSnapshot>();
var commitIndex = 0;
foreach (var commit in commits.DefaultOrder())
foreach (var commit in commits)
{
if (updateCommitHash && previousCommitHash is not null)
{
//we're rewriting history, so we need to update the previous commit hash
commit.SetParentHash(previousCommitHash);
}

previousCommitHash = commit.Hash;
commitIndex++;
foreach (var commitChange in commit.ChangeEntities.OrderBy(c => c.Index))
{
Expand Down