Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/Razor.App/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
using System.Net;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Razor.Core.Storage;
using Razor.Core.Sync;
using Razor.Storage;
using Razor.Sync;
using Razor.U5C.Sync;

var builder = WebApplication.CreateBuilder(args);
Expand Down Expand Up @@ -46,6 +50,13 @@ void ConfigureListen(Action<ListenOptions> configure)
builder.Services.AddGrpc();
builder.Services.AddGrpcReflection();

var dataPath = builder.Configuration["Storage:Path"] ?? "./data";
builder.Services.AddSingleton<IBlockStore>(_ => new ZoneTreeBlockStore(dataPath));
builder.Services.AddSingleton<ChainEventHub>();
builder.Services.AddSingleton<IChainEventSource>(sp => sp.GetRequiredService<ChainEventHub>());
builder.Services.Configure<ChainSyncOptions>(builder.Configuration.GetSection("Sync"));
builder.Services.AddHostedService<ChainSyncIndexer>();

var app = builder.Build();

app.MapGrpcService<SyncServiceImpl>();
Expand Down
6 changes: 6 additions & 0 deletions src/Razor.Core/Sync/IChainEventSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Razor.Core.Sync;

public interface IChainEventSource
{
IAsyncEnumerable<ChainEvent> Subscribe(CancellationToken cancellationToken);
}
123 changes: 90 additions & 33 deletions src/Razor.Storage/ZoneTreeBlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public sealed class ZoneTreeBlockStore : IBlockStore
private static readonly Memory<byte> TipKey = new byte[] { 0x01 };

private readonly IZoneTree<Memory<byte>, Memory<byte>> _blocksByHash;
private readonly IZoneTree<Memory<byte>, Memory<byte>> _refByHash;
private readonly IZoneTree<Memory<byte>, Memory<byte>> _hashBySlot;
private readonly IZoneTree<Memory<byte>, Memory<byte>> _hashByHeight;
private readonly IZoneTree<Memory<byte>, Memory<byte>> _tip;
Expand All @@ -27,6 +28,13 @@ public ZoneTreeBlockStore(string rootPath)
.SetValueSerializer(new ByteArraySerializer())
.OpenOrCreate();

_refByHash = new ZoneTreeFactory<Memory<byte>, Memory<byte>>()
.SetDataDirectory(Path.Combine(rootPath, "ref_by_hash"))
.SetComparer(new ByteArrayComparerAscending())
.SetKeySerializer(new ByteArraySerializer())
.SetValueSerializer(new ByteArraySerializer())
.OpenOrCreate();

_hashBySlot = new ZoneTreeFactory<Memory<byte>, Memory<byte>>()
.SetDataDirectory(Path.Combine(rootPath, "hash_by_slot"))
.SetComparer(new ByteArrayComparerAscending())
Expand All @@ -52,7 +60,7 @@ public ZoneTreeBlockStore(string rootPath)
public BlockRef? GetTip()
{
var key = TipKey;
if (!_tip.TryGet(ref key, out var payload))
if (!_tip.TryGet(in key, out var payload))
{
return null;
}
Expand All @@ -69,37 +77,46 @@ public void Apply(BlockRecord record)

Memory<byte> hashKey = record.Ref.Hash;
Memory<byte> blockBytes = record.Bytes;
_blocksByHash.Upsert(ref hashKey, ref blockBytes);
_blocksByHash.Upsert(in hashKey, in blockBytes);

Memory<byte> refValue = SerializeBlockRef(record.Ref);
_refByHash.Upsert(in hashKey, in refValue);

Memory<byte> slotKey = StorageKeys.SlotKey(record.Ref.Slot);
Memory<byte> slotHash = record.Ref.Hash;
_hashBySlot.Upsert(ref slotKey, ref slotHash);
_hashBySlot.Upsert(in slotKey, in slotHash);

if (record.Ref.Height != 0)
{
Memory<byte> heightKey = StorageKeys.HeightKey(record.Ref.Height);
Memory<byte> heightHash = record.Ref.Hash;
_hashByHeight.Upsert(ref heightKey, ref heightHash);
_hashByHeight.Upsert(in heightKey, in heightHash);
}

Memory<byte> tipKey = TipKey;
Memory<byte> tipValue = SerializeBlockRef(record.Ref);
_tip.Upsert(ref tipKey, ref tipValue);
_tip.Upsert(in tipKey, in tipValue);
}

public void RollbackTo(BlockRef point)
{
Memory<byte> tipKey = TipKey;
Memory<byte> tipValue = SerializeBlockRef(point);
_tip.Upsert(ref tipKey, ref tipValue);
_tip.Upsert(in tipKey, in tipValue);
}

