Skip to content

Commit 9b0a7c2

Browse files
authored
chore: refactor ingester and reconciler components (#345)
2 parents 7deff21 + 30d2ee5 commit 9b0a7c2

File tree

8 files changed

+208
-109
lines changed

8 files changed

+208
-109
lines changed

diode-server/cmd/ingester/main.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@ package main
22

33
import (
44
"context"
5+
"fmt"
56
"os"
67

78
"github.com/getsentry/sentry-go"
89
"github.com/kelseyhightower/envconfig"
10+
"github.com/redis/go-redis/v9"
911
"go.opentelemetry.io/otel"
1012
"go.opentelemetry.io/otel/metric"
13+
"google.golang.org/grpc"
1114

15+
"github.com/netboxlabs/diode/diode-server/authutil"
1216
"github.com/netboxlabs/diode/diode-server/ingester"
1317
"github.com/netboxlabs/diode/diode-server/server"
1418
"github.com/netboxlabs/diode/diode-server/telemetry"
@@ -57,7 +61,19 @@ func main() {
5761
}
5862
startupCounter.Add(ctx, 1)
5963

60-
ingesterComponent, err := ingester.New(ctx, s.Logger(), cfg, meter)
64+
redisStreamClient := redis.NewClient(&redis.Options{
65+
Addr: fmt.Sprintf("%s:%s", cfg.RedisHost, cfg.RedisPort),
66+
Password: cfg.RedisPassword,
67+
DB: cfg.RedisStreamDB,
68+
})
69+
70+
if _, err := redisStreamClient.Ping(ctx).Result(); err != nil {
71+
s.Logger().Error("failed to connect to redis stream", "redisStream", redisStreamClient.String(), "error", err)
72+
os.Exit(1)
73+
}
74+
75+
authorizer := authutil.NewUnverifiedJWTAuthorizer(s.Logger())
76+
ingesterComponent, err := ingester.New(ctx, s.Logger(), cfg, redisStreamClient, meter, serverInterceptors(authorizer)...)
6177
if err != nil {
6278
s.Logger().Error("failed to instantiate ingester component", "error", err)
6379
os.Exit(1)
@@ -75,3 +91,14 @@ func main() {
7591
os.Exit(1)
7692
}
7793
}
94+
95+
func serverInterceptors(authorizer authutil.Authorizer) []grpc.UnaryServerInterceptor {
96+
return []grpc.UnaryServerInterceptor{
97+
func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
98+
if err := authorizer.RequireScopesContext(ctx, []string{authutil.ScopeDiodeIngest}); err != nil {
99+
return nil, err
100+
}
101+
return handler(ctx, req)
102+
},
103+
}
104+
}

diode-server/cmd/reconciler/main.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import (
1111
_ "github.com/jackc/pgx/v5/stdlib" // pgx to database/sql compatibility
1212
"github.com/kelseyhightower/envconfig"
1313
"github.com/pressly/goose/v3"
14+
"github.com/redis/go-redis/v9"
1415
"go.opentelemetry.io/otel"
1516
"go.opentelemetry.io/otel/metric"
17+
"google.golang.org/grpc"
1618

1719
"github.com/netboxlabs/diode/diode-server/authutil"
1820
"github.com/netboxlabs/diode/diode-server/dbstore/postgres"
@@ -76,6 +78,28 @@ func main() {
7678
}
7779
}
7880

81+
redisClient := redis.NewClient(&redis.Options{
82+
Addr: fmt.Sprintf("%s:%s", cfg.RedisHost, cfg.RedisPort),
83+
Password: cfg.RedisPassword,
84+
DB: cfg.RedisDB,
85+
})
86+
87+
if _, err := redisClient.Ping(ctx).Result(); err != nil {
88+
s.Logger().Error("failed to connect to redis", "redis", redisClient.String(), "error", err)
89+
os.Exit(1)
90+
}
91+
92+
redisStreamClient := redis.NewClient(&redis.Options{
93+
Addr: fmt.Sprintf("%s:%s", cfg.RedisHost, cfg.RedisPort),
94+
Password: cfg.RedisPassword,
95+
DB: cfg.RedisStreamDB,
96+
})
97+
98+
if _, err := redisStreamClient.Ping(ctx).Result(); err != nil {
99+
s.Logger().Error("failed to connect to redis stream", "redisStream", redisStreamClient.String(), "error", err)
100+
os.Exit(1)
101+
}
102+
79103
dbPool, err := pgxpool.New(ctx, dbURL)
80104
if err != nil {
81105
s.Logger().Error("failed to connect to postgres database", "error", err)
@@ -102,7 +126,7 @@ func main() {
102126
s.Logger().Error("failed to create ingestion processor metrics", "error", err)
103127
os.Exit(1)
104128
}
105-
ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), ops, ingestionMetrics)
129+
ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, ops, ingestionMetrics)
106130
if err != nil {
107131
s.Logger().Error("failed to instantiate ingestion processor", "error", err)
108132
os.Exit(1)
@@ -114,8 +138,7 @@ func main() {
114138
}
115139

116140
authorizer := authutil.NewUnverifiedJWTAuthorizer(s.Logger())
117-
118-
gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository, authorizer)
141+
gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository, serverInterceptors(authorizer)...)
119142
if err != nil {
120143
s.Logger().Error("failed to instantiate gRPC server", "error", err)
121144
os.Exit(1)
@@ -156,3 +179,17 @@ func runDBMigrations(ctx context.Context, logger *slog.Logger, dbURL string) err
156179

157180
return nil
158181
}
182+
183+
func serverInterceptors(authorizer authutil.Authorizer) []grpc.UnaryServerInterceptor {
184+
return []grpc.UnaryServerInterceptor{
185+
func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
186+
// TODO: this is applied to all rpcs but could be checked per rpc
187+
// if the permissions differ (all are reads currently)
188+
if err := authorizer.RequireScopesContext(ctx, []string{authutil.ScopeDiodeRead}); err != nil {
189+
return nil, err
190+
}
191+
192+
return handler(ctx, req)
193+
},
194+
}
195+
}

