diff --git a/api/proto/common/yagpcc_metrics.pb.go b/api/proto/common/yagpcc_metrics.pb.go index 03af38c..d892b89 100644 --- a/api/proto/common/yagpcc_metrics.pb.go +++ b/api/proto/common/yagpcc_metrics.pb.go @@ -1587,13 +1587,13 @@ func (x *SpillInfo) GetTotalBytes() int64 { type ProcStat struct { state protoimpl.MessageState `protogen:"open.v1"` // The process ID. - Pid int32 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"` + Pid int64 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"` // The filename of the executable. Comm string `protobuf:"bytes,2,opt,name=comm,proto3" json:"comm,omitempty"` // The process state. State string `protobuf:"bytes,3,opt,name=state,proto3" json:"state,omitempty"` // The PID of the parent of this process. - Ppid int32 `protobuf:"varint,4,opt,name=ppid,proto3" json:"ppid,omitempty"` + Ppid int64 `protobuf:"varint,4,opt,name=ppid,proto3" json:"ppid,omitempty"` // The process group ID of the process. Pgrp int32 `protobuf:"varint,5,opt,name=pgrp,proto3" json:"pgrp,omitempty"` // The session ID of the process. @@ -1607,28 +1607,28 @@ type ProcStat struct { Flags int32 `protobuf:"varint,9,opt,name=flags,proto3" json:"flags,omitempty"` // The number of minor faults the process has made which have not required // loading a memory page from disk. - MinFlt int32 `protobuf:"varint,10,opt,name=min_flt,json=minFlt,proto3" json:"min_flt,omitempty"` + MinFlt int64 `protobuf:"varint,10,opt,name=min_flt,json=minFlt,proto3" json:"min_flt,omitempty"` // The number of minor faults that the process's waited-for children have // made. - CminFlt int32 `protobuf:"varint,11,opt,name=cmin_flt,json=cminFlt,proto3" json:"cmin_flt,omitempty"` + CminFlt int64 `protobuf:"varint,11,opt,name=cmin_flt,json=cminFlt,proto3" json:"cmin_flt,omitempty"` // The number of major faults the process has made which have required // loading a memory page from disk. 
- MajFlt int32 `protobuf:"varint,12,opt,name=maj_flt,json=majFlt,proto3" json:"maj_flt,omitempty"` + MajFlt int64 `protobuf:"varint,12,opt,name=maj_flt,json=majFlt,proto3" json:"maj_flt,omitempty"` // The number of major faults that the process's waited-for children have // made. - CmajFlt int32 `protobuf:"varint,13,opt,name=cmaj_flt,json=cmajFlt,proto3" json:"cmaj_flt,omitempty"` + CmajFlt int64 `protobuf:"varint,13,opt,name=cmaj_flt,json=cmajFlt,proto3" json:"cmaj_flt,omitempty"` // Amount of time that this process has been scheduled in user mode, // measured in clock ticks. - Utime int32 `protobuf:"varint,14,opt,name=utime,proto3" json:"utime,omitempty"` + Utime int64 `protobuf:"varint,14,opt,name=utime,proto3" json:"utime,omitempty"` // Amount of time that this process has been scheduled in kernel mode, // measured in clock ticks. - Stime int32 `protobuf:"varint,15,opt,name=stime,proto3" json:"stime,omitempty"` + Stime int64 `protobuf:"varint,15,opt,name=stime,proto3" json:"stime,omitempty"` // Amount of time that this process's waited-for children have been // scheduled in user mode, measured in clock ticks. - Cutime int32 `protobuf:"varint,16,opt,name=cutime,proto3" json:"cutime,omitempty"` + Cutime int64 `protobuf:"varint,16,opt,name=cutime,proto3" json:"cutime,omitempty"` // Amount of time that this process's waited-for children have been // scheduled in kernel mode, measured in clock ticks. - Cstime int32 `protobuf:"varint,17,opt,name=cstime,proto3" json:"cstime,omitempty"` + Cstime int64 `protobuf:"varint,17,opt,name=cstime,proto3" json:"cstime,omitempty"` // For processes running a real-time scheduling policy, this is the negated // scheduling priority, minus one. 
Priority int32 `protobuf:"varint,18,opt,name=priority,proto3" json:"priority,omitempty"` @@ -1663,9 +1663,9 @@ type ProcStat struct { DelayAcctBlkIoTicks int64 `protobuf:"varint,31,opt,name=delay_acct_blk_io_ticks,json=delayAcctBlkIoTicks,proto3" json:"delay_acct_blk_io_ticks,omitempty"` // Guest time of the process (time spent running a virtual CPU for a guest // operating system), measured in clock ticks. - GuestTime int32 `protobuf:"varint,32,opt,name=guest_time,json=guestTime,proto3" json:"guest_time,omitempty"` + GuestTime int64 `protobuf:"varint,32,opt,name=guest_time,json=guestTime,proto3" json:"guest_time,omitempty"` // Guest time of the process's children, measured in clock ticks. - CguestTime int32 `protobuf:"varint,33,opt,name=cguest_time,json=cguestTime,proto3" json:"cguest_time,omitempty"` + CguestTime int64 `protobuf:"varint,33,opt,name=cguest_time,json=cguestTime,proto3" json:"cguest_time,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1700,7 +1700,7 @@ func (*ProcStat) Descriptor() ([]byte, []int) { return file_api_proto_common_yagpcc_metrics_proto_rawDescGZIP(), []int{11} } -func (x *ProcStat) GetPid() int32 { +func (x *ProcStat) GetPid() int64 { if x != nil { return x.Pid } @@ -1721,7 +1721,7 @@ func (x *ProcStat) GetState() string { return "" } -func (x *ProcStat) GetPpid() int32 { +func (x *ProcStat) GetPpid() int64 { if x != nil { return x.Ppid } @@ -1763,56 +1763,56 @@ func (x *ProcStat) GetFlags() int32 { return 0 } -func (x *ProcStat) GetMinFlt() int32 { +func (x *ProcStat) GetMinFlt() int64 { if x != nil { return x.MinFlt } return 0 } -func (x *ProcStat) GetCminFlt() int32 { +func (x *ProcStat) GetCminFlt() int64 { if x != nil { return x.CminFlt } return 0 } -func (x *ProcStat) GetMajFlt() int32 { +func (x *ProcStat) GetMajFlt() int64 { if x != nil { return x.MajFlt } return 0 } -func (x *ProcStat) GetCmajFlt() int32 { +func (x *ProcStat) GetCmajFlt() int64 { if x != nil { return x.CmajFlt } return 0 } 
-func (x *ProcStat) GetUtime() int32 { +func (x *ProcStat) GetUtime() int64 { if x != nil { return x.Utime } return 0 } -func (x *ProcStat) GetStime() int32 { +func (x *ProcStat) GetStime() int64 { if x != nil { return x.Stime } return 0 } -func (x *ProcStat) GetCutime() int32 { +func (x *ProcStat) GetCutime() int64 { if x != nil { return x.Cutime } return 0 } -func (x *ProcStat) GetCstime() int32 { +func (x *ProcStat) GetCstime() int64 { if x != nil { return x.Cstime } @@ -1917,14 +1917,14 @@ func (x *ProcStat) GetDelayAcctBlkIoTicks() int64 { return 0 } -func (x *ProcStat) GetGuestTime() int32 { +func (x *ProcStat) GetGuestTime() int64 { if x != nil { return x.GuestTime } return 0 } -func (x *ProcStat) GetCguestTime() int32 { +func (x *ProcStat) GetCguestTime() int64 { if x != nil { return x.CguestTime } @@ -1937,7 +1937,7 @@ func (x *ProcStat) GetCguestTime() int32 { type ProcStatus struct { state protoimpl.MessageState `protogen:"open.v1"` // The process ID. - Pid int32 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"` + Pid int64 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"` // The process name. Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` // Thread group ID. 
@@ -2032,7 +2032,7 @@ func (*ProcStatus) Descriptor() ([]byte, []int) { return file_api_proto_common_yagpcc_metrics_proto_rawDescGZIP(), []int{12} } -func (x *ProcStatus) GetPid() int32 { +func (x *ProcStatus) GetPid() int64 { if x != nil { return x.Pid } @@ -2673,24 +2673,24 @@ const file_api_proto_common_yagpcc_metrics_proto_rawDesc = "" + "totalBytes\x18\x02 \x01(\x03R\n" + "totalBytes\"\xe6\x06\n" + "\bProcStat\x12\x10\n" + - "\x03pid\x18\x01 \x01(\x05R\x03pid\x12\x12\n" + + "\x03pid\x18\x01 \x01(\x03R\x03pid\x12\x12\n" + "\x04comm\x18\x02 \x01(\tR\x04comm\x12\x14\n" + "\x05state\x18\x03 \x01(\tR\x05state\x12\x12\n" + - "\x04ppid\x18\x04 \x01(\x05R\x04ppid\x12\x12\n" + + "\x04ppid\x18\x04 \x01(\x03R\x04ppid\x12\x12\n" + "\x04pgrp\x18\x05 \x01(\x05R\x04pgrp\x12\x18\n" + "\asession\x18\x06 \x01(\x05R\asession\x12\x10\n" + "\x03tty\x18\a \x01(\x05R\x03tty\x12\x14\n" + "\x05tpgid\x18\b \x01(\x05R\x05tpgid\x12\x14\n" + "\x05flags\x18\t \x01(\x05R\x05flags\x12\x17\n" + "\amin_flt\x18\n" + - " \x01(\x05R\x06minFlt\x12\x19\n" + - "\bcmin_flt\x18\v \x01(\x05R\acminFlt\x12\x17\n" + - "\amaj_flt\x18\f \x01(\x05R\x06majFlt\x12\x19\n" + - "\bcmaj_flt\x18\r \x01(\x05R\acmajFlt\x12\x14\n" + - "\x05utime\x18\x0e \x01(\x05R\x05utime\x12\x14\n" + - "\x05stime\x18\x0f \x01(\x05R\x05stime\x12\x16\n" + - "\x06cutime\x18\x10 \x01(\x05R\x06cutime\x12\x16\n" + - "\x06cstime\x18\x11 \x01(\x05R\x06cstime\x12\x1a\n" + + " \x01(\x03R\x06minFlt\x12\x19\n" + + "\bcmin_flt\x18\v \x01(\x03R\acminFlt\x12\x17\n" + + "\amaj_flt\x18\f \x01(\x03R\x06majFlt\x12\x19\n" + + "\bcmaj_flt\x18\r \x01(\x03R\acmajFlt\x12\x14\n" + + "\x05utime\x18\x0e \x01(\x03R\x05utime\x12\x14\n" + + "\x05stime\x18\x0f \x01(\x03R\x05stime\x12\x16\n" + + "\x06cutime\x18\x10 \x01(\x03R\x06cutime\x12\x16\n" + + "\x06cstime\x18\x11 \x01(\x03R\x06cstime\x12\x1a\n" + "\bpriority\x18\x12 \x01(\x05R\bpriority\x12\x12\n" + "\x04nice\x18\x13 \x01(\x05R\x04nice\x12\x1f\n" + "\vnum_threads\x18\x14 \x01(\x05R\n" + @@ -2710,12 +2710,12 
@@ const file_api_proto_common_yagpcc_metrics_proto_rawDesc = "" + "\x06policy\x18\x1e \x01(\x05R\x06policy\x124\n" + "\x17delay_acct_blk_io_ticks\x18\x1f \x01(\x03R\x13delayAcctBlkIoTicks\x12\x1d\n" + "\n" + - "guest_time\x18 \x01(\x05R\tguestTime\x12\x1f\n" + - "\vcguest_time\x18! \x01(\x05R\n" + + "guest_time\x18 \x01(\x03R\tguestTime\x12\x1f\n" + + "\vcguest_time\x18! \x01(\x03R\n" + "cguestTime\"\xd2\x06\n" + "\n" + "ProcStatus\x12\x10\n" + - "\x03pid\x18\x01 \x01(\x05R\x03pid\x12\x12\n" + + "\x03pid\x18\x01 \x01(\x03R\x03pid\x12\x12\n" + "\x04name\x18\x02 \x01(\tR\x04name\x12\x12\n" + "\x04tgid\x18\x03 \x01(\x05R\x04tgid\x12\x17\n" + "\ans_pids\x18\x04 \x03(\x03R\x06nsPids\x12\x17\n" + diff --git a/api/proto/common/yagpcc_metrics.proto b/api/proto/common/yagpcc_metrics.proto index f6eb57e..7322a56 100644 --- a/api/proto/common/yagpcc_metrics.proto +++ b/api/proto/common/yagpcc_metrics.proto @@ -577,7 +577,7 @@ message SpillInfo { // See https://pkg.go.dev/github.com/prometheus/procfs#ProcStat message ProcStat { // The process ID. - int32 pid = 1; + int64 pid = 1; // The filename of the executable. string comm = 2; @@ -586,7 +586,7 @@ message ProcStat { string state = 3; // The PID of the parent of this process. - int32 ppid = 4; + int64 ppid = 4; // The process group ID of the process. int32 pgrp = 5; @@ -606,35 +606,35 @@ message ProcStat { // The number of minor faults the process has made which have not required // loading a memory page from disk. - int32 min_flt = 10; + int64 min_flt = 10; // The number of minor faults that the process's waited-for children have // made. - int32 cmin_flt = 11; + int64 cmin_flt = 11; // The number of major faults the process has made which have required // loading a memory page from disk. - int32 maj_flt = 12; + int64 maj_flt = 12; // The number of major faults that the process's waited-for children have // made. 
- int32 cmaj_flt = 13; + int64 cmaj_flt = 13; // Amount of time that this process has been scheduled in user mode, // measured in clock ticks. - int32 utime = 14; + int64 utime = 14; // Amount of time that this process has been scheduled in kernel mode, // measured in clock ticks. - int32 stime = 15; + int64 stime = 15; // Amount of time that this process's waited-for children have been // scheduled in user mode, measured in clock ticks. - int32 cutime = 16; + int64 cutime = 16; // Amount of time that this process's waited-for children have been // scheduled in kernel mode, measured in clock ticks. - int32 cstime = 17; + int64 cstime = 17; // For processes running a real-time scheduling policy, this is the negated // scheduling priority, minus one. @@ -684,10 +684,10 @@ message ProcStat { // Guest time of the process (time spent running a virtual CPU for a guest // operating system), measured in clock ticks. - int32 guest_time = 32; + int64 guest_time = 32; // Guest time of the process's children, measured in clock ticks. - int32 cguest_time = 33; + int64 cguest_time = 33; } // ProcStatus provides status information about the process, @@ -695,7 +695,7 @@ message ProcStat { // See https://pkg.go.dev/github.com/prometheus/procfs@v0.20.1#ProcStatus message ProcStatus { // The process ID. - int32 pid = 1; + int64 pid = 1; // The process name. 
string name = 2; diff --git a/internal/app/app.go b/internal/app/app.go index 5d8bf83..7e1e680 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -382,12 +382,13 @@ func Run(ctx context.Context, configFile string) error { metrics.YagpccMetrics.ExecutingQueryLatencies.AssignQueryGetter(rqStorage.GetQueriesStartTime) aggStorage := storage.NewConfiguredAggregatedStorage(logger, cfg) sessionsStorage := gp.NewSessionsStorage(rqStorage) + procfsStorage := storage.NewProcfsStorage() masterConnection := gp.NewConnection(baseApp.L(), &cfg.MasterConnection, nil) masterSentinel := master_sentinel.NewSentinel(baseApp.L(), masterConnection) statActivityLister := stat_activity.NewLister(baseApp.L(), masterConnection) - backgroundStorage := master.NewBackgroundStorage(logger, sessionsStorage, rqStorage, aggStorage, statActivityLister) + backgroundStorage := master.NewBackgroundStorage(logger, sessionsStorage, rqStorage, aggStorage, procfsStorage, statActivityLister) agentApp, err := NewApp(baseApp, cfg, statActivityLister, backgroundStorage) if err != nil { diff --git a/internal/config/config.go b/internal/config/config.go index d9514fc..46944bf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -54,6 +54,7 @@ type Config struct { ShortAggInterval time.Duration `config:"short_agg_interval" yaml:"short_agg_interval"` SessionRefreshInterval time.Duration `config:"session_refresh_interval" yaml:"session_refresh_interval"` QueriesRefreshInterval time.Duration `config:"session_refresh_interval" yaml:"queries_refresh_interval"` + ProcfsRefreshInterval time.Duration `config:"procfs_refresh_interval" yaml:"procfs_refresh_interval"` SessionSendMetricInterval time.Duration `config:"session_send_metric_interval" yaml:"session_send_metric_interval"` MinFreePercent uint32 `config:"min_free_percent" yaml:"min_free_percent"` CustomSegmentList *SegmentList `config:"custom_segment_list" yaml:"custom_segment_list"` @@ -116,6 +117,7 @@ func DefaultConfig() 
(*Config, error) { ShortAggInterval: time.Duration(time.Minute * 10), SessionRefreshInterval: time.Duration(time.Second * 30), QueriesRefreshInterval: time.Duration(time.Second * 1), + ProcfsRefreshInterval: time.Duration(time.Second * 60), SessionSendMetricInterval: time.Duration(time.Second * 60), MinFreePercent: 10, CustomSegmentList: nil, diff --git a/internal/grpc/actions_test.go b/internal/grpc/actions_test.go index c88db77..d47dfb6 100644 --- a/internal/grpc/actions_test.go +++ b/internal/grpc/actions_test.go @@ -23,7 +23,7 @@ func newTestActionsServer(t *testing.T) *ActionsServer { rq := storage.NewRunningQueriesStorage() sessStorage := gp.NewSessionsStorage(rq) agg := storage.NewAggregatedStorage(z) - bg := master.NewBackgroundStorage(z, sessStorage, rq, agg, nil) + bg := master.NewBackgroundStorage(z, sessStorage, rq, agg, nil, nil) return &ActionsServer{ Logger: z, Timeout: 5 * time.Second, diff --git a/internal/grpc/set_get_query_info_parallel_test.go b/internal/grpc/set_get_query_info_parallel_test.go index c329d7a..8901374 100644 --- a/internal/grpc/set_get_query_info_parallel_test.go +++ b/internal/grpc/set_get_query_info_parallel_test.go @@ -30,7 +30,7 @@ func TestParallelSetGet(t *testing.T) { rqStorage := storage.NewRunningQueriesStorage() aggStorage := storage.NewAggregatedStorage(zLogger) sessStorage := gp.NewSessionsStorage(rqStorage) - backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage, nil) + backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage, nil, nil) tests := []struct { name string diff --git a/internal/grpc/testutils_test.go b/internal/grpc/testutils_test.go index fdc6d8b..b30db27 100644 --- a/internal/grpc/testutils_test.go +++ b/internal/grpc/testutils_test.go @@ -138,7 +138,8 @@ func setupGRPCClientSet(t *testing.T, sessionMocker *MockStatActivityLister) (*g rqStorage := storage.NewRunningQueriesStorage() sessStorage := gp.NewSessionsStorage(rqStorage) 
aggStorage := storage.NewAggregatedStorage(zLogger) - backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage, sessionMocker) + procfsStorage := storage.NewProcfsStorage() + backgroundStorage := master.NewBackgroundStorage(zLogger, sessStorage, rqStorage, aggStorage, procfsStorage, sessionMocker) conn, err := gogrpc.NewClient( "localhost", diff --git a/internal/master/background.go b/internal/master/background.go index 8fc4b70..41cac62 100644 --- a/internal/master/background.go +++ b/internal/master/background.go @@ -39,6 +39,7 @@ type ( SessionStorage *gp.SessionsStorage AggStorage *storage.AggregatedStorage RQStorage *storage.RunningQueriesStorage + ProcfsStorage *storage.ProcfsStorage statActivityLister statActivityLister } ) @@ -49,6 +50,22 @@ var ( segCountLock sync.Mutex ) +func NewBackgroundStorage(l *zap.SugaredLogger, + sessionStorage *gp.SessionsStorage, + rqStorage *storage.RunningQueriesStorage, + aggStorage *storage.AggregatedStorage, + procfsStorage *storage.ProcfsStorage, + sActivityLister statActivityLister) *BackgroundStorage { + return &BackgroundStorage{ + l: l, + SessionStorage: sessionStorage, + AggStorage: aggStorage, + RQStorage: rqStorage, + ProcfsStorage: procfsStorage, + statActivityLister: sActivityLister, + } +} + func (bs *BackgroundStorage) SendSegmentRefreshMessages(ctx context.Context, pullRateSec float64, configCacheDurability time.Duration, portn uint32, customSegmentList *config.SegmentList) error { durationBetweenLoop := time.Second * time.Duration(pullRateSec) @@ -462,6 +479,32 @@ func (bs *BackgroundStorage) RefreshQueries(ctx context.Context, } } +func (bs *BackgroundStorage) RefreshProcfs(ctx context.Context, procfsRefreshInterval time.Duration, nPullers int, portn uint32, msgSize int) error { + for { + currTime := time.Now() + nextTime := currTime.Add(procfsRefreshInterval) + select { + case <-ctx.Done(): + bs.l.Warn("Done RefreshProcfs") + return fmt.Errorf("done context with %v", 
ctx.Err()) + default: + bs.l.Debugf("Refresh procfs stat %v", currTime) + procfsGatherStorage := NewProcfsGatherStorage(bs.l, bs.statActivityLister, currTime) + err := procfsGatherStorage.GatherProcfsStat(ctx, nPullers, portn, procfsRefreshInterval, msgSize) + if err != nil { + // just log error, do not fail the whole service + bs.l.Errorf("fail to get procfs data %v", err) + } else { + bs.ProcfsStorage.RegisterProcfsStat(currTime, procfsGatherStorage.GetProcfsStat()) + } + } + if metrics.YagpccMetrics != nil { + metrics.YagpccMetrics.HandleLatencies.With(map[string]string{"method": "RefreshProcfs"}).Observe(time.Since(currTime).Seconds()) + } + time.Sleep(time.Until(nextTime)) + } +} + func InitConnection(ctx context.Context, l *zap.SugaredLogger, cfg *config.Config, firstTry bool) error { tries := int(cfg.MasterConnectionTries) if firstTry { @@ -474,16 +517,6 @@ func InitConnection(ctx context.Context, l *zap.SugaredLogger, cfg *config.Confi return nil } -func NewBackgroundStorage(l *zap.SugaredLogger, sessionStorage *gp.SessionsStorage, rqStorage *storage.RunningQueriesStorage, aggStorage *storage.AggregatedStorage, sActivityLister statActivityLister) *BackgroundStorage { - return &BackgroundStorage{ - l: l, - SessionStorage: sessionStorage, - AggStorage: aggStorage, - RQStorage: rqStorage, - statActivityLister: sActivityLister, - } -} - func InitBG( ctx context.Context, l *zap.SugaredLogger, @@ -562,6 +595,12 @@ func InitBG( return err }, ) + errG.Go(func() error { + err := backgroundStorage.RefreshProcfs(ctxI, cfg.ProcfsRefreshInterval, int(cfg.SegmentPullThreads), cfg.ListenPort, int(cfg.MaxMessageSize)) + l.Errorf("got %v refresh session and queries", err) + return err + }, + ) err = errG.Wait() if err != nil { backgroundStorage.statActivityLister.Stop() diff --git a/internal/master/procfs.go b/internal/master/procfs_gather.go similarity index 55% rename from internal/master/procfs.go rename to internal/master/procfs_gather.go index 849af0d..5d8cab5 100644 
--- a/internal/master/procfs.go +++ b/internal/master/procfs_gather.go @@ -3,6 +3,8 @@ package master import ( "context" "fmt" + "slices" + "sync" "time" "golang.org/x/sync/errgroup" @@ -13,6 +15,8 @@ import ( "github.com/open-gpdb/yagpcc/internal/storage" pb "github.com/open-gpdb/yagpcc/api/proto/agent_segment" + pbc "github.com/open-gpdb/yagpcc/api/proto/common" + "go.uber.org/zap" ) const ( @@ -21,9 +25,27 @@ const ( type ( hostJobMap = map[string][]stat_activity.SessionPid + + ProcfsGatherStorage struct { + mx *sync.RWMutex + procfsStat []*pbc.GpPidProcInfo + l *zap.SugaredLogger + statActivityLister statActivityLister + gatherTime time.Time + } ) -func (bs *BackgroundStorage) getJobsMap(sessions []stat_activity.SessionPid) hostJobMap { +func NewProcfsGatherStorage(l *zap.SugaredLogger, sActivityLister statActivityLister, gTime time.Time) *ProcfsGatherStorage { + return &ProcfsGatherStorage{ + mx: &sync.RWMutex{}, + l: l, + statActivityLister: sActivityLister, + gatherTime: gTime, + procfsStat: make([]*pbc.GpPidProcInfo, 0, 10), + } +} + +func (ps *ProcfsGatherStorage) getJobsMap(sessions []stat_activity.SessionPid) hostJobMap { hostJobs := make(hostJobMap) // make work for each host for _, process := range sessions { @@ -42,7 +64,14 @@ func (bs *BackgroundStorage) getJobsMap(sessions []stat_activity.SessionPid) hos return hostJobs } -func (bs *BackgroundStorage) processProcfsRequests(ctx context.Context, hostname string, portn uint32, gatherTimeout time.Duration, maxMsgSize int, reqs []stat_activity.SessionPid) error { +func (ps *ProcfsGatherStorage) addProcfsStat(procfsStat []*pbc.GpPidProcInfo) { + ps.mx.Lock() + defer ps.mx.Unlock() + + ps.procfsStat = append(ps.procfsStat, procfsStat...) 
+} + +func (ps *ProcfsGatherStorage) processProcfsRequests(ctx context.Context, hostname string, portn uint32, gatherTimeout time.Duration, maxMsgSize int, reqs []stat_activity.SessionPid) error { grpcConn, err := getGrpcClientConnection(ctx, hostname, portn, gatherTimeout.Seconds()) if err != nil { return fmt.Errorf("grpc client connection error: %v", err) @@ -65,34 +94,35 @@ func (bs *BackgroundStorage) processProcfsRequests(ctx context.Context, hostname SessId: int64(req.SessId), }) if len(msgReq.SegmentProcess) >= jobsPerQuery { - _, errGet := cGet.GetPidProcStat(ctxTimeout, msgReq, maxSizeOption) + segResponse, errGet := cGet.GetPidProcStat(ctxTimeout, msgReq, maxSizeOption) if errGet != nil { return fmt.Errorf("grpc get pid proc stat error: %v", errGet) } + ps.addProcfsStat(segResponse.GetPidProcData()) msgReq.SegmentProcess = make([]*pb.SegmentProcess, 0, 10) } } } if len(msgReq.SegmentProcess) > 0 { - _, errGet := cGet.GetPidProcStat(ctxTimeout, msgReq, maxSizeOption) + segResponse, errGet := cGet.GetPidProcStat(ctxTimeout, msgReq, maxSizeOption) if errGet != nil { return fmt.Errorf("grpc get pid proc stat error: %v", errGet) } - + ps.addProcfsStat(segResponse.GetPidProcData()) } return nil } -func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, portn uint32, gatherTimeout time.Duration, maxMsgSize int) error { +func (ps *ProcfsGatherStorage) GatherProcfsStat(ctx context.Context, nPullers int, portn uint32, gatherTimeout time.Duration, maxMsgSize int) error { if nPullers <= 0 { return fmt.Errorf("nPullers must be greater than 0, got %d", nPullers) } - bs.l.Debug("GatherProcfsStat") - sessions, err := bs.statActivityLister.ListAllSessions(ctx) + ps.l.Debug("GatherProcfsStat") + sessions, err := ps.statActivityLister.ListAllSessions(ctx) if err != nil { return fmt.Errorf("error listing sessions pids: %v", err) } - hostJobs := bs.getJobsMap(sessions) + hostJobs := ps.getJobsMap(sessions) ctxT, ctxTC := context.WithTimeout(ctx, 
gatherTimeout) defer ctxTC() @@ -101,9 +131,16 @@ func (bs *BackgroundStorage) GatherProcfsStat(ctx context.Context, nPullers int, for hostname, procfsProcesses := range hostJobs { g.Go(func() error { - return bs.processProcfsRequests(ctxG, hostname, portn, gatherTimeout, maxMsgSize, procfsProcesses) + return ps.processProcfsRequests(ctxG, hostname, portn, gatherTimeout, maxMsgSize, procfsProcesses) }) } return g.Wait() } + +func (ps *ProcfsGatherStorage) GetProcfsStat() []*pbc.GpPidProcInfo { + ps.mx.RLock() + defer ps.mx.RUnlock() + + return slices.Clone(ps.procfsStat) +} diff --git a/internal/master/procfs_gather_test.go b/internal/master/procfs_gather_test.go new file mode 100644 index 0000000..542c93d --- /dev/null +++ b/internal/master/procfs_gather_test.go @@ -0,0 +1,623 @@ +package master + +import ( + "context" + "fmt" + "log" + "net" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" + + pb "github.com/open-gpdb/yagpcc/api/proto/agent_segment" + pbc "github.com/open-gpdb/yagpcc/api/proto/common" + "github.com/open-gpdb/yagpcc/internal/gp" + "github.com/open-gpdb/yagpcc/internal/gp/stat_activity" + "github.com/open-gpdb/yagpcc/internal/storage" +) + +// --- mock statActivityLister --- + +type mockStatActivityLister struct { + sessions []stat_activity.SessionPid + sessionsErr error + listCalled bool +} + +func (m *mockStatActivityLister) Start(context.Context) error { return nil } +func (m *mockStatActivityLister) Stop() {} +func (m *mockStatActivityLister) List(context.Context) ([]*gp.GpStatActivity, error) { + return nil, nil +} +func (m *mockStatActivityLister) ListAllSessions(context.Context) ([]stat_activity.SessionPid, error) { + m.listCalled = true + return m.sessions, m.sessionsErr +} + +// --- fake gRPC servers --- + +// fakeProcStatServer returns empty 
responses (no PidProcData). +type fakeProcStatServer struct { + pb.UnimplementedGetQueryInfoServer + mu sync.Mutex + called bool + lastReq *pb.GetPidProcInfoReq +} + +func (s *fakeProcStatServer) GetPidProcStat(_ context.Context, req *pb.GetPidProcInfoReq) (*pb.GetPidProcInfoResponse, error) { + s.mu.Lock() + s.called = true + s.lastReq = req + s.mu.Unlock() + return &pb.GetPidProcInfoResponse{}, nil +} + +func (s *fakeProcStatServer) snapshot() (bool, *pb.GetPidProcInfoReq) { + s.mu.Lock() + defer s.mu.Unlock() + return s.called, s.lastReq +} + +func (s *fakeProcStatServer) GetMetricQueries(_ context.Context, _ *pb.GetQueriesInfoReq) (*pb.GetQueriesInfoResponse, error) { + return &pb.GetQueriesInfoResponse{}, nil +} + +// dataProcStatServer returns actual PidProcData echoing back the requested segments. +type dataProcStatServer struct { + pb.UnimplementedGetQueryInfoServer + mu sync.Mutex + calls int + allData []*pbc.GpPidProcInfo +} + +func (s *dataProcStatServer) GetPidProcStat(_ context.Context, req *pb.GetPidProcInfoReq) (*pb.GetPidProcInfoResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.calls++ + resp := &pb.GetPidProcInfoResponse{ + PidProcData: make([]*pbc.GpPidProcInfo, 0, len(req.SegmentProcess)), + } + for _, sp := range req.SegmentProcess { + info := &pbc.GpPidProcInfo{ + GpSegmentId: sp.GpSegmentId, + SessId: sp.SessId, + Pid: sp.Pid, + Cmdline: fmt.Sprintf("cmd-%d", sp.Pid), + State: "S", + } + resp.PidProcData = append(resp.PidProcData, info) + s.allData = append(s.allData, info) + } + return resp, nil +} + +func (s *dataProcStatServer) GetMetricQueries(context.Context, *pb.GetQueriesInfoReq) (*pb.GetQueriesInfoResponse, error) { + return &pb.GetQueriesInfoResponse{}, nil +} + +func (s *dataProcStatServer) getCalls() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.calls +} + +type failingProcStatServer struct { + pb.UnimplementedGetQueryInfoServer +} + +func (s *failingProcStatServer) GetPidProcStat(context.Context, 
*pb.GetPidProcInfoReq) (*pb.GetPidProcInfoResponse, error) { + return nil, fmt.Errorf("simulated gRPC error") +} + +func (s *failingProcStatServer) GetMetricQueries(context.Context, *pb.GetQueriesInfoReq) (*pb.GetQueriesInfoResponse, error) { + return &pb.GetQueriesInfoResponse{}, nil +} + +// --- test helpers --- + +func setupBufconnServer(t *testing.T, srv pb.GetQueryInfoServer) *bufconn.Listener { + t.Helper() + lis := bufconn.Listen(1024 * 1024) + s := grpc.NewServer() + pb.RegisterGetQueryInfoServer(s, srv) + go func() { + if err := s.Serve(lis); err != nil { + log.Printf("bufconn server exited: %v", err) + } + }() + t.Cleanup(func() { s.Stop() }) + return lis +} + +func dialBufconn(t *testing.T, lis *bufconn.Listener) *grpc.ClientConn { + t.Helper() + conn, err := grpc.NewClient( + "passthrough:///bufconn", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + t.Cleanup(func() { _ = conn.Close() }) + return conn +} + +func newTestLogger() *zap.SugaredLogger { + cfg := zap.NewDevelopmentConfig() + cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel) + l, _ := cfg.Build() + return l.Sugar() +} + +// newTestProcfsGatherStorage creates a properly initialized ProcfsGatherStorage for tests. +func newTestProcfsGatherStorage(mock statActivityLister) *ProcfsGatherStorage { + return NewProcfsGatherStorage(newTestLogger(), mock, time.Now()) +} + +// injectBufconn registers a bufconn connection for the given hostname in the global cache. 
+func injectBufconn(t *testing.T, lis *bufconn.Listener) string { + t.Helper() + conn := dialBufconn(t, lis) + hostname := fmt.Sprintf("test-%d", time.Now().UnixNano()) + segConnectionLock.Lock() + segConnections[hostname] = conn + segConnectionLock.Unlock() + t.Cleanup(func() { + segConnectionLock.Lock() + delete(segConnections, hostname) + segConnectionLock.Unlock() + }) + return hostname +} + +// ============================================================ +// Tests for NewProcfsGatherStorage +// ============================================================ + +func TestNewProcfsGatherStorage(t *testing.T) { + gTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + mock := &mockStatActivityLister{} + ps := NewProcfsGatherStorage(newTestLogger(), mock, gTime) + + require.NotNil(t, ps) + require.NotNil(t, ps.mx) + assert.Equal(t, gTime, ps.gatherTime) + assert.NotNil(t, ps.procfsStat) + assert.Empty(t, ps.procfsStat) +} + +// ============================================================ +// Tests for addProcfsStat +// ============================================================ + +func TestAddProcfsStat_SingleBatch(t *testing.T) { + ps := newTestProcfsGatherStorage(nil) + + data := []*pbc.GpPidProcInfo{ + {GpSegmentId: 1, SessId: 10, Pid: 100, Cmdline: "cmd1"}, + {GpSegmentId: 2, SessId: 20, Pid: 200, Cmdline: "cmd2"}, + } + ps.addProcfsStat(data) + + result := ps.GetProcfsStat() + require.Len(t, result, 2) + assert.Equal(t, "cmd1", result[0].Cmdline) + assert.Equal(t, "cmd2", result[1].Cmdline) +} + +func TestAddProcfsStat_MultipleBatches(t *testing.T) { + ps := newTestProcfsGatherStorage(nil) + + ps.addProcfsStat([]*pbc.GpPidProcInfo{ + {GpSegmentId: 1, SessId: 10, Pid: 100, Cmdline: "batch1"}, + }) + ps.addProcfsStat([]*pbc.GpPidProcInfo{ + {GpSegmentId: 2, SessId: 20, Pid: 200, Cmdline: "batch2"}, + {GpSegmentId: 3, SessId: 30, Pid: 300, Cmdline: "batch3"}, + }) + + result := ps.GetProcfsStat() + require.Len(t, result, 3) + assert.Equal(t, "batch1", 
result[0].Cmdline) + assert.Equal(t, "batch2", result[1].Cmdline) + assert.Equal(t, "batch3", result[2].Cmdline) +} + +func TestAddProcfsStat_EmptyBatch(t *testing.T) { + ps := newTestProcfsGatherStorage(nil) + + ps.addProcfsStat([]*pbc.GpPidProcInfo{}) + assert.Empty(t, ps.GetProcfsStat()) + + ps.addProcfsStat(nil) + assert.Empty(t, ps.GetProcfsStat()) +} + +func TestAddProcfsStat_ConcurrentAccess(t *testing.T) { + ps := newTestProcfsGatherStorage(nil) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + ps.addProcfsStat([]*pbc.GpPidProcInfo{ + {GpSegmentId: int64(id), Pid: int64(id * 100), Cmdline: fmt.Sprintf("cmd-%d", id)}, + }) + }(i) + } + wg.Wait() + + result := ps.GetProcfsStat() + assert.Len(t, result, 10) +} + +// ============================================================ +// Tests for GetProcfsStat +// ============================================================ + +func TestGetProcfsStat_Empty(t *testing.T) { + ps := newTestProcfsGatherStorage(nil) + result := ps.GetProcfsStat() + assert.NotNil(t, result) + assert.Empty(t, result) +} + +func TestGetProcfsStat_ReturnsClone(t *testing.T) { + ps := newTestProcfsGatherStorage(nil) + ps.addProcfsStat([]*pbc.GpPidProcInfo{ + {GpSegmentId: 1, Pid: 100, Cmdline: "original"}, + }) + + result1 := ps.GetProcfsStat() + result2 := ps.GetProcfsStat() + + // Modifying the returned slice should not affect the internal state + result1[0] = &pbc.GpPidProcInfo{Cmdline: "modified"} + assert.Equal(t, "original", result2[0].Cmdline) + assert.Equal(t, "original", ps.GetProcfsStat()[0].Cmdline) +} + +// ============================================================ +// Tests for getJobsMap +// ============================================================ + +func TestGetJobsMap_EmptyInput(t *testing.T) { + ps := newTestProcfsGatherStorage(nil) + result := ps.getJobsMap(nil) + assert.NotNil(t, result) + assert.Empty(t, result) + + result2 := 
ps.getJobsMap([]stat_activity.SessionPid{}) + assert.NotNil(t, result2) + assert.Empty(t, result2) +} + +func TestGetJobsMap_SingleHost(t *testing.T) { + storage.SetHostnameForSegindex(10, "host-a") + + sessions := []stat_activity.SessionPid{ + {GpSegmentId: 10, Pid: 100, SessId: 1}, + {GpSegmentId: 10, Pid: 200, SessId: 2}, + } + + ps := newTestProcfsGatherStorage(nil) + result := ps.getJobsMap(sessions) + + _, exists := result["host-a"] + assert.True(t, exists, "expected key 'host-a' in hostJobMap") +} + +func TestGetJobsMap_MultipleHosts(t *testing.T) { + storage.SetHostnameForSegindex(20, "host-b") + storage.SetHostnameForSegindex(21, "host-c") + + sessions := []stat_activity.SessionPid{ + {GpSegmentId: 20, Pid: 100, SessId: 1}, + {GpSegmentId: 21, Pid: 200, SessId: 2}, + {GpSegmentId: 20, Pid: 300, SessId: 3}, + } + + ps := newTestProcfsGatherStorage(nil) + result := ps.getJobsMap(sessions) + + assert.Contains(t, result, "host-b") + assert.Contains(t, result, "host-c") +} + +func TestGetJobsMap_UnknownSegindex(t *testing.T) { + sessions := []stat_activity.SessionPid{ + {GpSegmentId: 9999, Pid: 100, SessId: 1}, + } + + ps := newTestProcfsGatherStorage(nil) + result := ps.getJobsMap(sessions) + _, exists := result["9999"] + assert.True(t, exists, "expected key '9999' for unknown segindex") +} + +// ============================================================ +// Tests for processProcfsRequests +// ============================================================ + +func TestProcessProcfsRequests_Success(t *testing.T) { + fakeSrv := &fakeProcStatServer{} + lis := setupBufconnServer(t, fakeSrv) + hostname := injectBufconn(t, lis) + + reqs := []stat_activity.SessionPid{ + {GpSegmentId: 1, Pid: 100, SessId: 10}, + {GpSegmentId: 2, Pid: 200, SessId: 20}, + } + + ps := newTestProcfsGatherStorage(nil) + ctx := context.Background() + err := ps.processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) + require.NoError(t, err) + called, lastReq := 
fakeSrv.snapshot() + assert.True(t, called, "expected GetPidProcStat to be called") + require.NotNil(t, lastReq) + assert.Len(t, lastReq.SegmentProcess, 2) + + sp0 := lastReq.SegmentProcess[0] + assert.Equal(t, int64(1), sp0.GpSegmentId) + assert.Equal(t, int64(100), sp0.Pid) + assert.Equal(t, int64(10), sp0.SessId) + + sp1 := lastReq.SegmentProcess[1] + assert.Equal(t, int64(2), sp1.GpSegmentId) + assert.Equal(t, int64(200), sp1.Pid) + assert.Equal(t, int64(20), sp1.SessId) +} + +func TestProcessProcfsRequests_SavesData(t *testing.T) { + dataSrv := &dataProcStatServer{} + lis := setupBufconnServer(t, dataSrv) + hostname := injectBufconn(t, lis) + + reqs := []stat_activity.SessionPid{ + {GpSegmentId: 1, Pid: 100, SessId: 10}, + {GpSegmentId: 2, Pid: 200, SessId: 20}, + } + + ps := newTestProcfsGatherStorage(nil) + err := ps.processProcfsRequests(context.Background(), hostname, 0, 5*time.Second, 4*1024*1024, reqs) + require.NoError(t, err) + + // Verify data was saved via addProcfsStat + result := ps.GetProcfsStat() + require.Len(t, result, 2) + assert.Equal(t, int64(1), result[0].GpSegmentId) + assert.Equal(t, int64(100), result[0].Pid) + assert.Equal(t, "cmd-100", result[0].Cmdline) + assert.Equal(t, int64(2), result[1].GpSegmentId) + assert.Equal(t, int64(200), result[1].Pid) + assert.Equal(t, "cmd-200", result[1].Cmdline) +} + +func TestProcessProcfsRequests_SavesDataWithBatching(t *testing.T) { + dataSrv := &dataProcStatServer{} + lis := setupBufconnServer(t, dataSrv) + hostname := injectBufconn(t, lis) + + // Create jobsPerQuery + 5 requests to trigger batching + reqs := make([]stat_activity.SessionPid, 0, jobsPerQuery+5) + for i := 0; i < jobsPerQuery+5; i++ { + reqs = append(reqs, stat_activity.SessionPid{ + GpSegmentId: 1, + Pid: 100 + i, + SessId: i + 1, + }) + } + + ps := newTestProcfsGatherStorage(nil) + err := ps.processProcfsRequests(context.Background(), hostname, 0, 10*time.Second, 4*1024*1024, reqs) + require.NoError(t, err) + + // Should have 
called the server at least twice (one batch of 1000, one of 5) + assert.GreaterOrEqual(t, dataSrv.getCalls(), 2) + + // All data should be accumulated + result := ps.GetProcfsStat() + assert.Len(t, result, jobsPerQuery+5) +} + +func TestProcessProcfsRequests_GrpcError(t *testing.T) { + failSrv := &failingProcStatServer{} + lis := setupBufconnServer(t, failSrv) + hostname := injectBufconn(t, lis) + + reqs := []stat_activity.SessionPid{ + {GpSegmentId: 1, Pid: 100, SessId: 10}, + } + + ps := newTestProcfsGatherStorage(nil) + err := ps.processProcfsRequests(context.Background(), hostname, 0, 5*time.Second, 4*1024*1024, reqs) + require.Error(t, err) + assert.Contains(t, err.Error(), "simulated gRPC error") + + // No data should have been saved + assert.Empty(t, ps.GetProcfsStat()) +} + +func TestProcessProcfsRequests_CancelledContext(t *testing.T) { + fakeSrv := &fakeProcStatServer{} + lis := setupBufconnServer(t, fakeSrv) + hostname := injectBufconn(t, lis) + + reqs := []stat_activity.SessionPid{ + {GpSegmentId: 1, Pid: 100, SessId: 10}, + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + ps := newTestProcfsGatherStorage(nil) + err := ps.processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) + if err != nil { + assert.ErrorIs(t, ctx.Err(), context.Canceled) + } +} + +func TestProcessProcfsRequests_EmptyRequests(t *testing.T) { + fakeSrv := &fakeProcStatServer{} + lis := setupBufconnServer(t, fakeSrv) + hostname := injectBufconn(t, lis) + + ps := newTestProcfsGatherStorage(nil) + err := ps.processProcfsRequests(context.Background(), hostname, 0, 5*time.Second, 4*1024*1024, nil) + require.NoError(t, err) + called, _ := fakeSrv.snapshot() + assert.False(t, called, "GetPidProcStat should not be called with empty segment list") + assert.Empty(t, ps.GetProcfsStat()) +} + +// ============================================================ +// Tests for GatherProcfsStat +// 
============================================================ + +func TestGatherProcfsStat_ListAllSessionsError(t *testing.T) { + mock := &mockStatActivityLister{ + sessionsErr: fmt.Errorf("db connection failed"), + } + ps := newTestProcfsGatherStorage(mock) + + err := ps.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second, 4*1024*1024) + require.Error(t, err) + assert.Contains(t, err.Error(), "db connection failed") + assert.True(t, mock.listCalled) +} + +func TestGatherProcfsStat_EmptySessions(t *testing.T) { + mock := &mockStatActivityLister{ + sessions: []stat_activity.SessionPid{}, + } + ps := newTestProcfsGatherStorage(mock) + + err := ps.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second, 4*1024*1024) + require.NoError(t, err) + assert.True(t, mock.listCalled) + assert.Empty(t, ps.GetProcfsStat()) +} + +func TestGatherProcfsStat_WithSessions_SavesData(t *testing.T) { + dataSrv := &dataProcStatServer{} + lis := setupBufconnServer(t, dataSrv) + hostname := injectBufconn(t, lis) + + storage.SetHostnameForSegindex(30, hostname) + + mock := &mockStatActivityLister{ + sessions: []stat_activity.SessionPid{ + {GpSegmentId: 30, Pid: 100, SessId: 1}, + {GpSegmentId: 30, Pid: 200, SessId: 2}, + }, + } + ps := newTestProcfsGatherStorage(mock) + + err := ps.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second, 4*1024*1024) + require.NoError(t, err) + assert.True(t, mock.listCalled) + + // Verify data was accumulated + result := ps.GetProcfsStat() + require.Len(t, result, 2) + // Check that the PIDs match (order may vary due to concurrency) + pids := make(map[int64]bool) + for _, r := range result { + pids[r.Pid] = true + } + assert.True(t, pids[100]) + assert.True(t, pids[200]) +} + +func TestGatherProcfsStat_ContextCancelled(t *testing.T) { + mock := &mockStatActivityLister{ + sessions: []stat_activity.SessionPid{ + {GpSegmentId: 40, Pid: 100, SessId: 1}, + }, + } + ps := newTestProcfsGatherStorage(mock) + + ctx, cancel := 
context.WithCancel(context.Background()) + cancel() + + err := ps.GatherProcfsStat(ctx, 2, 50051, 5*time.Second, 4*1024*1024) + _ = err // may or may not error +} + +func TestGatherProcfsStat_GrpcFailure(t *testing.T) { + failSrv := &failingProcStatServer{} + lis := setupBufconnServer(t, failSrv) + hostname := injectBufconn(t, lis) + + storage.SetHostnameForSegindex(50, hostname) + + mock := &mockStatActivityLister{ + sessions: []stat_activity.SessionPid{ + {GpSegmentId: 50, Pid: 100, SessId: 1}, + }, + } + ps := newTestProcfsGatherStorage(mock) + + err := ps.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second, 4*1024*1024) + require.Error(t, err) + assert.Contains(t, err.Error(), "simulated gRPC error") + // No data should be saved on error + assert.Empty(t, ps.GetProcfsStat()) +} + +func TestGatherProcfsStat_ManySessionsBatching(t *testing.T) { + dataSrv := &dataProcStatServer{} + lis := setupBufconnServer(t, dataSrv) + hostname := injectBufconn(t, lis) + + storage.SetHostnameForSegindex(60, hostname) + + sessions := make([]stat_activity.SessionPid, 0, jobsPerQuery+5) + for i := 0; i < jobsPerQuery+5; i++ { + sessions = append(sessions, stat_activity.SessionPid{ + GpSegmentId: 60, + Pid: 100 + i, + SessId: i + 1, + }) + } + + mock := &mockStatActivityLister{sessions: sessions} + ps := newTestProcfsGatherStorage(mock) + + err := ps.GatherProcfsStat(context.Background(), 4, 0, 10*time.Second, 4*1024*1024) + require.NoError(t, err) + assert.True(t, mock.listCalled) + + result := ps.GetProcfsStat() + assert.Len(t, result, jobsPerQuery+5) +} + +func TestGatherProcfsStat_InvalidNPullers(t *testing.T) { + mock := &mockStatActivityLister{} + ps := newTestProcfsGatherStorage(mock) + + for _, n := range []int{0, -1, -100} { + err := ps.GatherProcfsStat(context.Background(), n, 50051, 5*time.Second, 4*1024*1024) + require.Error(t, err) + assert.Contains(t, err.Error(), "nPullers must be greater than 0") + assert.False(t, mock.listCalled, "ListAllSessions should not 
be called for invalid nPullers") + } +} + +// ============================================================ +// Tests for constants +// ============================================================ + +func TestConstants(t *testing.T) { + assert.Equal(t, 1000, jobsPerQuery) +} diff --git a/internal/master/procfs_test.go b/internal/master/procfs_test.go deleted file mode 100644 index 0fa68ee..0000000 --- a/internal/master/procfs_test.go +++ /dev/null @@ -1,502 +0,0 @@ -package master - -import ( - "context" - "fmt" - "log" - "net" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/test/bufconn" - - pb "github.com/open-gpdb/yagpcc/api/proto/agent_segment" - "github.com/open-gpdb/yagpcc/internal/gp" - "github.com/open-gpdb/yagpcc/internal/gp/stat_activity" - "github.com/open-gpdb/yagpcc/internal/storage" -) - -// --- mock statActivityLister --- - -type mockStatActivityLister struct { - sessions []stat_activity.SessionPid - sessionsErr error - listCalled bool -} - -func (m *mockStatActivityLister) Start(context.Context) error { return nil } -func (m *mockStatActivityLister) Stop() {} -func (m *mockStatActivityLister) List(context.Context) ([]*gp.GpStatActivity, error) { - return nil, nil -} -func (m *mockStatActivityLister) ListAllSessions(context.Context) ([]stat_activity.SessionPid, error) { - m.listCalled = true - return m.sessions, m.sessionsErr -} - -// --- fake gRPC server for GetPidProcStat --- - -type fakeProcStatServer struct { - pb.UnimplementedGetQueryInfoServer - mu sync.Mutex - called bool - lastReq *pb.GetPidProcInfoReq -} - -func (s *fakeProcStatServer) GetPidProcStat(_ context.Context, req *pb.GetPidProcInfoReq) (*pb.GetPidProcInfoResponse, error) { - s.mu.Lock() - s.called = true - s.lastReq = req - s.mu.Unlock() - return &pb.GetPidProcInfoResponse{}, nil -} - -func (s 
*fakeProcStatServer) snapshot() (bool, *pb.GetPidProcInfoReq) { - s.mu.Lock() - defer s.mu.Unlock() - return s.called, s.lastReq -} - -func (s *fakeProcStatServer) GetMetricQueries(_ context.Context, _ *pb.GetQueriesInfoReq) (*pb.GetQueriesInfoResponse, error) { - return &pb.GetQueriesInfoResponse{}, nil -} - -type failingProcStatServer struct { - pb.UnimplementedGetQueryInfoServer -} - -func (s *failingProcStatServer) GetPidProcStat(context.Context, *pb.GetPidProcInfoReq) (*pb.GetPidProcInfoResponse, error) { - return nil, fmt.Errorf("simulated gRPC error") -} - -func (s *failingProcStatServer) GetMetricQueries(context.Context, *pb.GetQueriesInfoReq) (*pb.GetQueriesInfoResponse, error) { - return &pb.GetQueriesInfoResponse{}, nil -} - -// setupBufconnServer creates a gRPC server on a bufconn listener, registers -// the provided GetQueryInfo service implementation, starts serving, and -// returns the listener. -func setupBufconnServer(t *testing.T, srv pb.GetQueryInfoServer) *bufconn.Listener { - t.Helper() - lis := bufconn.Listen(1024 * 1024) - s := grpc.NewServer() - pb.RegisterGetQueryInfoServer(s, srv) - go func() { - if err := s.Serve(lis); err != nil { - log.Printf("bufconn server exited: %v", err) - } - }() - t.Cleanup(func() { s.Stop() }) - return lis -} - -func dialBufconn(t *testing.T, lis *bufconn.Listener) *grpc.ClientConn { - t.Helper() - conn, err := grpc.NewClient( - "passthrough:///bufconn", - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return lis.Dial() - }), - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - require.NoError(t, err) - t.Cleanup(func() { _ = conn.Close() }) - return conn -} - -func newTestLogger() *zap.SugaredLogger { - cfg := zap.NewDevelopmentConfig() - cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel) - l, _ := cfg.Build() - return l.Sugar() -} - -// ============================================================ -// Tests for getJobsMap -// 
============================================================ - -func TestGetJobsMap_EmptyInput(t *testing.T) { - bs := &BackgroundStorage{l: newTestLogger()} - result := bs.getJobsMap(nil) - assert.NotNil(t, result) - assert.Empty(t, result) - - result2 := bs.getJobsMap([]stat_activity.SessionPid{}) - assert.NotNil(t, result2) - assert.Empty(t, result2) -} - -func TestGetJobsMap_SingleHost(t *testing.T) { - storage.SetHostnameForSegindex(10, "host-a") - - sessions := []stat_activity.SessionPid{ - {GpSegmentId: 10, Pid: 100, SessId: 1}, - {GpSegmentId: 10, Pid: 200, SessId: 2}, - } - - bs := &BackgroundStorage{l: newTestLogger()} - result := bs.getJobsMap(sessions) - - // The map should contain an entry for "host-a" - _, exists := result["host-a"] - assert.True(t, exists, "expected key 'host-a' in hostJobMap") -} - -func TestGetJobsMap_MultipleHosts(t *testing.T) { - storage.SetHostnameForSegindex(20, "host-b") - storage.SetHostnameForSegindex(21, "host-c") - - sessions := []stat_activity.SessionPid{ - {GpSegmentId: 20, Pid: 100, SessId: 1}, - {GpSegmentId: 21, Pid: 200, SessId: 2}, - {GpSegmentId: 20, Pid: 300, SessId: 3}, - } - - bs := &BackgroundStorage{l: newTestLogger()} - result := bs.getJobsMap(sessions) - - // Should have entries for both hosts - assert.Contains(t, result, "host-b") - assert.Contains(t, result, "host-c") -} - -func TestGetJobsMap_UnknownSegindex(t *testing.T) { - // When segindex is not in the config storage, GetHostnameForSegindex - // returns the string representation of the segindex. 
- sessions := []stat_activity.SessionPid{ - {GpSegmentId: 9999, Pid: 100, SessId: 1}, - } - - bs := &BackgroundStorage{l: newTestLogger()} - result := bs.getJobsMap(sessions) - _, exists := result["9999"] - assert.True(t, exists, "expected key '9999' for unknown segindex") -} - -// ============================================================ -// Tests for processProcfsRequests -// ============================================================ - -func TestProcessProcfsRequests_Success(t *testing.T) { - fakeSrv := &fakeProcStatServer{} - lis := setupBufconnServer(t, fakeSrv) - - // Inject the bufconn connection into the global connection cache - conn := dialBufconn(t, lis) - hostname := fmt.Sprintf("test-procfs-success-%d", time.Now().UnixNano()) - segConnectionLock.Lock() - segConnections[hostname] = conn - segConnectionLock.Unlock() - t.Cleanup(func() { - segConnectionLock.Lock() - delete(segConnections, hostname) - segConnectionLock.Unlock() - }) - - reqs := []stat_activity.SessionPid{ - {GpSegmentId: 1, Pid: 100, SessId: 10}, - {GpSegmentId: 2, Pid: 200, SessId: 20}, - } - - bs := &BackgroundStorage{l: newTestLogger()} - ctx := context.Background() - err := bs.processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) - require.NoError(t, err) - called, lastReq := fakeSrv.snapshot() - assert.True(t, called, "expected GetPidProcStat to be called") - require.NotNil(t, lastReq) - assert.Len(t, lastReq.SegmentProcess, 2) - - // Verify the proto message fields - sp0 := lastReq.SegmentProcess[0] - assert.Equal(t, int64(1), sp0.GpSegmentId) - assert.Equal(t, int64(100), sp0.Pid) - assert.Equal(t, int64(10), sp0.SessId) - - sp1 := lastReq.SegmentProcess[1] - assert.Equal(t, int64(2), sp1.GpSegmentId) - assert.Equal(t, int64(200), sp1.Pid) - assert.Equal(t, int64(20), sp1.SessId) -} - -func TestProcessProcfsRequests_GrpcError(t *testing.T) { - failSrv := &failingProcStatServer{} - lis := setupBufconnServer(t, failSrv) - - conn := dialBufconn(t, lis) - 
hostname := fmt.Sprintf("test-procfs-fail-%d", time.Now().UnixNano()) - segConnectionLock.Lock() - segConnections[hostname] = conn - segConnectionLock.Unlock() - t.Cleanup(func() { - segConnectionLock.Lock() - delete(segConnections, hostname) - segConnectionLock.Unlock() - }) - - reqs := []stat_activity.SessionPid{ - {GpSegmentId: 1, Pid: 100, SessId: 10}, - } - - bs := &BackgroundStorage{l: newTestLogger()} - ctx := context.Background() - err := bs.processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) - require.Error(t, err) - assert.Contains(t, err.Error(), "simulated gRPC error") -} - -func TestProcessProcfsRequests_CancelledContext(t *testing.T) { - fakeSrv := &fakeProcStatServer{} - lis := setupBufconnServer(t, fakeSrv) - - conn := dialBufconn(t, lis) - hostname := fmt.Sprintf("test-procfs-cancel-%d", time.Now().UnixNano()) - segConnectionLock.Lock() - segConnections[hostname] = conn - segConnectionLock.Unlock() - t.Cleanup(func() { - segConnectionLock.Lock() - delete(segConnections, hostname) - segConnectionLock.Unlock() - }) - - reqs := []stat_activity.SessionPid{ - {GpSegmentId: 1, Pid: 100, SessId: 10}, - } - - ctx, cancel := context.WithCancel(context.Background()) - cancel() // cancel immediately - - // With a cancelled context, processProcfsRequests should skip building - // the request body (due to select on ctx.Done()) and return nil. - bs := &BackgroundStorage{l: newTestLogger()} - err := bs.processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, reqs) - // The function returns nil when context is cancelled during request building, - // but may return an error from the gRPC call if the request was already built. - // Either outcome is acceptable with a cancelled context. 
- if err != nil { - assert.ErrorIs(t, ctx.Err(), context.Canceled) - } -} - -func TestProcessProcfsRequests_EmptyRequests(t *testing.T) { - fakeSrv := &fakeProcStatServer{} - lis := setupBufconnServer(t, fakeSrv) - - conn := dialBufconn(t, lis) - hostname := fmt.Sprintf("test-procfs-empty-%d", time.Now().UnixNano()) - segConnectionLock.Lock() - segConnections[hostname] = conn - segConnectionLock.Unlock() - t.Cleanup(func() { - segConnectionLock.Lock() - delete(segConnections, hostname) - segConnectionLock.Unlock() - }) - - bs := &BackgroundStorage{l: newTestLogger()} - ctx := context.Background() - err := bs.processProcfsRequests(ctx, hostname, 0, 5*time.Second, 4*1024*1024, nil) - require.NoError(t, err) - called, _ := fakeSrv.snapshot() - assert.False(t, called, "GetPidProcStat should not be called with empty segment list") -} - -// ============================================================ -// Tests for GatherProcfsStat -// ============================================================ - -func TestGatherProcfsStat_ListAllSessionsError(t *testing.T) { - mock := &mockStatActivityLister{ - sessionsErr: fmt.Errorf("db connection failed"), - } - bs := &BackgroundStorage{ - l: newTestLogger(), - statActivityLister: mock, - } - - err := bs.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second, 4*1024*1024) - require.Error(t, err) - assert.Contains(t, err.Error(), "db connection failed") - assert.True(t, mock.listCalled) -} - -func TestGatherProcfsStat_EmptySessions(t *testing.T) { - mock := &mockStatActivityLister{ - sessions: []stat_activity.SessionPid{}, - } - bs := &BackgroundStorage{ - l: newTestLogger(), - statActivityLister: mock, - } - - err := bs.GatherProcfsStat(context.Background(), 2, 50051, 5*time.Second, 4*1024*1024) - require.NoError(t, err) - assert.True(t, mock.listCalled) -} - -func TestGatherProcfsStat_WithSessions(t *testing.T) { - // Set up a fake gRPC server - fakeSrv := &fakeProcStatServer{} - lis := setupBufconnServer(t, fakeSrv) - - 
conn := dialBufconn(t, lis) - hostname := fmt.Sprintf("test-gather-%d", time.Now().UnixNano()) - - // Register the hostname in the segment config - storage.SetHostnameForSegindex(30, hostname) - - // Inject the bufconn connection - segConnectionLock.Lock() - segConnections[hostname] = conn - segConnectionLock.Unlock() - t.Cleanup(func() { - segConnectionLock.Lock() - delete(segConnections, hostname) - segConnectionLock.Unlock() - }) - - mock := &mockStatActivityLister{ - sessions: []stat_activity.SessionPid{ - {GpSegmentId: 30, Pid: 100, SessId: 1}, - {GpSegmentId: 30, Pid: 200, SessId: 2}, - }, - } - bs := &BackgroundStorage{ - l: newTestLogger(), - statActivityLister: mock, - } - - err := bs.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second, 4*1024*1024) - require.NoError(t, err) - assert.True(t, mock.listCalled) -} - -func TestGatherProcfsStat_ContextCancelled(t *testing.T) { - mock := &mockStatActivityLister{ - sessions: []stat_activity.SessionPid{ - {GpSegmentId: 40, Pid: 100, SessId: 1}, - }, - } - bs := &BackgroundStorage{ - l: newTestLogger(), - statActivityLister: mock, - } - - ctx, cancel := context.WithCancel(context.Background()) - cancel() // cancel immediately - - // With a cancelled context, the timeout context creation will produce - // an already-done context, so the pool tasks should handle it gracefully. - err := bs.GatherProcfsStat(ctx, 2, 50051, 5*time.Second, 4*1024*1024) - // The error may be nil (if tasks detect cancellation early) or non-nil - // (if the gRPC call fails due to cancelled context). Both are acceptable. 
- _ = err -} - -func TestGatherProcfsStat_GrpcFailure(t *testing.T) { - failSrv := &failingProcStatServer{} - lis := setupBufconnServer(t, failSrv) - - conn := dialBufconn(t, lis) - hostname := fmt.Sprintf("test-gather-fail-%d", time.Now().UnixNano()) - - storage.SetHostnameForSegindex(50, hostname) - - segConnectionLock.Lock() - segConnections[hostname] = conn - segConnectionLock.Unlock() - t.Cleanup(func() { - segConnectionLock.Lock() - delete(segConnections, hostname) - segConnectionLock.Unlock() - }) - - mock := &mockStatActivityLister{ - sessions: []stat_activity.SessionPid{ - {GpSegmentId: 50, Pid: 100, SessId: 1}, - }, - } - bs := &BackgroundStorage{ - l: newTestLogger(), - statActivityLister: mock, - } - - err := bs.GatherProcfsStat(context.Background(), 2, 0, 5*time.Second, 4*1024*1024) - require.Error(t, err) - assert.Contains(t, err.Error(), "simulated gRPC error") -} - -func TestGatherProcfsStat_ManySessionsBatching(t *testing.T) { - // Create more than JobsPerQuery sessions to verify batching logic - fakeSrv := &fakeProcStatServer{} - lis := setupBufconnServer(t, fakeSrv) - - conn := dialBufconn(t, lis) - hostname := fmt.Sprintf("test-gather-batch-%d", time.Now().UnixNano()) - - storage.SetHostnameForSegindex(60, hostname) - - segConnectionLock.Lock() - segConnections[hostname] = conn - segConnectionLock.Unlock() - t.Cleanup(func() { - segConnectionLock.Lock() - delete(segConnections, hostname) - segConnectionLock.Unlock() - }) - - // Create JobsPerQuery + 5 sessions to trigger at least 2 batches - sessions := make([]stat_activity.SessionPid, 0, jobsPerQuery+5) - for i := 0; i < jobsPerQuery+5; i++ { - sessions = append(sessions, stat_activity.SessionPid{ - GpSegmentId: 60, - Pid: 100 + i, - SessId: i + 1, - }) - } - - mock := &mockStatActivityLister{ - sessions: sessions, - } - bs := &BackgroundStorage{ - l: newTestLogger(), - statActivityLister: mock, - } - - err := bs.GatherProcfsStat(context.Background(), 4, 0, 10*time.Second, 4*1024*1024) - 
require.NoError(t, err) - assert.True(t, mock.listCalled) - // The fake server should have been called (at least once for the batches) - called, _ := fakeSrv.snapshot() - assert.True(t, called) -} - -// ============================================================ -// Tests for constants -// ============================================================ - -func TestConstants(t *testing.T) { - assert.Equal(t, 1000, jobsPerQuery) -} - -func TestGatherProcfsStat_InvalidNPullers(t *testing.T) { - mock := &mockStatActivityLister{} - bs := &BackgroundStorage{ - l: newTestLogger(), - statActivityLister: mock, - } - - for _, n := range []int{0, -1, -100} { - err := bs.GatherProcfsStat(context.Background(), n, 50051, 5*time.Second, 4*1024*1024) - require.Error(t, err) - assert.Contains(t, err.Error(), "nPullers must be greater than 0") - assert.False(t, mock.listCalled, "ListAllSessions should not be called for invalid nPullers") - } -} diff --git a/internal/storage/util.go b/internal/storage/group.go similarity index 99% rename from internal/storage/util.go rename to internal/storage/group.go index 9314b1f..b3aea2d 100644 --- a/internal/storage/util.go +++ b/internal/storage/group.go @@ -37,7 +37,7 @@ func GroupAggMetrics(dest *pbc.AggregatedMetrics, queryDuration time.Duration) e return nil } -func UpdateIntermediateKey(intermediateResults map[MapAggregateKey]uint64, mapKey MapAggregateKey, value uint64) { +func UpdateIntermediateKey[T int | uint64 | int64](intermediateResults map[MapAggregateKey]T, mapKey MapAggregateKey, value T) { valMap, ok := intermediateResults[mapKey] if !ok { intermediateResults[mapKey] = value diff --git a/internal/storage/util_test.go b/internal/storage/group_test.go similarity index 100% rename from internal/storage/util_test.go rename to internal/storage/group_test.go diff --git a/internal/storage/procfs_group.go b/internal/storage/procfs_group.go new file mode 100644 index 0000000..bad8a0d --- /dev/null +++ b/internal/storage/procfs_group.go @@ 
-0,0 +1,187 @@ +package storage + +import ( + "fmt" + + pbc "github.com/open-gpdb/yagpcc/api/proto/common" + "google.golang.org/protobuf/proto" +) + +func nonNegativeDiff[T int | int32 | int64](first, last T) T { + if last < first { + return 0 + } + return last - first +} + +// diffProcStat computes the diff between two ProcStat snapshots. +// Snapshot fields are taken from last; cumulative counters are diffed. +// If either argument is nil it is treated as a zero-valued ProcStat. +func diffProcStat(first, last *pbc.ProcStat) *pbc.ProcStat { + if first == nil && last == nil { + return nil + } + if first == nil { + first = &pbc.ProcStat{} + } + if last == nil { + last = &pbc.ProcStat{} + } + return &pbc.ProcStat{ + Pid: last.Pid, + Comm: last.Comm, + State: last.State, + Ppid: last.Ppid, + Pgrp: last.Pgrp, + Session: last.Session, + Tty: last.Tty, + Tpgid: last.Tpgid, + Flags: last.Flags, + MinFlt: last.MinFlt, + CminFlt: last.CminFlt, + MajFlt: last.MajFlt, + CmajFlt: last.CmajFlt, + Utime: nonNegativeDiff(first.Utime, last.Utime), + Stime: nonNegativeDiff(first.Stime, last.Stime), + Cutime: nonNegativeDiff(first.Cutime, last.Cutime), + Cstime: nonNegativeDiff(first.Cstime, last.Cstime), + Priority: last.Priority, + Nice: last.Nice, + NumThreads: last.NumThreads, + Starttime: last.Starttime, + Vsize: last.Vsize, + Rss: last.Rss, + RssLimit: last.RssLimit, + StartCode: last.StartCode, + EndCode: last.EndCode, + StartStack: last.StartStack, + Processor: last.Processor, + RtPriority: last.RtPriority, + Policy: last.Policy, + DelayAcctBlkIoTicks: last.DelayAcctBlkIoTicks, + GuestTime: nonNegativeDiff(first.GuestTime, last.GuestTime), + CguestTime: nonNegativeDiff(first.CguestTime, last.CguestTime), + } +} + +// diffProcIO computes the diff between two ProcIO snapshots. +// All fields are cumulative counters, so every field is diffed. +// If either argument is nil it is treated as a zero-valued ProcIO. 
+func diffProcIO(first, last *pbc.ProcIO) *pbc.ProcIO { + if first == nil && last == nil { + return nil + } + if first == nil { + first = &pbc.ProcIO{} + } + if last == nil { + last = &pbc.ProcIO{} + } + return &pbc.ProcIO{ + Rchar: nonNegativeDiff(first.Rchar, last.Rchar), + Wchar: nonNegativeDiff(first.Wchar, last.Wchar), + Syscr: nonNegativeDiff(first.Syscr, last.Syscr), + Syscw: nonNegativeDiff(first.Syscw, last.Syscw), + ReadBytes: nonNegativeDiff(first.ReadBytes, last.ReadBytes), + WriteBytes: nonNegativeDiff(first.WriteBytes, last.WriteBytes), + CancelledWriteBytes: nonNegativeDiff(first.CancelledWriteBytes, last.CancelledWriteBytes), + } +} + +// ProcfsDiff returns a top-like diff between two procfs measurements. +// Snapshot (point-in-time) fields are taken from last; cumulative counters +// are diffed via nonNegativeDiff. Nil ProcStat / ProcIo / ProcStatus +// sub-structs are handled gracefully (treated as zero-valued). +func ProcfsDiff(first, last *pbc.GpPidProcInfo) (*pbc.GpPidProcInfo, error) { + if first == nil && last == nil { + return nil, fmt.Errorf("first and last both nil") + } + if first == nil { + return last, nil + } + if last == nil { + return first, nil + } + return &pbc.GpPidProcInfo{ + GpSegmentId: last.GpSegmentId, + SessId: last.SessId, + Pid: last.Pid, + Cmdline: last.Cmdline, + State: last.State, + ProcStatus: last.ProcStatus, + ProcStat: diffProcStat(first.ProcStat, last.ProcStat), + ProcIo: diffProcIO(first.ProcIo, last.ProcIo), + }, nil +} + +func GroupProcfsMetrics(dest *pbc.GpPidProcInfo, source *pbc.GpPidProcInfo, aggKind AggregateKind, segHostname string, intermediateResults map[MapAggregateKey]int64) error { + if source == nil { + return nil + } + if dest == nil { + return fmt.Errorf("cannot merge with nil procfs dst") + } + if intermediateResults == nil { + return fmt.Errorf("need map for store intermediate procfs results") + } + // group only procstat and procio + if source.ProcStat != nil { + if dest.ProcStat == nil { + 
dest.ProcStat = proto.Clone(source.ProcStat).(*pbc.ProcStat) + } else { + dest.ProcStat.Utime += source.ProcStat.Utime + dest.ProcStat.Stime += source.ProcStat.Stime + dest.ProcStat.Cutime += source.ProcStat.Cutime + dest.ProcStat.Cstime += source.ProcStat.Cstime + dest.ProcStat.GuestTime += source.ProcStat.GuestTime + dest.ProcStat.CguestTime += source.ProcStat.CguestTime + + } + + // update intermediate results + UpdateIntermediateKey(intermediateResults, + MapAggregateKey{MetricName: "Vsize", Hostname: segHostname}, + source.ProcStat.Vsize) + UpdateIntermediateKey(intermediateResults, + MapAggregateKey{MetricName: "Rss", Hostname: segHostname}, + source.ProcStat.Rss) + + // choose maximum value + for key, val := range intermediateResults { + if aggKind == AggSegmentHost && key.Hostname != segHostname { + // skip other hosts + continue + } + if key.MetricName == "Vsize" { + if aggKind == AggMax { + dest.ProcStat.Vsize = max(dest.ProcStat.Vsize, val) + } else { + dest.ProcStat.Vsize = val + } + } + if key.MetricName == "Rss" { + if aggKind == AggMax { + dest.ProcStat.Rss = max(dest.ProcStat.Rss, val) + } else { + dest.ProcStat.Rss = val + } + } + } + } + + if source.ProcIo != nil { + if dest.ProcIo == nil { + dest.ProcIo = proto.Clone(source.ProcIo).(*pbc.ProcIO) + } else { + dest.ProcIo.Rchar += source.ProcIo.Rchar + dest.ProcIo.Wchar += source.ProcIo.Wchar + dest.ProcIo.Syscr += source.ProcIo.Syscr + dest.ProcIo.Syscw += source.ProcIo.Syscw + dest.ProcIo.ReadBytes += source.ProcIo.ReadBytes + dest.ProcIo.WriteBytes += source.ProcIo.WriteBytes + dest.ProcIo.CancelledWriteBytes += source.ProcIo.CancelledWriteBytes + } + } + + return nil +} diff --git a/internal/storage/procfs_group_metrics_test.go b/internal/storage/procfs_group_metrics_test.go new file mode 100644 index 0000000..6b10471 --- /dev/null +++ b/internal/storage/procfs_group_metrics_test.go @@ -0,0 +1,429 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + 
"github.com/stretchr/testify/require" + + pbc "github.com/open-gpdb/yagpcc/api/proto/common" + "github.com/open-gpdb/yagpcc/internal/utils" +) + +// =========================================================================== +// GroupProcfsMetrics - error / nil handling +// =========================================================================== + +func TestGroupProcfsMetrics_NilSource_NoOp(t *testing.T) { + dest := &pbc.GpPidProcInfo{Pid: 1} + ir := make(map[MapAggregateKey]int64) + err := GroupProcfsMetrics(dest, nil, AggMax, "host1", ir) + assert.NoError(t, err) + assert.Equal(t, int64(1), dest.Pid) + assert.Nil(t, dest.ProcStat) + assert.Nil(t, dest.ProcIo) +} + +func TestGroupProcfsMetrics_NilDest_Error(t *testing.T) { + ir := make(map[MapAggregateKey]int64) + err := GroupProcfsMetrics(nil, &pbc.GpPidProcInfo{Pid: 1}, AggMax, "host1", ir) + assert.EqualError(t, err, "cannot merge with nil procfs dst") +} + +func TestGroupProcfsMetrics_NilIntermediateResults_Error(t *testing.T) { + err := GroupProcfsMetrics(&pbc.GpPidProcInfo{}, &pbc.GpPidProcInfo{}, AggMax, "host1", nil) + assert.EqualError(t, err, "need map for store intermediate procfs results") +} + +// =========================================================================== +// GroupProcfsMetrics - ProcStat +// =========================================================================== + +func TestGroupProcfsMetrics_ProcStat_CloneOnEmptyDest(t *testing.T) { + dest := &pbc.GpPidProcInfo{} + source := &pbc.GpPidProcInfo{ + ProcStat: &pbc.ProcStat{ + Pid: 42, Comm: "postgres", + Utime: 100, Stime: 200, Cutime: 10, Cstime: 20, + GuestTime: 5, CguestTime: 3, Vsize: 1024, Rss: 512, + }, + } + ir := make(map[MapAggregateKey]int64) + err := GroupProcfsMetrics(dest, source, AggMax, "host1", ir) + require.NoError(t, err) + require.NotNil(t, dest.ProcStat) + utils.AssertProtoMessagesEqual(t, source.ProcStat, dest.ProcStat) + assert.NotSame(t, source.ProcStat, dest.ProcStat) +} + +func 
TestGroupProcfsMetrics_ProcStat_SumCounters(t *testing.T) { + dest := &pbc.GpPidProcInfo{ + ProcStat: &pbc.ProcStat{ + Utime: 100, Stime: 200, Cutime: 10, Cstime: 20, + GuestTime: 5, CguestTime: 3, + }, + } + source := &pbc.GpPidProcInfo{ + ProcStat: &pbc.ProcStat{ + Utime: 50, Stime: 75, Cutime: 5, Cstime: 8, + GuestTime: 2, CguestTime: 1, + }, + } + ir := make(map[MapAggregateKey]int64) + err := GroupProcfsMetrics(dest, source, AggMax, "host1", ir) + require.NoError(t, err) + assert.Equal(t, int64(150), dest.ProcStat.Utime) + assert.Equal(t, int64(275), dest.ProcStat.Stime) + assert.Equal(t, int64(15), dest.ProcStat.Cutime) + assert.Equal(t, int64(28), dest.ProcStat.Cstime) + assert.Equal(t, int64(7), dest.ProcStat.GuestTime) + assert.Equal(t, int64(4), dest.ProcStat.CguestTime) +} + +func TestGroupProcfsMetrics_ProcStat_MultipleMerges(t *testing.T) { + dest := &pbc.GpPidProcInfo{} + ir := make(map[MapAggregateKey]int64) + sources := []*pbc.GpPidProcInfo{ + {ProcStat: &pbc.ProcStat{Utime: 10, Stime: 20, Vsize: 100, Rss: 50}}, + {ProcStat: &pbc.ProcStat{Utime: 30, Stime: 40, Vsize: 200, Rss: 100}}, + {ProcStat: &pbc.ProcStat{Utime: 50, Stime: 60, Vsize: 150, Rss: 75}}, + } + for _, src := range sources { + require.NoError(t, GroupProcfsMetrics(dest, src, AggMax, "host1", ir)) + } + assert.Equal(t, int64(90), dest.ProcStat.Utime) + assert.Equal(t, int64(120), dest.ProcStat.Stime) + // Vsize/Rss via intermediate: sum per host = 100+200+150=450 / 50+100+75=225 + assert.Equal(t, int64(450), dest.ProcStat.Vsize) + assert.Equal(t, int64(225), dest.ProcStat.Rss) +} + +// =========================================================================== +// GroupProcfsMetrics - ProcIo +// =========================================================================== + +func TestGroupProcfsMetrics_ProcIo_CloneOnEmptyDest(t *testing.T) { + dest := &pbc.GpPidProcInfo{} + source := &pbc.GpPidProcInfo{ + ProcIo: &pbc.ProcIO{ + Rchar: 1000, Wchar: 2000, Syscr: 300, Syscw: 400, + 
ReadBytes: 500, WriteBytes: 600, CancelledWriteBytes: 50, + }, + } + ir := make(map[MapAggregateKey]int64) + err := GroupProcfsMetrics(dest, source, AggMax, "host1", ir) + require.NoError(t, err) + require.NotNil(t, dest.ProcIo) + utils.AssertProtoMessagesEqual(t, source.ProcIo, dest.ProcIo) + assert.NotSame(t, source.ProcIo, dest.ProcIo) +} + +func TestGroupProcfsMetrics_ProcIo_SumCounters(t *testing.T) { + dest := &pbc.GpPidProcInfo{ + ProcIo: &pbc.ProcIO{ + Rchar: 1000, Wchar: 2000, Syscr: 300, Syscw: 400, + ReadBytes: 500, WriteBytes: 600, CancelledWriteBytes: 50, + }, + } + source := &pbc.GpPidProcInfo{ + ProcIo: &pbc.ProcIO{ + Rchar: 500, Wchar: 800, Syscr: 100, Syscw: 200, + ReadBytes: 250, WriteBytes: 300, CancelledWriteBytes: 25, + }, + } + ir := make(map[MapAggregateKey]int64) + err := GroupProcfsMetrics(dest, source, AggMax, "host1", ir) + require.NoError(t, err) + assert.Equal(t, int64(1500), dest.ProcIo.Rchar) + assert.Equal(t, int64(2800), dest.ProcIo.Wchar) + assert.Equal(t, int64(400), dest.ProcIo.Syscr) + assert.Equal(t, int64(600), dest.ProcIo.Syscw) + assert.Equal(t, int64(750), dest.ProcIo.ReadBytes) + assert.Equal(t, int64(900), dest.ProcIo.WriteBytes) + assert.Equal(t, int64(75), dest.ProcIo.CancelledWriteBytes) +} + +// =========================================================================== +// GroupProcfsMetrics - mixed scenarios +// =========================================================================== + +func TestGroupProcfsMetrics_BothProcStatAndProcIo(t *testing.T) { + dest := &pbc.GpPidProcInfo{ + ProcStat: &pbc.ProcStat{Utime: 100, Stime: 200}, + ProcIo: &pbc.ProcIO{Rchar: 1000, Wchar: 2000}, + } + source := &pbc.GpPidProcInfo{ + ProcStat: &pbc.ProcStat{Utime: 50, Stime: 75}, + ProcIo: &pbc.ProcIO{Rchar: 500, Wchar: 800}, + } + ir := make(map[MapAggregateKey]int64) + err := GroupProcfsMetrics(dest, source, AggMax, "host1", ir) + require.NoError(t, err) + assert.Equal(t, int64(150), dest.ProcStat.Utime) + assert.Equal(t, 
int64(275), dest.ProcStat.Stime) + assert.Equal(t, int64(1500), dest.ProcIo.Rchar) + assert.Equal(t, int64(2800), dest.ProcIo.Wchar) +} + +func TestGroupProcfsMetrics_SourceOnlyProcStat(t *testing.T) { + dest := &pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{Rchar: 1000}} + source := &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Utime: 100, Vsize: 512, Rss: 256}} + ir := make(map[MapAggregateKey]int64) + err := GroupProcfsMetrics(dest, source, AggMax, "host1", ir) + require.NoError(t, err) + require.NotNil(t, dest.ProcStat) + assert.Equal(t, int64(100), dest.ProcStat.Utime) + assert.Equal(t, int64(1000), dest.ProcIo.Rchar) // unchanged +} + +func TestGroupProcfsMetrics_SourceOnlyProcIo(t *testing.T) { + dest := &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Utime: 100}} + source := &pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{Rchar: 500}} + ir := make(map[MapAggregateKey]int64) + err := GroupProcfsMetrics(dest, source, AggMax, "host1", ir) + require.NoError(t, err) + assert.Equal(t, int64(100), dest.ProcStat.Utime) // unchanged + require.NotNil(t, dest.ProcIo) + assert.Equal(t, int64(500), dest.ProcIo.Rchar) +} + +func TestGroupProcfsMetrics_SourceEmptySubStructs(t *testing.T) { + dest := &pbc.GpPidProcInfo{ + ProcStat: &pbc.ProcStat{Utime: 100}, + ProcIo: &pbc.ProcIO{Rchar: 500}, + } + source := &pbc.GpPidProcInfo{Pid: 42} // no ProcStat, no ProcIo + ir := make(map[MapAggregateKey]int64) + err := GroupProcfsMetrics(dest, source, AggMax, "host1", ir) + require.NoError(t, err) + assert.Equal(t, int64(100), dest.ProcStat.Utime) + assert.Equal(t, int64(500), dest.ProcIo.Rchar) +} + +// =========================================================================== +// GroupProcfsMetrics - Vsize/Rss with AggSegmentHost +// =========================================================================== + +func TestGroupProcfsMetrics_VsizeRss_AggSegmentHost(t *testing.T) { + for _, tt := range []struct { + name string + hostname string + ir map[MapAggregateKey]int64 + source *pbc.GpPidProcInfo + 
expectedVsize int64 + expectedRss int64 + }{ + { + name: "initial set", hostname: "seg1", + ir: map[MapAggregateKey]int64{}, + source: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 100, Rss: 50}}, + expectedVsize: 100, expectedRss: 50, + }, + { + name: "same host accumulates", hostname: "seg1", + ir: map[MapAggregateKey]int64{ + {MetricName: "Vsize", Hostname: "seg1"}: 100, + {MetricName: "Rss", Hostname: "seg1"}: 50, + }, + source: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 200, Rss: 100}}, + expectedVsize: 300, expectedRss: 150, + }, + { + name: "other host values ignored", hostname: "seg1", + ir: map[MapAggregateKey]int64{ + {MetricName: "Vsize", Hostname: "seg1"}: 100, + {MetricName: "Rss", Hostname: "seg1"}: 50, + {MetricName: "Vsize", Hostname: "seg2"}: 9999, + {MetricName: "Rss", Hostname: "seg2"}: 8888, + }, + source: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 50, Rss: 25}}, + expectedVsize: 150, expectedRss: 75, + }, + } { + t.Run(tt.name, func(t *testing.T) { + dest := &pbc.GpPidProcInfo{} + err := GroupProcfsMetrics(dest, tt.source, AggSegmentHost, tt.hostname, tt.ir) + require.NoError(t, err) + assert.Equal(t, tt.expectedVsize, dest.ProcStat.Vsize) + assert.Equal(t, tt.expectedRss, dest.ProcStat.Rss) + }) + } +} + +// =========================================================================== +// GroupProcfsMetrics - Vsize/Rss with AggMax +// =========================================================================== + +func TestGroupProcfsMetrics_VsizeRss_AggMax(t *testing.T) { + for _, tt := range []struct { + name string + hostname string + ir map[MapAggregateKey]int64 + dest *pbc.GpPidProcInfo + source *pbc.GpPidProcInfo + expectedVsize int64 + expectedRss int64 + }{ + { + name: "initial set", hostname: "seg1", + ir: map[MapAggregateKey]int64{}, + dest: &pbc.GpPidProcInfo{}, + source: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 100, Rss: 50}}, + expectedVsize: 100, expectedRss: 50, + }, + { + name: "picks max across hosts", 
hostname: "seg1", + ir: map[MapAggregateKey]int64{ + {MetricName: "Vsize", Hostname: "seg1"}: 100, + {MetricName: "Rss", Hostname: "seg1"}: 50, + {MetricName: "Vsize", Hostname: "seg2"}: 500, + {MetricName: "Rss", Hostname: "seg2"}: 250, + }, + dest: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 10, Rss: 5}}, + source: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 50, Rss: 25}}, + // After UpdateIntermediateKey: seg1 Vsize=150, Rss=75; seg2 unchanged at 500/250 + // AggMax picks max across all hosts: max(150, 500) = 500, max(75, 250) = 250 + expectedVsize: 500, expectedRss: 250, + }, + { + name: "dest already larger", hostname: "seg1", + ir: map[MapAggregateKey]int64{ + {MetricName: "Vsize", Hostname: "seg1"}: 100, + {MetricName: "Rss", Hostname: "seg1"}: 50, + }, + dest: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 9999, Rss: 8888}}, + source: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 10, Rss: 5}}, + expectedVsize: 9999, expectedRss: 8888, + }, + } { + t.Run(tt.name, func(t *testing.T) { + err := GroupProcfsMetrics(tt.dest, tt.source, AggMax, tt.hostname, tt.ir) + require.NoError(t, err) + assert.Equal(t, tt.expectedVsize, tt.dest.ProcStat.Vsize) + assert.Equal(t, tt.expectedRss, tt.dest.ProcStat.Rss) + }) + } +} + +// =========================================================================== +// GroupProcfsMetrics - table-driven positive (full proto comparison) +// =========================================================================== + +func TestGroupProcfsMetrics_Positive(t *testing.T) { + for _, tt := range []struct { + name string + dest *pbc.GpPidProcInfo + source *pbc.GpPidProcInfo + expected *pbc.GpPidProcInfo + }{ + { + name: "nil source is no-op", + dest: &pbc.GpPidProcInfo{Pid: 1}, source: nil, + expected: &pbc.GpPidProcInfo{Pid: 1}, + }, + { + name: "ProcStat on empty dest", + dest: &pbc.GpPidProcInfo{}, + source: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{ + Utime: 100, Stime: 200, Cutime: 10, Cstime: 20, + GuestTime: 5, 
CguestTime: 3, Vsize: 1024, Rss: 512, Pid: 42, Comm: "pg", + }}, + expected: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{ + Utime: 100, Stime: 200, Cutime: 10, Cstime: 20, + GuestTime: 5, CguestTime: 3, Vsize: 1024, Rss: 512, Pid: 42, Comm: "pg", + }}, + }, + { + name: "ProcIo on empty dest", + dest: &pbc.GpPidProcInfo{}, + source: &pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{ + Rchar: 1000, Wchar: 2000, Syscr: 300, Syscw: 400, + ReadBytes: 500, WriteBytes: 600, CancelledWriteBytes: 50, + }}, + expected: &pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{ + Rchar: 1000, Wchar: 2000, Syscr: 300, Syscw: 400, + ReadBytes: 500, WriteBytes: 600, CancelledWriteBytes: 50, + }}, + }, + { + name: "ProcIo on non-empty dest sums", + dest: &pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{ + Rchar: 1000, Wchar: 2000, Syscr: 300, Syscw: 400, + ReadBytes: 500, WriteBytes: 600, CancelledWriteBytes: 50, + }}, + source: &pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{ + Rchar: 500, Wchar: 800, Syscr: 100, Syscw: 200, + ReadBytes: 250, WriteBytes: 300, CancelledWriteBytes: 25, + }}, + expected: &pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{ + Rchar: 1500, Wchar: 2800, Syscr: 400, Syscw: 600, + ReadBytes: 750, WriteBytes: 900, CancelledWriteBytes: 75, + }}, + }, + { + name: "nil ProcStat in source does not affect dest", + dest: &pbc.GpPidProcInfo{ + ProcStat: &pbc.ProcStat{Utime: 100}, + ProcIo: &pbc.ProcIO{Rchar: 500}, + }, + source: &pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{Rchar: 200}}, + expected: &pbc.GpPidProcInfo{ + ProcStat: &pbc.ProcStat{Utime: 100}, + ProcIo: &pbc.ProcIO{Rchar: 700}, + }, + }, + { + name: "nil ProcIo in source does not affect dest", + dest: &pbc.GpPidProcInfo{ + ProcStat: &pbc.ProcStat{Utime: 100}, + ProcIo: &pbc.ProcIO{Rchar: 500}, + }, + source: &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Utime: 50}}, + expected: &pbc.GpPidProcInfo{ + ProcStat: &pbc.ProcStat{Utime: 150}, + ProcIo: &pbc.ProcIO{Rchar: 500}, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ir := make(map[MapAggregateKey]int64) + err 
:= GroupProcfsMetrics(tt.dest, tt.source, AggMax, "hostname", ir) + assert.NoError(t, err) + utils.AssertProtoMessagesEqual(t, tt.expected, tt.dest) + }) + } +} + +// =========================================================================== +// GroupProcfsMetrics - intermediate results tracking +// =========================================================================== + +func TestGroupProcfsMetrics_IntermediateResultsUpdated(t *testing.T) { + dest := &pbc.GpPidProcInfo{} + ir := make(map[MapAggregateKey]int64) + + require.NoError(t, GroupProcfsMetrics(dest, &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 100, Rss: 50}}, AggMax, "host1", ir)) + assert.Equal(t, int64(100), ir[MapAggregateKey{MetricName: "Vsize", Hostname: "host1"}]) + assert.Equal(t, int64(50), ir[MapAggregateKey{MetricName: "Rss", Hostname: "host1"}]) + + require.NoError(t, GroupProcfsMetrics(dest, &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 200, Rss: 75}}, AggMax, "host1", ir)) + assert.Equal(t, int64(300), ir[MapAggregateKey{MetricName: "Vsize", Hostname: "host1"}]) + assert.Equal(t, int64(125), ir[MapAggregateKey{MetricName: "Rss", Hostname: "host1"}]) +} + +func TestGroupProcfsMetrics_IntermediateResults_MultipleHosts(t *testing.T) { + dest := &pbc.GpPidProcInfo{} + ir := make(map[MapAggregateKey]int64) + + require.NoError(t, GroupProcfsMetrics(dest, &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 100, Rss: 50}}, AggMax, "host1", ir)) + require.NoError(t, GroupProcfsMetrics(dest, &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Vsize: 200, Rss: 75}}, AggMax, "host2", ir)) + + assert.Equal(t, int64(100), ir[MapAggregateKey{MetricName: "Vsize", Hostname: "host1"}]) + assert.Equal(t, int64(200), ir[MapAggregateKey{MetricName: "Vsize", Hostname: "host2"}]) + assert.Equal(t, int64(50), ir[MapAggregateKey{MetricName: "Rss", Hostname: "host1"}]) + assert.Equal(t, int64(75), ir[MapAggregateKey{MetricName: "Rss", Hostname: "host2"}]) + + // AggMax picks max across all hosts: max(100, 200) = 
200 + assert.Equal(t, int64(200), dest.ProcStat.Vsize) + assert.Equal(t, int64(75), dest.ProcStat.Rss) +} diff --git a/internal/storage/procfs_group_test.go b/internal/storage/procfs_group_test.go new file mode 100644 index 0000000..50b2e60 --- /dev/null +++ b/internal/storage/procfs_group_test.go @@ -0,0 +1,238 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + pbc "github.com/open-gpdb/yagpcc/api/proto/common" + "github.com/open-gpdb/yagpcc/internal/utils" +) + +// --------------------------------------------------------------------------- +// nonNegativeDiff +// --------------------------------------------------------------------------- + +func TestNonNegativeDiff_Int(t *testing.T) { + t.Run("positive diff", func(t *testing.T) { assert.Equal(t, 7, nonNegativeDiff(3, 10)) }) + t.Run("zero diff", func(t *testing.T) { assert.Equal(t, 0, nonNegativeDiff(5, 5)) }) + t.Run("negative diff clamped", func(t *testing.T) { assert.Equal(t, 0, nonNegativeDiff(10, 3)) }) + t.Run("zero values", func(t *testing.T) { assert.Equal(t, 0, nonNegativeDiff(0, 0)) }) + t.Run("first zero", func(t *testing.T) { assert.Equal(t, 42, nonNegativeDiff(0, 42)) }) + t.Run("last zero", func(t *testing.T) { assert.Equal(t, 0, nonNegativeDiff(42, 0)) }) +} + +func TestNonNegativeDiff_Int32(t *testing.T) { + assert.Equal(t, int32(100), nonNegativeDiff(int32(50), int32(150))) + assert.Equal(t, int32(0), nonNegativeDiff(int32(150), int32(50))) +} + +func TestNonNegativeDiff_Int64(t *testing.T) { + assert.Equal(t, int64(1000), nonNegativeDiff(int64(500), int64(1500))) + assert.Equal(t, int64(0), nonNegativeDiff(int64(1500), int64(500))) + assert.Equal(t, int64(1000), nonNegativeDiff(int64(1_000_000_000_000), int64(1_000_000_001_000))) +} + +// --------------------------------------------------------------------------- +// helper +// --------------------------------------------------------------------------- + +func 
makeProcInfo( + segID, sessID, pid int64, cmdline, state string, + utime, stime, cutime, cstime, guestTime, cguestTime int64, + rchar, wchar, syscr, syscw, readBytes, writeBytes, cancelledWriteBytes int64, +) *pbc.GpPidProcInfo { + return &pbc.GpPidProcInfo{ + GpSegmentId: segID, SessId: sessID, Pid: pid, Cmdline: cmdline, State: state, + ProcStatus: &pbc.ProcStatus{VmSize: 1024, VmRss: 512, VmPeak: 2048}, + ProcStat: &pbc.ProcStat{ + Pid: pid, Comm: "postgres", State: state, Ppid: 1, + Pgrp: int32(pid), Session: int32(pid), Tpgid: -1, Flags: 0x0040, + MinFlt: 100, CminFlt: 10, MajFlt: 5, CmajFlt: 1, + Utime: utime, Stime: stime, Cutime: cutime, Cstime: cstime, + Priority: 20, NumThreads: 4, Starttime: 12345, + Vsize: 1048576, Rss: 256, RssLimit: 999999, + StartCode: 0x400000, EndCode: 0x500000, StartStack: 0x7fff0000, + Processor: 3, DelayAcctBlkIoTicks: 42, + GuestTime: guestTime, CguestTime: cguestTime, + }, + ProcIo: &pbc.ProcIO{ + Rchar: rchar, Wchar: wchar, Syscr: syscr, Syscw: syscw, + ReadBytes: readBytes, WriteBytes: writeBytes, CancelledWriteBytes: cancelledWriteBytes, + }, + } +} + +// --------------------------------------------------------------------------- +// ProcfsDiff +// --------------------------------------------------------------------------- + +func TestProcfsDiff_BothNil(t *testing.T) { + result, err := ProcfsDiff(nil, nil) + assert.Nil(t, result) + assert.EqualError(t, err, "first and last both nil") +} + +func TestProcfsDiff_FirstNil_ReturnsLast(t *testing.T) { + last := &pbc.GpPidProcInfo{Pid: 1234, ProcStat: &pbc.ProcStat{Pid: 1234, Utime: 100}, ProcIo: &pbc.ProcIO{Rchar: 500}} + result, err := ProcfsDiff(nil, last) + require.NoError(t, err) + assert.Same(t, last, result) +} + +func TestProcfsDiff_LastNil_ReturnsFirst(t *testing.T) { + first := &pbc.GpPidProcInfo{Pid: 5678, ProcStat: &pbc.ProcStat{Pid: 5678}, ProcIo: &pbc.ProcIO{Wchar: 300}} + result, err := ProcfsDiff(first, nil) + require.NoError(t, err) + assert.Same(t, first, result) 
+} + +func TestProcfsDiff_NormalDiff(t *testing.T) { + first := makeProcInfo(1, 10, 100, "old", "S", 100, 200, 10, 20, 5, 3, 1000, 2000, 300, 400, 500, 600, 50) + last := makeProcInfo(1, 10, 100, "new", "R", 350, 550, 40, 60, 15, 8, 3000, 5000, 800, 900, 1200, 1500, 120) + result, err := ProcfsDiff(first, last) + require.NoError(t, err) + + assert.Equal(t, int64(1), result.GpSegmentId) + assert.Equal(t, "new", result.Cmdline) + assert.Equal(t, "R", result.State) + utils.AssertProtoMessagesEqual(t, last.ProcStatus, result.ProcStatus) + assert.Equal(t, int64(100), result.ProcStat.Pid) + assert.Equal(t, int64(1), result.ProcStat.Ppid) + assert.Equal(t, int32(20), result.ProcStat.Priority) + assert.Equal(t, int64(250), result.ProcStat.Utime) + assert.Equal(t, int64(350), result.ProcStat.Stime) + assert.Equal(t, int64(30), result.ProcStat.Cutime) + assert.Equal(t, int64(40), result.ProcStat.Cstime) + assert.Equal(t, int64(10), result.ProcStat.GuestTime) + assert.Equal(t, int64(5), result.ProcStat.CguestTime) + assert.Equal(t, int64(2000), result.ProcIo.Rchar) + assert.Equal(t, int64(3000), result.ProcIo.Wchar) + assert.Equal(t, int64(500), result.ProcIo.Syscr) + assert.Equal(t, int64(500), result.ProcIo.Syscw) + assert.Equal(t, int64(700), result.ProcIo.ReadBytes) + assert.Equal(t, int64(900), result.ProcIo.WriteBytes) + assert.Equal(t, int64(70), result.ProcIo.CancelledWriteBytes) +} + +func TestProcfsDiff_CounterReset_ClampedToZero(t *testing.T) { + first := makeProcInfo(1, 10, 100, "pg", "S", 500, 600, 70, 80, 30, 20, 9000, 8000, 700, 600, 5000, 4000, 300) + last := makeProcInfo(1, 10, 100, "pg", "S", 100, 200, 10, 20, 5, 3, 1000, 2000, 100, 200, 500, 600, 50) + result, err := ProcfsDiff(first, last) + require.NoError(t, err) + assert.Equal(t, int64(0), result.ProcStat.Utime) + assert.Equal(t, int64(0), result.ProcStat.Stime) + assert.Equal(t, int64(0), result.ProcStat.GuestTime) + assert.Equal(t, int64(0), result.ProcIo.Rchar) + assert.Equal(t, int64(0), 
result.ProcIo.Wchar) + assert.Equal(t, int64(0), result.ProcIo.CancelledWriteBytes) +} + +func TestProcfsDiff_MixedCounters(t *testing.T) { + first := makeProcInfo(2, 20, 200, "pg", "S", 100, 500, 10, 80, 5, 20, 1000, 9000, 300, 600, 500, 4000, 50) + last := makeProcInfo(2, 20, 200, "pg", "R", 300, 200, 40, 20, 15, 8, 3000, 2000, 800, 200, 1200, 1500, 120) + result, err := ProcfsDiff(first, last) + require.NoError(t, err) + assert.Equal(t, int64(200), result.ProcStat.Utime) + assert.Equal(t, int64(0), result.ProcStat.Stime) + assert.Equal(t, int64(30), result.ProcStat.Cutime) + assert.Equal(t, int64(0), result.ProcStat.Cstime) + assert.Equal(t, int64(10), result.ProcStat.GuestTime) + assert.Equal(t, int64(0), result.ProcStat.CguestTime) + assert.Equal(t, int64(2000), result.ProcIo.Rchar) + assert.Equal(t, int64(0), result.ProcIo.Wchar) + assert.Equal(t, int64(700), result.ProcIo.ReadBytes) + assert.Equal(t, int64(0), result.ProcIo.WriteBytes) + assert.Equal(t, int64(70), result.ProcIo.CancelledWriteBytes) +} + +func TestProcfsDiff_IdenticalMeasurements(t *testing.T) { + a := makeProcInfo(1, 10, 100, "pg", "S", 100, 200, 10, 20, 5, 3, 1000, 2000, 300, 400, 500, 600, 50) + b := makeProcInfo(1, 10, 100, "pg", "S", 100, 200, 10, 20, 5, 3, 1000, 2000, 300, 400, 500, 600, 50) + result, err := ProcfsDiff(a, b) + require.NoError(t, err) + assert.Equal(t, int64(0), result.ProcStat.Utime) + assert.Equal(t, int64(0), result.ProcStat.Stime) + assert.Equal(t, int64(0), result.ProcIo.Rchar) + assert.Equal(t, int64(0), result.ProcIo.WriteBytes) +} + +func TestProcfsDiff_SnapshotFieldsFromLast(t *testing.T) { + first := makeProcInfo(1, 10, 100, "first", "S", 100, 200, 10, 20, 5, 3, 1000, 2000, 300, 400, 500, 600, 50) + last := &pbc.GpPidProcInfo{ + GpSegmentId: 99, SessId: 77, Pid: 888, Cmdline: "last", State: "D", + ProcStatus: &pbc.ProcStatus{VmSize: 9999, VmRss: 8888, VmPeak: 7777}, + ProcStat: &pbc.ProcStat{ + Pid: 999, Comm: "pg_worker", State: "D", Ppid: 42, Pgrp: 43, + 
MinFlt: 555, CminFlt: 66, MajFlt: 77, CmajFlt: 88, + Utime: 200, Stime: 300, Cutime: 20, Cstime: 30, + Priority: 10, Nice: -5, NumThreads: 16, Processor: 7, + Vsize: 2097152, Rss: 512, DelayAcctBlkIoTicks: 99, + GuestTime: 10, CguestTime: 6, + }, + ProcIo: &pbc.ProcIO{Rchar: 2000, Wchar: 3000, Syscr: 400, Syscw: 500, ReadBytes: 600, WriteBytes: 700, CancelledWriteBytes: 80}, + } + result, err := ProcfsDiff(first, last) + require.NoError(t, err) + assert.Equal(t, int64(99), result.GpSegmentId) + assert.Equal(t, int64(999), result.ProcStat.Pid) + assert.Equal(t, int64(42), result.ProcStat.Ppid) + assert.Equal(t, int64(555), result.ProcStat.MinFlt) + assert.Equal(t, int64(100), result.ProcStat.Utime) + assert.Equal(t, int64(100), result.ProcStat.Stime) + assert.Equal(t, int64(5), result.ProcStat.GuestTime) + assert.Equal(t, int64(3), result.ProcStat.CguestTime) + assert.Equal(t, int64(1000), result.ProcIo.Rchar) + assert.Equal(t, int64(30), result.ProcIo.CancelledWriteBytes) +} + +func TestProcfsDiff_ResultIsNewObject(t *testing.T) { + first := makeProcInfo(1, 10, 100, "cmd", "S", 10, 20, 1, 2, 1, 1, 100, 200, 30, 40, 50, 60, 5) + last := makeProcInfo(1, 10, 100, "cmd", "S", 20, 40, 2, 4, 2, 2, 200, 400, 60, 80, 100, 120, 10) + result, err := ProcfsDiff(first, last) + require.NoError(t, err) + assert.NotSame(t, first, result) + assert.NotSame(t, last, result) +} + +func TestProcfsDiff_NilSubStructs(t *testing.T) { + t.Run("first nil ProcStat", func(t *testing.T) { + result, err := ProcfsDiff(&pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{Rchar: 100}}, &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Utime: 200, Pid: 42}, ProcIo: &pbc.ProcIO{Rchar: 300}}) + require.NoError(t, err) + assert.Equal(t, int64(200), result.ProcStat.Utime) + assert.Equal(t, int64(200), result.ProcIo.Rchar) + }) + t.Run("last nil ProcStat", func(t *testing.T) { + result, err := ProcfsDiff(&pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{Utime: 100}, ProcIo: &pbc.ProcIO{}}, &pbc.GpPidProcInfo{ProcIo: 
&pbc.ProcIO{}}) + require.NoError(t, err) + assert.Equal(t, int64(0), result.ProcStat.Utime) + }) + t.Run("both nil ProcStat", func(t *testing.T) { + result, err := ProcfsDiff(&pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{}}, &pbc.GpPidProcInfo{ProcIo: &pbc.ProcIO{}}) + require.NoError(t, err) + assert.Nil(t, result.ProcStat) + }) + t.Run("first nil ProcIo", func(t *testing.T) { + result, err := ProcfsDiff(&pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{}}, &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{}, ProcIo: &pbc.ProcIO{Rchar: 500}}) + require.NoError(t, err) + assert.Equal(t, int64(500), result.ProcIo.Rchar) + }) + t.Run("last nil ProcIo", func(t *testing.T) { + result, err := ProcfsDiff(&pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{}, ProcIo: &pbc.ProcIO{Rchar: 500}}, &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{}}) + require.NoError(t, err) + assert.Equal(t, int64(0), result.ProcIo.Rchar) + }) + t.Run("both nil ProcIo", func(t *testing.T) { + result, err := ProcfsDiff(&pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{}}, &pbc.GpPidProcInfo{ProcStat: &pbc.ProcStat{}}) + require.NoError(t, err) + assert.Nil(t, result.ProcIo) + }) + t.Run("all sub-structs nil", func(t *testing.T) { + result, err := ProcfsDiff(&pbc.GpPidProcInfo{Pid: 1}, &pbc.GpPidProcInfo{Pid: 2}) + require.NoError(t, err) + assert.Equal(t, int64(2), result.Pid) + assert.Nil(t, result.ProcStat) + assert.Nil(t, result.ProcIo) + }) +} diff --git a/internal/storage/procfs_storage.go b/internal/storage/procfs_storage.go new file mode 100644 index 0000000..f984839 --- /dev/null +++ b/internal/storage/procfs_storage.go @@ -0,0 +1,155 @@ +package storage + +import ( + "errors" + "fmt" + "sync" + "time" + + pbc "github.com/open-gpdb/yagpcc/api/proto/common" +) + +type ( + ProcKey struct { + GpSegmentId int64 + SessId int64 + Pid int64 + } + + ProcStat struct { + Cmdline string + State string + ProcStat *pbc.ProcStat + ProcStatus *pbc.ProcStatus + ProcIO *pbc.ProcIO + } + + ProcMap map[ProcKey]*ProcStat + + ProcfsStatType struct 
{ + statTime time.Time + pidProcData ProcMap + } + + ProcfsStorage struct { + mx *sync.RWMutex + procfsStat []ProcfsStatType + maximumStoredPoints int + } +)
// ProcfsOption mutates a ProcfsStorage during construction.
type ProcfsOption = func(*ProcfsStorage)

