diff --git a/src/lean_api.zig b/src/lean_api.zig index 1bd521f..322b13a 100644 --- a/src/lean_api.zig +++ b/src/lean_api.zig @@ -13,6 +13,7 @@ pub fn fetchSlots( client: *std.http.Client, base_url: []const u8, _: []const u8, // path parameter not used anymore + out_state_ssz: *?[]u8, ) !Slots { // Fetch finalized slot from SSZ-encoded endpoint const finalized_slot = try fetchSlotFromSSZEndpoint( @@ -20,6 +21,7 @@ pub fn fetchSlots( client, base_url, "/lean/v0/states/finalized", + out_state_ssz, ); // For now, use finalized slot as justified slot since /lean/v0/states/justified returns 404 @@ -52,6 +54,7 @@ fn fetchSlotFromSSZEndpoint( client: *std.http.Client, base_url: []const u8, path: []const u8, + out_state_ssz: *?[]u8, ) !u64 { // Build full URL var url_buf: [512]u8 = undefined; @@ -84,7 +87,7 @@ fn fetchSlotFromSSZEndpoint( // Read response body var body_buf = std.ArrayList(u8).init(allocator); - defer body_buf.deinit(); + errdefer body_buf.deinit(); // Optimize buffer size based on endpoint // Finalized/justified states are typically 1-2MB in SSZ format @@ -155,6 +158,9 @@ fn fetchSlotFromSSZEndpoint( log.debug("Successfully fetched slot {d} from {s}", .{ slot, url }); + // Transfer ownership of the full SSZ payload to the caller. + out_state_ssz.* = try body_buf.toOwnedSlice(); + return slot; } diff --git a/src/poller.zig b/src/poller.zig index 15e3c8a..b35c9eb 100644 --- a/src/poller.zig +++ b/src/poller.zig @@ -72,11 +72,13 @@ pub const Poller = struct { /// Poll single upstream (legacy mode) fn pollSingle(self: *Poller, now_ms: i64) !void { + var state_ssz: ?[]u8 = null; const slots = lean_api.fetchSlots( self.allocator, &self.client, self.config.lean_api_base_url, self.config.lean_api_path, + &state_ssz, ) catch |err| { var msg_buf = std.ArrayList(u8).init(self.allocator); defer msg_buf.deinit(); @@ -92,13 +94,19 @@ pub const Poller = struct { slots.finalized_slot, latency_ms, now_ms, + state_ssz, ); + + if (state_ssz) |blob| { + self.allocator.free(blob); + } } /// Poll multiple upstreams with consensus fn pollMulti(self: *Poller, manager: *upstreams_mod.UpstreamManager, now_ms: i64) !void { // Poll all upstreams and get consensus - const consensus_slots = manager.pollUpstreams(&self.client, now_ms); + var state_ssz: ?[]u8 = null; + const consensus_slots = manager.pollUpstreams(&self.client, now_ms, &state_ssz); if (consensus_slots) |slots| { const latency_ms: u64 = 0; // Latency not tracked in multi-upstream mode @@ -108,7 +116,11 @@ pub const Poller = struct { slots.finalized_slot, latency_ms, now_ms, + state_ssz, ); + if (state_ssz) |blob| { + self.allocator.free(blob); + } } else { const error_msg = manager.getErrorSummary(self.allocator) catch "failed to get error summary"; defer self.allocator.free(error_msg); diff --git a/src/server.zig b/src/server.zig index 859d34b..58aa634 100644 --- a/src/server.zig +++ b/src/server.zig @@ -68,6 +68,10 @@ fn handleRequest( try handleApiUpstreams(allocator, config, state, req); return; } + if (std.mem.eql(u8, path, "/lean/v0/states/finalized")) { + try handleFinalizedState(allocator, config, state, req); + return; + } if (config.static_dir) |static_dir| { if (try handleStatic(allocator, static_dir, path, req)) return; @@ -114,6 +118,51 @@ fn handleStatus( try respondText(req, .ok, buffer.items, "application/json"); } +fn handleFinalizedState( + allocator: std.mem.Allocator, + config: *const config_mod.Config, + state: *state_mod.AppState, + req: *std.http.Server.Request, +) !void { + var snapshot = try state.snapshot(allocator); + defer snapshot.deinit(allocator); + + const now_ms = std.time.milliTimestamp(); + const stale = snapshot.last_success_ms == 0 or + (now_ms - snapshot.last_success_ms) > @as(i64, @intCast(config.stale_after_ms)); + + if (stale) { + log.warn("Finalized state request failed: stale data (last success: {d}ms ago)", .{ + if (snapshot.last_success_ms == 0) now_ms else now_ms - snapshot.last_success_ms, + }); + try respondText(req, .service_unavailable, "stale\n", "text/plain"); + return; + } + + // Get a copy of the last finalized state SSZ blob. + const state_ssz_opt = try state.copyFinalizedStateSSZ(allocator); + if (state_ssz_opt == null) { + log.warn("Finalized state request failed: no finalized state SSZ cached", .{}); + try respondText(req, .service_unavailable, "no_finalized_state\n", "text/plain"); + return; + } + + const state_ssz = state_ssz_opt.?; + defer allocator.free(state_ssz); + + var content_length_buf: [32]u8 = undefined; + const content_length_str = try std.fmt.bufPrint(&content_length_buf, "{d}", .{state_ssz.len}); + + const headers = [_]std.http.Header{ + .{ .name = "content-type", .value = "application/octet-stream" }, + .{ .name = "content-length", .value = content_length_str }, + }; + try req.respond(state_ssz, .{ + .status = .ok, + .extra_headers = &headers, + }); +} + fn handleMetrics( allocator: std.mem.Allocator, state: *state_mod.AppState, diff --git a/src/state.zig b/src/state.zig index 35d8856..b9cdf24 100644 --- a/src/state.zig +++ b/src/state.zig @@ -61,6 +61,7 @@ pub const AppState = struct { last_latency_ms: u64 = 0, error_count: u64 = 0, last_error: ?[]u8 = null, + last_finalized_state_ssz: ?[]u8 = null, upstream_manager: ?*upstreams_mod.UpstreamManager = null, pub fn init(upstream_manager: ?*upstreams_mod.UpstreamManager) AppState { @@ -72,6 +73,8 @@ pub const AppState = struct { pub fn deinit(self: *AppState, allocator: std.mem.Allocator) void { if (self.last_error) |msg| allocator.free(msg); self.last_error = null; + if (self.last_finalized_state_ssz) |state_ssz| allocator.free(state_ssz); + self.last_finalized_state_ssz = null; } pub fn updateSuccess( @@ -81,6 +84,7 @@ pub const AppState = struct { finalized_slot: u64, latency_ms: u64, now_ms: i64, + state_ssz: ?[]const u8, ) void { self.mutex.lock(); defer self.mutex.unlock(); @@ -91,6 +95,13 @@ pub const AppState = struct { self.last_latency_ms = latency_ms; if (self.last_error) |msg| allocator.free(msg); self.last_error = null; + + if (state_ssz) |bytes| { + if (self.last_finalized_state_ssz) |old_state| { + allocator.free(old_state); + } + self.last_finalized_state_ssz = allocator.dupe(u8, bytes) catch self.last_finalized_state_ssz; + } } pub fn updateError( @@ -125,6 +136,19 @@ pub const AppState = struct { }; } + /// Returns a freshly allocated copy of the last finalized state SSZ, if any. + /// Caller is responsible for freeing the returned slice. + pub fn copyFinalizedStateSSZ(self: *AppState, allocator: std.mem.Allocator) !?[]u8 { + self.mutex.lock(); + defer self.mutex.unlock(); + + if (self.last_finalized_state_ssz) |state_ssz| { + const copy = try allocator.dupe(u8, state_ssz); + return copy; + } + return null; + } + pub fn getUpstreamsData(self: *AppState, allocator: std.mem.Allocator) !UpstreamsData { if (self.upstream_manager) |manager| { manager.mutex.lock(); @@ -185,7 +209,7 @@ test "AppState updateSuccess" { var state = AppState{}; defer state.deinit(std.testing.allocator); - state.updateSuccess(std.testing.allocator, 100, 99, 50, 1000); + state.updateSuccess(std.testing.allocator, 100, 99, 50, 1000, null); try std.testing.expectEqual(@as(u64, 100), state.justified_slot); try std.testing.expectEqual(@as(u64, 99), state.finalized_slot); @@ -214,7 +238,7 @@ test "AppState snapshot" { var state = AppState{}; defer state.deinit(std.testing.allocator); - state.updateSuccess(std.testing.allocator, 200, 199, 75, 3000); + state.updateSuccess(std.testing.allocator, 200, 199, 75, 3000, null); var snapshot = try state.snapshot(std.testing.allocator); defer snapshot.deinit(std.testing.allocator); @@ -246,7 +270,7 @@ test "AppState updateSuccess clears error" { state.updateError(std.testing.allocator, "error message", 1000); try std.testing.expectEqual(@as(u64, 1), state.error_count); - state.updateSuccess(std.testing.allocator, 100, 99, 50, 2000); + state.updateSuccess(std.testing.allocator, 100, 99, 50, 2000, null); try std.testing.expect(state.last_error == null); try std.testing.expectEqual(@as(u64, 1), state.error_count); // error_count persists } diff --git a/src/upstreams.zig b/src/upstreams.zig index 6dd9094..667201b 100644 --- a/src/upstreams.zig +++ b/src/upstreams.zig @@ -103,6 +103,7 @@ pub const UpstreamManager = struct { index: usize, slots: ?lean_api.Slots, error_msg: ?[]const u8, + state_ssz: ?[]u8, }; /// Poll all upstreams and return consensus slots if 50%+ agree @@ -111,6 +112,7 @@ pub const UpstreamManager = struct { self: *UpstreamManager, client: *std.http.Client, now_ms: i64, + out_state_ssz: *?[]u8, ) ?lean_api.Slots { // Step 1: Create snapshot of upstreams to poll (without holding lock) var targets = std.ArrayList(PollTarget).init(self.allocator); @@ -137,20 +139,24 @@ pub const UpstreamManager = struct { // Step 2: Poll all upstreams WITHOUT holding the lock (I/O can be slow!) var results = std.ArrayList(PollResult).init(self.allocator); defer { - // Clean up any error messages that weren't transferred to upstreams + // Clean up any error messages or SSZ blobs that weren't transferred for (results.items) |result| { if (result.error_msg) |msg| self.allocator.free(msg); + if (result.state_ssz) |blob| self.allocator.free(blob); } results.deinit(); } for (targets.items) |target| { + var state_ssz: ?[]u8 = null; const slots = lean_api.fetchSlots( self.allocator, client, target.base_url, target.path, + &state_ssz, ) catch |err| { + if (state_ssz) |blob| self.allocator.free(blob); const error_msg = std.fmt.allocPrint( self.allocator, "{s}", @@ -163,6 +169,7 @@ pub const UpstreamManager = struct { .index = target.index, .slots = null, .error_msg = error_msg, + .state_ssz = null, }) catch continue; continue; }; @@ -173,6 +180,7 @@ pub const UpstreamManager = struct { .index = target.index, .slots = slots, .error_msg = null, + .state_ssz = state_ssz, }) catch continue; } @@ -232,9 +240,26 @@ pub const UpstreamManager = struct { while (iter.next()) |entry| { if (entry.value_ptr.* >= required_votes) { const slot_key = entry.key_ptr.*; + const justified_slot: u64 = @truncate(slot_key >> 64); + const finalized_slot: u64 = @truncate(slot_key & 0xFFFFFFFFFFFFFFFF); + + // Find a matching upstream result and transfer SSZ ownership to caller. + for (results.items, 0..) |*res, i| { + if (res.slots) |s| { + if (s.justified_slot == justified_slot and s.finalized_slot == finalized_slot) { + if (res.state_ssz) |blob| { + out_state_ssz.* = blob; + // Prevent defer block from freeing it. + results.items[i].state_ssz = null; + break; + } + } + } + } + const result = lean_api.Slots{ - .justified_slot = @truncate(slot_key >> 64), - .finalized_slot = @truncate(slot_key & 0xFFFFFFFFFFFFFFFF), + .justified_slot = justified_slot, + .finalized_slot = finalized_slot, }; log.info("Consensus reached: justified={d}, finalized={d} ({d}/{d} upstreams)", .{ result.justified_slot,