Commit fd54015

Add index updater to alertmanager/ruler
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 1833e69 commit fd54015

23 files changed, +344 −48 lines changed


docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions
@@ -1927,6 +1927,11 @@ blocks_storage:
     # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
     [max_stale_period: <duration> | default = 1h]
 
+    # How frequently the user index file is updated. It only takes effect when
+    # the user scan strategy is user_index.
+    # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
+    [clean_up_interval: <duration> | default = 15m]
+
     # TTL of the cached users. 0 disables caching and relies on caching at
     # bucket client level.
     # CLI flag: -blocks-storage.users-scanner.cache-ttl

docs/blocks-storage/store-gateway.md

Lines changed: 5 additions & 0 deletions
@@ -2013,6 +2013,11 @@ blocks_storage:
     # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
     [max_stale_period: <duration> | default = 1h]
 
+    # How frequently the user index file is updated. It only takes effect when
+    # the user scan strategy is user_index.
+    # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
+    [clean_up_interval: <duration> | default = 15m]
+
     # TTL of the cached users. 0 disables caching and relies on caching at
     # bucket client level.
     # CLI flag: -blocks-storage.users-scanner.cache-ttl

docs/configuration/config-file-reference.md

Lines changed: 15 additions & 0 deletions
@@ -992,6 +992,11 @@ users_scanner:
   # CLI flag: -alertmanager-storage.users-scanner.user-index.max-stale-period
   [max_stale_period: <duration> | default = 1h]
 
+  # How frequently the user index file is updated. It only takes effect when
+  # the user scan strategy is user_index.
+  # CLI flag: -alertmanager-storage.users-scanner.user-index.cleanup-interval
+  [clean_up_interval: <duration> | default = 15m]
+
   # TTL of the cached users. 0 disables caching and relies on caching at bucket
   # client level.
   # CLI flag: -alertmanager-storage.users-scanner.cache-ttl
@@ -2608,6 +2613,11 @@ users_scanner:
   # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
   [max_stale_period: <duration> | default = 1h]
 
+  # How frequently the user index file is updated. It only takes effect when
+  # the user scan strategy is user_index.
+  # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
+  [clean_up_interval: <duration> | default = 15m]
+
   # TTL of the cached users. 0 disables caching and relies on caching at bucket
   # client level.
   # CLI flag: -blocks-storage.users-scanner.cache-ttl
@@ -5796,6 +5806,11 @@ users_scanner:
   # CLI flag: -ruler-storage.users-scanner.user-index.max-stale-period
   [max_stale_period: <duration> | default = 1h]
 
+  # How frequently the user index file is updated. It only takes effect when
+  # the user scan strategy is user_index.
+  # CLI flag: -ruler-storage.users-scanner.user-index.cleanup-interval
+  [clean_up_interval: <duration> | default = 15m]
+
   # TTL of the cached users. 0 disables caching and relies on caching at bucket
   # client level.
   # CLI flag: -ruler-storage.users-scanner.cache-ttl
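
Taken together, enabling the feature comes down to two flags per storage backend: switch the scanner strategy to user_index and, optionally, tune the update interval. A minimal, self-contained Go sketch (the flag names come from the reference above; the tiny program itself is illustrative only):

package main

import "fmt"

func main() {
	// Flags documented above: enable the user_index scan strategy for the
	// Alertmanager storage and set how often the index file is rewritten.
	// 15m is the documented default for the cleanup interval.
	flags := map[string]string{
		"-alertmanager-storage.users-scanner.strategy":                    "user_index",
		"-alertmanager-storage.users-scanner.user-index.cleanup-interval": "15m",
	}
	for flag, value := range flags {
		fmt.Printf("%s=%s\n", flag, value)
	}
}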

integration/alertmanager_test.go

Lines changed: 35 additions & 0 deletions
@@ -69,6 +69,41 @@ func TestAlertmanager(t *testing.T) {
 	assertServiceMetricsPrefixes(t, AlertManager, alertmanager)
 }
 
+func TestAlertmanagerWithUserIndexUpdater(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs/user-1.yaml", []byte(cortexAlertmanagerUserConfigYaml)))
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, alertsBucketName)
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	baseFlags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags())
+	flags := mergeFlags(baseFlags, AlertmanagerShardingFlags(consul.NetworkHTTPEndpoint(), 1), map[string]string{
+		"-alertmanager-storage.users-scanner.strategy":                    "user_index",
+		"-alertmanager-storage.users-scanner.user-index.cleanup-interval": "15s",
+		"-alertmanager.configs.poll-interval":                             "5s",
+	})
+
+	am := e2ecortex.NewAlertmanager(
+		"alertmanager",
+		flags,
+		"",
+	)
+
+	require.NoError(t, s.StartAndWaitReady(am))
+	// Make sure the user index file is updated and scanned.
+	require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_user_index_last_successful_update_timestamp_seconds"},
+		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "alertmanager")),
+	))
+	require.NoError(t, am.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(1)), []string{"cortex_user_index_scan_succeeded_total"},
+		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "alertmanager")),
+	))
+}
+
 func TestAlertmanagerStoreAPI(t *testing.T) {
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)

integration/ruler_test.go

Lines changed: 59 additions & 0 deletions
@@ -143,6 +143,65 @@ func TestRulerAPI(t *testing.T) {
 	}
 }
 
+func TestRulerWithUserIndexUpdater(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, rulestoreBucketName)
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	// Configure the ruler.
+	rulerFlags := mergeFlags(
+		BlocksStorageFlags(),
+		RulerFlags(),
+		RulerShardingFlags(consul.NetworkHTTPEndpoint()),
+		map[string]string{
+			"-ruler.sharding-strategy":                                 "shuffle-sharding",
+			"-ruler-storage.users-scanner.strategy":                    "user_index",
+			"-ruler-storage.users-scanner.user-index.cleanup-interval": "15s",
+			"-ruler.tenant-shard-size":                                 "1",
+			// Since we're not going to run any rule, we don't need the
+			// store-gateway to be configured to a valid address.
+			"-querier.store-gateway-addresses": "localhost:12345",
+			// Enable the bucket index so we can skip the initial bucket scan.
+			"-blocks-storage.bucket-store.bucket-index.enabled": "true",
+			"-ruler.poll-interval":                              "2s",
+			"-log.level":                                        "info",
+		},
+	)
+
+	ruler := e2ecortex.NewRuler(
+		"ruler",
+		consul.NetworkHTTPEndpoint(),
+		rulerFlags,
+		"",
+	)
+
+	require.NoError(t, s.StartAndWaitReady(ruler))
+
+	// Create a client with the ruler address configured.
+	c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), "user-1")
+	require.NoError(t, err)
+
+	ruleGroup := createTestRuleGroup(t)
+	ns := "ns"
+
+	// Set the rule group into the ruler.
+	require.NoError(t, c.SetRuleGroup(ruleGroup, ns))
+
+	// Make sure the user index file is updated and scanned.
+	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_user_index_last_successful_update_timestamp_seconds"},
+		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "ruler")),
+	))
+
+	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(1)), []string{"cortex_user_index_scan_succeeded_total"},
+		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "ruler")),
+	))
+}
+
 func TestRulerAPISingleBinary(t *testing.T) {
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)

pkg/alertmanager/alertstore/bucketclient/bucket_client.go

Lines changed: 25 additions & 7 deletions
@@ -48,25 +48,43 @@ type BucketAlertStore struct {
 	cfgProvider bucket.TenantConfigProvider
 	logger      log.Logger
 
-	usersScanner users.Scanner
+	usersScanner     users.Scanner
+	userIndexUpdater *users.UserIndexUpdater
 }
 
 func NewBucketAlertStore(bkt objstore.InstrumentedBucket, userScannerCfg users.UsersScannerConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*BucketAlertStore, error) {
 	alertBucket := bucket.NewPrefixedBucketClient(bkt, alertsPrefix)
 
-	usersScanner, err := users.NewScanner(userScannerCfg, alertBucket, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "alertmanager"}, reg))
+	regWithComponent := extprom.WrapRegistererWith(prometheus.Labels{"component": "alertmanager"}, reg)
+	usersScanner, err := users.NewScanner(userScannerCfg, alertBucket, logger, regWithComponent)
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to initialize alertmanager users scanner")
 	}
+
+	var userIndexUpdater *users.UserIndexUpdater
+	if userScannerCfg.Strategy == users.UserScanStrategyUserIndex {
+		// The strategy is hardcoded to list, so the error can be ignored.
+		baseScanner, _ := users.NewScanner(users.UsersScannerConfig{
+			Strategy: users.UserScanStrategyList,
+		}, alertBucket, logger, regWithComponent)
+		userIndexUpdater = users.NewUserIndexUpdater(alertBucket, userScannerCfg.CleanUpInterval, baseScanner, regWithComponent)
+	}
+
 	return &BucketAlertStore{
-		usersScanner: usersScanner,
-		alertsBucket: alertBucket,
-		amBucket:     bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
-		cfgProvider:  cfgProvider,
-		logger:       logger,
+		alertsBucket:     alertBucket,
+		amBucket:         bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
+		cfgProvider:      cfgProvider,
+		logger:           logger,
+		usersScanner:     usersScanner,
+		userIndexUpdater: userIndexUpdater,
 	}, nil
 }
 
+// GetUserIndexUpdater implements alertstore.AlertStore.
+func (s *BucketAlertStore) GetUserIndexUpdater() *users.UserIndexUpdater {
+	return s.userIndexUpdater
+}
+
 // ListAllUsers implements alertstore.AlertStore.
 func (s *BucketAlertStore) ListAllUsers(ctx context.Context) ([]string, error) {
 	active, deleting, _, err := s.usersScanner.ScanUsers(ctx)

pkg/alertmanager/alertstore/configdb/store.go

Lines changed: 6 additions & 0 deletions
@@ -7,6 +7,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
 	"github.com/cortexproject/cortex/pkg/configs/client"
 	"github.com/cortexproject/cortex/pkg/configs/userconfig"
+	"github.com/cortexproject/cortex/pkg/util/users"
 )
 
 const (
@@ -34,6 +35,11 @@ func NewStore(c client.Client) *Store {
 	}
 }
 
+// GetUserIndexUpdater implements alertstore.AlertStore.
+func (c *Store) GetUserIndexUpdater() *users.UserIndexUpdater {
+	return nil
+}
+
 // ListAllUsers implements alertstore.AlertStore.
 func (c *Store) ListAllUsers(ctx context.Context) ([]string, error) {
 	configs, err := c.reloadConfigs(ctx)

pkg/alertmanager/alertstore/local/store.go

Lines changed: 6 additions & 0 deletions
@@ -11,6 +11,7 @@ import (
 	"github.com/prometheus/alertmanager/config"
 
 	"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
+	"github.com/cortexproject/cortex/pkg/util/users"
 )
 
 const (
@@ -43,6 +44,11 @@ func NewStore(cfg StoreConfig) (*Store, error) {
 	return &Store{cfg}, nil
 }
 
+// GetUserIndexUpdater implements alertstore.AlertStore.
+func (f *Store) GetUserIndexUpdater() *users.UserIndexUpdater {
+	return nil
+}
+
 // ListAllUsers implements alertstore.AlertStore.
 func (f *Store) ListAllUsers(_ context.Context) ([]string, error) {
 	configs, err := f.reloadConfigs()

pkg/alertmanager/alertstore/store.go

Lines changed: 4 additions & 0 deletions
@@ -15,6 +15,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/local"
 	"github.com/cortexproject/cortex/pkg/configs/client"
 	"github.com/cortexproject/cortex/pkg/storage/bucket"
+	"github.com/cortexproject/cortex/pkg/util/users"
 )
 
 var (
@@ -53,6 +54,9 @@ type AlertStore interface {
 	// DeleteFullState deletes the alertmanager state for an user.
 	// If state for the user doesn't exist, no error is reported.
	DeleteFullState(ctx context.Context, user string) error
+
+	// GetUserIndexUpdater is a getter for the UserIndexUpdater.
+	GetUserIndexUpdater() *users.UserIndexUpdater
 }
 
 // NewAlertStore returns a alertmanager store backend client based on the provided cfg.
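
The interface only exposes the updater; this commit's diffs don't show how callers drive it, and the integration tests above assert its metrics rather than its wiring. A hypothetical caller-side sketch — the UpdateUserIndex method name and the ticker loop are assumptions for illustration, not code from this commit:

package alertmanager

import (
	"context"
	"time"

	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
)

// runUserIndexUpdater is a hypothetical loop showing how a component could
// drive the updater returned by the store. The configdb and local stores
// (and bucket stores not using the user_index strategy) return nil.
func runUserIndexUpdater(ctx context.Context, store alertstore.AlertStore, interval time.Duration) {
	updater := store.GetUserIndexUpdater()
	if updater == nil {
		return
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// UpdateUserIndex is an assumed method name; failures would show
			// up in the cortex_user_index_* metrics and be retried next tick.
			_ = updater.UpdateUserIndex(ctx)
		}
	}
}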

pkg/alertmanager/distributor.go

Lines changed: 2 additions & 10 deletions
@@ -2,7 +2,6 @@ package alertmanager
 
 import (
 	"context"
-	"hash/fnv"
 	"io"
 	"math/rand"
 	"net/http"
@@ -161,7 +160,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
 	var responses []*httpgrpc.HTTPResponse
 	var responsesMtx sync.Mutex
 	grpcHeaders := httpToHttpgrpcHeaders(r.Header)
-	err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, nil, []uint32{shardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
+	err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, nil, []uint32{users.ShardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
 		// Use a background context to make sure all alertmanagers get the request even if we return early.
 		localCtx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), userID), opentracing.SpanFromContext(r.Context()))
 		sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")
@@ -207,7 +206,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
 }
 
 func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger) {
-	key := shardByUser(userID)
+	key := users.ShardByUser(userID)
 	replicationSet, err := d.alertmanagerRing.Get(key, RingOp, nil, nil, nil)
 	if err != nil {
 		level.Error(logger).Log("msg", "failed to get replication set from the ring", "err", err)
@@ -299,13 +298,6 @@ func (d *Distributor) doRequest(ctx context.Context, am ring.InstanceDesc, req *
 	return amClient.HandleRequest(ctx, req)
 }
 
-func shardByUser(userID string) uint32 {
-	ringHasher := fnv.New32a()
-	// Hasher never returns err.
-	_, _ = ringHasher.Write([]byte(userID))
-	return ringHasher.Sum32()
-}
-
 func httpToHttpgrpcHeaders(hs http.Header) []*httpgrpc.Header {
 	result := make([]*httpgrpc.Header, 0, len(hs))
 	for k, vs := range hs {
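
The deleted helper moves to the shared users package so the ruler and alertmanager shard tenants identically. Presumably users.ShardByUser keeps the removed FNV-1a body; a self-contained sketch reconstructed from the lines deleted above (the exported location is inferred from the new call sites, not shown in this diff):

package users

import "hash/fnv"

// ShardByUser hashes a tenant ID to a ring token. The body mirrors the
// shardByUser helper removed from the alertmanager distributor.
func ShardByUser(userID string) uint32 {
	ringHasher := fnv.New32a()
	// Hasher never returns err.
	_, _ = ringHasher.Write([]byte(userID))
	return ringHasher.Sum32()
}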
