Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ type SchedulerConfiguration struct {
// priority jobs to place higher priority jobs.
PreemptionConfig PreemptionConfig

// BatchQueue specifies the configuration of the batch jobs queue
// use to control queueing and scheduling of batch jobs.
//
// "Scheduling" in this context refers to releasing evaluations
// to the eval broker for scheduling with a worker.
BatchQueue BatchQueue

// MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled
MemoryOversubscriptionEnabled bool

Expand Down Expand Up @@ -215,14 +222,23 @@ type SchedulerSetConfigurationResponse struct {
WriteMeta
}

// SchedulerAlgorithm is an enum string that encapsulates the valid options for a
// SchedulerConfiguration block's SchedulerAlgorithm. These modes will allow the
// scheduler to be user-selectable.
type SchedulerAlgorithm string
// Enum strings that encapsulate the valid options for a
// their respective scheduler configuration blocks. These modes
// allow the config to be user-selectable.
type (
SchedulerAlgorithm string
BatchQueueTenant string
BatchQueueType string
)

const (
SchedulerAlgorithmBinpack SchedulerAlgorithm = "binpack"
SchedulerAlgorithmSpread SchedulerAlgorithm = "spread"

BatchQueueTypeDynamic BatchQueueType = "dynamicPriority"

TenantTypeMetadata BatchQueueTenant = "metadata"
TenantTypeNamespace BatchQueueTenant = "namespace"
)

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
Expand All @@ -233,6 +249,15 @@ type PreemptionConfig struct {
ServiceSchedulerEnabled bool
}

// BatchQueue is the configuration for a batch job queue used to control scheduling
// of batch jobs.
type BatchQueue struct {
Type BatchQueueType
TenantType BatchQueueTenant
MetadataKey string
Config map[string]any
}

// SchedulerGetConfiguration is used to query the current Scheduler configuration.
func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) {
var resp SchedulerConfigurationResponse
Expand Down
27 changes: 23 additions & 4 deletions command/operator_scheduler_get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ func (o *OperatorSchedulerGetConfig) Run(args []string) int {

schedConfig := resp.SchedulerConfig

// Output the information.
o.Ui.Output(formatKV([]string{
out := []string{
fmt.Sprintf("Scheduler Algorithm|%s", schedConfig.SchedulerAlgorithm),
fmt.Sprintf("Memory Oversubscription|%v", schedConfig.MemoryOversubscriptionEnabled),
fmt.Sprintf("Reject Job Registration|%v", schedConfig.RejectJobRegistration),
Expand All @@ -87,8 +86,28 @@ func (o *OperatorSchedulerGetConfig) Run(args []string) int {
fmt.Sprintf("Preemption Batch Scheduler|%v", schedConfig.PreemptionConfig.BatchSchedulerEnabled),
fmt.Sprintf("Preemption SysBatch Scheduler|%v", schedConfig.PreemptionConfig.SysBatchSchedulerEnabled),
fmt.Sprintf("Node Limit For Feasibility Checks|%v", schedConfig.NodeLimitForFeasibilityChecks),
fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex),
}))
fmt.Sprintf("Batch Queue Type|%v", schedConfig.BatchQueue.Type),
}

if schedConfig.BatchQueue.Type != "" {
out = append(out, fmt.Sprintf("Batch Queue Tenant Type|%v", schedConfig.BatchQueue.TenantType))

// only append metadata key if it's set
if schedConfig.BatchQueue.TenantType == "metadata" {
out = append(out, fmt.Sprintf("Batch Queue Metadata Key|%v", schedConfig.BatchQueue.MetadataKey))
}

conf := ""
for k, v := range schedConfig.BatchQueue.Config {
conf = fmt.Sprintf("%s%s", conf, fmt.Sprintf("%v:%v ", k, v))
}
out = append(out, fmt.Sprintf("Batch Queue Config|%v", conf))
}

out = append(out, fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex))

// Output the information.
o.Ui.Output(formatKV(out))
return 0
}