diode-server/ingester/component.go

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,12 @@ import (
1616
"google.golang.org/grpc/reflection"
1717
"google.golang.org/protobuf/proto"
1818

19-
"github.com/netboxlabs/diode/diode-server/authutil"
2019
"github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb"
20+
"github.com/netboxlabs/diode/diode-server/reconciler"
2121
"github.com/netboxlabs/diode/diode-server/sentry"
2222
"github.com/netboxlabs/diode/diode-server/telemetry"
2323
)
2424

25-
const (
26-
streamID = "diode.v1.ingest-stream"
27-
)
28-
2925
// Component asynchronously ingests data from the distributor
3026
type Component struct {
3127
diodepb.UnimplementedIngesterServiceServer
@@ -41,31 +37,19 @@ type Component struct {
4137
}
4238

4339
// New creates a new ingester component
44-
func New(ctx context.Context, logger *slog.Logger, cfg Config, meter metric.Meter) (*Component, error) {
40+
func New(ctx context.Context, logger *slog.Logger, cfg Config, redisStreamClient *redis.Client, meter metric.Meter, serverInterceptors ...grpc.UnaryServerInterceptor) (*Component, error) {
4541
grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.GRPCPort))
4642
if err != nil {
4743
return nil, fmt.Errorf("failed to listen on port %d: %v", cfg.GRPCPort, err)
4844
}
4945

50-
redisStreamClient := redis.NewClient(&redis.Options{
51-
Addr: fmt.Sprintf("%s:%s", cfg.RedisHost, cfg.RedisPort),
52-
Password: cfg.RedisPassword,
53-
DB: cfg.RedisStreamDB,
54-
})
55-
56-
if _, err := redisStreamClient.Ping(ctx).Result(); err != nil {
57-
return nil, fmt.Errorf("failed connection to %s: %v", redisStreamClient.String(), err)
58-
}
59-
6046
hostname, err := os.Hostname()
6147
if err != nil {
6248
return nil, fmt.Errorf("failed to get hostname: %v", err)
6349
}
6450

65-
authorizer := authutil.NewUnverifiedJWTAuthorizer(logger)
66-
6751
grpcServer := grpc.NewServer(
68-
grpc.ChainUnaryInterceptor(newAuthUnaryInterceptor(authorizer)),
52+
grpc.ChainUnaryInterceptor(serverInterceptors...),
6953
grpc.StatsHandler(otelgrpc.NewServerHandler()),
7054
)
7155

@@ -157,11 +141,13 @@ func (c *Component) Ingest(ctx context.Context, in *diodepb.IngestRequest) (*dio
157141
}
158142
}
159143

