From ea6c1e6b4bd20757270da18f0e815bcc9ef23485 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 1 Mar 2026 13:48:28 -0800 Subject: [PATCH 1/2] fix(config): add upstream retry time budget and safer retry default --- src/config/types.zig | 3 ++- src/datadog_main.zig | 1 + src/lambda_main.zig | 2 ++ src/main.zig | 1 + src/otlp_main.zig | 1 + src/prometheus_main.zig | 1 + src/proxy/server.zig | 14 ++++++++++++++ 7 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/config/types.zig b/src/config/types.zig index 64a4ba0..f311194 100644 --- a/src/config/types.zig +++ b/src/config/types.zig @@ -44,7 +44,8 @@ pub const ProxyConfig = struct { max_body_size: u32 = 1024 * 1024, // 1MB // Retry config - max_upstream_retries: u8 = 10, + max_upstream_retries: u8 = 3, + upstream_retry_time_budget_ms: u32 = 2000, // Policy providers - array of provider configurations policy_providers: []ProviderConfig = &.{}, diff --git a/src/datadog_main.zig b/src/datadog_main.zig index 5945d7e..f1fde08 100644 --- a/src/datadog_main.zig +++ b/src/datadog_main.zig @@ -331,6 +331,7 @@ pub fn main() !void { config.listen_address, config.listen_port, config.max_upstream_retries, + config.upstream_retry_time_budget_ms, config.max_body_size, &module_registrations, ); diff --git a/src/lambda_main.zig b/src/lambda_main.zig index ed3a628..70935fe 100644 --- a/src/lambda_main.zig +++ b/src/lambda_main.zig @@ -59,6 +59,7 @@ pub const LambdaConfig = struct { // Limits max_body_size: u32 = 5 * 1024 * 1024, // 5MB max_upstream_retries: u8 = 3, + upstream_retry_time_budget_ms: u32 = 2000, // Service metadata service: struct { @@ -315,6 +316,7 @@ pub fn main() !void { config.listen_address, config.listen_port, config.max_upstream_retries, + config.upstream_retry_time_budget_ms, config.max_body_size, &module_registrations, ); diff --git a/src/main.zig b/src/main.zig index 4cae8b2..d3dad9e 100644 --- a/src/main.zig +++ b/src/main.zig @@ -365,6 +365,7 @@ pub fn main() !void { config.listen_address, config.listen_port, config.max_upstream_retries, + config.upstream_retry_time_budget_ms, config.max_body_size, &module_registrations, ); diff --git a/src/otlp_main.zig b/src/otlp_main.zig index 9b04b07..89d1466 100644 --- a/src/otlp_main.zig +++ b/src/otlp_main.zig @@ -296,6 +296,7 @@ pub fn main() !void { config.listen_address, config.listen_port, config.max_upstream_retries, + config.upstream_retry_time_budget_ms, config.max_body_size, &module_registrations, ); diff --git a/src/prometheus_main.zig b/src/prometheus_main.zig index 6899808..f90a475 100644 --- a/src/prometheus_main.zig +++ b/src/prometheus_main.zig @@ -310,6 +310,7 @@ pub fn main() !void { config.listen_address, config.listen_port, config.max_upstream_retries, + config.upstream_retry_time_budget_ms, config.max_body_size, &module_registrations, ); diff --git a/src/proxy/server.zig b/src/proxy/server.zig index ec4fcdc..ccd46a8 100644 --- a/src/proxy/server.zig +++ b/src/proxy/server.zig @@ -169,6 +169,9 @@ const ServerContext = struct { /// Maximum retries for failed upstream requests max_upstream_retries: u8, + /// Upper bound on total retry time budget per request (ms) + upstream_retry_time_budget_ms: u32, + /// Event bus for observability bus: *EventBus, @@ -244,6 +247,7 @@ pub const ProxyServer = struct { listen_address: [4]u8, listen_port: u16, max_upstream_retries: u8, + upstream_retry_time_budget_ms: u32, max_body_size: u32, module_registrations: []const ModuleRegistration, ) !ProxyServer { @@ -253,6 +257,7 @@ pub const ProxyServer = struct { ctx.allocator = allocator; ctx.bus = bus; ctx.max_upstream_retries = max_upstream_retries; + ctx.upstream_retry_time_budget_ms = upstream_retry_time_budget_ms; ctx.upstreams = UpstreamClientManager.init(allocator); errdefer ctx.upstreams.deinit(); ctx.modules = .{ .modules = .{} }; @@ -559,6 +564,8 @@ fn proxyToUpstream( body_to_send: []const u8, ) !usize { const max_retries = ctx.max_upstream_retries; + const retry_budget_ms = ctx.upstream_retry_time_budget_ms; + const start_ms = std.time.milliTimestamp(); var attempt: u8 = 0; // https://codeberg.org/ziglang/zig/issues/30165 @@ -581,6 +588,13 @@ fn proxyToUpstream( return err; } + if (retry_budget_ms > 0) { + const elapsed_ms = std.time.milliTimestamp() - start_ms; + if (elapsed_ms >= @as(i64, @intCast(retry_budget_ms))) { + return err; + } + } + ctx.bus.warn(UpstreamRetry{ .attempt = attempt + 1, .max_retries = max_retries, From 32d3c4b3c18fa319faf5f1e3f257ac5481dd3012 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 1 Mar 2026 14:15:06 -0800 Subject: [PATCH 2/2] test(proxy): cover retry budget decision logic --- src/proxy/server.zig | 105 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 89 insertions(+), 16 deletions(-) diff --git a/src/proxy/server.zig b/src/proxy/server.zig index ccd46a8..918805e 100644 --- a/src/proxy/server.zig +++ b/src/proxy/server.zig @@ -553,6 +553,34 @@ fn getUnderlyingWriteError(upstream_req: *std.http.Client.Request) ?[]const u8 { return @errorName(write_err); } +fn isRetryableUpstreamErrorName(err_name: []const u8) bool { + return std.mem.eql(u8, err_name, "ConnectionResetByPeer") or + std.mem.eql(u8, err_name, "BrokenPipe") or + std.mem.eql(u8, err_name, "ConnectionTimedOut") or + std.mem.eql(u8, err_name, "UnexpectedReadFailure") or + std.mem.eql(u8, err_name, "HttpConnectionClosing") or + std.mem.eql(u8, err_name, "UnexpectedWriteFailure"); +} + +fn shouldRetryUpstreamRequest( + err_name: []const u8, + attempt: u8, + max_retries: u8, + start_ms: i64, + now_ms: i64, + retry_budget_ms: u32, +) bool { + if (!isRetryableUpstreamErrorName(err_name)) return false; + if (attempt + 1 >= max_retries) return false; + + if (retry_budget_ms > 0) { + const elapsed_ms = now_ms - start_ms; + if (elapsed_ms >= @as(i64, @intCast(retry_budget_ms))) return false; + } + + return true; +} + /// Forward request to upstream and stream response back /// Returns the number of bytes in the response body fn proxyToUpstream( @@ -575,26 +603,18 @@ fn proxyToUpstream( if (result) |bytes| { return bytes; } else |err| { - // Only retry on connection-related errors (stale connections from pool) const err_name = @errorName(err); - const is_retryable = std.mem.eql(u8, err_name, "ConnectionResetByPeer") or - std.mem.eql(u8, err_name, "BrokenPipe") or - std.mem.eql(u8, err_name, "ConnectionTimedOut") or - std.mem.eql(u8, err_name, "UnexpectedReadFailure") or - std.mem.eql(u8, err_name, "HttpConnectionClosing") or - std.mem.eql(u8, err_name, "UnexpectedWriteFailure"); - - if (!is_retryable or attempt + 1 >= max_retries) { + if (!shouldRetryUpstreamRequest( + err_name, + attempt, + max_retries, + start_ms, + std.time.milliTimestamp(), + retry_budget_ms, + )) { return err; } - if (retry_budget_ms > 0) { - const elapsed_ms = std.time.milliTimestamp() - start_ms; - if (elapsed_ms >= @as(i64, @intCast(retry_budget_ms))) { - return err; - } - } - ctx.bus.warn(UpstreamRetry{ .attempt = attempt + 1, .max_retries = max_retries, @@ -814,3 +834,56 @@ test "shouldSkipResponseHeader" { try std.testing.expect(!shouldSkipResponseHeader("content-type")); try std.testing.expect(!shouldSkipResponseHeader("x-custom-header")); } + +test "shouldRetryUpstreamRequest respects retryable errors and attempt budget" { + const start_ms: i64 = 1000; + + try std.testing.expect(shouldRetryUpstreamRequest( + "ConnectionResetByPeer", + 0, + 3, + start_ms, + start_ms + 10, + 2000, + )); + + try std.testing.expect(!shouldRetryUpstreamRequest( + "InvalidArgument", + 0, + 3, + start_ms, + start_ms + 10, + 2000, + )); + + try std.testing.expect(!shouldRetryUpstreamRequest( + "ConnectionResetByPeer", + 2, + 3, + start_ms, + start_ms + 10, + 2000, + )); +} + +test "shouldRetryUpstreamRequest respects retry time budget" { + const start_ms: i64 = 1000; + + try std.testing.expect(shouldRetryUpstreamRequest( + "BrokenPipe", + 0, + 3, + start_ms, + start_ms + 1999, + 2000, + )); + + try std.testing.expect(!shouldRetryUpstreamRequest( + "BrokenPipe", + 0, + 3, + start_ms, + start_ms + 2000, + 2000, + )); +}