Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/lean_api.zig
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ pub fn fetchSlots(
client: *std.http.Client,
base_url: []const u8,
_: []const u8, // path parameter not used anymore
out_state_ssz: *?[]u8,
) !Slots {
// Fetch finalized slot from SSZ-encoded endpoint
const finalized_slot = try fetchSlotFromSSZEndpoint(
allocator,
client,
base_url,
"/lean/v0/states/finalized",
out_state_ssz,
);

// For now, use finalized slot as justified slot since /lean/v0/states/justified returns 404
Expand Down Expand Up @@ -52,6 +54,7 @@ fn fetchSlotFromSSZEndpoint(
client: *std.http.Client,
base_url: []const u8,
path: []const u8,
out_state_ssz: *?[]u8,
) !u64 {
// Build full URL
var url_buf: [512]u8 = undefined;
Expand Down Expand Up @@ -84,7 +87,7 @@ fn fetchSlotFromSSZEndpoint(

// Read response body
var body_buf = std.ArrayList(u8).init(allocator);
defer body_buf.deinit();
errdefer body_buf.deinit();

// Optimize buffer size based on endpoint
// Finalized/justified states are typically 1-2MB in SSZ format
Expand Down Expand Up @@ -155,6 +158,9 @@ fn fetchSlotFromSSZEndpoint(

log.debug("Successfully fetched slot {d} from {s}", .{ slot, url });

// Transfer ownership of the full SSZ payload to the caller.
out_state_ssz.* = try body_buf.toOwnedSlice();

return slot;
}

Expand Down
14 changes: 13 additions & 1 deletion src/poller.zig
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ pub const Poller = struct {

/// Poll single upstream (legacy mode)
fn pollSingle(self: *Poller, now_ms: i64) !void {
var state_ssz: ?[]u8 = null;
const slots = lean_api.fetchSlots(
self.allocator,
&self.client,
self.config.lean_api_base_url,
self.config.lean_api_path,
&state_ssz,
) catch |err| {
var msg_buf = std.ArrayList(u8).init(self.allocator);
defer msg_buf.deinit();
Expand All @@ -92,13 +94,19 @@ pub const Poller = struct {
slots.finalized_slot,
latency_ms,
now_ms,
state_ssz,
);

if (state_ssz) |blob| {
self.allocator.free(blob);
}
}

/// Poll multiple upstreams with consensus
fn pollMulti(self: *Poller, manager: *upstreams_mod.UpstreamManager, now_ms: i64) !void {
// Poll all upstreams and get consensus
const consensus_slots = manager.pollUpstreams(&self.client, now_ms);
var state_ssz: ?[]u8 = null;
const consensus_slots = manager.pollUpstreams(&self.client, now_ms, &state_ssz);

if (consensus_slots) |slots| {
const latency_ms: u64 = 0; // Latency not tracked in multi-upstream mode
Expand All @@ -108,7 +116,11 @@ pub const Poller = struct {
slots.finalized_slot,
latency_ms,
now_ms,
state_ssz,
);
if (state_ssz) |blob| {
self.allocator.free(blob);
}
} else {
const error_msg = manager.getErrorSummary(self.allocator) catch "failed to get error summary";
defer self.allocator.free(error_msg);
Expand Down
49 changes: 49 additions & 0 deletions src/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ fn handleRequest(
try handleApiUpstreams(allocator, config, state, req);
return;
}
if (std.mem.eql(u8, path, "/lean/v0/states/finalized")) {
try handleFinalizedState(allocator, config, state, req);
return;
}

if (config.static_dir) |static_dir| {
if (try handleStatic(allocator, static_dir, path, req)) return;
Expand Down Expand Up @@ -114,6 +118,51 @@ fn handleStatus(
try respondText(req, .ok, buffer.items, "application/json");
}

fn handleFinalizedState(
allocator: std.mem.Allocator,
config: *const config_mod.Config,
state: *state_mod.AppState,
req: *std.http.Server.Request,
) !void {
var snapshot = try state.snapshot(allocator);
defer snapshot.deinit(allocator);

const now_ms = std.time.milliTimestamp();
const stale = snapshot.last_success_ms == 0 or
(now_ms - snapshot.last_success_ms) > @as(i64, @intCast(config.stale_after_ms));

if (stale) {
log.warn("Finalized state request failed: stale data (last success: {d}ms ago)", .{
if (snapshot.last_success_ms == 0) now_ms else now_ms - snapshot.last_success_ms,
});
try respondText(req, .service_unavailable, "stale\n", "text/plain");
return;
}

// Get a copy of the last finalized state SSZ blob.
const state_ssz_opt = try state.copyFinalizedStateSSZ(allocator);
if (state_ssz_opt == null) {
log.warn("Finalized state request failed: no finalized state SSZ cached", .{});
try respondText(req, .service_unavailable, "no_finalized_state\n", "text/plain");
return;
}

const state_ssz = state_ssz_opt.?;
defer allocator.free(state_ssz);

var content_length_buf: [32]u8 = undefined;
const content_length_str = try std.fmt.bufPrint(&content_length_buf, "{d}", .{state_ssz.len});

const headers = [_]std.http.Header{
.{ .name = "content-type", .value = "application/octet-stream" },
.{ .name = "content-length", .value = content_length_str },
};
try req.respond(state_ssz, .{
.status = .ok,
.extra_headers = &headers,
});
}

fn handleMetrics(
allocator: std.mem.Allocator,
state: *state_mod.AppState,
Expand Down
30 changes: 27 additions & 3 deletions src/state.zig
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub const AppState = struct {
last_latency_ms: u64 = 0,
error_count: u64 = 0,
last_error: ?[]u8 = null,
last_finalized_state_ssz: ?[]u8 = null,
upstream_manager: ?*upstreams_mod.UpstreamManager = null,

pub fn init(upstream_manager: ?*upstreams_mod.UpstreamManager) AppState {
Expand All @@ -72,6 +73,8 @@ pub const AppState = struct {
pub fn deinit(self: *AppState, allocator: std.mem.Allocator) void {
if (self.last_error) |msg| allocator.free(msg);
self.last_error = null;
if (self.last_finalized_state_ssz) |state_ssz| allocator.free(state_ssz);
self.last_finalized_state_ssz = null;
}

pub fn updateSuccess(
Expand All @@ -81,6 +84,7 @@ pub const AppState = struct {
finalized_slot: u64,
latency_ms: u64,
now_ms: i64,
state_ssz: ?[]const u8,
) void {
self.mutex.lock();
defer self.mutex.unlock();
Expand All @@ -91,6 +95,13 @@ pub const AppState = struct {
self.last_latency_ms = latency_ms;
if (self.last_error) |msg| allocator.free(msg);
self.last_error = null;

if (state_ssz) |bytes| {
if (self.last_finalized_state_ssz) |old_state| {
allocator.free(old_state);
}
self.last_finalized_state_ssz = allocator.dupe(u8, bytes) catch self.last_finalized_state_ssz;
}
}

pub fn updateError(
Expand Down Expand Up @@ -125,6 +136,19 @@ pub const AppState = struct {
};
}

/// Returns a freshly allocated copy of the last finalized state SSZ, if any.
/// Caller is responsible for freeing the returned slice.
pub fn copyFinalizedStateSSZ(self: *AppState, allocator: std.mem.Allocator) !?[]u8 {
self.mutex.lock();
defer self.mutex.unlock();

if (self.last_finalized_state_ssz) |state_ssz| {
const copy = try allocator.dupe(u8, state_ssz);
return copy;
}
return null;
}

pub fn getUpstreamsData(self: *AppState, allocator: std.mem.Allocator) !UpstreamsData {
if (self.upstream_manager) |manager| {
manager.mutex.lock();
Expand Down Expand Up @@ -185,7 +209,7 @@ test "AppState updateSuccess" {
var state = AppState{};
defer state.deinit(std.testing.allocator);

state.updateSuccess(std.testing.allocator, 100, 99, 50, 1000);
state.updateSuccess(std.testing.allocator, 100, 99, 50, 1000, null);

try std.testing.expectEqual(@as(u64, 100), state.justified_slot);
try std.testing.expectEqual(@as(u64, 99), state.finalized_slot);
Expand Down Expand Up @@ -214,7 +238,7 @@ test "AppState snapshot" {
var state = AppState{};
defer state.deinit(std.testing.allocator);

state.updateSuccess(std.testing.allocator, 200, 199, 75, 3000);
state.updateSuccess(std.testing.allocator, 200, 199, 75, 3000, null);

var snapshot = try state.snapshot(std.testing.allocator);
defer snapshot.deinit(std.testing.allocator);
Expand Down Expand Up @@ -246,7 +270,7 @@ test "AppState updateSuccess clears error" {
state.updateError(std.testing.allocator, "error message", 1000);
try std.testing.expectEqual(@as(u64, 1), state.error_count);

state.updateSuccess(std.testing.allocator, 100, 99, 50, 2000);
state.updateSuccess(std.testing.allocator, 100, 99, 50, 2000, null);
try std.testing.expect(state.last_error == null);
try std.testing.expectEqual(@as(u64, 1), state.error_count); // error_count persists
}
31 changes: 28 additions & 3 deletions src/upstreams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pub const UpstreamManager = struct {
index: usize,
slots: ?lean_api.Slots,
error_msg: ?[]const u8,
state_ssz: ?[]u8,
};

/// Poll all upstreams and return consensus slots if 50%+ agree
Expand All @@ -111,6 +112,7 @@ pub const UpstreamManager = struct {
self: *UpstreamManager,
client: *std.http.Client,
now_ms: i64,
out_state_ssz: *?[]u8,
) ?lean_api.Slots {
// Step 1: Create snapshot of upstreams to poll (without holding lock)
var targets = std.ArrayList(PollTarget).init(self.allocator);
Expand All @@ -137,20 +139,24 @@ pub const UpstreamManager = struct {
// Step 2: Poll all upstreams WITHOUT holding the lock (I/O can be slow!)
var results = std.ArrayList(PollResult).init(self.allocator);
defer {
// Clean up any error messages that weren't transferred to upstreams
// Clean up any error messages or SSZ blobs that weren't transferred
for (results.items) |result| {
if (result.error_msg) |msg| self.allocator.free(msg);
if (result.state_ssz) |blob| self.allocator.free(blob);
}
results.deinit();
}

for (targets.items) |target| {
var state_ssz: ?[]u8 = null;
const slots = lean_api.fetchSlots(
self.allocator,
client,
target.base_url,
target.path,
&state_ssz,
) catch |err| {
if (state_ssz) |blob| self.allocator.free(blob);
const error_msg = std.fmt.allocPrint(
self.allocator,
"{s}",
Expand All @@ -163,6 +169,7 @@ pub const UpstreamManager = struct {
.index = target.index,
.slots = null,
.error_msg = error_msg,
.state_ssz = null,
}) catch continue;
continue;
};
Expand All @@ -173,6 +180,7 @@ pub const UpstreamManager = struct {
.index = target.index,
.slots = slots,
.error_msg = null,
.state_ssz = state_ssz,
}) catch continue;
}

Expand Down Expand Up @@ -232,9 +240,26 @@ pub const UpstreamManager = struct {
while (iter.next()) |entry| {
if (entry.value_ptr.* >= required_votes) {
const slot_key = entry.key_ptr.*;
const justified_slot: u64 = @truncate(slot_key >> 64);
const finalized_slot: u64 = @truncate(slot_key & 0xFFFFFFFFFFFFFFFF);

// Find a matching upstream result and transfer SSZ ownership to caller.
for (results.items, 0..) |*res, i| {
if (res.slots) |s| {
if (s.justified_slot == justified_slot and s.finalized_slot == finalized_slot) {
if (res.state_ssz) |blob| {
out_state_ssz.* = blob;
// Prevent defer block from freeing it.
results.items[i].state_ssz = null;
break;
}
}
}
}

const result = lean_api.Slots{
.justified_slot = @truncate(slot_key >> 64),
.finalized_slot = @truncate(slot_key & 0xFFFFFFFFFFFFFFFF),
.justified_slot = justified_slot,
.finalized_slot = finalized_slot,
};
log.info("Consensus reached: justified={d}, finalized={d} ({d}/{d} upstreams)", .{
result.justified_slot,
Expand Down