Skip to content

Commit 8ffe20b

Browse files
committed
fix panic on health check failure when using stream push
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 93fd1c5 commit 8ffe20b

File tree

3 files changed

+49
-1
lines changed

3 files changed

+49
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
99
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
1010
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
11+
* [BUGFIX] Distributor: Fix panic on health check failure when using stream push. #7115
1112

1213
## 1.20.0 2025-11-10
1314

pkg/ingester/client/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,10 @@ func (c *closableHealthAndIngesterClient) worker(ctx context.Context) error {
231231
select {
232232
case <-ctx.Done():
233233
return
234-
case job := <-c.streamPushChan:
234+
case job, ok := <-c.streamPushChan:
235+
if !ok {
236+
return
237+
}
235238
err = stream.Send(job.req)
236239
if err == io.EOF {
237240
job.resp = &cortexpb.WriteResponse{}

pkg/ingester/client/client_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/prometheus/client_golang/prometheus"
1111
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
1213
"github.com/stretchr/testify/require"
1314
"google.golang.org/grpc"
1415

@@ -115,12 +116,18 @@ func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequ
115116

116117
type mockIngester struct {
117118
IngesterClient
119+
mock.Mock
118120
}
119121

120122
func (m *mockIngester) Push(_ context.Context, _ *cortexpb.WriteRequest, _ ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
121123
return &cortexpb.WriteResponse{}, nil
122124
}
123125

126+
func (m *mockIngester) PushStream(ctx context.Context, opts ...grpc.CallOption) (Ingester_PushStreamClient, error) {
127+
args := m.Called(ctx, opts)
128+
return args.Get(0).(Ingester_PushStreamClient), nil
129+
}
130+
124131
type mockClientConn struct {
125132
ClosableClientConn
126133
}
@@ -227,3 +234,40 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) {
227234
assert.True(t, job1Cancelled, "job1 should have been cancelled")
228235
assert.True(t, job2Cancelled, "job2 should have been cancelled")
229236
}
237+
238+
type mockClientStream struct {
239+
mock.Mock
240+
grpc.ClientStream
241+
}
242+
243+
func (m *mockClientStream) Send(msg *cortexpb.StreamWriteRequest) error {
244+
args := m.Called(msg)
245+
return args.Error(0)
246+
}
247+
248+
func (m *mockClientStream) Recv() (*cortexpb.WriteResponse, error) {
249+
return &cortexpb.WriteResponse{}, nil
250+
}
251+
252+
func TestClosableHealthAndIngesterClient_ShouldNotPanicWhenClose(t *testing.T) {
253+
ctx, cancel := context.WithCancel(context.Background())
254+
streamChan := make(chan *streamWriteJob)
255+
256+
mockIngester := &mockIngester{}
257+
mockStream := &mockClientStream{}
258+
mockIngester.On("PushStream", mock.Anything, mock.Anything).Return(mockStream, nil).Once()
259+
260+
client := &closableHealthAndIngesterClient{
261+
IngesterClient: mockIngester,
262+
conn: &mockClientConn{},
263+
addr: "test-addr",
264+
inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}),
265+
streamCtx: ctx,
266+
streamCancel: cancel,
267+
streamPushChan: streamChan,
268+
}
269+
require.NoError(t, client.worker(context.Background()))
270+
require.NoError(t, client.Close())
271+
272+
time.Sleep(100 * time.Millisecond)
273+
}

0 commit comments

Comments
 (0)