const (
	// defaultStoredPoints bounds how many timestamped snapshots are retained.
	defaultStoredPoints = 30
)

// NewProcfsStorage returns an empty storage that retains up to
// defaultStoredPoints snapshots.
func NewProcfsStorage() *ProcfsStorage {
	return &ProcfsStorage{
		mx:                  &sync.RWMutex{},
		maximumStoredPoints: defaultStoredPoints,
		procfsStat:          make([]ProcfsStatType, 0, defaultStoredPoints),
	}
}

// WithMaximumStoredPoints overrides the snapshot retention limit.
func WithMaximumStoredPoints(maximumStoredPoints int) ProcfsOption {
	return func(p *ProcfsStorage) {
		p.maximumStoredPoints = maximumStoredPoints
	}
}

// TidyUpProcfsStat evicts the oldest snapshots so that at most
// maximumStoredPoints remain.
func (p *ProcfsStorage) TidyUpProcfsStat() {
	p.mx.Lock()
	defer p.mx.Unlock()
	if len(p.procfsStat) > p.maximumStoredPoints {
		firstSurvive := len(p.procfsStat) - p.maximumStoredPoints
		p.procfsStat = p.procfsStat[firstSurvive:]
	}
}

// RegisterProcfsStat indexes procfsStat by (segment, session, pid), appends
// the snapshot, and evicts snapshots beyond the retention limit.
func (p *ProcfsStorage) RegisterProcfsStat(statTime time.Time, procfsStat []*pbc.GpPidProcInfo) {

	stat := ProcfsStatType{
		statTime:    statTime,
		pidProcData: make(ProcMap, len(procfsStat)),
	}
	// create map for fast access
	for _, proc := range procfsStat {
		stat.pidProcData[ProcKey{
			GpSegmentId: proc.GpSegmentId,
			SessId:      proc.SessId,
			Pid:         proc.Pid,
		}] = &ProcStat{
			Cmdline:    proc.Cmdline,
			State:      proc.State,
			ProcStat:   proc.ProcStat,
			ProcStatus: proc.ProcStatus,
			ProcIO:     proc.ProcIo,
		}
	}

	// store new map
	p.mx.Lock()
	p.procfsStat = append(p.procfsStat, stat)
	p.mx.Unlock()

	// delete old data
	p.TidyUpProcfsStat()
}

// absDuration returns |d|.
func absDuration(d time.Duration) time.Duration {
	if d < 0 {
		return -d
	}
	return d
}

// GetNearestNTime returns the snapshot whose age relative to the newest
// snapshot is closest to d.
func (p *ProcfsStorage) GetNearestNTime(d time.Duration) (ProcMap, error) {
	p.mx.RLock()
	defer p.mx.RUnlock()
	return p.nearestLocked(d)
}

// nearestLocked implements GetNearestNTime; p.mx must already be held.
// Extracted so getNMin can read the nearest AND the newest snapshot under a
// single lock acquisition (re-acquiring RLock recursively risks deadlock
// when a writer is queued).
func (p *ProcfsStorage) nearestLocked(d time.Duration) (ProcMap, error) {
	if len(p.procfsStat) == 0 {
		return nil, errors.New("no data in procfsStat")
	}

	lastIdx := len(p.procfsStat) - 1
	currentTime := p.procfsStat[lastIdx].statTime
	minAbsDiff := absDuration(d) // worst case: the last element itself (diff=0, absDiff=|d-0|=d)
	minIndex := lastIdx

	for i := range p.procfsStat {
		idx := lastIdx - i
		currentDiff := currentTime.Sub(p.procfsStat[idx].statTime)
		absDiff := absDuration(d - currentDiff)
		if absDiff <= minAbsDiff {
			minAbsDiff = absDiff
			minIndex = idx
			continue
		}
		// Since times are sorted ascending, currentDiff is monotonically increasing.
		// Once the absolute difference starts growing, it will only get worse.
		break
	}

	return p.procfsStat[minIndex].pidProcData, nil
}

