Skip to content
Merged

Tmp #17

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/DotJEM.Json.Index2.Management.Test/JsonIndexManagerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace DotJEM.Json.Index2.Management.Test;
[TestFixture]
public class JsonIndexManagerTest
{
[Test, Explicit]
[Test, Explicit, MaxTime(1000*60*12)]
public async Task IndexWriterShouldNotBeDisposed()
{
using TestDirectory dir = new();
Expand All @@ -36,6 +36,7 @@ public async Task IndexWriterShouldNotBeDisposed()
ISnapshotStrategy strategy = new ZipSnapshotStrategy(dir.Info.CreateSubdirectory("snapshot").FullName);
IJsonIndexSnapshotManager snapshots = new JsonIndexSnapshotManager(index, strategy, scheduler, "60h");
IJsonIndexManager manager = new JsonIndexManager(source, snapshots, index);
index.Commit();

InfoStreamExceptionEvent? disposedEvent = null;
InfoStreamExceptionEvent? exceptionEvent = null;
Expand All @@ -44,13 +45,15 @@ public async Task IndexWriterShouldNotBeDisposed()
.Where(@event => @event.Exception is ObjectDisposedException)
.Subscribe(@event =>
{
Debug.WriteLine($"Event {@event.Message};");
disposedEvent = @event;
});
manager.InfoStream
.OfType<InfoStreamExceptionEvent>()
.Where(@event => @event.Exception.Message != "Can't write to an existing snapshot.")
.Subscribe(@event =>
{
Debug.WriteLine($"Event {@event.Message};");
exceptionEvent = @event;
});

Expand All @@ -59,7 +62,7 @@ public async Task IndexWriterShouldNotBeDisposed()
await manager.RunAsync();
Debug.WriteLine("TEST STARTED");
Stopwatch sw = Stopwatch.StartNew();
while (sw.Elapsed < 10.Minutes() && disposedEvent == null && exceptionEvent == null)
while (sw.Elapsed < 1.Minutes() && disposedEvent == null && exceptionEvent == null)
{
Task result = Random.Shared.Next(100) switch
{
Expand All @@ -73,7 +76,16 @@ public async Task IndexWriterShouldNotBeDisposed()
async Task DoAfterDelay(Func<Task> action, TimeSpan? delay = null)
{
await Task.Delay(delay ?? Random.Shared.Next(1, 5).Seconds());
await action();
Debug.WriteLine($"Calling {action.Method.Name};");
try
{
await action();
}
catch (Exception e)
{
Console.WriteLine(e.Message);
throw;
}
}

await manager.StopAsync();
Expand All @@ -100,7 +112,7 @@ public class TestDirectory : IDisposable

public TestDirectory()
{
Info = Directory.CreateDirectory(Path.Combine(Path.GetTempPath(), $"TEST-{Guid.NewGuid():N}"));
Info = Directory.CreateDirectory(Path.Combine(Path.GetTempPath(), "DOTNET_TEST", $"TEST-{Guid.NewGuid():N}"));
Debug.WriteLine("TEST DIR: " + Info.FullName);
}

Expand Down
1 change: 1 addition & 0 deletions src/DotJEM.Json.Index2.Test/JsonIndexTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public async Task Search_Booleans()
int count = searcher.Search(new TermQuery(new Term("inStock", "true"))).Count();
Assert.AreEqual(3, count);
}

[Test]
public async Task FindBeforeCommit_AddsDocument()
{
Expand Down
30 changes: 27 additions & 3 deletions src/DotJEM.Json.Index2/IO/IndexWriterSafeProxy.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Lucene.Net.Analysis;
using Lucene.Net.Index;
using Lucene.Net.Search;
Expand All @@ -10,7 +12,9 @@
public class IndexWriterSafeProxy : IIndexWriter
{
private readonly IndexWriter inner;

private readonly Guid id = Guid.NewGuid();
private StackTrace whoDisposed;

public IndexWriterSafeProxy(IndexWriter writer)
{
inner = writer;
Expand All @@ -30,11 +34,15 @@

public void Dispose()
{
whoDisposed = new StackTrace();
Debug.WriteLine($"Dispose IndexWriter[{id}]");
inner.Dispose();
}

public void Dispose(bool waitForMerges)
{
whoDisposed = new StackTrace();
Debug.WriteLine($"Dispose IndexWriter[{id}]");
inner.Dispose(waitForMerges);
}

Expand Down Expand Up @@ -190,7 +198,23 @@

public void Commit()
{
inner.Commit();
Console.WriteLine($"Commit IndexWriter[{id}]");
try
{
inner.Commit();
}
catch (ObjectDisposedException e)

Check warning on line 206 in src/DotJEM.Json.Index2/IO/IndexWriterSafeProxy.cs

View workflow job for this annotation

GitHub Actions / build

The variable 'e' is declared but never used
{
Console.WriteLine("Who disposed me:");
Console.WriteLine(whoDisposed);
Console.WriteLine();

Console.WriteLine("Who then called me:");
Console.WriteLine(new StackTrace());
Console.WriteLine();

throw;
}
}

public bool HasUncommittedChanges()
Expand Down
43 changes: 38 additions & 5 deletions src/DotJEM.Json.Index2/IO/JsonIndexWriterManager.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using DotJEM.Json.Index2.Leases;
using DotJEM.Json.Index2.Util;
using Lucene.Net.Index;
Expand All @@ -10,9 +13,11 @@ namespace DotJEM.Json.Index2.IO;
public interface IIndexWriterManager : IDisposable
{
event EventHandler<EventArgs> OnClose;

ILease<IIndexWriter> Lease();
void Close();

void Lock();
void Unlock();
}


Expand All @@ -28,9 +33,21 @@ public class IndexWriterManager : Disposable, IIndexWriterManager
private readonly object writerPadLock = new();
private readonly LeaseManager<IIndexWriter> leaseManager = new();

private readonly ManualResetEventSlim reset = new(true);
//TODO: With leases, this should not be needed.
Comment thread
jeme marked this conversation as resolved.
public event EventHandler<EventArgs> OnClose;

private static List<WeakReference<IIndexWriter>> writers = new();

public void Lock()
{
reset.Reset();
}
public void Unlock()
{
reset.Set();
}

Comment thread
jeme marked this conversation as resolved.
private IIndexWriter Writer
{
get
Expand All @@ -43,7 +60,11 @@ private IIndexWriter Writer
if (writer != null)
return writer;

return writer = Open(index);
reset.Wait();

IIndexWriter newWriter = Open(index);
writers.Add(new(newWriter));
return writer = newWriter;
Comment thread
jeme marked this conversation as resolved.
}
}
}
Expand All @@ -52,7 +73,13 @@ public IndexWriterManager(IJsonIndex index)
{
this.index = index;
}
public ILease<IIndexWriter> Lease() => leaseManager.Create(Writer, TimeSpan.FromSeconds(10));
public ILease<IIndexWriter> Lease()
{
lock (writerPadLock)
{
return leaseManager.Create(Writer, TimeSpan.FromSeconds(10));
}
}

private static IIndexWriter Open(IJsonIndex index)
{
Expand All @@ -71,19 +98,25 @@ public void Close()

lock (writerPadLock)
{
leaseManager.RecallAll();
if (writer == null)
return;

writer.Dispose();
IIndexWriter copy = writer;
writer = null;
leaseManager.RecallAll();
copy.Dispose();
RaiseOnClose();
}

int writersOpenend = writers.Count;
int writersAlive = writers.Count(w => w.TryGetTarget(out _));
Debug.WriteLine($"Number of opened writers: {writersOpenend} where {writersAlive} are still alive");
}

protected override void Dispose(bool disposing)
{
Debug.WriteLine($"DISPOSE WRITER: {disposing}");
reset.Dispose();
if (disposing)
Close();
base.Dispose(disposing);
Expand Down
59 changes: 42 additions & 17 deletions src/DotJEM.Json.Index2/Leases/LeaseManager.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using DotJEM.Json.Index2.Util;

Expand All @@ -11,7 +12,7 @@
int Count { get; }
ILease<T> Create(T value);
ILease<T> Create(T value, TimeSpan limit);
void RecallAll();
IEnumerable<T> RecallAll();
}

public class LeaseManager<T> : ILeaseManager<T>
Expand All @@ -32,18 +33,26 @@
return Add(new TimeLimitedLease(value, OnReturned, limit));
}

public void RecallAll()
public IEnumerable<T> RecallAll()
{
lock (leasesPadLock)
{
IRecallableLease<T>[] copy = leases.ToArray();
leases.Clear();
IRecallableLease<T>[] copy = CopyLeases();
T[] values = Array.ConvertAll(copy, x => x.Value);
foreach (IRecallableLease<T> lease in copy)
lease.Terminate();
return values;

foreach (IRecallableLease<T> lease in copy)
lease.Terminate();
IRecallableLease<T>[] CopyLeases()
{
lock (leasesPadLock)
{
IRecallableLease<T>[] copy = leases.ToArray();
leases.Clear();
return copy;
}
}
}


private ILease<T> Add(IRecallableLease<T> lease)
{
lock (leasesPadLock)
Expand All @@ -61,7 +70,7 @@
}
}

private interface IRecallableLease<T> : ILease<T>

Check warning on line 73 in src/DotJEM.Json.Index2/Leases/LeaseManager.cs

View workflow job for this annotation

GitHub Actions / build

Type parameter 'T' has the same name as the type parameter from outer type 'LeaseManager<T>'
{
void Terminate();
}
Expand All @@ -72,6 +81,7 @@

private readonly T value;
private readonly Action<Lease> onReturned;
private readonly ManualResetEventSlim returned = new ManualResetEventSlim();

public bool IsExpired => IsDisposed;
public bool IsTerminated { get; private set; }
Expand Down Expand Up @@ -109,14 +119,21 @@

public void Terminate()
{
returned.Wait(500);
IsTerminated = true;
Terminated?.Invoke(this, EventArgs.Empty);
Dispose();
Dispose(false);
onReturned(this);
}

protected override void Dispose(bool disposing)
{
onReturned(this);
if (disposing)
{
returned.Set();
onReturned(this);
}
returned.Dispose();
base.Dispose(disposing);
}
}
Expand All @@ -129,7 +146,7 @@
private readonly Action<TimeLimitedLease> onReturned;
private readonly long timeLimitMilliseconds;
private readonly long leaseTime = Stopwatch.GetTimestamp();
private readonly AutoResetEvent handle = new(false);
private readonly AutoResetEvent returned = new(false);

public bool IsExpired => (ElapsedMs > timeLimitMilliseconds) || IsDisposed;
public bool IsTerminated { get; private set; }
Expand Down Expand Up @@ -163,31 +180,39 @@
this.onReturned = onReturned;
this.timeLimitMilliseconds = (long)timeLimit.TotalMilliseconds;
}

public bool TryRenew()
{
return false;
}

public void Terminate()
{
Wait();
IsTerminated = true;
Terminated?.Invoke(this, EventArgs.Empty);
Wait();
Dispose();
Dispose(false);
onReturned(this);
}

public void Wait()
private void Wait()
{
if (IsExpired)
return;

handle.WaitOne(TimeSpan.FromSeconds(6) - TimeSpan.FromMilliseconds(ElapsedMs));
TimeSpan remaining = TimeSpan.FromSeconds(6) - TimeSpan.FromMilliseconds(ElapsedMs);
if(remaining > TimeSpan.Zero)
returned.WaitOne(TimeSpan.FromSeconds(6) - TimeSpan.FromMilliseconds(ElapsedMs));
}

protected override void Dispose(bool disposing)
{
onReturned(this);
if (disposing)
{
returned.Set();
onReturned(this);
}
returned.Dispose();
base.Dispose(disposing);
}
}
Expand Down
Loading
Loading