Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions src/lean_api.zig
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const std = @import("std");
const log = @import("log.zig");

pub const Slots = struct {
justified_slot: u64,
Expand Down Expand Up @@ -75,22 +76,29 @@ fn fetchSlotFromSSZEndpoint(

// Check status
if (req.response.status != .ok) {
std.debug.print("Bad status from {s}: {any}\n", .{ url, req.response.status });
log.warn("Bad status from {s}: {any}", .{ url, req.response.status });
return error.BadStatus;
}

// Read response body
var body_buf = std.ArrayList(u8).init(allocator);
defer body_buf.deinit();

const max_bytes = 10 * 1024 * 1024; // 10 MB limit for state data
// Optimize buffer size based on endpoint
// Finalized/justified states are typically 1-2MB in SSZ format
// Other endpoints (health, metrics) are much smaller
const max_bytes: usize = if (std.mem.indexOf(u8, path, "states") != null)
2 * 1024 * 1024 // 2MB for state endpoints
else
64 * 1024; // 64KB for other endpoints

try req.reader().readAllArrayList(&body_buf, max_bytes);

const body = body_buf.items;

// Validate we have enough bytes to read the slot
if (body.len < 16) {
std.debug.print("ERROR: Response too short for SSZ state (need 16 bytes, got {d}) from {s}\n", .{ body.len, url });
log.err("Response too short for SSZ state (need 16 bytes, got {d}) from {s}", .{ body.len, url });
return error.InvalidSSZData;
}

Expand All @@ -108,8 +116,7 @@ fn fetchSlotFromSSZEndpoint(
// If more than 90% of bytes are printable text, it's probably not SSZ
if (text_byte_count * 100 / check_len > 90) {
const preview = body[0..@min(body.len, 100)];
std.debug.print("ERROR: Response from {s} appears to be text, not SSZ binary:\n", .{url});
std.debug.print(" First 100 bytes: {s}\n", .{preview});
log.err("Response from {s} appears to be text, not SSZ binary. First 100 bytes: {s}", .{ url, preview });
return error.UnexpectedTextResponse;
}

Expand All @@ -132,9 +139,7 @@ fn fetchSlotFromSSZEndpoint(
}
}
if (is_ascii) {
std.debug.print("ERROR: Invalid slot value {d} from {s}\n", .{ slot, url });
std.debug.print(" Bytes 8-15 as ASCII: '{s}'\n", .{bytes_as_text});
std.debug.print(" This suggests the response is text/metrics, not SSZ binary\n", .{});
log.err("Invalid slot value {d} from {s}. Bytes 8-15 as ASCII: '{s}'. This suggests text/metrics response instead of SSZ", .{ slot, url, bytes_as_text });
return error.InvalidSlotValue;
}
}
Expand All @@ -143,9 +148,11 @@ fn fetchSlotFromSSZEndpoint(
const min_genesis: u64 = 1577836800; // 2020-01-01
const max_genesis: u64 = 2524608000; // 2050-01-01
if (genesis_time < min_genesis or genesis_time > max_genesis) {
std.debug.print("WARNING: Unusual genesis_time {d} from {s} (expected Unix timestamp)\n", .{ genesis_time, url });
log.warn("Unusual genesis_time {d} from {s} (expected Unix timestamp between 2020-2050)", .{ genesis_time, url });
}

log.debug("Successfully fetched slot {d} from {s}", .{ slot, url });

return slot;
}

Expand Down
84 changes: 84 additions & 0 deletions src/log.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
const std = @import("std");

pub const Level = enum {
debug,
info,
warn,
err,

pub fn fromInt(val: u8) Level {
return switch (val) {
0 => .debug,
1 => .info,
2 => .warn,
3 => .err,
else => .info,
};
}
};

var log_level: Level = .info;
var log_mutex: std.Thread.Mutex = .{};

pub fn init(level: Level) void {
log_level = level;
}

pub fn setLevel(level: Level) void {
log_mutex.lock();
defer log_mutex.unlock();
log_level = level;
}

pub fn debug(comptime fmt: []const u8, args: anytype) void {
log(.debug, fmt, args);
}

pub fn info(comptime fmt: []const u8, args: anytype) void {
log(.info, fmt, args);
}

pub fn warn(comptime fmt: []const u8, args: anytype) void {
log(.warn, fmt, args);
}

pub fn err(comptime fmt: []const u8, args: anytype) void {
log(.err, fmt, args);
}

fn log(level: Level, comptime fmt: []const u8, args: anytype) void {
log_mutex.lock();
defer log_mutex.unlock();

if (@intFromEnum(level) < @intFromEnum(log_level)) return;

const timestamp = std.time.milliTimestamp();
const level_str = switch (level) {
.debug => "DEBUG",
.info => "INFO ",
.warn => "WARN ",
.err => "ERROR",
};

// Format: [timestamp] LEVEL | message
std.debug.print("[{d}] {s} | ", .{ timestamp, level_str });
std.debug.print(fmt, args);
std.debug.print("\n", .{});
}

test "log level filtering" {
setLevel(.warn);

// These shouldn't panic, just won't print
debug("debug message", .{});
info("info message", .{});

// Reset for other tests
setLevel(.info);
}

test "log formatting" {
info("Test message with arg: {s}", .{"value"});
warn("Warning with number: {d}", .{42});
err("Error with multiple: {s} {d}", .{ "error", 123 });
}
114 changes: 28 additions & 86 deletions src/main.zig
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const std = @import("std");
const config_mod = @import("config.zig");
const lean_api = @import("lean_api.zig");
const log = @import("log.zig");
const poller_mod = @import("poller.zig");
const server = @import("server.zig");
const state_mod = @import("state.zig");
const upstreams_mod = @import("upstreams.zig");
Expand All @@ -11,6 +12,9 @@ pub fn main() !void {
defer _ = gpa.deinit();
const allocator = gpa.allocator();

// Initialize logging (default level: info)
log.init(.info);

var config = try config_mod.load(allocator);
defer config.deinit(allocator);

Expand All @@ -19,109 +23,47 @@ pub fn main() !void {

// Check if multi-upstream mode is enabled
if (config.upstreams_config) |upstreams_path| {
std.debug.print("Loading upstreams from: {s}\n", .{upstreams_path});
log.info("Loading upstreams from: {s}", .{upstreams_path});
var upstreams = upstreams_config_mod.loadFromJsonFile(allocator, upstreams_path) catch |err| {
std.debug.print("Failed to load upstreams config: {s}\n", .{@errorName(err)});
log.err("Failed to load upstreams config: {s}", .{@errorName(err)});
return err;
};
defer upstreams.deinit();

state = state_mod.AppState.init(&upstreams);

std.debug.print("Loaded {d} upstreams\n", .{upstreams.upstreams.items.len});
const poller_thread = try std.Thread.spawn(.{}, pollLoopMulti, .{ allocator, &config, &state, &upstreams });
log.info("Loaded {d} upstreams", .{upstreams.upstreams.items.len});

// Initialize poller
var poller = poller_mod.Poller.init(allocator, &config, &state, &upstreams);
defer poller.deinit();

// Spawn poller thread
const poller_thread = try std.Thread.spawn(.{}, pollerThreadFn, .{&poller});
defer poller_thread.detach();

try server.serve(allocator, &config, &state);
} else {
// Legacy single upstream mode
log.info("Starting in single-upstream mode (legacy)", .{});
state = state_mod.AppState.init(null);

const poller_thread = try std.Thread.spawn(.{}, pollLoop, .{ allocator, &config, &state });
// Initialize poller
var poller = poller_mod.Poller.init(allocator, &config, &state, null);
defer poller.deinit();

// Spawn poller thread
const poller_thread = try std.Thread.spawn(.{}, pollerThreadFn, .{&poller});
defer poller_thread.detach();

try server.serve(allocator, &config, &state);
}
}

fn pollLoop(
allocator: std.mem.Allocator,
config: *const config_mod.Config,
state: *state_mod.AppState,
) !void {
var client = std.http.Client{ .allocator = allocator };
defer client.deinit();

if (@hasField(std.http.Client, "connect_timeout")) {
client.connect_timeout = config.request_timeout_ms * std.time.ns_per_ms;
}
if (@hasField(std.http.Client, "read_timeout")) {
client.read_timeout = config.request_timeout_ms * std.time.ns_per_ms;
}

while (true) {
const start_ns = std.time.nanoTimestamp();
const now_ms = std.time.milliTimestamp();

const slots = lean_api.fetchSlots(
allocator,
&client,
config.lean_api_base_url,
config.lean_api_path,
) catch |err| {
var msg_buf = std.ArrayList(u8).init(allocator);
defer msg_buf.deinit();
try msg_buf.writer().print("poll error: {s}", .{@errorName(err)});
state.updateError(allocator, msg_buf.items, now_ms);
std.time.sleep(config.poll_interval_ms * std.time.ns_per_ms);
continue;
};

const end_ns = std.time.nanoTimestamp();
const delta_ns = end_ns - start_ns;
const delta_ms = @divTrunc(delta_ns, @as(i128, std.time.ns_per_ms));
const latency_ms = @as(u64, @intCast(delta_ms));
state.updateSuccess(allocator, slots.justified_slot, slots.finalized_slot, latency_ms, now_ms);

std.time.sleep(config.poll_interval_ms * std.time.ns_per_ms);
}
}

fn pollLoopMulti(
allocator: std.mem.Allocator,
config: *const config_mod.Config,
state: *state_mod.AppState,
upstreams: *upstreams_mod.UpstreamManager,
) !void {
var client = std.http.Client{ .allocator = allocator };
defer client.deinit();

if (@hasField(std.http.Client, "connect_timeout")) {
client.connect_timeout = config.request_timeout_ms * std.time.ns_per_ms;
}
if (@hasField(std.http.Client, "read_timeout")) {
client.read_timeout = config.request_timeout_ms * std.time.ns_per_ms;
}

while (true) {
const start_ns = std.time.nanoTimestamp();
const now_ms = std.time.milliTimestamp();

// Poll all upstreams and get consensus
const consensus_slots = upstreams.pollUpstreams(&client, now_ms);

if (consensus_slots) |slots| {
const end_ns = std.time.nanoTimestamp();
const delta_ns = end_ns - start_ns;
const delta_ms = @divTrunc(delta_ns, @as(i128, std.time.ns_per_ms));
const latency_ms = @as(u64, @intCast(delta_ms));
state.updateSuccess(allocator, slots.justified_slot, slots.finalized_slot, latency_ms, now_ms);
} else {
const error_msg = upstreams.getErrorSummary(allocator) catch "failed to get error summary";
defer allocator.free(error_msg);
state.updateError(allocator, error_msg, now_ms);
}

std.time.sleep(config.poll_interval_ms * std.time.ns_per_ms);
}
/// Thread entry point for poller
fn pollerThreadFn(poller: *poller_mod.Poller) !void {
poller.run() catch |err| {
log.err("Poller crashed: {s}", .{@errorName(err)});
return err;
};
}
Loading