Skip to content

Commit 2e47e39

Browse files
committed
address pr comments
1 parent adee0a8 commit 2e47e39

File tree

3 files changed

+58
-116
lines changed

3 files changed

+58
-116
lines changed

hitless/errors.go

Lines changed: 2 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package hitless
22

33
import (
44
"errors"
5-
"fmt"
65
)
76

87
// Configuration errors
@@ -19,12 +18,7 @@ var (
1918

2019
// Configuration validation errors
2120
ErrInvalidHandoffRetries = errors.New("hitless: MaxHandoffRetries must be between 1 and 10")
22-
ErrInvalidConnectionValidationTimeout = errors.New("hitless: ConnectionValidationTimeout must be greater than 0 and less than 30 seconds")
23-
ErrInvalidConnectionHealthCheckInterval = errors.New("hitless: ConnectionHealthCheckInterval must be between 0 and 1 hour")
24-
ErrInvalidOperationCleanupInterval = errors.New("hitless: OperationCleanupInterval must be greater than 0 and less than 1 hour")
25-
ErrInvalidMaxActiveOperations = errors.New("hitless: MaxActiveOperations must be between 100 and 100000")
26-
ErrInvalidNotificationBufferSize = errors.New("hitless: NotificationBufferSize must be between 10 and 10000")
27-
ErrInvalidNotificationTimeout = errors.New("hitless: NotificationTimeout must be greater than 0 and less than 30 seconds")
21+
ErrInvalidHandoffState = errors.New("hitless: Conn is in invalid state for handoff")
2822
)
2923

3024
// Integration errors
@@ -34,44 +28,10 @@ var (
3428

3529
// Handoff errors
3630
var (
37-
ErrHandoffInProgress = errors.New("hitless: handoff already in progress")
38-
ErrNoHandoffInProgress = errors.New("hitless: no handoff in progress")
39-
ErrConnectionFailed = errors.New("hitless: failed to establish new connection")
40-
ErrHandoffQueueFull = errors.New("hitless: handoff queue is full, cannot queue new handoff requests - consider increasing HandoffQueueSize or MaxWorkers in configuration")
31+
ErrHandoffQueueFull = errors.New("hitless: handoff queue is full, cannot queue new handoff requests - consider increasing HandoffQueueSize or MaxWorkers in configuration")
4132
)
4233

43-
// Dead error variables removed - unused in simplified architecture
44-
4534
// Notification errors
4635
var (
4736
ErrInvalidNotification = errors.New("hitless: invalid notification format")
4837
)
49-
50-
// Dead error variables removed - unused in simplified architecture
51-
52-
// HandoffError represents an error that occurred during connection handoff.
53-
type HandoffError struct {
54-
Operation string
55-
Endpoint string
56-
Cause error
57-
}
58-
59-
func (e *HandoffError) Error() string {
60-
if e.Cause != nil {
61-
return fmt.Sprintf("hitless: handoff %s failed for endpoint %s: %v", e.Operation, e.Endpoint, e.Cause)
62-
}
63-
return fmt.Sprintf("hitless: handoff %s failed for endpoint %s", e.Operation, e.Endpoint)
64-
}
65-
66-
func (e *HandoffError) Unwrap() error {
67-
return e.Cause
68-
}
69-
70-
// NewHandoffError creates a new HandoffError.
71-
func NewHandoffError(operation, endpoint string, cause error) *HandoffError {
72-
return &HandoffError{
73-
Operation: operation,
74-
Endpoint: endpoint,
75-
Cause: cause,
76-
}
77-
}

hitless/notification_handler.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package hitless
33
import (
44
"context"
55
"fmt"
6-
"strconv"
76
"time"
87

98
"github.com/redis/go-redis/v9/internal"
@@ -63,27 +62,17 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus
6362
if len(notification) < 3 {
6463
return ErrInvalidNotification
6564
}
66-
seqIDStr, ok := notification[1].(string)
65+
seqID, ok := notification[1].(int64)
6766
if !ok {
6867
return ErrInvalidNotification
6968
}
7069

71-
seqID, err := strconv.ParseInt(seqIDStr, 10, 64)
72-
if err != nil {
73-
return ErrInvalidNotification
74-
}
75-
7670
// Extract timeS
77-
timeSStr, ok := notification[2].(string)
71+
timeS, ok := notification[2].(int64)
7872
if !ok {
7973
return ErrInvalidNotification
8074
}
8175

82-
timeS, err := strconv.ParseInt(timeSStr, 10, 64)
83-
if err != nil {
84-
return ErrInvalidNotification
85-
}
86-
8776
newEndpoint := ""
8877
if len(notification) > 3 {
8978
// Extract new endpoint
@@ -116,16 +105,17 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus
116105
newEndpoint = snh.manager.options.GetAddr()
117106
// delay the handoff for timeS/2 seconds to the same endpoint
118107
// do this in a goroutine to avoid blocking the notification handler
119-
go func() {
120-
time.Sleep(time.Duration(timeS/2) * time.Second)
108+
// NOTE: The timer is started while the notification is still being parsed, i.e. before the connection
109+
// has been marked for handoff, so there should be no possibility of a race condition or a double handoff.
110+
time.AfterFunc(time.Duration(timeS/2)*time.Second, func() {
121111
if poolConn == nil || poolConn.IsClosed() {
122112
return
123113
}
124114
if err := snh.markConnForHandoff(poolConn, newEndpoint, seqID, deadline); err != nil {
125115
// Log error but don't fail the goroutine
126-
internal.Logger.Printf(context.Background(), "hitless: failed to mark connection for handoff: %v", err)
116+
internal.Logger.Printf(ctx, "hitless: failed to mark connection for handoff: %v", err)
127117
}
128-
}()
118+
})
129119
return nil
130120
}
131121

hitless/pool_hook.go

Lines changed: 49 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -257,16 +257,45 @@ func (ph *PoolHook) processHandoffRequest(request HandoffRequest) {
257257
}()
258258

259259
// Perform the handoff with cancellable context
260-
err := ph.performConnectionHandoffWithPool(shutdownCtx, request.Conn, request.Pool)
261-
262-
// If handoff failed, restore the handoff state for potential retry
260+
shouldRetry, err := ph.performConnectionHandoffWithPool(shutdownCtx, request.Conn, request.Pool)
263261
if err != nil {
264-
request.Conn.RestoreHandoffState()
265-
internal.Logger.Printf(context.Background(), "Handoff failed for connection WILL RETRY: %v", err)
266-
}
262+
if shouldRetry {
263+
now := time.Now()
264+
deadline, ok := shutdownCtx.Deadline()
265+
if !ok || deadline.Before(now) {
266+
// wait half the timeout before retrying if no deadline or deadline has passed
267+
deadline = now.Add(handoffTimeout / 2)
268+
}
267269

268-
// No need for scale down scheduling with on-demand workers
269-
// Workers automatically exit when idle
270+
afterTime := deadline.Sub(now)
271+
if afterTime < handoffTimeout/2 {
272+
afterTime = handoffTimeout / 2
273+
}
274+
275+
internal.Logger.Printf(context.Background(), "Handoff failed for connection WILL RETRY After %v: %v", afterTime, err)
276+
time.AfterFunc(afterTime, func() {
277+
ph.queueHandoff(request.Conn)
278+
})
279+
} else {
280+
pooler := request.Pool
281+
conn := request.Conn
282+
if pooler != nil {
283+
go pooler.Remove(ctx, conn, err)
284+
if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
285+
internal.Logger.Printf(ctx,
286+
"hitless: removed connection %d from pool due to max handoff retries reached",
287+
conn.GetID())
288+
}
289+
} else {
290+
go conn.Close()
291+
if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
292+
internal.Logger.Printf(ctx,
293+
"hitless: no pool provided for connection %d, cannot remove due to handoff initialization failure: %v",
294+
conn.GetID(), err)
295+
}
296+
}
297+
}
298+
}
270299
}
271300

272301
// queueHandoff queues a handoff request for processing
@@ -313,8 +342,8 @@ func (ph *PoolHook) queueHandoff(conn *pool.Conn) error {
313342
}
314343

315344
// performConnectionHandoffWithPool performs the actual connection handoff with pool for connection removal on failure
316-
// if err is returned, connection will be removed from pool
317-
func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *pool.Conn, pooler pool.Pooler) error {
345+
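// When an error is returned, shouldRetry reports whether the handoff should be retried: it is true only when dialing the new endpoint fails, and false for ErrInvalidHandoffState, ErrMaxHandoffRetriesReached and connection initialization failures.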
346+
func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *pool.Conn, pooler pool.Pooler) (shouldRetry bool, err error) {
318347
// Clear handoff state after successful handoff
319348
seqID := conn.GetMovingSeqID()
320349
connID := conn.GetID()
@@ -326,11 +355,7 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
326355

327356
newEndpoint := conn.GetHandoffEndpoint()
328357
if newEndpoint == "" {
329-
// TODO(hitless): Handle by performing the handoff to the current endpoint in N seconds,
330-
// Where N is the time in the moving notification...
331-
// For now, clear the handoff state and return
332-
conn.ClearHandoffState()
333-
return nil
358+
return false, ErrInvalidHandoffState
334359
}
335360

336361
retries := conn.IncrementAndGetHandoffRetries(1)
@@ -345,23 +370,8 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
345370
"hitless: reached max retries (%d) for handoff of connection %d to %s",
346371
maxRetries, conn.GetID(), conn.GetHandoffEndpoint())
347372
}
348-
err := ErrMaxHandoffRetriesReached
349-
if pooler != nil {
350-
go pooler.Remove(ctx, conn, err)
351-
if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
352-
internal.Logger.Printf(ctx,
353-
"hitless: removed connection %d from pool due to max handoff retries reached",
354-
conn.GetID())
355-
}
356-
} else {
357-
go conn.Close()
358-
if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
359-
internal.Logger.Printf(ctx,
360-
"hitless: no pool provided for connection %d, cannot remove due to handoff initialization failure: %v",
361-
conn.GetID(), err)
362-
}
363-
}
364-
return err
373+
// won't retry on ErrMaxHandoffRetriesReached
374+
return false, ErrMaxHandoffRetriesReached
365375
}
366376

367377
// Create endpoint-specific dialer
@@ -370,10 +380,9 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
370380
// Create new connection to the new endpoint
371381
newNetConn, err := endpointDialer(ctx)
372382
if err != nil {
373-
// TODO(hitless): retry
374-
// This is the only case where we should retry the handoff request
375-
// Should we do anything else other than return the error?
376-
return err
383+
// hitless: will retry
384+
// Maybe a network error - retry after a delay
385+
return true, err
377386
}
378387

379388
// Get the old connection
@@ -382,26 +391,9 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
382391
// Replace the connection and execute initialization
383392
err = conn.SetNetConnAndInitConn(ctx, newNetConn)
384393
if err != nil {
385-
// Remove the connection from the pool since it's in a bad state
386-
if pooler != nil {
387-
// Use pool.Pooler interface directly - no adapter needed
388-
go pooler.Remove(ctx, conn, err)
389-
if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
390-
internal.Logger.Printf(ctx,
391-
"hitless: removed connection %d from pool due to handoff initialization failure: %v",
392-
conn.GetID(), err)
393-
}
394-
} else {
395-
go conn.Close()
396-
if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
397-
internal.Logger.Printf(ctx,
398-
"hitless: no pool provided for connection %d, cannot remove due to handoff initialization failure: %v",
399-
conn.GetID(), err)
400-
}
401-
}
402-
403-
// Keep the handoff state for retry
404-
return err
394+
// hitless: won't retry
395+
// Initialization failed - remove the connection
396+
return false, err
405397
}
406398
defer func() {
407399
if oldConn != nil {
@@ -428,7 +420,7 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
428420
}
429421
}
430422

431-
return nil
423+
return false, nil
432424
}
433425

434426
// createEndpointDialer creates a dialer function that connects to a specific endpoint

0 commit comments

Comments
 (0)