Expand Down
4 changes: 4 additions & 0 deletions command/operator_scheduler_get_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func TestOperatorSchedulerGetConfig_Run(t *testing.T) {
s := ui.OutputWriter.String()
must.StrContains(t, s, "Scheduler Algorithm = binpack")
must.StrContains(t, s, "Preemption SysBatch Scheduler = false")
must.StrContains(t, s, "Scheduler Algorithm = binpack")
must.StrContains(t, s, "Preemption SysBatch Scheduler = false")
must.StrContains(t, s, "Node Limit For Feasibility Checks = 0")
must.StrContains(t, s, "Batch Queue Type =")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()

Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/node_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ type NodePoolSchedulerConfiguration struct {
// If not defined, the global cluster scheduling algorithm is used.
SchedulerAlgorithm SchedulerAlgorithm `hcl:"scheduler_algorithm"`

// BatchQueue defines the batch job queue configuration used
// to control scheduling of batch jobs.
BatchQueue BatchQueue `hcl:"batch_queue"`

// MemoryOversubscriptionEnabled specifies whether memory oversubscription
// is enabled. If not defined, the global cluster configuration is used.
MemoryOversubscriptionEnabled *bool `hcl:"memory_oversubscription_enabled"`
Expand Down
93 changes: 93 additions & 0 deletions nomad/structs/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ type SchedulerConfiguration struct {
// priority jobs to place higher priority jobs.
PreemptionConfig PreemptionConfig `hcl:"preemption_config"`

// BatchQueue specifies the batch queue for this scheduler configuration
// which defines the behavior for scheduling batch job evaluations.
BatchQueue BatchQueue `hcl:"batch_queue"`

// MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled
MemoryOversubscriptionEnabled bool `hcl:"memory_oversubscription_enabled"`

Expand Down Expand Up @@ -286,6 +290,11 @@ func (s *SchedulerConfiguration) WithNodePool(pool *NodePool) *SchedulerConfigur
if poolConfig.SchedulerAlgorithm != "" {
schedConfig.SchedulerAlgorithm = poolConfig.SchedulerAlgorithm
}

if poolConfig.BatchQueue.Type != "" {
schedConfig.BatchQueue = poolConfig.BatchQueue
}

if poolConfig.MemoryOversubscriptionEnabled != nil {
schedConfig.MemoryOversubscriptionEnabled = *poolConfig.MemoryOversubscriptionEnabled
}
Expand All @@ -310,6 +319,10 @@ func (s *SchedulerConfiguration) Validate() error {
return fmt.Errorf("invalid scheduler algorithm: %v", s.SchedulerAlgorithm)
}

if err := s.BatchQueue.Validate(); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -346,6 +359,86 @@ type PreemptionConfig struct {
ServiceSchedulerEnabled bool `hcl:"service_scheduler_enabled"`
}

type (
BatchQueueType string
BatchQueueTenant string
)

const (
BatchQueueTypeDynamic BatchQueueType = "dynamicPriorty"

TenantTypeMetadata BatchQueueTenant = "metadata"
TenantTypeNamespace BatchQueueTenant = "namespace"

DynamicCalcInterval = "calc_interval"
DynamicMaxAge = "max_age"
)

type BatchQueue struct {
Type BatchQueueType `hcl:"type"`
TenantType BatchQueueTenant `hcl:"tenant_type"`
MetadataKey string `hcl:"metadata_key"`
Config map[string]any `hcl:"config"`
}

type DynamicQueueConfig struct {
CalcInterval time.Duration
MaxAge time.Duration
MaxSize int
AgeWeight int
UsageWeight int
SizeWeight int
}

func validateDuration(val any) error {
switch t := val.(type) {
case string:
if _, err := time.ParseDuration(t); err != nil {
return err
}
case int, nil:
default:
return fmt.Errorf("value not a duration: %v", val)
}

return nil
}

func (b *BatchQueue) Validate() error {
if b.Type == "" {
switch {
case b.TenantType != "", b.MetadataKey != "", b.Config != nil:
return errors.New("batch queue configuration found but no type specified")
}

return nil
}
Comment thread
mismithhisler marked this conversation as resolved.

switch b.Type {
case BatchQueueTypeDynamic:
if err := validateDuration(b.Config[DynamicCalcInterval]); err != nil {
return fmt.Errorf("failed to parse calc_interval: %v", err)
}
if err := validateDuration(b.Config[DynamicMaxAge]); err != nil {
return fmt.Errorf("failed to parse max_age: %v", err)
}
default:
return fmt.Errorf("unsupported batch queue type: %q", b.Type)
}

switch b.TenantType {
case TenantTypeNamespace:
case TenantTypeMetadata:
if b.MetadataKey == "" {
return fmt.Errorf("metadata key must be specified if using metadata tenency")
}
default:
return fmt.Errorf("unsupported tenant type: %q", b.TenantType)
}

return nil
}

// SchedulerSetConfigRequest is used by the Operator endpoint to update the
// current Scheduler configuration of the cluster.
type SchedulerSetConfigRequest struct {
Expand Down
126 changes: 126 additions & 0 deletions nomad/structs/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,16 @@ func TestSchedulerConfiguration_WithNodePool(t *testing.T) {
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(true),
BatchQueue: BatchQueue{
Type: "test",
},
},
},
expected: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
BatchQueue: BatchQueue{
Type: "test",
},
},
},
{
Expand Down Expand Up @@ -140,3 +146,123 @@ func TestSchedulerConfiguration_WithNodePool(t *testing.T) {
})
}
}

