diff --git a/api/operator.go b/api/operator.go index fc974d83fbf..ae517f6ba8a 100644 --- a/api/operator.go +++ b/api/operator.go @@ -174,6 +174,13 @@ type SchedulerConfiguration struct { // priority jobs to place higher priority jobs. PreemptionConfig PreemptionConfig + // BatchQueue specifies the configuration of the batch jobs queue + // used to control queueing and scheduling of batch jobs. + // + // "Scheduling" in this context refers to releasing evaluations + // to the eval broker for scheduling with a worker. + BatchQueue BatchQueue + // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool @@ -215,14 +222,23 @@ type SchedulerSetConfigurationResponse struct { WriteMeta } -// SchedulerAlgorithm is an enum string that encapsulates the valid options for a -// SchedulerConfiguration block's SchedulerAlgorithm. These modes will allow the -// scheduler to be user-selectable. -type SchedulerAlgorithm string +// Enum strings that encapsulate the valid options for +// their respective scheduler configuration blocks. These modes +// allow the config to be user-selectable. +type ( + SchedulerAlgorithm string + BatchQueueTenant string + BatchQueueType string +) const ( SchedulerAlgorithmBinpack SchedulerAlgorithm = "binpack" SchedulerAlgorithmSpread SchedulerAlgorithm = "spread" + + BatchQueueTypeDynamic BatchQueueType = "dynamicPriority" + + TenantTypeMetadata BatchQueueTenant = "metadata" + TenantTypeNamespace BatchQueueTenant = "namespace" ) // PreemptionConfig specifies whether preemption is enabled based on scheduler type @@ -233,6 +249,15 @@ type PreemptionConfig struct { ServiceSchedulerEnabled bool } +// BatchQueue is the configuration for a batch job queue used to control scheduling +// of batch jobs. 
+type BatchQueue struct { + Type BatchQueueType + TenantType BatchQueueTenant + MetadataKey string + Config map[string]any +} + // SchedulerGetConfiguration is used to query the current Scheduler configuration. func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) { var resp SchedulerConfigurationResponse diff --git a/command/operator_scheduler_get_config.go b/command/operator_scheduler_get_config.go index feb88297211..d1fa2447f5d 100644 --- a/command/operator_scheduler_get_config.go +++ b/command/operator_scheduler_get_config.go @@ -76,8 +76,7 @@ func (o *OperatorSchedulerGetConfig) Run(args []string) int { schedConfig := resp.SchedulerConfig - // Output the information. - o.Ui.Output(formatKV([]string{ + out := []string{ fmt.Sprintf("Scheduler Algorithm|%s", schedConfig.SchedulerAlgorithm), fmt.Sprintf("Memory Oversubscription|%v", schedConfig.MemoryOversubscriptionEnabled), fmt.Sprintf("Reject Job Registration|%v", schedConfig.RejectJobRegistration), @@ -87,8 +86,29 @@ func (o *OperatorSchedulerGetConfig) Run(args []string) int { fmt.Sprintf("Preemption Batch Scheduler|%v", schedConfig.PreemptionConfig.BatchSchedulerEnabled), fmt.Sprintf("Preemption SysBatch Scheduler|%v", schedConfig.PreemptionConfig.SysBatchSchedulerEnabled), fmt.Sprintf("Node Limit For Feasibility Checks|%v", schedConfig.NodeLimitForFeasibilityChecks), - fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex), - })) + fmt.Sprintf("Batch Queue Type|%v", schedConfig.BatchQueue.Type), + } + + if schedConfig.BatchQueue.Type != "" { + out = append(out, fmt.Sprintf("Batch Queue Tenant Type|%v", schedConfig.BatchQueue.TenantType)) + + // the metadata key only applies when the tenant type is "metadata" + if schedConfig.BatchQueue.TenantType == "metadata" { + out = append(out, fmt.Sprintf("Batch Queue Metadata Key|%v", schedConfig.BatchQueue.MetadataKey)) + } + + // NOTE(review): map iteration order is unspecified, so the entries may be printed in a different order on each run + conf := "" + for k, v := range schedConfig.BatchQueue.Config { + conf += fmt.Sprintf("%v:%v ", k, v) 
+ } + out = append(out, fmt.Sprintf("Batch Queue Config|%v", conf)) + } + + out = append(out, fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex)) + + // Output the information. + o.Ui.Output(formatKV(out)) return 0 } diff --git a/command/operator_scheduler_get_config_test.go b/command/operator_scheduler_get_config_test.go index a0be94b3da5..2cd0bbc9143 100644 --- a/command/operator_scheduler_get_config_test.go +++ b/command/operator_scheduler_get_config_test.go @@ -27,6 +27,8 @@ func TestOperatorSchedulerGetConfig_Run(t *testing.T) { s := ui.OutputWriter.String() must.StrContains(t, s, "Scheduler Algorithm = binpack") must.StrContains(t, s, "Preemption SysBatch Scheduler = false") + must.StrContains(t, s, "Node Limit For Feasibility Checks = 0") + must.StrContains(t, s, "Batch Queue Type =") ui.ErrorWriter.Reset() ui.OutputWriter.Reset() diff --git a/nomad/structs/node_pool.go b/nomad/structs/node_pool.go index b4dfc4308d7..05aa643df0c 100644 --- a/nomad/structs/node_pool.go +++ b/nomad/structs/node_pool.go @@ -249,6 +249,10 @@ type NodePoolSchedulerConfiguration struct { // If not defined, the global cluster scheduling algorithm is used. SchedulerAlgorithm SchedulerAlgorithm `hcl:"scheduler_algorithm"` + // BatchQueue defines the batch job queue configuration used + // to control scheduling of batch jobs. + BatchQueue BatchQueue `hcl:"batch_queue"` + // MemoryOversubscriptionEnabled specifies whether memory oversubscription // is enabled. If not defined, the global cluster configuration is used. 
MemoryOversubscriptionEnabled *bool `hcl:"memory_oversubscription_enabled"` diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index 3006572ccde..c71825ddbc8 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -224,6 +224,10 @@ type SchedulerConfiguration struct { // priority jobs to place higher priority jobs. PreemptionConfig PreemptionConfig `hcl:"preemption_config"` + // BatchQueue specifies the batch queue for this scheduler configuration + // which defines the behavior for scheduling batch job evaluations. + BatchQueue BatchQueue `hcl:"batch_queue"` + // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool `hcl:"memory_oversubscription_enabled"` @@ -286,6 +290,11 @@ func (s *SchedulerConfiguration) WithNodePool(pool *NodePool) *SchedulerConfigur if poolConfig.SchedulerAlgorithm != "" { schedConfig.SchedulerAlgorithm = poolConfig.SchedulerAlgorithm } + + if poolConfig.BatchQueue.Type != "" { + schedConfig.BatchQueue = poolConfig.BatchQueue + } + if poolConfig.MemoryOversubscriptionEnabled != nil { schedConfig.MemoryOversubscriptionEnabled = *poolConfig.MemoryOversubscriptionEnabled } @@ -310,6 +319,10 @@ func (s *SchedulerConfiguration) Validate() error { return fmt.Errorf("invalid scheduler algorithm: %v", s.SchedulerAlgorithm) } + if err := s.BatchQueue.Validate(); err != nil { + return err + } + return nil } @@ -346,6 +359,90 @@ type PreemptionConfig struct { ServiceSchedulerEnabled bool `hcl:"service_scheduler_enabled"` } +type ( + BatchQueueType string + BatchQueueTenant string +) + +const ( + // Must stay in sync with BatchQueueTypeDynamic in the api package. + BatchQueueTypeDynamic BatchQueueType = "dynamicPriority" + + TenantTypeMetadata BatchQueueTenant = "metadata" + TenantTypeNamespace BatchQueueTenant = "namespace" + + // BatchQueue.Config keys that Validate checks as durations. + DynamicCalcInterval = "calc_interval" + DynamicMaxAge = "max_age" +) + +// BatchQueue is the configuration for a batch job queue used to control +// scheduling of batch jobs. +type BatchQueue struct { + Type BatchQueueType `hcl:"type"` + TenantType BatchQueueTenant `hcl:"tenant_type"` + MetadataKey 
string `hcl:"metadata_key"` + Config map[string]any `hcl:"config"` +} + +type DynamicQueueConfig struct { + CalcInterval time.Duration + MaxAge time.Duration + MaxSize int + AgeWeight int + UsageWeight int + SizeWeight int +} + +func validateDuration(val any) error { + switch t := val.(type) { + case string: + if _, err := time.ParseDuration(t); err != nil { + return err + } + case int, nil: + default: + return fmt.Errorf("value not a duration: %v", val) + } + + return nil +} + +func (b *BatchQueue) Validate() error { + if b.Type == "" { + switch { + case b.TenantType != "", b.MetadataKey != "", b.Config != nil: + return errors.New("batch queue configuration found but no type specified") + } + + return nil + } + + switch b.Type { + case BatchQueueTypeDynamic: + if err := validateDuration(b.Config[DynamicCalcInterval]); err != nil { + return fmt.Errorf("failed to parse calc_interval: %w", err) + } + if err := validateDuration(b.Config[DynamicMaxAge]); err != nil { + return fmt.Errorf("failed to parse max_age: %w", err) + } + default: + return fmt.Errorf("unsupported batch queue type: %q", b.Type) + } + + switch b.TenantType { + case TenantTypeNamespace: + case TenantTypeMetadata: + if b.MetadataKey == "" { + return errors.New("metadata key must be specified if using metadata tenancy") + } + default: + return fmt.Errorf("unsupported tenant type: %q", b.TenantType) + } + + return nil +} + // SchedulerSetConfigRequest is used by the Operator endpoint to update the // current Scheduler configuration of the cluster. 
type SchedulerSetConfigRequest struct { diff --git a/nomad/structs/operator_test.go b/nomad/structs/operator_test.go index b9d013022fb..371a37bc8c2 100644 --- a/nomad/structs/operator_test.go +++ b/nomad/structs/operator_test.go @@ -86,10 +86,16 @@ func TestSchedulerConfiguration_WithNodePool(t *testing.T) { pool: &NodePool{ SchedulerConfiguration: &NodePoolSchedulerConfiguration{ MemoryOversubscriptionEnabled: pointer.Of(true), + BatchQueue: BatchQueue{ + Type: "test", + }, }, }, expected: &SchedulerConfiguration{ MemoryOversubscriptionEnabled: true, + BatchQueue: BatchQueue{ + Type: "test", + }, }, }, { @@ -140,3 +146,134 @@ func TestSchedulerConfiguration_WithNodePool(t *testing.T) { }) } } + +func TestSchedulerConfiguration_Validate(t *testing.T) { + + testCases := []struct { + name string + schedConfig *SchedulerConfiguration + err string + }{ + { + name: "invalid scheduler algorithm", + schedConfig: &SchedulerConfiguration{ + SchedulerAlgorithm: "not-good", + }, + err: "invalid scheduler algorithm: not-good", + }, + { + name: "valid scheduler algorithm", + schedConfig: &SchedulerConfiguration{ + SchedulerAlgorithm: SchedulerAlgorithmBinpack, + }, + err: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.schedConfig.Validate() + if tc.err != "" { + must.ErrorContains(t, err, tc.err) + } else { + must.NoError(t, err) + } + }) + } +} + +func TestBatchQueue_Validate(t *testing.T) { + + testCases := []struct { + name string + batchConfig BatchQueue + err string + }{ + { + name: "invalid queue type", + batchConfig: BatchQueue{ + Type: "foo", + }, + err: "unsupported batch queue type", + }, + { + name: "invalid tenant type", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: "foo", + }, + err: "unsupported tenant type", + }, + { + name: "batch config with no type", + batchConfig: BatchQueue{ + Type: "", + TenantType: TenantTypeNamespace, + }, + err: "batch queue configuration found but no type specified", + }, 
+ { + name: "empty metadata key errors", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: TenantTypeMetadata, + }, + err: "metadata key must be specified", + }, + { + name: "dynamicPriority - invalid interval", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: TenantTypeNamespace, + Config: map[string]any{ + "calc_interval": "hello", + }, + }, + err: "failed to parse", + }, + { + name: "dynamicPriority - invalid max age", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: TenantTypeNamespace, + Config: map[string]any{ + "max_age": "hello", + }, + }, + err: "failed to parse max_age", + }, + { + name: "dynamicPriority - valid string interval", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: TenantTypeNamespace, + Config: map[string]any{ + "calc_interval": "1h", + }, + }, + err: "", + }, + { + name: "dynamicPriority - valid int interval", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: TenantTypeNamespace, + Config: map[string]any{ + "calc_interval": 1000, + }, + }, + err: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.batchConfig.Validate() + if tc.err != "" { + must.ErrorContains(t, err, tc.err) + } else { + must.NoError(t, err) + } + }) + } +}