From 57491997a300fbff091fff9d3330b243abdb01fe Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 25 Sep 2025 21:21:01 +0900 Subject: [PATCH] use exp/api/remote in PRW2 e2e test code Signed-off-by: SungJin1212 --- integration/e2ecortex/client.go | 42 +++++++++------------- integration/remote_write_v2_test.go | 56 +++++++++++++---------------- 2 files changed, 40 insertions(+), 58 deletions(-) diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index fa707e7f49d..73f3c6bbf32 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/alertmanager/types" promapi "github.com/prometheus/client_golang/api" promv1 "github.com/prometheus/client_golang/api/prometheus/v1" + remoteapi "github.com/prometheus/client_golang/exp/api/remote" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" @@ -51,6 +52,7 @@ type Client struct { distributorAddress string timeout time.Duration httpClient *http.Client + remoteWriteAPI *remoteapi.API querierClient promv1.API orgID string } @@ -72,6 +74,17 @@ func NewClient( return nil, err } + client := &http.Client{ + Transport: &addOrgIDRoundTripper{orgID: orgID, next: http.DefaultTransport}, + } + remoteWriteAPI, err := remoteapi.NewAPI(fmt.Sprintf("http://%s", distributorAddress), + remoteapi.WithAPIHTTPClient(client), + remoteapi.WithAPIPath("/api/prom/push"), + ) + if err != nil { + return nil, err + } + c := &Client{ distributorAddress: distributorAddress, querierAddress: querierAddress, @@ -80,6 +93,7 @@ func NewClient( timeout: 30 * time.Second, httpClient: &http.Client{}, querierClient: promv1.NewAPI(querierAPIClient), + remoteWriteAPI: remoteWriteAPI, orgID: orgID, } @@ -184,36 +198,12 @@ func (c *Client) Push(timeseries []prompb.TimeSeries, metadata ...prompb.MetricM } // PushV2 the input timeseries to the remote endpoint -func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) { +func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (remoteapi.WriteResponseStats, error) { // Create write request - data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries}) - if err != nil { - return nil, err - } - - // Create HTTP request - compressed := snappy.Encode(nil, data) - req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed)) - if err != nil { - return nil, err - } - - req.Header.Add("Content-Encoding", "snappy") - req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request") - req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0") - req.Header.Set("X-Scope-OrgID", c.orgID) - ctx, cancel := context.WithTimeout(context.Background(), c.timeout) defer cancel() - // Execute HTTP request - res, err := c.httpClient.Do(req.WithContext(ctx)) - if err != nil { - return nil, err - } - - defer res.Body.Close() - return res, nil + return c.remoteWriteAPI.Write(ctx, remoteapi.WriteV2MessageType, &writev2.Request{Symbols: symbols, Timeseries: timeseries}) } func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) { diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index 8ba26447f68..c295b6a62ab 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -5,11 +5,11 @@ package integration import ( "math/rand" - "net/http" "path" "testing" "time" + remoteapi "github.com/prometheus/client_golang/exp/api/remote" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" @@ -95,10 +95,9 @@ func TestIngesterRollingUpdate(t *testing.T) { // series push symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"}) - res, err := c.PushV2(symbols1, series) + stats, err := c.PushV2(symbols1, series) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - testPushHeader(t, res.Header, "1", "0", "0") + testPushHeader(t, stats, 1, 0, 0) // sample result, err := c.Query("test_series", now) @@ -113,16 +112,14 @@ func TestIngesterRollingUpdate(t *testing.T) { // histogram histogramIdx := rand.Uint32() symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) - res, err = c.PushV2(symbols2, histogramSeries) + writeStats, err := c.PushV2(symbols2, histogramSeries) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - testPushHeader(t, res.Header, "0", "1", "0") + testPushHeader(t, writeStats, 0, 1, 0) symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) - res, err = c.PushV2(symbols3, histogramFloatSeries) + writeStats, err = c.PushV2(symbols3, histogramFloatSeries) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - testPushHeader(t, res.Header, "0", "1", "0") + testPushHeader(t, writeStats, 0, 1, 0) testHistogramTimestamp := now.Add(blockRangePeriod * 2) expectedHistogram := tsdbutil.GenerateTestHistogram(int64(histogramIdx)) @@ -198,9 +195,9 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) { // series push symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"}) - res, err := c.PushV2(symbols1, series) - require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) + _, err = c.PushV2(symbols1, series) + require.Error(t, err) + require.Contains(t, err.Error(), "sent v2 request; got 2xx, but PRW 2.0 response header statistics indicate 0 samples, 0 histograms and 0 exemplars were accepted") // sample result, err := c.Query("test_series", now) @@ -266,10 +263,9 @@ func TestIngest(t *testing.T) { // series push symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"}) - res, err := c.PushV2(symbols1, series) + writeStats, err := c.PushV2(symbols1, series) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - testPushHeader(t, res.Header, "1", "0", "0") + testPushHeader(t, writeStats, 1, 0, 0) // sample result, err := c.Query("test_series", now) @@ -284,17 +280,15 @@ func TestIngest(t *testing.T) { // histogram histogramIdx := rand.Uint32() symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) - res, err = c.PushV2(symbols2, histogramSeries) + writeStats, err = c.PushV2(symbols2, histogramSeries) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - testPushHeader(t, res.Header, "0", "1", "0") + testPushHeader(t, writeStats, 0, 1, 0) // float histogram symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) - res, err = c.PushV2(symbols3, histogramFloatSeries) + writeStats, err = c.PushV2(symbols3, histogramFloatSeries) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - testPushHeader(t, res.Header, "0", "1", "0") + testPushHeader(t, writeStats, 0, 1, 0) testHistogramTimestamp := now.Add(blockRangePeriod * 2) expectedHistogram := tsdbutil.GenerateTestHistogram(int64(histogramIdx)) @@ -379,10 +373,9 @@ func TestExemplar(t *testing.T) { }, } - res, err := c.PushV2(symbols, timeseries) + writeStats, err := c.PushV2(symbols, timeseries) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - testPushHeader(t, res.Header, "1", "0", "1") + testPushHeader(t, writeStats, 1, 0, 1) start := time.Now().Add(-time.Minute) end := now.Add(time.Minute) @@ -451,14 +444,13 @@ func Test_WriteStatWithReplication(t *testing.T) { numSamples := 20 scrapeInterval := 30 * time.Second symbols, series := e2e.GenerateV2SeriesWithSamples("test_series", start, scrapeInterval, 0, numSamples, prompb.Label{Name: "job", Value: "test"}) - res, err := c.PushV2(symbols, []writev2.TimeSeries{series}) + writeStats, err := c.PushV2(symbols, []writev2.TimeSeries{series}) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - testPushHeader(t, res.Header, "20", "0", "0") + testPushHeader(t, writeStats, 20, 0, 0) } -func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) { - require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written")) - require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written")) - require.Equal(t, expectedExemplars, header.Get("X-Prometheus-Remote-Write-Exemplars-Written")) +func testPushHeader(t *testing.T, stats remoteapi.WriteResponseStats, expectedSamples, expectedHistogram, expectedExemplars int) { + require.Equal(t, expectedSamples, stats.Samples) + require.Equal(t, expectedHistogram, stats.Histograms) + require.Equal(t, expectedExemplars, stats.Exemplars) }