54 changes: 54 additions & 0 deletions internal/connector/connector.go
@@ -24,6 +24,7 @@ import (
connectorwrapperV1 "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1"
ratelimitV1 "github.com/conductorone/baton-sdk/pb/c1/ratelimit/v1"
tlsV1 "github.com/conductorone/baton-sdk/pb/c1/utls/v1"
"github.com/conductorone/baton-sdk/pkg/profiling"
ratelimit2 "github.com/conductorone/baton-sdk/pkg/ratelimit"
"github.com/conductorone/baton-sdk/pkg/types"
"github.com/conductorone/baton-sdk/pkg/ugrpc"
@@ -69,6 +70,8 @@ type wrapper struct {
rlCfg *ratelimitV1.RateLimiterConfig
rlDescriptors []*ratelimitV1.RateLimitDescriptors_Entry

profileCfg *connectorwrapperV1.ProfileConfig

now func() time.Time
}

@@ -124,6 +127,13 @@ func WithTargetedSyncResourceIDs(resourceIDs []string) Option {
}
}

func WithProfileConfig(cfg *connectorwrapperV1.ProfileConfig) Option {
return func(ctx context.Context, w *wrapper) error {
w.profileCfg = cfg
return nil
}
}

// NewConnectorWrapper returns a connector wrapper for running connector services locally.
func NewWrapper(ctx context.Context, server interface{}, opts ...Option) (*wrapper, error) {
connectorServer, isServer := server.(types.ConnectorServer)
@@ -149,6 +159,17 @@ func NewWrapper(ctx context.Context, server interface{}, opts ...Option) (*wrapp
func (cw *wrapper) Run(ctx context.Context, serverCfg *connectorwrapperV1.ServerConfig) error {
logger := ctxzap.Extract(ctx)

// Start profiling if configured
profiler := profiling.New(serverCfg.ProfileConfig)
if profiler != nil {
logger.Info("starting profiling before GRPC server initialization")
if err := profiler.Start(ctx); err != nil {
logger.Error("failed to start profiling", zap.Error(err))
return err
}
logger.Info("profiling started, GRPC server starting...")
}

l, err := cw.getListener(ctx, serverCfg)
if err != nil {
return err
@@ -180,12 +201,21 @@ func (cw *wrapper) Run(ctx context.Context, serverCfg *connectorwrapperV1.Server
return err
}
cw.rateLimiter = rl

// Register profile service if profiling is enabled
if profiler != nil {
ps := &profileService{profiler: profiler}
connectorwrapperV1.RegisterProfileServiceServer(server, ps)
}

opts := &RegisterOps{
Ratelimiter: cw.rateLimiter,
ProvisioningEnabled: cw.provisioningEnabled,
TicketingEnabled: cw.ticketingEnabled,
}
Register(ctx, server, cw.server, opts)

// Serve blocks until server stops
return server.Serve(l)
}

@@ -205,6 +235,7 @@ func (cw *wrapper) runServer(ctx context.Context, serverCred *tlsV1.Credential)
Credential: serverCred,
RateLimiterConfig: cw.rlCfg,
ListenPort: listenPort,
ProfileConfig: cw.profileCfg,
})
if err != nil {
return 0, err
@@ -335,6 +366,29 @@ func (cw *wrapper) C(ctx context.Context) (types.ConnectorClient, error) {
return cw.client, nil
}

// FlushProfiles calls the ProfileService RPC to flush any active profiling data.
func (cw *wrapper) FlushProfiles(ctx context.Context) error {
cw.mtx.RLock()
conn := cw.conn
cw.mtx.RUnlock()

if conn == nil {
return fmt.Errorf("no active connection")
}

client := connectorwrapperV1.NewProfileServiceClient(conn)
resp, err := client.FlushProfiles(ctx, &connectorwrapperV1.FlushProfilesRequest{})
if err != nil {
return err
}

if !resp.Success {
return fmt.Errorf("profile flush failed: %s", resp.Error)
}

return nil
}
Comment on lines +369 to +390

⚠️ Potential issue | 🔴 Critical

Update for compatibility with the profile_service.go gRPC error fix.

Once profile_service.go is fixed to return proper gRPC errors (as suggested in the previous review), this method will break. When the service returns an error via status.Errorf, resp will be nil and err will be non-nil, so the Success/Error checks on lines 385-387 become unreachable dead code (and would panic on the nil resp if they were ever reached).

Apply this diff to handle errors correctly:

 func (cw *wrapper) FlushProfiles(ctx context.Context) error {
 	cw.mtx.RLock()
 	conn := cw.conn
 	cw.mtx.RUnlock()
 
 	if conn == nil {
 		return fmt.Errorf("no active connection")
 	}
 
 	client := connectorwrapperV1.NewProfileServiceClient(conn)
-	resp, err := client.FlushProfiles(ctx, &connectorwrapperV1.FlushProfilesRequest{})
-	if err != nil {
-		return err
-	}
-
-	if !resp.Success {
-		return fmt.Errorf("profile flush failed: %s", resp.Error)
-	}
-
-	return nil
+	_, err := client.FlushProfiles(ctx, &connectorwrapperV1.FlushProfilesRequest{})
+	return err
 }

This simplifies the method to rely on the gRPC error convention where errors are returned via the error return value, not via response fields.

🤖 Prompt for AI Agents
In internal/connector/connector.go around lines 369-390, FlushProfiles currently
assumes a non-nil resp and checks resp.Success/resp.Error which will break once
the service returns gRPC errors via status.Errorf (resp will be nil). Change the
function to follow gRPC conventions: call client.FlushProfiles, if err != nil
return err, otherwise return nil (remove reliance on resp.Success/resp.Error);
keep the early conn nil check and mutex usage as-is and ensure no
nil-dereference of resp occurs.
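
For reference, here is a minimal sketch (not part of this PR) of what profile_service.go's FlushProfiles could look like once it follows that convention, reporting failures through google.golang.org/grpc/status instead of the Success/Error response fields. The use of codes.Internal and leaving the response fields unset are assumptions for illustration only:

// Sketch, assuming profile_service.go adopts gRPC status errors.
// Requires the "google.golang.org/grpc/codes" and "google.golang.org/grpc/status" imports.
func (ps *profileService) FlushProfiles(ctx context.Context, req *connectorwrapperV1.FlushProfilesRequest) (*connectorwrapperV1.FlushProfilesResponse, error) {
	l := ctxzap.Extract(ctx)
	l.Info("FlushProfiles RPC called, stopping profiling and writing profiles")

	// Nothing to flush if profiling was never configured.
	if ps.profiler == nil {
		return &connectorwrapperV1.FlushProfilesResponse{}, nil
	}

	// Stop CPU profiling so buffered samples are flushed to disk.
	if err := ps.profiler.Stop(ctx); err != nil {
		return nil, status.Errorf(codes.Internal, "failed to stop profiler: %v", err)
	}

	// Write the memory profile.
	if err := ps.profiler.WriteMemProfile(ctx); err != nil {
		return nil, status.Errorf(codes.Internal, "failed to write memory profile: %v", err)
	}

	return &connectorwrapperV1.FlushProfilesResponse{}, nil
}

With this shape, the simplified client-side FlushProfiles above receives flush failures as ordinary gRPC errors and can return them directly.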


// Close shuts down the grpc server and closes the connection.
func (cw *wrapper) Close() error {
cw.mtx.Lock()
50 changes: 50 additions & 0 deletions internal/connector/profile_service.go
@@ -0,0 +1,50 @@
package connector

import (
"context"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"

connectorwrapperV1 "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1"
"github.com/conductorone/baton-sdk/pkg/profiling"
)

// profileService implements the ProfileService gRPC service.
type profileService struct {
connectorwrapperV1.UnimplementedProfileServiceServer
profiler *profiling.Profiler
}

// FlushProfiles writes pending profile data to disk.
func (ps *profileService) FlushProfiles(ctx context.Context, req *connectorwrapperV1.FlushProfilesRequest) (*connectorwrapperV1.FlushProfilesResponse, error) {
l := ctxzap.Extract(ctx)
l.Info("FlushProfiles RPC called, stopping profiling and writing profiles")

if ps.profiler == nil {
return &connectorwrapperV1.FlushProfilesResponse{
Success: true,
}, nil
}

// Stop CPU profiling to flush data
if err := ps.profiler.Stop(ctx); err != nil {
//nolint:nilerr // This should be nil, we're returning the error to the client
return &connectorwrapperV1.FlushProfilesResponse{
Success: false,
Error: err.Error(),
}, nil
}

// Write memory profile
if err := ps.profiler.WriteMemProfile(ctx); err != nil {
//nolint:nilerr // This should be nil, we're returning the error to the client
return &connectorwrapperV1.FlushProfilesResponse{
Success: false,
Error: err.Error(),
}, nil
}

return &connectorwrapperV1.FlushProfilesResponse{
Success: true,
}, nil
}