Skip to content

Commit f262ab1

Browse files
committed
Add index updater to alertmanager
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 1833e69 commit f262ab1

File tree

19 files changed

+235
-36
lines changed

19 files changed

+235
-36
lines changed

docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1927,6 +1927,11 @@ blocks_storage:
19271927
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
19281928
[max_stale_period: <duration> | default = 1h]
19291929

1930+
# How frequently user index file is updated, it only take effect when user
1931+
# scan stratehy is user_index.
1932+
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
1933+
[clean_up_interval: <duration> | default = 15m]
1934+
19301935
# TTL of the cached users. 0 disables caching and relies on caching at
19311936
# bucket client level.
19321937
# CLI flag: -blocks-storage.users-scanner.cache-ttl

docs/blocks-storage/store-gateway.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2013,6 +2013,11 @@ blocks_storage:
20132013
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
20142014
[max_stale_period: <duration> | default = 1h]
20152015

2016+
# How frequently user index file is updated, it only take effect when user
2017+
# scan stratehy is user_index.
2018+
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
2019+
[clean_up_interval: <duration> | default = 15m]
2020+
20162021
# TTL of the cached users. 0 disables caching and relies on caching at
20172022
# bucket client level.
20182023
# CLI flag: -blocks-storage.users-scanner.cache-ttl

docs/configuration/config-file-reference.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -992,6 +992,11 @@ users_scanner:
992992
# CLI flag: -alertmanager-storage.users-scanner.user-index.max-stale-period
993993
[max_stale_period: <duration> | default = 1h]
994994

995+
# How frequently user index file is updated, it only take effect when user
996+
# scan stratehy is user_index.
997+
# CLI flag: -alertmanager-storage.users-scanner.user-index.cleanup-interval
998+
[clean_up_interval: <duration> | default = 15m]
999+
9951000
# TTL of the cached users. 0 disables caching and relies on caching at bucket
9961001
# client level.
9971002
# CLI flag: -alertmanager-storage.users-scanner.cache-ttl
@@ -2608,6 +2613,11 @@ users_scanner:
26082613
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
26092614
[max_stale_period: <duration> | default = 1h]
26102615

2616+
# How frequently user index file is updated, it only take effect when user
2617+
# scan stratehy is user_index.
2618+
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
2619+
[clean_up_interval: <duration> | default = 15m]
2620+
26112621
# TTL of the cached users. 0 disables caching and relies on caching at bucket
26122622
# client level.
26132623
# CLI flag: -blocks-storage.users-scanner.cache-ttl
@@ -5796,6 +5806,11 @@ users_scanner:
57965806
# CLI flag: -ruler-storage.users-scanner.user-index.max-stale-period
57975807
[max_stale_period: <duration> | default = 1h]
57985808

5809+
# How frequently user index file is updated, it only take effect when user
5810+
# scan stratehy is user_index.
5811+
# CLI flag: -ruler-storage.users-scanner.user-index.cleanup-interval
5812+
[clean_up_interval: <duration> | default = 15m]
5813+
57995814
# TTL of the cached users. 0 disables caching and relies on caching at bucket
58005815
# client level.
58015816
# CLI flag: -ruler-storage.users-scanner.cache-ttl

integration/alertmanager_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,42 @@ func TestAlertmanager(t *testing.T) {
6969
assertServiceMetricsPrefixes(t, AlertManager, alertmanager)
7070
}
7171

