Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 69 additions & 7 deletions app/jobs/taskjob/result_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"hostlink/app/services/taskreporter"
"hostlink/domain/task"
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"
Expand All @@ -26,6 +28,12 @@ func TestTaskJobStreamsOutputAndFinalOverResultChannel(t *testing.T) {

job.processTask(context.Background(), fetcher.tasks[0], reporter, channel)

if len(channel.started) != 1 {
t.Fatalf("started len = %d, want 1", len(channel.started))
}
if channel.started[0].TaskID != "task-1" || channel.started[0].ExecutionAttemptID != "attempt-1" {
t.Fatalf("started = %#v", channel.started[0])
}
if len(channel.outputs) != 2 {
t.Fatalf("outputs len = %d, want 2", len(channel.outputs))
}
Expand Down Expand Up @@ -84,6 +92,34 @@ func TestTaskJobFallsBackToHTTPReporterWhenFinalPersistenceFails(t *testing.T) {
}
}

// TestTaskJobRecordsStartedBeforeProcessLaunch verifies ordering: the durable
// "started" record must be written before the task's process runs. The fake
// channel's hook fails if the marker file produced by the command already
// exists when RecordStarted fires.
func TestTaskJobRecordsStartedBeforeProcessLaunch(t *testing.T) {
	markerPath := filepath.Join(t.TempDir(), "process-started")
	reporter := &fakeTaskReporter{}
	hookRan := false
	hook := func() error {
		hookRan = true
		// The command writes markerPath; if it is already present, the
		// process was launched ahead of the durable started state.
		if _, statErr := os.Stat(markerPath); statErr == nil {
			return errors.New("process launched before durable started state")
		}
		return nil
	}
	channel := &fakeResultChannel{recordStartedHook: hook}
	job := NewJobWithConf(TaskJobConfig{Trigger: runOnceTrigger})

	job.processTask(context.Background(), task.Task{
		ID:                 "task-1",
		ExecutionAttemptID: "attempt-1",
		Command:            "printf launched > " + markerPath,
		Status:             "pending",
	}, reporter, channel)

	if !hookRan {
		t.Fatal("RecordStarted was not called")
	}
	if got := len(channel.started); got != 1 {
		t.Fatalf("started len = %d, want 1", got)
	}
}

func TestCaptureStreamFlushesOnByteThreshold(t *testing.T) {
reader, writer := io.Pipe()
channel := &fakeResultChannel{}
Expand Down Expand Up @@ -176,23 +212,49 @@ func (f *fakeTaskFetcher) Fetch() ([]task.Task, error) {
}

// fakeTaskReporter is a test double for the task reporter. It records every
// reported task ID and result so tests can assert on counts and ordering.
//
// NOTE: the pasted diff left both the old and new field lists in place
// (mu/results declared twice); this is the reconstructed post-diff struct.
type fakeTaskReporter struct {
	mu              sync.Mutex // guards taskIDsReported and results
	taskIDsReported []string
	results         []*taskreporter.TaskResult
}

// Report records the task ID and result under the mutex so concurrent
// reporters remain safe; it never returns an error.
func (f *fakeTaskReporter) Report(taskID string, result *taskreporter.TaskResult) error {
	f.mu.Lock()
	f.taskIDsReported = append(f.taskIDsReported, taskID)
	f.results = append(f.results, result)
	f.mu.Unlock()
	return nil
}

// fakeResultChannel is a test double for the local result channel. Each
// SendXxx/RecordXxx method appends what it was given and returns the
// corresponding configurable error (nil by default).
//
// NOTE: the pasted diff left both the old and new field lists in place
// (mu/outputs/finals/outputErrs/finalErr declared twice); this is the
// reconstructed post-diff struct.
type fakeResultChannel struct {
	mu                sync.Mutex // guards all slices below
	recordedStarted   []localtaskstore.TaskReceipt
	started           []localtaskstore.TaskReceipt
	outputs           []localtaskstore.OutputChunk
	finals            []localtaskstore.FinalResult
	recordStartedErr  error
	recordStartedHook func() error // runs before RecordStarted appends; its error short-circuits
	startedErr        error
	outputErrs        []error
	finalErr          error
}

// RecordStarted runs the optional hook first; a hook error is returned
// without recording the receipt. Otherwise the receipt is appended and the
// configured recordStartedErr (nil by default) is returned.
func (f *fakeResultChannel) RecordStarted(ctx context.Context, receipt localtaskstore.TaskReceipt) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	hook := f.recordStartedHook
	if hook != nil {
		hookErr := hook()
		if hookErr != nil {
			return hookErr
		}
	}
	f.recordedStarted = append(f.recordedStarted, receipt)
	return f.recordStartedErr
}

// SendStarted records the receipt and returns the configured startedErr
// (nil by default).
func (f *fakeResultChannel) SendStarted(ctx context.Context, receipt localtaskstore.TaskReceipt) error {
	f.mu.Lock()
	f.started = append(f.started, receipt)
	err := f.startedErr
	f.mu.Unlock()
	return err
}

