Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.25.5

require (
github.com/afritzler/protoequal v0.1.10
github.com/alitto/pond v1.9.2
github.com/gofrs/flock v0.13.0
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.4
Expand Down Expand Up @@ -46,6 +47,7 @@ require (
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
go.uber.org/mock v0.6.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
github.com/afritzler/protoequal v0.1.10 h1:HRWukWQ6Q0msWv0BwArWJKcjZ1Hz2qFoYlKb1VnzkTg=
github.com/afritzler/protoequal v0.1.10/go.mod h1:65ALCt5ghpaRzoWohyRnx88X7o5y6cQwJmOb9yzdheg=
github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs=
github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -208,6 +210,8 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y=
go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down
6 changes: 3 additions & 3 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func NewApp(
pb.RegisterGetQueryInfoServer(s, &grpc.GetQueryInfoServer{Logger: baseApp.L(), MaxMessageSize: int(config.MaxOuterMessageSize), RQStorage: backgroundStorage.RQStorage})
pb.RegisterAgentControlServer(s, &grpc.AgentControlServer{Logger: baseApp.L(), RQStorage: backgroundStorage.RQStorage})

getMasterInfo := grpc.NewGetMasterInfoServer(config.ClusterID, baseApp.L(), statActivityLister, int(config.MaxOuterMessageSize), backgroundStorage)
getMasterInfo := grpc.NewGetMasterInfoServer(config.ClusterID, baseApp.L(), int(config.MaxOuterMessageSize), backgroundStorage)
actionInfo := &grpc.ActionsServer{ClusterID: config.ClusterID, Logger: baseApp.L(), Timeout: 5 * time.Minute, BackgroundStorage: backgroundStorage}

pbm.RegisterGetGPInfoServer(s, getMasterInfo)
Expand Down Expand Up @@ -382,12 +382,12 @@ func Run(ctx context.Context, configFile string) error {
metrics.YagpccMetrics.ExecutingQueryLatencies.AssignQueryGetter(rqStorage.GetQueriesStartTime)
aggStorage := storage.NewConfiguredAggregatedStorage(logger, cfg)
sessionsStorage := gp.NewSessionsStorage(rqStorage)
backgroundStorage := master.NewBackgroundStorage(logger, sessionsStorage, rqStorage, aggStorage)

masterConnection := gp.NewConnection(baseApp.L(), &cfg.MasterConnection, nil)

masterSentinel := master_sentinel.NewSentinel(baseApp.L(), masterConnection)
statActivityLister := stat_activity.NewLister(baseApp.L(), masterConnection)
backgroundStorage := master.NewBackgroundStorage(logger, sessionsStorage, rqStorage, aggStorage, statActivityLister)

agentApp, err := NewApp(baseApp, cfg, statActivityLister, backgroundStorage)
if err != nil {
Expand Down Expand Up @@ -515,7 +515,7 @@ func Run(ctx context.Context, configFile string) error {
logger.Infof("Starting master background tasks")
ctxC, ctxF := context.WithCancel(ctx)
defer ctxF()
err = master.InitBG(ctxC, logger, masterSentinel, statActivityLister, cfg, backgroundStorage)
err = master.InitBG(ctxC, logger, masterSentinel, cfg, backgroundStorage)
if err != nil {
logger.Fatal(err.Error())
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/grpc/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newTestActionsServer(t *testing.T) *ActionsServer {
rq := storage.NewRunningQueriesStorage()
sessStorage := gp.NewSessionsStorage(rq)
agg := storage.NewAggregatedStorage(z)
bg := master.NewBackgroundStorage(z, sessStorage, rq, agg)
bg := master.NewBackgroundStorage(z, sessStorage, rq, agg, nil)
return &ActionsServer{
Logger: z,
Timeout: 5 * time.Second,
Expand Down
15 changes: 0 additions & 15 deletions internal/grpc/deps.go

This file was deleted.

22 changes: 10 additions & 12 deletions internal/grpc/get_master_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,18 @@ import (

type GetMasterInfoServer struct {
pbm.UnimplementedGetGPInfoServer
clusterID string
logger *zap.SugaredLogger
statActivityLister statActivityLister
maxMessageSize int
backgroundStorage *master.BackgroundStorage
clusterID string
logger *zap.SugaredLogger
maxMessageSize int
backgroundStorage *master.BackgroundStorage
}

func NewGetMasterInfoServer(clusterID string, logger *zap.SugaredLogger, statActivityLister statActivityLister, maxMessageSize int, backgroundStorage *master.BackgroundStorage) *GetMasterInfoServer {
func NewGetMasterInfoServer(clusterID string, logger *zap.SugaredLogger, maxMessageSize int, backgroundStorage *master.BackgroundStorage) *GetMasterInfoServer {
return &GetMasterInfoServer{
clusterID: clusterID,
logger: logger,
statActivityLister: statActivityLister,
maxMessageSize: maxMessageSize,
backgroundStorage: backgroundStorage,
clusterID: clusterID,
logger: logger,
maxMessageSize: maxMessageSize,
backgroundStorage: backgroundStorage,
}
}

Expand Down Expand Up @@ -2557,7 +2555,7 @@ func (s *GetMasterInfoServer) GetGPSessions(ctx context.Context, in *pbm.GetGPSe
queryType = pbm.RunningQueryType_RQT_TOP
}
// refresh list of sessions
err := s.backgroundStorage.TryRefreshSessionsFromGP(ctx, s.statActivityLister, true)
err := s.backgroundStorage.TryRefreshSessionsFromGP(ctx, true)
if err != nil {
s.logger.Errorf("error while refreshing session list: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/grpc/get_master_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"google.golang.org/protobuf/types/known/timestamppb"

pbm "github.com/open-gpdb/yagpcc/api/proto/agent_master"
Expand Down
17 changes: 17 additions & 0 deletions internal/grpc/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//go:generate mockgen -source=grpc_test.go -package=grpc_test -mock_names statActivityLister=MockStatActivityLister -destination mocks_test.go

package grpc_test

import (
"context"

"github.com/open-gpdb/yagpcc/internal/gp"
"github.com/open-gpdb/yagpcc/internal/gp/stat_activity"
)

type statActivityLister interface { //nolint:unused // used by go:generate mockgen
Start(ctx context.Context) error
Stop()
List(ctx context.Context) ([]*gp.GpStatActivity, error)
ListAllSessions(context.Context) ([]stat_activity.SessionPid, error)
}
74 changes: 48 additions & 26 deletions internal/grpc/mocks_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/grpc/set_get_query_info_parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestParallelSetGet(t *testing.T) {
rqStorage := storage.NewRunningQueriesStorage()
aggStorage := storage.NewAggregatedStorage(zLogger)
sessStorage := gp.NewSessionsStorage(rqStorage)
backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage)
backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage, nil)

tests := []struct {
name string
Expand All @@ -48,7 +48,7 @@ func TestParallelSetGet(t *testing.T) {
{name: "test Get Queries", isSet: false, paramName: "ALL", ssid: 1, value: 1, cnt: 80, sleep: 0.01},
{name: "test Get Query1", isSet: false, paramName: "QUERY", ssid: 1, value: 1, cnt: 10000, sleep: 0},
}
dial := setupGRPCDialer(t, nil, backgroundStorage)
dial := setupGRPCDialer(t, backgroundStorage)

ctx := context.Background()
connectTimeout := 5 * time.Second
Expand Down
11 changes: 6 additions & 5 deletions internal/grpc/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"sort"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
gogrpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
Expand Down Expand Up @@ -65,7 +65,7 @@ func assertQueriesInfoResponseEqual(t *testing.T, expected *pb.GetQueriesInfoRes
return utils.AssertProtoMessagesEqual(t, normalize(expected), normalize(actual))
}

func setupGRPCDialer(t *testing.T, sessionMocker *MockStatActivityLister, backgroundStorage *master.BackgroundStorage) func(context.Context, string) (net.Conn, error) {
func setupGRPCDialer(t *testing.T, backgroundStorage *master.BackgroundStorage) func(context.Context, string) (net.Conn, error) {

cfg, err := config.DefaultConfig()
require.NoError(t, err, "error getting default config")
Expand All @@ -88,7 +88,7 @@ func setupGRPCDialer(t *testing.T, sessionMocker *MockStatActivityLister, backgr
pb.RegisterSetQueryInfoServer(server, &grpc.SetQueryInfoServer{Logger: zLogger, UpdateSessionMetrics: true, RQStorage: backgroundStorage.RQStorage, SessionsStorage: backgroundStorage.SessionStorage})
pb.RegisterGetQueryInfoServer(server, &grpc.GetQueryInfoServer{Logger: zLogger, MaxMessageSize: 100 * 1024 * 1024, RQStorage: backgroundStorage.RQStorage})
pb.RegisterAgentControlServer(server, &grpc.AgentControlServer{Logger: zLogger, RQStorage: backgroundStorage.RQStorage})
pbm.RegisterGetGPInfoServer(server, grpc.NewGetMasterInfoServer("test", zLogger, sessionMocker, 100*1024*1024, backgroundStorage))
pbm.RegisterGetGPInfoServer(server, grpc.NewGetMasterInfoServer("test", zLogger, 100*1024*1024, backgroundStorage))

go func() {
if err := server.Serve(listener); err != nil {
Expand Down Expand Up @@ -129,6 +129,7 @@ func setupGRPCClientSet(t *testing.T, sessionMocker *MockStatActivityLister) (*g
ctrl := gomock.NewController(t)
sessionMocker = NewMockStatActivityLister(ctrl)
sessionMocker.EXPECT().List(gomock.Any()).AnyTimes()
sessionMocker.EXPECT().ListAllSessions(gomock.Any()).AnyTimes()
}

file, err := os.Create("trace.log")
Expand All @@ -137,11 +138,11 @@ func setupGRPCClientSet(t *testing.T, sessionMocker *MockStatActivityLister) (*g
rqStorage := storage.NewRunningQueriesStorage()
sessStorage := gp.NewSessionsStorage(rqStorage)
aggStorage := storage.NewAggregatedStorage(zLogger)
backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage)
backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage, sessionMocker)

conn, err := gogrpc.NewClient(
"localhost",
gogrpc.WithContextDialer(setupGRPCDialer(t, sessionMocker, backgroundStorage)),
gogrpc.WithContextDialer(setupGRPCDialer(t, backgroundStorage)),
gogrpc.WithTransportCredentials(insecure.NewCredentials()),
)

Expand Down
Loading