diff --git a/vermeer/.gitignore b/vermeer/.gitignore
index 540a67ae4..e9502ea66 100644
--- a/vermeer/.gitignore
+++ b/vermeer/.gitignore
@@ -83,3 +83,7 @@ node_modules/
 /output/
 /bin/*
 !/bin/*.sh
+
+# 其他 #
+######################
+test/case/
diff --git a/vermeer/README.md b/vermeer/README.md
index 776956625..55ca14b02 100644
--- a/vermeer/README.md
+++ b/vermeer/README.md
@@ -3,6 +3,28 @@
 ## Introduction
 Vermeer is a high-performance distributed graph computing platform based on memory, supporting more than 15 graph algorithms, custom algorithm extensions, and custom data source access.
 
+## Run with Docker
+
+Pull the image:
+```
+docker pull hugegraph/vermeer:latest
+```
+
+Create local configuration files, for example, `~/master.ini` and `~/worker.ini`.
+
+Run with Docker. The `--env` flag specifies the file name.
+
+```
+master: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=master
+worker: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=worker
+```
+
+We've also provided a `docker-compose` file. Once you've created `~/master.ini` and `~/worker.ini`, and updated the `master_peer` in `worker.ini` to `172.20.0.10:6689`, you can run it using the following command:
+
+```
+docker-compose up -d
+```
+
 ## Start
 
 ```
diff --git a/vermeer/README.zh-CN.md b/vermeer/README.zh-CN.md
index 34dcec04c..1b125fa38 100644
--- a/vermeer/README.zh-CN.md
+++ b/vermeer/README.zh-CN.md
@@ -3,6 +3,26 @@
 ## 简介
 Vermeer是一个基于内存的高性能分布式图计算平台,支持15+图算法。支持自定义算法扩展,支持自定义数据源接入。
 
+## 基于 Docker 运行
+
+拉取镜像
+```
+docker pull hugegraph/vermeer:latest
+```
+
+创建好本地配置文件,例如`~/master.ini`与`~/worker.ini`
+
+基于docker运行,其中`--env`指定的是文件名称。
+```
+master: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=master
+worker: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=worker
+```
+
+我们也提供了`docker-compose`文件,当创建好`~/master.ini`与`~/worker.ini`,将`worker.ini`中的`master_peer`修改为`172.20.0.10:6689`后,即可通过以下命令运行:
+```
+docker-compose up -d
+```
+
 ## 运行
 
 ```
diff --git a/vermeer/apps/graphio/local_file.go b/vermeer/apps/graphio/local_file.go
index 8cdb72623..7d6119304 100644
--- a/vermeer/apps/graphio/local_file.go
+++ b/vermeer/apps/graphio/local_file.go
@@ -82,6 +82,7 @@ func (a *LocalMaker) MakeTasks(params map[string]string, taskID int32) ([]LoadPa
 		logrus.Errorf(s)
 		return nil, errors.New(s)
 	}
+	logrus.Debugf("MakeTask LoadTypeLocal parse file: %s, s:%d, e:%d", files, s, e)
 	for i := s; i <= e; i++ {
 		part := LoadPartition{}
 		part.Init(partID, taskID, LoadPartTypeVertex)
diff --git a/vermeer/apps/master/bl/compute_task.go b/vermeer/apps/master/bl/compute_task.go
index 80ca80528..e8abef046 100644
--- a/vermeer/apps/master/bl/compute_task.go
+++ b/vermeer/apps/master/bl/compute_task.go
@@ -142,6 +142,8 @@ func (ctb *ComputeTaskBl) ComputeTaskStatus(
 		}
 	}
 	taskMgr.ForceState(computeTask.Task, structure.TaskStateComplete)
+	// for scheduler, mark task complete
+	Scheduler.taskManager.MarkTaskComplete(taskId)
 	graph.SubUsingNum()
 	computeTask.FreeMemory()
 	needQuery := options.GetInt(computeTask.Task.Params, "output.need_query") == 1
diff --git a/vermeer/apps/master/bl/grpc_handlers.go b/vermeer/apps/master/bl/grpc_handlers.go
index e1c235584..2734c063f 100644
--- a/vermeer/apps/master/bl/grpc_handlers.go
+++ b/vermeer/apps/master/bl/grpc_handlers.go
@@ -26,6 +26,7 @@ import (
 	"time"
 	"vermeer/apps/compute"
 	"vermeer/apps/graphio"
+	"vermeer/apps/master/schedules"
 	"vermeer/apps/master/threshold"
 	"vermeer/apps/master/workers"
 	pb "vermeer/apps/protos"
@@ -103,6 +104,11 @@ func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterR
 		logrus.Errorf("failed to add a WorkerClient to the WorkerManager, error: %s", err)
 		return &pb.HelloMasterResp{}, err
 	}
+	_, err = Scheduler.ChangeWorkerStatus(reqWorker.Name, schedules.WorkerOngoingStatusIdle)
+	if err != nil {
+		logrus.Errorf("failed to change worker status to idle, error: %s", err)
+		return &pb.HelloMasterResp{}, err
+	}
 	logrus.Infof("worker say hello name: %s and set to workgroup: %s, client: %s",
 		reqWorker.Name, reqWorker.Group, p.Addr.String())
 
diff --git a/vermeer/apps/master/bl/load_task.go b/vermeer/apps/master/bl/load_task.go
index 4e1f79f4f..80c0023bc 100644
--- a/vermeer/apps/master/bl/load_task.go
+++ b/vermeer/apps/master/bl/load_task.go
@@ -204,6 +204,9 @@ func (lb *LoadTaskBl) LoadTaskStatus(taskId int32, state string, workerName stri
 	loadTask.Task.SetState(structure.TaskStateLoaded)
 	//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoaded)
 
+	// for scheduler, mark task complete
+	Scheduler.taskManager.MarkTaskComplete(taskId)
+
 	logrus.Infof("graph: %s, vertex: %d, edge: %d", graph.Name, graph.VertexCount, graph.EdgeCount)
 	for _, w := range graph.Workers {
 		logrus.Infof(
diff --git a/vermeer/apps/master/bl/scheduler_bl.go b/vermeer/apps/master/bl/scheduler_bl.go
index bc4ccac49..4fd5df97c 100644
--- a/vermeer/apps/master/bl/scheduler_bl.go
+++ b/vermeer/apps/master/bl/scheduler_bl.go
@@ -28,16 +28,58 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
+/*
+* @Description: ScheduleBl is the scheduler business logic.
+* @Note: This is the main scheduler business logic.
+ */
 type ScheduleBl struct {
 	structure.MutexLocker
-	dispatchLocker structure.MutexLocker
-	spaceQueue *schedules.SpaceQueue
-	broker *schedules.Broker
-	startChan chan *structure.TaskInfo
-	isDispatchPaused bool
+	// resource management
+	resourceManager *schedules.SchedulerResourceManager
+	// algorithm management
+	algorithmManager *schedules.SchedulerAlgorithmManager
+	// task management
+	taskManager *schedules.SchedulerTaskManager
+	// cron management
+	cronManager *schedules.SchedulerCronManager
+	// start channel for tasks to be started
+	startChan chan *structure.TaskInfo
+	// configurations
+	startChanSize int
+	tickerInterval int
+	softSchedule bool
 }
 
+/*
+* @Description: Init initializes the ScheduleBl.
+* @Note: This function will initialize the ScheduleBl.
+ */
 func (s *ScheduleBl) Init() {
+	logrus.Info("Initializing ScheduleBl...")
+	s.LoadConfig()
+	startChan := make(chan *structure.TaskInfo, s.startChanSize)
+	s.startChan = startChan
+
+	s.resourceManager = &schedules.SchedulerResourceManager{}
+	s.resourceManager.Init()
+	s.taskManager = &schedules.SchedulerTaskManager{}
+	s.taskManager.Init()
+	s.algorithmManager = &schedules.SchedulerAlgorithmManager{}
+	s.algorithmManager.Init()
+	s.cronManager = &schedules.SchedulerCronManager{}
+	s.cronManager.Init(s.QueueTaskFromTemplate)
+	go s.startTicker()
+	go s.waitingStartedTask()
+}
+
+/*
+* @Description: LoadConfig loads the configuration from the common package.
+* @Note: This function will load the configuration from the common package.
+ */ +func (s *ScheduleBl) LoadConfig() { + // Load configuration from common package + + // startChanSize const defaultChanSizeConfig = "10" chanSize := common.GetConfigDefault("start_chan_size", defaultChanSizeConfig).(string) // Convert string to int @@ -47,22 +89,133 @@ func (s *ScheduleBl) Init() { logrus.Infof("using default start_chan_size: %s", defaultChanSizeConfig) chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig) } - startChan := make(chan *structure.TaskInfo, chanSizeInt) - s.startChan = startChan - s.spaceQueue = (&schedules.SpaceQueue{}).Init() - s.broker = (&schedules.Broker{}).Init() + s.startChanSize = chanSizeInt - go s.waitingTask() - go s.startTicker() + // tickerInterval + const defaultTickerInterval = "3" + tickerInterval := common.GetConfigDefault("ticker_interval", defaultTickerInterval).(string) + tickerIntervalInt, err := strconv.Atoi(tickerInterval) + if err != nil { + logrus.Errorf("failed to convert ticker_interval to int: %v", err) + logrus.Infof("using default ticker_interval: %s", defaultTickerInterval) + tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval) + } + s.tickerInterval = tickerIntervalInt + + // softSchedule + softSchedule := common.GetConfigDefault("soft_schedule", "true").(string) + if softSchedule == "true" { + s.softSchedule = true + } else { + s.softSchedule = false + } + + logrus.Infof("ScheduleBl configuration: startChanSize=%d, tickerInterval=%d, softSchedule=%v", + s.startChanSize, s.tickerInterval, s.softSchedule) } -func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo { - return s.spaceQueue.PeekTailTask(space) +/* +* @Description: startTicker starts the ticker. +* @Note: This function will start the ticker. + */ +func (s *ScheduleBl) startTicker() { + // Create a ticker with the specified interval + ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second) + + for range ticker { + logrus.Debug("Ticker ticked") + s.TryScheduleNextTasks() + } +} + +// this make scheduler manager try to schedule next tasks +/* +* @Description: TryScheduleNextTasks tries to schedule the next tasks. +* @Note: This function will try to schedule the next tasks. +* @Param noLock + */ +func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) { + defer func() { + if err := recover(); err != nil { + logrus.Errorln("TryScheduleNextTasks() has been recovered:", err) + } + }() + + if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil { + logrus.Errorf("do scheduling error:%v", err) + } +} + +// Main routine to schedule tasks +/* +* @Description: tryScheduleInner tries to schedule the next tasks. +* @Note: This function will try to schedule the next tasks. 
+* @Param softSchedule +* @Param noLock + */ +func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error { + // Implement logic to get the next task in the queue for the given space + if !(len(noLock) > 0 && noLock[0]) { + defer s.Unlock(s.Lock()) + } + + // step 1: make sure all tasks have alloc to a worker group + // This is done by the TaskManager, which assigns a worker group to each task + s.taskManager.RefreshTaskToWorkerGroupMap() + + // step 2: get available resources and tasks + logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule) + idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups() + concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups() + allTasks := s.taskManager.GetAllTasksNotComplete() + if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 && len(concurrentWorkerGroups) == 0) { + logrus.Debugf("no available tasks or workerGroups, allTasks: %d, workerGroups: %d/%d", + len(allTasks), len(idleWorkerGroups), len(concurrentWorkerGroups)) + return nil + } + logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks), len(idleWorkerGroups), len(concurrentWorkerGroups)) + + // TODO: NEED TO JUDGE IF THE TASK CAN CONCURRENTLY RUNNING + // NOT only by user setting, but also by scheduler setting + + // step 3: return the task with the highest priority or small tasks which can be executed immediately + taskToWorkerGroupMap := s.taskManager.GetTaskToWorkerGroupMap() + nextTasks, err := s.algorithmManager.ScheduleNextTasks(allTasks, taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule) + if err != nil { + logrus.Errorf("failed to schedule next tasks: %v", err) + return err + } + logrus.Debugf("scheduled %d tasks", len(nextTasks)) + // step 4: send to start channel + for _, task := range nextTasks { + if task == nil { + logrus.Warnf("received a nil task from algorithm manager") + continue + } + if task.State != structure.TaskStateWaiting { + logrus.Warnf("task '%d' is not in waiting state, current state: %s", task.ID, task.State) + continue + } + logrus.Infof("scheduling task '%d' with type '%s' to start channel", task.ID, task.Type) + select { + case s.startChan <- task: + logrus.Infof("task '%d' sent to start channel", task.ID) + default: + logrus.Warnf("start channel is full, task '%d' could not be sent", task.ID) + } + } + + return nil } // QueueTask Add the task to the inner queue. -// The tasks will be executed in order from the queue. // If the task exists, return false. +/* +* @Description: QueueTask queues the task. +* @Note: This function will queue the task. 
+* @Param taskInfo +* @Return bool, error + */ func (s *ScheduleBl) QueueTask(taskInfo *structure.TaskInfo) (bool, error) { if taskInfo == nil { return false, errors.New("the argument `taskInfo` is nil") @@ -72,81 +225,161 @@ func (s *ScheduleBl) QueueTask(taskInfo *structure.TaskInfo) (bool, error) { return false, errors.New("the property `SpaceName` of taskInfo is empty") } - //defer s.Unlock(s.Lock()) + defer s.Unlock(s.Lock()) if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err != nil { return false, err } + logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, taskInfo) + + // check dependency if exists + if len(taskInfo.Preorders) > 0 { + for _, depTaskID := range taskInfo.Preorders { + depTask := taskMgr.GetTaskByID(depTaskID) + if depTask == nil { + err := errors.New("the dependency task with ID " + strconv.Itoa(int(depTaskID)) + " does not exist") + logrus.Error(err) + taskMgr.SetError(taskInfo, err.Error()) + return false, err + } + } + } + // Notice: Ensure successful invocation. - ok, err := s.spaceQueue.PushTask(taskInfo) + // make sure all tasks have alloc to a worker group + ok, err := s.taskManager.QueueTask(taskInfo) if err != nil { taskMgr.SetError(taskInfo, err.Error()) return ok, err } - go s.dispatch() + if s.cronManager.CheckCronExpression(taskInfo.CronExpr) == nil { + if err := s.cronManager.AddCronTask(taskInfo); err != nil { + logrus.Errorf("failed to add cron task: %v", err) + return false, err + } + logrus.Infof("added cron task for task '%d' with expression '%s'", taskInfo.ID, taskInfo.CronExpr) + } return ok, nil } -func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error { - if taskInfo == nil { - return errors.New("the argument `taskInfo` is nil") +/* +* @Description: QueueTaskFromTemplate queues the task from the template. +* @Note: This function will queue the task from the template. This function is used by cron tasks. +* @Param template +* @Return int32, error + */ +func (s *ScheduleBl) QueueTaskFromTemplate(template *structure.TaskInfo) (int32, error) { + if template == nil { + return -1, errors.New("the argument `template` is nil") } - s.Lock() - isHeadTask := s.spaceQueue.IsHeadTask(taskInfo.ID) - task := s.spaceQueue.RemoveTask(taskInfo.ID) - s.Unlock(nil) + bc := &baseCreator{} + taskInfo, err := bc.CopyTaskInfo(template) + if err != nil { + logrus.Errorf("failed to copy task info from template, template ID: %d, caused by: %v", template.ID, err) + return -1, err + } + bc.saveTaskInfo(taskInfo) - isInQueue := false - if task != nil { - logrus.Infof("removed task '%d' from space queue", task.ID) - isInQueue = true + ok, err := s.QueueTask(taskInfo) + if err != nil || !ok { + logrus.Errorf("failed to queue task from template, template ID: %d, caused by: %v", template.ID, err) + return -1, err } - if isInQueue && !isHeadTask { - if err := taskMgr.SetState(taskInfo, structure.TaskStateCanceled); err != nil { - return err - } + logrus.Infof("queued task '%d' from template '%d'", taskInfo.ID, template.ID) - logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID) - } else { - logrus.Infof("sending task '%d' to task canceler", taskInfo.ID) - return s.handleCancelTask(taskInfo) + return taskInfo.ID, nil +} + +/* +* @Description: BatchQueueTask batches the task. +* @Note: This function will batch the task. 
+* @Param taskInfos +* @Return []bool, []error + */ +func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool, []error) { + if len(taskInfos) == 0 { + return []bool{}, []error{} } - return nil -} + s.PauseDispatch() -func (s *ScheduleBl) IsDispatchPaused() bool { - return s.isDispatchPaused -} -func (s *ScheduleBl) PauseDispatch() { - s.isDispatchPaused = true -} + defer s.ResumeDispatch() + // defer s.Unlock(s.Lock()) -func (s *ScheduleBl) ResumeDispatch() { - s.isDispatchPaused = false + errors := make([]error, len(taskInfos)) + oks := make([]bool, len(taskInfos)) + + for _, taskInfo := range taskInfos { + ok, err := s.QueueTask(taskInfo) + if err != nil { + logrus.Errorf("failed to queue task '%d': %v", taskInfo.ID, err) + } + errors = append(errors, err) + oks = append(oks, ok) + } + + return oks, errors } -func (s *ScheduleBl) AllTasksInQueue() []*structure.TaskInfo { - return s.spaceQueue.AllTasks() +// ******** CloseCurrent ******** +func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) error { + defer s.Unlock(s.Lock()) + + // trace tasks need these workers, check if these tasks are available + s.taskManager.RemoveTask(taskId) + // release the worker group + s.resourceManager.ReleaseByTaskID(taskId) + + if len(removeWorkerName) > 0 { + // stop the cron job if exists when need remove worker, otherwise the task is just closed normally + s.cronManager.DeleteTask(taskId) + // remove the worker from resource manager + workerName := removeWorkerName[0] + if workerName == "" { + return errors.New("the argument `removeWorkerName` is empty") + } + logrus.Infof("removing worker '%s' from resource manager", workerName) + s.ChangeWorkerStatus(workerName, schedules.WorkerOngoingStatusDeleted) + } + + logrus.Infof("invoke dispatch when task '%d' is closed", taskId) + s.TryScheduleNextTasks(true) + return nil } -func (s *ScheduleBl) TasksInQueue(space string) []*structure.TaskInfo { - return s.spaceQueue.SpaceTasks(space) +// This will be called when a worker is offline. +// This will be called when a worker is online. +func (s *ScheduleBl) ChangeWorkerStatus(workerName string, status schedules.WorkerOngoingStatus) (bool, error) { + defer s.Unlock(s.Lock()) + s.resourceManager.ChangeWorkerStatus(workerName, status) + + logrus.Infof("worker '%s' status changed to '%s'", workerName, status) + // After changing the worker status, we may need to reschedule tasks + s.TryScheduleNextTasks(true) + + return true, nil } -func (s *ScheduleBl) CloseCurrent(taskId int32) error { - logrus.Infof("invoke dispatch when task '%d' is closed", taskId) - s.dispatch() +// ******** START TASK ******** +func (s *ScheduleBl) waitingStartedTask() { + for taskInfo := range s.startChan { + if taskInfo == nil { + logrus.Warnf("recieved a nil task from startChan") + continue + } - return nil + logrus.Infof("chan received task '%d' to start", taskInfo.ID) + s.handleStartTask(taskInfo) + } } +// now, start task! 
func (s *ScheduleBl) handleStartTask(taskInfo *structure.TaskInfo) { - agent, status, err := s.broker.ApplyAgent(taskInfo) + agent, status, err := s.resourceManager.GetAgentAndAssignTask(taskInfo) if err != nil { logrus.Errorf("apply agent error: %v", err) @@ -175,24 +408,6 @@ func (s *ScheduleBl) handleStartTask(taskInfo *structure.TaskInfo) { go s.startWaitingTask(agent, taskInfo) } -func (s *ScheduleBl) handleCancelTask(taskInfo *structure.TaskInfo) error { - logrus.Infof("received task '%d' to cancel", taskInfo.ID) - canceler, err := NewTaskCanceler(taskInfo) - if err != nil { - logrus.Errorf("failed to create new TaskCanceler err: %v", err) - taskMgr.SetError(taskInfo, err.Error()) - return err - } - - if err := canceler.CancelTask(); err != nil { - logrus.Errorf("failed to cancel task '%d', caused by: %v", taskInfo.ID, err) - taskMgr.SetError(taskInfo, err.Error()) - return err - } - - return nil -} - func (s *ScheduleBl) startWaitingTask(agent *schedules.Agent, taskInfo *structure.TaskInfo) { logrus.Infof("starting a task, id: %v, type: %v, graph: %v", taskInfo.ID, taskInfo.Type, taskInfo.GraphName) @@ -202,6 +417,7 @@ func (s *ScheduleBl) startWaitingTask(agent *schedules.Agent, taskInfo *structur } }() + // TODO: Is here need a lock? TOCTTOU if taskInfo.State != structure.TaskStateWaiting { logrus.Errorf("task state is not in 'Waiting' state, taskID: %v", taskInfo) return @@ -222,68 +438,115 @@ func (s *ScheduleBl) startWaitingTask(agent *schedules.Agent, taskInfo *structur taskInfo.StartTime = time.Now() err = taskStarter.StartTask() + + // only for test or debug, record the task start sequence + if err := s.taskManager.AddTaskStartSequence(taskInfo.ID); err != nil { + logrus.Errorf("failed to add task '%d' to start sequence: %v", taskInfo.ID, err) + } + if err != nil { logrus.Errorf("failed to start a task, type: %s, taskID: %d, caused by: %v", taskInfo.Type, taskInfo.ID, err) taskMgr.SetError(taskInfo, err.Error()) } - } -func (s *ScheduleBl) dispatch() { - defer func() { - if err := recover(); err != nil { - logrus.Errorln("dispatch() has been recovered:", err) +// ********* CANCEL TASK ******** +// handle cancel task +// need to cancel cron task +func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error { + if taskInfo == nil { + return errors.New("the argument `taskInfo` is nil") + } + + defer s.Unlock(s.Lock()) + + isHeadTask := s.taskManager.IsTaskOngoing(taskInfo.ID) + task := s.taskManager.RemoveTask(taskInfo.ID) + s.cronManager.DeleteTask(taskInfo.ID) + // err := s.taskManager.CancelTask(taskInfo) + isInQueue := false + if task != nil { + logrus.Infof("removed task '%d' from space queue", taskInfo.ID) + isInQueue = true + } + + if isInQueue && !isHeadTask { + if err := taskMgr.SetState(taskInfo, structure.TaskStateCanceled); err != nil { + return err } - }() - if err := s.doDispatch(); err != nil { - logrus.Errorf("do dispatching error:%v", err) + logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID) + } else { + logrus.Infof("sending task '%d' to task canceler", taskInfo.ID) + return s.handleCancelTask(taskInfo) } + + return nil } -func (s *ScheduleBl) doDispatch() error { - if s.isDispatchPaused { - logrus.Warn("the dispatching was paused") - return nil +func (s *ScheduleBl) handleCancelTask(taskInfo *structure.TaskInfo) error { + logrus.Infof("received task '%d' to cancel", taskInfo.ID) + canceler, err := NewTaskCanceler(taskInfo) + if err != nil { + logrus.Errorf("failed to create new TaskCanceler err: %v", err) + taskMgr.SetError(taskInfo, 
err.Error()) + return err } - defer s.dispatchLocker.Unlock(s.dispatchLocker.Lock()) - - buffer := s.spaceQueue.HeadTasks() - if len(buffer) == 0 { - return nil + if err := canceler.CancelTask(); err != nil { + logrus.Errorf("failed to cancel task '%d', caused by: %v", taskInfo.ID, err) + taskMgr.SetError(taskInfo, err.Error()) + return err } - for _, task := range buffer { - select { - case s.startChan <- task: - default: - logrus.Warnf("the start channel is full, dropped task: %d", task.ID) - } + // set worker state to idle or concurrent running + s.resourceManager.ReleaseByTaskID(taskInfo.ID) + return nil +} + +func (s *ScheduleBl) CancelCronTask(taskInfo *structure.TaskInfo) error { + if taskInfo == nil { + return errors.New("the argument `taskInfo` is nil") } + s.cronManager.DeleteTask(taskInfo.ID) + return nil } -func (s *ScheduleBl) waitingTask() { - for taskInfo := range s.startChan { - if taskInfo == nil { - logrus.Warnf("recieved a nil task from startChan") - return - } +// ** Other Methods ** - logrus.Infof("chan received task '%d' to start", taskInfo.ID) - s.handleStartTask(taskInfo) - } +func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo { + return s.taskManager.GetLastTask(space) } -func (s *ScheduleBl) startTicker() { - // Create a ticker that triggers every 3 seconds - ticker := time.Tick(3 * time.Second) +func (s *ScheduleBl) IsDispatchPaused() bool { + // Implement logic to check if dispatching is paused + return s.algorithmManager.IsDispatchPaused() +} - for range ticker { - //logrus.Debug("Ticker ticked") - s.dispatch() - } +func (s *ScheduleBl) PauseDispatch() { + // Implement logic to pause dispatching + s.algorithmManager.PauseDispatch() +} + +func (s *ScheduleBl) ResumeDispatch() { + // Implement logic to resume dispatching + s.algorithmManager.ResumeDispatch() +} + +func (s *ScheduleBl) AllTasksInQueue() []*structure.TaskInfo { + // Implement logic to get all tasks in the queue + return s.taskManager.GetAllTasks() +} + +func (s *ScheduleBl) TasksInQueue(space string) []*structure.TaskInfo { + // Implement logic to get tasks in the queue for a specific space + return s.taskManager.GetTasksInQueue(space) +} + +func (s *ScheduleBl) TaskStartSequence(queryTasks []int32) []*structure.TaskInfo { + // Only for debug or test, get task start sequence + return s.taskManager.GetTaskStartSequence(queryTasks) } diff --git a/vermeer/apps/master/bl/task_bl.go b/vermeer/apps/master/bl/task_bl.go index fa3d5b589..49724fba6 100644 --- a/vermeer/apps/master/bl/task_bl.go +++ b/vermeer/apps/master/bl/task_bl.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" "sort" + "strconv" + "strings" "time" "vermeer/apps/compute" @@ -62,6 +64,53 @@ func (tb *TaskBl) CreateTaskInfo( return nil, err } + // for scheduler + taskInfo.Priority = 0 + taskInfo.Preorders = make([]int32, 0) + taskInfo.Exclusive = true // default to true for now, can be set false by params + if params != nil { + if priority, ok := params["priority"]; ok { + if p, err := strconv.ParseInt(priority, 10, 32); err == nil { + if p < 0 { + return nil, fmt.Errorf("priority should be non-negative") + } + taskInfo.Priority = int32(p) + } else { + logrus.Warnf("priority convert to int32 error:%v", err) + return nil, err + } + } + if preorders, ok := params["preorders"]; ok { + preorderList := strings.Split(preorders, ",") + for _, preorder := range preorderList { + if pid, err := strconv.ParseInt(preorder, 10, 32); err == nil { + if taskMgr.GetTaskByID(int32(pid)) == nil { + return nil, fmt.Errorf("preorder task id %d not 
exists", pid) + } + taskInfo.Preorders = append(taskInfo.Preorders, int32(pid)) + } else { + logrus.Warnf("preorder convert to int32 error:%v", err) + return nil, err + } + } + } + if exclusive, ok := params["exclusive"]; ok { + if ex, err := strconv.ParseBool(exclusive); err == nil { + taskInfo.Exclusive = ex + } else { + logrus.Warnf("exclusive convert to bool error:%v", err) + return nil, err + } + } + if cronExpr, ok := params["cron_expr"]; ok { + if err := Scheduler.cronManager.CheckCronExpression(cronExpr); err != nil { + logrus.Warnf("cron_expr parse error:%v", err) + return nil, err + } + taskInfo.CronExpr = cronExpr + } + } + return taskInfo, nil } @@ -146,6 +195,9 @@ func (tb *TaskBl) CancelTask(taskID int32) error { return fmt.Errorf("cannot cancel the task with id '%v' as it was not created by you", taskID) } + // stop the cron job if exists + Scheduler.CancelCronTask(task) + if task.State == structure.TaskStateCanceled { return fmt.Errorf("task had been in state canceled") } @@ -206,13 +258,10 @@ func QueueExecuteTask(taskInfo *structure.TaskInfo) error { } func QueueExecuteTasks(tasksInfo []*structure.TaskInfo) []error { - defer Scheduler.Unlock(Scheduler.Lock()) - errs := make([]error, 0, len(tasksInfo)) for _, task := range tasksInfo { task.CreateType = structure.TaskCreateAsync - _, err := Scheduler.QueueTask(task) - errs = append(errs, err) } + _, errs := Scheduler.BatchQueueTask(tasksInfo) return errs } diff --git a/vermeer/apps/master/bl/task_creator.go b/vermeer/apps/master/bl/task_creator.go index ca3f946cd..a7353c85b 100644 --- a/vermeer/apps/master/bl/task_creator.go +++ b/vermeer/apps/master/bl/task_creator.go @@ -99,6 +99,28 @@ func (bc *baseCreator) NewTaskInfo(graphName string, params map[string]string, t return task, nil } +func (bc *baseCreator) CopyTaskInfo(src *structure.TaskInfo) (*structure.TaskInfo, error) { + if src == nil { + return nil, fmt.Errorf("the argument `src` should not be nil") + } + + task, err := taskMgr.CreateTask(src.SpaceName, src.Type, 0) + if err != nil { + return nil, err + } + + task.CreateType = structure.TaskCreateAsync + task.GraphName = src.GraphName + task.CreateUser = src.CreateUser + task.Params = src.Params + task.CronExpr = "" // clear cron expression for the new task + task.Priority = src.Priority + task.Preorders = src.Preorders + task.Exclusive = src.Exclusive + + return task, nil +} + func (bc *baseCreator) saveTaskInfo(task *structure.TaskInfo) (*structure.TaskInfo, error) { if _, err := taskMgr.AddTask(task); err != nil { logrus.Errorf("failed to add a task to `TaskManager`, task: %v, cased by: %v", task, err) diff --git a/vermeer/apps/master/bl/worker_bl.go b/vermeer/apps/master/bl/worker_bl.go index e14d20c45..651e1e043 100644 --- a/vermeer/apps/master/bl/worker_bl.go +++ b/vermeer/apps/master/bl/worker_bl.go @@ -70,7 +70,7 @@ func (wb *WorkerBl) ReleaseWorker(workerName string) error { //taskInfo.SetErrMsg(fmt.Sprintf("worker %v is offline", workerName)) taskMgr.SetError(taskInfo, fmt.Sprintf("worker %v is offline", workerName)) logrus.Warnf("set task %v status:error", taskInfo.ID) - if err := Scheduler.CloseCurrent(taskInfo.ID); err != nil { + if err := Scheduler.CloseCurrent(taskInfo.ID, workerName); err != nil { logrus.Errorf("failed to close task with ID: %d,err:%v", taskInfo.ID, err) } break diff --git a/vermeer/apps/master/master_main.go b/vermeer/apps/master/master_main.go index 2da20a2fd..91cd33180 100644 --- a/vermeer/apps/master/master_main.go +++ b/vermeer/apps/master/master_main.go @@ -56,6 +56,7 @@ func 
Main() { services.SetUI(sen) logrus.Info("token-auth was activated") default: + services.SetAdminRouters(sen, auth.NoneAuthFilter) services.SetRouters(sen, auth.NoneAuthFilter) logrus.Warn("No authentication was activated.") } diff --git a/vermeer/apps/master/schedules/broker.go b/vermeer/apps/master/schedules/broker.go index fabdf415a..435219dc9 100644 --- a/vermeer/apps/master/schedules/broker.go +++ b/vermeer/apps/master/schedules/broker.go @@ -23,7 +23,7 @@ import ( "github.com/sirupsen/logrus" - . "vermeer/apps/master/workers" + "vermeer/apps/master/workers" ) type AgentStatus string @@ -72,42 +72,44 @@ func (b *Broker) AllAgents() []*Agent { return res } -func (b *Broker) ApplyAgent(taskInfo *structure.TaskInfo) (*Agent, AgentStatus, error) { +func (b *Broker) ApplyAgent(taskInfo *structure.TaskInfo, forceApply ...bool) (*Agent, AgentStatus, map[string]*workers.WorkerClient, error) { if taskInfo == nil { - return nil, AgentStatusError, fmt.Errorf("taskInfo is nil") + return nil, AgentStatusError, nil, fmt.Errorf("taskInfo is nil") } defer b.Unlock(b.Lock()) agent, workers, err := b.getAgent(taskInfo) if err != nil { - return nil, AgentStatusError, err + return nil, AgentStatusError, nil, err } if agent == nil { - return nil, AgentStatusPending, nil + return nil, AgentStatusPending, nil, nil } if workers == nil || len(workers) == 0 { - return nil, AgentStatusNoWorker, nil + return nil, AgentStatusNoWorker, nil, nil } if !b.isWorkersReady(workers) { logrus.Warnf("the workers of agent '%s' are not ready", agent.GroupName()) - return nil, AgentStatusWorkerNotReady, nil + return nil, AgentStatusWorkerNotReady, nil, nil } - if b.isAgentBusy(agent) { - return nil, AgentStatusAgentBusy, nil - } + if !(forceApply != nil && len(forceApply) > 0 && forceApply[0]) { + if b.isAgentBusy(agent) { + return nil, AgentStatusAgentBusy, nil, nil + } - if b.isWorkerBusy(workers, agent) { - return nil, AgentStatusWorkerBusy, nil + if b.isWorkerBusy(workers, agent) { + return nil, AgentStatusWorkerBusy, nil, nil + } } agent.AssignTask(taskInfo) - return agent, AgentStatusOk, nil + return agent, AgentStatusOk, workers, nil } // func (b *Broker) isAgentReady(taskInfo *structure.TaskInfo, agent *Agent) bool { @@ -124,7 +126,7 @@ func (b *Broker) ApplyAgent(taskInfo *structure.TaskInfo) (*Agent, AgentStatus, // } // } -func (b *Broker) isWorkersReady(workers map[string]*WorkerClient) bool { +func (b *Broker) isWorkersReady(workers map[string]*workers.WorkerClient) bool { ok := false for _, w := range workers { if w.Connection == nil { @@ -166,7 +168,7 @@ func (b *Broker) isAgentBusy(agent *Agent) bool { return busy } -func (b *Broker) isWorkerBusy(workers map[string]*WorkerClient, agent *Agent) bool { +func (b *Broker) isWorkerBusy(workers map[string]*workers.WorkerClient, agent *Agent) bool { for _, a := range b.agents { if a == agent { continue @@ -188,7 +190,7 @@ func (b *Broker) isWorkerBusy(workers map[string]*WorkerClient, agent *Agent) bo return false } -func (b *Broker) getAgent(taskInfo *structure.TaskInfo) (*Agent, map[string]*WorkerClient, error) { +func (b *Broker) getAgent(taskInfo *structure.TaskInfo) (*Agent, map[string]*workers.WorkerClient, error) { switch taskInfo.Type { case structure.TaskTypeLoad: fallthrough @@ -202,7 +204,7 @@ func (b *Broker) getAgent(taskInfo *structure.TaskInfo) (*Agent, map[string]*Wor } -func (b *Broker) getAgentFromGraph(taskInfo *structure.TaskInfo) (*Agent, map[string]*WorkerClient, error) { +func (b *Broker) getAgentFromGraph(taskInfo *structure.TaskInfo) (*Agent, 
map[string]*workers.WorkerClient, error) { graph := graphMgr.GetGraphByName(taskInfo.SpaceName, taskInfo.GraphName) if graph == nil { return nil, nil, fmt.Errorf("failed to retrieve graph with name: %s/%s", taskInfo.SpaceName, taskInfo.GraphName) @@ -223,7 +225,7 @@ func (b *Broker) getAgentFromGraph(taskInfo *structure.TaskInfo) (*Agent, map[st return nil, nil, nil // waiting for the next check } - workers := make(map[string]*WorkerClient) + workers := make(map[string]*workers.WorkerClient) for _, w := range graph.Workers { wc := workerMgr.GetWorker(w.Name) @@ -238,7 +240,7 @@ func (b *Broker) getAgentFromGraph(taskInfo *structure.TaskInfo) (*Agent, map[st } -func (b *Broker) getAgentFromWorker(taskInfo *structure.TaskInfo) (*Agent, map[string]*WorkerClient, error) { +func (b *Broker) getAgentFromWorker(taskInfo *structure.TaskInfo) (*Agent, map[string]*workers.WorkerClient, error) { group := workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName) return b.retrieveAgent(group), workerMgr.GroupWorkerMap(group), nil } diff --git a/vermeer/apps/master/schedules/scheduler_algorithm_manager.go b/vermeer/apps/master/schedules/scheduler_algorithm_manager.go new file mode 100644 index 000000000..1d50a4509 --- /dev/null +++ b/vermeer/apps/master/schedules/scheduler_algorithm_manager.go @@ -0,0 +1,507 @@ +package schedules + +import ( + "slices" + "sort" + "strconv" + "time" + "vermeer/apps/common" + "vermeer/apps/structure" + + "github.com/sirupsen/logrus" +) + +/* +* @Description: SchedulerAlgorithm is the interface for the scheduler algorithm. +* @Note: This is the interface for the scheduler algorithm. + */ +type SchedulerAlgorithm interface { + // Name returns the name of the SchedulerAlgorithm + Name() string + // Init initializes the SchedulerAlgorithm + Init() + // FilterNextTasks filters the next tasks to be scheduled based on the provided parameters + FilterNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) + // ScheduleNextTasks schedules the next tasks based on the filtered tasks + ScheduleNextTasks(filteredTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) +} + +/* +* @Description: SchedulerAlgorithmManager is the manager for the scheduler algorithm. +* @Note: This is the manager for the scheduler algorithm. + */ +type SchedulerAlgorithmManager struct { + filteredSchedulerAlgorithms map[string]SchedulerAlgorithm + scheduledSchedulerAlgorithms map[string]SchedulerAlgorithm + dispatchPaused bool +} + +/* +* @Description: Init initializes the SchedulerAlgorithmManager. +* @Note: This function will initialize the SchedulerAlgorithmManager. + */ +// Need to put DependsSchedulerAlgorithm before WaitingSchedulerAlgorithm +func (am *SchedulerAlgorithmManager) Init() { + am.filteredSchedulerAlgorithms = make(map[string]SchedulerAlgorithm) + am.scheduledSchedulerAlgorithms = make(map[string]SchedulerAlgorithm) + am.dispatchPaused = false + // Register filter and schedule algorithms + am.RegisterFilterAlgorithm(&DependsSchedulerAlgorithm{}) + am.RegisterFilterAlgorithm(&WaitingSchedulerAlgorithm{}) + // Register default SchedulerAlgorithms + am.RegisterSchedulerAlgorithm(&PriorityElderSchedulerAlgorithm{}) +} + +/* +* @Description: RegisterSchedulerAlgorithm registers the scheduler algorithm. 
+* @Note: This function will register the scheduler algorithm. +* @Param schedulerAlgorithm + */ +func (am *SchedulerAlgorithmManager) RegisterSchedulerAlgorithm(schedulerAlgorithm SchedulerAlgorithm) { + if schedulerAlgorithm == nil { + return + } + name := schedulerAlgorithm.Name() + if _, exists := am.scheduledSchedulerAlgorithms[name]; exists { + return // SchedulerAlgorithm already registered + } + + // only support one scheduling algorithm for now + if len(am.scheduledSchedulerAlgorithms) > 0 { + return // Only one scheduling algorithm can be registered + } + schedulerAlgorithm.Init() + am.scheduledSchedulerAlgorithms[name] = schedulerAlgorithm +} + +/* +* @Description: RegisterFilterAlgorithm registers the filter algorithm. +* @Note: This function will register the filter algorithm. +* @Param filterAlgorithm + */ +func (am *SchedulerAlgorithmManager) RegisterFilterAlgorithm(filterAlgorithm SchedulerAlgorithm) { + if filterAlgorithm == nil { + return + } + name := filterAlgorithm.Name() + if _, exists := am.filteredSchedulerAlgorithms[name]; exists { + return // SchedulerAlgorithm already registered + } + filterAlgorithm.Init() + am.filteredSchedulerAlgorithms[name] = filterAlgorithm +} + +/* +* @Description: IsDispatchPaused checks if the dispatch is paused. +* @Note: This function will check if the dispatch is paused. +* @Return bool + */ +func (am *SchedulerAlgorithmManager) IsDispatchPaused() bool { + return am.dispatchPaused +} + +/* +* @Description: PauseDispatch pauses the dispatch. +* @Note: This function will pause the dispatch. + */ +func (am *SchedulerAlgorithmManager) PauseDispatch() { + am.dispatchPaused = true +} + +/* +* @Description: ResumeDispatch resumes the dispatch. +* @Note: This function will resume the dispatch. + */ +func (am *SchedulerAlgorithmManager) ResumeDispatch() { + am.dispatchPaused = false +} + +/* +* @Description: ScheduleNextTasks schedules the next tasks. +* @Note: This function will schedule the next tasks. 
+* @Param allTasks +* @Param taskToWorkerGroupMap +* @Param idleWorkerGroups +* @Param concurrentWorkerGroups +* @Param softSchedule +* @Return []*structure.TaskInfo, error + */ +// For all tasks, filter and schedule them +// Only one scheduling algorithm is supported for now +func (am *SchedulerAlgorithmManager) ScheduleNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + if am.dispatchPaused { + return nil, nil // No tasks to schedule if dispatch is paused + } + + filteredTasks := allTasks + for _, algorithm := range am.filteredSchedulerAlgorithms { + var err error + filteredTasks, err = algorithm.FilterNextTasks(filteredTasks, taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule) + if err != nil { + return nil, err + } + } + if len(filteredTasks) == 0 { + return nil, nil // No tasks to schedule after filtering + } + + // only support one scheduling algorithm for now + // get first algorithm + for _, algorithm := range am.scheduledSchedulerAlgorithms { + tasks, err := algorithm.ScheduleNextTasks(filteredTasks, taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule) + if err != nil { + return nil, err + } + return tasks, nil // Return the scheduled tasks + } + + return nil, nil // No tasks scheduled +} + +type FIFOSchedulerAlgorithm struct{} + +func (f *FIFOSchedulerAlgorithm) Name() string { + return "FIFO" +} + +func (f *FIFOSchedulerAlgorithm) Init() { + // No specific initialization needed for FIFO + logrus.Info("Initializing FIFOSchedulerAlgorithm") +} + +func (f *FIFOSchedulerAlgorithm) FilterNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + // just return the waiting tasks as is for FIFO + return allTasks, nil +} + +func (f *FIFOSchedulerAlgorithm) ScheduleNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + if len(allTasks) == 0 { + return nil, nil // No tasks to schedule + } + + // For FIFO, we simply return the available tasks in the order they are provided + for _, task := range allTasks { + if task.State != structure.TaskStateWaiting { + continue // Only consider tasks that are in the waiting state + } + if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" { + // only support idle worker groups for now + for _, idleGroup := range idleWorkerGroups { + if group == idleGroup { + logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group) + return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled + } + } + } + } + + return nil, nil +} + +type PrioritySchedulerAlgorithm struct{} + +func (p *PrioritySchedulerAlgorithm) Name() string { + return "Priority" +} + +func (p *PrioritySchedulerAlgorithm) Init() { + // No specific initialization needed for Priority + logrus.Info("Initializing PrioritySchedulerAlgorithm") +} + +func (p *PrioritySchedulerAlgorithm) FilterNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + // just return the waiting tasks as is for Priority + return allTasks, nil +} + +func (p 
*PrioritySchedulerAlgorithm) ScheduleNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + if len(allTasks) == 0 { + return nil, nil // No tasks to schedule + } + + // Sort tasks by priority (higher priority first) + sort.Slice(allTasks, func(i, j int) bool { + return allTasks[i].Priority > allTasks[j].Priority + }) + + for _, task := range allTasks { + if task.State != structure.TaskStateWaiting { + continue // Only consider tasks that are in the waiting state + } + if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" { + // only support idle worker groups for now + for _, idleGroup := range idleWorkerGroups { + if group == idleGroup { + logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group) + return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled + } + } + } + } + + return nil, nil +} + +type PriorityElderSchedulerAlgorithm struct { + ageParam int64 + priorityParam int64 + resourceParam int64 + randomValueParam int64 +} + +func (p *PriorityElderSchedulerAlgorithm) Name() string { + return "PriorityElder" +} + +func (p *PriorityElderSchedulerAlgorithm) Init() { + logrus.Info("Initializing PriorityElderSchedulerAlgorithm") + + // Initialize parameters with default values + defaultAgeParam := "1" + defaultPriorityParam := "1" + defaultResourceParam := "10000000000" + defaultRandomValueParam := "1" // Placeholder for any random value logic + + // Load parameters from configuration + ageParam := common.GetConfigDefault("priority_elder_age_param", defaultAgeParam).(string) + priorityParam := common.GetConfigDefault("priority_elder_priority_param", defaultPriorityParam).(string) + resourceParam := common.GetConfigDefault("priority_elder_resource_param", defaultResourceParam).(string) + randomValueParam := common.GetConfigDefault("priority_elder_random_value_param", defaultRandomValueParam).(string) + + ageParamInt, err := strconv.Atoi(ageParam) + if err != nil { + logrus.Errorf("failed to convert priority_elder_age_param to int: %v", err) + logrus.Infof("using default priority_elder_age_param: %s", defaultAgeParam) + ageParamInt, _ = strconv.Atoi(defaultAgeParam) + } + p.ageParam = int64(ageParamInt) + priorityParamInt, err := strconv.Atoi(priorityParam) + if err != nil { + logrus.Errorf("failed to convert priority_elder_priority_param to int: %v", err) + logrus.Infof("using default priority_elder_priority_param: %s", defaultPriorityParam) + priorityParamInt, _ = strconv.Atoi(defaultPriorityParam) + } + p.priorityParam = int64(priorityParamInt) + resourceParamInt, err := strconv.Atoi(resourceParam) + if err != nil { + logrus.Errorf("failed to convert priority_elder_resource_param to int: %v", err) + logrus.Infof("using default priority_elder_resource_param: %s", defaultResourceParam) + resourceParamInt, _ = strconv.Atoi(defaultResourceParam) + } + p.resourceParam = int64(resourceParamInt) + randomValueParamInt, err := strconv.Atoi(randomValueParam) + if err != nil { + logrus.Errorf("failed to convert priority_elder_random_value_param to int: %v", err) + logrus.Infof("using default priority_elder_random_value_param: %s", defaultRandomValueParam) + randomValueParamInt, _ = strconv.Atoi(defaultRandomValueParam) + } + p.randomValueParam = int64(randomValueParamInt) + + logrus.Infof("PriorityElderSchedulerAlgorithm initialized with parameters: ageParam=%d, priorityParam=%d, resourceParam=%d, 
randomValueParam=%d", p.ageParam, p.priorityParam, p.resourceParam, p.randomValueParam) +} + +func (p *PriorityElderSchedulerAlgorithm) FilterNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + // just return the waiting tasks as is for PriorityElder + return allTasks, nil +} + +func (p *PriorityElderSchedulerAlgorithm) CalculateTaskEmergency(task *structure.TaskInfo, taskToWorkerGroupMap map[int32]string, printValue bool) int64 { + // step 0: get params + ageParam := p.ageParam + priorityParam := p.priorityParam + resourceParam := p.resourceParam + randomValueParam := p.randomValueParam + // step 1: age + ageCost := ageParam * time.Since(task.CreateTime).Milliseconds() / 1000 // in seconds + // step 2: priority + priorityCost := priorityParam * int64(task.Priority) + // step 3: resource cost + graph := structure.GraphManager.GetGraphByName(task.SpaceName, task.GraphName) + resourceCost := int64(0) + if graph == nil { + resourceCost = resourceParam // if graph not found, use max resource cost + } else { + resourceCost = resourceParam / max(1, graph.VertexCount+graph.EdgeCount) // Avoid division by zero, ensure at least 1 + } + // step 4: some random value + randomValue := int64(randomValueParam) // Placeholder for any random value logic + if printValue { + logrus.Debugf("Task %d: Age Cost: %d, Priority Cost: %d, Resource Cost: %d, Random Value: %d", task.ID, ageCost, priorityCost, resourceCost, randomValue) + } + return ageCost + priorityCost + resourceCost + randomValue +} + +func (p *PriorityElderSchedulerAlgorithm) ScheduleNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + if len(allTasks) == 0 { + return nil, nil // No tasks to schedule + } + + // calculate emergency value for each task + taskEmergencies := make(map[int32]int64) + for _, task := range allTasks { + taskEmergencies[task.ID] = p.CalculateTaskEmergency(task, taskToWorkerGroupMap, false) + } + + // Sort tasks by priority (higher priority first) + sort.Slice(allTasks, func(i, j int) bool { + return taskEmergencies[allTasks[i].ID] > taskEmergencies[allTasks[j].ID] + }) + + for _, task := range allTasks { + logrus.Debugf("Task %d: Emergency Value: %d", task.ID, taskEmergencies[task.ID]) + } + + for _, task := range allTasks { + if task.State != structure.TaskStateWaiting { + continue // Only consider tasks that are in the waiting state + } + if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" { + for _, idleGroup := range idleWorkerGroups { + if group == idleGroup { + logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group) + return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled + } + } + // if allow concurrent running, check if the group is in concurrent worker groups + if !task.Exclusive { + for _, concurrentGroup := range concurrentWorkerGroups { + if group == concurrentGroup { + logrus.Debugf("Task %d is assigned to concurrent worker group %s", task.ID, group) + return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled + } + } + } + } + } + + return nil, nil +} + +type WaitingSchedulerAlgorithm struct{} + +func (w *WaitingSchedulerAlgorithm) Name() string { + return "Waiting" +} + +func (w *WaitingSchedulerAlgorithm) Init() { + // No specific 
initialization needed for Waiting + logrus.Info("Initializing WaitingSchedulerAlgorithm") +} + +func (w *WaitingSchedulerAlgorithm) FilterNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + waitingTasks := make([]*structure.TaskInfo, 0) + for _, task := range allTasks { + if task.State == structure.TaskStateWaiting { + waitingTasks = append(waitingTasks, task) + } + } + return waitingTasks, nil +} + +func (w *WaitingSchedulerAlgorithm) ScheduleNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + waitingTasks, err := w.FilterNextTasks(allTasks, taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule) + if err != nil { + return nil, err + } + if len(waitingTasks) == 0 { + return nil, nil + } + for _, task := range waitingTasks { + if task.State != structure.TaskStateWaiting { + continue // Only consider tasks that are in the waiting state + } + if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" { + // only support idle worker groups for now + for _, idleGroup := range idleWorkerGroups { + if group == idleGroup { + logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group) + return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled + } + } + } + } + return nil, nil // No tasks scheduled +} + +type DependsSchedulerAlgorithm struct{} + +func (d *DependsSchedulerAlgorithm) Name() string { + return "Depends" +} + +func (d *DependsSchedulerAlgorithm) Init() { + // No specific initialization needed for Depends + logrus.Info("Initializing DependsSchedulerAlgorithm") +} + +func (d *DependsSchedulerAlgorithm) FilterNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + if len(allTasks) == 0 { + return nil, nil // No tasks to schedule + } + + sort.Slice(allTasks, func(i, j int) bool { + return allTasks[i].ID < allTasks[j].ID + }) + + taskIDs := make(map[int32]*structure.TaskInfo) + for _, task := range allTasks { + taskIDs[task.ID] = task + } + + filteredTasks := make([]*structure.TaskInfo, 0) + for _, task := range allTasks { + depends := task.Preorders + // Check if all dependencies are satisfied + allDepsSatisfied := true + for _, dep := range depends { + if depTask, exists := taskIDs[dep]; exists && depTask.State != structure.TaskStateComplete { + allDepsSatisfied = false + break + } + } + if allDepsSatisfied { + if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" { + filteredTasks = append(filteredTasks, task) // Add to filtered tasks if dependencies are satisfied + } + } + } + return filteredTasks, nil +} + +func (d *DependsSchedulerAlgorithm) ScheduleNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) { + if len(allTasks) == 0 { + return nil, nil // No tasks to schedule + } + + sort.Slice(allTasks, func(i, j int) bool { + return allTasks[i].ID < allTasks[j].ID + }) + + allTaskIDs := make(map[int32]*structure.TaskInfo) + for _, task := range allTasks { + allTaskIDs[task.ID] = task + } + + for _, task := range allTasks { + depends := 
task.Preorders + // Check if all dependencies are satisfied + allDepsSatisfied := true + for _, dep := range depends { + if depTask, exists := allTaskIDs[dep]; exists && depTask.State != structure.TaskStateComplete { + allDepsSatisfied = false + break + } + } + if allDepsSatisfied { + if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" { + // only support idle worker groups for now + if slices.Contains(idleWorkerGroups, group) { + logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group) + return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled + } + } + } + } + + return nil, nil +} diff --git a/vermeer/apps/master/schedules/scheduler_cron_manager.go b/vermeer/apps/master/schedules/scheduler_cron_manager.go new file mode 100644 index 000000000..12ad5dbd7 --- /dev/null +++ b/vermeer/apps/master/schedules/scheduler_cron_manager.go @@ -0,0 +1,144 @@ +package schedules + +import ( + "errors" + "vermeer/apps/structure" + + "github.com/robfig/cron/v3" + "github.com/sirupsen/logrus" +) + +/* +* @Description: SchedulerCronManager is the manager for the scheduler cron. +* @Note: This is the manager for the scheduler cron. + */ +type SchedulerCronManager struct { + cronTasks map[int32][]*structure.TaskInfo // cron expression to TaskInfo. Origin task ID to copied tasks + crons map[int32][]*cron.Cron // cron expression to cron jobs + // queueTemplateHandler is a function that handles the task queue + queueTemplateHandler func(*structure.TaskInfo) (int32, error) +} + +/* +* @Description: Init initializes the SchedulerCronManager. +* @Note: This function will initialize the SchedulerCronManager. +* @Param queueTemplateHandler +* @Return *SchedulerCronManager + */ +func (t *SchedulerCronManager) Init(queueTemplateHandler func(*structure.TaskInfo) (int32, error)) *SchedulerCronManager { + t.cronTasks = make(map[int32][]*structure.TaskInfo) + t.crons = make(map[int32][]*cron.Cron) + t.queueTemplateHandler = queueTemplateHandler + return t +} + +/* +* @Description: CheckCronExpression checks the cron expression. +* @Note: This function will check the cron expression. +* @Param cronExpr +* @Return error + */ +func (t *SchedulerCronManager) CheckCronExpression(cronExpr string) error { + if cronExpr == "" { + return errors.New("cron expression is empty") + } + if _, err := cron.ParseStandard(cronExpr); err != nil { + logrus.Errorf("Failed to parse cron expression: %v", err) + return errors.New("invalid cron expression: " + err.Error()) + } + return nil +} + +/* +* @Description: AddCronTask adds the cron task. +* @Note: This function will add the cron task. 
+* @Param taskInfo +* @Return error + */ +func (t *SchedulerCronManager) AddCronTask(taskInfo *structure.TaskInfo) error { + if taskInfo == nil { + return errors.New("the argument `taskInfo` is nil") + } + + if taskInfo.CronExpr == "" { + return errors.New("the property `CronExpr` of taskInfo is empty") + } + + // add to cron tasks + cronJob := cron.New() + _, err := cronJob.AddFunc(taskInfo.CronExpr, func() { + if taskInfo == nil { + return + } + + // CREATE a new task from the original task, using taskbl, it is handled in queueTemplateHandler + // copy a new taskInfo + newID, err := t.queueTemplateHandler(taskInfo) + if err != nil { + logrus.Errorf("Failed to queue task %d in cron job: %v", taskInfo.ID, err) + return + } + logrus.Infof("Successfully queued task %d from cron job", newID) + }) + if err != nil { + logrus.Errorf("Failed to add cron job for task %d: %v", taskInfo.ID, err) + return err + } + t.cronTasks[taskInfo.ID] = append(t.cronTasks[taskInfo.ID], taskInfo) + t.crons[taskInfo.ID] = append(t.crons[taskInfo.ID], cronJob) + cronJob.Start() + logrus.Infof("Added cron task for task ID %d with expression %s", taskInfo.ID, taskInfo.CronExpr) + return nil +} + +/* +* @Description: DeleteTask deletes the cron task. +* @Note: This function will delete the cron task. +* @Param taskID +* @Return error + */ +func (t *SchedulerCronManager) DeleteTask(taskID int32) error { + if _, exists := t.cronTasks[taskID]; !exists { + return errors.New("task not found in cron tasks") + } + + for _, cronJob := range t.crons[taskID] { + cronJob.Stop() + } + delete(t.cronTasks, taskID) + delete(t.crons, taskID) + logrus.Infof("Deleted cron task for task ID %d", taskID) + return nil +} + +/* +* @Description: DeleteTaskByGraph deletes the cron task by graph. +* @Note: This function will delete the cron task by graph. +* @Param spaceName +* @Param graphName +* @Return error + */ +func (t *SchedulerCronManager) DeleteTaskByGraph(spaceName, graphName string) error { + if spaceName == "" || graphName == "" { + return errors.New("the argument `spaceName` or `graphName` is empty") + } + + var toDelete []int32 + for taskID, tasks := range t.cronTasks { + for _, task := range tasks { + if task.SpaceName == spaceName && task.GraphName == graphName { + toDelete = append(toDelete, taskID) + break + } + } + } + + for _, taskID := range toDelete { + if err := t.DeleteTask(taskID); err != nil { + logrus.Errorf("Failed to delete cron task for task ID %d: %v", taskID, err) + return err + } + } + logrus.Infof("Deleted cron tasks for space %s and graph %s", spaceName, graphName) + return nil +} diff --git a/vermeer/apps/master/schedules/scheduler_resource_manager.go b/vermeer/apps/master/schedules/scheduler_resource_manager.go new file mode 100644 index 000000000..485659f03 --- /dev/null +++ b/vermeer/apps/master/schedules/scheduler_resource_manager.go @@ -0,0 +1,241 @@ +package schedules + +import ( + "errors" + "vermeer/apps/structure" + + "github.com/sirupsen/logrus" +) + +/* +* @Description: WorkerOngoingStatus is the status of the worker ongoing. +* @Note: This is the status of the worker ongoing. 
+ */ +type WorkerOngoingStatus string + +const ( + WorkerOngoingStatusIdle WorkerOngoingStatus = "idle" + WorkerOngoingStatusRunning WorkerOngoingStatus = "running" + WorkerOngoingStatusConcurrentRunning WorkerOngoingStatus = "concurrent_running" + WorkerOngoingStatusPaused WorkerOngoingStatus = "paused" + WorkerOngoingStatusDeleted WorkerOngoingStatus = "deleted" +) + +/* +* @Description: SchedulerResourceManager is the manager for the scheduler resource. +* @Note: This is the manager for the scheduler resource. + */ +type SchedulerResourceManager struct { + structure.MutexLocker + workerStatus map[string]WorkerOngoingStatus + workerGroupStatus map[string]WorkerOngoingStatus + runningWorkerGroupTasks map[string][]int32 // worker group name to list of running task IDs + // broker just responsible for communication with workers + // it can not apply tasks to workers directly + broker *Broker +} + +/* +* @Description: Init initializes the SchedulerResourceManager. +* @Note: This function will initialize the SchedulerResourceManager. + */ +func (rm *SchedulerResourceManager) Init() { + rm.workerStatus = make(map[string]WorkerOngoingStatus) + rm.workerGroupStatus = make(map[string]WorkerOngoingStatus) + rm.runningWorkerGroupTasks = make(map[string][]int32) + rm.broker = new(Broker).Init() +} + +/* +* @Description: ReleaseByTaskID releases the resource by task ID. +* @Note: This function will release the resource by task ID. +* @Param taskID + */ +func (rm *SchedulerResourceManager) ReleaseByTaskID(taskID int32) { + defer rm.Unlock(rm.Lock()) + + for workerGroup, status := range rm.workerGroupStatus { + if (status == WorkerOngoingStatusRunning || status == WorkerOngoingStatusConcurrentRunning) && rm.isTaskRunningOnWorkerGroup(workerGroup, taskID) { + delete(rm.workerGroupStatus, workerGroup) + if tasks, exists := rm.runningWorkerGroupTasks[workerGroup]; exists { + for i, id := range tasks { + if id == taskID { + rm.runningWorkerGroupTasks[workerGroup] = append(tasks[:i], tasks[i+1:]...) + if len(rm.runningWorkerGroupTasks[workerGroup]) == 0 { + delete(rm.runningWorkerGroupTasks, workerGroup) + } + break + } + } + } + if tasks, exists := rm.runningWorkerGroupTasks[workerGroup]; !exists || len(tasks) == 0 { + for _, worker := range workerMgr.GetGroupWorkers(workerGroup) { + rm.changeWorkerStatus(worker.Name, WorkerOngoingStatusIdle) + } + } else { + for _, worker := range workerMgr.GetGroupWorkers(workerGroup) { + rm.changeWorkerStatus(worker.Name, WorkerOngoingStatusConcurrentRunning) + } + } + } + } +} + +/* +* @Description: isTaskRunningOnWorkerGroup checks if the task is running on the worker group. +* @Note: This function will check if the task is running on the worker group. +* @Param workerGroup +* @Param taskID +* @Return bool + */ +func (rm *SchedulerResourceManager) isTaskRunningOnWorkerGroup(workerGroup string, taskID int32) bool { + if tasks, exists := rm.runningWorkerGroupTasks[workerGroup]; exists { + for _, id := range tasks { + if id == taskID { + return true + } + } + } + return false +} + +/* +* @Description: GetAgentAndAssignTask gets the agent and assigns the task. +* @Note: This function will get the agent and assigns the task. 
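+* Non-exclusive tasks allow the worker group to be shared; such tasks mark the group and its workers as concurrent_running instead of running.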
+* @Param taskInfo +* @Return *Agent, AgentStatus, error + */ +func (rm *SchedulerResourceManager) GetAgentAndAssignTask(taskInfo *structure.TaskInfo) (*Agent, AgentStatus, error) { + if taskInfo == nil { + return nil, AgentStatusError, errors.New("taskInfo is nil") + } + + defer rm.Unlock(rm.Lock()) + + agent, status, workers, err := rm.broker.ApplyAgent(taskInfo, !taskInfo.Exclusive) + if err != nil { + return nil, AgentStatusError, err + } + if agent == nil { + return nil, status, nil + } + + // Assign the task to the agent + agent.AssignTask(taskInfo) + + exclusive := taskInfo.Exclusive + runningStatus := WorkerOngoingStatusRunning + if _, exists := rm.runningWorkerGroupTasks[agent.GroupName()]; !exists && exclusive { + rm.runningWorkerGroupTasks[agent.GroupName()] = []int32{} + runningStatus = WorkerOngoingStatusRunning + rm.workerGroupStatus[agent.GroupName()] = runningStatus + } else { + runningStatus = WorkerOngoingStatusConcurrentRunning + rm.workerGroupStatus[agent.GroupName()] = runningStatus + } + rm.runningWorkerGroupTasks[agent.GroupName()] = append(rm.runningWorkerGroupTasks[agent.GroupName()], taskInfo.ID) + + for _, worker := range workers { + if worker == nil { + continue + } + rm.workerStatus[worker.Name] = runningStatus + } + + return agent, status, nil +} + +/* +* @Description: GetIdleWorkerGroups gets the idle worker groups. +* @Note: This function will get the idle worker groups. +* @Return []string + */ +func (rm *SchedulerResourceManager) GetIdleWorkerGroups() []string { + defer rm.Unlock(rm.Lock()) + + idleWorkerGroups := make([]string, 0) + for workerGroup, status := range rm.workerGroupStatus { + if status == WorkerOngoingStatusIdle { + idleWorkerGroups = append(idleWorkerGroups, workerGroup) + } + } + return idleWorkerGroups +} + +/* +* @Description: GetConcurrentWorkerGroups gets the concurrent worker groups. +* @Note: This function will get the concurrent worker groups. +* @Return []string + */ +func (rm *SchedulerResourceManager) GetConcurrentWorkerGroups() []string { + defer rm.Unlock(rm.Lock()) + + concurrentWorkerGroups := make([]string, 0) + for workerGroup, status := range rm.workerGroupStatus { + if status == WorkerOngoingStatusConcurrentRunning { + concurrentWorkerGroups = append(concurrentWorkerGroups, workerGroup) + } + } + return concurrentWorkerGroups +} + +/* +* @Description: changeWorkerStatus changes the worker status. +* @Note: This function will change the worker status. 
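+* The worker-group status is re-derived afterwards: the group becomes idle or concurrent_running only when every worker in the group reports that status.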
+* @Param workerName +* @Param status + */ +func (rm *SchedulerResourceManager) changeWorkerStatus(workerName string, status WorkerOngoingStatus) { + rm.workerStatus[workerName] = status + + if status == WorkerOngoingStatusIdle || status == WorkerOngoingStatusConcurrentRunning { + workerInfo := workerMgr.GetWorkerInfo(workerName) + + if workerInfo == nil { + logrus.Warnf("worker '%s' not found", workerName) + return + } + + // get worker group name + groupName := workerInfo.Group + if groupName != "" { + gws := workerMgr.GetGroupWorkers(groupName) + allIdle := true + allConcurrent := true + for _, w := range gws { + st := rm.workerStatus[w.Name] + if st != WorkerOngoingStatusIdle { + allIdle = false + } + if st != WorkerOngoingStatusConcurrentRunning { + allConcurrent = false + } + } + if allConcurrent || allIdle { + newStatus := WorkerOngoingStatusIdle + if allConcurrent { + newStatus = WorkerOngoingStatusConcurrentRunning + } + logrus.Debugf("Change worker group '%s' status to '%s' (derived from %d workers)", groupName, newStatus, len(gws)) + rm.changeWorkerGroupStatus(groupName, newStatus) + } + } + + } else if status == WorkerOngoingStatusDeleted { + delete(rm.workerStatus, workerName) + } + + // TODO: Other status changes can be handled here if needed +} + +func (rm *SchedulerResourceManager) changeWorkerGroupStatus(workerGroup string, status WorkerOngoingStatus) { + logrus.Infof("Change worker group '%s' status to '%s'", workerGroup, status) + rm.workerGroupStatus[workerGroup] = status +} + +// TODO: when sync task created, need to alloc worker? +func (rm *SchedulerResourceManager) ChangeWorkerStatus(workerName string, status WorkerOngoingStatus) { + defer rm.Unlock(rm.Lock()) + + rm.changeWorkerStatus(workerName, status) +} diff --git a/vermeer/apps/master/schedules/scheduler_task_manager.go b/vermeer/apps/master/schedules/scheduler_task_manager.go new file mode 100644 index 000000000..267c473d5 --- /dev/null +++ b/vermeer/apps/master/schedules/scheduler_task_manager.go @@ -0,0 +1,269 @@ +package schedules + +import ( + "errors" + "vermeer/apps/structure" + + "github.com/sirupsen/logrus" +) + +/* +* @Description: SchedulerTaskManager is the manager for the scheduler task. +* @Note: This is the manager for the scheduler task. + */ +type SchedulerTaskManager struct { + structure.MutexLocker + // This struct is responsible for managing tasks in the scheduling system. + // A map from task ID to TaskInfo can be used to track tasks. + allTaskMap map[int32]*structure.TaskInfo + allTaskQueue []*structure.TaskInfo + // For debug or test, get task start sequence + startTaskQueue []*structure.TaskInfo + // onGoingTasks + notCompleteTasks map[int32]*structure.TaskInfo + // A map from task ID to worker group can be used to track which worker group is handling which task. + taskToworkerGroupMap map[int32]string +} + +/* +* @Description: Init initializes the SchedulerTaskManager. +* @Note: This function will initialize the SchedulerTaskManager. +* @Return *SchedulerTaskManager + */ +func (t *SchedulerTaskManager) Init() *SchedulerTaskManager { + t.allTaskMap = make(map[int32]*structure.TaskInfo) + t.notCompleteTasks = make(map[int32]*structure.TaskInfo) + t.taskToworkerGroupMap = make(map[int32]string) + return t +} + +/* +* @Description: QueueTask queues the task. +* @Note: This function will queue the task. 
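+* The task is appended to the global queue, tracked in notCompleteTasks, and assigned a worker group via AssignGroup.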
+* @Param taskInfo +* @Return bool, error + */ +func (t *SchedulerTaskManager) QueueTask(taskInfo *structure.TaskInfo) (bool, error) { + if taskInfo == nil { + return false, errors.New("the argument `taskInfo` is nil") + } + + if taskInfo.SpaceName == "" { + return false, errors.New("the property `SpaceName` of taskInfo is empty") + } + + defer t.Unlock(t.Lock()) + + // Add the task to the task map + t.allTaskMap[taskInfo.ID] = taskInfo + t.allTaskQueue = append(t.allTaskQueue, taskInfo) + t.notCompleteTasks[taskInfo.ID] = taskInfo + t.AssignGroup(taskInfo) + return true, nil +} + +/* +* @Description: RefreshTaskToWorkerGroupMap refreshes the task to worker group map. +* @Note: This function will refresh the task to worker group map. + */ +func (t *SchedulerTaskManager) RefreshTaskToWorkerGroupMap() { + defer t.Unlock(t.Lock()) + + for _, taskInfo := range t.GetAllTasksNotComplete() { + if taskInfo == nil { + continue + } + t.AssignGroup(taskInfo) + t.taskToworkerGroupMap[taskInfo.ID] = workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName) + } +} + +// Only for debug or test, get task start sequence +/* +* @Description: AddTaskStartSequence adds the task start sequence. +* @Note: This function will add the task start sequence. +* @Param taskID +* @Return error + */ +func (t *SchedulerTaskManager) AddTaskStartSequence(taskID int32) error { + if _, exists := t.allTaskMap[taskID]; !exists { + return errors.New("task not found") + } + t.startTaskQueue = append(t.startTaskQueue, t.allTaskMap[taskID]) + return nil +} + +/* +* @Description: RemoveTask removes the task. +* @Note: This function will remove the task. +* @Param taskID +* @Return error + */ +func (t *SchedulerTaskManager) RemoveTask(taskID int32) error { + if _, exists := t.allTaskMap[taskID]; !exists { + return errors.New("task not found") + } + defer t.Unlock(t.Lock()) + delete(t.allTaskMap, taskID) + // remove from queue + for i, task := range t.allTaskQueue { + if task.ID == taskID { + t.allTaskQueue = append(t.allTaskQueue[:i], t.allTaskQueue[i+1:]...) + break + } + } + delete(t.taskToworkerGroupMap, taskID) + delete(t.notCompleteTasks, taskID) + return nil +} + +/* +* @Description: MarkTaskComplete marks the task complete. +* @Note: This function will mark the task complete. +* @Param taskID +* @Return error + */ +func (t *SchedulerTaskManager) MarkTaskComplete(taskID int32) error { + if _, exists := t.allTaskMap[taskID]; !exists { + return errors.New("task not found") + } + defer t.Unlock(t.Lock()) + delete(t.notCompleteTasks, taskID) + return nil +} + +// update or create a task in the task map +/* +* @Description: AssignGroup assigns the group. +* @Note: This function will assign the group. +* @Param taskInfo +* @Return error + */ +func (t *SchedulerTaskManager) AssignGroup(taskInfo *structure.TaskInfo) error { + group := workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName) + if group == "" { + return errors.New("failed to assign group for task") + } + t.taskToworkerGroupMap[taskInfo.ID] = group + return nil +} + +/* +* @Description: GetTaskByID gets the task by ID. +* @Note: This function will get the task by ID. +* @Param taskID +* @Return *structure.TaskInfo, error + */ +func (t *SchedulerTaskManager) GetTaskByID(taskID int32) (*structure.TaskInfo, error) { + task, exists := t.allTaskMap[taskID] + if !exists { + return nil, errors.New("task not found") + } + return task, nil +} + +/* +* @Description: GetLastTask gets the last task. +* @Note: This function will get the last task. 
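+* The queue is scanned from the tail, so the most recently queued task of the given space is returned.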
+* @Param spaceName +* @Return *structure.TaskInfo + */ +func (t *SchedulerTaskManager) GetLastTask(spaceName string) *structure.TaskInfo { + // Implement logic to get the last task in the queue for the given space + if len(t.allTaskQueue) == 0 { + return nil + } + for i := len(t.allTaskQueue) - 1; i >= 0; i-- { + if t.allTaskQueue[i].SpaceName == spaceName { + return t.allTaskQueue[i] + } + } + return nil +} + +/* +* @Description: GetAllTasks gets all tasks. +* @Note: This function will get all tasks. +* @Return []*structure.TaskInfo + */ +func (t *SchedulerTaskManager) GetAllTasks() []*structure.TaskInfo { + tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap)) + for _, task := range t.allTaskMap { + tasks = append(tasks, task) + } + return tasks +} + +func (t *SchedulerTaskManager) GetAllTasksNotComplete() []*structure.TaskInfo { + tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap)) + for _, task := range t.notCompleteTasks { + tasks = append(tasks, task) + } + return tasks +} + +func (t *SchedulerTaskManager) GetAllTasksWaitng() []*structure.TaskInfo { + tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap)) + for _, task := range t.GetAllTasksNotComplete() { + if task.State == structure.TaskStateWaiting { + tasks = append(tasks, task) + } + } + return tasks +} + +func (t *SchedulerTaskManager) GetTasksInQueue(space string) []*structure.TaskInfo { + tasks := make([]*structure.TaskInfo, 0) + for _, task := range t.GetAllTasksNotComplete() { + if task.SpaceName == space { + tasks = append(tasks, task) + } + } + return tasks +} + +// Only for debug or test, get task start sequence +func (t *SchedulerTaskManager) GetTaskStartSequence(queryTasks []int32) []*structure.TaskInfo { + if len(t.startTaskQueue) == 0 { + return nil + } + if len(queryTasks) == 0 { + return t.startTaskQueue + } + tasks := make([]*structure.TaskInfo, 0, len(queryTasks)) + taskSet := make(map[int32]struct{}) + for _, id := range queryTasks { + taskSet[id] = struct{}{} + } + for _, task := range t.startTaskQueue { + if _, exists := taskSet[task.ID]; exists { + tasks = append(tasks, task) + } + } + logrus.Infof("GetTaskStartSequence: return %d tasks", len(tasks)) + for _, task := range tasks { + logrus.Debugf("TaskID: %d", task.ID) + } + return tasks +} + +func (t *SchedulerTaskManager) GetTaskToWorkerGroupMap() map[int32]string { + // Return a copy of the worker group map to avoid external modifications + taskNotComplete := t.GetAllTasksNotComplete() + groupMap := make(map[int32]string, len(taskNotComplete)) + for _, task := range taskNotComplete { + if group, exists := t.taskToworkerGroupMap[task.ID]; exists { + groupMap[task.ID] = group + } + } + return groupMap +} + +func (t *SchedulerTaskManager) IsTaskOngoing(taskID int32) bool { + // Check if the task is currently ongoing + task, exists := t.allTaskMap[taskID] + if !exists { + return false + } + return task.State == structure.TaskStateCreated +} diff --git a/vermeer/apps/master/services/http_tasks.go b/vermeer/apps/master/services/http_tasks.go index 03130ca80..de4c5eaed 100644 --- a/vermeer/apps/master/services/http_tasks.go +++ b/vermeer/apps/master/services/http_tasks.go @@ -112,7 +112,7 @@ func handleTaskCreation(ctx *gin.Context, exeFunc func(*structure.TaskInfo) erro } filteredTask := taskBiz(ctx).FilteringTask(task) - ctx.JSON(http.StatusOK, TaskResp{Task: taskInfo2TaskJson(filteredTask)}) + ctx.JSON(http.StatusOK, TaskCreateResponse{Task: taskInfo2TaskJson(filteredTask)}) } type TaskCreateBatchHandler struct { @@ -231,3 +231,33 @@ func (ch 
*ComputeValueHandler) GET(ctx *gin.Context) { ctx.JSON(http.StatusOK, resp) } + +type TaskStartSequenceHandler struct { + common.SenHandler +} + +type TaskStartSequenceRequest struct { + QueryTasks []int32 `json:"query_tasks,omitempty"` +} + +type TaskStartSequenceResp struct { + common.BaseResp + Sequence []int32 `json:"sequence,omitempty"` +} + +func (tsh *TaskStartSequenceHandler) POST(ctx *gin.Context) { + req := TaskStartSequenceRequest{} + err := ctx.BindJSON(&req) + if isBad(err != nil, ctx, func() string { return fmt.Sprintf("request body not correct: %s", err) }) { + return + } + + r := Scheduler.TaskStartSequence(req.QueryTasks) + + sequence := make([]int32, 0, 1) + for _, task := range r { + sequence = append(sequence, int32(task.ID)) + } + + ctx.JSON(http.StatusOK, TaskStartSequenceResp{Sequence: sequence, BaseResp: common.BaseResp{ErrCode: 0, Message: "ok"}}) +} diff --git a/vermeer/apps/master/services/router.go b/vermeer/apps/master/services/router.go index e1000d25c..ac01e8340 100644 --- a/vermeer/apps/master/services/router.go +++ b/vermeer/apps/master/services/router.go @@ -35,12 +35,13 @@ func SetRouters(sen *common.Sentinel, authFilters ...gin.HandlerFunc) { // /tasks regVerAPI(sen, 1, "/tasks", map[string]common.BaseHandler{ - "": &TasksHandler{}, - "/create": &TaskCreateHandler{}, - // "/create/batch": &TaskCreateBatchHandler{}, + "": &TasksHandler{}, + "/create": &TaskCreateHandler{}, + "/create/batch": &TaskCreateBatchHandler{}, "/create/sync": &TaskCreateSyncHandler{}, "/oltp": &OltpHandler{}, "/value/:task_id": &ComputeValueHandler{}, + "/start_sequence": &TaskStartSequenceHandler{}, }, authFilters...) // /task diff --git a/vermeer/apps/master/workers/worker_manager.go b/vermeer/apps/master/workers/worker_manager.go index bfaec0b8d..f2ea2fb62 100644 --- a/vermeer/apps/master/workers/worker_manager.go +++ b/vermeer/apps/master/workers/worker_manager.go @@ -577,6 +577,10 @@ func (wm *workerManager) getGroupWorkers(workerGroup string) []*WorkerClient { return workers } +func (wm *workerManager) GetGroupWorkers(workerGroup string) []*WorkerClient { + return wm.getGroupWorkers(workerGroup) +} + func (wm *workerManager) getGroupWorkerMap(workerGroup string) map[string]*WorkerClient { workerMap := make(map[string]*WorkerClient) diff --git a/vermeer/apps/structure/task.go b/vermeer/apps/structure/task.go index 87356f2bf..eb0000699 100644 --- a/vermeer/apps/structure/task.go +++ b/vermeer/apps/structure/task.go @@ -55,6 +55,12 @@ type TaskInfo struct { wg *sync.WaitGroup Action int32 StatisticsResult map[string]any + + // for scheduler + Priority int32 + Preorders []int32 + Exclusive bool // whether the task can be executed concurrently with other tasks + CronExpr string // cron expression for scheduling } func (ti *TaskInfo) SetState(state TaskState) { diff --git a/vermeer/client/client.go b/vermeer/client/client.go index d9d1a17f9..34553aa3e 100644 --- a/vermeer/client/client.go +++ b/vermeer/client/client.go @@ -150,6 +150,26 @@ func (vc *VermeerClient) GetWorkers() (*WorkersResponse, error) { return workersResp, err } +func (vc *VermeerClient) AllocGroupGraph(graphName string, groupName string) (bool, error) { + reader, err := Request2Reader(struct{}{}) + if err != nil { + return false, err + } + resp, err := vc.post(vc.httpAddr+"/admin/workers/alloc/"+groupName+"/$DEFAULT/"+graphName, reader) + if err != nil { + return false, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + respByte, err := ParseResponse2Byte(resp) + if err != nil { + return 
false, err + } + return false, fmt.Errorf("response:%s", string(respByte)) + } + return true, nil +} + func (vc *VermeerClient) GetMaster() (*MasterResponse, error) { resp, err := vc.get(vc.httpAddr + "/master") if err != nil { @@ -228,6 +248,31 @@ func (vc *VermeerClient) CreateTaskSync(request TaskCreateRequest) (*TaskRespons return taskResponse, err } +func (vc *VermeerClient) CreateTaskBatch(request TaskCreateBatchRequest) (*TaskBatchCreateResponse, error) { + reader, err := Request2Reader(request) + if err != nil { + return nil, err + } + resp, err := vc.post(vc.httpAddr+"/tasks/create/batch", reader) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + respByte, err := ParseResponse2Byte(resp) + if err != nil { + return nil, err + } + return nil, fmt.Errorf("response:%s", string(respByte)) + } + taskResp := &TaskBatchCreateResponse{} + err = ParseResponse2Any(resp, taskResp) + if err != nil { + return nil, err + } + return taskResp, err +} + func (vc *VermeerClient) GetTasks() (*TasksResponse, error) { resp, err := vc.get(vc.httpAddr + "/tasks") if err != nil { @@ -270,6 +315,31 @@ func (vc *VermeerClient) GetTask(taskID int) (*TaskResponse, error) { return taskResp, err } +func (vc *VermeerClient) GetTaskStartSequence(queryTasks []int32) (*TaskStartSequenceResp, error) { + reader, err := Request2Reader(TaskStartSequenceRequest{QueryTasks: queryTasks}) + if err != nil { + return nil, err + } + resp, err := vc.post(vc.httpAddr+"/tasks/start_sequence", reader) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + respByte, err := ParseResponse2Byte(resp) + if err != nil { + return nil, err + } + return nil, fmt.Errorf("response:%s", string(respByte)) + } + taskResp := &TaskStartSequenceResp{} + err = ParseResponse2Any(resp, taskResp) + if err != nil { + return nil, err + } + return taskResp, err +} + func (vc *VermeerClient) GetEdges(graphName string, vertexID string, direction string) (*EdgesResponse, error) { resp, err := vc.get(vc.httpAddr + "/graphs/" + graphName + "/edges?vertex_id=" + vertexID + "&direction=" + direction) if err != nil { diff --git a/vermeer/client/structure.go b/vermeer/client/structure.go index ceeead394..ee233b42f 100644 --- a/vermeer/client/structure.go +++ b/vermeer/client/structure.go @@ -38,6 +38,17 @@ type TaskResponse struct { Task TaskInfo `json:"task,omitempty"` } +type TaskStartSequenceRequest struct { + QueryTasks []int32 `json:"query_tasks,omitempty"` +} + +type TaskStartSequenceResp struct { + BaseResponse + Sequence []int32 `json:"sequence,omitempty"` +} + +type TaskBatchCreateResponse []TaskResponse + type TaskInfo struct { ID int32 `json:"id,omitempty"` Status string `json:"status,omitempty"` @@ -161,6 +172,8 @@ type TaskCreateRequest struct { Params map[string]string `json:"params"` } +type TaskCreateBatchRequest []TaskCreateRequest + type GraphCreateRequest struct { Name string `json:"name,omitempty"` } diff --git a/vermeer/config/master.ini b/vermeer/config/master.ini index 8a7adb133..34f1859c9 100644 --- a/vermeer/config/master.ini +++ b/vermeer/config/master.ini @@ -25,3 +25,4 @@ task_parallel_num=1 auth=none auth_token_factor=1234 start_chan_size=10 +ticker_interval=1 diff --git a/vermeer/config/worker04.ini b/vermeer/config/worker04.ini new file mode 100644 index 000000000..8b341c8f4 --- /dev/null +++ b/vermeer/config/worker04.ini @@ -0,0 +1,23 @@ +; Licensed to the Apache Software Foundation (ASF) under one or more +; contributor 
license agreements. See the NOTICE file distributed with +; this work for additional information regarding copyright ownership. +; The ASF licenses this file to You under the Apache License, Version 2.0 +; (the "License"); you may not use this file except in compliance with +; the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, software +; distributed under the License is distributed on an "AS IS" BASIS, +; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +; See the License for the specific language governing permissions and +; limitations under the License. + +[default] +log_level=info +debug_mode=release +http_peer=0.0.0.0:6988 +grpc_peer=0.0.0.0:6989 +master_peer=127.0.0.1:6689 +run_mode=worker +worker_group=test \ No newline at end of file diff --git a/vermeer/docker-compose.yaml b/vermeer/docker-compose.yaml new file mode 100644 index 000000000..35a506170 --- /dev/null +++ b/vermeer/docker-compose.yaml @@ -0,0 +1,29 @@ +version: '3.8' + +services: + vermeer-master: + image: hugegraph/vermeer + container_name: vermeer-master + volumes: + - ~/:/go/bin/config # Change here to your actual config path + command: --env=master + networks: + vermeer_network: + ipv4_address: 172.20.0.10 # Assign a static IP for the master + + vermeer-worker: + image: hugegraph/vermeer + container_name: vermeer-worker + volumes: + - ~/:/go/bin/config # Change here to your actual config path + command: --env=worker + networks: + vermeer_network: + ipv4_address: 172.20.0.11 # Assign a static IP for the worker + +networks: + vermeer_network: + driver: bridge + ipam: + config: + - subnet: 172.20.0.0/24 # Define the subnet for your network \ No newline at end of file diff --git a/vermeer/go.mod b/vermeer/go.mod index 0ba852b4b..7d8542843 100644 --- a/vermeer/go.mod +++ b/vermeer/go.mod @@ -72,6 +72,7 @@ require ( github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/robfig/cron/v3 v3.0.0 // indirect github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect diff --git a/vermeer/go.sum b/vermeer/go.sum index c30b27ba5..743516024 100644 --- a/vermeer/go.sum +++ b/vermeer/go.sum @@ -395,6 +395,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= +github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= diff --git a/vermeer/test/functional/compute_base.go b/vermeer/test/functional/compute_base.go index 256c72941..745fd7a57 100644 --- a/vermeer/test/functional/compute_base.go +++ b/vermeer/test/functional/compute_base.go @@ -97,6 +97,103 
@@ func (ctb *ComputeTaskBase) SendComputeReqAsync(params map[string]string) { require.Equal(ctb.t, "complete", taskResp.Task.Status) } +/* +* @Description: SendComputeReqAsyncNotWait sends a compute request asynchronously and returns the task ID. +* @Param params +* @Return int32 + */ +func (ctb *ComputeTaskBase) SendComputeReqAsyncNotWait(params map[string]string) int32 { + //create Compute Task + resp, err := ctb.masterHttp.CreateTaskAsync(client.TaskCreateRequest{ + TaskType: "compute", + GraphName: ctb.graphName, + Params: params, + }) + require.NoError(ctb.t, err) + return int32(resp.Task.ID) +} + +/* +* @Description: SendComputeReqAsyncNotWaitWithError sends a compute request asynchronously and returns the task ID and error. +* @Param params +* @Return int32, error + */ +func (ctb *ComputeTaskBase) SendComputeReqAsyncNotWaitWithError(params map[string]string) (int32, error) { + //create Compute Task + resp, err := ctb.masterHttp.CreateTaskAsync(client.TaskCreateRequest{ + TaskType: "compute", + GraphName: ctb.graphName, + Params: params, + }) + if err != nil { + return -1, err + } + return int32(resp.Task.ID), nil +} + +/* +* @Description: SendComputeReqAsyncBatchPriority sends a compute request asynchronously and returns the task ID and sequence. +* @Note: This function will block the main thread until all tasks are completed. +* @Param params +* @Return []int32, []int32 + */ +func (ctb *ComputeTaskBase) SendComputeReqAsyncBatchPriority(params []map[string]string) ([]int32, []int32) { + //create Compute Task + tasks := make([]client.TaskInfo, 0, len(params)) + taskIds := make([]int32, 0, len(params)) + createTasksParams := client.TaskCreateBatchRequest{} + for i := 0; i < len(params); i++ { + graph := ctb.graphName + if params[i]["graph_name"] != "" { + graph = params[i]["graph_name"] + } + createTasksParams = append(createTasksParams, client.TaskCreateRequest{ + TaskType: "compute", + GraphName: graph, + Params: params[i], + }) + } + resp, err := ctb.masterHttp.CreateTaskBatch(createTasksParams) + require.NoError(ctb.t, err) + + for i, r := range *resp { + if r.BaseResponse.ErrCode != 0 { + ctb.t.Fatalf("create compute task %d failed: %s", i, r.BaseResponse.Message) + } + tasks = append(tasks, r.Task) + taskIds = append(taskIds, r.Task.ID) + } + + for i := 0; i < len(params); i++ { + ctb.taskID = int(tasks[i].ID) + //若成功启动Compute Task,开始轮询tasksGet,解析response,得到状态为完成时break。 + var taskResp *client.TaskResponse + var err error + for attempt := 0; attempt < ctb.waitSecond; attempt++ { + ctb.healthCheck.DoHealthCheck() + taskResp, err = ctb.masterHttp.GetTask(ctb.taskID) + require.NoError(ctb.t, err) + if taskResp.Task.Status == "complete" { + break + } + require.NotEqual(ctb.t, "error", taskResp.Task.Status) + time.Sleep(1 * time.Second) + } + require.Equal(ctb.t, "complete", taskResp.Task.Status) + fmt.Printf("Compute Task %d completed successfully\n", ctb.taskID) + } + + response, err := ctb.masterHttp.GetTaskStartSequence(taskIds) + require.NoError(ctb.t, err) + sequence := response.Sequence + for i, id := range sequence { + fmt.Printf("Task %d started at position %d in the sequence\n", id, i+1) + } + require.Equal(ctb.t, len(taskIds), len(sequence)) + + return taskIds, sequence +} + // SendComputeReqSync // // @Description: 发送Http请求,无需重写,同步请求 diff --git a/vermeer/test/functional/compute_task.go b/vermeer/test/functional/compute_task.go index 14ee5a52b..08d65fdcb 100644 --- a/vermeer/test/functional/compute_task.go +++ b/vermeer/test/functional/compute_task.go @@ -36,6 +36,9 @@ 
type ComputeTask interface { masterHttp *client.VermeerClient, t *testing.T, healthCheck *HealthCheck) TaskComputeBody() map[string]string SendComputeReqAsync(params map[string]string) + SendComputeReqAsyncNotWait(params map[string]string) int32 + SendComputeReqAsyncNotWaitWithError(params map[string]string) (int32, error) + SendComputeReqAsyncBatchPriority(params []map[string]string) ([]int32, []int32) SendComputeReqSync(params map[string]string) LoadComputeRes() ([]interface{}, error) CheckRes() diff --git a/vermeer/test/functional/http_interface.go b/vermeer/test/functional/http_interface.go index 2869db146..199261e86 100644 --- a/vermeer/test/functional/http_interface.go +++ b/vermeer/test/functional/http_interface.go @@ -76,6 +76,22 @@ func (ct CancelTask) CancelTask(t *testing.T, master *client.VermeerClient, grap require.Equal(t, "canceled", task.Task.Status) } +/* +* @Description: DirectCancelTask cancels a task directly. +* @Param t +* @Param master +* @Param taskID + */ +func (ct CancelTask) DirectCancelTask(t *testing.T, master *client.VermeerClient, taskID int32) { + ok, err := master.GetTaskCancel(int(taskID)) + require.NoError(t, err) + require.Equal(t, true, ok) + + task, err := master.GetTask(int(taskID)) + require.NoError(t, err) + require.Equal(t, "canceled", task.Task.Status) +} + type GetGraphs struct { } diff --git a/vermeer/test/functional/load_local.go b/vermeer/test/functional/load_local.go index 8da57f048..52575bb1f 100644 --- a/vermeer/test/functional/load_local.go +++ b/vermeer/test/functional/load_local.go @@ -21,6 +21,9 @@ package functional import ( "math/rand" + "strconv" + + "github.com/sirupsen/logrus" ) type LoadTaskLocal struct { @@ -43,3 +46,28 @@ func (lt *LoadTaskLocal) TaskLoadBody() map[string]string { "load.vertex_backend": vertexBackends[rand.Intn(len(vertexBackends))], } } + +// TaskLoadBodyWithNum creates load configuration with specified number of files. +// If num <= 10, it will be automatically adjusted to 30 to ensure minimum test coverage. +func (lt *LoadTaskLocal) TaskLoadBodyWithNum(num int) map[string]string { + vertexBackends := []string{"db", "mem"} + + if num <= 10 { + num = 30 + } + + logrus.Infof("load with num: " + strconv.Itoa(num-1)) + + return map[string]string{ + "load.parallel": "100", + "load.type": "local", + "load.use_property": "0", + //"load.use_outedge": "1", + //"load.use_out_degree": "1", + //"load.use_undirected": "0", + "load.delimiter": " ", + "load.vertex_files": "{\"127.0.0.1\":\"" + "test/case/vertex/vertex_[0," + strconv.Itoa(num-1) + "]" + "\"}", + "load.edge_files": "{\"127.0.0.1\":\"" + "test/case/edge/edge_[0," + strconv.Itoa(num-1) + "]" + "\"}", + "load.vertex_backend": vertexBackends[rand.Intn(len(vertexBackends))], + } +} diff --git a/vermeer/test/scheduler/batch.go b/vermeer/test/scheduler/batch.go new file mode 100644 index 000000000..565bfea29 --- /dev/null +++ b/vermeer/test/scheduler/batch.go @@ -0,0 +1,24 @@ +package scheduler + +import ( + "testing" + "vermeer/client" + "vermeer/test/functional" +) + +/* +* @Description: This is the main test function for batch. +* @Param t +* @Param expectRes +* @Param healthCheck +* @Param masterHttp +* @Param graphName +* @Param factor +* @Param waitSecond + */ +func TestBatch(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, factor string, waitSecond int) { + // TEST GROUP: BATCH + // 1. 
send batch tasks to single graph + // expect: the tasks should be executed in order of time + // have been tested in priority.go +} diff --git a/vermeer/test/scheduler/priority.go b/vermeer/test/scheduler/priority.go new file mode 100644 index 000000000..ef68e3dda --- /dev/null +++ b/vermeer/test/scheduler/priority.go @@ -0,0 +1,349 @@ +package scheduler + +import ( + "fmt" + "sync" + "testing" + "time" + + "vermeer/apps/structure" + "vermeer/client" + "vermeer/test/functional" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +/* +* @Description: SubTestPriority tests the scheduler's behavior when submitting tasks with different priorities. +* @Param t +* @Param expectRes +* @Param healthCheck +* @Param masterHttp +* @Param graphName +* @Param computeTask +* @Param waitSecond + */ +func SubTestPriority(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, computeTask string, waitSecond int) { + fmt.Printf("Test Priority start with task: %s\n", computeTask) + bTime := time.Now() + computeTest, err := functional.MakeComputeTask(computeTask) + require.NoError(t, err) + computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck) + taskComputeBody := computeTest.TaskComputeBody() + + // send two tasks with different priority + params := make([]map[string]string, 0) + + for i := 0; i < 2; i++ { + param := make(map[string]string) + param["priority"] = fmt.Sprintf("%d", i) + for k, v := range taskComputeBody { + param[k] = v + } + params = append(params, param) + } + + logrus.Infof("params for priority test: %+v", params) + + taskids, sequence := computeTest.SendComputeReqAsyncBatchPriority(params) // send multiple requests asynchronously with priority + + require.Equal(t, 2, len(sequence)) + for i := 0; i < 2; i++ { + require.Equal(t, taskids[1-i], sequence[i]) // expect task with priority 1 executed before priority 0 + } + + computeTest.CheckRes() + fmt.Printf("Test Priority: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime)) +} + +/* +* @Description: SubTestSmall tests the scheduler's behavior when submitting tasks with different sizes. 
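+* @Note: graphName[0] holds the larger graph and graphName[1] the smaller one; the task on the smaller graph is expected to start first.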
+* @Param t +* @Param expectRes +* @Param healthCheck +* @Param masterHttp +* @Param graphName +* @Param computeTask +* @Param waitSecond + */ +func SubTestSmall(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, computeTask string, waitSecond int) { + fmt.Printf("Test Small start with task: %s\n", computeTask) + bTime := time.Now() + computeTest, err := functional.MakeComputeTask(computeTask) + computeTaskSmall, err := functional.MakeComputeTask(computeTask) + require.NoError(t, err) + computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck) + taskComputeBody := computeTest.TaskComputeBody() + computeTaskSmall.Init(graphName[1], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck) + taskComputeBodySmall := computeTaskSmall.TaskComputeBody() + + // send two tasks with different size + params := make([]map[string]string, 0) + taskComputeBody["graph_name"] = graphName[0] + taskComputeBodySmall["graph_name"] = graphName[1] + params = append(params, taskComputeBody) + params = append(params, taskComputeBodySmall) + + logrus.Infof("params for small test: %+v", params) + + taskids, sequence := computeTest.SendComputeReqAsyncBatchPriority(params) // send multiple requests asynchronously with priority + + require.Equal(t, 2, len(sequence)) + for i := 0; i < 2; i++ { + require.Equal(t, taskids[1-i], sequence[i]) // expect task smaller executed before larger + } + + computeTest.CheckRes() + fmt.Printf("Test Small: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime)) +} + +/* +* @Description: SubTestConcurrent tests the scheduler's behavior when submitting tasks with different sizes. +* @Param t +* @Param expectRes +* @Param healthCheck +* @Param masterHttp +* @Param graphName +* @Param computeTask +* @Param waitSecond + */ +func SubTestConcurrent(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, computeTask string, waitSecond int) { + fmt.Printf("Test Concurrent start with task: %s\n", computeTask) + bTime := time.Now() + computeTest, err := functional.MakeComputeTask(computeTask) + require.NoError(t, err) + computeTest.Init(graphName[1], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck) + taskComputeBody := computeTest.TaskComputeBody() + + // send two tasks with different size + params := make([]map[string]string, 0) + // default is false, actually do not need to set + taskComputeBody["exclusive"] = "false" + params = append(params, taskComputeBody) + params = append(params, taskComputeBody) + + logrus.Infof("params for concurrent test: %+v", params) + + _, sequence := computeTest.SendComputeReqAsyncBatchPriority(params) // send multiple requests asynchronously with priority + + require.Equal(t, 2, len(sequence)) + + fmt.Printf("Test Concurrent: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime)) + // cost should be less than 2 * single task time +} + +/* +* @Description: SubTestDepends tests the scheduler's behavior when submitting tasks with different dependencies. 
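+* @Note: One of the two batch tasks lists a previously submitted task in `preorders`, so the task without the dependency is expected to start first.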
+* @Param t +* @Param expectRes +* @Param healthCheck +* @Param masterHttp +* @Param graphName +* @Param computeTask +* @Param waitSecond + */ +func SubTestDepends(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, computeTask string, waitSecond int) { + fmt.Printf("Test Depends start with task: %s\n", computeTask) + bTime := time.Now() + computeTest, err := functional.MakeComputeTask(computeTask) + require.NoError(t, err) + computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck) + taskComputeBody := computeTest.TaskComputeBody() + + // first alloc worker 4 for graph 3 + masterHttp.AllocGroupGraph(graphName[0]+"_3", "test") + + loadTest3 := functional.LoadTaskLocal{} + loadTest3.Init(graphName[0]+"_3", expectRes, masterHttp, waitSecond, t, healthCheck) + loadTest3.SendLoadRequest(loadTest3.TaskLoadBodyWithNum(10)) + + // send a large task to $ worker group + taskid := computeTest.SendComputeReqAsyncNotWait(taskComputeBody) + + // send two tasks with different dependency to the same graph + taskComputeBody["graph_name"] = graphName[0] + "_3" + params := make([]map[string]string, 0) + new_body := make(map[string]string) + for k, v := range taskComputeBody { + new_body[k] = v + } + new_body["preorders"] = fmt.Sprintf("%d", taskid) + params = append(params, new_body) + params = append(params, taskComputeBody) + + logrus.Infof("params for depends test: %+v", params) + + taskids, sequence := computeTest.SendComputeReqAsyncBatchPriority(params) // send multiple requests asynchronously with priority + + require.Equal(t, 2, len(sequence)) + for i := 0; i < 2; i++ { + require.Equal(t, taskids[1-i], sequence[i]) // expect task not depend executed first + } + + // computeTest.CheckRes() + fmt.Printf("Test Depends: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime)) +} + +/* +* @Description: SubTestInvalidDependency tests the scheduler's behavior when a compute task is submitted with a dependency on a non-existent (invalid) task ID. 
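+* @Note: The submission is expected to fail, with SendComputeReqAsyncNotWaitWithError returning -1 together with a non-nil error.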
+* @Param t +* @Param expectRes +* @Param healthCheck +* @Param masterHttp +* @Param graphName +* @Param computeTask +* @Param waitSecond + */ +func SubTestInvalidDependency(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, computeTask string, waitSecond int) { + fmt.Printf("Test Invalid Dependency start with task: %s\n", computeTask) + bTime := time.Now() + + computeTest, err := functional.MakeComputeTask(computeTask) + require.NoError(t, err) + computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck) + + taskBody := computeTest.TaskComputeBody() + // set preorders to a very large, theoretically nonexistent task ID + invalidTaskID := 999999999 + taskBody["preorders"] = fmt.Sprintf("%d", invalidTaskID) + + logrus.Infof("Attempting to submit a task with invalid dependency on ID: %d", invalidTaskID) + + // try to submit task asynchronously and check if it returns an error + taskID, err := computeTest.SendComputeReqAsyncNotWaitWithError(taskBody) + + // assert that the submission operation failed + require.Error(t, err, "Submitting a task with a non-existent dependency should return an error.") + // assert that the returned task ID is 0 or other failed values + require.Equal(t, int32(-1), taskID, "The task ID should be zero or invalid on failure.") + + fmt.Printf("Test Invalid Dependency: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime)) +} + +/* +* @Description: SubTestConcurrentCancellation tests the scheduler's behavior when submitting tasks concurrently and canceling them. +* @Param t +* @Param expectRes +* @Param healthCheck +* @Param masterHttp +* @Param graphName +* @Param computeTask +* @Param waitSecond + */ +func SubTestConcurrentCancellation(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, computeTask string, waitSecond int) { + fmt.Printf("Test Concurrent Cancellation start with task: %s\n", computeTask) + bTime := time.Now() + + computeTest, err := functional.MakeComputeTask(computeTask) + require.NoError(t, err) + computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck) + + // set task number + const numTasks = 20 + taskBodies := make([]map[string]string, numTasks) + for i := 0; i < numTasks; i++ { + taskBodies[i] = computeTest.TaskComputeBody() + } + + taskIDs := make(chan int32, numTasks) + var wg sync.WaitGroup + + // 1. submit tasks concurrently + for i := 0; i < numTasks; i++ { + wg.Add(1) + go func(body map[string]string) { + defer wg.Done() + taskID := computeTest.SendComputeReqAsyncNotWait(body) + if taskID != 0 { + taskIDs <- taskID + } else { + logrus.Errorf("Failed to submit task: %v", err) + } + }(taskBodies[i]) + } + + wg.Wait() + close(taskIDs) + + submittedTaskIDs := make([]int32, 0, numTasks) + for id := range taskIDs { + submittedTaskIDs = append(submittedTaskIDs, id) + } + + logrus.Infof("Submitted %d tasks concurrently: %+v", len(submittedTaskIDs), submittedTaskIDs) + require.Equal(t, numTasks, len(submittedTaskIDs), "Not all tasks were successfully submitted.") + + cancelTask := functional.CancelTask{} + cancelTask.DirectCancelTask(t, masterHttp, submittedTaskIDs[len(submittedTaskIDs)-1]) + + // 3. 
verify task status + // wait for tasks to settle + logrus.Info("Waiting for tasks to settle...") + time.Sleep(time.Duration(waitSecond) * time.Second) + + checkTask, err := masterHttp.GetTask(int(submittedTaskIDs[numTasks-1])) + + require.NoError(t, err, "Error fetching task status after cancellation.") + require.NotNil(t, checkTask, "Task should exist after cancellation.") + + if structure.TaskState(checkTask.Task.Status) != structure.TaskStateCanceled { + logrus.Warn("No tasks were cancelled; check scheduler behavior.") + require.Fail(t, "Expected at least some tasks to be cancelled.") + } + + fmt.Printf("Test Concurrent Cancellation: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime)) +} + +/* +* @Description: This is the main test function for priority. +* @Param t +* @Param expectRes +* @Param healthCheck +* @Param masterHttp +* @Param graphName +* @Param computeTask +* @Param waitSecond + */ +func TestPriority(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, factor string, waitSecond int) { + fmt.Print("start test priority\n") + + // for scheduler, just test a simple task + var computeTask = "pagerank" + + // TEST GROUP: PRIORITY + // 1. send priority tasks to single graph + // expect: the tasks should be executed in order of priority + + SubTestPriority(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond) + + // 2. send small tasks and large tasks to single graph + // expect: the small tasks should be executed first + + SubTestSmall(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond) + + // 3. send support concurrent tasks to single graph + // expect: the tasks should be executed concurrently + SubTestConcurrent(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond) + + // 4. send dependency-tasks to single graph + // expect: the tasks should be executed in order of dependency + + SubTestDepends(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond) + + // 5. send same priority tasks to single graph + // expect: the tasks should be executed in order of time + // skipped, too fragile + + // 6. send tasks to different graphs + // expect: the tasks should be executed concurrently + // have been tested in SubTestSmall and SubTestDepends + + // 7. send tasks with invalid dependency to single graph + // expect: the tasks should not be executed + SubTestInvalidDependency(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond) + + // 8. send tasks concurrently and cancel them + // expect: the tasks should be cancelled + SubTestConcurrentCancellation(t, expectRes, healthCheck, masterHttp, graphName, computeTask, 3) +} diff --git a/vermeer/test/scheduler/routine.go b/vermeer/test/scheduler/routine.go new file mode 100644 index 000000000..55722bcee --- /dev/null +++ b/vermeer/test/scheduler/routine.go @@ -0,0 +1,64 @@ +package scheduler + +import ( + "fmt" + "testing" + "time" + "vermeer/client" + "vermeer/test/functional" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +/* +* @Description: SubTestRoutine tests the scheduler's behavior when submitting tasks with cron expression. 
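+* @Note: The task carries the five-field cron expression "* * * * *" (every minute); after two minutes the test checks that a follow-up task was started, then cancels the original task.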
+* @Param t +* @Param expectRes +* @Param healthCheck +* @Param masterHttp +* @Param graphName +* @Param computeTask +* @Param waitSecond + */ +func SubTestRoutine(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, computeTask string, waitSecond int) { + fmt.Printf("Test Routine start with task: %s\n", computeTask) + bTime := time.Now() + computeTest, err := functional.MakeComputeTask(computeTask) + require.NoError(t, err) + computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck) + taskComputeBody := computeTest.TaskComputeBody() + + // every 1 minute + taskComputeBody["cron_expr"] = "* * * * *" + + logrus.Infof("params for routine test: %+v", taskComputeBody) + + taskid := computeTest.SendComputeReqAsyncNotWait(taskComputeBody) + // computeTest.CheckRes() + + // wait for a while and check again + time.Sleep(2 * time.Minute) + + // check if deployed + queue := []int32{} + queue = append(queue, int32(taskid+1)) + result, err := masterHttp.GetTaskStartSequence(queue) + require.NoError(t, err) + require.Equal(t, 1, len(result.Sequence)) + require.Greater(t, result.Sequence[0], int32(0)) + + masterHttp.GetTaskCancel(int(taskid)) + + fmt.Printf("Test Routine: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime)) +} + +func TestRoutine(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, factor string, waitSecond int) { + var computeTask = "pagerank" + + // TEST GROUP: ROUTINE + // 1. send tasks to single graph + // expect: the tasks should be executed timely + + SubTestRoutine(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond) +} diff --git a/vermeer/test/scheduler/test_scheduler.go b/vermeer/test/scheduler/test_scheduler.go new file mode 100644 index 000000000..ea05b258f --- /dev/null +++ b/vermeer/test/scheduler/test_scheduler.go @@ -0,0 +1,70 @@ +package scheduler + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vermeer/client" + "vermeer/test/functional" +) + +/* +* @Description: This is the main test function for scheduler. 
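+* @Note: Two graphs of different sizes are loaded first; the priority, batch, and routine test groups then run before the cancel-task and delete-graph checks.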
+* @Param t +* @Param expectResPath +* @Param masterHttpAddr +* @Param graphName +* @Param factor +* @Param waitSecond +* @Note: You must start at least two worker, named worker01 and worker04 in your config.yaml + */ +func TestScheduler(t *testing.T, expectResPath string, masterHttpAddr string, graphName string, factor string, waitSecond int) { + fmt.Print("start test scheduler\n") + + startTime := time.Now() + expectRes, err := functional.GetExpectRes(expectResPath) + require.NoError(t, err) + + masterHttp := client.VermeerClient{} + masterHttp.Init("http://"+masterHttpAddr, http.DefaultClient) + + // health check + healthCheck := functional.HealthCheck{} + healthCheck.Init(t, &masterHttp) + healthCheck.DoHealthCheck() + + // load graph first + loadTest1 := functional.LoadTaskLocal{} + loadTest1.Init(graphName+"_1", expectRes, &masterHttp, waitSecond, t, &healthCheck) + loadTest1.SendLoadRequest(loadTest1.TaskLoadBodyWithNum(0)) + loadTest1.CheckGraph() + + loadTest2 := functional.LoadTaskLocal{} + loadTest2.Init(graphName+"_2", expectRes, &masterHttp, waitSecond, t, &healthCheck) + loadTest2.SendLoadRequest(loadTest2.TaskLoadBodyWithNum(20)) + // loadTest2.CheckGraph() + + TestPriority(t, expectRes, &healthCheck, &masterHttp, []string{graphName + "_1", graphName + "_2"}, factor, waitSecond) + + TestBatch(t, expectRes, &healthCheck, &masterHttp, []string{graphName + "_1"}, factor, waitSecond) + + TestRoutine(t, expectRes, &healthCheck, &masterHttp, []string{graphName + "_2"}, factor, waitSecond) + + // Error handling: cancel task + cancelTask := functional.CancelTask{} + cancelTask.CancelTask(t, &masterHttp, graphName+"_1") + cancelTask.CancelTask(t, &masterHttp, graphName+"_2") + fmt.Print("test cancel task [OK]\n") + + // Finally, delete graph + deleteGraph := functional.DeleteGraph{} + deleteGraph.DeleteGraph(t, &masterHttp, graphName+"_1") + deleteGraph.DeleteGraph(t, &masterHttp, graphName+"_2") + fmt.Print("test delete graph [OK]\n") + + fmt.Printf("client test finished, cost time:%v\n", time.Since(startTime)) +} diff --git a/vermeer/vermeer_test.go b/vermeer/vermeer_test.go index 4dde004d1..fcc932721 100644 --- a/vermeer/vermeer_test.go +++ b/vermeer/vermeer_test.go @@ -31,6 +31,7 @@ import ( "vermeer/client" "vermeer/test/functional" + "vermeer/test/scheduler" ) var ( @@ -95,6 +96,8 @@ func TestVermeer(t *testing.T) { t.Run("algorithms", testAlgorithms) case "function": t.Run("function", testFunction) + case "scheduler": + t.Run("scheduler", testScheduler) } } @@ -102,10 +105,15 @@ func testFunction(t *testing.T) { functional.TestFunction(t, expectResPath, masterHttpAddr, graphName, factor, waitSecond) } +func testScheduler(t *testing.T) { + scheduler.TestScheduler(t, expectResPath, masterHttpAddr, graphName, factor, waitSecond) +} + func testAlgorithms(t *testing.T) { // todo: 增加算法名称 var computeTasks = []string{"pagerank", "lpa", "wcc", "degree_out", "degree_in", "degree_both", "triangle_count", "sssp", "closeness_centrality", "betweenness_centrality", "kcore", "jaccard", "ppr", "clustering_coefficient", "scc", "louvain"} + // var computeTasks = []string{"pagerank"} startTime := time.Now() expectRes, err := functional.GetExpectRes(expectResPath) @@ -158,6 +166,7 @@ func testAlgorithms(t *testing.T) { taskComputeBody["output.need_query"] = needQuery if sendType == "async" { computeTest.SendComputeReqAsync(taskComputeBody) + // computeTest.SendComputeReqAsyncBatchPriority(10, taskComputeBody) // 异步发送多个请求 } else { computeTest.SendComputeReqSync(taskComputeBody) }
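A minimal client-side sketch of how the new batch-create and start-sequence APIs added above could be driven together, mirroring what the priority tests do. The master address and graph name are placeholders, and the algorithm-specific params (filled in by TaskComputeBody in the functional tests) are omitted; only the scheduler-related `priority` param is shown:

```go
package main

import (
	"fmt"
	"net/http"

	"vermeer/client"
)

func main() {
	vc := client.VermeerClient{}
	// Placeholder master address; use the http_peer of your master instance.
	vc.Init("http://127.0.0.1:6688", http.DefaultClient)

	// Two compute tasks on the same (placeholder) graph with different priorities.
	// Algorithm-specific params are omitted here.
	low := client.TaskCreateRequest{
		TaskType:  "compute",
		GraphName: "example_graph",
		Params:    map[string]string{"priority": "0"},
	}
	high := client.TaskCreateRequest{
		TaskType:  "compute",
		GraphName: "example_graph",
		Params:    map[string]string{"priority": "1"},
	}

	batch, err := vc.CreateTaskBatch(client.TaskCreateBatchRequest{low, high})
	if err != nil {
		panic(err)
	}

	ids := make([]int32, 0, len(*batch))
	for _, r := range *batch {
		if r.BaseResponse.ErrCode != 0 {
			panic(r.BaseResponse.Message)
		}
		ids = append(ids, r.Task.ID)
	}

	// After the tasks finish, /tasks/start_sequence reports the order in
	// which the scheduler actually started them.
	seq, err := vc.GetTaskStartSequence(ids)
	if err != nil {
		panic(err)
	}
	fmt.Println("start sequence:", seq.Sequence)
}
```

As noted in the code, the start-sequence endpoint is intended for debug and test verification, so the reported order is only meaningful once the submitted tasks have actually been started by the scheduler.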