func (f *fakeResultChannel) SendOutput(ctx context.Context, chunk localtaskstore.OutputChunk) error {
Expand Down
138 changes: 138 additions & 0 deletions app/jobs/taskjob/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package taskjob

import (
"context"
"fmt"
"hostlink/app/services/taskreporter"
"hostlink/domain/task"
"os"
"path/filepath"
"testing"
"time"
)

func TestTaskJobQueuesPollingAndEnqueuedWorkSequentially(t *testing.T) {
marker := filepath.Join(t.TempDir(), "polling-started")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := &fakeTaskFetcher{tasks: []task.Task{{
ID: "poll-task",
Command: fmt.Sprintf("printf started > %q; sleep 0.2; printf first", marker),
Status: "pending",
}}}
reporter := &fakeTaskReporter{}
job := NewJobWithConf(TaskJobConfig{Trigger: runOnceTrigger})

cancelJob := job.Register(ctx, fetcher, reporter)
defer func() {
cancelJob()
job.Shutdown()
}()
waitForFile(t, marker)
if err := job.Enqueue(ctx, task.Task{ID: "ws-task", Command: "printf second", Status: "pending"}); err != nil {
t.Fatalf("enqueue websocket task: %v", err)
}
waitForReports(t, reporter, 2)

got := reporter.taskIDs()
want := []string{"poll-task", "ws-task"}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Fatalf("report order = %v, want %v", got, want)
}
}

// TestTaskJobRunsEnqueuedTasksSequentially checks FIFO execution of two
// enqueued tasks: the first parks on a sleep after writing its marker file,
// the second is enqueued mid-run, and the reporter must see task-1 first.
func TestTaskJobRunsEnqueuedTasksSequentially(t *testing.T) {
	marker := filepath.Join(t.TempDir(), "first-started")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	reporter := &fakeTaskReporter{}
	job := NewJobWithConf(TaskJobConfig{Trigger: noOpTrigger})

	cancelJob := job.Register(ctx, &fakeTaskFetcher{}, reporter)
	defer func() {
		cancelJob()
		job.Shutdown()
	}()

	first := task.Task{
		ID:      "task-1",
		Command: fmt.Sprintf("printf started > %q; sleep 0.2; printf first", marker),
		Status:  "pending",
	}
	if err := job.Enqueue(ctx, first); err != nil {
		t.Fatalf("enqueue first task: %v", err)
	}
	// The marker file proves the first task is mid-flight before we enqueue.
	waitForFile(t, marker)
	second := task.Task{ID: "task-2", Command: "printf second", Status: "pending"}
	if err := job.Enqueue(ctx, second); err != nil {
		t.Fatalf("enqueue second task: %v", err)
	}
	waitForReports(t, reporter, 2)

	want := []string{"task-1", "task-2"}
	if got := reporter.taskIDs(); fmt.Sprint(got) != fmt.Sprint(want) {
		t.Fatalf("report order = %v, want %v", got, want)
	}
}

// TestTaskJobSuppressesDuplicateQueuedAttempt enqueues the same task/attempt
// twice and asserts exactly one report is produced — the duplicate must be
// deduplicated rather than run a second time.
func TestTaskJobSuppressesDuplicateQueuedAttempt(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	reporter := &fakeTaskReporter{}
	job := NewJobWithConf(TaskJobConfig{Trigger: noOpTrigger})
	attempt := task.Task{ID: "task-1", ExecutionAttemptID: "attempt-1", Command: "printf run", Status: "pending"}

	cancelJob := job.Register(ctx, &fakeTaskFetcher{}, reporter)
	defer func() {
		cancelJob()
		job.Shutdown()
	}()

	if err := job.Enqueue(ctx, attempt); err != nil {
		t.Fatalf("enqueue first attempt: %v", err)
	}
	if err := job.Enqueue(ctx, attempt); err != nil {
		t.Fatalf("enqueue duplicate attempt: %v", err)
	}
	waitForReports(t, reporter, 1)
	// Give a stray duplicate execution time to surface before counting.
	time.Sleep(100 * time.Millisecond)

	if reports := len(reporter.resultsSnapshot()); reports != 1 {
		t.Fatalf("report count = %d, want 1", reports)
	}
}

func noOpTrigger(ctx context.Context, fn func() error) {}

// waitForFile polls (1 ms interval) until path exists, failing the test if
// it has not appeared within one second.
func waitForFile(t *testing.T, path string) {
	t.Helper()
	for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
		_, err := os.Stat(path)
		if err == nil {
			return
		}
		time.Sleep(time.Millisecond)
	}
	t.Fatalf("timed out waiting for %s", path)
}

// waitForReports polls (1 ms interval) until the reporter has recorded at
// least count results, failing the test after a two-second deadline.
func waitForReports(t *testing.T, reporter *fakeTaskReporter, count int) {
	t.Helper()
	deadline := time.Now().Add(2 * time.Second)
	for {
		if !time.Now().Before(deadline) {
			t.Fatalf("timed out waiting for %d reports", count)
		}
		if len(reporter.resultsSnapshot()) >= count {
			return
		}
		time.Sleep(time.Millisecond)
	}
}

// taskIDs returns a copy of the reported task IDs, taken under the mutex.
func (f *fakeTaskReporter) taskIDs() []string {
	f.mu.Lock()
	defer f.mu.Unlock()
	snapshot := make([]string, len(f.taskIDsReported))
	copy(snapshot, f.taskIDsReported)
	return snapshot
}

// resultsSnapshot returns a copy of the recorded results, taken under the
// mutex, so callers can inspect them without racing Report.
func (f *fakeTaskReporter) resultsSnapshot() []*taskreporter.TaskResult {
	f.mu.Lock()
	defer f.mu.Unlock()
	snapshot := make([]*taskreporter.TaskResult, len(f.results))
	copy(snapshot, f.results)
	return snapshot
}
Loading