72+
func TestAlertmanagerWithUserIndexUpdater(t *testing.T) {
73+
s, err := e2e.NewScenario(networkName)
74+
require.NoError(t, err)
75+
defer s.Close()
76+
77+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs/user-1.yaml", []byte(cortexAlertmanagerUserConfigYaml)))
78+
79+
// Start dependencies.
80+
consul := e2edb.NewConsul()
81+
minio := e2edb.NewMinio(9000, alertsBucketName)
82+
require.NoError(t, s.StartAndWaitReady(consul, minio))
83+
84+
baseFlags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags())
85+
flags := mergeFlags(baseFlags, AlertmanagerShardingFlags(consul.NetworkHTTPEndpoint(), 1), map[string]string{
86+
"-alertmanager-storage.users-scanner.strategy": "user_index",
87+
"-alertmanager-storage.users-scanner.user-index.cleanup-interval": "15s",
88+
"-alertmanager.configs.poll-interval": "5s",
89+
"-alertmanager.sharding-enabled": "true",
90+
})
91+
92+
am := e2ecortex.NewAlertmanager(
93+
"alertmanager",
94+
flags,
95+
"",
96+
)
97+
98+
require.NoError(t, s.StartAndWaitReady(am))
99+
// To make sure user index file is updated/scanned
100+
require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_user_index_last_successful_update_timestamp_seconds"}),
101+
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "alertmanager")),
102+
)
103+
require.NoError(t, am.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(1)), []string{"cortex_user_index_scan_succeeded_total"}),
104+
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "alertmanager")),
105+
)
106+
}
107+
72108
func TestAlertmanagerStoreAPI(t *testing.T) {
73109
s, err := e2e.NewScenario(networkName)
74110
require.NoError(t, err)

pkg/alertmanager/alertstore/bucketclient/bucket_client.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,25 +48,40 @@ type BucketAlertStore struct {
4848
cfgProvider bucket.TenantConfigProvider
4949
logger log.Logger
5050

51-
usersScanner users.Scanner
51+
usersScanner users.Scanner
52+
userIndexUpdater *users.UserIndexUpdater
5253
}
5354

5455
func NewBucketAlertStore(bkt objstore.InstrumentedBucket, userScannerCfg users.UsersScannerConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*BucketAlertStore, error) {
5556
alertBucket := bucket.NewPrefixedBucketClient(bkt, alertsPrefix)
5657

57-
usersScanner, err := users.NewScanner(userScannerCfg, alertBucket, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "alertmanager"}, reg))
58+
regWithComponent := extprom.WrapRegistererWith(prometheus.Labels{"component": "alertmanager"}, reg)
59+
usersScanner, err := users.NewScanner(userScannerCfg, alertBucket, logger, regWithComponent)
5860
if err != nil {
5961
return nil, errors.Wrap(err, "unable to initialize alertmanager users scanner")
6062
}
63+
64+
// We hardcode strategy to be list so can ignore error.
65+
baseScanner, _ := users.NewScanner(users.UsersScannerConfig{
66+
Strategy: users.UserScanStrategyList,
67+
}, alertBucket, logger, regWithComponent)
68+
userIndexUpdater := users.NewUserIndexUpdater(alertBucket, userScannerCfg.CleanUpInterval, baseScanner, regWithComponent)
69+
6170
return &BucketAlertStore{
62-
usersScanner: usersScanner,
63-
alertsBucket: alertBucket,
64-
amBucket: bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
65-
cfgProvider: cfgProvider,
66-
logger: logger,
71+
usersScanner: usersScanner,
72+
userIndexUpdater: userIndexUpdater,
73+
alertsBucket: alertBucket,
74+
amBucket: bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
75+
cfgProvider: cfgProvider,
76+
logger: logger,
6777
}, nil
6878
}
6979

80+
// GetUserIndexUpdater implements alertstore.AlertStore.
81+
func (s *BucketAlertStore) GetUserIndexUpdater() *users.UserIndexUpdater {
82+
return s.userIndexUpdater
83+
}
84+
7085
// ListAllUsers implements alertstore.AlertStore.
7186
func (s *BucketAlertStore) ListAllUsers(ctx context.Context) ([]string, error) {
7287
active, deleting, _, err := s.usersScanner.ScanUsers(ctx)

pkg/alertmanager/alertstore/configdb/store.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
88
"github.com/cortexproject/cortex/pkg/configs/client"
99
"github.com/cortexproject/cortex/pkg/configs/userconfig"
10+
"github.com/cortexproject/cortex/pkg/util/users"
1011
)
1112

1213
const (
@@ -34,6 +35,11 @@ func NewStore(c client.Client) *Store {
3435
}
3536
}
3637

