From d647ae40e5035ce58cbc4b2447e658395260c6d6 Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Tue, 28 Apr 2026 15:55:50 +0300 Subject: [PATCH 01/11] Add worker pool to gather pid stat Generate jobs for gather pid stat. Each job contain no more then JobsPerQuery pid's and belongs to a single host. Jobs should be processed in a worker pool witch context, use for it alitto/pond - good enough to this work. Here we add only worker pool without actual start working pool and doing some actual work. --- go.mod | 1 + go.sum | 2 + internal/app/app.go | 6 +- internal/grpc/actions_test.go | 2 +- internal/grpc/get_master_info.go | 22 +- internal/grpc/mocks_test.go | 16 + .../grpc/set_get_query_info_parallel_test.go | 4 +- internal/grpc/testutils_test.go | 9 +- internal/master/background.go | 90 +--- internal/master/deps.go | 2 + internal/master/procfs.go | 114 +++++ internal/master/procfs_test.go | 481 ++++++++++++++++++ internal/master/utils.go | 59 +++ 13 files changed, 716 insertions(+), 92 deletions(-) create mode 100644 internal/master/procfs.go create mode 100644 internal/master/procfs_test.go create mode 100644 internal/master/utils.go diff --git a/go.mod b/go.mod index 9bf8c14..b59c4cb 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.25.5 require ( github.com/afritzler/protoequal v0.1.10 + github.com/alitto/pond v1.9.2 github.com/gofrs/flock v0.13.0 github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.4 diff --git a/go.sum b/go.sum index d0ea10c..96c8bdc 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1 github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/afritzler/protoequal v0.1.10 h1:HRWukWQ6Q0msWv0BwArWJKcjZ1Hz2qFoYlKb1VnzkTg= github.com/afritzler/protoequal v0.1.10/go.mod h1:65ALCt5ghpaRzoWohyRnx88X7o5y6cQwJmOb9yzdheg= +github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs= +github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= diff --git a/internal/app/app.go b/internal/app/app.go index f209702..5d8bf83 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -265,7 +265,7 @@ func NewApp( pb.RegisterGetQueryInfoServer(s, &grpc.GetQueryInfoServer{Logger: baseApp.L(), MaxMessageSize: int(config.MaxOuterMessageSize), RQStorage: backgroundStorage.RQStorage}) pb.RegisterAgentControlServer(s, &grpc.AgentControlServer{Logger: baseApp.L(), RQStorage: backgroundStorage.RQStorage}) - getMasterInfo := grpc.NewGetMasterInfoServer(config.ClusterID, baseApp.L(), statActivityLister, int(config.MaxOuterMessageSize), backgroundStorage) + getMasterInfo := grpc.NewGetMasterInfoServer(config.ClusterID, baseApp.L(), int(config.MaxOuterMessageSize), backgroundStorage) actionInfo := &grpc.ActionsServer{ClusterID: config.ClusterID, Logger: baseApp.L(), Timeout: 5 * time.Minute, BackgroundStorage: backgroundStorage} pbm.RegisterGetGPInfoServer(s, getMasterInfo) @@ -382,12 +382,12 @@ func Run(ctx context.Context, configFile string) error { metrics.YagpccMetrics.ExecutingQueryLatencies.AssignQueryGetter(rqStorage.GetQueriesStartTime) aggStorage := storage.NewConfiguredAggregatedStorage(logger, cfg) sessionsStorage := gp.NewSessionsStorage(rqStorage) - backgroundStorage := master.NewBackgroundStorage(logger, sessionsStorage, rqStorage, aggStorage) masterConnection := gp.NewConnection(baseApp.L(), &cfg.MasterConnection, nil) masterSentinel := master_sentinel.NewSentinel(baseApp.L(), masterConnection) statActivityLister := stat_activity.NewLister(baseApp.L(), masterConnection) + backgroundStorage := master.NewBackgroundStorage(logger, sessionsStorage, rqStorage, aggStorage, statActivityLister) agentApp, err := NewApp(baseApp, cfg, statActivityLister, backgroundStorage) if err != nil { @@ -515,7 +515,7 @@ func Run(ctx context.Context, configFile string) error { logger.Infof("Starting master background tasks") ctxC, ctxF := context.WithCancel(ctx) defer ctxF() - err = master.InitBG(ctxC, logger, masterSentinel, statActivityLister, cfg, backgroundStorage) + err = master.InitBG(ctxC, logger, masterSentinel, cfg, backgroundStorage) if err != nil { logger.Fatal(err.Error()) return err diff --git a/internal/grpc/actions_test.go b/internal/grpc/actions_test.go index 4d32962..c88db77 100644 --- a/internal/grpc/actions_test.go +++ b/internal/grpc/actions_test.go @@ -23,7 +23,7 @@ func newTestActionsServer(t *testing.T) *ActionsServer { rq := storage.NewRunningQueriesStorage() sessStorage := gp.NewSessionsStorage(rq) agg := storage.NewAggregatedStorage(z) - bg := master.NewBackgroundStorage(z, sessStorage, rq, agg) + bg := master.NewBackgroundStorage(z, sessStorage, rq, agg, nil) return &ActionsServer{ Logger: z, Timeout: 5 * time.Second, diff --git a/internal/grpc/get_master_info.go b/internal/grpc/get_master_info.go index e2a8e7d..667b8a7 100644 --- a/internal/grpc/get_master_info.go +++ b/internal/grpc/get_master_info.go @@ -24,20 +24,18 @@ import ( type GetMasterInfoServer struct { pbm.UnimplementedGetGPInfoServer - clusterID string - logger *zap.SugaredLogger - statActivityLister statActivityLister - maxMessageSize int - backgroundStorage *master.BackgroundStorage + clusterID string + logger *zap.SugaredLogger + maxMessageSize int + backgroundStorage *master.BackgroundStorage } -func NewGetMasterInfoServer(clusterID string, logger *zap.SugaredLogger, statActivityLister statActivityLister, maxMessageSize int, backgroundStorage *master.BackgroundStorage) *GetMasterInfoServer { +func NewGetMasterInfoServer(clusterID string, logger *zap.SugaredLogger, maxMessageSize int, backgroundStorage *master.BackgroundStorage) *GetMasterInfoServer { return &GetMasterInfoServer{ - clusterID: clusterID, - logger: logger, - statActivityLister: statActivityLister, - maxMessageSize: maxMessageSize, - backgroundStorage: backgroundStorage, + clusterID: clusterID, + logger: logger, + maxMessageSize: maxMessageSize, + backgroundStorage: backgroundStorage, } } @@ -2557,7 +2555,7 @@ func (s *GetMasterInfoServer) GetGPSessions(ctx context.Context, in *pbm.GetGPSe queryType = pbm.RunningQueryType_RQT_TOP } // refresh list of sessions - err := s.backgroundStorage.TryRefreshSessionsFromGP(ctx, s.statActivityLister, true) + err := s.backgroundStorage.TryRefreshSessionsFromGP(ctx, true) if err != nil { s.logger.Errorf("error while refreshing session list: %v", err) } diff --git a/internal/grpc/mocks_test.go b/internal/grpc/mocks_test.go index dfb6a84..19fb9ba 100644 --- a/internal/grpc/mocks_test.go +++ b/internal/grpc/mocks_test.go @@ -10,6 +10,7 @@ import ( gomock "github.com/golang/mock/gomock" gp "github.com/open-gpdb/yagpcc/internal/gp" + "github.com/open-gpdb/yagpcc/internal/gp/stat_activity" ) // MockStatActivityLister is a mock of statActivityLister interface @@ -75,3 +76,18 @@ func (mr *MockStatActivityListerMockRecorder) List(ctx interface{}) *gomock.Call mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStatActivityLister)(nil).List), ctx) } + +// ListAllSessions mocks base method +func (m *MockStatActivityLister) ListAllSessions(ctx context.Context) ([]stat_activity.SessionPid, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListAllSessions", ctx) + ret0, _ := ret[0].([]stat_activity.SessionPid) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListAllSessions indicates an expected call of ListAllSessions +func (mr *MockStatActivityListerMockRecorder) ListAllSessions(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAllSessions", reflect.TypeOf((*MockStatActivityLister)(nil).ListAllSessions), ctx) +} diff --git a/internal/grpc/set_get_query_info_parallel_test.go b/internal/grpc/set_get_query_info_parallel_test.go index d79f2b1..c329d7a 100644 --- a/internal/grpc/set_get_query_info_parallel_test.go +++ b/internal/grpc/set_get_query_info_parallel_test.go @@ -30,7 +30,7 @@ func TestParallelSetGet(t *testing.T) { rqStorage := storage.NewRunningQueriesStorage() aggStorage := storage.NewAggregatedStorage(zLogger) sessStorage := gp.NewSessionsStorage(rqStorage) - backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage) + backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage, nil) tests := []struct { name string @@ -48,7 +48,7 @@ func TestParallelSetGet(t *testing.T) { {name: "test Get Queries", isSet: false, paramName: "ALL", ssid: 1, value: 1, cnt: 80, sleep: 0.01}, {name: "test Get Query1", isSet: false, paramName: "QUERY", ssid: 1, value: 1, cnt: 10000, sleep: 0}, } - dial := setupGRPCDialer(t, nil, backgroundStorage) + dial := setupGRPCDialer(t, backgroundStorage) ctx := context.Background() connectTimeout := 5 * time.Second diff --git a/internal/grpc/testutils_test.go b/internal/grpc/testutils_test.go index 7863138..d792110 100644 --- a/internal/grpc/testutils_test.go +++ b/internal/grpc/testutils_test.go @@ -65,7 +65,7 @@ func assertQueriesInfoResponseEqual(t *testing.T, expected *pb.GetQueriesInfoRes return utils.AssertProtoMessagesEqual(t, normalize(expected), normalize(actual)) } -func setupGRPCDialer(t *testing.T, sessionMocker *MockStatActivityLister, backgroundStorage *master.BackgroundStorage) func(context.Context, string) (net.Conn, error) { +func setupGRPCDialer(t *testing.T, backgroundStorage *master.BackgroundStorage) func(context.Context, string) (net.Conn, error) { cfg, err := config.DefaultConfig() require.NoError(t, err, "error getting default config") @@ -88,7 +88,7 @@ func setupGRPCDialer(t *testing.T, sessionMocker *MockStatActivityLister, backgr pb.RegisterSetQueryInfoServer(server, &grpc.SetQueryInfoServer{Logger: zLogger, UpdateSessionMetrics: true, RQStorage: backgroundStorage.RQStorage, SessionsStorage: backgroundStorage.SessionStorage}) pb.RegisterGetQueryInfoServer(server, &grpc.GetQueryInfoServer{Logger: zLogger, MaxMessageSize: 100 * 1024 * 1024, RQStorage: backgroundStorage.RQStorage}) pb.RegisterAgentControlServer(server, &grpc.AgentControlServer{Logger: zLogger, RQStorage: backgroundStorage.RQStorage}) - pbm.RegisterGetGPInfoServer(server, grpc.NewGetMasterInfoServer("test", zLogger, sessionMocker, 100*1024*1024, backgroundStorage)) + pbm.RegisterGetGPInfoServer(server, grpc.NewGetMasterInfoServer("test", zLogger, 100*1024*1024, backgroundStorage)) go func() { if err := server.Serve(listener); err != nil { @@ -129,6 +129,7 @@ func setupGRPCClientSet(t *testing.T, sessionMocker *MockStatActivityLister) (*g ctrl := gomock.NewController(t) sessionMocker = NewMockStatActivityLister(ctrl) sessionMocker.EXPECT().List(gomock.Any()).AnyTimes() + sessionMocker.EXPECT().ListAllSessions(gomock.Any()).AnyTimes() } file, err := os.Create("trace.log") @@ -137,11 +138,11 @@ func setupGRPCClientSet(t *testing.T, sessionMocker *MockStatActivityLister) (*g rqStorage := storage.NewRunningQueriesStorage() sessStorage := gp.NewSessionsStorage(rqStorage) aggStorage := storage.NewAggregatedStorage(zLogger) - backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage) + backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage, sessionMocker) conn, err := gogrpc.NewClient( "localhost", - gogrpc.WithContextDialer(setupGRPCDialer(t, sessionMocker, backgroundStorage)), + gogrpc.WithContextDialer(setupGRPCDialer(t, backgroundStorage)), gogrpc.WithTransportCredentials(insecure.NewCredentials()), ) diff --git a/internal/master/background.go b/internal/master/background.go index 56d2b74..8771d7b 100644 --- a/internal/master/background.go +++ b/internal/master/background.go @@ -3,7 +3,6 @@ package master import ( "context" "fmt" - "net" "os" "sort" "sync" @@ -11,8 +10,6 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" pbm "github.com/open-gpdb/yagpcc/api/proto/agent_master" pb "github.com/open-gpdb/yagpcc/api/proto/agent_segment" @@ -38,67 +35,24 @@ type ( segmentMap map[string]*segmentAddr BackgroundStorage struct { - l *zap.SugaredLogger - SessionStorage *gp.SessionsStorage - AggStorage *storage.AggregatedStorage - RQStorage *storage.RunningQueriesStorage + l *zap.SugaredLogger + SessionStorage *gp.SessionsStorage + AggStorage *storage.AggregatedStorage + RQStorage *storage.RunningQueriesStorage + StatActivityLister statActivityLister } ) var ( - segChan chan segmentAddr - segConnections map[string]*grpc.ClientConn = make(map[string]*grpc.ClientConn) - segConnectionLock sync.Mutex - segCount int - segCountLock sync.Mutex + segChan chan segmentAddr + segCount int + segCountLock sync.Mutex ) func getSegAddr(hostname string, portn uint32) string { return fmt.Sprintf("%s:%d", hostname, portn) } -func getGrpcClientConnection(ctx context.Context, hostname string, portn uint32, segConnectTimeoutSec float64) (*grpc.ClientConn, error) { - var err error - segConnectionLock.Lock() - defer segConnectionLock.Unlock() - conn, ok := segConnections[hostname] - if ok { - if conn.GetState() == connectivity.Ready { - return conn, nil - } - } - connectTimeout := time.Second * time.Duration(segConnectTimeoutSec) - if portn > 0 { - conn, err = grpc.NewClient( - getSegAddr(hostname, portn), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithConnectParams(grpc.ConnectParams{ - MinConnectTimeout: connectTimeout, - }), - ) - if err != nil { - return nil, err - } - } else { - conn, err = grpc.NewClient( - hostname, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { - var d net.Dialer - return d.DialContext(ctx, "unix", addr) - }), - grpc.WithConnectParams(grpc.ConnectParams{ - MinConnectTimeout: connectTimeout, - }), - ) - if err != nil { - return nil, err - } - } - segConnections[hostname] = conn - return conn, nil -} - func (bs *BackgroundStorage) SendSegmentRefreshMessages(ctx context.Context, pullRateSec float64, configCacheDurability time.Duration, portn uint32, customSegmentList *config.SegmentList) error { durationBetweenLoop := time.Second * time.Duration(pullRateSec) @@ -358,10 +312,9 @@ func queryCompleted(qKey *storage.QueryKey, qVal *storage.RunningQuery, segmentG func (bs *BackgroundStorage) TryRefreshSessionsFromGP( ctx context.Context, - statActivityLister statActivityLister, clearDeletedSessions bool, ) error { - newSesList, err := statActivityLister.List(ctx) + newSesList, err := bs.StatActivityLister.List(ctx) if err != nil { return fmt.Errorf("error getting sessions: %w", err) } @@ -458,10 +411,7 @@ func (bs *BackgroundStorage) ClearCompletedQueries(ctx context.Context, return nil } -func (bs *BackgroundStorage) RefreshSessions(ctx context.Context, - statActivityLister statActivityLister, - sessionRefreshInterval time.Duration, - clearDeletedSessions bool) error { +func (bs *BackgroundStorage) RefreshSessions(ctx context.Context, sessionRefreshInterval time.Duration, clearDeletedSessions bool) error { for { currTime := time.Now() nextTime := currTime.Truncate(sessionRefreshInterval).Add(sessionRefreshInterval) @@ -471,7 +421,7 @@ func (bs *BackgroundStorage) RefreshSessions(ctx context.Context, return fmt.Errorf("done context with %v", ctx.Err()) default: bs.l.Info("Refresh session List") - err := bs.TryRefreshSessionsFromGP(ctx, statActivityLister, clearDeletedSessions) + err := bs.TryRefreshSessionsFromGP(ctx, clearDeletedSessions) if err != nil { bs.l.Errorf("fail to refresh session list %v", err) return err @@ -528,12 +478,13 @@ func InitConnection(ctx context.Context, l *zap.SugaredLogger, cfg *config.Confi return nil } -func NewBackgroundStorage(l *zap.SugaredLogger, sessionStorage *gp.SessionsStorage, rqStorage *storage.RunningQueriesStorage, aggStorage *storage.AggregatedStorage) *BackgroundStorage { +func NewBackgroundStorage(l *zap.SugaredLogger, sessionStorage *gp.SessionsStorage, rqStorage *storage.RunningQueriesStorage, aggStorage *storage.AggregatedStorage, sActivityLister statActivityLister) *BackgroundStorage { return &BackgroundStorage{ - l: l, - SessionStorage: sessionStorage, - AggStorage: aggStorage, - RQStorage: rqStorage, + l: l, + SessionStorage: sessionStorage, + AggStorage: aggStorage, + RQStorage: rqStorage, + StatActivityLister: sActivityLister, } } @@ -541,7 +492,6 @@ func InitBG( ctx context.Context, l *zap.SugaredLogger, masterSentinel masterSentinel, - statActivityLister statActivityLister, cfg *config.Config, backgroundStorage *BackgroundStorage, ) error { @@ -568,7 +518,7 @@ func InitBG( return nil }) - if err = statActivityLister.Start(ctx); err != nil { + if err = backgroundStorage.StatActivityLister.Start(ctx); err != nil { return fmt.Errorf("error starting stat activity lister") } @@ -602,7 +552,7 @@ func InitBG( }, ) errG.Go(func() error { - err := backgroundStorage.RefreshSessions(ctxI, statActivityLister, cfg.SessionRefreshInterval, cfg.ClearDeletedSessions) + err := backgroundStorage.RefreshSessions(ctxI, cfg.SessionRefreshInterval, cfg.ClearDeletedSessions) l.Errorf("got %v refresh session and queries", err) return err }, @@ -615,7 +565,7 @@ func InitBG( ) err = errG.Wait() if err != nil { - statActivityLister.Stop() + backgroundStorage.StatActivityLister.Stop() l.Errorf("Fail in background precesses - done work with %v", err) return err } diff --git a/internal/master/deps.go b/internal/master/deps.go index c56b77e..d0db59c 100644 --- a/internal/master/deps.go +++ b/internal/master/deps.go @@ -4,12 +4,14 @@ import ( "context" "github.com/open-gpdb/yagpcc/internal/gp" + "github.com/open-gpdb/yagpcc/internal/gp/stat_activity" ) type statActivityLister interface { Start(ctx context.Context) error Stop() List(ctx context.Context) ([]*gp.GpStatActivity, error) + ListAllSessions(ctx context.Context) ([]stat_activity.SessionPid, error) } type masterSentinel interface { diff --git a/internal/master/procfs.go b/internal/master/procfs.go new file mode 100644 index 0000000..a045e02 --- /dev/null +++ b/internal/master/procfs.go @@ -0,0 +1,114 @@ +package master + +import ( + "context" + "time" + + "google.golang.org/grpc" + + "github.com/open-gpdb/yagpcc/internal/gp/stat_activity" + "github.com/open-gpdb/yagpcc/internal/storage" + + "github.com/alitto/pond" + + pb "github.com/open-gpdb/yagpcc/api/proto/agent_segment" +) + +const ( + JobsPerQuery = 100 + maxMsgSize = 4 * 1024 * 1024 +) + +type ( + hostJobMap = map[string][]stat_activity.SessionPid +) + +func getJobsMap(sessions []stat_activity.SessionPid) hostJobMap { + hostJobMap := make(hostJobMap) + // make work for each host + for _, process := range sessions { + segHost := storage.GetHostnameForSegindex(int32(process.GpSegmentId)) + jobList, ok := hostJobMap[segHost] + if !ok { + jobList = make([]stat_activity.SessionPid, 0) + } + jobList = append(jobList, stat_activity.SessionPid{ + GpSegmentId: process.GpSegmentId, + Pid: process.Pid, + SessId: process.SessId, + }) + hostJobMap[segHost] = jobList + } + return hostJobMap +} + +func processProcfsRequests(ctx context.Context, hostname string, portn uint32, gatherTimeout time.Duration, reqs []stat_activity.SessionPid) error { + grpcConn, err := getGrpcClientConnection(ctx, hostname, portn, gatherTimeout.Seconds()) + if err != nil { + return err + } + msgReq := &pb.GetPidProcInfoReq{ + SegmentProcess: make([]*pb.SegmentProcess, 0), + } + for _, req := range reqs { + select { + case <-ctx.Done(): + return nil + default: + msgReq.SegmentProcess = append(msgReq.SegmentProcess, &pb.SegmentProcess{ + GpSegmentId: int64(req.GpSegmentId), + Pid: int64(req.Pid), + SessId: int64(req.SessId), + }) + } + } + cGet := pb.NewGetQueryInfoClient(grpcConn) + ctxTimeout, ctxCancel := context.WithTimeout(ctx, gatherTimeout) + defer ctxCancel() + maxSizeOption := grpc.MaxCallRecvMsgSize(maxMsgSize) + _, errGet := cGet.GetPidProcStat(ctxTimeout, msgReq, maxSizeOption) + if errGet != nil { + return errGet + } + return nil +} + +func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, portn uint32, gatherTimeout time.Duration) error { + bs.l.Debug("GatherProcfsStat") + sessions, err := bs.StatActivityLister.ListAllSessions(ctx) + if err != nil { + return err + } + hostJobMap := getJobsMap(sessions) + + pool := pond.New(nPullers, nPullers*2) + defer pool.StopAndWait() + + ctxT, ctxTC := context.WithTimeout(ctx, gatherTimeout) + defer ctxTC() + + group, ctxG := pool.GroupContext(ctxT) + + for hostname, processes := range hostJobMap { + jobProcesses := make([]stat_activity.SessionPid, 0) + for _, process := range processes { + jobProcesses = append(jobProcesses, process) + if len(jobProcesses) > JobsPerQuery { + group.Submit(func() error { + err := processProcfsRequests(ctxG, hostname, portn, gatherTimeout, jobProcesses) + return err + }, + ) + jobProcesses = make([]stat_activity.SessionPid, 0) + } + } + group.Submit(func() error { + err := processProcfsRequests(ctxG, hostname, portn, gatherTimeout, jobProcesses) + return err + }, + ) + } + + err = group.Wait() + return err +} diff --git a/internal/master/procfs_test.go b/internal/master/procfs_test.go new file mode 100644 index 0000000..1b0a355 --- /dev/null +++ b/internal/master/procfs_test.go @@ -0,0 +1,481 @@ +package master + +import ( + "context" + "fmt" + "log" + "net" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" + + pb "github.com/open-gpdb/yagpcc/api/proto/agent_segment" + "github.com/open-gpdb/yagpcc/internal/gp" + "github.com/open-gpdb/yagpcc/internal/gp/stat_activity" + "github.com/open-gpdb/yagpcc/internal/storage" +) + +// --- mock statActivityLister --- + +type mockStatActivityLister struct { + sessions []stat_activity.SessionPid + sessionsErr error + listCalled bool +} + +func (m *mockStatActivityLister) Start(context.Context) error { return nil } +func (m *mockStatActivityLister) Stop() {} +func (m *mockStatActivityLister) List(context.Context) ([]*gp.GpStatActivity, error) { + return nil, nil +} +func (m *mockStatActivityLister) ListAllSessions(context.Context) ([]stat_activity.SessionPid, error) { + m.listCalled = true + return m.sessions, m.sessionsErr +} + +// --- fake gRPC server for GetPidProcStat --- + +type fakeProcStatServer struct { + pb.UnimplementedGetQueryInfoServer + mu sync.Mutex + called bool + lastReq *pb.GetPidProcInfoReq +} + +func (s *fakeProcStatServer) GetPidProcStat(_ context.Context, req *pb.GetPidProcInfoReq) (*pb.GetPidProcInfoResponse, error) { + s.mu.Lock() + s.called = true + s.lastReq = req + s.mu.Unlock() + return &pb.GetPidProcInfoResponse{}, nil +} + +func (s *fakeProcStatServer) snapshot() (bool, *pb.GetPidProcInfoReq) { + s.mu.Lock() + defer s.mu.Unlock() + return s.called, s.lastReq +} + +func (s *fakeProcStatServer) GetMetricQueries(_ context.Context, _ *pb.GetQueriesInfoReq) (*pb.GetQueriesInfoResponse, error) { + return &pb.GetQueriesInfoResponse{}, nil +} + +type failingProcStatServer struct { + pb.UnimplementedGetQueryInfoServer +} + +func (s *failingProcStatServer) GetPidProcStat(context.Context, *pb.GetPidProcInfoReq) (*pb.GetPidProcInfoResponse, error) { + return nil, fmt.Errorf("simulated gRPC error") +} + +func (s *failingProcStatServer) GetMetricQueries(context.Context, *pb.GetQueriesInfoReq) (*pb.GetQueriesInfoResponse, error) { + return &pb.GetQueriesInfoResponse{}, nil +} + +// setupBufconnServer starts a gRPC server on a bufconn listener and returns +// the listener. The caller must register services on the returned server before +// calling this, or pass a pre-configured server. +func setupBufconnServer(t *testing.T, srv pb.GetQueryInfoServer) *bufconn.Listener { + t.Helper() + lis := bufconn.Listen(1024 * 1024) + s := grpc.NewServer() + pb.RegisterGetQueryInfoServer(s, srv) + go func() { + if err := s.Serve(lis); err != nil { + log.Printf("bufconn server exited: %v", err) + } + }() + t.Cleanup(func() { s.Stop() }) + return lis +} + +func dialBufconn(t *testing.T, lis *bufconn.Listener) *grpc.ClientConn { + t.Helper() + conn, err := grpc.NewClient( + "passthrough:///bufconn", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + t.Cleanup(func() { conn.Close() }) + return conn +} + +func newTestLogger() *zap.SugaredLogger { + cfg := zap.NewDevelopmentConfig() + cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel) + l, _ := cfg.Build() + return l.Sugar() +} + +// ============================================================ +// Tests for getJobsMap +// ============================================================ + +func TestGetJobsMap_EmptyInput(t *testing.T) { + result := getJobsMap(nil) + assert.NotNil(t, result) + assert.Empty(t, result) + + result2 := getJobsMap([]stat_activity.SessionPid{}) + assert.NotNil(t, result2) + assert.Empty(t, result2) +} + +func TestGetJobsMap_SingleHost(t *testing.T) { + storage.SetHostnameForSegindex(10, "host-a") + + sessions := []stat_activity.SessionPid{ + {GpSegmentId: 10, Pid: 100, SessId: 1}, + {GpSegmentId: 10, Pid: 200, SessId: 2}, + } + + result := getJobsMap(sessions) + + // The map should contain an entry for "host-a" + _, exists := result["host-a"] + assert.True(t, exists, "expected key 'host-a' in hostJobMap") +} + +func TestGetJobsMap_MultipleHosts(t *testing.T) { + storage.SetHostnameForSegindex(20, "host-b") + storage.SetHostnameForSegindex(21, "host-c") + + sessions := []stat_activity.SessionPid{ + {GpSegmentId: 20, Pid: 100, SessId: 1}, + {GpSegmentId: 21, Pid: 200, SessId: 2}, + {GpSegmentId: 20, Pid: 300, SessId: 3}, + } + + result := getJobsMap(sessions) + + // Should have entries for both hosts + assert.Contains(t, result, "host-b") + assert.Contains(t, result, "host-c") +} + +func TestGetJobsMap_UnknownSegindex(t *testing.T) { + // When segindex is not in the config storage, GetHostnameForSegindex + // returns the string representation of the segindex. + sessions := []stat_activity.SessionPid{ + {GpSegmentId: 9999, Pid: 100, SessId: 1}, + } + + result := getJobsMap(sessions) + _, exists := result["9999"] + assert.True(t, exists, "expected key '9999' for unknown segindex") +} + +// ============================================================ +// Tests for processProcfsRequests +// ============================================================ + +func TestProcessProcfsRequests_Success(t *testing.T) { + fakeSrv := &fakeProcStatServer{} + lis := setupBufconnServer(t, fakeSrv) + + // Inject the bufconn connection into the global connection cache + conn := dialBufconn(t, lis) + hostname := fmt.Sprintf("test-procfs-success-%d", time.Now().UnixNano()) + segConnectionLock.Lock() + segConnections[hostname] = conn + segConnectionLock.Unlock() + t.Cleanup(func() { + segConnectionLock.Lock() + delete(segConnections, hostname) + segConnectionLock.Unlock() + }) + + reqs := []stat_activity.SessionPid{ + {GpSegmentId: 1, Pid: 100, SessId: 10}, + {GpSegmentId: 2, Pid: 200, SessId: 20}, + } + + ctx := context.Background() + err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, reqs) + require.NoError(t, err) + called, lastReq := fakeSrv.snapshot() + assert.True(t, called, "expected GetPidProcStat to be called") + require.NotNil(t, lastReq) + assert.Len(t, lastReq.SegmentProcess, 2) + + // Verify the proto message fields + sp0 := lastReq.SegmentProcess[0] + assert.Equal(t, int64(1), sp0.GpSegmentId) + assert.Equal(t, int64(100), sp0.Pid) + assert.Equal(t, int64(10), sp0.SessId) + + sp1 := lastReq.SegmentProcess[1] + assert.Equal(t, int64(2), sp1.GpSegmentId) + assert.Equal(t, int64(200), sp1.Pid) + assert.Equal(t, int64(20), sp1.SessId) +} + +func TestProcessProcfsRequests_GrpcError(t *testing.T) { + failSrv := &failingProcStatServer{} + lis := setupBufconnServer(t, failSrv) + + conn := dialBufconn(t, lis) + hostname := fmt.Sprintf("test-procfs-fail-%d", time.Now().UnixNano()) + segConnectionLock.Lock() + segConnections[hostname] = conn + segConnectionLock.Unlock() + t.Cleanup(func() { + segConnectionLock.Lock() + delete(segConnections, hostname) + segConnectionLock.Unlock() + }) + + reqs := []stat_activity.SessionPid{ + {GpSegmentId: 1, Pid: 100, SessId: 10}, + } + + ctx := context.Background() + err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, reqs) + require.Error(t, err) + assert.Contains(t, err.Error(), "simulated gRPC error") +} + +func TestProcessProcfsRequests_CancelledContext(t *testing.T) { + fakeSrv := &fakeProcStatServer{} + lis := setupBufconnServer(t, fakeSrv) + + conn := dialBufconn(t, lis) + hostname := fmt.Sprintf("test-procfs-cancel-%d", time.Now().UnixNano()) + segConnectionLock.Lock() + segConnections[hostname] = conn + segConnectionLock.Unlock() + t.Cleanup(func() { + segConnectionLock.Lock() + delete(segConnections, hostname) + segConnectionLock.Unlock() + }) + + reqs := []stat_activity.SessionPid{ + {GpSegmentId: 1, Pid: 100, SessId: 10}, + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + // With a cancelled context, processProcfsRequests should skip building + // the request body (due to select on ctx.Done()) and return nil. + err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, reqs) + // The function returns nil when context is cancelled during request building, + // but may return an error from the gRPC call if the request was already built. + // Either outcome is acceptable with a cancelled context. + if err != nil { + assert.ErrorIs(t, ctx.Err(), context.Canceled) + } +} + +func TestProcessProcfsRequests_EmptyRequests(t *testing.T) { + fakeSrv := &fakeProcStatServer{} + lis := setupBufconnServer(t, fakeSrv) + + conn := dialBufconn(t, lis) + hostname := fmt.Sprintf("test-procfs-empty-%d", time.Now().UnixNano()) + segConnectionLock.Lock() + segConnections[hostname] = conn + segConnectionLock.Unlock() + t.Cleanup(func() { + segConnectionLock.Lock() + delete(segConnections, hostname) + segConnectionLock.Unlock() + }) + + ctx := context.Background() + err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, nil) + require.NoError(t, err) + called, lastReq := fakeSrv.snapshot() + assert.True(t, called, "GetPidProcStat should still be called with empty segment list") + assert.Empty(t, lastReq.SegmentProcess) +} + +// ============================================================ +// Tests for GatherProcfsStat +// ============================================================ + +func TestGatherProcfsStat_ListAllSessionsError(t *testing.T) { + mock := &mockStatActivityLister{ + sessionsErr: fmt.Errorf("db connection failed"), + } + bs := &BackgroundStorage{ + l: newTestLogger(), + StatActivityLister: mock, + } + + err := bs.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second) + require.Error(t, err) + assert.Contains(t, err.Error(), "db connection failed") + assert.True(t, mock.listCalled) +} + +func TestGatherProcfsStat_EmptySessions(t *testing.T) { + mock := &mockStatActivityLister{ + sessions: []stat_activity.SessionPid{}, + } + bs := &BackgroundStorage{ + l: newTestLogger(), + StatActivityLister: mock, + } + + err := bs.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second) + require.NoError(t, err) + assert.True(t, mock.listCalled) +} + +func TestGatherProcfsStat_WithSessions(t *testing.T) { + // Set up a fake gRPC server + fakeSrv := &fakeProcStatServer{} + lis := setupBufconnServer(t, fakeSrv) + + conn := dialBufconn(t, lis) + hostname := fmt.Sprintf("test-gather-%d", time.Now().UnixNano()) + + // Register the hostname in the segment config + storage.SetHostnameForSegindex(30, hostname) + + // Inject the bufconn connection + segConnectionLock.Lock() + segConnections[hostname] = conn + segConnectionLock.Unlock() + t.Cleanup(func() { + segConnectionLock.Lock() + delete(segConnections, hostname) + segConnectionLock.Unlock() + }) + + mock := &mockStatActivityLister{ + sessions: []stat_activity.SessionPid{ + {GpSegmentId: 30, Pid: 100, SessId: 1}, + {GpSegmentId: 30, Pid: 200, SessId: 2}, + }, + } + bs := &BackgroundStorage{ + l: newTestLogger(), + StatActivityLister: mock, + } + + err := bs.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second) + require.NoError(t, err) + assert.True(t, mock.listCalled) +} + +func TestGatherProcfsStat_ContextCancelled(t *testing.T) { + mock := &mockStatActivityLister{ + sessions: []stat_activity.SessionPid{ + {GpSegmentId: 40, Pid: 100, SessId: 1}, + }, + } + bs := &BackgroundStorage{ + l: newTestLogger(), + StatActivityLister: mock, + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + // With a cancelled context, the timeout context creation will produce + // an already-done context, so the pool tasks should handle it gracefully. + err := bs.GatherProcfsStat(ctx, 2, 50051, 5*time.Second) + // The error may be nil (if tasks detect cancellation early) or non-nil + // (if the gRPC call fails due to cancelled context). Both are acceptable. + _ = err +} + +func TestGatherProcfsStat_GrpcFailure(t *testing.T) { + failSrv := &failingProcStatServer{} + lis := setupBufconnServer(t, failSrv) + + conn := dialBufconn(t, lis) + hostname := fmt.Sprintf("test-gather-fail-%d", time.Now().UnixNano()) + + storage.SetHostnameForSegindex(50, hostname) + + segConnectionLock.Lock() + segConnections[hostname] = conn + segConnectionLock.Unlock() + t.Cleanup(func() { + segConnectionLock.Lock() + delete(segConnections, hostname) + segConnectionLock.Unlock() + }) + + mock := &mockStatActivityLister{ + sessions: []stat_activity.SessionPid{ + {GpSegmentId: 50, Pid: 100, SessId: 1}, + }, + } + bs := &BackgroundStorage{ + l: newTestLogger(), + StatActivityLister: mock, + } + + err := bs.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second) + require.Error(t, err) + assert.Contains(t, err.Error(), "simulated gRPC error") +} + +func TestGatherProcfsStat_ManySessionsBatching(t *testing.T) { + // Create more than JobsPerQuery sessions to verify batching logic + fakeSrv := &fakeProcStatServer{} + lis := setupBufconnServer(t, fakeSrv) + + conn := dialBufconn(t, lis) + hostname := fmt.Sprintf("test-gather-batch-%d", time.Now().UnixNano()) + + storage.SetHostnameForSegindex(60, hostname) + + segConnectionLock.Lock() + segConnections[hostname] = conn + segConnectionLock.Unlock() + t.Cleanup(func() { + segConnectionLock.Lock() + delete(segConnections, hostname) + segConnectionLock.Unlock() + }) + + // Create JobsPerQuery + 5 sessions to trigger at least 2 batches + sessions := make([]stat_activity.SessionPid, 0, JobsPerQuery+5) + for i := 0; i < JobsPerQuery+5; i++ { + sessions = append(sessions, stat_activity.SessionPid{ + GpSegmentId: 60, + Pid: 100 + i, + SessId: i + 1, + }) + } + + mock := &mockStatActivityLister{ + sessions: sessions, + } + bs := &BackgroundStorage{ + l: newTestLogger(), + StatActivityLister: mock, + } + + err := bs.GatherProcfsStat(context.Background(), 4, 0, 10*time.Second) + require.NoError(t, err) + assert.True(t, mock.listCalled) + // The fake server should have been called (at least once for the batches) + called, _ := fakeSrv.snapshot() + assert.True(t, called) +} + +// ============================================================ +// Tests for constants +// ============================================================ + +func TestConstants(t *testing.T) { + assert.Equal(t, 100, JobsPerQuery) + assert.Equal(t, 4*1024*1024, maxMsgSize) +} diff --git a/internal/master/utils.go b/internal/master/utils.go new file mode 100644 index 0000000..b615275 --- /dev/null +++ b/internal/master/utils.go @@ -0,0 +1,59 @@ +package master + +import ( + "context" + "net" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" +) + +var ( + segConnections map[string]*grpc.ClientConn = make(map[string]*grpc.ClientConn) + segConnectionLock sync.Mutex +) + +func getGrpcClientConnection(ctx context.Context, hostname string, portn uint32, segConnectTimeoutSec float64) (*grpc.ClientConn, error) { + var err error + segConnectionLock.Lock() + defer segConnectionLock.Unlock() + conn, ok := segConnections[hostname] + if ok { + if conn.GetState() != connectivity.Shutdown { + return conn, nil + } + } + connectTimeout := time.Second * time.Duration(segConnectTimeoutSec) + if portn > 0 { + conn, err = grpc.NewClient( + getSegAddr(hostname, portn), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithConnectParams(grpc.ConnectParams{ + MinConnectTimeout: connectTimeout, + }), + ) + if err != nil { + return nil, err + } + } else { + conn, err = grpc.NewClient( + hostname, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, "unix", addr) + }), + grpc.WithConnectParams(grpc.ConnectParams{ + MinConnectTimeout: connectTimeout, + }), + ) + if err != nil { + return nil, err + } + } + segConnections[hostname] = conn + return conn, nil +} From 8445b59a030f6b299e81e98b50dbb66e90988329 Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Tue, 28 Apr 2026 16:03:44 +0300 Subject: [PATCH 02/11] Fix lint --- internal/grpc/deps.go | 2 +- internal/master/procfs_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/grpc/deps.go b/internal/grpc/deps.go index d53fa65..0d90214 100644 --- a/internal/grpc/deps.go +++ b/internal/grpc/deps.go @@ -8,7 +8,7 @@ import ( "github.com/open-gpdb/yagpcc/internal/gp" ) -type statActivityLister interface { +type statActivityLister interface { //nolint:unused // used by go:generate mockgen Start(ctx context.Context) error Stop() List(ctx context.Context) ([]*gp.GpStatActivity, error) diff --git a/internal/master/procfs_test.go b/internal/master/procfs_test.go index 1b0a355..00f53df 100644 --- a/internal/master/procfs_test.go +++ b/internal/master/procfs_test.go @@ -106,7 +106,7 @@ func dialBufconn(t *testing.T, lis *bufconn.Listener) *grpc.ClientConn { grpc.WithTransportCredentials(insecure.NewCredentials()), ) require.NoError(t, err) - t.Cleanup(func() { conn.Close() }) + t.Cleanup(func() { _ = conn.Close() }) return conn } From 85e6cca3f7d0a9f424ba4414f0df14a17991cb21 Mon Sep 17 00:00:00 2001 From: Leonid <63977577+leborchuk@users.noreply.github.com> Date: Tue, 28 Apr 2026 16:17:09 +0300 Subject: [PATCH 03/11] Update internal/master/background.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/master/background.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/master/background.go b/internal/master/background.go index 8771d7b..856acfd 100644 --- a/internal/master/background.go +++ b/internal/master/background.go @@ -518,8 +518,11 @@ func InitBG( return nil }) + if backgroundStorage.StatActivityLister == nil { + return fmt.Errorf("stat activity lister is nil") + } if err = backgroundStorage.StatActivityLister.Start(ctx); err != nil { - return fmt.Errorf("error starting stat activity lister") + return fmt.Errorf("error starting stat activity lister: %w", err) } errG.Go(func() error { From 4d597d6b0856673b3a17f16a47dc9932b904b71d Mon Sep 17 00:00:00 2001 From: Leonid <63977577+leborchuk@users.noreply.github.com> Date: Tue, 28 Apr 2026 16:19:28 +0300 Subject: [PATCH 04/11] Update internal/master/procfs.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/master/procfs.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/master/procfs.go b/internal/master/procfs.go index a045e02..0d2d3ea 100644 --- a/internal/master/procfs.go +++ b/internal/master/procfs.go @@ -90,20 +90,23 @@ func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, group, ctxG := pool.GroupContext(ctxT) for hostname, processes := range hostJobMap { + jobHostname := hostname jobProcesses := make([]stat_activity.SessionPid, 0) for _, process := range processes { jobProcesses = append(jobProcesses, process) if len(jobProcesses) > JobsPerQuery { + batch := append([]stat_activity.SessionPid(nil), jobProcesses...) group.Submit(func() error { - err := processProcfsRequests(ctxG, hostname, portn, gatherTimeout, jobProcesses) + err := processProcfsRequests(ctxG, jobHostname, portn, gatherTimeout, batch) return err }, ) jobProcesses = make([]stat_activity.SessionPid, 0) } } + batch := append([]stat_activity.SessionPid(nil), jobProcesses...) group.Submit(func() error { - err := processProcfsRequests(ctxG, hostname, portn, gatherTimeout, jobProcesses) + err := processProcfsRequests(ctxG, jobHostname, portn, gatherTimeout, batch) return err }, ) From ad10bd4f6c9d4a97db51ff99247630f35067e47c Mon Sep 17 00:00:00 2001 From: Leonid <63977577+leborchuk@users.noreply.github.com> Date: Tue, 28 Apr 2026 16:20:34 +0300 Subject: [PATCH 05/11] Update internal/master/procfs.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/master/procfs.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/internal/master/procfs.go b/internal/master/procfs.go index 0d2d3ea..f2cee68 100644 --- a/internal/master/procfs.go +++ b/internal/master/procfs.go @@ -92,24 +92,29 @@ func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, for hostname, processes := range hostJobMap { jobHostname := hostname jobProcesses := make([]stat_activity.SessionPid, 0) + for _, process := range processes { + host := hostname + jobProcesses := make([]stat_activity.SessionPid, 0) for _, process := range processes { jobProcesses = append(jobProcesses, process) if len(jobProcesses) > JobsPerQuery { batch := append([]stat_activity.SessionPid(nil), jobProcesses...) group.Submit(func() error { - err := processProcfsRequests(ctxG, jobHostname, portn, gatherTimeout, batch) + err := processProcfsRequests(ctxG, host, portn, gatherTimeout, batch) return err }, ) jobProcesses = make([]stat_activity.SessionPid, 0) } } - batch := append([]stat_activity.SessionPid(nil), jobProcesses...) - group.Submit(func() error { - err := processProcfsRequests(ctxG, jobHostname, portn, gatherTimeout, batch) - return err - }, - ) + if len(jobProcesses) > 0 { + batch := append([]stat_activity.SessionPid(nil), jobProcesses...) + group.Submit(func() error { + err := processProcfsRequests(ctxG, host, portn, gatherTimeout, batch) + return err + }, + ) + } } err = group.Wait() From 4cf0670d93f684970b1a67e6d60aa20172dab603 Mon Sep 17 00:00:00 2001 From: Leonid <63977577+leborchuk@users.noreply.github.com> Date: Tue, 28 Apr 2026 16:20:52 +0300 Subject: [PATCH 06/11] Update internal/master/procfs.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/master/procfs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/master/procfs.go b/internal/master/procfs.go index f2cee68..6eaac88 100644 --- a/internal/master/procfs.go +++ b/internal/master/procfs.go @@ -53,7 +53,7 @@ func processProcfsRequests(ctx context.Context, hostname string, portn uint32, g for _, req := range reqs { select { case <-ctx.Done(): - return nil + return ctx.Err() default: msgReq.SegmentProcess = append(msgReq.SegmentProcess, &pb.SegmentProcess{ GpSegmentId: int64(req.GpSegmentId), From 6e7209b825a5b9e65562ec5b329b2f524add96e6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 13:29:02 +0000 Subject: [PATCH 07/11] Fix nPullers validation, remove hardcoded maxMsgSize constant, fix broken loop structure in GatherProcfsStat Agent-Logs-Url: https://github.com/open-gpdb/yagpcc/sessions/3c612e77-c1ba-4bbc-9773-cc035b3b8310 Co-authored-by: leborchuk <63977577+leborchuk@users.noreply.github.com> --- internal/master/procfs.go | 37 +++++++++++++----------------- internal/master/procfs_test.go | 42 ++++++++++++++++++++++------------ 2 files changed, 44 insertions(+), 35 deletions(-) diff --git a/internal/master/procfs.go b/internal/master/procfs.go index 6eaac88..b090203 100644 --- a/internal/master/procfs.go +++ b/internal/master/procfs.go @@ -2,6 +2,7 @@ package master import ( "context" + "fmt" "time" "google.golang.org/grpc" @@ -16,7 +17,6 @@ import ( const ( JobsPerQuery = 100 - maxMsgSize = 4 * 1024 * 1024 ) type ( @@ -42,7 +42,7 @@ func getJobsMap(sessions []stat_activity.SessionPid) hostJobMap { return hostJobMap } -func processProcfsRequests(ctx context.Context, hostname string, portn uint32, gatherTimeout time.Duration, reqs []stat_activity.SessionPid) error { +func processProcfsRequests(ctx context.Context, hostname string, portn uint32, gatherTimeout time.Duration, maxMsgSize int, reqs []stat_activity.SessionPid) error { grpcConn, err := getGrpcClientConnection(ctx, hostname, portn, gatherTimeout.Seconds()) if err != nil { return err @@ -73,7 +73,10 @@ func processProcfsRequests(ctx context.Context, hostname string, portn uint32, g return nil } -func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, portn uint32, gatherTimeout time.Duration) error { +func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, portn uint32, gatherTimeout time.Duration, maxMsgSize int) error { + if nPullers <= 0 { + return fmt.Errorf("nPullers must be greater than 0, got %d", nPullers) + } bs.l.Debug("GatherProcfsStat") sessions, err := bs.StatActivityLister.ListAllSessions(ctx) if err != nil { @@ -90,33 +93,25 @@ func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, group, ctxG := pool.GroupContext(ctxT) for hostname, processes := range hostJobMap { - jobHostname := hostname - jobProcesses := make([]stat_activity.SessionPid, 0) - for _, process := range processes { host := hostname - jobProcesses := make([]stat_activity.SessionPid, 0) + jobProcesses := make([]stat_activity.SessionPid, 0, JobsPerQuery) for _, process := range processes { jobProcesses = append(jobProcesses, process) - if len(jobProcesses) > JobsPerQuery { - batch := append([]stat_activity.SessionPid(nil), jobProcesses...) + if len(jobProcesses) == JobsPerQuery { + batch := jobProcesses group.Submit(func() error { - err := processProcfsRequests(ctxG, host, portn, gatherTimeout, batch) - return err - }, - ) - jobProcesses = make([]stat_activity.SessionPid, 0) + return processProcfsRequests(ctxG, host, portn, gatherTimeout, maxMsgSize, batch) + }) + jobProcesses = make([]stat_activity.SessionPid, 0, JobsPerQuery) } } if len(jobProcesses) > 0 { - batch := append([]stat_activity.SessionPid(nil), jobProcesses...) + batch := jobProcesses group.Submit(func() error { - err := processProcfsRequests(ctxG, host, portn, gatherTimeout, batch) - return err - }, - ) + return processProcfsRequests(ctxG, host, portn, gatherTimeout, maxMsgSize, batch) + }) } } - err = group.Wait() - return err + return group.Wait() } diff --git a/internal/master/procfs_test.go b/internal/master/procfs_test.go index 00f53df..cefedbe 100644 --- a/internal/master/procfs_test.go +++ b/internal/master/procfs_test.go @@ -79,9 +79,9 @@ func (s *failingProcStatServer) GetMetricQueries(context.Context, *pb.GetQueries return &pb.GetQueriesInfoResponse{}, nil } -// setupBufconnServer starts a gRPC server on a bufconn listener and returns -// the listener. The caller must register services on the returned server before -// calling this, or pass a pre-configured server. +// setupBufconnServer creates a gRPC server on a bufconn listener, registers +// the provided GetQueryInfo service implementation, starts serving, and +// returns the listener. func setupBufconnServer(t *testing.T, srv pb.GetQueryInfoServer) *bufconn.Listener { t.Helper() lis := bufconn.Listen(1024 * 1024) @@ -201,7 +201,7 @@ func TestProcessProcfsRequests_Success(t *testing.T) { } ctx := context.Background() - err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, reqs) + err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) require.NoError(t, err) called, lastReq := fakeSrv.snapshot() assert.True(t, called, "expected GetPidProcStat to be called") @@ -240,7 +240,7 @@ func TestProcessProcfsRequests_GrpcError(t *testing.T) { } ctx := context.Background() - err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, reqs) + err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) require.Error(t, err) assert.Contains(t, err.Error(), "simulated gRPC error") } @@ -269,7 +269,7 @@ func TestProcessProcfsRequests_CancelledContext(t *testing.T) { // With a cancelled context, processProcfsRequests should skip building // the request body (due to select on ctx.Done()) and return nil. - err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, reqs) + err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) // The function returns nil when context is cancelled during request building, // but may return an error from the gRPC call if the request was already built. // Either outcome is acceptable with a cancelled context. @@ -294,7 +294,7 @@ func TestProcessProcfsRequests_EmptyRequests(t *testing.T) { }) ctx := context.Background() - err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, nil) + err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, nil) require.NoError(t, err) called, lastReq := fakeSrv.snapshot() assert.True(t, called, "GetPidProcStat should still be called with empty segment list") @@ -314,7 +314,7 @@ func TestGatherProcfsStat_ListAllSessionsError(t *testing.T) { StatActivityLister: mock, } - err := bs.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second) + err := bs.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second, 4*1024*1024) require.Error(t, err) assert.Contains(t, err.Error(), "db connection failed") assert.True(t, mock.listCalled) @@ -329,7 +329,7 @@ func TestGatherProcfsStat_EmptySessions(t *testing.T) { StatActivityLister: mock, } - err := bs.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second) + err := bs.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second, 4*1024*1024) require.NoError(t, err) assert.True(t, mock.listCalled) } @@ -366,7 +366,7 @@ func TestGatherProcfsStat_WithSessions(t *testing.T) { StatActivityLister: mock, } - err := bs.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second) + err := bs.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second, 4*1024*1024) require.NoError(t, err) assert.True(t, mock.listCalled) } @@ -387,7 +387,7 @@ func TestGatherProcfsStat_ContextCancelled(t *testing.T) { // With a cancelled context, the timeout context creation will produce // an already-done context, so the pool tasks should handle it gracefully. - err := bs.GatherProcfsStat(ctx, 2, 50051, 5*time.Second) + err := bs.GatherProcfsStat(ctx, 2, 50051, 5*time.Second, 4*1024*1024) // The error may be nil (if tasks detect cancellation early) or non-nil // (if the gRPC call fails due to cancelled context). Both are acceptable. _ = err @@ -421,7 +421,7 @@ func TestGatherProcfsStat_GrpcFailure(t *testing.T) { StatActivityLister: mock, } - err := bs.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second) + err := bs.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second, 4*1024*1024) require.Error(t, err) assert.Contains(t, err.Error(), "simulated gRPC error") } @@ -463,7 +463,7 @@ func TestGatherProcfsStat_ManySessionsBatching(t *testing.T) { StatActivityLister: mock, } - err := bs.GatherProcfsStat(context.Background(), 4, 0, 10*time.Second) + err := bs.GatherProcfsStat(context.Background(), 4, 0, 10*time.Second, 4*1024*1024) require.NoError(t, err) assert.True(t, mock.listCalled) // The fake server should have been called (at least once for the batches) @@ -477,5 +477,19 @@ func TestGatherProcfsStat_ManySessionsBatching(t *testing.T) { func TestConstants(t *testing.T) { assert.Equal(t, 100, JobsPerQuery) - assert.Equal(t, 4*1024*1024, maxMsgSize) +} + +func TestGatherProcfsStat_InvalidNPullers(t *testing.T) { + mock := &mockStatActivityLister{} + bs := &BackgroundStorage{ + l: newTestLogger(), + StatActivityLister: mock, + } + + for _, n := range []int{0, -1, -100} { + err := bs.GatherProcfsStat(context.Background(), n, 50051, 5*time.Second, 4*1024*1024) + require.Error(t, err) + assert.Contains(t, err.Error(), "nPullers must be greater than 0") + assert.False(t, mock.listCalled, "ListAllSessions should not be called for invalid nPullers") + } } From def4c55ecb9fa57f6d3ac926ef87599a9edec90b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 13:31:07 +0000 Subject: [PATCH 08/11] Use >= for batch size check and explicit slice copy to prevent aliasing Agent-Logs-Url: https://github.com/open-gpdb/yagpcc/sessions/3c612e77-c1ba-4bbc-9773-cc035b3b8310 Co-authored-by: leborchuk <63977577+leborchuk@users.noreply.github.com> --- internal/master/procfs.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/master/procfs.go b/internal/master/procfs.go index b090203..c620f20 100644 --- a/internal/master/procfs.go +++ b/internal/master/procfs.go @@ -97,8 +97,8 @@ func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, jobProcesses := make([]stat_activity.SessionPid, 0, JobsPerQuery) for _, process := range processes { jobProcesses = append(jobProcesses, process) - if len(jobProcesses) == JobsPerQuery { - batch := jobProcesses + if len(jobProcesses) >= JobsPerQuery { + batch := append([]stat_activity.SessionPid(nil), jobProcesses...) group.Submit(func() error { return processProcfsRequests(ctxG, host, portn, gatherTimeout, maxMsgSize, batch) }) @@ -106,7 +106,7 @@ func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, } } if len(jobProcesses) > 0 { - batch := jobProcesses + batch := append([]stat_activity.SessionPid(nil), jobProcesses...) group.Submit(func() error { return processProcfsRequests(ctxG, host, portn, gatherTimeout, maxMsgSize, batch) }) From 93d069308057aca2111c95d18d42d56e8e36f4cd Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Wed, 29 Apr 2026 18:43:11 +0300 Subject: [PATCH 09/11] Rename to grpc_test.go and perform go generate --- go.mod | 1 + go.sum | 2 + internal/grpc/deps.go | 15 ----- internal/grpc/get_master_info_test.go | 2 +- internal/grpc/mocks_test.go | 84 ++++++++++++++------------- internal/grpc/testutils_test.go | 2 +- 6 files changed, 50 insertions(+), 56 deletions(-) delete mode 100644 internal/grpc/deps.go diff --git a/go.mod b/go.mod index b59c4cb..6bd499b 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,7 @@ require ( github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect + go.uber.org/mock v0.6.0 // indirect go.uber.org/multierr v1.10.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect diff --git a/go.sum b/go.sum index 96c8bdc..497068e 100644 --- a/go.sum +++ b/go.sum @@ -210,6 +210,8 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y= +go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= diff --git a/internal/grpc/deps.go b/internal/grpc/deps.go deleted file mode 100644 index 0d90214..0000000 --- a/internal/grpc/deps.go +++ /dev/null @@ -1,15 +0,0 @@ -//go:generate mockgen -source=deps.go -package=grpc_test -mock_names statActivityLister=MockStatActivityLister -destination mocks_test.go - -package grpc - -import ( - "context" - - "github.com/open-gpdb/yagpcc/internal/gp" -) - -type statActivityLister interface { //nolint:unused // used by go:generate mockgen - Start(ctx context.Context) error - Stop() - List(ctx context.Context) ([]*gp.GpStatActivity, error) -} diff --git a/internal/grpc/get_master_info_test.go b/internal/grpc/get_master_info_test.go index c824952..ae83c7e 100644 --- a/internal/grpc/get_master_info_test.go +++ b/internal/grpc/get_master_info_test.go @@ -6,9 +6,9 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" "google.golang.org/protobuf/types/known/timestamppb" pbm "github.com/open-gpdb/yagpcc/api/proto/agent_master" diff --git a/internal/grpc/mocks_test.go b/internal/grpc/mocks_test.go index 19fb9ba..e150ee1 100644 --- a/internal/grpc/mocks_test.go +++ b/internal/grpc/mocks_test.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: deps.go +// Source: grpc_test.go +// +// Generated by this command: +// +// mockgen -source=grpc_test.go -package=grpc_test -mock_names statActivityLister=MockStatActivityLister -destination mocks_test.go +// // Package grpc_test is a generated GoMock package. package grpc_test @@ -8,86 +13,87 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" gp "github.com/open-gpdb/yagpcc/internal/gp" - "github.com/open-gpdb/yagpcc/internal/gp/stat_activity" + stat_activity "github.com/open-gpdb/yagpcc/internal/gp/stat_activity" + gomock "go.uber.org/mock/gomock" ) -// MockStatActivityLister is a mock of statActivityLister interface +// MockStatActivityLister is a mock of statActivityLister interface. type MockStatActivityLister struct { ctrl *gomock.Controller recorder *MockStatActivityListerMockRecorder + isgomock struct{} } -// MockStatActivityListerMockRecorder is the mock recorder for MockStatActivityLister +// MockStatActivityListerMockRecorder is the mock recorder for MockStatActivityLister. type MockStatActivityListerMockRecorder struct { mock *MockStatActivityLister } -// NewMockStatActivityLister creates a new mock instance +// NewMockStatActivityLister creates a new mock instance. func NewMockStatActivityLister(ctrl *gomock.Controller) *MockStatActivityLister { mock := &MockStatActivityLister{ctrl: ctrl} mock.recorder = &MockStatActivityListerMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockStatActivityLister) EXPECT() *MockStatActivityListerMockRecorder { return m.recorder } -// Start mocks base method -func (m *MockStatActivityLister) Start(ctx context.Context) error { +// List mocks base method. +func (m *MockStatActivityLister) List(ctx context.Context) ([]*gp.GpStatActivity, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Start", ctx) - ret0, _ := ret[0].(error) - return ret0 + ret := m.ctrl.Call(m, "List", ctx) + ret0, _ := ret[0].([]*gp.GpStatActivity) + ret1, _ := ret[1].(error) + return ret0, ret1 } -// Start indicates an expected call of Start -func (mr *MockStatActivityListerMockRecorder) Start(ctx interface{}) *gomock.Call { +// List indicates an expected call of List. +func (mr *MockStatActivityListerMockRecorder) List(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockStatActivityLister)(nil).Start), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStatActivityLister)(nil).List), ctx) } -// Stop mocks base method -func (m *MockStatActivityLister) Stop() { +// ListAllSessions mocks base method. +func (m *MockStatActivityLister) ListAllSessions(arg0 context.Context) ([]stat_activity.SessionPid, error) { m.ctrl.T.Helper() - m.ctrl.Call(m, "Stop") + ret := m.ctrl.Call(m, "ListAllSessions", arg0) + ret0, _ := ret[0].([]stat_activity.SessionPid) + ret1, _ := ret[1].(error) + return ret0, ret1 } -// Stop indicates an expected call of Stop -func (mr *MockStatActivityListerMockRecorder) Stop() *gomock.Call { +// ListAllSessions indicates an expected call of ListAllSessions. +func (mr *MockStatActivityListerMockRecorder) ListAllSessions(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockStatActivityLister)(nil).Stop)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAllSessions", reflect.TypeOf((*MockStatActivityLister)(nil).ListAllSessions), arg0) } -// List mocks base method -func (m *MockStatActivityLister) List(ctx context.Context) ([]*gp.GpStatActivity, error) { +// Start mocks base method. +func (m *MockStatActivityLister) Start(ctx context.Context) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "List", ctx) - ret0, _ := ret[0].([]*gp.GpStatActivity) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret := m.ctrl.Call(m, "Start", ctx) + ret0, _ := ret[0].(error) + return ret0 } -// List indicates an expected call of List -func (mr *MockStatActivityListerMockRecorder) List(ctx interface{}) *gomock.Call { +// Start indicates an expected call of Start. +func (mr *MockStatActivityListerMockRecorder) Start(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStatActivityLister)(nil).List), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockStatActivityLister)(nil).Start), ctx) } -// ListAllSessions mocks base method -func (m *MockStatActivityLister) ListAllSessions(ctx context.Context) ([]stat_activity.SessionPid, error) { +// Stop mocks base method. +func (m *MockStatActivityLister) Stop() { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListAllSessions", ctx) - ret0, _ := ret[0].([]stat_activity.SessionPid) - ret1, _ := ret[1].(error) - return ret0, ret1 + m.ctrl.Call(m, "Stop") } -// ListAllSessions indicates an expected call of ListAllSessions -func (mr *MockStatActivityListerMockRecorder) ListAllSessions(ctx interface{}) *gomock.Call { +// Stop indicates an expected call of Stop. +func (mr *MockStatActivityListerMockRecorder) Stop() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAllSessions", reflect.TypeOf((*MockStatActivityLister)(nil).ListAllSessions), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockStatActivityLister)(nil).Stop)) } diff --git a/internal/grpc/testutils_test.go b/internal/grpc/testutils_test.go index d792110..fdc6d8b 100644 --- a/internal/grpc/testutils_test.go +++ b/internal/grpc/testutils_test.go @@ -8,8 +8,8 @@ import ( "sort" "testing" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" gogrpc "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" From 0c8c5e15539e0486dc6d4feafc5ad6f9223eefbd Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Wed, 29 Apr 2026 18:49:20 +0300 Subject: [PATCH 10/11] make statActivityLister as private method --- internal/grpc/grpc_test.go | 17 +++++++++++++++++ internal/master/background.go | 12 ++++++------ internal/master/procfs.go | 2 +- internal/master/procfs_test.go | 14 +++++++------- 4 files changed, 31 insertions(+), 14 deletions(-) create mode 100644 internal/grpc/grpc_test.go diff --git a/internal/grpc/grpc_test.go b/internal/grpc/grpc_test.go new file mode 100644 index 0000000..ef7fe83 --- /dev/null +++ b/internal/grpc/grpc_test.go @@ -0,0 +1,17 @@ +//go:generate mockgen -source=grpc_test.go -package=grpc_test -mock_names statActivityLister=MockStatActivityLister -destination mocks_test.go + +package grpc_test + +import ( + "context" + + "github.com/open-gpdb/yagpcc/internal/gp" + "github.com/open-gpdb/yagpcc/internal/gp/stat_activity" +) + +type statActivityLister interface { //nolint:unused // used by go:generate mockgen + Start(ctx context.Context) error + Stop() + List(ctx context.Context) ([]*gp.GpStatActivity, error) + ListAllSessions(context.Context) ([]stat_activity.SessionPid, error) +} diff --git a/internal/master/background.go b/internal/master/background.go index 856acfd..34c1d0e 100644 --- a/internal/master/background.go +++ b/internal/master/background.go @@ -39,7 +39,7 @@ type ( SessionStorage *gp.SessionsStorage AggStorage *storage.AggregatedStorage RQStorage *storage.RunningQueriesStorage - StatActivityLister statActivityLister + statActivityLister statActivityLister } ) @@ -314,7 +314,7 @@ func (bs *BackgroundStorage) TryRefreshSessionsFromGP( ctx context.Context, clearDeletedSessions bool, ) error { - newSesList, err := bs.StatActivityLister.List(ctx) + newSesList, err := bs.statActivityLister.List(ctx) if err != nil { return fmt.Errorf("error getting sessions: %w", err) } @@ -484,7 +484,7 @@ func NewBackgroundStorage(l *zap.SugaredLogger, sessionStorage *gp.SessionsStora SessionStorage: sessionStorage, AggStorage: aggStorage, RQStorage: rqStorage, - StatActivityLister: sActivityLister, + statActivityLister: sActivityLister, } } @@ -518,10 +518,10 @@ func InitBG( return nil }) - if backgroundStorage.StatActivityLister == nil { + if backgroundStorage.statActivityLister == nil { return fmt.Errorf("stat activity lister is nil") } - if err = backgroundStorage.StatActivityLister.Start(ctx); err != nil { + if err = backgroundStorage.statActivityLister.Start(ctx); err != nil { return fmt.Errorf("error starting stat activity lister: %w", err) } @@ -568,7 +568,7 @@ func InitBG( ) err = errG.Wait() if err != nil { - backgroundStorage.StatActivityLister.Stop() + backgroundStorage.statActivityLister.Stop() l.Errorf("Fail in background precesses - done work with %v", err) return err } diff --git a/internal/master/procfs.go b/internal/master/procfs.go index c620f20..11af4c3 100644 --- a/internal/master/procfs.go +++ b/internal/master/procfs.go @@ -78,7 +78,7 @@ func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, return fmt.Errorf("nPullers must be greater than 0, got %d", nPullers) } bs.l.Debug("GatherProcfsStat") - sessions, err := bs.StatActivityLister.ListAllSessions(ctx) + sessions, err := bs.statActivityLister.ListAllSessions(ctx) if err != nil { return err } diff --git a/internal/master/procfs_test.go b/internal/master/procfs_test.go index cefedbe..8556ddb 100644 --- a/internal/master/procfs_test.go +++ b/internal/master/procfs_test.go @@ -311,7 +311,7 @@ func TestGatherProcfsStat_ListAllSessionsError(t *testing.T) { } bs := &BackgroundStorage{ l: newTestLogger(), - StatActivityLister: mock, + statActivityLister: mock, } err := bs.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second, 4*1024*1024) @@ -326,7 +326,7 @@ func TestGatherProcfsStat_EmptySessions(t *testing.T) { } bs := &BackgroundStorage{ l: newTestLogger(), - StatActivityLister: mock, + statActivityLister: mock, } err := bs.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second, 4*1024*1024) @@ -363,7 +363,7 @@ func TestGatherProcfsStat_WithSessions(t *testing.T) { } bs := &BackgroundStorage{ l: newTestLogger(), - StatActivityLister: mock, + statActivityLister: mock, } err := bs.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second, 4*1024*1024) @@ -379,7 +379,7 @@ func TestGatherProcfsStat_ContextCancelled(t *testing.T) { } bs := &BackgroundStorage{ l: newTestLogger(), - StatActivityLister: mock, + statActivityLister: mock, } ctx, cancel := context.WithCancel(context.Background()) @@ -418,7 +418,7 @@ func TestGatherProcfsStat_GrpcFailure(t *testing.T) { } bs := &BackgroundStorage{ l: newTestLogger(), - StatActivityLister: mock, + statActivityLister: mock, } err := bs.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second, 4*1024*1024) @@ -460,7 +460,7 @@ func TestGatherProcfsStat_ManySessionsBatching(t *testing.T) { } bs := &BackgroundStorage{ l: newTestLogger(), - StatActivityLister: mock, + statActivityLister: mock, } err := bs.GatherProcfsStat(context.Background(), 4, 0, 10*time.Second, 4*1024*1024) @@ -483,7 +483,7 @@ func TestGatherProcfsStat_InvalidNPullers(t *testing.T) { mock := &mockStatActivityLister{} bs := &BackgroundStorage{ l: newTestLogger(), - StatActivityLister: mock, + statActivityLister: mock, } for _, n := range []int{0, -1, -100} { From ec0ba9db44e7a1c1a21d44116289e1b34531febd Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Wed, 29 Apr 2026 19:32:08 +0300 Subject: [PATCH 11/11] Fix various style issues --- internal/master/background.go | 4 -- internal/master/procfs.go | 82 +++++++++++++++------------------- internal/master/procfs_test.go | 37 ++++++++------- internal/master/utils.go | 14 +++--- 4 files changed, 65 insertions(+), 72 deletions(-) diff --git a/internal/master/background.go b/internal/master/background.go index 34c1d0e..8fc4b70 100644 --- a/internal/master/background.go +++ b/internal/master/background.go @@ -49,10 +49,6 @@ var ( segCountLock sync.Mutex ) -func getSegAddr(hostname string, portn uint32) string { - return fmt.Sprintf("%s:%d", hostname, portn) -} - func (bs *BackgroundStorage) SendSegmentRefreshMessages(ctx context.Context, pullRateSec float64, configCacheDurability time.Duration, portn uint32, customSegmentList *config.SegmentList) error { durationBetweenLoop := time.Second * time.Duration(pullRateSec) diff --git a/internal/master/procfs.go b/internal/master/procfs.go index 11af4c3..849af0d 100644 --- a/internal/master/procfs.go +++ b/internal/master/procfs.go @@ -5,50 +5,54 @@ import ( "fmt" "time" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" "github.com/open-gpdb/yagpcc/internal/gp/stat_activity" "github.com/open-gpdb/yagpcc/internal/storage" - "github.com/alitto/pond" - pb "github.com/open-gpdb/yagpcc/api/proto/agent_segment" ) const ( - JobsPerQuery = 100 + jobsPerQuery = 1000 ) type ( hostJobMap = map[string][]stat_activity.SessionPid ) -func getJobsMap(sessions []stat_activity.SessionPid) hostJobMap { - hostJobMap := make(hostJobMap) +func (bs *BackgroundStorage) getJobsMap(sessions []stat_activity.SessionPid) hostJobMap { + hostJobs := make(hostJobMap) // make work for each host for _, process := range sessions { segHost := storage.GetHostnameForSegindex(int32(process.GpSegmentId)) - jobList, ok := hostJobMap[segHost] + jobList, ok := hostJobs[segHost] if !ok { - jobList = make([]stat_activity.SessionPid, 0) + jobList = make([]stat_activity.SessionPid, 0, 10) } jobList = append(jobList, stat_activity.SessionPid{ GpSegmentId: process.GpSegmentId, Pid: process.Pid, SessId: process.SessId, }) - hostJobMap[segHost] = jobList + hostJobs[segHost] = jobList } - return hostJobMap + return hostJobs } -func processProcfsRequests(ctx context.Context, hostname string, portn uint32, gatherTimeout time.Duration, maxMsgSize int, reqs []stat_activity.SessionPid) error { +func (bs *BackgroundStorage) processProcfsRequests(ctx context.Context, hostname string, portn uint32, gatherTimeout time.Duration, maxMsgSize int, reqs []stat_activity.SessionPid) error { grpcConn, err := getGrpcClientConnection(ctx, hostname, portn, gatherTimeout.Seconds()) if err != nil { - return err + return fmt.Errorf("grpc client connection error: %v", err) } + cGet := pb.NewGetQueryInfoClient(grpcConn) + ctxTimeout, ctxCancel := context.WithTimeout(ctx, gatherTimeout) + defer ctxCancel() + maxSizeOption := grpc.MaxCallRecvMsgSize(maxMsgSize) msgReq := &pb.GetPidProcInfoReq{ - SegmentProcess: make([]*pb.SegmentProcess, 0), + SegmentProcess: make([]*pb.SegmentProcess, 0, 10), } for _, req := range reqs { select { @@ -60,15 +64,21 @@ func processProcfsRequests(ctx context.Context, hostname string, portn uint32, g Pid: int64(req.Pid), SessId: int64(req.SessId), }) + if len(msgReq.SegmentProcess) >= jobsPerQuery { + _, errGet := cGet.GetPidProcStat(ctxTimeout, msgReq, maxSizeOption) + if errGet != nil { + return fmt.Errorf("grpc get pid proc stat error: %v", errGet) + } + msgReq.SegmentProcess = make([]*pb.SegmentProcess, 0, 10) + } } } - cGet := pb.NewGetQueryInfoClient(grpcConn) - ctxTimeout, ctxCancel := context.WithTimeout(ctx, gatherTimeout) - defer ctxCancel() - maxSizeOption := grpc.MaxCallRecvMsgSize(maxMsgSize) - _, errGet := cGet.GetPidProcStat(ctxTimeout, msgReq, maxSizeOption) - if errGet != nil { - return errGet + if len(msgReq.SegmentProcess) > 0 { + _, errGet := cGet.GetPidProcStat(ctxTimeout, msgReq, maxSizeOption) + if errGet != nil { + return fmt.Errorf("grpc get pid proc stat error: %v", errGet) + } + } return nil } @@ -80,38 +90,20 @@ func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, bs.l.Debug("GatherProcfsStat") sessions, err := bs.statActivityLister.ListAllSessions(ctx) if err != nil { - return err + return fmt.Errorf("error listing sessions pids: %v", err) } - hostJobMap := getJobsMap(sessions) - - pool := pond.New(nPullers, nPullers*2) - defer pool.StopAndWait() + hostJobs := bs.getJobsMap(sessions) ctxT, ctxTC := context.WithTimeout(ctx, gatherTimeout) defer ctxTC() - group, ctxG := pool.GroupContext(ctxT) + g, ctxG := errgroup.WithContext(ctxT) - for hostname, processes := range hostJobMap { - host := hostname - jobProcesses := make([]stat_activity.SessionPid, 0, JobsPerQuery) - for _, process := range processes { - jobProcesses = append(jobProcesses, process) - if len(jobProcesses) >= JobsPerQuery { - batch := append([]stat_activity.SessionPid(nil), jobProcesses...) - group.Submit(func() error { - return processProcfsRequests(ctxG, host, portn, gatherTimeout, maxMsgSize, batch) - }) - jobProcesses = make([]stat_activity.SessionPid, 0, JobsPerQuery) - } - } - if len(jobProcesses) > 0 { - batch := append([]stat_activity.SessionPid(nil), jobProcesses...) - group.Submit(func() error { - return processProcfsRequests(ctxG, host, portn, gatherTimeout, maxMsgSize, batch) - }) - } + for hostname, procfsProcesses := range hostJobs { + g.Go(func() error { + return bs.processProcfsRequests(ctxG, hostname, portn, gatherTimeout, maxMsgSize, procfsProcesses) + }) } - return group.Wait() + return g.Wait() } diff --git a/internal/master/procfs_test.go b/internal/master/procfs_test.go index 8556ddb..0fa68ee 100644 --- a/internal/master/procfs_test.go +++ b/internal/master/procfs_test.go @@ -122,11 +122,12 @@ func newTestLogger() *zap.SugaredLogger { // ============================================================ func TestGetJobsMap_EmptyInput(t *testing.T) { - result := getJobsMap(nil) + bs := &BackgroundStorage{l: newTestLogger()} + result := bs.getJobsMap(nil) assert.NotNil(t, result) assert.Empty(t, result) - result2 := getJobsMap([]stat_activity.SessionPid{}) + result2 := bs.getJobsMap([]stat_activity.SessionPid{}) assert.NotNil(t, result2) assert.Empty(t, result2) } @@ -139,7 +140,8 @@ func TestGetJobsMap_SingleHost(t *testing.T) { {GpSegmentId: 10, Pid: 200, SessId: 2}, } - result := getJobsMap(sessions) + bs := &BackgroundStorage{l: newTestLogger()} + result := bs.getJobsMap(sessions) // The map should contain an entry for "host-a" _, exists := result["host-a"] @@ -156,7 +158,8 @@ func TestGetJobsMap_MultipleHosts(t *testing.T) { {GpSegmentId: 20, Pid: 300, SessId: 3}, } - result := getJobsMap(sessions) + bs := &BackgroundStorage{l: newTestLogger()} + result := bs.getJobsMap(sessions) // Should have entries for both hosts assert.Contains(t, result, "host-b") @@ -170,7 +173,8 @@ func TestGetJobsMap_UnknownSegindex(t *testing.T) { {GpSegmentId: 9999, Pid: 100, SessId: 1}, } - result := getJobsMap(sessions) + bs := &BackgroundStorage{l: newTestLogger()} + result := bs.getJobsMap(sessions) _, exists := result["9999"] assert.True(t, exists, "expected key '9999' for unknown segindex") } @@ -200,8 +204,9 @@ func TestProcessProcfsRequests_Success(t *testing.T) { {GpSegmentId: 2, Pid: 200, SessId: 20}, } + bs := &BackgroundStorage{l: newTestLogger()} ctx := context.Background() - err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) + err := bs.processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) require.NoError(t, err) called, lastReq := fakeSrv.snapshot() assert.True(t, called, "expected GetPidProcStat to be called") @@ -239,8 +244,9 @@ func TestProcessProcfsRequests_GrpcError(t *testing.T) { {GpSegmentId: 1, Pid: 100, SessId: 10}, } + bs := &BackgroundStorage{l: newTestLogger()} ctx := context.Background() - err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) + err := bs.processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) require.Error(t, err) assert.Contains(t, err.Error(), "simulated gRPC error") } @@ -269,7 +275,8 @@ func TestProcessProcfsRequests_CancelledContext(t *testing.T) { // With a cancelled context, processProcfsRequests should skip building // the request body (due to select on ctx.Done()) and return nil. - err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) + bs := &BackgroundStorage{l: newTestLogger()} + err := bs.processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) // The function returns nil when context is cancelled during request building, // but may return an error from the gRPC call if the request was already built. // Either outcome is acceptable with a cancelled context. @@ -293,12 +300,12 @@ func TestProcessProcfsRequests_EmptyRequests(t *testing.T) { segConnectionLock.Unlock() }) + bs := &BackgroundStorage{l: newTestLogger()} ctx := context.Background() - err := processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, nil) + err := bs.processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, nil) require.NoError(t, err) - called, lastReq := fakeSrv.snapshot() - assert.True(t, called, "GetPidProcStat should still be called with empty segment list") - assert.Empty(t, lastReq.SegmentProcess) + called, _ := fakeSrv.snapshot() + assert.False(t, called, "GetPidProcStat should not be called with empty segment list") } // ============================================================ @@ -446,8 +453,8 @@ func TestGatherProcfsStat_ManySessionsBatching(t *testing.T) { }) // Create JobsPerQuery + 5 sessions to trigger at least 2 batches - sessions := make([]stat_activity.SessionPid, 0, JobsPerQuery+5) - for i := 0; i < JobsPerQuery+5; i++ { + sessions := make([]stat_activity.SessionPid, 0, jobsPerQuery+5) + for i := 0; i < jobsPerQuery+5; i++ { sessions = append(sessions, stat_activity.SessionPid{ GpSegmentId: 60, Pid: 100 + i, @@ -476,7 +483,7 @@ func TestGatherProcfsStat_ManySessionsBatching(t *testing.T) { // ============================================================ func TestConstants(t *testing.T) { - assert.Equal(t, 100, JobsPerQuery) + assert.Equal(t, 1000, jobsPerQuery) } func TestGatherProcfsStat_InvalidNPullers(t *testing.T) { diff --git a/internal/master/utils.go b/internal/master/utils.go index b615275..c3469f2 100644 --- a/internal/master/utils.go +++ b/internal/master/utils.go @@ -3,6 +3,7 @@ package master import ( "context" "net" + "strconv" "sync" "time" @@ -12,7 +13,7 @@ import ( ) var ( - segConnections map[string]*grpc.ClientConn = make(map[string]*grpc.ClientConn) + segConnections = make(map[string]*grpc.ClientConn) segConnectionLock sync.Mutex ) @@ -29,15 +30,12 @@ func getGrpcClientConnection(ctx context.Context, hostname string, portn uint32, connectTimeout := time.Second * time.Duration(segConnectTimeoutSec) if portn > 0 { conn, err = grpc.NewClient( - getSegAddr(hostname, portn), + net.JoinHostPort(hostname, strconv.FormatUint(uint64(portn), 10)), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithConnectParams(grpc.ConnectParams{ MinConnectTimeout: connectTimeout, }), ) - if err != nil { - return nil, err - } } else { conn, err = grpc.NewClient( hostname, @@ -50,9 +48,9 @@ func getGrpcClientConnection(ctx context.Context, hostname string, portn uint32, MinConnectTimeout: connectTimeout, }), ) - if err != nil { - return nil, err - } + } + if err != nil { + return nil, err } segConnections[hostname] = conn return conn, nil