Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/app/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fuku/internal/config/logger"
)

// Help text constants
const (
Usage = `Usage:
fuku Run services with default profile (with TUI)
Expand Down
1 change: 1 addition & 0 deletions internal/app/cli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
// CommandType represents the type of CLI command
type CommandType int

// Command type values
const (
CommandRun CommandType = iota
CommandLogs
Expand Down
1 change: 1 addition & 0 deletions internal/app/logs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Client interface {
Close() error
}

// client implements the Client interface
type client struct {
conn net.Conn
formatter *LogFormatter
Expand Down
1 change: 1 addition & 0 deletions internal/app/logs/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (c *ClientConn) ShouldReceive(service string) bool {
return c.Services[service]
}

// hub implements the Hub interface
type hub struct {
clients map[*ClientConn]bool
register chan *ClientConn
Expand Down
1 change: 1 addition & 0 deletions internal/app/logs/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Broadcaster interface {
// MessageType represents the type of message in the wire protocol
type MessageType string

// Message types for the wire protocol
const (
// MessageSubscribe is sent from client to server to subscribe to services
MessageSubscribe MessageType = "subscribe"
Expand Down
1 change: 1 addition & 0 deletions internal/app/logs/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Runner interface {
Run(profile string, services []string) int
}

// runner implements the Runner interface
type runner struct {
client Client
log logger.Logger
Expand Down
4 changes: 4 additions & 0 deletions internal/app/logs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Server interface {
SocketPath() string
}

// server implements the Server interface
type server struct {
profile string
socketPath string
Expand Down Expand Up @@ -124,6 +125,7 @@ func (s *server) Broadcast(service, message string) {
}
}

// cleanupStaleSocket removes stale socket file if not in use
func (s *server) cleanupStaleSocket() error {
if _, err := os.Stat(s.socketPath); os.IsNotExist(err) {
return nil
Expand All @@ -141,6 +143,7 @@ func (s *server) cleanupStaleSocket() error {
return os.Remove(s.socketPath)
}

// acceptConnections handles incoming client connections
func (s *server) acceptConnections(ctx context.Context) {
for s.running.Load() {
conn, err := s.listener.Accept()
Expand All @@ -162,6 +165,7 @@ func (s *server) acceptConnections(ctx context.Context) {
}
}

// handleConnection processes a single client connection
func (s *server) handleConnection(ctx context.Context, conn net.Conn) {
defer conn.Close()

Expand Down
2 changes: 2 additions & 0 deletions internal/app/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ type Monitor interface {
GetStats(ctx context.Context, pid int) (Stats, error)
}

// monitor implements the Monitor interface
type monitor struct{}

// NewMonitor creates a new Monitor instance
func NewMonitor() Monitor {
return &monitor{}
}

// GetStats retrieves CPU and memory statistics for a process
func (m *monitor) GetStats(ctx context.Context, pid int) (Stats, error) {
if pid <= 0 || pid > math.MaxInt32 {
return Stats{}, nil
Expand Down
1 change: 1 addition & 0 deletions internal/app/runner/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Discovery interface {
Resolve(profile string) ([]Tier, error)
}

// discovery implements the Discovery interface
type discovery struct {
cfg *config.Config
topology *config.Topology
Expand Down
3 changes: 3 additions & 0 deletions internal/app/runner/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Lifecycle interface {
Terminate(proc Process, timeout time.Duration) error
}

// lifecycle implements the Lifecycle interface
type lifecycle struct {
log logger.Logger
}
Expand Down Expand Up @@ -59,10 +60,12 @@ func (l *lifecycle) Terminate(proc Process, timeout time.Duration) error {
}
}

// signalGroup sends a signal to the process group
func (l *lifecycle) signalGroup(pid int, sig syscall.Signal) error {
return syscall.Kill(-pid, sig)
}

// forceKill sends SIGKILL to the process group
func (l *lifecycle) forceKill(proc Process, pid int) error {
if err := syscall.Kill(-pid, syscall.SIGKILL); err != nil {
l.log.Warn().Err(err).Msgf("Failed to SIGKILL process group, trying direct kill")
Expand Down
8 changes: 8 additions & 0 deletions internal/app/runner/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Process interface {
StderrReader() *io.PipeReader
}

// process implements the Process interface
type process struct {
name string
cmd *exec.Cmd
Expand All @@ -25,22 +26,27 @@ type process struct {
stderrReader *io.PipeReader
}

// Name returns the service name
func (p *process) Name() string {
return p.name
}

// Cmd returns the underlying exec command
func (p *process) Cmd() *exec.Cmd {
return p.cmd
}

// Done returns a channel that closes when the process exits
func (p *process) Done() <-chan struct{} {
return p.done
}

// Ready returns a channel that receives when the process is ready
func (p *process) Ready() <-chan error {
return p.ready
}

// SignalReady signals the ready channel with optional error
func (p *process) SignalReady(err error) {
if err != nil {
select {
Expand All @@ -52,10 +58,12 @@ func (p *process) SignalReady(err error) {
close(p.ready)
}

// StdoutReader returns the stdout pipe reader
func (p *process) StdoutReader() *io.PipeReader {
return p.stdoutReader
}

// StderrReader returns the stderr pipe reader
func (p *process) StderrReader() *io.PipeReader {
return p.stderrReader
}
1 change: 1 addition & 0 deletions internal/app/runner/readiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Readiness interface {
Check(ctx context.Context, name string, service *config.Service, process Process)
}

// readiness implements the Readiness interface
type readiness struct {
log logger.Logger
}
Expand Down
10 changes: 10 additions & 0 deletions internal/app/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Runner interface {
Run(ctx context.Context, profile string) error
}

// runner implements the Runner interface
type runner struct {
cfg *config.Config
discovery Discovery
Expand Down Expand Up @@ -158,6 +159,7 @@ func (r *runner) Run(ctx context.Context, profile string) error {
return nil
}

// runStartupPhase handles the service startup phase and waits for completion or interruption
func (r *runner) runStartupPhase(ctx context.Context, cancel context.CancelFunc, tiers []Tier, registry Registry, sigChan chan os.Signal, commandChan <-chan runtime.Command) error {
startupDone := make(chan struct{}, 1)

Expand Down Expand Up @@ -217,6 +219,7 @@ func (r *runner) runStartupPhase(ctx context.Context, cancel context.CancelFunc,
}
}

// runServicePhase runs the main event loop handling signals and commands
func (r *runner) runServicePhase(ctx context.Context, cancel context.CancelFunc, sigChan chan os.Signal, registry Registry, commandChan <-chan runtime.Command) {
for {
select {
Expand Down Expand Up @@ -246,6 +249,7 @@ func (r *runner) runServicePhase(ctx context.Context, cancel context.CancelFunc,
}
}

// handleCommand processes a command and returns true if shutdown is requested
func (r *runner) handleCommand(ctx context.Context, cmd runtime.Command, registry Registry) bool {
switch cmd.Type {
case runtime.CommandStopService:
Expand Down Expand Up @@ -274,6 +278,7 @@ func (r *runner) handleCommand(ctx context.Context, cmd runtime.Command, registr
return false
}

// stopService stops a single service by name
func (r *runner) stopService(serviceName string, registry Registry) {
lookup := registry.Get(serviceName)
if !lookup.Exists {
Expand All @@ -293,6 +298,7 @@ func (r *runner) stopService(serviceName string, registry Registry) {
r.service.Stop(lookup.Proc)
}

// restartService stops and starts a service, or just starts if not running
func (r *runner) restartService(ctx context.Context, serviceName string, registry Registry) {
lookup := registry.Get(serviceName)

Expand Down Expand Up @@ -346,6 +352,7 @@ func (r *runner) restartService(ctx context.Context, serviceName string, registr
}()
}

// startTier starts all services in a tier concurrently and returns failed service names
func (r *runner) startTier(ctx context.Context, tierName string, tierServices []string, registry Registry) []string {
failedChan := make(chan string, len(tierServices))
procChan := make(chan Process, len(tierServices))
Expand Down Expand Up @@ -418,6 +425,7 @@ func (r *runner) startTier(ctx context.Context, tierName string, tierServices []
return failedServices
}

// startServiceWithRetry attempts to start a service with configurable retries
func (r *runner) startServiceWithRetry(ctx context.Context, name string, tierName string, service *config.Service) (Process, error) {
var lastErr error

Expand Down Expand Up @@ -493,6 +501,7 @@ func (r *runner) startServiceWithRetry(ctx context.Context, name string, tierNam
return nil, fmt.Errorf("%w after %d attempts: %w", errors.ErrMaxRetriesExceeded, config.RetryAttempt, lastErr)
}

// startAllTiers starts services tier by tier in order
func (r *runner) startAllTiers(ctx context.Context, tiers []Tier, registry Registry) {
for tierIdx, tier := range tiers {
if len(tier.Services) > 0 {
Expand Down Expand Up @@ -524,6 +533,7 @@ func (r *runner) startAllTiers(ctx context.Context, tiers []Tier, registry Regis
}
}

// shutdown stops all services in reverse order and waits for completion
func (r *runner) shutdown(registry Registry) {
processes := registry.SnapshotReverse()

Expand Down
6 changes: 6 additions & 0 deletions internal/app/runner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fuku/internal/config/logger"
)

// Scanner buffer size constants
const (
// scannerBufferSize is the initial buffer size for reading service output (64KB)
scannerBufferSize = 64 * 1024
Expand All @@ -30,6 +31,7 @@ type Service interface {
SetBroadcaster(broadcaster logs.Broadcaster)
}

// service implements the Service interface
type service struct {
lifecycle Lifecycle
readiness Readiness
Expand Down Expand Up @@ -134,6 +136,7 @@ func (s *service) SetBroadcaster(broadcaster logs.Broadcaster) {
s.broadcaster = broadcaster
}

// teeStream reads from source and writes to destination while logging output
func (s *service) teeStream(src io.Reader, dst *io.PipeWriter, serviceName, streamType string) {
scanner := bufio.NewScanner(src)
scanner.Buffer(make([]byte, scannerBufferSize), scannerMaxBufferSize)
Expand All @@ -156,11 +159,13 @@ func (s *service) teeStream(src io.Reader, dst *io.PipeWriter, serviceName, stre
}
}

// startDraining begins consuming both stdout and stderr pipes
func (s *service) startDraining(stdout, stderr *io.PipeReader) {
go s.drainPipe(stdout)
go s.drainPipe(stderr)
}

// drainPipe consumes all data from a pipe until EOF
func (s *service) drainPipe(reader *io.PipeReader) {
scanner := bufio.NewScanner(reader)
scanner.Buffer(make([]byte, scannerBufferSize), scannerMaxBufferSize)
Expand All @@ -173,6 +178,7 @@ func (s *service) drainPipe(reader *io.PipeReader) {
}
}

// handleReadinessCheck sets up the appropriate readiness check based on service config
func (s *service) handleReadinessCheck(ctx context.Context, name string, svc *config.Service, proc *process, stdout, stderr *io.PipeReader) {
if svc.Readiness == nil {
proc.SignalReady(nil)
Expand Down
1 change: 1 addition & 0 deletions internal/app/runner/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type WorkerPool interface {
Release()
}

// workerPool implements the WorkerPool interface
type workerPool struct {
sem chan struct{}
}
Expand Down
6 changes: 6 additions & 0 deletions internal/app/runtime/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
// CommandType represents the type of command
type CommandType string

// Command types for service control
const (
CommandStopService CommandType = "stop_service"
CommandRestartService CommandType = "restart_service"
Expand Down Expand Up @@ -37,6 +38,7 @@ type CommandBus interface {
Close()
}

// commandBus implements the CommandBus interface
type commandBus struct {
subscribers []chan Command
mu sync.RWMutex
Expand Down Expand Up @@ -103,6 +105,7 @@ func (cb *commandBus) Close() {
cb.subscribers = nil
}

// unsubscribe removes a channel from subscribers and closes it
func (cb *commandBus) unsubscribe(ch chan Command) {
cb.mu.Lock()
defer cb.mu.Unlock()
Expand All @@ -126,6 +129,7 @@ func NewNoOpCommandBus() CommandBus {
return &noOpCommandBus{}
}

// Subscribe returns a channel that closes when context is cancelled
func (ncb *noOpCommandBus) Subscribe(ctx context.Context) <-chan Command {
ch := make(chan Command)

Expand All @@ -137,6 +141,8 @@ func (ncb *noOpCommandBus) Subscribe(ctx context.Context) <-chan Command {
return ch
}

// Publish is a no-op
func (ncb *noOpCommandBus) Publish(cmd Command) {}

// Close is a no-op
func (ncb *noOpCommandBus) Close() {}
Loading