diff --git a/zipper/helper/requests.go b/zipper/helper/requests.go index a1cf865b4..9ebb1eac8 100644 --- a/zipper/helper/requests.go +++ b/zipper/helper/requests.go @@ -61,9 +61,9 @@ func (c *HttpQuery) pickServer(logger *zap.Logger) string { return srv } -func (c *HttpQuery) doRequest(ctx context.Context, logger *zap.Logger, server, uri string, r types.Request) (*ServerResponse, merry.Error) { +func (c *HttpQuery) doHttpRequest(ctx context.Context, logger *zap.Logger, server, uri string, r types.Request) (*http.Response, error) { logger = logger.With( - zap.String("function", "HttpQuery.doRequest"), + zap.String("function", "HttpQuery.doHttpRequest"), ) u, err := url.Parse(server + uri) @@ -115,7 +115,12 @@ func (c *HttpQuery) doRequest(ctx context.Context, logger *zap.Logger, server, u if r != nil { logger = logger.With(zap.Any("payloadData", r.LogInfo())) } - resp, err := c.client.Do(req.WithContext(ctx)) + + return c.client.Do(req.WithContext(ctx)) +} + +func (c *HttpQuery) doRequest(ctx context.Context, logger *zap.Logger, server, uri string, r types.Request) (*ServerResponse, merry.Error) { + resp, err := c.doHttpRequest(ctx, logger, server, uri, r) if err != nil { logger.Debug("error fetching result", zap.Error(err), @@ -131,6 +136,7 @@ func (c *HttpQuery) doRequest(ctx context.Context, logger *zap.Logger, server, u return &ServerResponse{Server: server}, nil } + var body []byte body, err = ioutil.ReadAll(resp.Body) if err != nil { logger.Debug("error reading body", @@ -146,6 +152,31 @@ func (c *HttpQuery) doRequest(ctx context.Context, logger *zap.Logger, server, u return &ServerResponse{Server: server, Response: body}, nil } +func (c *HttpQuery) DoHttpQuery(ctx context.Context, logger *zap.Logger, uri string, r types.Request) (*http.Response, merry.Error) { + maxTries := c.maxTries + if len(c.servers) > maxTries { + maxTries = len(c.servers) + } + + e := types.ErrFailedToFetch.WithValue("uri", uri) + for try := 0; try < maxTries; try++ { + server := c.pickServer(logger) + res, err := c.doHttpRequest(ctx, logger, server, uri, r) + if err != nil { + logger.Debug("have errors", + zap.Error(err), + ) + + e = e.WithCause(err) + continue + } + + return res, nil + } + + return nil, types.ErrMaxTriesExceeded.WithCause(e) +} + func (c *HttpQuery) DoQuery(ctx context.Context, logger *zap.Logger, uri string, r types.Request) (*ServerResponse, merry.Error) { maxTries := c.maxTries if len(c.servers) > maxTries { diff --git a/zipper/protocols/prometheus/helpers/json.go b/zipper/protocols/prometheus/helpers/json.go new file mode 100644 index 000000000..836a852fd --- /dev/null +++ b/zipper/protocols/prometheus/helpers/json.go @@ -0,0 +1,75 @@ +package helpers + +import ( + "encoding/json" + "io" + + "github.com/go-graphite/carbonapi/zipper/protocols/prometheus/types" +) + +// PrometheusFindResponseFromJSONStream decodes responses from /api/v1/series using a streaming approach to save memory when responses are large +func PrometheusFindResponseFromJSONStream(stream io.Reader) (response types.PrometheusFindResponse, err error) { + decoder := json.NewDecoder(stream) + var ( + fill func(json.Token, interface{}) + token json.Token + property interface{} + processData bool + ) + for decoder.More() { + // we are decoding a data value + if processData { + var v map[string]string + err = decoder.Decode(&v) + if err != nil { + processData = false + } else { + response.Data = append(response.Data, v) + continue + } + } + token, err = decoder.Token() + if err == io.EOF { + err = nil + return + } + if err != nil { + return + } + switch v := token.(type) { + case json.Delim: + switch v { + case ']': + if processData { + processData = false + } + } + case string: // a key or a string value + if fill != nil { // value + fill(token, property) + fill = nil + break + } + // key + switch token.(string) { + case "status": + property = &response.Status + case "errorType": + property = &response.ErrorType + case "error": + property = &response.Error + case "data": + _, err = decoder.Token() // move after the first '[' + if err != nil { + return + } + processData = true + continue + } + fill = func(t json.Token, p interface{}) { + *(p.(*string)) = t.(string) + } + } + } + return +} diff --git a/zipper/protocols/prometheus/helpers/json_test.go b/zipper/protocols/prometheus/helpers/json_test.go new file mode 100644 index 000000000..21d46495a --- /dev/null +++ b/zipper/protocols/prometheus/helpers/json_test.go @@ -0,0 +1,24 @@ +package helpers_test + +import ( + "fmt" + "strings" + "testing" + + "github.com/go-graphite/carbonapi/zipper/protocols/prometheus/helpers" +) + +func TestPrometheusFindResponseFromJSONStream(t *testing.T) { + str := `{"status":"success","data":[{"__name__":"go_gc_duration_seconds","instance":"localhost:9090","job":"prometheus","quantile":"0"},{"__name__":"go_gc_duration_seconds","instance":"localhost:9090","job":"prometheus","quantile":"0.25"},{"__name__":"go_gc_duration_seconds","instance":"localhost:9090","job":"prometheus","quantile":"0.5"},{"__name__":"go_gc_duration_seconds","instance":"localhost:9090","job":"prometheus","quantile":"0.75"},{"__name__":"go_gc_duration_seconds","instance":"localhost:9090","job":"prometheus","quantile":"1"}]}` + stream := strings.NewReader(str) + response, err := helpers.PrometheusFindResponseFromJSONStream(stream) + if err != nil { + t.Fatal(err) + } + if len(response.Data) != 5 { + t.Fatal(fmt.Sprintf("bad len %d, expected 5", len(response.Data))) + } + if response.Data[0]["__name__"] != "go_gc_duration_seconds" { + t.Fatal(fmt.Sprintf("unexpected decoded data")) + } +} diff --git a/zipper/protocols/prometheus/prometheus_group.go b/zipper/protocols/prometheus/prometheus_group.go index 3cb8d975e..e12bcb08f 100644 --- a/zipper/protocols/prometheus/prometheus_group.go +++ b/zipper/protocols/prometheus/prometheus_group.go @@ -387,7 +387,7 @@ func (c *PrometheusGroup) Find(ctx context.Context, request *protov3.MultiGlobRe rewrite.RawQuery = v.Encode() stats.FindRequests += 1 - res, err := c.httpQuery.DoQuery(ctx, logger, rewrite.RequestURI(), nil) + res, err := c.httpQuery.DoHttpQuery(ctx, logger, rewrite.RequestURI(), nil) if err != nil { stats.FindErrors += 1 if merry.Is(err, types.ErrTimeoutExceeded) { @@ -402,25 +402,26 @@ func (c *PrometheusGroup) Find(ctx context.Context, request *protov3.MultiGlobRe continue } - var pr prometheusTypes.PrometheusFindResponse - - err2 := json.Unmarshal(res.Response, &pr) - if err2 != nil { + var prometheusFindResponse prometheusTypes.PrometheusFindResponse + var decodeError error + prometheusFindResponse, decodeError = helpers.PrometheusFindResponseFromJSONStream(res.Body) + _ = res.Body.Close() + if decodeError != nil { stats.FindErrors += 1 if e == nil { - e = err + e = merry.New(decodeError.Error()) } else { e = e.WithCause(err) } continue } - if pr.Status != "success" { + if prometheusFindResponse.Status != "success" { stats.FindErrors += 1 if e == nil { - e = types.ErrFailedToFetch.WithMessage(pr.Error).WithValue("query", matchQuery).WithValue("error_type", pr.ErrorType).WithValue("error", pr.Error) + e = types.ErrFailedToFetch.WithMessage(prometheusFindResponse.Error).WithValue("query", matchQuery).WithValue("error_type", prometheusFindResponse.ErrorType).WithValue("error", prometheusFindResponse.Error) } else { - e = e.WithCause(err2).WithValue("query", matchQuery).WithValue("error_type", pr.ErrorType).WithValue("error", pr.Error) + e = e.WithCause(decodeError).WithValue("query", matchQuery).WithValue("error_type", prometheusFindResponse.ErrorType).WithValue("error", prometheusFindResponse.Error) } continue } @@ -430,7 +431,7 @@ func (c *PrometheusGroup) Find(ctx context.Context, request *protov3.MultiGlobRe Name: query, Matches: make([]protov3.GlobMatch, 0), } - for _, m := range pr.Data { + for _, m := range prometheusFindResponse.Data { name, ok := m["__name__"] if !ok { continue @@ -465,6 +466,9 @@ func (c *PrometheusGroup) Find(ctx context.Context, request *protov3.MultiGlobRe ) return &r, stats, e } + + logger.Info("streaming") + return &r, stats, nil }