Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 74 additions & 8 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,11 +1096,40 @@ func isConflict(err error) bool {
if err == nil {
return false
}
return err == errConflict ||

// Check direct error matches
if err == errConflict ||
Copy link

Copilot AI Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using errors.Is(err, errConflict) instead of direct equality to correctly handle wrapped instances of the sentinel errConflict.

Suggested change
if err == errConflict ||
if errors.Is(err, errConflict) ||

Copilot uses AI. Check for mistakes.
isSampleConflictErr(err) ||
isExemplarConflictErr(err) ||
isLabelsConflictErr(err) ||
status.Code(err) == codes.AlreadyExists
isLabelsConflictErr(err) {
return true
}

// Check gRPC status code - this handles the primary case from the issue
if status.Code(err) == codes.AlreadyExists {
return true
}

// For wrapped errors, we need to check if the underlying cause has AlreadyExists status
// This handles the case where AlreadyExists errors are wrapped multiple times
cause := errors.Cause(err)
if status.Code(cause) == codes.AlreadyExists {
return true
}

// Additional check for MultiError - if any error in the chain is a conflict, consider it a conflict
// This handles the nested MultiError case mentioned in the issue
if merr, ok := err.(interface{ Unwrap() []error }); ok {
if errs := merr.Unwrap(); errs != nil {
for _, e := range errs {
if isConflict(e) {
return true
}
}
}
}

return false
}

// isSampleConflictErr returns whether or not the given error represents
Expand Down Expand Up @@ -1130,15 +1159,52 @@ func isLabelsConflictErr(err error) bool {

// isNotReady returns whether or not the given error represents a not ready error.
func isNotReady(err error) bool {
return err == errNotReady ||
err == tsdb.ErrNotReady ||
status.Code(err) == codes.Unavailable
if err == nil {
return false
}

// Check direct error matches and use errors.Is for wrapped errors
if errors.Is(err, errNotReady) || errors.Is(err, tsdb.ErrNotReady) {
return true
}

// Check gRPC status code
if status.Code(err) == codes.Unavailable {
return true
}

// For wrapped errors, check if the underlying cause has the correct status
cause := errors.Cause(err)
if status.Code(cause) == codes.Unavailable {
return true
}

return false
}

// isUnavailable returns whether or not the given error represents an unavailable error.
func isUnavailable(err error) bool {
return err == errUnavailable ||
status.Code(err) == codes.Unavailable
if err == nil {
return false
}

// Check direct error matches and use errors.Is for wrapped errors
if errors.Is(err, errUnavailable) {
return true
}

// Check gRPC status code
if status.Code(err) == codes.Unavailable {
return true
}

// For wrapped errors, check if the underlying cause has the correct status
cause := errors.Cause(err)
if status.Code(cause) == codes.Unavailable {
return true
}

return false
}

// retryState encapsulates the number of request attempt made against a peer and,
Expand Down
84 changes: 84 additions & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"

"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -1955,3 +1957,85 @@ func TestHandlerFlippingHashrings(t *testing.T) {
cancel()
wg.Wait()
}

// TestIsConflictWrappedErrors feeds isConflict gRPC status errors that are
// either bare or wrapped one, two or three times, plus a nil error, and checks
// that only the AlreadyExists variants are reported as conflicts.
func TestIsConflictWrappedErrors(t *testing.T) {
	// Base status errors shared by the table below.
	alreadyExists := status.Error(codes.AlreadyExists, "already exists")
	internal := status.Error(codes.Internal, "internal error")

	// Three layers around an AlreadyExists status: the status itself, a
	// forwarding wrap, and a replication wrap.
	forwarded := errors.Wrapf(
		status.Error(codes.AlreadyExists, "store locally for endpoint conflict"),
		"forwarding request to endpoint %v", "test-endpoint",
	)
	replicated := errors.Wrapf(forwarded, "replicate write request for endpoint %v", "test-endpoint")

	cases := []struct {
		name string
		in   error
		want bool
	}{
		{name: "direct AlreadyExists gRPC error", in: alreadyExists, want: true},
		{name: "wrapped AlreadyExists gRPC error", in: errors.Wrap(alreadyExists, "wrapped error"), want: true},
		{
			name: "double wrapped AlreadyExists gRPC error",
			in:   errors.Wrap(errors.Wrap(alreadyExists, "first wrap"), "second wrap"),
			want: true,
		},
		{name: "triple wrapped AlreadyExists gRPC error (as seen in issue #5407)", in: replicated, want: true},
		{name: "non-conflict error", in: internal, want: false},
		{name: "wrapped non-conflict error", in: errors.Wrap(internal, "wrapped"), want: false},
		{name: "nil error", in: nil, want: false},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			if got := isConflict(tc.in); got != tc.want {
				t.Errorf("isConflict(%v) = %v, want %v", tc.in, got, tc.want)
			}
		})
	}
}

// TestReplicationErrorsWithWrappedConflicts tests that replicationErrors
// properly identifies wrapped conflict errors and returns the correct cause
func TestReplicationErrorsWithWrappedConflicts(t *testing.T) {
// Create a scenario similar to the one described in issue #5407
// where we have replication factor 2, and one request fails with wrapped AlreadyExists
re := &replicationErrors{
threshold: 1, // With RF=2, quorum is 1, so threshold should be 1
}

// Add a wrapped AlreadyExists error (as would happen in the fanout scenario)
wrappedConflictErr := errors.Wrapf(
errors.Wrapf(
status.Error(codes.AlreadyExists, "store locally for endpoint conflict"),
"forwarding request to endpoint %v", "test-endpoint"),
"replicate write request for endpoint %v", "test-endpoint")

re.Add(wrappedConflictErr)

// The Cause() method should return errConflict, not errInternal
cause := re.Cause()
if cause != errConflict {
t.Errorf("replicationErrors.Cause() = %v, want %v", cause, errConflict)
}
}