160-
msg := map[string]interface{}{
144+
msg := map[string]any{
161145
"request": encodedRequest,
162146
"ingestion_ts": time.Now().UnixNano(),
163147
}
164148

149+
streamID := c.GetRedisStreamID()
150+
165151
if err := c.redisStreamClient.XAdd(ctx, &redis.XAddArgs{
166152
Stream: streamID,
167153
Values: msg,
@@ -204,12 +190,7 @@ func validateRequest(in *diodepb.IngestRequest) error {
204190
return nil
205191
}
206192

207-
func newAuthUnaryInterceptor(authorizer authutil.Authorizer) grpc.UnaryServerInterceptor {
208-
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
209-
if err := authorizer.RequireScopesContext(ctx, []string{authutil.ScopeDiodeIngest}); err != nil {
210-
return nil, err
211-
}
212-
213-
return handler(ctx, req)
214-
}
193+
// GetRedisStreamID returns the redis stream ID to add ingested data into
194+
func (c *Component) GetRedisStreamID() string {
195+
return reconciler.DefaultRedisStreamID
215196
}

diode-server/ingester/component_test.go

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/alicebob/miniredis/v2"
1313
"github.com/kelseyhightower/envconfig"
14+
"github.com/redis/go-redis/v9"
1415
"github.com/stretchr/testify/require"
1516
"go.opentelemetry.io/otel"
1617
"google.golang.org/grpc"
@@ -73,7 +74,16 @@ const bufSize = 1024 * 1024
7374
func startReconcilerServer(ctx context.Context, t *testing.T) *reconciler.Server {
7475
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false}))
7576
mockRepository := mocks.NewRepository(t)
76-
server, err := reconciler.NewServer(ctx, logger, mockRepository, authutil.NewUnverifiedJWTAuthorizer(logger))
77+
authorizer := authutil.NewUnverifiedJWTAuthorizer(logger)
78+
serverInterceptors := []grpc.UnaryServerInterceptor{
79+
func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
80+
if err := authorizer.RequireScopesContext(ctx, []string{authutil.ScopeDiodeRead}); err != nil {
81+
return nil, err
82+
}
83+
return handler(ctx, req)
84+
},
85+
}
86+
server, err := reconciler.NewServer(ctx, logger, mockRepository, serverInterceptors...)
7787
require.NoError(t, err)
7888

7989
errChan := make(chan error, 1)
@@ -90,7 +100,7 @@ func startReconcilerServer(ctx context.Context, t *testing.T) *reconciler.Server
90100
return server
91101
}
92102

93-
func startTestComponent(ctx context.Context, t *testing.T) (*ingester.Component, *grpc.ClientConn) {
103+
func startTestComponent(ctx context.Context, t *testing.T, r *miniredis.Miniredis) (*ingester.Component, *grpc.ClientConn) {
94104
grpcPort, _ := getFreePort()
95105
_ = os.Setenv("GRPC_PORT", grpcPort)
96106

@@ -104,7 +114,23 @@ func startTestComponent(ctx context.Context, t *testing.T) (*ingester.Component,
104114
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false}))
105115

106116
meter := otel.GetMeterProvider().Meter("test.ingester")
107-
component, err := ingester.New(ctx, logger, cfg, meter)
117+
118+
redisStreamClient := redis.NewClient(&redis.Options{
119+
Addr: r.Addr(),
120+
DB: 1,
121+
})
122+
123+
authorizer := authutil.NewUnverifiedJWTAuthorizer(logger)
124+
serverInterceptors := []grpc.UnaryServerInterceptor{
125+
func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
126+
if err := authorizer.RequireScopesContext(ctx, []string{authutil.ScopeDiodeIngest}); err != nil {
127+
return nil, err
128+
}
129+
return handler(ctx, req)
130+
},
131+
}
132+
133+
component, err := ingester.New(ctx, logger, cfg, redisStreamClient, meter, serverInterceptors...)
108134
require.NoError(t, err)
109135

110136
pb.RegisterIngesterServiceServer(s, component)
@@ -148,7 +174,25 @@ func TestNewComponent(t *testing.T) {
148174
require.NoError(t, err)
149175

150176
meter := otel.GetMeterProvider().Meter("test.ingester")
151-
component, err := ingester.New(ctx, logger, cfg, meter)
177+
178+
redisStreamClient := redis.NewClient(&redis.Options{
179+
Addr: r.Addr(),
180+
DB: 1,
181+
})
182+
defer func() {
183+
_ = redisStreamClient.Close()
184+
}()
185+
authorizer := authutil.NewUnverifiedJWTAuthorizer(logger)
186+
serverInterceptors := []grpc.UnaryServerInterceptor{
187+
func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
188+
if err := authorizer.RequireScopesContext(ctx, []string{authutil.ScopeDiodeIngest}); err != nil {
189+
return nil, err
190+
}
191+
return handler(ctx, req)
192+
},
193+
}
194+
195+
component, err := ingester.New(ctx, logger, cfg, redisStreamClient, meter, serverInterceptors...)
152196

153197
require.NoError(t, err)
154198
require.NotNil(t, component)
@@ -328,7 +372,7 @@ func TestIngest(t *testing.T) {
328372
defer teardownEnv()
329373

330374
server := startReconcilerServer(ctx, t)
331-
component, conn := startTestComponent(ctx, t)
375+
component, conn := startTestComponent(ctx, t, r)
332376

333377
client := pb.NewIngesterServiceClient(conn)
334378
resp, err := client.Ingest(ctx, tt.request)

0 commit comments

Comments
 (0)