Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 127 additions & 27 deletions cmd/prismctl/cmd/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ package cmd
import (
"context"
"fmt"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"

pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism"
Expand Down Expand Up @@ -150,37 +152,53 @@ func startLocalStack() error {
return fmt.Errorf("patterns directory not found at %s", patternsDir)
}

// Start components in order
components := []struct {
name string
binary string
args []string
logFile string
delay time.Duration
}{
// Component definitions with health check endpoints
type component struct {
name string
binary string
args []string
logFile string
healthCheck string // HTTP or gRPC endpoint for health checks
healthTimeout time.Duration
isGRPC bool
}

components := []component{
{
name: "prism-admin",
binary: filepath.Join(absBinDir, "prism-admin"),
args: []string{"serve", "--port=8981"},
logFile: filepath.Join(logsDir, "admin.log"),
delay: 2 * time.Second,
name: "prism-admin",
binary: filepath.Join(absBinDir, "prism-admin"),
args: []string{"serve", "--port=8981"},
logFile: filepath.Join(logsDir, "admin.log"),
healthCheck: "localhost:8981",
healthTimeout: 10 * time.Second,
isGRPC: true,
},
{
name: "pattern-launcher",
binary: filepath.Join(absBinDir, "pattern-launcher"),
args: []string{"--admin-endpoint=localhost:8981", "--launcher-id=launcher-01", "--grpc-port=7070", "--patterns-dir=" + patternsDir},
logFile: filepath.Join(logsDir, "launcher.log"),
delay: 2 * time.Second,
name: "pattern-launcher",
binary: filepath.Join(absBinDir, "pattern-launcher"),
args: []string{"--admin-endpoint=localhost:8981", "--launcher-id=launcher-01", "--grpc-port=7070", "--patterns-dir=" + patternsDir},
logFile: filepath.Join(logsDir, "launcher.log"),
healthCheck: "http://localhost:9093/health",
healthTimeout: 10 * time.Second,
isGRPC: false,
},
{
name: "keyvalue-runner",
binary: filepath.Join(absBinDir, "keyvalue-runner"),
args: []string{"--proxy-addr=localhost:9090"},
logFile: filepath.Join(logsDir, "keyvalue.log"),
delay: 1 * time.Second,
name: "keyvalue-runner",
binary: filepath.Join(absBinDir, "keyvalue-runner"),
args: []string{"--proxy-addr=localhost:9090"},
logFile: filepath.Join(logsDir, "keyvalue.log"),
healthCheck: "", // No health check endpoint
healthTimeout: 2 * time.Second,
isGRPC: false,
},
}

// Start components and track errors
var wg sync.WaitGroup
startErrors := make(chan error, len(components))

startTime := time.Now()

for _, comp := range components {
fmt.Printf(" Starting %s...\n", comp.name)

Expand Down Expand Up @@ -216,13 +234,42 @@ func startLocalStack() error {
fmt.Printf(" ⚠️ Warning: Could not save PID file: %v\n", err)
}

// Wait before starting next component
if comp.delay > 0 {
time.Sleep(comp.delay)
// Wait for health check in parallel
if comp.healthCheck != "" {
wg.Add(1)
go func(c component) {
defer wg.Done()

var err error
if c.isGRPC {
err = waitForGRPCHealth(c.healthCheck, c.healthTimeout)
} else {
err = waitForHTTPHealth(c.healthCheck, c.healthTimeout)
}

if err != nil {
startErrors <- fmt.Errorf("%s health check failed: %w", c.name, err)
} else {
fmt.Printf(" ✓ %s ready\n", c.name)
}
}(comp)
} else {
// Fallback to short delay for components without health checks
time.Sleep(comp.healthTimeout)
}
}

fmt.Printf("\n✅ Local Prism stack started successfully!\n\n")
// Wait for all health checks to complete
wg.Wait()
close(startErrors)

// Check for any startup errors
if err := <-startErrors; err != nil {
return err
}
Comment on lines +267 to +269
Copy link

Copilot AI Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the first error from the channel is checked. If multiple components fail their health checks, subsequent errors will be ignored. Consider draining all errors from the channel and returning a combined error or the first error encountered.

Suggested change
if err := <-startErrors; err != nil {
return err
}
var errs []error
for err := range startErrors {
if err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
if len(errs) == 1 {
return errs[0]
}
// Combine all errors into a single error message
var sb strings.Builder
sb.WriteString("Multiple components failed health checks:\n")
for _, err := range errs {
sb.WriteString(" - ")
sb.WriteString(err.Error())
sb.WriteString("\n")
}
return fmt.Errorf(sb.String())
}

Copilot uses AI. Check for mistakes.

elapsed := time.Since(startTime)
fmt.Printf("\n✅ Local Prism stack started successfully in %s!\n\n", elapsed.Round(10*time.Millisecond))
fmt.Println("📊 Stack Overview:")
fmt.Println(" • Admin Control Plane: localhost:8981")
fmt.Println(" • Pattern Launcher: localhost:7070")
Expand Down Expand Up @@ -415,6 +462,59 @@ func isInBinariesDir(dir string) bool {
return true
}

// waitForHTTPHealth polls url with GET requests until one answers 200 OK or
// timeout elapses.
//
// The first probe is sent immediately and subsequent probes every 100ms.
// Every request is bound to the overall deadline, so a slow or hanging
// endpoint cannot make this function overrun timeout. A nil error means a
// probe saw 200 OK; otherwise a "timeout waiting for <url>" error is returned.
func waitForHTTPHealth(url string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// The per-request timeout keeps a single hung probe from consuming the
	// whole polling budget.
	client := &http.Client{Timeout: 1 * time.Second}

	for {
		if probeHTTPHealth(ctx, client, url) {
			return nil
		}

		select {
		case <-ctx.Done():
			return fmt.Errorf("timeout waiting for %s", url)
		case <-ticker.C:
		}
	}
}

// probeHTTPHealth issues one GET to url and reports whether it returned
// 200 OK. It is a separate function so the deferred Body.Close runs at the
// end of each probe instead of accumulating in the polling loop.
func probeHTTPHealth(ctx context.Context, client *http.Client, url string) bool {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return false
	}

	resp, err := client.Do(req)
	if err != nil {
		return false
	}
	defer resp.Body.Close()

	return resp.StatusCode == http.StatusOK
}

// waitForGRPCHealth blocks until a gRPC channel to addr becomes READY or
// timeout elapses.
//
// grpc.NewClient performs no I/O, so a nil error from it says nothing about
// whether a server is listening. Readiness is therefore established by
// asking the channel to connect and watching its connectivity state. A nil
// error means the channel reached READY; otherwise a
// "timeout waiting for <addr>" error is returned, or the client construction
// error if the target could not be parsed.
func waitForGRPCHealth(addr string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	conn, err := grpc.NewClient(
		addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		// Construction errors (e.g. a malformed target) are not transient,
		// so retrying until the deadline would only hide the real cause.
		return fmt.Errorf("creating gRPC client for %s: %w", addr, err)
	}
	defer conn.Close()

	for {
		// Connect is a no-op unless the channel is idle; calling it on every
		// pass restarts the attempt if the channel fell back to idle.
		conn.Connect()

		state := conn.GetState()
		// Compared by name so this block needs no import beyond those it
		// already uses; "READY" is the String() form of connectivity.Ready.
		if state.String() == "READY" {
			return nil
		}

		// Returns false only when ctx expires before the state changes.
		if !conn.WaitForStateChange(ctx, state) {
			return fmt.Errorf("timeout waiting for %s", addr)
		}
	}
}

// provisionNamespace creates a namespace via the control plane
func provisionNamespace(namespace string) error {
fmt.Printf("📦 Provisioning namespace: %s\n", namespace)
Expand Down
Loading