diff --git a/src/Razor.App/Program.cs b/src/Razor.App/Program.cs index 95a8721..eabe7b0 100644 --- a/src/Razor.App/Program.cs +++ b/src/Razor.App/Program.cs @@ -1,5 +1,9 @@ using System.Net; using Microsoft.AspNetCore.Server.Kestrel.Core; +using Razor.Core.Storage; +using Razor.Core.Sync; +using Razor.Storage; +using Razor.Sync; using Razor.U5C.Sync; var builder = WebApplication.CreateBuilder(args); @@ -46,6 +50,13 @@ void ConfigureListen(Action configure) builder.Services.AddGrpc(); builder.Services.AddGrpcReflection(); +var dataPath = builder.Configuration["Storage:Path"] ?? "./data"; +builder.Services.AddSingleton(_ => new ZoneTreeBlockStore(dataPath)); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(sp => sp.GetRequiredService()); +builder.Services.Configure(builder.Configuration.GetSection("Sync")); +builder.Services.AddHostedService(); + var app = builder.Build(); app.MapGrpcService(); diff --git a/src/Razor.Core/Sync/IChainEventSource.cs b/src/Razor.Core/Sync/IChainEventSource.cs new file mode 100644 index 0000000..6cda0c6 --- /dev/null +++ b/src/Razor.Core/Sync/IChainEventSource.cs @@ -0,0 +1,6 @@ +namespace Razor.Core.Sync; + +public interface IChainEventSource +{ + IAsyncEnumerable Subscribe(CancellationToken cancellationToken); +} diff --git a/src/Razor.Storage/ZoneTreeBlockStore.cs b/src/Razor.Storage/ZoneTreeBlockStore.cs index 41c680b..e996b97 100644 --- a/src/Razor.Storage/ZoneTreeBlockStore.cs +++ b/src/Razor.Storage/ZoneTreeBlockStore.cs @@ -12,6 +12,7 @@ public sealed class ZoneTreeBlockStore : IBlockStore private static readonly Memory TipKey = new byte[] { 0x01 }; private readonly IZoneTree, Memory> _blocksByHash; + private readonly IZoneTree, Memory> _refByHash; private readonly IZoneTree, Memory> _hashBySlot; private readonly IZoneTree, Memory> _hashByHeight; private readonly IZoneTree, Memory> _tip; @@ -27,6 +28,13 @@ public ZoneTreeBlockStore(string rootPath) .SetValueSerializer(new ByteArraySerializer()) .OpenOrCreate(); + _refByHash = new ZoneTreeFactory, Memory>() + .SetDataDirectory(Path.Combine(rootPath, "ref_by_hash")) + .SetComparer(new ByteArrayComparerAscending()) + .SetKeySerializer(new ByteArraySerializer()) + .SetValueSerializer(new ByteArraySerializer()) + .OpenOrCreate(); + _hashBySlot = new ZoneTreeFactory, Memory>() .SetDataDirectory(Path.Combine(rootPath, "hash_by_slot")) .SetComparer(new ByteArrayComparerAscending()) @@ -52,7 +60,7 @@ public ZoneTreeBlockStore(string rootPath) public BlockRef? GetTip() { var key = TipKey; - if (!_tip.TryGet(ref key, out var payload)) + if (!_tip.TryGet(in key, out var payload)) { return null; } @@ -69,37 +77,46 @@ public void Apply(BlockRecord record) Memory hashKey = record.Ref.Hash; Memory blockBytes = record.Bytes; - _blocksByHash.Upsert(ref hashKey, ref blockBytes); + _blocksByHash.Upsert(in hashKey, in blockBytes); + + Memory refValue = SerializeBlockRef(record.Ref); + _refByHash.Upsert(in hashKey, in refValue); Memory slotKey = StorageKeys.SlotKey(record.Ref.Slot); Memory slotHash = record.Ref.Hash; - _hashBySlot.Upsert(ref slotKey, ref slotHash); + _hashBySlot.Upsert(in slotKey, in slotHash); if (record.Ref.Height != 0) { Memory heightKey = StorageKeys.HeightKey(record.Ref.Height); Memory heightHash = record.Ref.Hash; - _hashByHeight.Upsert(ref heightKey, ref heightHash); + _hashByHeight.Upsert(in heightKey, in heightHash); } Memory tipKey = TipKey; Memory tipValue = SerializeBlockRef(record.Ref); - _tip.Upsert(ref tipKey, ref tipValue); + _tip.Upsert(in tipKey, in tipValue); } public void RollbackTo(BlockRef point) { Memory tipKey = TipKey; Memory tipValue = SerializeBlockRef(point); - _tip.Upsert(ref tipKey, ref tipValue); + _tip.Upsert(in tipKey, in tipValue); } public bool TryGetByHash(byte[] hash, out BlockRecord record) { Memory hashKey = hash; Memory bytes = default; - if (_blocksByHash.TryGet(ref hashKey, out bytes)) + if (_blocksByHash.TryGet(in hashKey, out bytes)) { + if (_refByHash.TryGet(in hashKey, out var refPayload)) + { + record = new BlockRecord(DeserializeBlockRef(refPayload.ToArray()), bytes.ToArray()); + return true; + } + record = new BlockRecord(new BlockRef(0, hash, 0, 0), bytes.ToArray()); return true; } @@ -112,11 +129,17 @@ public bool TryGetBySlot(ulong slot, out BlockRecord record) { Memory slotKey = StorageKeys.SlotKey(slot); Memory hash = default; - if (_hashBySlot.TryGet(ref slotKey, out hash)) + if (_hashBySlot.TryGet(in slotKey, out hash)) { Memory bytes = default; - if (_blocksByHash.TryGet(ref hash, out bytes)) + if (_blocksByHash.TryGet(in hash, out bytes)) { + if (_refByHash.TryGet(in hash, out var refPayload)) + { + record = new BlockRecord(DeserializeBlockRef(refPayload.ToArray()), bytes.ToArray()); + return true; + } + record = new BlockRecord(new BlockRef(slot, hash.ToArray(), 0, 0), bytes.ToArray()); return true; } @@ -130,11 +153,17 @@ public bool TryGetByHeight(ulong height, out BlockRecord record) { Memory heightKey = StorageKeys.HeightKey(height); Memory hash = default; - if (_hashByHeight.TryGet(ref heightKey, out hash)) + if (_hashByHeight.TryGet(in heightKey, out hash)) { Memory bytes = default; - if (_blocksByHash.TryGet(ref hash, out bytes)) + if (_blocksByHash.TryGet(in hash, out bytes)) { + if (_refByHash.TryGet(in hash, out var refPayload)) + { + record = new BlockRecord(DeserializeBlockRef(refPayload.ToArray()), bytes.ToArray()); + return true; + } + record = new BlockRecord(new BlockRef(0, hash.ToArray(), height, 0), bytes.ToArray()); return true; } @@ -146,54 +175,71 @@ record = default; public IReadOnlyList GetHistory(BlockRef? startToken, int maxItems, out BlockRef? nextToken) { - if (maxItems <= 0) - { - nextToken = null; - return Array.Empty(); - } - var iterator = _hashBySlot.CreateIterator( IteratorType.AutoRefresh, includeDeletedRecords: false, contributeToTheBlockCache: false); + if (startToken is not null) { Memory startKey = StorageKeys.SlotKey(startToken.Value.Slot); - iterator.Seek(ref startKey); + iterator.Seek(in startKey); } else { iterator.SeekFirst(); } + if (!iterator.Next()) + { + nextToken = null; + return Array.Empty(); + } + + if (maxItems <= 0) + { + nextToken = BuildRef(iterator.CurrentKey, iterator.CurrentValue); + return Array.Empty(); + } + List results = new(maxItems); - while (iterator.HasCurrent && results.Count < maxItems) + int count = 0; + + while (true) { - var slot = BinaryPrimitives.ReadUInt64BigEndian(iterator.CurrentKey.Span); var hash = iterator.CurrentValue; - if (!_blocksByHash.TryGet(ref hash, out var bytes)) + if (_blocksByHash.TryGet(in hash, out var bytes)) { - iterator.Next(); - continue; + if (_refByHash.TryGet(in hash, out var refPayload)) + { + results.Add(new BlockRecord(DeserializeBlockRef(refPayload.ToArray()), bytes.ToArray())); + } + else + { + var slot = BinaryPrimitives.ReadUInt64BigEndian(iterator.CurrentKey.Span); + results.Add(new BlockRecord(new BlockRef(slot, hash.ToArray(), 0, 0), bytes.ToArray())); + } } - results.Add(new BlockRecord(new BlockRef(slot, hash.ToArray(), 0, 0), bytes.ToArray())); - iterator.Next(); - } + count++; + if (count >= maxItems) + { + nextToken = iterator.Next() ? BuildRef(iterator.CurrentKey, iterator.CurrentValue) : null; + return results; + } - nextToken = null; - if (iterator.HasCurrent) - { - var slot = BinaryPrimitives.ReadUInt64BigEndian(iterator.CurrentKey.Span); - nextToken = new BlockRef(slot, iterator.CurrentValue.ToArray(), 0, 0); + if (!iterator.Next()) + { + nextToken = null; + return results; + } } - - return results; } public void Dispose() { _blocksByHash.Dispose(); + _refByHash.Dispose(); _hashBySlot.Dispose(); _hashByHeight.Dispose(); _tip.Dispose(); @@ -220,4 +266,15 @@ private static BlockRef DeserializeBlockRef(byte[] buffer) Array.Copy(buffer, 28, hash, 0, hashLen); return new BlockRef(slot, hash, height, timestamp); } + + private BlockRef BuildRef(Memory slotKey, Memory hash) + { + if (_refByHash.TryGet(in hash, out var refPayload)) + { + return DeserializeBlockRef(refPayload.ToArray()); + } + + var slot = BinaryPrimitives.ReadUInt64BigEndian(slotKey.Span); + return new BlockRef(slot, hash.ToArray(), 0, 0); + } } diff --git a/src/Razor.Sync/ChainEventHub.cs b/src/Razor.Sync/ChainEventHub.cs new file mode 100644 index 0000000..017f30b --- /dev/null +++ b/src/Razor.Sync/ChainEventHub.cs @@ -0,0 +1,63 @@ +using System.Collections.Concurrent; +using System.Runtime.CompilerServices; +using System.Threading.Channels; +using Razor.Core.Sync; + +namespace Razor.Sync; + +public sealed class ChainEventHub : IChainEventSource, IDisposable +{ + private readonly ConcurrentDictionary> _channels = new(); + private int _nextId; + + public IAsyncEnumerable Subscribe(CancellationToken cancellationToken) + { + var channel = Channel.CreateUnbounded(new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = false + }); + + int id = Interlocked.Increment(ref _nextId); + _channels[id] = channel; + + return ReadChannel(channel, id, cancellationToken); + } + + public void Publish(ChainEvent chainEvent) + { + foreach (var channel in _channels.Values) + { + channel.Writer.TryWrite(chainEvent); + } + } + + public void Dispose() + { + foreach (var channel in _channels.Values) + { + channel.Writer.TryComplete(); + } + + _channels.Clear(); + } + + private async IAsyncEnumerable ReadChannel( + Channel channel, + int id, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + try + { + await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken)) + { + yield return item; + } + } + finally + { + _channels.TryRemove(id, out _); + channel.Writer.TryComplete(); + } + } +} diff --git a/src/Razor.Sync/ChainSyncIndexer.cs b/src/Razor.Sync/ChainSyncIndexer.cs new file mode 100644 index 0000000..9bb444c --- /dev/null +++ b/src/Razor.Sync/ChainSyncIndexer.cs @@ -0,0 +1,324 @@ +using System.Formats.Cbor; +using Chrysalis.Cbor.Extensions.Cardano.Core; +using Chrysalis.Cbor.Extensions.Cardano.Core.Header; +using Chrysalis.Cbor.Serialization; +using Chrysalis.Cbor.Types.Cardano.Core.Header; +using Chrysalis.Network.Cbor.ChainSync; +using Chrysalis.Network.Cbor.Common; +using Chrysalis.Network.Multiplexer; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Razor.Core.Storage; +using Razor.Core.Sync; + +namespace Razor.Sync; + +public sealed class ChainSyncIndexer : BackgroundService +{ + private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5); + + private readonly IBlockStore _blockStore; + private readonly ChainEventHub _events; + private readonly ChainSyncOptions _options; + private readonly ILogger _logger; + private bool _atTip; + + public ChainSyncIndexer( + IBlockStore blockStore, + ChainEventHub events, + IOptions options, + ILogger logger) + { + _blockStore = blockStore; + _events = events; + _options = options.Value; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + if (!TryGetStartPoint(out Point startPoint, out string? error)) + { + _logger.LogError("ChainSync disabled: {Error}", error); + return; + } + + while (!stoppingToken.IsCancellationRequested) + { + try + { + await RunOnceAsync(startPoint, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "ChainSync failed. Reconnecting in {DelaySeconds}s.", ReconnectDelay.TotalSeconds); + await Task.Delay(ReconnectDelay, stoppingToken); + } + } + } + + private async Task RunOnceAsync(Point startPoint, CancellationToken cancellationToken) + { + using PeerClient peer = await ConnectAsync(cancellationToken); + await peer.StartAsync(_options.NetworkMagic, TimeSpan.FromSeconds(_options.KeepAliveSeconds)); + + _logger.LogInformation("ChainSync connected. Finding intersection..."); + ChainSyncMessage intersect = await peer.ChainSync.FindIntersectionAsync([startPoint], cancellationToken); + + switch (intersect) + { + case MessageIntersectFound found: + _logger.LogInformation( + "Intersection found at slot {Slot} hash {Hash}", + found.Point.Slot, + ToHex(found.Point.Hash)); + break; + + case MessageIntersectNotFound notFound: + _logger.LogError( + "Intersection not found. Tip slot {Slot} hash {Hash}", + notFound.Tip.Slot.Slot, + ToHex(notFound.Tip.Slot.Hash)); + return; + + default: + _logger.LogError("Unexpected intersection response."); + return; + } + + while (!cancellationToken.IsCancellationRequested) + { + MessageNextResponse? response = await peer.ChainSync.NextRequestAsync(cancellationToken); + + switch (response) + { + case MessageRollForward rollForward: + _atTip = false; + HandleRollForward(rollForward); + break; + + case MessageRollBackward rollBackward: + _atTip = false; + HandleRollBackward(rollBackward); + break; + + case MessageAwaitReply: + if (!_atTip) + { + _logger.LogInformation("ChainSync: Awaiting next block."); + _atTip = true; + } + break; + } + } + } + + private async Task ConnectAsync(CancellationToken cancellationToken) + { + if (!string.IsNullOrWhiteSpace(_options.SocketPath)) + { + _logger.LogInformation("Connecting to node via Unix socket {SocketPath}", _options.SocketPath); + return await PeerClient.ConnectAsync(_options.SocketPath, cancellationToken); + } + + _logger.LogInformation("Connecting to node via TCP {Host}:{Port}", _options.TcpHost, _options.TcpPort); + return await PeerClient.ConnectAsync(_options.TcpHost, _options.TcpPort, cancellationToken); + } + + private void HandleRollForward(MessageRollForward rollForward) + { + if (!TryDecodeHeader(rollForward.Payload.Value, out HeaderInfo header)) + { + _logger.LogWarning("Failed to decode block header. Skipping rollforward."); + return; + } + + var record = new BlockRecord( + new BlockRef(header.Slot, header.Hash, header.BlockNumber, 0), + rollForward.Payload.Value); + + _blockStore.Apply(record); + + BlockRef? tip = ToBlockRef(rollForward.Tip) ?? record.Ref; + _logger.LogInformation( + "ChainSync: RollForward Slot={Slot} Hash={Hash} Height={Height}", + record.Ref.Slot, + ToHex(record.Ref.Hash), + record.Ref.Height); + _events.Publish(new ChainEvent(ChainEventKind.Apply, record, null, tip)); + } + + private void HandleRollBackward(MessageRollBackward rollBackward) + { + var point = new BlockRef(rollBackward.Point.Slot, rollBackward.Point.Hash, 0, 0); + _blockStore.RollbackTo(point); + + BlockRef? tip = ToBlockRef(rollBackward.Tip); + _logger.LogInformation( + "ChainSync: RollBackward Slot={Slot} Hash={Hash}", + point.Slot, + ToHex(point.Hash)); + _events.Publish(new ChainEvent(ChainEventKind.Reset, null, point, tip)); + } + + private static BlockRef? ToBlockRef(Tip tip) + { + if (tip is null) + { + return null; + } + + ulong height = tip.BlockNumber is >= 0 ? (ulong)tip.BlockNumber.Value : 0; + return new BlockRef(tip.Slot.Slot, tip.Slot.Hash, height, 0); + } + + private bool TryGetStartPoint(out Point startPoint, out string? error) + { + startPoint = default!; + error = null; + + if (string.IsNullOrWhiteSpace(_options.StartHash)) + { + var tip = _blockStore.GetTip(); + if (tip is null) + { + error = "Sync:StartHash is required (hex string) unless storage already has a tip."; + return false; + } + + startPoint = new Point(tip.Value.Slot, tip.Value.Hash); + return true; + } + + if (!TryParseHash(_options.StartHash, out byte[] hash, out error)) + { + return false; + } + + startPoint = new Point(_options.StartSlot, hash); + return true; + } + + private static bool TryParseHash(string value, out byte[] hash, out string? error) + { + hash = Array.Empty(); + error = null; + + string normalized = value.Trim(); + if (normalized.StartsWith("0x", StringComparison.OrdinalIgnoreCase)) + { + normalized = normalized[2..]; + } + + if (normalized.Length == 0 || normalized.Length % 2 != 0) + { + error = "Hash must be a non-empty hex string with an even length."; + return false; + } + + try + { + hash = Convert.FromHexString(normalized); + return true; + } + catch (FormatException) + { + error = "Hash must be a valid hex string."; + return false; + } + } + + private static bool TryDecodeHeader(byte[] payload, out HeaderInfo headerInfo) + { + headerInfo = default; + + if (payload.Length == 0) + { + return false; + } + + if (!TryExtractHeaderBytes(payload, out byte variant, out byte[] headerBytes)) + { + return false; + } + + if (variant == 0) + { + return false; + } + + try + { + BlockHeader header = CborSerializer.Deserialize(headerBytes); + ulong slot = header.HeaderBody.Slot(); + ulong blockNumber = header.HeaderBody.BlockNumber(); + byte[] hash = Convert.FromHexString(header.Hash()); + headerInfo = new HeaderInfo(slot, blockNumber, hash); + return true; + } + catch + { + return false; + } + } + + private static bool TryExtractHeaderBytes(byte[] payload, out byte variant, out byte[] headerBytes) + { + variant = 0; + headerBytes = Array.Empty(); + + try + { + CborReader reader = new(payload, CborConformanceMode.Lax); + int? outerLength = reader.ReadStartArray(); + + variant = checked((byte)reader.ReadUInt64()); + + if (variant == 0) + { + int? innerLength = reader.ReadStartArray(); + int? prefixLength = reader.ReadStartArray(); + _ = reader.ReadUInt64(); + _ = reader.ReadUInt64(); + + if (prefixLength is null) + { + reader.ReadEndArray(); + } + + _ = reader.ReadTag(); + headerBytes = reader.ReadByteString(); + + if (innerLength is null) + { + reader.ReadEndArray(); + } + } + else + { + _ = reader.ReadTag(); + headerBytes = reader.ReadByteString(); + } + + if (outerLength is null) + { + reader.ReadEndArray(); + } + + return headerBytes.Length > 0; + } + catch + { + return false; + } + } + + private static string ToHex(byte[] bytes) => Convert.ToHexString(bytes).ToLowerInvariant(); + + private readonly record struct HeaderInfo(ulong Slot, ulong BlockNumber, byte[] Hash); +} diff --git a/src/Razor.Sync/ChainSyncOptions.cs b/src/Razor.Sync/ChainSyncOptions.cs new file mode 100644 index 0000000..aff8454 --- /dev/null +++ b/src/Razor.Sync/ChainSyncOptions.cs @@ -0,0 +1,12 @@ +namespace Razor.Sync; + +public sealed class ChainSyncOptions +{ + public string? SocketPath { get; set; } + public string TcpHost { get; set; } = "127.0.0.1"; + public int TcpPort { get; set; } = 3001; + public ulong NetworkMagic { get; set; } = 2; + public int KeepAliveSeconds { get; set; } = 20; + public ulong StartSlot { get; set; } + public string? StartHash { get; set; } +} diff --git a/src/Razor.Sync/Razor.Sync.csproj b/src/Razor.Sync/Razor.Sync.csproj index d00dc53..24498be 100644 --- a/src/Razor.Sync/Razor.Sync.csproj +++ b/src/Razor.Sync/Razor.Sync.csproj @@ -10,4 +10,11 @@ enable + + + + + + + diff --git a/src/Razor.U5C/Sync/SyncServiceImpl.cs b/src/Razor.U5C/Sync/SyncServiceImpl.cs index 0d2ccef..0a251ad 100644 --- a/src/Razor.U5C/Sync/SyncServiceImpl.cs +++ b/src/Razor.U5C/Sync/SyncServiceImpl.cs @@ -1,47 +1,43 @@ -using System.Text; using Google.Protobuf; using Grpc.Core; +using Microsoft.Extensions.Logging; +using Razor.Core.Storage; +using Razor.Core.Sync; using Utxorpc.V1alpha.Sync; +using Cardano = Utxorpc.V1alpha.Cardano; +using CoreBlockRef = Razor.Core.Storage.BlockRef; +using ProtoBlockRef = Utxorpc.V1alpha.Sync.BlockRef; namespace Razor.U5C.Sync; public sealed class SyncServiceImpl : SyncService.SyncServiceBase { - private static readonly byte[] DummyHash = - [ - 0x10, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF, - 0x01, 0x12, 0x23, 0x34, 0x45, 0x56, 0x67, 0x78, - 0x89, 0x9A, 0xAB, 0xBC, 0xCD, 0xDE, 0xEF, 0xF0, - 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88 - ]; + private const int MaxDumpHistoryItems = 100; + private const int FollowTipHistoryPageSize = 100; - private static readonly ulong DummyTimestamp = (ulong)DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + private readonly IBlockStore _blockStore; + private readonly IChainEventSource _events; + private readonly ILogger _logger; - private static readonly BlockRef DummyTip = new() + public SyncServiceImpl(IBlockStore blockStore, IChainEventSource events, ILogger logger) { - Hash = ByteString.CopyFrom(DummyHash), - Height = 123_456, - Slot = 3_456_789, - Timestamp = DummyTimestamp - }; - - private static readonly AnyChainBlock DummyBlock = new() - { - NativeBytes = ByteString.CopyFrom(Encoding.UTF8.GetBytes("razor:u5c:dummy:block")) - }; + _blockStore = blockStore; + _events = events; + _logger = logger; + } public override Task FetchBlock(FetchBlockRequest request, ServerCallContext context) { var response = new FetchBlockResponse(); - if (request.Ref.Count == 0) - { - response.Block.Add(DummyBlock); - return Task.FromResult(response); - } - foreach (var _ in request.Ref) + foreach (var blockRef in request.Ref) { - response.Block.Add(DummyBlock); + if (!TryResolveBlock(blockRef, out var record)) + { + throw new RpcException(new Status(StatusCode.NotFound, $"Failed to find block: {blockRef}")); + } + + response.Block.Add(ToAnyBlock(record)); } return Task.FromResult(response); @@ -49,40 +45,295 @@ public override Task FetchBlock(FetchBlockRequest request, S public override Task DumpHistory(DumpHistoryRequest request, ServerCallContext context) { + if (request.MaxItems > MaxDumpHistoryItems) + { + throw new RpcException(new Status( + StatusCode.InvalidArgument, + $"max_items must be less than or equal to {MaxDumpHistoryItems}")); + } + var response = new DumpHistoryResponse(); - var maxItems = request.MaxItems == 0 ? 5 : request.MaxItems; - var count = (int)Math.Min(maxItems, 5); + int maxItems = (int)Math.Min(request.MaxItems, int.MaxValue); + var startToken = TryMapBlockRef(request.StartToken); - for (var i = 0; i < count; i++) + var history = _blockStore.GetHistory(startToken, maxItems, out var nextToken); + foreach (var record in history) { - response.Block.Add(DummyBlock); + response.Block.Add(ToAnyBlock(record)); } - var nextSlot = DummyTip.Slot + (ulong)count; - response.NextToken = new BlockRef + if (nextToken is not null) { - Hash = ByteString.CopyFrom(DummyHash), - Height = DummyTip.Height + (ulong)count, - Slot = nextSlot, - Timestamp = DummyTimestamp + (ulong)(count * 1000) - }; + response.NextToken = ToProto(nextToken.Value); + } return Task.FromResult(response); } public override async Task FollowTip(FollowTipRequest request, IServerStreamWriter responseStream, ServerCallContext context) { - var response = new FollowTipResponse + var tip = _blockStore.GetTip(); + var intersection = TryResolveIntersection(request.Intersect) ?? tip; + CoreBlockRef? suppressApply = null; + + if (intersection is not null) { - Apply = DummyBlock, - Tip = DummyTip - }; + await responseStream.WriteAsync( + new FollowTipResponse + { + Reset = ToProto(intersection.Value), + Tip = tip is not null ? ToProto(tip.Value) : ToProto(intersection.Value) + }, + context.CancellationToken); + + suppressApply = intersection.Value; + } + + if (intersection is not null && tip is not null) + { + await StreamHistory(intersection.Value, tip.Value, responseStream, context.CancellationToken); + } - await responseStream.WriteAsync(response, context.CancellationToken); + await foreach (var chainEvent in _events.Subscribe(context.CancellationToken)) + { + var response = new FollowTipResponse(); + switch (chainEvent.Kind) + { + case ChainEventKind.Apply: + if (chainEvent.Block is not null) + { + if (suppressApply is not null && IsSameRef(chainEvent.Block.Value.Ref, suppressApply.Value)) + { + suppressApply = null; + continue; + } + + suppressApply = null; + response.Apply = ToAnyBlock(chainEvent.Block.Value); + response.Tip = ToProto(chainEvent.Block.Value.Ref); + } + break; + case ChainEventKind.Undo: + if (chainEvent.Block is not null) + { + response.Undo = ToAnyBlock(chainEvent.Block.Value); + } + break; + case ChainEventKind.Reset: + if (chainEvent.Point is not null) + { + response.Reset = ToProto(chainEvent.Point.Value); + response.Tip = ToProto(chainEvent.Point.Value); + } + break; + } + + if (response.Tip is null && chainEvent.Tip is not null) + { + response.Tip = ToProto(chainEvent.Tip.Value); + } + else if (response.Tip is null && tip is not null) + { + response.Tip = ToProto(tip.Value); + } + + if (response.ActionCase != FollowTipResponse.ActionOneofCase.None) + { + await responseStream.WriteAsync(response, context.CancellationToken); + } + } } public override Task ReadTip(ReadTipRequest request, ServerCallContext context) { - return Task.FromResult(new ReadTipResponse { Tip = DummyTip }); + var response = new ReadTipResponse(); + var tip = _blockStore.GetTip(); + if (tip is null) + { + throw new RpcException(new Status(StatusCode.Internal, "chain has no data.")); + } + + response.Tip = ToProto(tip.Value); + return Task.FromResult(response); + } + + private static AnyChainBlock ToAnyBlock(BlockRecord record) + { + return new AnyChainBlock + { + NativeBytes = ByteString.CopyFrom(record.Bytes), + Cardano = ToCardanoBlock(record) + }; + } + + private static Cardano.Block ToCardanoBlock(BlockRecord record) + { + return new Cardano.Block + { + Header = new Cardano.BlockHeader + { + Slot = record.Ref.Slot, + Hash = ByteString.CopyFrom(record.Ref.Hash), + Height = record.Ref.Height + }, + Body = new Cardano.BlockBody(), + Timestamp = record.Ref.Timestamp + }; + } + + private static ProtoBlockRef ToProto(CoreBlockRef blockRef) + { + return new ProtoBlockRef + { + Slot = blockRef.Slot, + Hash = ByteString.CopyFrom(blockRef.Hash), + Height = blockRef.Height, + Timestamp = blockRef.Timestamp + }; + } + + private bool TryResolveBlock(ProtoBlockRef blockRef, out BlockRecord record) + { + if (blockRef.Hash is { Length: > 0 }) + { + return _blockStore.TryGetByHash(blockRef.Hash.ToByteArray(), out record); + } + + if (blockRef.Height != 0) + { + return _blockStore.TryGetByHeight(blockRef.Height, out record); + } + + if (blockRef.Slot != 0) + { + return _blockStore.TryGetBySlot(blockRef.Slot, out record); + } + + record = default; + return false; + } + + private CoreBlockRef? TryResolveIntersection(IEnumerable intersects) + { + foreach (var intersect in intersects) + { + if (intersect.Hash is { Length: > 0 }) + { + if (_blockStore.TryGetByHash(intersect.Hash.ToByteArray(), out var record)) + { + return MergeRef(intersect, record.Ref); + } + } + + if (intersect.Height != 0) + { + if (_blockStore.TryGetByHeight(intersect.Height, out var record)) + { + return record.Ref; + } + } + + if (intersect.Slot != 0) + { + if (_blockStore.TryGetBySlot(intersect.Slot, out var record)) + { + return record.Ref; + } + } + } + + return null; + } + + private static CoreBlockRef MergeRef(ProtoBlockRef proto, CoreBlockRef storeRef) + { + return new CoreBlockRef( + proto.Slot != 0 ? proto.Slot : storeRef.Slot, + proto.Hash.Length > 0 ? proto.Hash.ToByteArray() : storeRef.Hash, + proto.Height != 0 ? proto.Height : storeRef.Height, + proto.Timestamp != 0 ? proto.Timestamp : storeRef.Timestamp); + } + + private async Task StreamHistory( + CoreBlockRef intersection, + CoreBlockRef tip, + IServerStreamWriter responseStream, + CancellationToken cancellationToken) + { + if (IsSameRef(intersection, tip)) + { + return; + } + + CoreBlockRef? nextToken = intersection; + bool skipFirst = true; + + while (nextToken is not null && !cancellationToken.IsCancellationRequested) + { + var history = _blockStore.GetHistory(nextToken, FollowTipHistoryPageSize, out var newToken); + foreach (var record in history) + { + if (skipFirst && IsSameRef(record.Ref, intersection)) + { + skipFirst = false; + continue; + } + + skipFirst = false; + + if (record.Ref.Slot > tip.Slot) + { + return; + } + + await responseStream.WriteAsync( + new FollowTipResponse + { + Apply = ToAnyBlock(record), + Tip = ToProto(record.Ref) + }, + cancellationToken); + + if (IsSameRef(record.Ref, tip)) + { + return; + } + } + + nextToken = newToken; + if (newToken is null || history.Count == 0) + { + return; + } + } + } + + private static bool IsSameRef(CoreBlockRef left, CoreBlockRef right) + { + if (left.Slot == right.Slot && left.Hash.AsSpan().SequenceEqual(right.Hash)) + { + return true; + } + + return left.Height != 0 && left.Height == right.Height && left.Hash.AsSpan().SequenceEqual(right.Hash); + } + + private static CoreBlockRef? TryMapBlockRef(ProtoBlockRef? protoRef) + { + if (protoRef is null) + { + return null; + } + + if (protoRef.Slot == 0 && protoRef.Height == 0 && protoRef.Timestamp == 0 && protoRef.Hash.Length == 0) + { + return null; + } + + return new CoreBlockRef( + protoRef.Slot, + protoRef.Hash.ToByteArray(), + protoRef.Height, + protoRef.Timestamp); } }