Skip to content

Commit 366e4bf

Browse files
committed
Use status files to retrieve PID
This follows on from the previous PRs which migrated PIDs from the PID files to status files.
1 parent bc40edf commit 366e4bf

File tree

4 files changed

+75
-57
lines changed

4 files changed

+75
-57
lines changed

pkg/transport/proxy/manager.go

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,6 @@ import (
99
// We may want to move these operations behind an interface. For now, they
1010
// have been moved to this package to keep proxy-related logic grouped together.
1111

12-
// StopProcess stops the proxy process associated with the container
13-
func StopProcess(containerBaseName string) {
14-
if containerBaseName == "" {
15-
logger.Warnf("Warning: Could not find base container name in labels")
16-
return
17-
}
18-
19-
// Try to read the PID file and kill the process
20-
pid, err := process.ReadPIDFile(containerBaseName)
21-
if err != nil {
22-
logger.Errorf("No PID file found for %s, proxy may not be running in detached mode", containerBaseName)
23-
return
24-
}
25-
26-
// PID file found, try to kill the process
27-
logger.Infof("Stopping proxy process (PID: %d)...", pid)
28-
if err := process.KillProcess(pid); err != nil {
29-
logger.Warnf("Warning: Failed to kill proxy process: %v", err)
30-
} else {
31-
logger.Info("Proxy process stopped")
32-
}
33-
34-
// Remove the PID file
35-
if err := process.RemovePIDFile(containerBaseName); err != nil {
36-
logger.Warnf("Warning: Failed to remove PID file: %v", err)
37-
}
38-
}
39-
4012
// IsRunning checks if the proxy process is running
4113
func IsRunning(containerBaseName string) bool {
4214
logger.Debugf("Checking if proxy process is running for container %s", containerBaseName)

pkg/workloads/manager.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -570,12 +570,7 @@ func (d *defaultManager) getWorkloadContainer(ctx context.Context, name string)
570570
func (d *defaultManager) stopProxyIfNeeded(ctx context.Context, name, baseName string) {
571571
logger.Infof("Removing proxy process for %s...", name)
572572
if baseName != "" {
573-
proxy.StopProcess(baseName)
574-
// TODO: refactor the StopProcess function to stop dealing explicitly with PID files.
575-
// Note that this is not a blocker for k8s since this code path is not called there.
576-
if err := d.statuses.ResetWorkloadPID(ctx, baseName); err != nil {
577-
logger.Warnf("Warning: Failed to reset workload %s PID: %v", name, err)
578-
}
573+
d.stopProcess(ctx, baseName)
579574
}
580575
}
581576

@@ -962,7 +957,7 @@ func (d *defaultManager) stopSingleContainerWorkload(ctx context.Context, worklo
962957

963958
name := labels.GetContainerBaseName(workload.Labels)
964959
// Stop the proxy process
965-
proxy.StopProcess(name)
960+
d.stopProcess(ctx, name)
966961
// TODO: refactor the StopProcess function to stop dealing explicitly with PID files.
967962
// Note that this is not a blocker for k8s since this code path is not called there.
968963
if err := d.statuses.ResetWorkloadPID(ctx, name); err != nil {
@@ -1121,3 +1116,36 @@ func (d *defaultManager) getRemoteWorkloadsFromState(
11211116

11221117
return remoteWorkloads, nil
11231118
}
1119+
1120+
// stopProcess stops the proxy process associated with the container
1121+
func (d *defaultManager) stopProcess(ctx context.Context, name string) {
1122+
if name == "" {
1123+
logger.Warnf("Warning: Could not find base container name in labels")
1124+
return
1125+
}
1126+
1127+
// Try to read the PID file and kill the process
1128+
pid, err := process.ReadPIDFile(name)
1129+
if err != nil {
1130+
logger.Errorf("No PID file found for %s, proxy may not be running in detached mode", name)
1131+
return
1132+
}
1133+
1134+
// PID file found, try to kill the process
1135+
logger.Infof("Stopping proxy process (PID: %d)...", pid)
1136+
if err := process.KillProcess(pid); err != nil {
1137+
logger.Warnf("Warning: Failed to kill proxy process: %v", err)
1138+
} else {
1139+
logger.Info("Proxy process stopped")
1140+
}
1141+
1142+
// Remove the PID file
1143+
if err := process.RemovePIDFile(name); err != nil {
1144+
logger.Warnf("Warning: Failed to remove PID file: %v", err)
1145+
}
1146+
// TODO: refactor the StopProcess function to stop dealing explicitly with PID files.
1147+
// Note that this is not a blocker for k8s since this code path is not called there.
1148+
if err := d.statuses.ResetWorkloadPID(ctx, name); err != nil {
1149+
logger.Warnf("Warning: Failed to reset workload %s PID: %v", name, err)
1150+
}
1151+
}

pkg/workloads/statuses/file_status.go

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/stacklok/toolhive/pkg/logger"
2020
"github.com/stacklok/toolhive/pkg/process"
2121
"github.com/stacklok/toolhive/pkg/state"
22-
"github.com/stacklok/toolhive/pkg/transport/proxy"
2322
"github.com/stacklok/toolhive/pkg/workloads/types"
2423
)
2524

@@ -114,6 +113,7 @@ type workloadStatusFile struct {
114113

115114
// GetWorkload retrieves the status of a workload by its name.
116115
func (f *fileStatusManager) GetWorkload(ctx context.Context, workloadName string) (core.Workload, error) {
116+
var pid int
117117
result := core.Workload{Name: workloadName}
118118
fileFound := false
119119

@@ -152,6 +152,8 @@ func (f *fileStatusManager) GetWorkload(ctx context.Context, workloadName string
152152
}
153153
}
154154

155+
pid = statusFile.ProcessID
156+
155157
return nil
156158
})
157159
if err != nil {
@@ -172,7 +174,7 @@ func (f *fileStatusManager) GetWorkload(ctx context.Context, workloadName string
172174

173175
// If workload is running, validate against runtime
174176
if result.Status == rt.WorkloadStatusRunning {
175-
return f.validateRunningWorkload(ctx, workloadName, result)
177+
return f.validateRunningWorkload(ctx, workloadName, result, pid)
176178
}
177179

178180
// Return file data
@@ -197,7 +199,7 @@ func (f *fileStatusManager) ListWorkloads(ctx context.Context, listAll bool, lab
197199
}
198200

199201
// Get workloads from files
200-
fileWorkloads, err := f.getWorkloadsFromFiles()
202+
fileWorkloadsWithPID, err := f.getWorkloadsFromFiles()
201203
if err != nil {
202204
return nil, fmt.Errorf("failed to get workloads from files: %w", err)
203205
}
@@ -206,7 +208,7 @@ func (f *fileStatusManager) ListWorkloads(ctx context.Context, listAll bool, lab
206208
// There's currently an import cycle between this package and the runconfig package
207209

208210
// Create a map of runtime workloads by name for easy lookup
209-
workloadMap := f.mergeRuntimeAndFileWorkloads(ctx, runtimeContainers, fileWorkloads)
211+
workloadMap := f.mergeRuntimeAndFileWorkloads(ctx, runtimeContainers, fileWorkloadsWithPID)
210212

211213
// Convert map to slice and apply filters
212214
var workloads []core.Workload
@@ -553,8 +555,14 @@ func (f *fileStatusManager) getWorkloadFromRuntime(ctx context.Context, workload
553555
return types.WorkloadFromContainerInfo(&info)
554556
}
555557

558+
// workloadWithPID holds a workload and its associated PID for internal processing
559+
type workloadWithPID struct {
560+
workload core.Workload
561+
pid int
562+
}
563+
556564
// getWorkloadsFromFiles retrieves all workloads from status files.
557-
func (f *fileStatusManager) getWorkloadsFromFiles() (map[string]core.Workload, error) {
565+
func (f *fileStatusManager) getWorkloadsFromFiles() (map[string]workloadWithPID, error) {
558566
// Ensure base directory exists
559567
if err := f.ensureBaseDir(); err != nil {
560568
return nil, fmt.Errorf("failed to ensure base directory: %w", err)
@@ -566,7 +574,7 @@ func (f *fileStatusManager) getWorkloadsFromFiles() (map[string]core.Workload, e
566574
return nil, fmt.Errorf("failed to list status files: %w", err)
567575
}
568576

569-
workloads := make(map[string]core.Workload)
577+
workloads := make(map[string]workloadWithPID)
570578
ctx := context.Background() // Create context for file locking
571579

572580
for _, file := range files {
@@ -614,13 +622,15 @@ func (f *fileStatusManager) getWorkloadsFromFiles() (map[string]core.Workload, e
614622
}
615623

616624
// Check if PID migration is needed
625+
pid := statusFile.ProcessID
617626
if statusFile.Status == rt.WorkloadStatusRunning && statusFile.ProcessID == 0 {
618627
// Try PID migration - the migration function will handle cases
619628
// where container info is not available gracefully
620629
if migratedPID, wasMigrated := f.migratePIDFromFile(workloadName, nil); wasMigrated {
621630
// Update the status file with the migrated PID
622631
statusFile.ProcessID = migratedPID
623632
statusFile.UpdatedAt = time.Now()
633+
pid = migratedPID
624634
if err := f.writeStatusFile(statusFilePath, *statusFile); err != nil {
625635
logger.Warnf("failed to write migrated PID for workload %s: %v", workloadName, err)
626636
} else {
@@ -629,7 +639,10 @@ func (f *fileStatusManager) getWorkloadsFromFiles() (map[string]core.Workload, e
629639
}
630640
}
631641

632-
workloads[workloadName] = workload
642+
workloads[workloadName] = workloadWithPID{
643+
workload: workload,
644+
pid: pid,
645+
}
633646
return nil
634647
})
635648

@@ -647,7 +660,7 @@ func (f *fileStatusManager) getWorkloadsFromFiles() (map[string]core.Workload, e
647660
// validateRunningWorkload validates that a workload marked as running in the file
648661
// is actually running in the runtime and has a healthy proxy process if applicable.
649662
func (f *fileStatusManager) validateRunningWorkload(
650-
ctx context.Context, workloadName string, result core.Workload,
663+
ctx context.Context, workloadName string, result core.Workload, pid int,
651664
) (core.Workload, error) {
652665
// For remote workloads, we don't need to validate against the container runtime
653666
// since they don't have containers
@@ -667,7 +680,7 @@ func (f *fileStatusManager) validateRunningWorkload(
667680
}
668681

669682
// Check if proxy process is running when workload is running
670-
if unhealthyWorkload, isUnhealthy := f.checkProxyHealth(ctx, workloadName, result, containerInfo); isUnhealthy {
683+
if unhealthyWorkload, isUnhealthy := f.isProxyUnhealthy(ctx, workloadName, result, containerInfo, pid); isUnhealthy {
671684
return unhealthyWorkload, nil
672685
}
673686

@@ -722,19 +735,21 @@ func (f *fileStatusManager) handleRuntimeMissing(
722735
return fileWorkload, nil
723736
}
724737

725-
// checkProxyHealth checks if the proxy process is running for the workload.
738+
// isProxyUnhealthy checks if the proxy process is running for the workload.
726739
// Returns (unhealthyWorkload, true) if proxy is not running, (emptyWorkload, false) if proxy is healthy or not applicable.
727-
func (f *fileStatusManager) checkProxyHealth(
728-
ctx context.Context, workloadName string, result core.Workload, containerInfo rt.ContainerInfo,
740+
func (f *fileStatusManager) isProxyUnhealthy(
741+
ctx context.Context, workloadName string, result core.Workload, containerInfo rt.ContainerInfo, pid int,
729742
) (core.Workload, bool) {
730743
// Use original container labels (before filtering) to get base name
731744
baseName := labels.GetContainerBaseName(containerInfo.Labels)
732745
if baseName == "" {
733746
return core.Workload{}, false // No proxy check needed
734747
}
735748

736-
proxyRunning := proxy.IsRunning(baseName)
737-
if proxyRunning {
749+
proxyRunning, err := process.FindProcess(pid)
750+
if err != nil {
751+
logger.Warnf("unable to find process %d: %v", pid, err)
752+
} else if proxyRunning {
738753
return core.Workload{}, false // Proxy is healthy
739754
}
740755

@@ -775,7 +790,7 @@ func (*fileStatusManager) mergeHealthyWorkloadData(containerInfo rt.ContainerInf
775790
// validateWorkloadInList validates a workload during list operations, similar to validateRunningWorkload
776791
// but with different error handling to avoid disrupting the entire list operation.
777792
func (f *fileStatusManager) validateWorkloadInList(
778-
ctx context.Context, workloadName string, fileWorkload core.Workload, containerInfo rt.ContainerInfo,
793+
ctx context.Context, workloadName string, fileWorkload core.Workload, containerInfo rt.ContainerInfo, pid int,
779794
) (core.Workload, error) {
780795
// Only validate if file shows running status
781796
if fileWorkload.Status != rt.WorkloadStatusRunning {
@@ -797,7 +812,7 @@ func (f *fileStatusManager) validateWorkloadInList(
797812
}
798813

799814
// Check if proxy process is running when workload is running
800-
if unhealthyWorkload, isUnhealthy := f.checkProxyHealth(ctx, workloadName, fileWorkload, containerInfo); isUnhealthy {
815+
if unhealthyWorkload, isUnhealthy := f.isProxyUnhealthy(ctx, workloadName, fileWorkload, containerInfo, pid); isUnhealthy {
801816
return unhealthyWorkload, nil
802817
}
803818

@@ -809,7 +824,7 @@ func (f *fileStatusManager) validateWorkloadInList(
809824
func (f *fileStatusManager) mergeRuntimeAndFileWorkloads(
810825
ctx context.Context,
811826
runtimeContainers []rt.ContainerInfo,
812-
fileWorkloads map[string]core.Workload,
827+
fileWorkloadsWithPID map[string]workloadWithPID,
813828
) map[string]core.Workload {
814829
runtimeWorkloadMap := make(map[string]rt.ContainerInfo)
815830
for _, container := range runtimeContainers {
@@ -840,15 +855,17 @@ func (f *fileStatusManager) mergeRuntimeAndFileWorkloads(
840855
}
841856

842857
// Then, merge with file workloads, validating running workloads
843-
for name, fileWorkload := range fileWorkloads {
858+
for name, fileWorkloadWithPID := range fileWorkloadsWithPID {
859+
fileWorkload := fileWorkloadWithPID.workload
860+
pid := fileWorkloadWithPID.pid
844861

845862
if fileWorkload.Remote { // Remote workloads are not managed by the container runtime
846863
workloadMap[name] = fileWorkload
847864
continue
848865
}
849866
if runtimeContainer, exists := runtimeWorkloadMap[name]; exists {
850867
// Validate running workloads similar to GetWorkload
851-
validatedWorkload, err := f.validateWorkloadInList(ctx, name, fileWorkload, runtimeContainer)
868+
validatedWorkload, err := f.validateWorkloadInList(ctx, name, fileWorkload, runtimeContainer, pid)
852869
if err != nil {
853870
logger.Warnf("failed to validate workload %s in list: %v", name, err)
854871
// Fall back to basic merge without validation
@@ -870,8 +887,9 @@ func (f *fileStatusManager) mergeRuntimeAndFileWorkloads(
870887
if err != nil {
871888
logger.Warnf("failed to handle missing runtime for workload %s: %v", name, err)
872889
workloadMap[name] = fileWorkload
890+
} else {
891+
workloadMap[name] = updatedWorkload
873892
}
874-
workloadMap[name] = updatedWorkload
875893
}
876894
}
877895
return workloadMap

pkg/workloads/statuses/file_status_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -526,8 +526,8 @@ func TestFileStatusManager_ValidateRunningWorkload_Remote(t *testing.T) {
526526
// Mock runtime should NOT be called for remote workloads
527527
// (no expectations set, so any call would fail the test)
528528

529-
// Validate the remote workload
530-
result, err := manager.validateRunningWorkload(ctx, "remote-test", remoteWorkload)
529+
// Validate the remote workload (PID is irrelevant for remote workloads)
530+
result, err := manager.validateRunningWorkload(ctx, "remote-test", remoteWorkload, 0)
531531
require.NoError(t, err)
532532

533533
// Should return the workload unchanged without calling runtime

0 commit comments

Comments
 (0)