Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions zipper/helper/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func (c *HttpQuery) pickServer(logger *zap.Logger) string {
return srv
}

func (c *HttpQuery) doRequest(ctx context.Context, logger *zap.Logger, server, uri string, r types.Request) (*ServerResponse, merry.Error) {
func (c *HttpQuery) doHttpRequest(ctx context.Context, logger *zap.Logger, server, uri string, r types.Request) (*http.Response, error) {
logger = logger.With(
zap.String("function", "HttpQuery.doRequest"),
zap.String("function", "HttpQuery.doHttpRequest"),
)

u, err := url.Parse(server + uri)
Expand Down Expand Up @@ -115,7 +115,12 @@ func (c *HttpQuery) doRequest(ctx context.Context, logger *zap.Logger, server, u
if r != nil {
logger = logger.With(zap.Any("payloadData", r.LogInfo()))
}
resp, err := c.client.Do(req.WithContext(ctx))

return c.client.Do(req.WithContext(ctx))
}

func (c *HttpQuery) doRequest(ctx context.Context, logger *zap.Logger, server, uri string, r types.Request) (*ServerResponse, merry.Error) {
resp, err := c.doHttpRequest(ctx, logger, server, uri, r)
if err != nil {
logger.Debug("error fetching result",
zap.Error(err),
Expand All @@ -131,6 +136,7 @@ func (c *HttpQuery) doRequest(ctx context.Context, logger *zap.Logger, server, u
return &ServerResponse{Server: server}, nil
}

var body []byte
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
logger.Debug("error reading body",
Expand All @@ -146,6 +152,31 @@ func (c *HttpQuery) doRequest(ctx context.Context, logger *zap.Logger, server, u
return &ServerResponse{Server: server, Response: body}, nil
}

func (c *HttpQuery) DoHttpQuery(ctx context.Context, logger *zap.Logger, uri string, r types.Request) (*http.Response, merry.Error) {
maxTries := c.maxTries
if len(c.servers) > maxTries {
maxTries = len(c.servers)
}

e := types.ErrFailedToFetch.WithValue("uri", uri)
for try := 0; try < maxTries; try++ {
server := c.pickServer(logger)
res, err := c.doHttpRequest(ctx, logger, server, uri, r)
if err != nil {
logger.Debug("have errors",
zap.Error(err),
)

e = e.WithCause(err)
continue
}

return res, nil
}

return nil, types.ErrMaxTriesExceeded.WithCause(e)
}

func (c *HttpQuery) DoQuery(ctx context.Context, logger *zap.Logger, uri string, r types.Request) (*ServerResponse, merry.Error) {
maxTries := c.maxTries
if len(c.servers) > maxTries {
Expand Down
75 changes: 75 additions & 0 deletions zipper/protocols/prometheus/helpers/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package helpers

import (
"encoding/json"
"io"

"github.com/go-graphite/carbonapi/zipper/protocols/prometheus/types"
)

// PrometheusFindResponseFromJSONStream decodes responses from /api/v1/series using a streaming approach to save memory when responses are large
func PrometheusFindResponseFromJSONStream(stream io.Reader) (response types.PrometheusFindResponse, err error) {
decoder := json.NewDecoder(stream)
var (
fill func(json.Token, interface{})
token json.Token
property interface{}
processData bool
)
for decoder.More() {
// we are decoding a data value
if processData {
var v map[string]string
err = decoder.Decode(&v)
if err != nil {
processData = false
} else {
response.Data = append(response.Data, v)
continue
}
}
token, err = decoder.Token()
if err == io.EOF {
err = nil
return
}
if err != nil {
return
}
switch v := token.(type) {
case json.Delim:
switch v {
case ']':
if processData {
processData = false
}
}
case string: // a key or a string value
if fill != nil { // value
fill(token, property)
fill = nil
break
}
// key
switch token.(string) {
case "status":
property = &response.Status
case "errorType":
property = &response.ErrorType
case "error":
property = &response.Error
case "data":
_, err = decoder.Token() // move after the first '['
if err != nil {
return
}
processData = true
continue
}
fill = func(t json.Token, p interface{}) {
*(p.(*string)) = t.(string)
}
}
}
return
}
24 changes: 24 additions & 0 deletions zipper/protocols/prometheus/helpers/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package helpers_test

import (
"fmt"
"strings"
"testing"

"github.com/go-graphite/carbonapi/zipper/protocols/prometheus/helpers"
)

func TestPrometheusFindResponseFromJSONStream(t *testing.T) {
str := `{"status":"success","data":[{"__name__":"go_gc_duration_seconds","instance":"localhost:9090","job":"prometheus","quantile":"0"},{"__name__":"go_gc_duration_seconds","instance":"localhost:9090","job":"prometheus","quantile":"0.25"},{"__name__":"go_gc_duration_seconds","instance":"localhost:9090","job":"prometheus","quantile":"0.5"},{"__name__":"go_gc_duration_seconds","instance":"localhost:9090","job":"prometheus","quantile":"0.75"},{"__name__":"go_gc_duration_seconds","instance":"localhost:9090","job":"prometheus","quantile":"1"}]}`
stream := strings.NewReader(str)
response, err := helpers.PrometheusFindResponseFromJSONStream(stream)
if err != nil {
t.Fatal(err)
}
if len(response.Data) != 5 {
t.Fatal(fmt.Sprintf("bad len %d, expected 5", len(response.Data)))
}
if response.Data[0]["__name__"] != "go_gc_duration_seconds" {
t.Fatal(fmt.Sprintf("unexpected decoded data"))
}
}
24 changes: 14 additions & 10 deletions zipper/protocols/prometheus/prometheus_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (c *PrometheusGroup) Find(ctx context.Context, request *protov3.MultiGlobRe

rewrite.RawQuery = v.Encode()
stats.FindRequests += 1
res, err := c.httpQuery.DoQuery(ctx, logger, rewrite.RequestURI(), nil)
res, err := c.httpQuery.DoHttpQuery(ctx, logger, rewrite.RequestURI(), nil)
if err != nil {
stats.FindErrors += 1
if merry.Is(err, types.ErrTimeoutExceeded) {
Expand All @@ -402,25 +402,26 @@ func (c *PrometheusGroup) Find(ctx context.Context, request *protov3.MultiGlobRe
continue
}

var pr prometheusTypes.PrometheusFindResponse

err2 := json.Unmarshal(res.Response, &pr)
if err2 != nil {
var prometheusFindResponse prometheusTypes.PrometheusFindResponse
var decodeError error
prometheusFindResponse, decodeError = helpers.PrometheusFindResponseFromJSONStream(res.Body)
_ = res.Body.Close()
if decodeError != nil {
stats.FindErrors += 1
if e == nil {
e = err
e = merry.New(decodeError.Error())
} else {
e = e.WithCause(err)
}
continue
}

if pr.Status != "success" {
if prometheusFindResponse.Status != "success" {
stats.FindErrors += 1
if e == nil {
e = types.ErrFailedToFetch.WithMessage(pr.Error).WithValue("query", matchQuery).WithValue("error_type", pr.ErrorType).WithValue("error", pr.Error)
e = types.ErrFailedToFetch.WithMessage(prometheusFindResponse.Error).WithValue("query", matchQuery).WithValue("error_type", prometheusFindResponse.ErrorType).WithValue("error", prometheusFindResponse.Error)
} else {
e = e.WithCause(err2).WithValue("query", matchQuery).WithValue("error_type", pr.ErrorType).WithValue("error", pr.Error)
e = e.WithCause(decodeError).WithValue("query", matchQuery).WithValue("error_type", prometheusFindResponse.ErrorType).WithValue("error", prometheusFindResponse.Error)
}
continue
}
Expand All @@ -430,7 +431,7 @@ func (c *PrometheusGroup) Find(ctx context.Context, request *protov3.MultiGlobRe
Name: query,
Matches: make([]protov3.GlobMatch, 0),
}
for _, m := range pr.Data {
for _, m := range prometheusFindResponse.Data {
name, ok := m["__name__"]
if !ok {
continue
Expand Down Expand Up @@ -465,6 +466,9 @@ func (c *PrometheusGroup) Find(ctx context.Context, request *protov3.MultiGlobRe
)
return &r, stats, e
}

logger.Info("streaming")

return &r, stats, nil
}

Expand Down