Skip to content

Commit 0b9befb

Browse files
authored
Add OTLP Benchmark to measure push performance (#6977)
* Chore: add small optimization to otlp Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * Add OTLP benchmark to evaluate push performance Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * Delete unused otlp benchmark Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * change to b.Loop() Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> --------- Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent bd4773b commit 0b9befb

File tree

2 files changed

+167
-2
lines changed

2 files changed

+167
-2
lines changed

pkg/util/push/otlp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
7373
}
7474

7575
// convert prompb to cortexpb TimeSeries
76-
tsList := []cortexpb.PreallocTimeseries(nil)
76+
tsList := make([]cortexpb.PreallocTimeseries, 0, len(promTsList))
7777
for _, v := range promTsList {
7878
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
7979
Labels: makeLabels(v.Labels),

pkg/util/push/otlp_test.go

Lines changed: 166 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ func getOTLPHttpRequest(otlpRequest *pmetricotlp.ExportRequest, contentType, enc
620620
return req, nil
621621
}
622622

623-
func BenchmarkOTLPWriteHandler(b *testing.B) {
623+
func BenchmarkOTLPWriteHandlerCompression(b *testing.B) {
624624
cfg := distributor.OTLPConfig{
625625
ConvertAllAttributes: false,
626626
DisableTargetInfo: false,
@@ -695,6 +695,90 @@ func BenchmarkOTLPWriteHandler(b *testing.B) {
695695
})
696696
}
697697

698+
func BenchmarkOTLPWriteHandlerPush(b *testing.B) {
699+
cfg := distributor.OTLPConfig{
700+
ConvertAllAttributes: false,
701+
DisableTargetInfo: false,
702+
}
703+
overrides := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)
704+
705+
mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
706+
return &cortexpb.WriteResponse{}, nil
707+
}
708+
handler := OTLPHandler(1000000, overrides, cfg, nil, mockPushFunc)
709+
710+
tests := []struct {
711+
description string
712+
numSeries int
713+
samplesPerSeries int
714+
numHistograms int
715+
}{
716+
{
717+
numSeries: 1,
718+
samplesPerSeries: 10,
719+
numHistograms: 1,
720+
},
721+
{
722+
numSeries: 1,
723+
samplesPerSeries: 100,
724+
numHistograms: 1,
725+
},
726+
{
727+
numSeries: 1,
728+
samplesPerSeries: 1000,
729+
numHistograms: 1,
730+
},
731+
{
732+
numSeries: 1,
733+
samplesPerSeries: 1,
734+
numHistograms: 10,
735+
},
736+
{
737+
numSeries: 1,
738+
samplesPerSeries: 1,
739+
numHistograms: 100,
740+
},
741+
{
742+
numSeries: 1,
743+
samplesPerSeries: 1,
744+
numHistograms: 1000,
745+
},
746+
{
747+
numSeries: 10,
748+
samplesPerSeries: 1,
749+
numHistograms: 1,
750+
},
751+
{
752+
numSeries: 100,
753+
samplesPerSeries: 1,
754+
numHistograms: 1,
755+
},
756+
{
757+
numSeries: 1000,
758+
samplesPerSeries: 1,
759+
numHistograms: 1,
760+
},
761+
}
762+
763+
for _, test := range tests {
764+
b.Run(fmt.Sprintf("numSeries:%d, samplesPerSeries:%d, numHistograms:%d", test.numSeries, test.samplesPerSeries, test.numHistograms), func(b *testing.B) {
765+
exportRequest := generateOTLPWriteRequestWithSeries(test.numSeries, test.samplesPerSeries, test.numHistograms)
766+
req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "gzip")
767+
require.NoError(b, err)
768+
769+
b.ReportAllocs()
770+
for b.Loop() {
771+
recorder := httptest.NewRecorder()
772+
handler.ServeHTTP(recorder, req)
773+
774+
resp := recorder.Result()
775+
require.Equal(b, http.StatusOK, resp.StatusCode)
776+
req.Body.(*resetReader).Reset()
777+
}
778+
})
779+
}
780+
}
781+
698782
func TestOTLPWriteHandler(t *testing.T) {
699783
cfg := distributor.OTLPConfig{
700784
ConvertAllAttributes: false,
@@ -800,6 +884,87 @@ func TestOTLPWriteHandler(t *testing.T) {
800884
}
801885
}
802886

887+
func generateOTLPWriteRequestWithSeries(numSeries, samplesPerSeries, numHistogram int) pmetricotlp.ExportRequest {
888+
d := pmetric.NewMetrics()
889+
890+
attributes := pcommon.NewMap()
891+
attributes.PutStr("label1", "value1")
892+
attributes.PutStr("label2", "value2")
893+
attributes.PutStr("label3", "value3")
894+
895+
for i := 0; i < numSeries; i++ {
896+
metricName := fmt.Sprintf("series_%d", i)
897+
metricUnit := fmt.Sprintf("unit_%d", i)
898+
metricDescription := fmt.Sprintf("description_%d", i)
899+
900+
resourceMetric := d.ResourceMetrics().AppendEmpty()
901+
resourceMetric.Resource().Attributes().PutStr("service.name", "test-service")
902+
resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance")
903+
resourceMetric.Resource().Attributes().PutStr("host.name", "test-host")
904+
905+
scopeMetric := resourceMetric.ScopeMetrics()
906+
metric := scopeMetric.AppendEmpty().Metrics().AppendEmpty()
907+
908+
// set metadata
909+
metric.SetName(metricName)
910+
metric.SetDescription(metricDescription)
911+
metric.SetUnit(metricUnit)
912+
metric.SetEmptyGauge()
913+
914+
for j := 0; j < samplesPerSeries; j++ {
915+
v := float64(j + i)
916+
ts := time.Now().Add(time.Second * 30 * time.Duration(samplesPerSeries-j+1))
917+
dataPoint := metric.Gauge().DataPoints().AppendEmpty()
918+
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
919+
dataPoint.SetDoubleValue(v)
920+
attributes.CopyTo(dataPoint.Attributes())
921+
922+
// exemplar
923+
exemplar := dataPoint.Exemplars().AppendEmpty()
924+
exemplar.SetTimestamp(pcommon.NewTimestampFromTime(ts))
925+
exemplar.SetDoubleValue(v)
926+
exemplar.SetSpanID(pcommon.SpanID{0, 1, 2, 3, 4, 5, 6, 7})
927+
exemplar.SetTraceID(pcommon.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
928+
}
929+
930+
for j := 0; j < numHistogram; j++ {
931+
ts := time.Now().Add(time.Second * 30 * time.Duration(numHistogram-j+1))
932+
// Generate One Histogram
933+
histogramMetric := scopeMetric.AppendEmpty().Metrics().AppendEmpty()
934+
histogramMetric.SetName(fmt.Sprintf("test-histogram_%d", j))
935+
histogramMetric.SetDescription(fmt.Sprintf("test-histogram-description_%d", j))
936+
histogramMetric.SetEmptyHistogram()
937+
histogramMetric.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
938+
939+
histogramDataPoint := histogramMetric.Histogram().DataPoints().AppendEmpty()
940+
histogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
941+
histogramDataPoint.ExplicitBounds().FromRaw([]float64{0.0, 1.0, 2.0, 3.0, 4.0, 5.0})
942+
histogramDataPoint.BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2, 2})
943+
histogramDataPoint.SetCount(10)
944+
histogramDataPoint.SetSum(30.0)
945+
attributes.CopyTo(histogramDataPoint.Attributes())
946+
947+
// Generate One Exponential-Histogram
948+
exponentialHistogramMetric := scopeMetric.AppendEmpty().Metrics().AppendEmpty()
949+
exponentialHistogramMetric.SetName(fmt.Sprintf("test-exponential-histogram_%d", j))
950+
exponentialHistogramMetric.SetDescription(fmt.Sprintf("test-exponential-histogram-description_%d", j))
951+
exponentialHistogramMetric.SetEmptyExponentialHistogram()
952+
exponentialHistogramMetric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
953+
954+
exponentialHistogramDataPoint := exponentialHistogramMetric.ExponentialHistogram().DataPoints().AppendEmpty()
955+
exponentialHistogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
956+
exponentialHistogramDataPoint.SetScale(2.0)
957+
exponentialHistogramDataPoint.Positive().BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2})
958+
exponentialHistogramDataPoint.SetZeroCount(2)
959+
exponentialHistogramDataPoint.SetCount(10)
960+
exponentialHistogramDataPoint.SetSum(30.0)
961+
attributes.CopyTo(exponentialHistogramDataPoint.Attributes())
962+
}
963+
}
964+
965+
return pmetricotlp.NewExportRequestFromMetrics(d)
966+
}
967+
803968
func generateOTLPWriteRequest() pmetricotlp.ExportRequest {
804969
d := pmetric.NewMetrics()
805970

0 commit comments

Comments
 (0)