38+
// GetUserIndexUpdater implements alertstore.AlertStore.
39+
func (c *Store) GetUserIndexUpdater() *users.UserIndexUpdater {
40+
return nil
41+
}
42+
3743
// ListAllUsers implements alertstore.AlertStore.
3844
func (c *Store) ListAllUsers(ctx context.Context) ([]string, error) {
3945
configs, err := c.reloadConfigs(ctx)

pkg/alertmanager/alertstore/local/store.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"path/filepath"
88
"strings"
99

10+
"github.com/cortexproject/cortex/pkg/util/users"
1011
"github.com/pkg/errors"
1112
"github.com/prometheus/alertmanager/config"
1213

@@ -43,6 +44,11 @@ func NewStore(cfg StoreConfig) (*Store, error) {
4344
return &Store{cfg}, nil
4445
}
4546

47+
// GetUserIndexUpdater implements alertstore.AlertStore.
48+
func (f *Store) GetUserIndexUpdater() *users.UserIndexUpdater {
49+
return nil
50+
}
51+
4652
// ListAllUsers implements alertstore.AlertStore.
4753
func (f *Store) ListAllUsers(_ context.Context) ([]string, error) {
4854
configs, err := f.reloadConfigs()

pkg/alertmanager/alertstore/store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io"
77

8+
"github.com/cortexproject/cortex/pkg/util/users"
89
"github.com/go-kit/log"
910
"github.com/prometheus/client_golang/prometheus"
1011
"github.com/thanos-io/objstore"
@@ -53,6 +54,9 @@ type AlertStore interface {
5354
// DeleteFullState deletes the alertmanager state for an user.
5455
// If state for the user doesn't exist, no error is reported.
5556
DeleteFullState(ctx context.Context, user string) error
57+
58+
// GetUserIndexUpdater is getter for UserIndexUpdater
59+
GetUserIndexUpdater() *users.UserIndexUpdater
5660
}
5761

5862
// NewAlertStore returns a alertmanager store backend client based on the provided cfg.

pkg/alertmanager/api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,7 @@ receivers:
853853

854854
// Create the Multitenant Alertmanager.
855855
cfg := mockAlertmanagerConfig(t)
856-
am, err := createMultitenantAlertmanager(cfg, nil, nil, alertStore, nil, nil, log.NewNopLogger(), reg)
856+
am, err := createMultitenantAlertmanager(cfg, nil, nil, nil, alertStore, nil, nil, log.NewNopLogger(), reg)
857857
require.NoError(t, err)
858858
require.NoError(t, services.StartAndAwaitRunning(context.Background(), am))
859859
defer services.StopAndAwaitTerminated(context.Background(), am) //nolint:errcheck

pkg/alertmanager/distributor.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package alertmanager
22

33
import (
44
"context"
5-
"hash/fnv"
65
"io"
76
"math/rand"
87
"net/http"
@@ -161,7 +160,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
161160
var responses []*httpgrpc.HTTPResponse
162161
var responsesMtx sync.Mutex
163162
grpcHeaders := httpToHttpgrpcHeaders(r.Header)
164-
err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, nil, []uint32{shardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
163+
err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, nil, []uint32{users.ShardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
165164
// Use a background context to make sure all alertmanagers get the request even if we return early.
166165
localCtx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), userID), opentracing.SpanFromContext(r.Context()))
167166
sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")
@@ -207,7 +206,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
207206
}
208207

209208
func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger) {
210-
key := shardByUser(userID)
209+
key := users.ShardByUser(userID)
211210
replicationSet, err := d.alertmanagerRing.Get(key, RingOp, nil, nil, nil)
212211
if err != nil {
213212
level.Error(logger).Log("msg", "failed to get replication set from the ring", "err", err)
@@ -299,13 +298,6 @@ func (d *Distributor) doRequest(ctx context.Context, am ring.InstanceDesc, req *
299298
return amClient.HandleRequest(ctx, req)
300299
}
301300

302-
func shardByUser(userID string) uint32 {
303-
ringHasher := fnv.New32a()
304-
// Hasher never returns err.
305-
_, _ = ringHasher.Write([]byte(userID))
306-
return ringHasher.Sum32()
307-
}
308-
309301
func httpToHttpgrpcHeaders(hs http.Header) []*httpgrpc.Header {
310302
result := make([]*httpgrpc.Header, 0, len(hs))
311303
for k, vs := range hs {

0 commit comments

Comments
 (0)