diff --git a/integration_test.go b/integration_test.go index 293d8f6..e61f206 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1123,6 +1123,72 @@ func TestClient_502CommandRemovesProvider(t *testing.T) { } } +func TestClient_502ReconnectDelay(t *testing.T) { + var attempt atomic.Int32 + factory := func(ctx context.Context) (net.Conn, error) { + client, server := net.Pipe() + go func() { + _, _ = server.Write([]byte("200 server ready\r\n")) + buf := make([]byte, 4096) + for { + _, err := server.Read(buf) + if err != nil { + return + } + if attempt.Add(1) == 1 { + // First command: return 502 to trigger removal. + _, _ = server.Write([]byte("502 service unavailable\r\n")) + } else { + // Subsequent commands: succeed. + _, _ = server.Write([]byte("223 article exists\r\n")) + } + } + }() + return client, nil + } + + c, err := NewClient(context.Background(), []Provider{ + {Factory: factory, Connections: 1, SkipPing: true, ReconnectDelay: 50 * time.Millisecond}, + }) + if err != nil { + t.Fatalf("NewClient() error = %v", err) + } + defer func() { _ = c.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // First request: hits 502, provider is removed. + resp := <-c.Send(ctx, []byte("STAT \r\n"), nil) + if !errors.Is(resp.Err, ErrServiceUnavailable) { + t.Fatalf("first Send() error = %v, want ErrServiceUnavailable", resp.Err) + } + if c.NumProviders() != 0 { + t.Errorf("NumProviders() = %d, want 0 after 502", c.NumProviders()) + } + + // Wait for reconnect goroutine to re-add the provider. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if c.NumProviders() == 1 { + break + } + time.Sleep(5 * time.Millisecond) + } + if c.NumProviders() != 1 { + t.Fatalf("NumProviders() = %d, want 1 after reconnect", c.NumProviders()) + } + + // Second request: should succeed via the re-added provider. + resp = <-c.Send(ctx, []byte("STAT \r\n"), nil) + if resp.Err != nil { + t.Fatalf("second Send() error = %v", resp.Err) + } + if resp.StatusCode != 223 { + t.Errorf("StatusCode = %d, want 223", resp.StatusCode) + } +} + func TestClient_502CommandFallsBackToBackup(t *testing.T) { makeFactory := func(statusCode int) ConnFactory { return func(ctx context.Context) (net.Conn, error) { diff --git a/nntp.go b/nntp.go index dc7dfcd..ffd5ec9 100644 --- a/nntp.go +++ b/nntp.go @@ -1082,6 +1082,7 @@ type Provider struct { IdleTimeout time.Duration // 0 means no idle disconnect ThrottleRestore time.Duration // 0 defaults to 30s KeepAlive time.Duration // TCP keep-alive interval; 0 defaults to 30s; negative disables + ReconnectDelay time.Duration // 0 disables auto-reconnect after 502; when set, re-adds provider after this delay } type providerGroup struct { @@ -1096,6 +1097,7 @@ type providerGroup struct { gate *connGate stats providerStats cancel context.CancelFunc // cancels this group's slot goroutines + p Provider // original config; used for auto-reconnect } type Client struct { @@ -1242,6 +1244,7 @@ func (c *Client) startProviderGroup(p Provider, index int) *providerGroup { hotPrioCh: make(chan *Request), gate: gate, cancel: gcancel, + p: p, } // Ping with a short timeout so we don't block forever. @@ -1514,6 +1517,9 @@ func (c *Client) doSendWithRetry(ctx context.Context, payload []byte, bodyWriter // Provider returned "service unavailable" — remove it from the // pool immediately so no further requests are routed to it. _ = c.RemoveProvider(g.name) + if g.p.ReconnectDelay > 0 { + c.scheduleReconnect(g) + } lastErr = fmt.Errorf("%s: %w", g.name, ErrServiceUnavailable) continue } @@ -1557,6 +1563,9 @@ func (c *Client) doSendWithRetry(ctx context.Context, payload []byte, bodyWriter } if resp.StatusCode == 502 { _ = c.RemoveProvider(g.name) + if g.p.ReconnectDelay > 0 { + c.scheduleReconnect(g) + } lastErr = fmt.Errorf("%s: %w", g.name, ErrServiceUnavailable) continue } @@ -1657,6 +1666,17 @@ func (c *Client) AddProvider(p Provider) error { // RemoveProvider stops and removes a provider by name. // Goroutines wind down asynchronously; Client.Close still waits for all via c.wg. +func (c *Client) scheduleReconnect(g *providerGroup) { + go func() { + select { + case <-c.ctx.Done(): + return + case <-time.After(g.p.ReconnectDelay): + } + _ = c.AddProvider(g.p) // no-op if client closed or duplicate + }() +} + func (c *Client) RemoveProvider(name string) error { for _, pair := range [...]struct { ptr *atomic.Pointer[[]*providerGroup]