Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion kubernetes/internal/controller/batchsandbox_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,16 @@ func (r *BatchSandboxReconciler) getTaskScheduler(ctx context.Context, batchSbx
}
// Update the pods list for this scheduler
tSch.UpdatePods(pods)
// Handle scale-out: register task specs for any replicas added since the
// scheduler was first created. Already-tracked task names are skipped.
taskStrategy := strategy.NewTaskSchedulingStrategy(batchSbx)
taskSpecs, err := taskStrategy.GenerateTaskSpecs()
if err != nil {
return nil, fmt.Errorf("failed to generate task specs for scale-out: %w", err)
}
if err := tSch.AddTasks(taskSpecs); err != nil {
return nil, fmt.Errorf("failed to add tasks on scale-out: %w", err)
}
}
return tSch, nil
}
Expand Down Expand Up @@ -464,7 +474,6 @@ func (r *BatchSandboxReconciler) scaleBatchSandbox(ctx context.Context, batchSan
for i := range pods {
pod := pods[i]
BatchSandboxScaleExpectations.ObserveScale(controllerutils.GetControllerKey(batchSandbox), expectations.Create, pod.Name)
pods = append(pods, pod)
idx, err := parseIndex(pod)
if err != nil {
return fmt.Errorf("failed to parse idx Pod %s, err %w", pod.Name, err)
Expand Down
17 changes: 17 additions & 0 deletions kubernetes/internal/scheduler/default_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,23 @@ func (sch *defaultTaskScheduler) UpdatePods(pods []*corev1.Pod) {
sch.allPods = pods
}

// AddTasks registers task specs that are not yet tracked by the scheduler.
//
// Specs whose names are already present in taskNodeByNameIndex are dropped
// before any node is built, so calling this with the full task list on every
// reconciliation only pays for initTaskNodes on the specs that are actually
// new. When nothing is new, initTaskNodes is not called at all.
//
// Nil entries are not filtered here; they are forwarded to initTaskNodes so
// that helper keeps deciding how to treat them. Any error returned by
// initTaskNodes is returned unchanged, and in that case no node is registered.
//
// NOTE(review): already-tracked specs are no longer passed through
// initTaskNodes again; this assumes they were accepted by it when first
// registered — confirm no caller relies on re-validation.
func (sch *defaultTaskScheduler) AddTasks(tasks []*api.Task) error {
	// Keep only the specs the scheduler does not know about, preserving the
	// caller's order so new nodes are appended in the same sequence as before.
	var pending []*api.Task
	for _, task := range tasks {
		if task != nil {
			if _, tracked := sch.taskNodeByNameIndex[task.Name]; tracked {
				continue
			}
		}
		pending = append(pending, task)
	}
	if len(pending) == 0 {
		return nil
	}

	newNodes, err := initTaskNodes(pending)
	if err != nil {
		return err
	}
	for _, node := range newNodes {
		// Re-check the index per node: the same name may occur more than once
		// within a single call, and only the first occurrence is registered.
		if _, exists := sch.taskNodeByNameIndex[node.Name]; exists {
			continue
		}
		sch.taskNodes = append(sch.taskNodes, node)
		sch.taskNodeByNameIndex[node.Name] = node
	}
	return nil
}
Comment on lines +221 to +233
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AddTasks currently calls initTaskNodes(tasks) for the full task list every time, even though most tasks may already be tracked. In a reconciliation loop, this can become unnecessarily expensive as replica counts grow (allocating []*taskNode proportional to total replicas every call). Consider filtering to only the missing task specs first (by checking taskNodeByNameIndex on the incoming tasks), then initializing/appending nodes only for that smaller subset.

Copilot uses AI. Check for mistakes.

func (sch *defaultTaskScheduler) ListTask() []Task {
ret := make([]Task, len(sch.taskNodes), len(sch.taskNodes))
for i := range sch.taskNodes {
Expand Down
78 changes: 78 additions & 0 deletions kubernetes/internal/scheduler/default_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,3 +1290,81 @@ func Test_initTaskNodes(t *testing.T) {
})
}
}

// Test_addTasks exercises defaultTaskScheduler.AddTasks against a scheduler
// seeded from an initial task list. For each case it checks the resulting
// number of task nodes, their names in order, and the size of the name index.
func Test_addTasks(t *testing.T) {
	type addTasksCase struct {
		name          string
		initial       []*api.Task
		addTasks      []*api.Task
		wantNodeNames []string
		wantNodeCount int
	}

	// namesOf collects node names in slice order; the result is always a
	// non-nil slice so it compares cleanly with the expected literals.
	namesOf := func(s *defaultTaskScheduler) []string {
		collected := make([]string, 0, len(s.taskNodes))
		for _, node := range s.taskNodes {
			collected = append(collected, node.Name)
		}
		return collected
	}

	cases := []addTasksCase{
		{
			name: "scale-out: new tasks are appended, existing tasks are skipped",
			initial: []*api.Task{
				{Name: "sandbox-0", Process: &api.Process{Command: []string{"echo", "0"}}},
			},
			addTasks: []*api.Task{
				{Name: "sandbox-0", Process: &api.Process{Command: []string{"echo", "0"}}},
				{Name: "sandbox-1", Process: &api.Process{Command: []string{"echo", "1"}}},
			},
			wantNodeNames: []string{"sandbox-0", "sandbox-1"},
			wantNodeCount: 2,
		},
		{
			name: "no-op: add same tasks as already tracked",
			initial: []*api.Task{
				{Name: "sandbox-0"},
				{Name: "sandbox-1"},
			},
			addTasks: []*api.Task{
				{Name: "sandbox-0"},
				{Name: "sandbox-1"},
			},
			wantNodeNames: []string{"sandbox-0", "sandbox-1"},
			wantNodeCount: 2,
		},
		{
			name:    "empty scheduler: add initial tasks",
			initial: []*api.Task{},
			addTasks: []*api.Task{
				{Name: "sandbox-0"},
			},
			wantNodeNames: []string{"sandbox-0"},
			wantNodeCount: 1,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Seed the scheduler with the nodes it would already be tracking.
			seedNodes, err := initTaskNodes(tc.initial)
			if err != nil {
				t.Fatalf("initTaskNodes() error = %v", err)
			}
			sch := &defaultTaskScheduler{
				taskNodes:           seedNodes,
				taskNodeByNameIndex: indexByName(seedNodes),
				logger:              testLogger,
			}

			if err := sch.AddTasks(tc.addTasks); err != nil {
				t.Fatalf("AddTasks() unexpected error = %v", err)
			}

			if got := len(sch.taskNodes); got != tc.wantNodeCount {
				t.Errorf("AddTasks() taskNodes count = %d, want %d", got, tc.wantNodeCount)
			}

			if gotNames := namesOf(sch); !reflect.DeepEqual(gotNames, tc.wantNodeNames) {
				t.Errorf("AddTasks() taskNode names = %v, want %v", gotNames, tc.wantNodeNames)
			}

			if got := len(sch.taskNodeByNameIndex); got != tc.wantNodeCount {
				t.Errorf("AddTasks() taskNodeByNameIndex size = %d, want %d", got, tc.wantNodeCount)
			}
		})
	}
}
4 changes: 4 additions & 0 deletions kubernetes/internal/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type TaskScheduler interface {
UpdatePods(pod []*corev1.Pod)
ListTask() []Task
StopTask() []Task
// AddTasks registers task specs that are not yet tracked by the scheduler.
// Tasks whose names are already tracked are silently skipped, making this
// safe to call with the full task list during a scale-out reconciliation.
AddTasks(tasks []*apis.Task) error
Comment on lines 27 to +33
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new AddTasks method uses *apis.Task here, but the implementation and mock in this PR use *api.Task. If these are different packages/types, defaultTaskScheduler and MockTaskScheduler will not satisfy TaskScheduler and the build will fail. Unify the task type across the interface, default implementation, and mock (same import path and identifier), and regenerate mocks if needed.

Copilot uses AI. Check for mistakes.
}

func NewTaskScheduler(name string, tasks []*apis.Task, pods []*corev1.Pod, resPolicyWhenTaskCompleted sandboxv1alpha1.TaskResourcePolicy, logger logr.Logger) (TaskScheduler, error) {
Expand Down
15 changes: 15 additions & 0 deletions kubernetes/internal/scheduler/mock/interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading