diff --git a/internal/connector/connector.go b/internal/connector/connector.go index c5898902b..f1f9b5a7a 100644 --- a/internal/connector/connector.go +++ b/internal/connector/connector.go @@ -24,6 +24,7 @@ import ( connectorwrapperV1 "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1" ratelimitV1 "github.com/conductorone/baton-sdk/pb/c1/ratelimit/v1" tlsV1 "github.com/conductorone/baton-sdk/pb/c1/utls/v1" + "github.com/conductorone/baton-sdk/pkg/profiling" ratelimit2 "github.com/conductorone/baton-sdk/pkg/ratelimit" "github.com/conductorone/baton-sdk/pkg/types" "github.com/conductorone/baton-sdk/pkg/ugrpc" @@ -69,6 +70,8 @@ type wrapper struct { rlCfg *ratelimitV1.RateLimiterConfig rlDescriptors []*ratelimitV1.RateLimitDescriptors_Entry + profileCfg *connectorwrapperV1.ProfileConfig + now func() time.Time } @@ -124,6 +127,13 @@ func WithTargetedSyncResourceIDs(resourceIDs []string) Option { } } +func WithProfileConfig(cfg *connectorwrapperV1.ProfileConfig) Option { + return func(ctx context.Context, w *wrapper) error { + w.profileCfg = cfg + return nil + } +} + // NewConnectorWrapper returns a connector wrapper for running connector services locally. func NewWrapper(ctx context.Context, server interface{}, opts ...Option) (*wrapper, error) { connectorServer, isServer := server.(types.ConnectorServer) @@ -149,6 +159,17 @@ func NewWrapper(ctx context.Context, server interface{}, opts ...Option) (*wrapp func (cw *wrapper) Run(ctx context.Context, serverCfg *connectorwrapperV1.ServerConfig) error { logger := ctxzap.Extract(ctx) + // Start profiling if configured + profiler := profiling.New(serverCfg.ProfileConfig) + if profiler != nil { + logger.Info("starting profiling before GRPC server initialization") + if err := profiler.Start(ctx); err != nil { + logger.Error("failed to start profiling", zap.Error(err)) + return err + } + logger.Info("profiling started, GRPC server starting...") + } + l, err := cw.getListener(ctx, serverCfg) if err != nil { return err @@ -180,12 +201,21 @@ func (cw *wrapper) Run(ctx context.Context, serverCfg *connectorwrapperV1.Server return err } cw.rateLimiter = rl + + // Register profile service if profiling is enabled + if profiler != nil { + ps := &profileService{profiler: profiler} + connectorwrapperV1.RegisterProfileServiceServer(server, ps) + } + opts := &RegisterOps{ Ratelimiter: cw.rateLimiter, ProvisioningEnabled: cw.provisioningEnabled, TicketingEnabled: cw.ticketingEnabled, } Register(ctx, server, cw.server, opts) + + // Serve blocks until server stops return server.Serve(l) } @@ -205,6 +235,7 @@ func (cw *wrapper) runServer(ctx context.Context, serverCred *tlsV1.Credential) Credential: serverCred, RateLimiterConfig: cw.rlCfg, ListenPort: listenPort, + ProfileConfig: cw.profileCfg, }) if err != nil { return 0, err @@ -335,6 +366,29 @@ func (cw *wrapper) C(ctx context.Context) (types.ConnectorClient, error) { return cw.client, nil } +// FlushProfiles calls the ProfileService RPC to flush any active profiling data. +func (cw *wrapper) FlushProfiles(ctx context.Context) error { + cw.mtx.RLock() + conn := cw.conn + cw.mtx.RUnlock() + + if conn == nil { + return fmt.Errorf("no active connection") + } + + client := connectorwrapperV1.NewProfileServiceClient(conn) + resp, err := client.FlushProfiles(ctx, &connectorwrapperV1.FlushProfilesRequest{}) + if err != nil { + return err + } + + if !resp.Success { + return fmt.Errorf("profile flush failed: %s", resp.Error) + } + + return nil +} + // Close shuts down the grpc server and closes the connection. func (cw *wrapper) Close() error { cw.mtx.Lock() diff --git a/internal/connector/profile_service.go b/internal/connector/profile_service.go new file mode 100644 index 000000000..b4b6bf8a4 --- /dev/null +++ b/internal/connector/profile_service.go @@ -0,0 +1,50 @@ +package connector + +import ( + "context" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + + connectorwrapperV1 "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1" + "github.com/conductorone/baton-sdk/pkg/profiling" +) + +// profileService implements the ProfileService gRPC service. +type profileService struct { + connectorwrapperV1.UnimplementedProfileServiceServer + profiler *profiling.Profiler +} + +// FlushProfiles writes pending profile data to disk. +func (ps *profileService) FlushProfiles(ctx context.Context, req *connectorwrapperV1.FlushProfilesRequest) (*connectorwrapperV1.FlushProfilesResponse, error) { + l := ctxzap.Extract(ctx) + l.Info("FlushProfiles RPC called, stopping profiling and writing profiles") + + if ps.profiler == nil { + return &connectorwrapperV1.FlushProfilesResponse{ + Success: true, + }, nil + } + + // Stop CPU profiling to flush data + if err := ps.profiler.Stop(ctx); err != nil { + //nolint:nilerr // This should be nil, we're returning the error to the client + return &connectorwrapperV1.FlushProfilesResponse{ + Success: false, + Error: err.Error(), + }, nil + } + + // Write memory profile + if err := ps.profiler.WriteMemProfile(ctx); err != nil { + //nolint:nilerr // This should be nil, we're returning the error to the client + return &connectorwrapperV1.FlushProfilesResponse{ + Success: false, + Error: err.Error(), + }, nil + } + + return &connectorwrapperV1.FlushProfilesResponse{ + Success: true, + }, nil +} diff --git a/pb/c1/connector_wrapper/v1/connector_wrapper.pb.go b/pb/c1/connector_wrapper/v1/connector_wrapper.pb.go index ce416ac65..85b3165a7 100644 --- a/pb/c1/connector_wrapper/v1/connector_wrapper.pb.go +++ b/pb/c1/connector_wrapper/v1/connector_wrapper.pb.go @@ -23,18 +23,92 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type ProfileConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Directory to write profile files to. If empty, profiling is disabled. + // Filenames are generated as: cpu-{prefix}-{timestamp}.prof and mem-{prefix}-{timestamp}.prof + OutputDir string `protobuf:"bytes,1,opt,name=output_dir,json=outputDir,proto3" json:"output_dir,omitempty"` + // Enable CPU profiling + EnableCpu bool `protobuf:"varint,2,opt,name=enable_cpu,json=enableCpu,proto3" json:"enable_cpu,omitempty"` + // Enable memory profiling + EnableMem bool `protobuf:"varint,3,opt,name=enable_mem,json=enableMem,proto3" json:"enable_mem,omitempty"` + // Optional prefix for profile filenames (e.g., "child", "parent") + Prefix string `protobuf:"bytes,4,opt,name=prefix,proto3" json:"prefix,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ProfileConfig) Reset() { + *x = ProfileConfig{} + mi := &file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ProfileConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProfileConfig) ProtoMessage() {} + +func (x *ProfileConfig) ProtoReflect() protoreflect.Message { + mi := &file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProfileConfig.ProtoReflect.Descriptor instead. +func (*ProfileConfig) Descriptor() ([]byte, []int) { + return file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDescGZIP(), []int{0} +} + +func (x *ProfileConfig) GetOutputDir() string { + if x != nil { + return x.OutputDir + } + return "" +} + +func (x *ProfileConfig) GetEnableCpu() bool { + if x != nil { + return x.EnableCpu + } + return false +} + +func (x *ProfileConfig) GetEnableMem() bool { + if x != nil { + return x.EnableMem + } + return false +} + +func (x *ProfileConfig) GetPrefix() string { + if x != nil { + return x.Prefix + } + return "" +} + type ServerConfig struct { state protoimpl.MessageState `protogen:"open.v1"` Credential *v1.Credential `protobuf:"bytes,1,opt,name=credential,proto3" json:"credential,omitempty"` RateLimiterConfig *v11.RateLimiterConfig `protobuf:"bytes,2,opt,name=rate_limiter_config,json=rateLimiterConfig,proto3" json:"rate_limiter_config,omitempty"` ListenPort uint32 `protobuf:"varint,3,opt,name=listen_port,json=listenPort,proto3" json:"listen_port,omitempty"` + ProfileConfig *ProfileConfig `protobuf:"bytes,4,opt,name=profile_config,json=profileConfig,proto3" json:"profile_config,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *ServerConfig) Reset() { *x = ServerConfig{} - mi := &file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes[0] + mi := &file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -46,7 +120,7 @@ func (x *ServerConfig) String() string { func (*ServerConfig) ProtoMessage() {} func (x *ServerConfig) ProtoReflect() protoreflect.Message { - mi := &file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes[0] + mi := &file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -59,7 +133,7 @@ func (x *ServerConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use ServerConfig.ProtoReflect.Descriptor instead. func (*ServerConfig) Descriptor() ([]byte, []int) { - return file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDescGZIP(), []int{0} + return file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDescGZIP(), []int{1} } func (x *ServerConfig) GetCredential() *v1.Credential { @@ -83,6 +157,101 @@ func (x *ServerConfig) GetListenPort() uint32 { return 0 } +func (x *ServerConfig) GetProfileConfig() *ProfileConfig { + if x != nil { + return x.ProfileConfig + } + return nil +} + +type FlushProfilesRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FlushProfilesRequest) Reset() { + *x = FlushProfilesRequest{} + mi := &file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FlushProfilesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FlushProfilesRequest) ProtoMessage() {} + +func (x *FlushProfilesRequest) ProtoReflect() protoreflect.Message { + mi := &file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FlushProfilesRequest.ProtoReflect.Descriptor instead. +func (*FlushProfilesRequest) Descriptor() ([]byte, []int) { + return file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDescGZIP(), []int{2} +} + +type FlushProfilesResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FlushProfilesResponse) Reset() { + *x = FlushProfilesResponse{} + mi := &file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FlushProfilesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FlushProfilesResponse) ProtoMessage() {} + +func (x *FlushProfilesResponse) ProtoReflect() protoreflect.Message { + mi := &file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FlushProfilesResponse.ProtoReflect.Descriptor instead. +func (*FlushProfilesResponse) Descriptor() ([]byte, []int) { + return file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDescGZIP(), []int{3} +} + +func (x *FlushProfilesResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *FlushProfilesResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + var File_c1_connector_wrapper_v1_connector_wrapper_proto protoreflect.FileDescriptor var file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDesc = string([]byte{ @@ -94,23 +263,51 @@ var file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDesc = string([]byte 0x61, 0x74, 0x65, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x61, 0x74, 0x65, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x14, 0x63, 0x31, 0x2f, 0x75, 0x74, 0x6c, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x74, 0x6c, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x22, 0xbb, 0x01, 0x0a, 0x0c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x12, 0x36, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x31, 0x2e, 0x75, 0x74, 0x6c, 0x73, - 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x52, 0x0a, - 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x12, 0x52, 0x0a, 0x13, 0x72, 0x61, - 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x63, 0x31, 0x2e, 0x72, 0x61, 0x74, - 0x65, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, - 0x6d, 0x69, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x11, 0x72, 0x61, 0x74, - 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1f, - 0x0a, 0x0b, 0x6c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x42, - 0x3e, 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, - 0x6e, 0x64, 0x75, 0x63, 0x74, 0x6f, 0x72, 0x6f, 0x6e, 0x65, 0x2f, 0x62, 0x61, 0x74, 0x6f, 0x6e, - 0x2d, 0x73, 0x64, 0x6b, 0x2f, 0x70, 0x62, 0x2f, 0x63, 0x31, 0x2f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, - 0x63, 0x74, 0x6f, 0x72, 0x5f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x2f, 0x76, 0x31, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x22, 0x84, 0x01, 0x0a, 0x0d, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x12, 0x1d, 0x0a, 0x0a, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x5f, 0x64, 0x69, + 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x44, + 0x69, 0x72, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x63, 0x70, 0x75, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x43, 0x70, + 0x75, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6d, 0x65, 0x6d, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x4d, 0x65, 0x6d, + 0x12, 0x16, 0x0a, 0x06, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x22, 0x8a, 0x02, 0x0a, 0x0c, 0x53, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x36, 0x0a, 0x0a, 0x63, 0x72, 0x65, + 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, + 0x63, 0x31, 0x2e, 0x75, 0x74, 0x6c, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, + 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x52, 0x0a, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, + 0x6c, 0x12, 0x52, 0x0a, 0x13, 0x72, 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x65, + 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, + 0x2e, 0x63, 0x31, 0x2e, 0x72, 0x61, 0x74, 0x65, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x2e, 0x76, 0x31, + 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x52, 0x11, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x72, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1f, 0x0a, 0x0b, 0x6c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x5f, + 0x70, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6c, 0x69, 0x73, 0x74, + 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x4d, 0x0a, 0x0e, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, + 0x65, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, + 0x2e, 0x63, 0x31, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x5f, 0x77, 0x72, + 0x61, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0d, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x16, 0x0a, 0x14, 0x46, 0x6c, 0x75, 0x73, 0x68, 0x50, 0x72, + 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x47, 0x0a, + 0x15, 0x46, 0x6c, 0x75, 0x73, 0x68, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, + 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, + 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x32, 0x80, 0x01, 0x0a, 0x0e, 0x50, 0x72, 0x6f, 0x66, 0x69, + 0x6c, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x6e, 0x0a, 0x0d, 0x46, 0x6c, 0x75, + 0x73, 0x68, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x12, 0x2d, 0x2e, 0x63, 0x31, 0x2e, + 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x5f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, + 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x6c, 0x75, 0x73, 0x68, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, + 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, 0x2e, 0x63, 0x31, 0x2e, 0x63, + 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x5f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, + 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x6c, 0x75, 0x73, 0x68, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x3e, 0x5a, 0x3c, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x64, 0x75, 0x63, 0x74, 0x6f, + 0x72, 0x6f, 0x6e, 0x65, 0x2f, 0x62, 0x61, 0x74, 0x6f, 0x6e, 0x2d, 0x73, 0x64, 0x6b, 0x2f, 0x70, + 0x62, 0x2f, 0x63, 0x31, 0x2f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x5f, 0x77, + 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, }) var ( @@ -125,20 +322,26 @@ func file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDescGZIP() []byte { return file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDescData } -var file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_c1_connector_wrapper_v1_connector_wrapper_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_c1_connector_wrapper_v1_connector_wrapper_proto_goTypes = []any{ - (*ServerConfig)(nil), // 0: c1.connector_wrapper.v1.ServerConfig - (*v1.Credential)(nil), // 1: c1.utls.v1.Credential - (*v11.RateLimiterConfig)(nil), // 2: c1.ratelimit.v1.RateLimiterConfig + (*ProfileConfig)(nil), // 0: c1.connector_wrapper.v1.ProfileConfig + (*ServerConfig)(nil), // 1: c1.connector_wrapper.v1.ServerConfig + (*FlushProfilesRequest)(nil), // 2: c1.connector_wrapper.v1.FlushProfilesRequest + (*FlushProfilesResponse)(nil), // 3: c1.connector_wrapper.v1.FlushProfilesResponse + (*v1.Credential)(nil), // 4: c1.utls.v1.Credential + (*v11.RateLimiterConfig)(nil), // 5: c1.ratelimit.v1.RateLimiterConfig } var file_c1_connector_wrapper_v1_connector_wrapper_proto_depIdxs = []int32{ - 1, // 0: c1.connector_wrapper.v1.ServerConfig.credential:type_name -> c1.utls.v1.Credential - 2, // 1: c1.connector_wrapper.v1.ServerConfig.rate_limiter_config:type_name -> c1.ratelimit.v1.RateLimiterConfig - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 4, // 0: c1.connector_wrapper.v1.ServerConfig.credential:type_name -> c1.utls.v1.Credential + 5, // 1: c1.connector_wrapper.v1.ServerConfig.rate_limiter_config:type_name -> c1.ratelimit.v1.RateLimiterConfig + 0, // 2: c1.connector_wrapper.v1.ServerConfig.profile_config:type_name -> c1.connector_wrapper.v1.ProfileConfig + 2, // 3: c1.connector_wrapper.v1.ProfileService.FlushProfiles:input_type -> c1.connector_wrapper.v1.FlushProfilesRequest + 3, // 4: c1.connector_wrapper.v1.ProfileService.FlushProfiles:output_type -> c1.connector_wrapper.v1.FlushProfilesResponse + 4, // [4:5] is the sub-list for method output_type + 3, // [3:4] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_c1_connector_wrapper_v1_connector_wrapper_proto_init() } @@ -152,9 +355,9 @@ func file_c1_connector_wrapper_v1_connector_wrapper_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDesc), len(file_c1_connector_wrapper_v1_connector_wrapper_proto_rawDesc)), NumEnums: 0, - NumMessages: 1, + NumMessages: 4, NumExtensions: 0, - NumServices: 0, + NumServices: 1, }, GoTypes: file_c1_connector_wrapper_v1_connector_wrapper_proto_goTypes, DependencyIndexes: file_c1_connector_wrapper_v1_connector_wrapper_proto_depIdxs, diff --git a/pb/c1/connector_wrapper/v1/connector_wrapper.pb.validate.go b/pb/c1/connector_wrapper/v1/connector_wrapper.pb.validate.go index 35cc1f0d0..363b23015 100644 --- a/pb/c1/connector_wrapper/v1/connector_wrapper.pb.validate.go +++ b/pb/c1/connector_wrapper/v1/connector_wrapper.pb.validate.go @@ -35,6 +35,114 @@ var ( _ = sort.Sort ) +// Validate checks the field values on ProfileConfig with the rules defined in +// the proto definition for this message. If any rules are violated, the first +// error encountered is returned, or nil if there are no violations. +func (m *ProfileConfig) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on ProfileConfig with the rules defined +// in the proto definition for this message. If any rules are violated, the +// result is a list of violation errors wrapped in ProfileConfigMultiError, or +// nil if none found. +func (m *ProfileConfig) ValidateAll() error { + return m.validate(true) +} + +func (m *ProfileConfig) validate(all bool) error { + if m == nil { + return nil + } + + var errors []error + + // no validation rules for OutputDir + + // no validation rules for EnableCpu + + // no validation rules for EnableMem + + // no validation rules for Prefix + + if len(errors) > 0 { + return ProfileConfigMultiError(errors) + } + + return nil +} + +// ProfileConfigMultiError is an error wrapping multiple validation errors +// returned by ProfileConfig.ValidateAll() if the designated constraints +// aren't met. +type ProfileConfigMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m ProfileConfigMultiError) Error() string { + msgs := make([]string, 0, len(m)) + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m ProfileConfigMultiError) AllErrors() []error { return m } + +// ProfileConfigValidationError is the validation error returned by +// ProfileConfig.Validate if the designated constraints aren't met. +type ProfileConfigValidationError struct { + field string + reason string + cause error + key bool +} + +// Field function returns field value. +func (e ProfileConfigValidationError) Field() string { return e.field } + +// Reason function returns reason value. +func (e ProfileConfigValidationError) Reason() string { return e.reason } + +// Cause function returns cause value. +func (e ProfileConfigValidationError) Cause() error { return e.cause } + +// Key function returns key value. +func (e ProfileConfigValidationError) Key() bool { return e.key } + +// ErrorName returns error name. +func (e ProfileConfigValidationError) ErrorName() string { return "ProfileConfigValidationError" } + +// Error satisfies the builtin error interface +func (e ProfileConfigValidationError) Error() string { + cause := "" + if e.cause != nil { + cause = fmt.Sprintf(" | caused by: %v", e.cause) + } + + key := "" + if e.key { + key = "key for " + } + + return fmt.Sprintf( + "invalid %sProfileConfig.%s: %s%s", + key, + e.field, + e.reason, + cause) +} + +var _ error = ProfileConfigValidationError{} + +var _ interface { + Field() string + Reason() string + Key() bool + Cause() error + ErrorName() string +} = ProfileConfigValidationError{} + // Validate checks the field values on ServerConfig with the rules defined in // the proto definition for this message. If any rules are violated, the first // error encountered is returned, or nil if there are no violations. @@ -117,6 +225,35 @@ func (m *ServerConfig) validate(all bool) error { // no validation rules for ListenPort + if all { + switch v := interface{}(m.GetProfileConfig()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, ServerConfigValidationError{ + field: "ProfileConfig", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, ServerConfigValidationError{ + field: "ProfileConfig", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetProfileConfig()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return ServerConfigValidationError{ + field: "ProfileConfig", + reason: "embedded message failed validation", + cause: err, + } + } + } + if len(errors) > 0 { return ServerConfigMultiError(errors) } @@ -193,3 +330,211 @@ var _ interface { Cause() error ErrorName() string } = ServerConfigValidationError{} + +// Validate checks the field values on FlushProfilesRequest with the rules +// defined in the proto definition for this message. If any rules are +// violated, the first error encountered is returned, or nil if there are no violations. +func (m *FlushProfilesRequest) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on FlushProfilesRequest with the rules +// defined in the proto definition for this message. If any rules are +// violated, the result is a list of violation errors wrapped in +// FlushProfilesRequestMultiError, or nil if none found. +func (m *FlushProfilesRequest) ValidateAll() error { + return m.validate(true) +} + +func (m *FlushProfilesRequest) validate(all bool) error { + if m == nil { + return nil + } + + var errors []error + + if len(errors) > 0 { + return FlushProfilesRequestMultiError(errors) + } + + return nil +} + +// FlushProfilesRequestMultiError is an error wrapping multiple validation +// errors returned by FlushProfilesRequest.ValidateAll() if the designated +// constraints aren't met. +type FlushProfilesRequestMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m FlushProfilesRequestMultiError) Error() string { + msgs := make([]string, 0, len(m)) + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m FlushProfilesRequestMultiError) AllErrors() []error { return m } + +// FlushProfilesRequestValidationError is the validation error returned by +// FlushProfilesRequest.Validate if the designated constraints aren't met. +type FlushProfilesRequestValidationError struct { + field string + reason string + cause error + key bool +} + +// Field function returns field value. +func (e FlushProfilesRequestValidationError) Field() string { return e.field } + +// Reason function returns reason value. +func (e FlushProfilesRequestValidationError) Reason() string { return e.reason } + +// Cause function returns cause value. +func (e FlushProfilesRequestValidationError) Cause() error { return e.cause } + +// Key function returns key value. +func (e FlushProfilesRequestValidationError) Key() bool { return e.key } + +// ErrorName returns error name. +func (e FlushProfilesRequestValidationError) ErrorName() string { + return "FlushProfilesRequestValidationError" +} + +// Error satisfies the builtin error interface +func (e FlushProfilesRequestValidationError) Error() string { + cause := "" + if e.cause != nil { + cause = fmt.Sprintf(" | caused by: %v", e.cause) + } + + key := "" + if e.key { + key = "key for " + } + + return fmt.Sprintf( + "invalid %sFlushProfilesRequest.%s: %s%s", + key, + e.field, + e.reason, + cause) +} + +var _ error = FlushProfilesRequestValidationError{} + +var _ interface { + Field() string + Reason() string + Key() bool + Cause() error + ErrorName() string +} = FlushProfilesRequestValidationError{} + +// Validate checks the field values on FlushProfilesResponse with the rules +// defined in the proto definition for this message. If any rules are +// violated, the first error encountered is returned, or nil if there are no violations. +func (m *FlushProfilesResponse) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on FlushProfilesResponse with the rules +// defined in the proto definition for this message. If any rules are +// violated, the result is a list of violation errors wrapped in +// FlushProfilesResponseMultiError, or nil if none found. +func (m *FlushProfilesResponse) ValidateAll() error { + return m.validate(true) +} + +func (m *FlushProfilesResponse) validate(all bool) error { + if m == nil { + return nil + } + + var errors []error + + // no validation rules for Success + + // no validation rules for Error + + if len(errors) > 0 { + return FlushProfilesResponseMultiError(errors) + } + + return nil +} + +// FlushProfilesResponseMultiError is an error wrapping multiple validation +// errors returned by FlushProfilesResponse.ValidateAll() if the designated +// constraints aren't met. +type FlushProfilesResponseMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m FlushProfilesResponseMultiError) Error() string { + msgs := make([]string, 0, len(m)) + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m FlushProfilesResponseMultiError) AllErrors() []error { return m } + +// FlushProfilesResponseValidationError is the validation error returned by +// FlushProfilesResponse.Validate if the designated constraints aren't met. +type FlushProfilesResponseValidationError struct { + field string + reason string + cause error + key bool +} + +// Field function returns field value. +func (e FlushProfilesResponseValidationError) Field() string { return e.field } + +// Reason function returns reason value. +func (e FlushProfilesResponseValidationError) Reason() string { return e.reason } + +// Cause function returns cause value. +func (e FlushProfilesResponseValidationError) Cause() error { return e.cause } + +// Key function returns key value. +func (e FlushProfilesResponseValidationError) Key() bool { return e.key } + +// ErrorName returns error name. +func (e FlushProfilesResponseValidationError) ErrorName() string { + return "FlushProfilesResponseValidationError" +} + +// Error satisfies the builtin error interface +func (e FlushProfilesResponseValidationError) Error() string { + cause := "" + if e.cause != nil { + cause = fmt.Sprintf(" | caused by: %v", e.cause) + } + + key := "" + if e.key { + key = "key for " + } + + return fmt.Sprintf( + "invalid %sFlushProfilesResponse.%s: %s%s", + key, + e.field, + e.reason, + cause) +} + +var _ error = FlushProfilesResponseValidationError{} + +var _ interface { + Field() string + Reason() string + Key() bool + Cause() error + ErrorName() string +} = FlushProfilesResponseValidationError{} diff --git a/pb/c1/connector_wrapper/v1/connector_wrapper_grpc.pb.go b/pb/c1/connector_wrapper/v1/connector_wrapper_grpc.pb.go new file mode 100644 index 000000000..ecdcd6960 --- /dev/null +++ b/pb/c1/connector_wrapper/v1/connector_wrapper_grpc.pb.go @@ -0,0 +1,123 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc (unknown) +// source: c1/connector_wrapper/v1/connector_wrapper.proto + +package v1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + ProfileService_FlushProfiles_FullMethodName = "/c1.connector_wrapper.v1.ProfileService/FlushProfiles" +) + +// ProfileServiceClient is the client API for ProfileService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ProfileServiceClient interface { + // FlushProfiles writes any pending profile data to disk. + // Should be called before shutting down the server to ensure data is persisted. + FlushProfiles(ctx context.Context, in *FlushProfilesRequest, opts ...grpc.CallOption) (*FlushProfilesResponse, error) +} + +type profileServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewProfileServiceClient(cc grpc.ClientConnInterface) ProfileServiceClient { + return &profileServiceClient{cc} +} + +func (c *profileServiceClient) FlushProfiles(ctx context.Context, in *FlushProfilesRequest, opts ...grpc.CallOption) (*FlushProfilesResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(FlushProfilesResponse) + err := c.cc.Invoke(ctx, ProfileService_FlushProfiles_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ProfileServiceServer is the server API for ProfileService service. +// All implementations should embed UnimplementedProfileServiceServer +// for forward compatibility. +type ProfileServiceServer interface { + // FlushProfiles writes any pending profile data to disk. + // Should be called before shutting down the server to ensure data is persisted. + FlushProfiles(context.Context, *FlushProfilesRequest) (*FlushProfilesResponse, error) +} + +// UnimplementedProfileServiceServer should be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedProfileServiceServer struct{} + +func (UnimplementedProfileServiceServer) FlushProfiles(context.Context, *FlushProfilesRequest) (*FlushProfilesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method FlushProfiles not implemented") +} +func (UnimplementedProfileServiceServer) testEmbeddedByValue() {} + +// UnsafeProfileServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ProfileServiceServer will +// result in compilation errors. +type UnsafeProfileServiceServer interface { + mustEmbedUnimplementedProfileServiceServer() +} + +func RegisterProfileServiceServer(s grpc.ServiceRegistrar, srv ProfileServiceServer) { + // If the following call pancis, it indicates UnimplementedProfileServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&ProfileService_ServiceDesc, srv) +} + +func _ProfileService_FlushProfiles_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(FlushProfilesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ProfileServiceServer).FlushProfiles(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ProfileService_FlushProfiles_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ProfileServiceServer).FlushProfiles(ctx, req.(*FlushProfilesRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// ProfileService_ServiceDesc is the grpc.ServiceDesc for ProfileService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var ProfileService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "c1.connector_wrapper.v1.ProfileService", + HandlerType: (*ProfileServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "FlushProfiles", + Handler: _ProfileService_FlushProfiles_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "c1/connector_wrapper/v1/connector_wrapper.proto", +} diff --git a/pkg/cli/commands.go b/pkg/cli/commands.go index 3f728655b..d94a89684 100644 --- a/pkg/cli/commands.go +++ b/pkg/cli/commands.go @@ -21,7 +21,7 @@ import ( "github.com/conductorone/baton-sdk/internal/connector" v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" - v1 "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1" + connectorwrapperV1 "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1" "github.com/conductorone/baton-sdk/pkg/connectorrunner" "github.com/conductorone/baton-sdk/pkg/crypto" "github.com/conductorone/baton-sdk/pkg/field" @@ -299,6 +299,47 @@ func MakeMainCommand[T field.Configurable]( opts = append(opts, connectorrunner.WithSkipEntitlementsAndGrants(v.GetBool("skip-entitlements-and-grants"))) + // Configure profiling if requested + enableCPU := v.GetBool("profile-cpu") + enableMem := v.GetBool("profile-mem") + enableParent := v.GetBool("profile-parent") + if enableCPU || enableMem { + profileDir := v.GetString("profile-dir") + if profileDir == "" { + // Default to current working directory + wd, err := os.Getwd() + if err != nil { + return fmt.Errorf("failed to get current working directory for profiling: %w", err) + } + profileDir = wd + } + + // Set prefix for child process if parent profiling is enabled + childPrefix := "" + if enableParent { + childPrefix = "child" + } + + profileCfg := &connectorwrapperV1.ProfileConfig{ + OutputDir: profileDir, + EnableCpu: enableCPU, + EnableMem: enableMem, + Prefix: childPrefix, + } + opts = append(opts, connectorrunner.WithProfileConfig(profileCfg)) + + // Enable parent process profiling if requested + if enableParent { + parentProfileCfg := &connectorwrapperV1.ProfileConfig{ + OutputDir: profileDir, + EnableCpu: enableCPU, + EnableMem: enableMem, + Prefix: "parent", + } + opts = append(opts, connectorrunner.WithParentProfileConfig(parentProfileCfg)) + } + } + t, err := MakeGenericConfiguration[T](v) if err != nil { return fmt.Errorf("failed to make configuration: %w", err) @@ -511,7 +552,7 @@ func MakeGRPCServerCommand[T field.Configurable]( return fmt.Errorf("unexpected empty input") } - serverCfg := &v1.ServerConfig{} + serverCfg := &connectorwrapperV1.ServerConfig{} err = proto.Unmarshal(cfgBytes, serverCfg) if err != nil { return err diff --git a/pkg/connectorrunner/runner.go b/pkg/connectorrunner/runner.go index a0b76df3a..f85656c94 100644 --- a/pkg/connectorrunner/runner.go +++ b/pkg/connectorrunner/runner.go @@ -21,12 +21,14 @@ import ( v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" ratelimitV1 "github.com/conductorone/baton-sdk/pb/c1/ratelimit/v1" + "github.com/conductorone/baton-sdk/pkg/profiling" "github.com/conductorone/baton-sdk/pkg/tasks" "github.com/conductorone/baton-sdk/pkg/tasks/c1api" "github.com/conductorone/baton-sdk/pkg/tasks/local" "github.com/conductorone/baton-sdk/pkg/types" "github.com/conductorone/baton-sdk/internal/connector" + connectorwrapperV1 "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1" ) const ( @@ -35,10 +37,11 @@ const ( ) type connectorRunner struct { - cw types.ClientWrapper - oneShot bool - tasks tasks.Manager - debugFile *os.File + cw types.ClientWrapper + oneShot bool + tasks tasks.Manager + debugFile *os.File + parentProfiler *profiling.Profiler } var ErrSigTerm = errors.New("context cancelled by process shutdown") @@ -94,6 +97,21 @@ func (c *connectorRunner) Run(ctx context.Context) error { } }() + // Start parent profiling if configured + if c.parentProfiler != nil { + if err := c.parentProfiler.Start(ctx); err != nil { + l.Warn("failed to start parent profiling", zap.Error(err)) + } + defer func() { + if err := c.parentProfiler.Stop(ctx); err != nil { + l.Warn("failed to stop parent CPU profiling", zap.Error(err)) + } + if err := c.parentProfiler.WriteMemProfile(ctx); err != nil { + l.Warn("failed to write parent memory profile", zap.Error(err)) + } + }() + } + err := c.run(ctx) if err != nil { return err @@ -133,6 +151,10 @@ func (c *connectorRunner) processTask(ctx context.Context, task *v1.Task) error return nil } +func (c *connectorRunner) flushProfiles(ctx context.Context) error { + return c.cw.FlushProfiles(ctx) +} + func (c *connectorRunner) backoff(ctx context.Context, errCount int) time.Duration { waitDuration := time.Duration(errCount*errCount) * time.Second if waitDuration > time.Minute { @@ -184,6 +206,10 @@ func (c *connectorRunner) run(ctx context.Context) error { l.Debug("runner: no tasks to process", zap.Duration("wait_duration", waitDuration)) if c.oneShot { l.Debug("runner: one-shot mode enabled. Exiting.") + // Flush profiles before exiting + if err := c.flushProfiles(ctx); err != nil { + l.Warn("failed to flush profiles", zap.Error(err)) + } return nil } continue @@ -344,6 +370,8 @@ type runnerConfig struct { externalResourceC1Z string externalResourceEntitlementIdFilter string skipEntitlementsAndGrants bool + profileConfig *connectorwrapperV1.ProfileConfig + parentProfileConfig *connectorwrapperV1.ProfileConfig } // WithRateLimiterConfig sets the RateLimiterConfig for a runner. @@ -649,6 +677,20 @@ func WithSkipEntitlementsAndGrants(skip bool) Option { } } +func WithProfileConfig(profileCfg *connectorwrapperV1.ProfileConfig) Option { + return func(ctx context.Context, cfg *runnerConfig) error { + cfg.profileConfig = profileCfg + return nil + } +} + +func WithParentProfileConfig(profileCfg *connectorwrapperV1.ProfileConfig) Option { + return func(ctx context.Context, cfg *runnerConfig) error { + cfg.parentProfileConfig = profileCfg + return nil + } +} + // NewConnectorRunner creates a new connector runner. func NewConnectorRunner(ctx context.Context, c types.ConnectorServer, opts ...Option) (*connectorRunner, error) { runner := &connectorRunner{} @@ -684,6 +726,10 @@ func NewConnectorRunner(ctx context.Context, c types.ConnectorServer, opts ...Op wrapperOpts = append(wrapperOpts, connector.WithTargetedSyncResourceIDs(cfg.targetedSyncResourceIDs)) } + if cfg.profileConfig != nil { + wrapperOpts = append(wrapperOpts, connector.WithProfileConfig(cfg.profileConfig)) + } + cw, err := connector.NewWrapper(ctx, c, wrapperOpts...) if err != nil { return nil, err @@ -691,6 +737,11 @@ func NewConnectorRunner(ctx context.Context, c types.ConnectorServer, opts ...Op runner.cw = cw + // Initialize parent profiler if configured + if cfg.parentProfileConfig != nil { + runner.parentProfiler = profiling.New(cfg.parentProfileConfig) + } + if cfg.onDemand { if cfg.c1zPath == "" && cfg.eventFeedConfig == nil && cfg.createTicketConfig == nil && cfg.listTicketSchemasConfig == nil && cfg.getTicketConfig == nil && cfg.bulkCreateTicketConfig == nil { return nil, errors.New("c1zPath must be set when in on-demand mode") diff --git a/pkg/field/defaults.go b/pkg/field/defaults.go index 8e92b832a..8d7c8df32 100644 --- a/pkg/field/defaults.go +++ b/pkg/field/defaults.go @@ -149,6 +149,27 @@ var ( WithExportTarget(ExportTargetNone), ) + profileDirField = StringField("profile-dir", + WithDescription("Directory to write profile files to. Defaults to current working directory. Generates cpu-TIMESTAMP.prof and mem-TIMESTAMP.prof"), + WithPersistent(true), + WithExportTarget(ExportTargetNone), + ) + profileCPUField = BoolField("profile-cpu", + WithDescription("Enable CPU profiling"), + WithPersistent(true), + WithExportTarget(ExportTargetNone), + ) + profileMemField = BoolField("profile-mem", + WithDescription("Enable memory profiling"), + WithPersistent(true), + WithExportTarget(ExportTargetNone), + ) + profileParentField = BoolField("profile-parent", + WithDescription("Enable profiling of the parent process in addition to the child process"), + WithPersistent(true), + WithExportTarget(ExportTargetNone), + ) + otelCollectorEndpoint = StringField(OtelCollectorEndpointFieldName, WithDescription("The endpoint of the OpenTelemetry collector to send observability data to (used for both tracing and logging if specific endpoints are not provided)"), WithPersistent(true), WithExportTarget(ExportTargetOps)) @@ -269,6 +290,10 @@ var DefaultFields = []SchemaField{ compactSyncsField, invokeActionField, invokeActionArgsField, + profileDirField, + profileCPUField, + profileMemField, + profileParentField, otelCollectorEndpoint, otelCollectorEndpointTLSCertPath, diff --git a/pkg/profiling/profiling.go b/pkg/profiling/profiling.go new file mode 100644 index 000000000..17a149034 --- /dev/null +++ b/pkg/profiling/profiling.go @@ -0,0 +1,145 @@ +package profiling + +import ( + "context" + "fmt" + "os" + "path/filepath" + "runtime/pprof" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.uber.org/zap" + + connectorwrapperV1 "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1" +) + +// Profiler manages CPU and memory profiling. +type Profiler struct { + cpuFile *os.File + cpuFilePath string + memFilePath string + cfg *connectorwrapperV1.ProfileConfig +} + +// New creates a new Profiler from the given configuration. +// Filenames are generated with a timestamp: cpu-YYYYMMDD-HHMMSS.prof and mem-YYYYMMDD-HHMMSS.prof +// If cfg.Prefix is set, filenames will be: cpu-{prefix}-YYYYMMDD-HHMMSS.prof. +func New(cfg *connectorwrapperV1.ProfileConfig) *Profiler { + if cfg == nil || (!cfg.EnableCpu && !cfg.EnableMem) { + return nil + } + + // Use prefix from config if provided + prefix := cfg.Prefix + return NewWithPrefix(cfg, prefix) +} + +// NewWithPrefix creates a new Profiler with a custom prefix for filenames. +// If prefix is "parent", filenames will be: cpu-parent-YYYYMMDD-HHMMSS.prof and mem-parent-YYYYMMDD-HHMMSS.prof. +func NewWithPrefix(cfg *connectorwrapperV1.ProfileConfig, prefix string) *Profiler { + if cfg == nil || (!cfg.EnableCpu && !cfg.EnableMem) { + return nil + } + + // Default to current working directory if not specified + outputDir := cfg.OutputDir + if outputDir == "" { + wd, err := os.Getwd() + if err != nil { + // If we can't get CWD, return nil to disable profiling + return nil + } + outputDir = wd + } + + timestamp := time.Now().Format("20060102-150405") + + // Generate filenames with optional prefix + cpuFilename := "cpu" + memFilename := "mem" + if prefix != "" { + cpuFilename = fmt.Sprintf("cpu-%s", prefix) + memFilename = fmt.Sprintf("mem-%s", prefix) + } + + return &Profiler{ + cfg: cfg, + cpuFilePath: filepath.Join(outputDir, fmt.Sprintf("%s-%s.prof", cpuFilename, timestamp)), + memFilePath: filepath.Join(outputDir, fmt.Sprintf("%s-%s.prof", memFilename, timestamp)), + } +} + +// Start begins CPU profiling if configured. Returns an error if profiling fails to start. +func (p *Profiler) Start(ctx context.Context) error { + if p == nil || !p.cfg.EnableCpu { + return nil + } + + l := ctxzap.Extract(ctx) + + // Ensure output directory exists + outputDir := filepath.Dir(p.cpuFilePath) + if err := os.MkdirAll(outputDir, 0755); err != nil { + return fmt.Errorf("failed to create profile output directory: %w", err) + } + + // Start CPU profiling + f, err := os.Create(p.cpuFilePath) + if err != nil { + return err + } + p.cpuFile = f + + if err := pprof.StartCPUProfile(f); err != nil { + _ = f.Close() + return err + } + + l.Info("CPU profiling started", zap.String("output_path", p.cpuFilePath)) + return nil +} + +// Stop stops CPU profiling. +func (p *Profiler) Stop(ctx context.Context) error { + if p == nil || p.cpuFile == nil { + return nil + } + + l := ctxzap.Extract(ctx) + + pprof.StopCPUProfile() + if err := p.cpuFile.Close(); err != nil { + l.Error("failed to close CPU profile file", zap.Error(err)) + return err + } + + l.Info("CPU profile written", zap.String("path", p.cpuFilePath)) + p.cpuFile = nil + return nil +} + +// WriteMemProfile writes a memory profile to disk. Should be called when you want to +// capture memory state (e.g., after main work completes but before cleanup). +func (p *Profiler) WriteMemProfile(ctx context.Context) error { + if p == nil || !p.cfg.EnableMem { + return nil + } + + l := ctxzap.Extract(ctx) + + f, err := os.Create(p.memFilePath) + if err != nil { + l.Error("failed to create memory profile file", zap.Error(err)) + return err + } + defer f.Close() + + if err := pprof.WriteHeapProfile(f); err != nil { + l.Error("failed to write memory profile", zap.Error(err)) + return err + } + + l.Info("Memory profile written", zap.String("path", p.memFilePath)) + return nil +} diff --git a/pkg/types/types.go b/pkg/types/types.go index fc52f6f4e..0270e997a 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -49,5 +49,6 @@ type ConnectorClient interface { type ClientWrapper interface { C(ctx context.Context) (ConnectorClient, error) Run(ctx context.Context, cfg *connectorwrapperV1.ServerConfig) error + FlushProfiles(ctx context.Context) error Close() error } diff --git a/proto/c1/connector_wrapper/v1/connector_wrapper.proto b/proto/c1/connector_wrapper/v1/connector_wrapper.proto index 67c143faa..a34bf98e7 100644 --- a/proto/c1/connector_wrapper/v1/connector_wrapper.proto +++ b/proto/c1/connector_wrapper/v1/connector_wrapper.proto @@ -7,8 +7,34 @@ import "c1/utls/v1/tls.proto"; option go_package = "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1"; +message ProfileConfig { + // Directory to write profile files to. If empty, profiling is disabled. + // Filenames are generated as: cpu-{prefix}-{timestamp}.prof and mem-{prefix}-{timestamp}.prof + string output_dir = 1; + // Enable CPU profiling + bool enable_cpu = 2; + // Enable memory profiling + bool enable_mem = 3; + // Optional prefix for profile filenames (e.g., "child", "parent") + string prefix = 4; +} + message ServerConfig { utls.v1.Credential credential = 1; ratelimit.v1.RateLimiterConfig rate_limiter_config = 2; uint32 listen_port = 3; + ProfileConfig profile_config = 4; +} + +service ProfileService { + // FlushProfiles writes any pending profile data to disk. + // Should be called before shutting down the server to ensure data is persisted. + rpc FlushProfiles(FlushProfilesRequest) returns (FlushProfilesResponse); +} + +message FlushProfilesRequest {} + +message FlushProfilesResponse { + bool success = 1; + string error = 2; }