// getNMin returns the snapshot nearest to d in the past together with the
// newest snapshot. Both reads happen under one RLock so the pair is mutually
// consistent (previously the last-element read happened after the lock was
// released, racing with RegisterProcfsStat / TidyUpProcfsStat).
func (p *ProcfsStorage) getNMin(d time.Duration) (ProcMap, ProcMap, error) {
	p.mx.RLock()
	defer p.mx.RUnlock()
	nearest, err := p.nearestLocked(d)
	if err != nil {
		// BUG FIX: the message hard-coded "5 minutes" for every caller
		// (including Get15Min/Get30Min); report the requested interval and
		// wrap the cause with %w so callers can errors.Is/As it.
		return nil, nil, fmt.Errorf("fail in get %v interval: %w", d, err)
	}
	return nearest, p.procfsStat[len(p.procfsStat)-1].pidProcData, nil
}

// Get5Min returns the ~5-minute-old snapshot and the newest snapshot.
func (p *ProcfsStorage) Get5Min() (ProcMap, ProcMap, error) {
	return p.getNMin(5 * time.Minute)
}

// Get15Min returns the ~15-minute-old snapshot and the newest snapshot.
func (p *ProcfsStorage) Get15Min() (ProcMap, ProcMap, error) {
	return p.getNMin(15 * time.Minute)
}

