From 036f8d9a9cf5a6bc030c1c5434ef3d231b6aa297 Mon Sep 17 00:00:00 2001 From: zerone0x Date: Tue, 10 Mar 2026 03:52:21 +0100 Subject: [PATCH] fix(scheduler): schedule tasks on new pods when BatchSandbox scales out MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a BatchSandbox is scaled out (replicas increased), the existing in-memory TaskScheduler was not informed of the new task specs for the additional replicas. As a result, newly created pods received no task assignment, leaving them idle. Root causes fixed: 1. Add TaskScheduler.AddTasks() – a new interface method that appends task nodes for specs not yet tracked by the scheduler. Already-tracked task names are silently skipped, making it safe to call with the full task list on every reconciliation cycle. 2. Call AddTasks() in getTaskScheduler() after UpdatePods() so that any replicas added since the scheduler was first created get their task nodes registered immediately. 3. Remove the spurious `pods = append(pods, pod)` in scaleBatchSandbox() that doubled every entry in the local pods slice on each iteration without any effect (range evaluated the initial slice length), making the code easier to read and reason about. 
Fixes #102 Co-Authored-By: Claude --- .../controller/batchsandbox_controller.go | 11 ++- .../internal/scheduler/default_scheduler.go | 17 ++++ .../scheduler/default_scheduler_test.go | 78 +++++++++++++++++++ kubernetes/internal/scheduler/interface.go | 4 + .../internal/scheduler/mock/interface.go | 15 ++++ 5 files changed, 124 insertions(+), 1 deletion(-) diff --git a/kubernetes/internal/controller/batchsandbox_controller.go b/kubernetes/internal/controller/batchsandbox_controller.go index 6008d3cd..d8d22c2f 100644 --- a/kubernetes/internal/controller/batchsandbox_controller.go +++ b/kubernetes/internal/controller/batchsandbox_controller.go @@ -347,6 +347,16 @@ func (r *BatchSandboxReconciler) getTaskScheduler(ctx context.Context, batchSbx } // Update the pods list for this scheduler tSch.UpdatePods(pods) + // Handle scale-out: register task specs for any replicas added since the + // scheduler was first created. Already-tracked task names are skipped. + taskStrategy := strategy.NewTaskSchedulingStrategy(batchSbx) + taskSpecs, err := taskStrategy.GenerateTaskSpecs() + if err != nil { + return nil, fmt.Errorf("failed to generate task specs for scale-out: %w", err) + } + if err := tSch.AddTasks(taskSpecs); err != nil { + return nil, fmt.Errorf("failed to add tasks on scale-out: %w", err) + } } return tSch, nil } @@ -464,7 +474,6 @@ func (r *BatchSandboxReconciler) scaleBatchSandbox(ctx context.Context, batchSan for i := range pods { pod := pods[i] BatchSandboxScaleExpectations.ObserveScale(controllerutils.GetControllerKey(batchSandbox), expectations.Create, pod.Name) - pods = append(pods, pod) idx, err := parseIndex(pod) if err != nil { return fmt.Errorf("failed to parse idx Pod %s, err %w", pod.Name, err) diff --git a/kubernetes/internal/scheduler/default_scheduler.go b/kubernetes/internal/scheduler/default_scheduler.go index a4bf818f..4aa0b554 100644 --- a/kubernetes/internal/scheduler/default_scheduler.go +++ b/kubernetes/internal/scheduler/default_scheduler.go @@ 
-215,6 +215,23 @@ func (sch *defaultTaskScheduler) UpdatePods(pods []*corev1.Pod) { sch.allPods = pods } +// AddTasks registers task specs that are not yet tracked by the scheduler. +// Tasks whose names are already tracked are silently skipped, making this +// safe to call with the full task list during a scale-out reconciliation. +func (sch *defaultTaskScheduler) AddTasks(tasks []*api.Task) error { + newNodes, err := initTaskNodes(tasks) + if err != nil { + return err + } + for _, node := range newNodes { + if _, exists := sch.taskNodeByNameIndex[node.Name]; !exists { + sch.taskNodes = append(sch.taskNodes, node) + sch.taskNodeByNameIndex[node.Name] = node + } + } + return nil +} + func (sch *defaultTaskScheduler) ListTask() []Task { ret := make([]Task, len(sch.taskNodes), len(sch.taskNodes)) for i := range sch.taskNodes { diff --git a/kubernetes/internal/scheduler/default_scheduler_test.go b/kubernetes/internal/scheduler/default_scheduler_test.go index fe5673eb..7fec1454 100644 --- a/kubernetes/internal/scheduler/default_scheduler_test.go +++ b/kubernetes/internal/scheduler/default_scheduler_test.go @@ -1290,3 +1290,81 @@ func Test_initTaskNodes(t *testing.T) { }) } } + +func Test_addTasks(t *testing.T) { + tests := []struct { + name string + initial []*api.Task + addTasks []*api.Task + wantNodeNames []string + wantNodeCount int + }{ + { + name: "scale-out: new tasks are appended, existing tasks are skipped", + initial: []*api.Task{ + {Name: "sandbox-0", Process: &api.Process{Command: []string{"echo", "0"}}}, + }, + addTasks: []*api.Task{ + {Name: "sandbox-0", Process: &api.Process{Command: []string{"echo", "0"}}}, + {Name: "sandbox-1", Process: &api.Process{Command: []string{"echo", "1"}}}, + }, + wantNodeNames: []string{"sandbox-0", "sandbox-1"}, + wantNodeCount: 2, + }, + { + name: "no-op: add same tasks as already tracked", + initial: []*api.Task{ + {Name: "sandbox-0"}, + {Name: "sandbox-1"}, + }, + addTasks: []*api.Task{ + {Name: "sandbox-0"}, + {Name: 
"sandbox-1"}, + }, + wantNodeNames: []string{"sandbox-0", "sandbox-1"}, + wantNodeCount: 2, + }, + { + name: "empty scheduler: add initial tasks", + initial: []*api.Task{}, + addTasks: []*api.Task{ + {Name: "sandbox-0"}, + }, + wantNodeNames: []string{"sandbox-0"}, + wantNodeCount: 1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + initialNodes, err := initTaskNodes(tt.initial) + if err != nil { + t.Fatalf("initTaskNodes() error = %v", err) + } + sch := &defaultTaskScheduler{ + taskNodes: initialNodes, + taskNodeByNameIndex: indexByName(initialNodes), + logger: testLogger, + } + + if err := sch.AddTasks(tt.addTasks); err != nil { + t.Fatalf("AddTasks() unexpected error = %v", err) + } + + if len(sch.taskNodes) != tt.wantNodeCount { + t.Errorf("AddTasks() taskNodes count = %d, want %d", len(sch.taskNodes), tt.wantNodeCount) + } + + nodeNames := make([]string, len(sch.taskNodes)) + for i, n := range sch.taskNodes { + nodeNames[i] = n.Name + } + if !reflect.DeepEqual(nodeNames, tt.wantNodeNames) { + t.Errorf("AddTasks() taskNode names = %v, want %v", nodeNames, tt.wantNodeNames) + } + + if len(sch.taskNodeByNameIndex) != tt.wantNodeCount { + t.Errorf("AddTasks() taskNodeByNameIndex size = %d, want %d", len(sch.taskNodeByNameIndex), tt.wantNodeCount) + } + }) + } +} diff --git a/kubernetes/internal/scheduler/interface.go b/kubernetes/internal/scheduler/interface.go index 2de1476a..02e40994 100644 --- a/kubernetes/internal/scheduler/interface.go +++ b/kubernetes/internal/scheduler/interface.go @@ -27,6 +27,10 @@ type TaskScheduler interface { UpdatePods(pod []*corev1.Pod) ListTask() []Task StopTask() []Task + // AddTasks registers task specs that are not yet tracked by the scheduler. + // Tasks whose names are already tracked are silently skipped, making this + // safe to call with the full task list during a scale-out reconciliation. 
+ AddTasks(tasks []*apis.Task) error } func NewTaskScheduler(name string, tasks []*apis.Task, pods []*corev1.Pod, resPolicyWhenTaskCompleted sandboxv1alpha1.TaskResourcePolicy, logger logr.Logger) (TaskScheduler, error) { diff --git a/kubernetes/internal/scheduler/mock/interface.go b/kubernetes/internal/scheduler/mock/interface.go index 7d421734..96f363e7 100644 --- a/kubernetes/internal/scheduler/mock/interface.go +++ b/kubernetes/internal/scheduler/mock/interface.go @@ -11,6 +11,7 @@ import ( v1 "k8s.io/api/core/v1" scheduler "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/scheduler" + api "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor" ) // MockTaskScheduler is a mock of TaskScheduler interface. @@ -36,6 +37,20 @@ func (m *MockTaskScheduler) EXPECT() *MockTaskSchedulerMockRecorder { return m.recorder } +// AddTasks mocks base method. +func (m *MockTaskScheduler) AddTasks(tasks []*api.Task) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddTasks", tasks) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddTasks indicates an expected call of AddTasks. +func (mr *MockTaskSchedulerMockRecorder) AddTasks(tasks interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTasks", reflect.TypeOf((*MockTaskScheduler)(nil).AddTasks), tasks) +} + // ListTask mocks base method. func (m *MockTaskScheduler) ListTask() []scheduler.Task { m.ctrl.T.Helper()