147 changes: 105 additions & 42 deletions src/upstreams.zig
@@ -90,79 +90,142 @@ pub const UpstreamManager = struct {
}
}

/// Information needed to poll an upstream (snapshot without holding lock)
const PollTarget = struct {
index: usize,
name: []const u8,
base_url: []const u8,
path: []const u8,
};

/// Result of polling a single upstream
const PollResult = struct {
index: usize,
slots: ?lean_api.Slots,
error_msg: ?[]const u8,
};

/// Poll all upstreams and return consensus slots if 50%+ agree
/// Thread-safe: acquires mutex during upstream state updates
/// Thread-safe: minimizes critical section by only locking during state updates
pub fn pollUpstreams(
self: *UpstreamManager,
client: *std.http.Client,
now_ms: i64,
) ?lean_api.Slots {
// Lock for the entire polling operation to ensure consistency
self.mutex.lock();
defer self.mutex.unlock();
// Step 1: Create snapshot of upstreams to poll (without holding lock)
var targets = std.ArrayList(PollTarget).init(self.allocator);
defer targets.deinit();

if (self.upstreams.items.len == 0) return null;
{
self.mutex.lock();
defer self.mutex.unlock();

var slot_counts = std.AutoHashMap(u128, u32).init(self.allocator);
defer slot_counts.deinit();
if (self.upstreams.items.len == 0) return null;

var successful_polls: u32 = 0;
for (self.upstreams.items, 0..) |upstream, i| {
targets.append(PollTarget{
.index = i,
.name = upstream.name,
.base_url = upstream.base_url,
.path = upstream.path,
}) catch continue;
}
}

// Poll each upstream
for (self.upstreams.items) |*upstream| {
if (targets.items.len == 0) return null;

// Step 2: Poll all upstreams WITHOUT holding the lock (I/O can be slow!)
var results = std.ArrayList(PollResult).init(self.allocator);
defer {
// Clean up any error messages that weren't transferred to upstreams
for (results.items) |result| {
if (result.error_msg) |msg| self.allocator.free(msg);
}
results.deinit();
}

for (targets.items) |target| {
const slots = lean_api.fetchSlots(
self.allocator,
client,
upstream.base_url,
upstream.path,
target.base_url,
target.path,
) catch |err| {
upstream.error_count += 1;

// Free old error first to prevent leak
if (upstream.last_error) |old_err| {
self.allocator.free(old_err);
}

// Allocate new error message with fallback
upstream.last_error = std.fmt.allocPrint(
const error_msg = std.fmt.allocPrint(
self.allocator,
"{s}",
.{@errorName(err)},
) catch blk: {
// Fallback: try to duplicate a static string
break :blk self.allocator.dupe(u8, "allocation_failed") catch null;
};
) catch self.allocator.dupe(u8, "allocation_failed") catch null;

log.warn("Upstream {s} ({s}) failed: {s}", .{ target.name, target.base_url, @errorName(err) });

log.warn("Upstream {s} ({s}) failed: {s}", .{ upstream.name, upstream.base_url, @errorName(err) });
results.append(PollResult{
.index = target.index,
.slots = null,
.error_msg = error_msg,
}) catch continue;
continue;
};

// Clear error on success
if (upstream.last_error) |old_err| {
self.allocator.free(old_err);
upstream.last_error = null;
}
log.debug("Upstream {s}: justified={d}, finalized={d}", .{ target.name, slots.justified_slot, slots.finalized_slot });

results.append(PollResult{
.index = target.index,
.slots = slots,
.error_msg = null,
}) catch continue;
}

// Update upstream state
upstream.last_slots = slots;
upstream.last_success_ms = now_ms;
// Step 3: Update upstream states with results (brief lock)
var slot_counts = std.AutoHashMap(u128, u32).init(self.allocator);
defer slot_counts.deinit();

log.debug("Upstream {s}: justified={d}, finalized={d}", .{ upstream.name, slots.justified_slot, slots.finalized_slot });
var successful_polls: u32 = 0;

// Create a unique key for this slot combination
const slot_key: u128 = (@as(u128, slots.justified_slot) << 64) | @as(u128, slots.finalized_slot);
const count = slot_counts.get(slot_key) orelse 0;
slot_counts.put(slot_key, count + 1) catch continue;
{
self.mutex.lock();
defer self.mutex.unlock();

successful_polls += 1;
for (results.items, 0..) |*result, i| {
if (result.index >= self.upstreams.items.len) continue;
var upstream = &self.upstreams.items[result.index];

if (result.slots) |slots| {
// Success: clear error and update state
if (upstream.last_error) |old_err| {
self.allocator.free(old_err);
upstream.last_error = null;
}
upstream.last_slots = slots;
upstream.last_success_ms = now_ms;

// Track for consensus
const slot_key: u128 = (@as(u128, slots.justified_slot) << 64) | @as(u128, slots.finalized_slot);
const count = slot_counts.get(slot_key) orelse 0;
slot_counts.put(slot_key, count + 1) catch continue;

successful_polls += 1;
} else {
// Error: update error state
upstream.error_count += 1;

if (upstream.last_error) |old_err| {
self.allocator.free(old_err);
}
upstream.last_error = result.error_msg;

// Mark as transferred to prevent double-free in defer
results.items[i].error_msg = null;
}
}
}

// Step 4: Calculate consensus (no lock needed)
if (successful_polls == 0) {
log.warn("No upstreams responded successfully", .{});
return null;
}

// Find consensus (50%+ agreement)
const required_votes = (successful_polls + 1) / 2; // Ceiling division

var iter = slot_counts.iterator();
@@ -184,7 +184,7 @@
}

log.warn("No consensus reached among {d} responding upstreams", .{successful_polls});
return null; // No consensus reached
return null;
}
};
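The main idea in this diff is the three-step shape of the new pollUpstreams: snapshot the targets under the lock, do the slow I/O with no lock held, then re-acquire the lock briefly to write results back, re-checking indices in case the list changed in between. Below is a minimal, self-contained sketch of that pattern, assuming the same managed ArrayList API the diff uses; Tracker and its i64 values are hypothetical stand-ins for the real upstream structures, not code from the PR.

const std = @import("std");

/// Hypothetical stand-in for UpstreamManager; illustrates the pattern only.
const Tracker = struct {
    mutex: std.Thread.Mutex = .{},
    values: std.ArrayList(i64),

    fn refresh(self: *Tracker, allocator: std.mem.Allocator) !void {
        // Step 1: snapshot under the lock (indices here, PollTargets in the PR).
        var indices = std.ArrayList(usize).init(allocator);
        defer indices.deinit();
        {
            self.mutex.lock();
            defer self.mutex.unlock();
            for (self.values.items, 0..) |_, i| try indices.append(i);
        }

        // Step 2: slow work runs here with no lock held
        // (network I/O in the real pollUpstreams).

        // Step 3: brief lock to publish results; indices are re-checked
        // because the list may have shrunk while we were unlocked.
        self.mutex.lock();
        defer self.mutex.unlock();
        for (indices.items) |i| {
            if (i >= self.values.items.len) continue;
            self.values.items[i] += 1;
        }
    }
};

test "snapshot, work unlocked, publish briefly" {
    var tracker = Tracker{ .values = std.ArrayList(i64).init(std.testing.allocator) };
    defer tracker.values.deinit();
    try tracker.values.append(0);
    try tracker.refresh(std.testing.allocator);
    try std.testing.expectEqual(@as(i64, 1), tracker.values.items[0]);
}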

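The error_msg field in PollResult follows a small ownership convention that Step 3 depends on: the deferred loop over results frees whatever is still present, so any message handed off to an upstream must be nulled first to avoid a double-free. A toy version of that convention, with made-up error strings rather than real poll failures:

const std = @import("std");

// Toy version of the results cleanup in pollUpstreams: the deferred loop
// owns every non-null message, so a transferred slot must be nulled first.
test "null out transferred allocations before deferred cleanup" {
    const allocator = std.testing.allocator;

    var messages = std.ArrayList(?[]const u8).init(allocator);
    defer {
        for (messages.items) |msg| {
            if (msg) |m| allocator.free(m);
        }
        messages.deinit();
    }

    // Made-up error strings standing in for PollResult.error_msg.
    try messages.append(try allocator.dupe(u8, "ConnectionRefused"));
    try messages.append(try allocator.dupe(u8, "Timeout"));

    // Hand the first message to a new owner, then mark the slot so the
    // deferred loop above cannot free it a second time.
    const transferred: ?[]const u8 = messages.items[0];
    messages.items[0] = null;
    defer if (transferred) |m| allocator.free(m);

    try std.testing.expect(messages.items[0] == null);
}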
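Finally, the consensus arithmetic. The sketch below (slot values are invented for illustration) checks the u128 key packing, justified slot in the high 64 bits and finalized slot in the low 64 bits, and the (n + 1) / 2 ceiling division that enforces the 50%+ agreement threshold:

const std = @import("std");

test "slot key packing and 50%+ threshold" {
    // Invented sample slots, not real chain data.
    const justified: u64 = 42;
    const finalized: u64 = 40;

    // Same packing as pollUpstreams: justified slot in the high 64 bits,
    // finalized slot in the low 64 bits, so each pair gets a unique key.
    const slot_key: u128 = (@as(u128, justified) << 64) | @as(u128, finalized);
    try std.testing.expectEqual(justified, @as(u64, @truncate(slot_key >> 64)));
    try std.testing.expectEqual(finalized, @as(u64, @truncate(slot_key)));

    // (n + 1) / 2 is ceil(n / 2): at least half of the responders.
    try std.testing.expectEqual(@as(u32, 2), (@as(u32, 3) + 1) / 2); // 2 of 3
    try std.testing.expectEqual(@as(u32, 2), (@as(u32, 4) + 1) / 2); // 2 of 4
    try std.testing.expectEqual(@as(u32, 3), (@as(u32, 5) + 1) / 2); // 3 of 5
}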