// Get30Min returns the ~30-minute-old snapshot and the newest snapshot.
func (p *ProcfsStorage) Get30Min() (ProcMap, ProcMap, error) {
	return p.getNMin(30 * time.Minute)
}
diff --git a/internal/storage/procfs_storage_test.go b/internal/storage/procfs_storage_test.go new file mode 100644 index 0000000..946c102 --- /dev/null +++ b/internal/storage/procfs_storage_test.go @@ -0,0 +1,547 @@ +package storage + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + pbc "github.com/open-gpdb/yagpcc/api/proto/common" +) + +func buildProcfsStorage(seconds ...float64) *ProcfsStorage { + ps := &ProcfsStorage{ + mx: &sync.RWMutex{}, + maximumStoredPoints: defaultStoredPoints, + procfsStat: make([]ProcfsStatType, 0, len(seconds)), + } + for i, s := range seconds { + pm := ProcMap{ + ProcKey{GpSegmentId: int64(i)}: &ProcStat{Cmdline: "cmd"}, + } + ps.procfsStat = append(ps.procfsStat, ProcfsStatType{ + statTime: time.Unix(0, int64(s*float64(time.Second))), + pidProcData: pm, + }) + } + return ps +} + +func 
identifyIndex(ps *ProcfsStorage, pm ProcMap) int { + for i, s := range ps.procfsStat { + if len(s.pidProcData) == len(pm) { + match := true + for k, v := range s.pidProcData { + if pm[k] != v { + match = false + break + } + } + if match { + return i + } + } + } + return -1 +} + +func TestGetNearestNTime_EmptyStorage(t *testing.T) { + ps := NewProcfsStorage() + _, err := ps.GetNearestNTime(time.Second) + assert.Error(t, err) + assert.Contains(t, err.Error(), "no data in procfsStat") +} + +func TestGetNearestNTime_SingleElement(t *testing.T) { + ps := buildProcfsStorage(5.0) + pm, err := ps.GetNearestNTime(0) + require.NoError(t, err) + assert.Equal(t, 0, identifyIndex(ps, pm)) + pm, err = ps.GetNearestNTime(time.Second) + require.NoError(t, err) + assert.Equal(t, 0, identifyIndex(ps, pm)) + pm, err = ps.GetNearestNTime(10 * time.Second) + require.NoError(t, err) + assert.Equal(t, 0, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_ExampleFromDescription(t *testing.T) { + ps := buildProcfsStorage(1, 2, 3, 4.1, 4.2, 5) + pm, err := ps.GetNearestNTime(time.Second) + require.NoError(t, err) + assert.Equal(t, 3, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_ExactMatch(t *testing.T) { + ps := buildProcfsStorage(0, 1, 2, 3, 4, 5) + pm, err := ps.GetNearestNTime(3 * time.Second) + require.NoError(t, err) + assert.Equal(t, 2, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_ZeroDuration(t *testing.T) { + ps := buildProcfsStorage(1, 2, 3, 4, 5) + pm, err := ps.GetNearestNTime(0) + require.NoError(t, err) + assert.Equal(t, 4, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_DurationLargerThanRange(t *testing.T) { + ps := buildProcfsStorage(10, 11, 12) + pm, err := ps.GetNearestNTime(100 * time.Second) + require.NoError(t, err) + assert.Equal(t, 0, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_TwoElements(t *testing.T) { + ps := buildProcfsStorage(0, 10) + pm, err := ps.GetNearestNTime(3 * time.Second) + require.NoError(t, err) + assert.Equal(t, 
1, identifyIndex(ps, pm)) + pm, err = ps.GetNearestNTime(7 * time.Second) + require.NoError(t, err) + assert.Equal(t, 0, identifyIndex(ps, pm)) + pm, err = ps.GetNearestNTime(5 * time.Second) + require.NoError(t, err) + assert.Equal(t, 0, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_CloselySpacedTimes(t *testing.T) { + ps := buildProcfsStorage(10.0, 10.1, 10.2, 10.3, 10.4, 10.5) + pm, err := ps.GetNearestNTime(250 * time.Millisecond) + require.NoError(t, err) + assert.Equal(t, 2, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_AllSameTime(t *testing.T) { + ps := buildProcfsStorage(5, 5, 5, 5) + pm, err := ps.GetNearestNTime(time.Second) + require.NoError(t, err) + assert.Equal(t, 0, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_ReturnsCorrectProcMap(t *testing.T) { + ps := buildProcfsStorage(1, 2, 3, 4, 5) + pm, err := ps.GetNearestNTime(2 * time.Second) + require.NoError(t, err) + _, ok := pm[ProcKey{GpSegmentId: 2}] + assert.True(t, ok) +} + +func TestGetNearestNTime_FirstElementIsNearest(t *testing.T) { + ps := buildProcfsStorage(0, 1, 2) + pm, err := ps.GetNearestNTime(2 * time.Second) + require.NoError(t, err) + assert.Equal(t, 0, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_LastElementIsNearest(t *testing.T) { + ps := buildProcfsStorage(0, 1, 2) + pm, err := ps.GetNearestNTime(0) + require.NoError(t, err) + assert.Equal(t, 2, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_MiddleElementIsNearest(t *testing.T) { + ps := buildProcfsStorage(0, 5, 10) + pm, err := ps.GetNearestNTime(5 * time.Second) + require.NoError(t, err) + assert.Equal(t, 1, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_LargeSlice(t *testing.T) { + seconds := make([]float64, 100) + for i := range seconds { + seconds[i] = float64(i) + } + ps := buildProcfsStorage(seconds...) 
+ pm, err := ps.GetNearestNTime(50 * time.Second) + require.NoError(t, err) + assert.Equal(t, 49, identifyIndex(ps, pm)) + pm, err = ps.GetNearestNTime(0) + require.NoError(t, err) + assert.Equal(t, 99, identifyIndex(ps, pm)) + pm, err = ps.GetNearestNTime(200 * time.Second) + require.NoError(t, err) + assert.Equal(t, 0, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_NonUniformSpacing(t *testing.T) { + ps := buildProcfsStorage(0, 1, 2, 8, 9, 10) + pm, err := ps.GetNearestNTime(5 * time.Second) + require.NoError(t, err) + assert.Equal(t, 2, identifyIndex(ps, pm)) +} + +func TestGetNearestNTime_SubSecondPrecision(t *testing.T) { + ps := buildProcfsStorage(0, 0.5, 1.0, 1.5, 2.0) + pm, err := ps.GetNearestNTime(700 * time.Millisecond) + require.NoError(t, err) + assert.Equal(t, 3, identifyIndex(ps, pm)) +} + +func TestAbsDuration(t *testing.T) { + assert.Equal(t, time.Second, absDuration(time.Second)) + assert.Equal(t, time.Second, absDuration(-time.Second)) + assert.Equal(t, time.Duration(0), absDuration(0)) + assert.Equal(t, 500*time.Millisecond, absDuration(-500*time.Millisecond)) +} + +func TestNewProcfsStorage_MutexInitialized(t *testing.T) { + ps := NewProcfsStorage() + assert.NotNil(t, ps.mx) + ps.mx.RLock() + defer ps.mx.RUnlock() +} + +// --- RegisterProcfsStat --- + +func TestRegisterProcfsStat_SingleEntry(t *testing.T) { + ps := NewProcfsStorage() + now := time.Now() + procs := []*pbc.GpPidProcInfo{{ + GpSegmentId: 1, SessId: 100, Pid: 42, + Cmdline: "SELECT 1", State: "R", + ProcStat: &pbc.ProcStat{Utime: 10}, ProcStatus: &pbc.ProcStatus{VmRss: 1024}, + ProcIo: &pbc.ProcIO{ReadBytes: 512}, + }} + ps.RegisterProcfsStat(now, procs) + ps.mx.RLock() + defer ps.mx.RUnlock() + require.Len(t, ps.procfsStat, 1) + assert.Equal(t, now, ps.procfsStat[0].statTime) + key := ProcKey{GpSegmentId: 1, SessId: 100, Pid: 42} + stat, ok := ps.procfsStat[0].pidProcData[key] + require.True(t, ok) + assert.Equal(t, "SELECT 1", stat.Cmdline) + assert.Equal(t, "R", 
stat.State) + assert.Equal(t, int64(10), stat.ProcStat.Utime) + assert.Equal(t, int64(1024), stat.ProcStatus.VmRss) + assert.Equal(t, int64(512), stat.ProcIO.ReadBytes) +} + +func TestRegisterProcfsStat_MultipleProcesses(t *testing.T) { + ps := NewProcfsStorage() + now := time.Now() + procs := []*pbc.GpPidProcInfo{ + {GpSegmentId: 0, SessId: 1, Pid: 10, Cmdline: "cmd1", State: "S"}, + {GpSegmentId: 0, SessId: 1, Pid: 20, Cmdline: "cmd2", State: "R"}, + {GpSegmentId: 1, SessId: 2, Pid: 30, Cmdline: "cmd3", State: "D"}, + } + ps.RegisterProcfsStat(now, procs) + ps.mx.RLock() + defer ps.mx.RUnlock() + require.Len(t, ps.procfsStat, 1) + pm := ps.procfsStat[0].pidProcData + assert.Len(t, pm, 3) + assert.Equal(t, "cmd1", pm[ProcKey{GpSegmentId: 0, SessId: 1, Pid: 10}].Cmdline) + assert.Equal(t, "cmd2", pm[ProcKey{GpSegmentId: 0, SessId: 1, Pid: 20}].Cmdline) + assert.Equal(t, "cmd3", pm[ProcKey{GpSegmentId: 1, SessId: 2, Pid: 30}].Cmdline) +} + +func TestRegisterProcfsStat_EmptyProcList(t *testing.T) { + ps := NewProcfsStorage() + now := time.Now() + ps.RegisterProcfsStat(now, []*pbc.GpPidProcInfo{}) + ps.mx.RLock() + defer ps.mx.RUnlock() + require.Len(t, ps.procfsStat, 1) + assert.Empty(t, ps.procfsStat[0].pidProcData) + assert.Equal(t, now, ps.procfsStat[0].statTime) +} + +func TestRegisterProcfsStat_NilProcList(t *testing.T) { + ps := NewProcfsStorage() + now := time.Now() + ps.RegisterProcfsStat(now, nil) + ps.mx.RLock() + defer ps.mx.RUnlock() + require.Len(t, ps.procfsStat, 1) + assert.Empty(t, ps.procfsStat[0].pidProcData) +} + +func TestRegisterProcfsStat_MultipleRegistrations(t *testing.T) { + ps := NewProcfsStorage() + t1 := time.Now() + t2 := t1.Add(time.Minute) + t3 := t2.Add(time.Minute) + ps.RegisterProcfsStat(t1, []*pbc.GpPidProcInfo{{GpSegmentId: 0, SessId: 1, Pid: 10, Cmdline: "first"}}) + ps.RegisterProcfsStat(t2, []*pbc.GpPidProcInfo{{GpSegmentId: 0, SessId: 1, Pid: 10, Cmdline: "second"}}) + ps.RegisterProcfsStat(t3, 
[]*pbc.GpPidProcInfo{{GpSegmentId: 0, SessId: 1, Pid: 10, Cmdline: "third"}}) + ps.mx.RLock() + defer ps.mx.RUnlock() + require.Len(t, ps.procfsStat, 3) + assert.Equal(t, t1, ps.procfsStat[0].statTime) + assert.Equal(t, t2, ps.procfsStat[1].statTime) + assert.Equal(t, t3, ps.procfsStat[2].statTime) + key := ProcKey{GpSegmentId: 0, SessId: 1, Pid: 10} + assert.Equal(t, "first", ps.procfsStat[0].pidProcData[key].Cmdline) + assert.Equal(t, "second", ps.procfsStat[1].pidProcData[key].Cmdline) + assert.Equal(t, "third", ps.procfsStat[2].pidProcData[key].Cmdline) +} + +func TestRegisterProcfsStat_DuplicateKeysInSameBatch(t *testing.T) { + ps := NewProcfsStorage() + now := time.Now() + procs := []*pbc.GpPidProcInfo{ + {GpSegmentId: 0, SessId: 1, Pid: 10, Cmdline: "first"}, + {GpSegmentId: 0, SessId: 1, Pid: 10, Cmdline: "second"}, + } + ps.RegisterProcfsStat(now, procs) + ps.mx.RLock() + defer ps.mx.RUnlock() + pm := ps.procfsStat[0].pidProcData + assert.Len(t, pm, 1) + assert.Equal(t, "second", pm[ProcKey{GpSegmentId: 0, SessId: 1, Pid: 10}].Cmdline) +} + +func TestRegisterProcfsStat_NilSubFields(t *testing.T) { + ps := NewProcfsStorage() + now := time.Now() + procs := []*pbc.GpPidProcInfo{{GpSegmentId: 0, SessId: 1, Pid: 10, Cmdline: "cmd", State: "S"}} + ps.RegisterProcfsStat(now, procs) + ps.mx.RLock() + defer ps.mx.RUnlock() + stat := ps.procfsStat[0].pidProcData[ProcKey{GpSegmentId: 0, SessId: 1, Pid: 10}] + require.NotNil(t, stat) + assert.Nil(t, stat.ProcStat) + assert.Nil(t, stat.ProcStatus) + assert.Nil(t, stat.ProcIO) +} + +// --- TidyUpProcfsStat --- + +func TestTidyUpProcfsStat_UnderLimit(t *testing.T) { + ps := &ProcfsStorage{mx: &sync.RWMutex{}, maximumStoredPoints: 5, procfsStat: make([]ProcfsStatType, 0, 5)} + for i := 0; i < 3; i++ { + ps.procfsStat = append(ps.procfsStat, ProcfsStatType{statTime: time.Unix(int64(i), 0)}) + } + ps.TidyUpProcfsStat() + ps.mx.RLock() + defer ps.mx.RUnlock() + assert.Len(t, ps.procfsStat, 3) +} + +func 
TestTidyUpProcfsStat_AtLimit(t *testing.T) { + ps := &ProcfsStorage{mx: &sync.RWMutex{}, maximumStoredPoints: 3, procfsStat: make([]ProcfsStatType, 0, 3)} + for i := 0; i < 3; i++ { + ps.procfsStat = append(ps.procfsStat, ProcfsStatType{statTime: time.Unix(int64(i), 0)}) + } + ps.TidyUpProcfsStat() + ps.mx.RLock() + defer ps.mx.RUnlock() + assert.Len(t, ps.procfsStat, 3) +} + +func TestTidyUpProcfsStat_OverLimit(t *testing.T) { + ps := &ProcfsStorage{mx: &sync.RWMutex{}, maximumStoredPoints: 3, procfsStat: make([]ProcfsStatType, 0, 6)} + for i := 0; i < 6; i++ { + ps.procfsStat = append(ps.procfsStat, ProcfsStatType{statTime: time.Unix(int64(i), 0)}) + } + ps.TidyUpProcfsStat() + ps.mx.RLock() + defer ps.mx.RUnlock() + require.Len(t, ps.procfsStat, 3) + assert.Equal(t, time.Unix(3, 0), ps.procfsStat[0].statTime) + assert.Equal(t, time.Unix(4, 0), ps.procfsStat[1].statTime) + assert.Equal(t, time.Unix(5, 0), ps.procfsStat[2].statTime) +} + +func TestRegisterProcfsStat_TriggersCleanup(t *testing.T) { + ps := &ProcfsStorage{mx: &sync.RWMutex{}, maximumStoredPoints: 3, procfsStat: make([]ProcfsStatType, 0, 5)} + base := time.Now() + for i := 0; i < 5; i++ { + ps.RegisterProcfsStat(base.Add(time.Duration(i)*time.Minute), []*pbc.GpPidProcInfo{ + {GpSegmentId: int64(i), SessId: 1, Pid: 1, Cmdline: "cmd"}, + }) + } + ps.mx.RLock() + defer ps.mx.RUnlock() + assert.Len(t, ps.procfsStat, 3) + assert.Equal(t, base.Add(2*time.Minute), ps.procfsStat[0].statTime) + assert.Equal(t, base.Add(3*time.Minute), ps.procfsStat[1].statTime) + assert.Equal(t, base.Add(4*time.Minute), ps.procfsStat[2].statTime) +} + +func TestTidyUpProcfsStat_EmptySlice(t *testing.T) { + ps := NewProcfsStorage() + ps.TidyUpProcfsStat() + ps.mx.RLock() + defer ps.mx.RUnlock() + assert.Empty(t, ps.procfsStat) +} + +// --- getNMin, Get5Min, Get15Min, Get30Min --- + +func buildWithUniqueData(seconds ...float64) *ProcfsStorage { + ps := &ProcfsStorage{ + mx: &sync.RWMutex{}, + maximumStoredPoints: 
defaultStoredPoints, + procfsStat: make([]ProcfsStatType, 0, len(seconds)), + } + for i, s := range seconds { + pm := ProcMap{ + ProcKey{GpSegmentId: int64(i), SessId: int64(i), Pid: int64(i)}: &ProcStat{ + Cmdline: fmt.Sprintf("cmd-%d", i), + }, + } + ps.procfsStat = append(ps.procfsStat, ProcfsStatType{ + statTime: time.Unix(0, int64(s*float64(time.Second))), + pidProcData: pm, + }) + } + return ps +} + +func findIdx(ps *ProcfsStorage, pm ProcMap) int { + for i, s := range ps.procfsStat { + if len(s.pidProcData) == len(pm) { + match := true + for k, v := range s.pidProcData { + if pm[k] != v { + match = false + break + } + } + if match { + return i + } + } + } + return -1 +} + +func TestGetNMin_EmptyStorage(t *testing.T) { + ps := NewProcfsStorage() + nearest, latest, err := ps.getNMin(5 * time.Minute) + require.Error(t, err) + assert.Nil(t, nearest) + assert.Nil(t, latest) + assert.Contains(t, err.Error(), "fail in get 5 minutes interval") +} + +func TestGetNMin_SingleElement(t *testing.T) { + ps := buildWithUniqueData(100) + nearest, latest, err := ps.getNMin(5 * time.Minute) + require.NoError(t, err) + assert.Equal(t, 0, findIdx(ps, nearest)) + assert.Equal(t, 0, findIdx(ps, latest)) +} + +func TestGetNMin_ReturnsNearestAndLatest(t *testing.T) { + // 0,60,120,180,240,300; d=120s: last=300, nearest at diff=120 -> idx 3 + ps := buildWithUniqueData(0, 60, 120, 180, 240, 300) + nearest, latest, err := ps.getNMin(120 * time.Second) + require.NoError(t, err) + assert.Equal(t, 3, findIdx(ps, nearest)) + assert.Equal(t, 5, findIdx(ps, latest)) +} + +func TestGetNMin_ZeroDuration(t *testing.T) { + ps := buildWithUniqueData(0, 10, 20) + nearest, latest, err := ps.getNMin(0) + require.NoError(t, err) + assert.Equal(t, 2, findIdx(ps, nearest)) + assert.Equal(t, 2, findIdx(ps, latest)) +} + +func TestGetNMin_DurationExceedsRange(t *testing.T) { + ps := buildWithUniqueData(0, 1, 2) + nearest, latest, err := ps.getNMin(1000 * time.Second) + require.NoError(t, err) + 
assert.Equal(t, 0, findIdx(ps, nearest)) + assert.Equal(t, 2, findIdx(ps, latest)) +} + +func TestGetNMin_DifferentMaps(t *testing.T) { + ps := buildWithUniqueData(0, 60, 120, 180, 240, 300) + nearest, latest, err := ps.getNMin(120 * time.Second) + require.NoError(t, err) + assert.NotEqual(t, findIdx(ps, nearest), findIdx(ps, latest)) +} + +func TestGet5Min_EmptyStorage(t *testing.T) { + ps := NewProcfsStorage() + n, l, err := ps.Get5Min() + require.Error(t, err) + assert.Nil(t, n) + assert.Nil(t, l) +} + +func TestGet5Min_ReturnsCorrectInterval(t *testing.T) { + // 0,60,120,180,240,300,360,420; d=300s, last=420 -> nearest at 120 -> idx 2 + ps := buildWithUniqueData(0, 60, 120, 180, 240, 300, 360, 420) + nearest, latest, err := ps.Get5Min() + require.NoError(t, err) + assert.Equal(t, 2, findIdx(ps, nearest)) + assert.Equal(t, 7, findIdx(ps, latest)) +} + +func TestGet5Min_SingleElement(t *testing.T) { + ps := buildWithUniqueData(100) + nearest, latest, err := ps.Get5Min() + require.NoError(t, err) + assert.Equal(t, 0, findIdx(ps, nearest)) + assert.Equal(t, 0, findIdx(ps, latest)) +} + +func TestGet15Min_EmptyStorage(t *testing.T) { + ps := NewProcfsStorage() + n, l, err := ps.Get15Min() + require.Error(t, err) + assert.Nil(t, n) + assert.Nil(t, l) +} + +func TestGet15Min_ReturnsCorrectInterval(t *testing.T) { + // 0,300,600,900,1200,1500; d=900s, last=1500 -> nearest at 600 -> idx 2 + ps := buildWithUniqueData(0, 300, 600, 900, 1200, 1500) + nearest, latest, err := ps.Get15Min() + require.NoError(t, err) + assert.Equal(t, 2, findIdx(ps, nearest)) + assert.Equal(t, 5, findIdx(ps, latest)) +} + +func TestGet15Min_RangeSmaller(t *testing.T) { + ps := buildWithUniqueData(0, 60, 120) + nearest, latest, err := ps.Get15Min() + require.NoError(t, err) + assert.Equal(t, 0, findIdx(ps, nearest)) + assert.Equal(t, 2, findIdx(ps, latest)) +} + +func TestGet30Min_EmptyStorage(t *testing.T) { + ps := NewProcfsStorage() + n, l, err := ps.Get30Min() + require.Error(t, err) + 
assert.Nil(t, n) + assert.Nil(t, l) +} + +func TestGet30Min_ReturnsCorrectInterval(t *testing.T) { + // 0,600,1200,1800,2400,3000; d=1800s, last=3000 -> nearest at 1200 -> idx 2 + ps := buildWithUniqueData(0, 600, 1200, 1800, 2400, 3000) + nearest, latest, err := ps.Get30Min() + require.NoError(t, err) + assert.Equal(t, 2, findIdx(ps, nearest)) + assert.Equal(t, 5, findIdx(ps, latest)) +} + +func TestGet30Min_RangeSmaller(t *testing.T) { + ps := buildWithUniqueData(0, 60, 120, 180, 240, 300) + nearest, latest, err := ps.Get30Min() + require.NoError(t, err) + assert.Equal(t, 0, findIdx(ps, nearest)) + assert.Equal(t, 5, findIdx(ps, latest)) +} + +func TestGet30Min_SingleElement(t *testing.T) { + ps := buildWithUniqueData(100) + nearest, latest, err := ps.Get30Min() + require.NoError(t, err) + assert.Equal(t, 0, findIdx(ps, nearest)) + assert.Equal(t, 0, findIdx(ps, latest)) +} diff --git a/internal/storage/trace.log b/internal/storage/trace.log deleted file mode 100644 index e69de29..0000000 diff --git a/internal/utils/procfs.go b/internal/utils/procfs.go index 867cb4c..87c14d6 100644 --- a/internal/utils/procfs.go +++ b/internal/utils/procfs.go @@ -138,23 +138,23 @@ func isProcessGone(err error) bool { // convertProcStat maps prometheus/procfs.ProcStat → protobuf ProcStat. 
func convertProcStat(s *procfs.ProcStat) *pb.ProcStat { return &pb.ProcStat{ - Pid: int32(s.PID), + Pid: int64(s.PID), Comm: s.Comm, State: s.State, - Ppid: int32(s.PPID), + Ppid: int64(s.PPID), Pgrp: int32(s.PGRP), Session: int32(s.Session), Tty: int32(s.TTY), Tpgid: int32(s.TPGID), Flags: int32(s.Flags), - MinFlt: int32(s.MinFlt), - CminFlt: int32(s.CMinFlt), - MajFlt: int32(s.MajFlt), - CmajFlt: int32(s.CMajFlt), - Utime: int32(s.UTime), - Stime: int32(s.STime), - Cutime: int32(s.CUTime), - Cstime: int32(s.CSTime), + MinFlt: int64(s.MinFlt), + CminFlt: int64(s.CMinFlt), + MajFlt: int64(s.MajFlt), + CmajFlt: int64(s.CMajFlt), + Utime: int64(s.UTime), + Stime: int64(s.STime), + Cutime: int64(s.CUTime), + Cstime: int64(s.CSTime), Priority: int32(s.Priority), Nice: int32(s.Nice), NumThreads: int32(s.NumThreads), @@ -166,15 +166,15 @@ func convertProcStat(s *procfs.ProcStat) *pb.ProcStat { RtPriority: int32(s.RTPriority), Policy: int32(s.Policy), DelayAcctBlkIoTicks: int64(s.DelayAcctBlkIOTicks), - GuestTime: int32(s.GuestTime), - CguestTime: int32(s.CGuestTime), + GuestTime: int64(s.GuestTime), + CguestTime: int64(s.CGuestTime), } } // convertProcStatus maps prometheus/procfs.ProcStatus → protobuf ProcStatus. func convertProcStatus(s *procfs.ProcStatus) *pb.ProcStatus { return &pb.ProcStatus{ - Pid: int32(s.PID), + Pid: int64(s.PID), Name: s.Name, Tgid: int32(s.TGID), NsPids: uint64SliceToInt64(s.NSpids), diff --git a/internal/utils/procfs_getprocinfo_test.go b/internal/utils/procfs_getprocinfo_test.go index ab0de7d..b9ca8d9 100644 --- a/internal/utils/procfs_getprocinfo_test.go +++ b/internal/utils/procfs_getprocinfo_test.go @@ -133,22 +133,22 @@ func TestGetProcInfo_AllFiles(t *testing.T) { // ProcStat populated. 
require.NotNil(t, info.ProcStat) - assert.Equal(t, int32(pid), info.ProcStat.Pid) + assert.Equal(t, int64(pid), info.ProcStat.Pid) assert.Equal(t, "postgres", info.ProcStat.Comm) assert.Equal(t, "S", info.ProcStat.State) - assert.Equal(t, int32(1), info.ProcStat.Ppid) + assert.Equal(t, int64(1), info.ProcStat.Ppid) assert.Equal(t, int32(4), info.ProcStat.NumThreads) assert.Equal(t, int64(123456789), info.ProcStat.Starttime) assert.Equal(t, int64(1048576), info.ProcStat.Vsize) assert.Equal(t, int64(256), info.ProcStat.Rss) - assert.Equal(t, int32(500), info.ProcStat.Utime) - assert.Equal(t, int32(200), info.ProcStat.Stime) + assert.Equal(t, int64(500), info.ProcStat.Utime) + assert.Equal(t, int64(200), info.ProcStat.Stime) assert.Equal(t, int32(3), info.ProcStat.Processor) assert.Equal(t, int64(100), info.ProcStat.DelayAcctBlkIoTicks) // ProcStatus populated. require.NotNil(t, info.ProcStatus) - assert.Equal(t, int32(pid), info.ProcStatus.Pid) + assert.Equal(t, int64(pid), info.ProcStatus.Pid) assert.Equal(t, "postgres", info.ProcStatus.Name) assert.Equal(t, int32(pid), info.ProcStatus.Tgid) assert.Equal(t, int64(2048*1024), info.ProcStatus.VmPeak) @@ -242,9 +242,9 @@ func TestGetProcInfo_MissingCmdline(t *testing.T) { // Other fields should still be populated. 
require.NotNil(t, info.ProcStat) - assert.Equal(t, int32(pid), info.ProcStat.Pid) + assert.Equal(t, int64(pid), info.ProcStat.Pid) require.NotNil(t, info.ProcStatus) - assert.Equal(t, int32(pid), info.ProcStatus.Pid) + assert.Equal(t, int64(pid), info.ProcStatus.Pid) require.NotNil(t, info.ProcIo) } diff --git a/internal/utils/procfs_test.go b/internal/utils/procfs_test.go index 5003ebd..0082f2d 100644 --- a/internal/utils/procfs_test.go +++ b/internal/utils/procfs_test.go @@ -67,23 +67,23 @@ func TestConvertProcStat(t *testing.T) { result := convertProcStat(src) require.NotNil(t, result) - assert.Equal(t, int32(42), result.Pid) + assert.Equal(t, int64(42), result.Pid) assert.Equal(t, "postgres", result.Comm) assert.Equal(t, "S", result.State) - assert.Equal(t, int32(1), result.Ppid) + assert.Equal(t, int64(1), result.Ppid) assert.Equal(t, int32(42), result.Pgrp) assert.Equal(t, int32(42), result.Session) assert.Equal(t, int32(0), result.Tty) assert.Equal(t, int32(-1), result.Tpgid) assert.Equal(t, int32(0x00400000), result.Flags) - assert.Equal(t, int32(100), result.MinFlt) - assert.Equal(t, int32(10), result.CminFlt) - assert.Equal(t, int32(5), result.MajFlt) - assert.Equal(t, int32(1), result.CmajFlt) - assert.Equal(t, int32(500), result.Utime) - assert.Equal(t, int32(200), result.Stime) - assert.Equal(t, int32(50), result.Cutime) - assert.Equal(t, int32(20), result.Cstime) + assert.Equal(t, int64(100), result.MinFlt) + assert.Equal(t, int64(10), result.CminFlt) + assert.Equal(t, int64(5), result.MajFlt) + assert.Equal(t, int64(1), result.CmajFlt) + assert.Equal(t, int64(500), result.Utime) + assert.Equal(t, int64(200), result.Stime) + assert.Equal(t, int64(50), result.Cutime) + assert.Equal(t, int64(20), result.Cstime) assert.Equal(t, int32(20), result.Priority) assert.Equal(t, int32(0), result.Nice) assert.Equal(t, int32(4), result.NumThreads) @@ -95,8 +95,8 @@ func TestConvertProcStat(t *testing.T) { assert.Equal(t, int32(0), result.RtPriority) 
assert.Equal(t, int32(0), result.Policy) assert.Equal(t, int64(100), result.DelayAcctBlkIoTicks) - assert.Equal(t, int32(0), result.GuestTime) - assert.Equal(t, int32(0), result.CguestTime) + assert.Equal(t, int64(0), result.GuestTime) + assert.Equal(t, int64(0), result.CguestTime) } func TestConvertProcStatus(t *testing.T) { @@ -132,7 +132,7 @@ func TestConvertProcStatus(t *testing.T) { result := convertProcStatus(src) require.NotNil(t, result) - assert.Equal(t, int32(42), result.Pid) + assert.Equal(t, int64(42), result.Pid) assert.Equal(t, "postgres", result.Name) assert.Equal(t, int32(42), result.Tgid) assert.Equal(t, []int64{42, 1}, result.NsPids)