diff --git a/src/upstreams.zig b/src/upstreams.zig
index 133f201..6dd9094 100644
--- a/src/upstreams.zig
+++ b/src/upstreams.zig
@@ -90,79 +90,142 @@ pub const UpstreamManager = struct {
         }
     }
 
+    /// Information needed to poll an upstream (snapshot without holding lock)
+    const PollTarget = struct {
+        index: usize,
+        name: []const u8,
+        base_url: []const u8,
+        path: []const u8,
+    };
+
+    /// Result of polling a single upstream
+    const PollResult = struct {
+        index: usize,
+        slots: ?lean_api.Slots,
+        error_msg: ?[]const u8,
+    };
+
     /// Poll all upstreams and return consensus slots if 50%+ agree
-    /// Thread-safe: acquires mutex during upstream state updates
+    /// Thread-safe: minimizes critical section by only locking during state updates
     pub fn pollUpstreams(
         self: *UpstreamManager,
         client: *std.http.Client,
         now_ms: i64,
     ) ?lean_api.Slots {
-        // Lock for the entire polling operation to ensure consistency
-        self.mutex.lock();
-        defer self.mutex.unlock();
+        // Step 1: Create snapshot of upstreams to poll (without holding lock)
+        var targets = std.ArrayList(PollTarget).init(self.allocator);
+        defer targets.deinit();
 
-        if (self.upstreams.items.len == 0) return null;
+        {
+            self.mutex.lock();
+            defer self.mutex.unlock();
 
-        var slot_counts = std.AutoHashMap(u128, u32).init(self.allocator);
-        defer slot_counts.deinit();
+            if (self.upstreams.items.len == 0) return null;
 
-        var successful_polls: u32 = 0;
+            for (self.upstreams.items, 0..) |upstream, i| {
+                targets.append(PollTarget{
+                    .index = i,
+                    .name = upstream.name,
+                    .base_url = upstream.base_url,
+                    .path = upstream.path,
+                }) catch continue;
+            }
+        }
 
-        // Poll each upstream
-        for (self.upstreams.items) |*upstream| {
+        if (targets.items.len == 0) return null;
+
+        // Step 2: Poll all upstreams WITHOUT holding the lock (I/O can be slow!)
+        var results = std.ArrayList(PollResult).init(self.allocator);
+        defer {
+            // Clean up any error messages that weren't transferred to upstreams
+            for (results.items) |result| {
+                if (result.error_msg) |msg| self.allocator.free(msg);
+            }
+            results.deinit();
+        }
+
+        for (targets.items) |target| {
             const slots = lean_api.fetchSlots(
                 self.allocator,
                 client,
-                upstream.base_url,
-                upstream.path,
+                target.base_url,
+                target.path,
             ) catch |err| {
-                upstream.error_count += 1;
-
-                // Free old error first to prevent leak
-                if (upstream.last_error) |old_err| {
-                    self.allocator.free(old_err);
-                }
-
-                // Allocate new error message with fallback
-                upstream.last_error = std.fmt.allocPrint(
+                const error_msg = std.fmt.allocPrint(
                     self.allocator,
                     "{s}",
                     .{@errorName(err)},
-                ) catch blk: {
-                    // Fallback: try to duplicate a static string
-                    break :blk self.allocator.dupe(u8, "allocation_failed") catch null;
-                };
+                ) catch self.allocator.dupe(u8, "allocation_failed") catch null;
+
+                log.warn("Upstream {s} ({s}) failed: {s}", .{ target.name, target.base_url, @errorName(err) });
 
-                log.warn("Upstream {s} ({s}) failed: {s}", .{ upstream.name, upstream.base_url, @errorName(err) });
+                results.append(PollResult{
+                    .index = target.index,
+                    .slots = null,
+                    .error_msg = error_msg,
+                }) catch continue;
                 continue;
             };
 
-            // Clear error on success
-            if (upstream.last_error) |old_err| {
-                self.allocator.free(old_err);
-                upstream.last_error = null;
-            }
+            log.debug("Upstream {s}: justified={d}, finalized={d}", .{ target.name, slots.justified_slot, slots.finalized_slot });
+
+            results.append(PollResult{
+                .index = target.index,
+                .slots = slots,
+                .error_msg = null,
+            }) catch continue;
+        }
 
-            // Update upstream state
-            upstream.last_slots = slots;
-            upstream.last_success_ms = now_ms;
+        // Step 3: Update upstream states with results (brief lock)
+        var slot_counts = std.AutoHashMap(u128, u32).init(self.allocator);
+        defer slot_counts.deinit();
 
-            log.debug("Upstream {s}: justified={d}, finalized={d}", .{ upstream.name, slots.justified_slot, slots.finalized_slot });
+        var successful_polls: u32 = 0;
 
-            // Create a unique key for this slot combination
-            const slot_key: u128 = (@as(u128, slots.justified_slot) << 64) | @as(u128, slots.finalized_slot);
-            const count = slot_counts.get(slot_key) orelse 0;
-            slot_counts.put(slot_key, count + 1) catch continue;
+        {
+            self.mutex.lock();
+            defer self.mutex.unlock();
 
-            successful_polls += 1;
+            for (results.items, 0..) |*result, i| {
+                if (result.index >= self.upstreams.items.len) continue;
+                var upstream = &self.upstreams.items[result.index];
+
+                if (result.slots) |slots| {
+                    // Success: clear error and update state
+                    if (upstream.last_error) |old_err| {
+                        self.allocator.free(old_err);
+                        upstream.last_error = null;
+                    }
+                    upstream.last_slots = slots;
+                    upstream.last_success_ms = now_ms;
+
+                    // Track for consensus
+                    const slot_key: u128 = (@as(u128, slots.justified_slot) << 64) | @as(u128, slots.finalized_slot);
+                    const count = slot_counts.get(slot_key) orelse 0;
+                    slot_counts.put(slot_key, count + 1) catch continue;
+
+                    successful_polls += 1;
+                } else {
+                    // Error: update error state
+                    upstream.error_count += 1;
+
+                    if (upstream.last_error) |old_err| {
+                        self.allocator.free(old_err);
+                    }
+                    upstream.last_error = result.error_msg;
+
+                    // Mark as transferred to prevent double-free in defer
+                    results.items[i].error_msg = null;
+                }
+            }
        }
 
+        // Step 4: Calculate consensus (no lock needed)
         if (successful_polls == 0) {
             log.warn("No upstreams responded successfully", .{});
             return null;
         }
 
-        // Find consensus (50%+ agreement)
         const required_votes = (successful_polls + 1) / 2; // Ceiling division
 
         var iter = slot_counts.iterator();
@@ -184,7 +247,7 @@ pub const UpstreamManager = struct {
         }
 
         log.warn("No consensus reached among {d} responding upstreams", .{successful_polls});
-        return null; // No consensus reached
+        return null;
     }
 };
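For reference, the consensus bookkeeping this patch keeps relies on packing the (justified, finalized) pair into a single u128 hash-map key and on ceiling division for the 50%+ threshold. Below is a minimal standalone sketch of that arithmetic, not part of the patch; the `slotKey` helper name is hypothetical.

```zig
const std = @import("std");

// Hypothetical helper mirroring the slot_key packing in pollUpstreams:
// the justified slot occupies the high 64 bits, the finalized slot the low 64.
fn slotKey(justified_slot: u64, finalized_slot: u64) u128 {
    return (@as(u128, justified_slot) << 64) | @as(u128, finalized_slot);
}

test "slot key packing and 50%+ vote threshold" {
    // Distinct (justified, finalized) pairs map to distinct keys.
    try std.testing.expect(slotKey(7, 5) != slotKey(5, 7));
    try std.testing.expectEqual((@as(u128, 7) << 64) | 5, slotKey(7, 5));

    // required_votes = (successful_polls + 1) / 2 rounds up,
    // so 2 of 3 responding upstreams meet the threshold.
    const successful_polls: u32 = 3;
    const required_votes = (successful_polls + 1) / 2;
    try std.testing.expectEqual(@as(u32, 2), required_votes);
}
```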