Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
)

// Config holds configuration for the client connection.
Expand Down Expand Up @@ -104,9 +105,22 @@ func (c *Client) CheckAndReconnect() error {
return nil
}

func attachAuthHeaders(ctx context.Context, auth *gen.Auth) context.Context {
md := metadata.Pairs()
if auth != nil {
if auth.AuthToken != nil && *auth.AuthToken != "" {
md.Set("Authorization", "Bearer "+*auth.AuthToken)
}
if auth.WorkspaceToken != nil && *auth.WorkspaceToken != "" {
md.Set("x-workspace-token", "Bearer "+*auth.WorkspaceToken)
}
}

return metadata.NewOutgoingContext(ctx, md)
}

// SendCommands sends a list of commands to the server
func (c *Client) SendCommands(commands []*gen.Command, auth *gen.Auth) error {

if err := c.CheckAndReconnect(); err != nil {
return fmt.Errorf("failed to reconnect: %w", err)
}
Expand All @@ -121,6 +135,7 @@ func (c *Client) SendCommands(commands []*gen.Command, auth *gen.Auth) error {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

ctx = attachAuthHeaders(ctx, auth)
_, err := client.SendCommands(ctx, req)
if err != nil {
c.logger.Error().Err(err).Msg("Failed to send commands")
Expand All @@ -131,7 +146,6 @@ func (c *Client) SendCommands(commands []*gen.Command, auth *gen.Auth) error {

// SendProcesses sends a list of processes to the server
func (c *Client) SendProcesses(processes []*gen.Process, auth *gen.Auth) error {

if err := c.CheckAndReconnect(); err != nil {
return fmt.Errorf("failed to reconnect: %w", err)
}
Expand All @@ -146,6 +160,7 @@ func (c *Client) SendProcesses(processes []*gen.Process, auth *gen.Auth) error {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

ctx = attachAuthHeaders(ctx, auth)
_, err := client.SendProcesses(ctx, req)
if err != nil {
c.logger.Error().Err(err).Msg("Failed to send processes")
Expand Down
21 changes: 9 additions & 12 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/devzero-inc/oda/client"
gen "github.com/devzero-inc/oda/gen/api/v1"
"github.com/devzero-inc/oda/process"
"github.com/devzero-inc/oda/util"

"github.com/rs/zerolog"

"time"
)

// TODO move this to /var/run or other appropriate location based on OS,
Expand Down Expand Up @@ -46,10 +45,12 @@ type IntervalConfig struct {

// AuthConfig contains the configuration for the command processing and authentication
type AuthConfig struct {
TeamID string
UserID string
WorkspaceID string
UserEmail string
TeamID string
UserID string
WorkspaceID string
UserEmail string
AuthToken string
WorkspaceToken string
}

// collectionConfig contains the configuration for the collection process
Expand All @@ -72,7 +73,6 @@ type collectionConfig struct {

// NewCollector creates a new collector instance
func NewCollector(socketPath string, client *client.Client, logger zerolog.Logger, config IntervalConfig, auth AuthConfig, excludeRegex string, excludeCommands []string, process process.SystemProcess) *Collector {

collector := &Collector{
socketPath: socketPath,
client: client,
Expand All @@ -93,6 +93,7 @@ func NewCollector(socketPath string, client *client.Client, logger zerolog.Logge
TeamId: auth.TeamID,
WorkspaceId: &auth.WorkspaceID,
UserEmail: auth.UserEmail,
AuthToken: &auth.AuthToken,
}
}

Expand Down Expand Up @@ -155,7 +156,6 @@ func (c *Collector) collectSystemInformation(ctx context.Context, initialDuratio
}

func (c *Collector) collectOnce() error {

c.logger.Debug().Msg("Collecting process")

processes, err := c.collectionConfig.process.Collect()
Expand All @@ -171,7 +171,6 @@ func (c *Collector) collectOnce() error {
if c.client != nil {
var processMetrics []*gen.Process
for _, p := range processes {

processMetrics = append(
processMetrics,
process.MapProcessToProto(p),
Expand Down Expand Up @@ -201,8 +200,7 @@ func (c *Collector) onStartCommand() {
// If the collection is not running, start it with a timeout
if !c.collectionConfig.isCollectionRunning {
c.logger.Debug().Msg("Starting collection")
c.collectionConfig.collectionContext, c.collectionConfig.collectionCancelFunc =
context.WithTimeout(context.Background(), c.intervalConfig.MaxDuration)
c.collectionConfig.collectionContext, c.collectionConfig.collectionCancelFunc = context.WithTimeout(context.Background(), c.intervalConfig.MaxDuration)
go c.collectSystemInformation(
c.collectionConfig.collectionContext,
c.intervalConfig.CommandInterval,
Expand Down Expand Up @@ -344,7 +342,6 @@ func (c *Collector) handleStartCommand(parts []string) error {
}

func (c *Collector) handleEndCommand(parts []string) error {

if !IsCommandAcceptable(parts[1], c.excludeRegex, c.excludeCommands) {
c.logger.Debug().Msg("Command is not acceptable")
return fmt.Errorf("command is not acceptable")
Expand Down
Loading
Loading