Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ on:
jobs:
build:
name: Build
runs-on: ubuntu-20.04
runs-on: ubuntu-24.04
steps:
- name: Checkout
uses: actions/checkout@v2

- name: Setup Go 1.18
- name: Setup Go 1.24.3
uses: actions/setup-go@v2
with:
go-version: 1.18
go-version: 1.24.3

- name: Cache Go modules
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-build-${{ hashFiles('**/go.sum') }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.idea
.vscode
17 changes: 14 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,25 @@ type WriteResponse struct {

// Write sends HTTP requests to Prometheus Remote Write compatible API endpoint including Prometheus, Cortex and VictoriaMetrics.
func (p *Client) Write(ctx context.Context, req *WriteRequest, options ...WriteOption) (*WriteResponse, error) {
return p.internalWrite(ctx, &prompb.WriteRequest{
Timeseries: toProtoTimeSeries(req.TimeSeries),
}, options...)
}

// WriteProto sends HTTP requests to Prometheus Remote Write compatible API endpoint including Prometheus, Cortex and VictoriaMetrics.
// The difference between Write and WriteProto is that WriteProto allows you to write the full prompb.WriteRequest, which supports all metrics.
func (p *Client) WriteProto(ctx context.Context, req *prompb.WriteRequest, options ...WriteOption) (*WriteResponse, error) {
return p.internalWrite(ctx, req, options...)
}

// Write sends HTTP requests to Prometheus Remote Write compatible API endpoint including Prometheus, Cortex and VictoriaMetrics.
func (p *Client) internalWrite(ctx context.Context, req *prompb.WriteRequest, options ...WriteOption) (*WriteResponse, error) {
opts := writeOptions{}
for _, opt := range options {
opt(&opts)
}
// Marshal proto and compress.
pbBytes, err := proto.Marshal(&prompb.WriteRequest{
Timeseries: toProtoTimeSeries(req.TimeSeries),
})
pbBytes, err := proto.Marshal(req)
if err != nil {
return nil, fmt.Errorf("promwrite: marshaling remote write request proto: %w", err)
}
Expand Down
176 changes: 175 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/castai/promwrite"
)

func TestClient(t *testing.T) {
func TestWrite(t *testing.T) {
t.Run("write with default options", func(t *testing.T) {
r := require.New(t)

Expand Down Expand Up @@ -202,6 +202,180 @@ func TestClient(t *testing.T) {
})
}

func TestWriteProto(t *testing.T) {
t.Run("write with default options", func(t *testing.T) {
r := require.New(t)

receivedWriteRequest := make(chan *prompb.WriteRequest, 1)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
b, _ := io.ReadAll(req.Body)
parsed, err := parseWriteRequest(b)
r.NoError(err)
receivedWriteRequest <- parsed
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

client := promwrite.NewClient(srv.URL)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

nowUnixMilli := time.Now().UTC().UnixMilli()
req := &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_a",
},
{
Name: "custom_label_a",
Value: "custom_value_a",
},
},
Samples: []prompb.Sample{
{Timestamp: nowUnixMilli, Value: 123},
},
},
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_b",
},
},
Samples: []prompb.Sample{
{Timestamp: nowUnixMilli, Value: 456},
},
},
},
}
_, err := client.WriteProto(ctx, req)
r.NoError(err)

res := <-receivedWriteRequest
r.Len(res.Timeseries, 2)
r.Equal(prompb.TimeSeries{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_a",
},
{
Name: "custom_label_a",
Value: "custom_value_a",
},
},
Samples: []prompb.Sample{
{Timestamp: nowUnixMilli, Value: 123},
},
}, res.Timeseries[0])
r.Equal(prompb.TimeSeries{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_b",
},
},
Samples: []prompb.Sample{
{Timestamp: nowUnixMilli, Value: 456},
},
}, res.Timeseries[1])
})

t.Run("write with custom options", func(t *testing.T) {
r := require.New(t)

receivedWriteRequest := make(chan *prompb.WriteRequest, 1)
receivedHeaders := make(chan http.Header, 1)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
b, _ := ioutil.ReadAll(req.Body)
parsed, err := parseWriteRequest(b)
r.NoError(err)
receivedWriteRequest <- parsed
receivedHeaders <- req.Header
w.WriteHeader(http.StatusAccepted)
}))
defer srv.Close()

sentRequest := make(chan *http.Request, 1)
client := promwrite.NewClient(
srv.URL,
promwrite.HttpClient(&http.Client{
Timeout: 5 * time.Second,
Transport: &customTestHttpClientTransport{
reqChan: sentRequest,
next: http.DefaultTransport,
},
}),
)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

nowUnixMilli := time.Now().UTC().UnixMilli()
req := &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_a",
},
},
Samples: []prompb.Sample{
{Timestamp: nowUnixMilli, Value: 123},
},
},
},
}
_, err := client.WriteProto(ctx, req, promwrite.WriteHeaders(map[string]string{"X-Scope-OrgID": "tenant1"}))
r.NoError(err)

res := <-receivedWriteRequest
r.Len(res.Timeseries, 1)
r.Equal(prompb.TimeSeries{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_a",
},
},
Samples: []prompb.Sample{
{
Timestamp: nowUnixMilli,
Value: 123,
},
},
}, res.Timeseries[0])

sentReq := <-sentRequest
reqHeaders := sentReq.Header
recvHeaders := <-receivedHeaders
r.Equal("tenant1", reqHeaders.Get("X-Scope-OrgID"))
r.Equal("tenant1", recvHeaders.Get("X-Scope-OrgID"))
})

t.Run("write error", func(t *testing.T) {
r := require.New(t)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("ups"))
}))
defer srv.Close()

client := promwrite.NewClient(srv.URL)

_, err := client.WriteProto(context.Background(), &prompb.WriteRequest{})
r.EqualError(err, "promwrite: expected status 200, got 400: ups")
var writeErr *promwrite.WriteError
r.True(errors.As(err, &writeErr))
r.Equal(http.StatusBadRequest, writeErr.StatusCode())
})
}

func parseWriteRequest(src []byte) (*prompb.WriteRequest, error) {
decompressed, err := snappy.Decode(nil, src)
if err != nil {
Expand Down
18 changes: 12 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
module github.com/castai/promwrite

go 1.18
go 1.24.3

require (
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/prometheus/prometheus v0.40.3
github.com/stretchr/testify v1.8.1
github.com/golang/snappy v1.0.0
github.com/prometheus/prometheus v0.304.1
github.com/stretchr/testify v1.10.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.63.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
47 changes: 29 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/prometheus v0.40.3 h1:oMw1vVyrxHTigXAcFY6QHrGUnQEbKEOKo737cPgYBwY=
github.com/prometheus/prometheus v0.40.3/go.mod h1:/UhsWkOXkO11wqTW2Bx5YDOwRweSDcaFBlTIzFe7P0Y=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k=
github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18=
github.com/prometheus/prometheus v0.304.1 h1:e4kpJMb2Vh/PcR6LInake+ofcvFYHT+bCfmBvOkaZbY=
github.com/prometheus/prometheus v0.304.1/go.mod h1:ioGx2SGKTY+fLnJSQCdTHqARVldGNS8OlIe3kvp98so=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -45,6 +50,8 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand All @@ -53,8 +60,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=