Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,72 @@ func TestClient_502CommandRemovesProvider(t *testing.T) {
}
}

func TestClient_502ReconnectDelay(t *testing.T) {
var attempt atomic.Int32
factory := func(ctx context.Context) (net.Conn, error) {
client, server := net.Pipe()
go func() {
_, _ = server.Write([]byte("200 server ready\r\n"))
buf := make([]byte, 4096)
for {
_, err := server.Read(buf)
if err != nil {
return
}
if attempt.Add(1) == 1 {
// First command: return 502 to trigger removal.
_, _ = server.Write([]byte("502 service unavailable\r\n"))
} else {
// Subsequent commands: succeed.
_, _ = server.Write([]byte("223 article exists\r\n"))
}
}
}()
return client, nil
}

c, err := NewClient(context.Background(), []Provider{
{Factory: factory, Connections: 1, SkipPing: true, ReconnectDelay: 50 * time.Millisecond},
})
if err != nil {
t.Fatalf("NewClient() error = %v", err)
}
defer func() { _ = c.Close() }()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// First request: hits 502, provider is removed.
resp := <-c.Send(ctx, []byte("STAT <id@test>\r\n"), nil)
if !errors.Is(resp.Err, ErrServiceUnavailable) {
t.Fatalf("first Send() error = %v, want ErrServiceUnavailable", resp.Err)
}
if c.NumProviders() != 0 {
t.Errorf("NumProviders() = %d, want 0 after 502", c.NumProviders())
}

// Wait for reconnect goroutine to re-add the provider.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if c.NumProviders() == 1 {
break
}
time.Sleep(5 * time.Millisecond)
}
if c.NumProviders() != 1 {
t.Fatalf("NumProviders() = %d, want 1 after reconnect", c.NumProviders())
}

// Second request: should succeed via the re-added provider.
resp = <-c.Send(ctx, []byte("STAT <id@test>\r\n"), nil)
if resp.Err != nil {
t.Fatalf("second Send() error = %v", resp.Err)
}
if resp.StatusCode != 223 {
t.Errorf("StatusCode = %d, want 223", resp.StatusCode)
}
}

func TestClient_502CommandFallsBackToBackup(t *testing.T) {
makeFactory := func(statusCode int) ConnFactory {
return func(ctx context.Context) (net.Conn, error) {
Expand Down
20 changes: 20 additions & 0 deletions nntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,7 @@ type Provider struct {
IdleTimeout time.Duration // 0 means no idle disconnect
ThrottleRestore time.Duration // 0 defaults to 30s
KeepAlive time.Duration // TCP keep-alive interval; 0 defaults to 30s; negative disables
ReconnectDelay time.Duration // 0 disables auto-reconnect after 502; when set, re-adds provider after this delay
}

type providerGroup struct {
Expand All @@ -1096,6 +1097,7 @@ type providerGroup struct {
gate *connGate
stats providerStats
cancel context.CancelFunc // cancels this group's slot goroutines
p Provider // original config; used for auto-reconnect
}

type Client struct {
Expand Down Expand Up @@ -1242,6 +1244,7 @@ func (c *Client) startProviderGroup(p Provider, index int) *providerGroup {
hotPrioCh: make(chan *Request),
gate: gate,
cancel: gcancel,
p: p,
}

// Ping with a short timeout so we don't block forever.
Expand Down Expand Up @@ -1514,6 +1517,9 @@ func (c *Client) doSendWithRetry(ctx context.Context, payload []byte, bodyWriter
// Provider returned "service unavailable" — remove it from the
// pool immediately so no further requests are routed to it.
_ = c.RemoveProvider(g.name)
if g.p.ReconnectDelay > 0 {
c.scheduleReconnect(g)
}
lastErr = fmt.Errorf("%s: %w", g.name, ErrServiceUnavailable)
continue
}
Expand Down Expand Up @@ -1557,6 +1563,9 @@ func (c *Client) doSendWithRetry(ctx context.Context, payload []byte, bodyWriter
}
if resp.StatusCode == 502 {
_ = c.RemoveProvider(g.name)
if g.p.ReconnectDelay > 0 {
c.scheduleReconnect(g)
}
lastErr = fmt.Errorf("%s: %w", g.name, ErrServiceUnavailable)
continue
}
Expand Down Expand Up @@ -1657,6 +1666,17 @@ func (c *Client) AddProvider(p Provider) error {

// RemoveProvider stops and removes a provider by name.
// Goroutines wind down asynchronously; Client.Close still waits for all via c.wg.
func (c *Client) scheduleReconnect(g *providerGroup) {
go func() {
select {
case <-c.ctx.Done():
return
case <-time.After(g.p.ReconnectDelay):
}
_ = c.AddProvider(g.p) // no-op if client closed or duplicate
}()
}

func (c *Client) RemoveProvider(name string) error {
for _, pair := range [...]struct {
ptr *atomic.Pointer[[]*providerGroup]
Expand Down
Loading