public bool TryGetByHash(byte[] hash, out BlockRecord record)
{
Memory<byte> hashKey = hash;
Memory<byte> bytes = default;
if (_blocksByHash.TryGet(ref hashKey, out bytes))
if (_blocksByHash.TryGet(in hashKey, out bytes))
{
if (_refByHash.TryGet(in hashKey, out var refPayload))
{
record = new BlockRecord(DeserializeBlockRef(refPayload.ToArray()), bytes.ToArray());
return true;
}

record = new BlockRecord(new BlockRef(0, hash, 0, 0), bytes.ToArray());
return true;
}
Expand All @@ -112,11 +129,17 @@ public bool TryGetBySlot(ulong slot, out BlockRecord record)
{
Memory<byte> slotKey = StorageKeys.SlotKey(slot);
Memory<byte> hash = default;
if (_hashBySlot.TryGet(ref slotKey, out hash))
if (_hashBySlot.TryGet(in slotKey, out hash))
{
Memory<byte> bytes = default;
if (_blocksByHash.TryGet(ref hash, out bytes))
if (_blocksByHash.TryGet(in hash, out bytes))
{
if (_refByHash.TryGet(in hash, out var refPayload))
{
record = new BlockRecord(DeserializeBlockRef(refPayload.ToArray()), bytes.ToArray());
return true;
}

record = new BlockRecord(new BlockRef(slot, hash.ToArray(), 0, 0), bytes.ToArray());
return true;
}
Expand All @@ -130,11 +153,17 @@ public bool TryGetByHeight(ulong height, out BlockRecord record)
{
Memory<byte> heightKey = StorageKeys.HeightKey(height);
Memory<byte> hash = default;
if (_hashByHeight.TryGet(ref heightKey, out hash))
if (_hashByHeight.TryGet(in heightKey, out hash))
{
Memory<byte> bytes = default;
if (_blocksByHash.TryGet(ref hash, out bytes))
if (_blocksByHash.TryGet(in hash, out bytes))
{
if (_refByHash.TryGet(in hash, out var refPayload))
{
record = new BlockRecord(DeserializeBlockRef(refPayload.ToArray()), bytes.ToArray());
return true;
}

record = new BlockRecord(new BlockRef(0, hash.ToArray(), height, 0), bytes.ToArray());
return true;
}
Expand All @@ -146,54 +175,71 @@ record = default;

public IReadOnlyList<BlockRecord> GetHistory(BlockRef? startToken, int maxItems, out BlockRef? nextToken)
{
if (maxItems <= 0)
{
nextToken = null;
return Array.Empty<BlockRecord>();
}

var iterator = _hashBySlot.CreateIterator(
IteratorType.AutoRefresh,
includeDeletedRecords: false,
contributeToTheBlockCache: false);

if (startToken is not null)
{
Memory<byte> startKey = StorageKeys.SlotKey(startToken.Value.Slot);
iterator.Seek(ref startKey);
iterator.Seek(in startKey);
}
else
{
iterator.SeekFirst();
}

if (!iterator.Next())
{
nextToken = null;
return Array.Empty<BlockRecord>();
}

if (maxItems <= 0)
{
nextToken = BuildRef(iterator.CurrentKey, iterator.CurrentValue);
return Array.Empty<BlockRecord>();
}

List<BlockRecord> results = new(maxItems);
while (iterator.HasCurrent && results.Count < maxItems)
int count = 0;

while (true)
{
var slot = BinaryPrimitives.ReadUInt64BigEndian(iterator.CurrentKey.Span);
var hash = iterator.CurrentValue;
if (!_blocksByHash.TryGet(ref hash, out var bytes))
if (_blocksByHash.TryGet(in hash, out var bytes))
{
iterator.Next();
continue;
if (_refByHash.TryGet(in hash, out var refPayload))
{
results.Add(new BlockRecord(DeserializeBlockRef(refPayload.ToArray()), bytes.ToArray()));
}
else
{
var slot = BinaryPrimitives.ReadUInt64BigEndian(iterator.CurrentKey.Span);
results.Add(new BlockRecord(new BlockRef(slot, hash.ToArray(), 0, 0), bytes.ToArray()));
}
}

results.Add(new BlockRecord(new BlockRef(slot, hash.ToArray(), 0, 0), bytes.ToArray()));
iterator.Next();
}
count++;
if (count >= maxItems)
{
nextToken = iterator.Next() ? BuildRef(iterator.CurrentKey, iterator.CurrentValue) : null;
return results;
}

nextToken = null;
if (iterator.HasCurrent)
{
var slot = BinaryPrimitives.ReadUInt64BigEndian(iterator.CurrentKey.Span);
nextToken = new BlockRef(slot, iterator.CurrentValue.ToArray(), 0, 0);
if (!iterator.Next())
{
nextToken = null;
return results;
}
}

return results;
}

public void Dispose()
{
_blocksByHash.Dispose();
_refByHash.Dispose();
_hashBySlot.Dispose();
_hashByHeight.Dispose();
_tip.Dispose();
Expand All @@ -220,4 +266,15 @@ private static BlockRef DeserializeBlockRef(byte[] buffer)
Array.Copy(buffer, 28, hash, 0, hashLen);
return new BlockRef(slot, hash, height, timestamp);
}

private BlockRef BuildRef(Memory<byte> slotKey, Memory<byte> hash)
{
if (_refByHash.TryGet(in hash, out var refPayload))
{
return DeserializeBlockRef(refPayload.ToArray());
}

var slot = BinaryPrimitives.ReadUInt64BigEndian(slotKey.Span);
return new BlockRef(slot, hash.ToArray(), 0, 0);
}
}
63 changes: 63 additions & 0 deletions src/Razor.Sync/ChainEventHub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Razor.Core.Sync;

namespace Razor.Sync;

public sealed class ChainEventHub : IChainEventSource, IDisposable
{
private readonly ConcurrentDictionary<int, Channel<ChainEvent>> _channels = new();
private int _nextId;

public IAsyncEnumerable<ChainEvent> Subscribe(CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<ChainEvent>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false
});

int id = Interlocked.Increment(ref _nextId);
_channels[id] = channel;

return ReadChannel(channel, id, cancellationToken);
}

public void Publish(ChainEvent chainEvent)
{
foreach (var channel in _channels.Values)
{
channel.Writer.TryWrite(chainEvent);
}
}

public void Dispose()
{
foreach (var channel in _channels.Values)
{
channel.Writer.TryComplete();
}

_channels.Clear();
}

private async IAsyncEnumerable<ChainEvent> ReadChannel(
Channel<ChainEvent> channel,
int id,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
try
{
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
{
yield return item;
}
}
finally
{
_channels.TryRemove(id, out _);
channel.Writer.TryComplete();
}
}
}
Loading