func TestSchedulerConfiguration_Validate(t *testing.T) {

testCases := []struct {
name string
schedConfig *SchedulerConfiguration
err string
}{
{
name: "invalid scheduler algorithm",
schedConfig: &SchedulerConfiguration{
SchedulerAlgorithm: "not-good",
},
err: "invalid scheduler algorithm: not-good",
},
{
name: "valid scheduler algorithm",
schedConfig: &SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmBinpack,
},
err: "",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.schedConfig.Validate()
if tc.err != "" {
must.ErrorContains(t, err, tc.err)
} else {
must.NoError(t, err)
}
})
}
}

func TestBatchQueue_Validate(t *testing.T) {

testCases := []struct {
name string
batchConfig BatchQueue
err string
}{
{
name: "invalid queue type",
batchConfig: BatchQueue{
Type: "foo",
},
err: "unsupported batch queue type",
},
{
name: "invalid metadata type",
batchConfig: BatchQueue{
Type: BatchQueueTypeDynamic,
TenantType: "foo",
},
err: "unsupported tenant type",
},
{
name: "batch config with no type",
batchConfig: BatchQueue{
Type: "",
TenantType: TenantTypeNamespace,
},
err: "batch queue configuration found but no type specified",
},
{
name: "empty metadata key errors",
batchConfig: BatchQueue{
Type: BatchQueueTypeDynamic,
TenantType: TenantTypeMetadata,
},
err: "metadata key must be specified",
},
{
name: "dynamicPriority - invalid interval",
batchConfig: BatchQueue{
Type: BatchQueueTypeDynamic,
TenantType: TenantTypeNamespace,
Config: map[string]any{
"calc_interval": "hello",
},
},
err: "failed to parse",
},
{
name: "dynamicPriority - valid string interval",
batchConfig: BatchQueue{
Type: BatchQueueTypeDynamic,
TenantType: TenantTypeNamespace,
Config: map[string]any{
"calc_interval": "1h",
},
},
err: "",
},
{
name: "dynamicPriority - valid int interval",
batchConfig: BatchQueue{
Type: BatchQueueTypeDynamic,
TenantType: TenantTypeNamespace,
Config: map[string]any{
"calc_interval": 1000,
},
},
err: "",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.batchConfig.Validate()
if tc.err != "" {
must.ErrorContains(t, err, tc.err)
} else {
must.NoError(t, err)
}
})
}
}
Loading