From edbcd50d08743cb4f41800ace4cff11ecf565fde Mon Sep 17 00:00:00 2001 From: Filipe Augusto Lima de Souza Date: Tue, 17 Jun 2025 13:18:20 +0200 Subject: [PATCH 1/3] adding WriteProto that accepts direct prompb.WriteRequest --- .github/workflows/build.yaml | 6 +- .gitignore | 1 + client.go | 17 +++- client_test.go | 176 ++++++++++++++++++++++++++++++++++- go.mod | 18 ++-- go.sum | 47 ++++++---- 6 files changed, 234 insertions(+), 31 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 9ad5e06..a443a44 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -6,15 +6,15 @@ on: jobs: build: name: Build - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout uses: actions/checkout@v2 - - name: Setup Go 1.18 + - name: Setup Go 1.24.3 uses: actions/setup-go@v2 with: - go-version: 1.18 + go-version: 1.24.3 - name: Cache Go modules uses: actions/cache@v2 diff --git a/.gitignore b/.gitignore index 485dee6..d48c759 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .idea +.vscode \ No newline at end of file diff --git a/client.go b/client.go index abc7b2e..60fc2b7 100644 --- a/client.go +++ b/client.go @@ -85,14 +85,25 @@ type WriteResponse struct { // Write sends HTTP requests to Prometheus Remote Write compatible API endpoint including Prometheus, Cortex and VictoriaMetrics. func (p *Client) Write(ctx context.Context, req *WriteRequest, options ...WriteOption) (*WriteResponse, error) { + return p.internalWrite(ctx, &prompb.WriteRequest{ + Timeseries: toProtoTimeSeries(req.TimeSeries), + }, options...) +} + +// WriteProto sends HTTP requests to Prometheus Remote Write compatible API endpoint including Prometheus, Cortex and VictoriaMetrics. +// The difference between Write and WriteProto is that WriteProto allows you to write the full prompb.WriteRequest, which supports all metrics. +func (p *Client) WriteProto(ctx context.Context, req *prompb.WriteRequest, options ...WriteOption) (*WriteResponse, error) { + return p.internalWrite(ctx, req, options...) +} + +// Write sends HTTP requests to Prometheus Remote Write compatible API endpoint including Prometheus, Cortex and VictoriaMetrics. +func (p *Client) internalWrite(ctx context.Context, req *prompb.WriteRequest, options ...WriteOption) (*WriteResponse, error) { opts := writeOptions{} for _, opt := range options { opt(&opts) } // Marshal proto and compress. - pbBytes, err := proto.Marshal(&prompb.WriteRequest{ - Timeseries: toProtoTimeSeries(req.TimeSeries), - }) + pbBytes, err := proto.Marshal(req) if err != nil { return nil, fmt.Errorf("promwrite: marshaling remote write request proto: %w", err) } diff --git a/client_test.go b/client_test.go index 068a65a..1f778b3 100644 --- a/client_test.go +++ b/client_test.go @@ -19,7 +19,7 @@ import ( "github.com/castai/promwrite" ) -func TestClient(t *testing.T) { +func TestWrite(t *testing.T) { t.Run("write with default options", func(t *testing.T) { r := require.New(t) @@ -202,6 +202,180 @@ func TestClient(t *testing.T) { }) } +func TestWriteProto(t *testing.T) { + t.Run("write with default options", func(t *testing.T) { + r := require.New(t) + + receivedWriteRequest := make(chan *prompb.WriteRequest, 1) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + b, _ := io.ReadAll(req.Body) + parsed, err := parseWriteRequest(b) + r.NoError(err) + receivedWriteRequest <- parsed + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + client := promwrite.NewClient(srv.URL) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + nowUnixMilli := time.Now().UTC().UnixMilli() + req := &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "metric_a", + }, + { + Name: "custom_label_a", + Value: "custom_value_a", + }, + }, + Samples: []prompb.Sample{ + {Timestamp: nowUnixMilli, Value: 123}, + }, + }, + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "metric_b", + }, + }, + Samples: []prompb.Sample{ + {Timestamp: nowUnixMilli, Value: 456}, + }, + }, + }, + } + _, err := client.WriteProto(ctx, req) + r.NoError(err) + + res := <-receivedWriteRequest + r.Len(res.Timeseries, 2) + r.Equal(prompb.TimeSeries{ + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "metric_a", + }, + { + Name: "custom_label_a", + Value: "custom_value_a", + }, + }, + Samples: []prompb.Sample{ + {Timestamp: nowUnixMilli, Value: 123}, + }, + }, res.Timeseries[0]) + r.Equal(prompb.TimeSeries{ + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "metric_b", + }, + }, + Samples: []prompb.Sample{ + {Timestamp: nowUnixMilli, Value: 456}, + }, + }, res.Timeseries[1]) + }) + + t.Run("write with custom options", func(t *testing.T) { + r := require.New(t) + + receivedWriteRequest := make(chan *prompb.WriteRequest, 1) + receivedHeaders := make(chan http.Header, 1) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + b, _ := ioutil.ReadAll(req.Body) + parsed, err := parseWriteRequest(b) + r.NoError(err) + receivedWriteRequest <- parsed + receivedHeaders <- req.Header + w.WriteHeader(http.StatusAccepted) + })) + defer srv.Close() + + sentRequest := make(chan *http.Request, 1) + client := promwrite.NewClient( + srv.URL, + promwrite.HttpClient(&http.Client{ + Timeout: 5 * time.Second, + Transport: &customTestHttpClientTransport{ + reqChan: sentRequest, + next: http.DefaultTransport, + }, + }), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + nowUnixMilli := time.Now().UTC().UnixMilli() + req := &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "metric_a", + }, + }, + Samples: []prompb.Sample{ + {Timestamp: nowUnixMilli, Value: 123}, + }, + }, + }, + } + _, err := client.WriteProto(ctx, req, promwrite.WriteHeaders(map[string]string{"X-Scope-OrgID": "tenant1"})) + r.NoError(err) + + res := <-receivedWriteRequest + r.Len(res.Timeseries, 1) + r.Equal(prompb.TimeSeries{ + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "metric_a", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: nowUnixMilli, + Value: 123, + }, + }, + }, res.Timeseries[0]) + + sentReq := <-sentRequest + reqHeaders := sentReq.Header + recvHeaders := <-receivedHeaders + r.Equal("tenant1", reqHeaders.Get("X-Scope-OrgID")) + r.Equal("tenant1", recvHeaders.Get("X-Scope-OrgID")) + }) + + t.Run("write error", func(t *testing.T) { + r := require.New(t) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("ups")) + })) + defer srv.Close() + + client := promwrite.NewClient(srv.URL) + + _, err := client.WriteProto(context.Background(), &prompb.WriteRequest{}) + r.EqualError(err, "promwrite: expected status 200, got 400: ups") + var writeErr *promwrite.WriteError + r.True(errors.As(err, &writeErr)) + r.Equal(http.StatusBadRequest, writeErr.StatusCode()) + }) +} + func parseWriteRequest(src []byte) (*prompb.WriteRequest, error) { decompressed, err := snappy.Decode(nil, src) if err != nil { diff --git a/go.mod b/go.mod index 4f9784a..f512055 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,23 @@ module github.com/castai/promwrite -go 1.18 +go 1.24.3 require ( github.com/gogo/protobuf v1.3.2 - github.com/golang/snappy v0.0.4 - github.com/prometheus/prometheus v0.40.3 - github.com/stretchr/testify v1.8.1 + github.com/golang/snappy v1.0.0 + github.com/prometheus/prometheus v0.304.1 + github.com/stretchr/testify v1.10.0 ) require ( - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect github.com/kr/pretty v0.3.1 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.63.0 // indirect + golang.org/x/text v0.24.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 556bcdf..dcb85bf 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,16 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248= +github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -13,19 +18,19 @@ github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3x github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/prometheus v0.40.3 h1:oMw1vVyrxHTigXAcFY6QHrGUnQEbKEOKo737cPgYBwY= -github.com/prometheus/prometheus v0.40.3/go.mod h1:/UhsWkOXkO11wqTW2Bx5YDOwRweSDcaFBlTIzFe7P0Y= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k= +github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18= +github.com/prometheus/prometheus v0.304.1 h1:e4kpJMb2Vh/PcR6LInake+ofcvFYHT+bCfmBvOkaZbY= +github.com/prometheus/prometheus v0.304.1/go.mod h1:ioGx2SGKTY+fLnJSQCdTHqARVldGNS8OlIe3kvp98so= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -45,6 +50,8 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -53,8 +60,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 1074fe4c6688979cf7803bab8372077afc7d9a86 Mon Sep 17 00:00:00 2001 From: Filipe Augusto Lima de Souza Date: Tue, 17 Jun 2025 13:23:15 +0200 Subject: [PATCH 2/3] update git hub workflow --- .github/workflows/build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index a443a44..207ca7c 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -17,7 +17,7 @@ jobs: go-version: 1.24.3 - name: Cache Go modules - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/go/pkg/mod key: ${{ runner.os }}-build-${{ hashFiles('**/go.sum') }} From 030593e9424e37a4aefe0c4d930bc962858050ac Mon Sep 17 00:00:00 2001 From: Filipe Augusto Lima de Souza Date: Tue, 17 Jun 2025 13:29:01 +0200 Subject: [PATCH 3/3] change agent --- .github/workflows/build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 207ca7c..e81285f 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -6,7 +6,7 @@ on: jobs: build: name: Build - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - name: Checkout uses: actions/checkout@v2