diff --git a/cmd/scheduler.go b/cmd/scheduler.go
new file mode 100644
index 000000000..738f34ec4
--- /dev/null
+++ b/cmd/scheduler.go
@@ -0,0 +1,33 @@
+/*
+Copyright 2025 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cmd
+
+import "github.com/spf13/cobra"
+
+var (
+	schedulerSchedulerNamespace string
+	schedulerNamespace          string
+)
+
+var SchedulerCmd = &cobra.Command{
+	Use:   "scheduler",
+	Short: "Scheduler management commands",
+}
+
+func init() {
+	RootCmd.AddCommand(SchedulerCmd)
+	SchedulerCmd.PersistentFlags().BoolVarP(&kubernetesMode, "kubernetes", "k", false, "Target the Dapr Scheduler running in a Kubernetes cluster")
+	SchedulerCmd.PersistentFlags().StringVarP(&schedulerNamespace, "namespace", "n", "", "Dapr namespace the jobs and actor reminders belong to. If not specified, 'list' includes all namespaces")
+	SchedulerCmd.PersistentFlags().StringVar(&schedulerSchedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed")
+}
diff --git a/cmd/schedulerdelete.go b/cmd/schedulerdelete.go
new file mode 100644
index 000000000..be8e2fbb4
--- /dev/null
+++ b/cmd/schedulerdelete.go
@@ -0,0 +1,85 @@
+/*
+Copyright 2025 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cmd
+
+import (
+	"errors"
+	"fmt"
+	"os"
+
+	"github.com/dapr/cli/pkg/print"
+	"github.com/dapr/cli/pkg/scheduler"
+	"github.com/dapr/kit/signals"
+	"github.com/spf13/cobra"
+)
+
+var (
+	schedulerDeleteAll bool
+)
+
+var SchedulerDeleteCmd = &cobra.Command{
+	Use:   "delete",
+	Short: "Delete jobs or actor reminders which are scheduled in Scheduler.",
+	Long: `Delete jobs or actor reminders which are scheduled in Scheduler.
+Namespace (-n) is required.
+Job names are given as the target type, app ID, then job name, separated by '/'.
+Actor reminders require the actor type, actor ID, then reminder name, separated by ||.
+Accepts multiple job names or actor reminders to delete.
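+Use the --delete-all-yes-i-know-what-i-am-doing flag to delete every job and actor reminder in the namespace at once.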
+ +dapr scheduler delete -n foo job/my-app-id/my-job-name +dapr scheduler delete -n foo "actorreminder/my-actor-type||my-actor-id||my-reminder-name" +`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := signals.Context() + + if !cmd.Flag("namespace").Changed { + return errors.New(`required flag(s) "--namespace" not set`) + } + + if schedulerDeleteAll { + if err := scheduler.DeleteAll(ctx, scheduler.DeleteOptions{ + SchedulerNamespace: schedulerSchedulerNamespace, + DaprNamespace: schedulerNamespace, + KubernetesMode: kubernetesMode, + }); err != nil { + return fmt.Errorf("Failed to delete jobs: %s", err) + } + return nil + } + + if len(args) < 1 { + return errors.New(`Qualifier and job name are required. +Example: dapr scheduler delete -n foo job/my-app-id/my-job-name +Example: dapr scheduler delete -n foo "actorreminder/my-actor-type||my-actor-id||my-reminder-name"`) + } + + for _, name := range args { + if err := scheduler.Delete(ctx, name, scheduler.DeleteOptions{ + SchedulerNamespace: schedulerSchedulerNamespace, + DaprNamespace: schedulerNamespace, + KubernetesMode: kubernetesMode, + }); err != nil { + return fmt.Errorf("Failed to delete job: %s", err) + } + print.InfoStatusEvent(os.Stdout, "Deleted job '%s' in namespace '%s'.", name, schedulerNamespace) + } + + return nil + }, +} + +func init() { + SchedulerDeleteCmd.Flags().BoolVar(&schedulerDeleteAll, "delete-all-yes-i-know-what-i-am-doing", false, "Deletes all jobs and actor reminders in the given namespace.") + SchedulerCmd.AddCommand(SchedulerDeleteCmd) +} diff --git a/cmd/schedulerexport.go b/cmd/schedulerexport.go new file mode 100644 index 000000000..d1a67f713 --- /dev/null +++ b/cmd/schedulerexport.go @@ -0,0 +1,60 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "os" + + "github.com/spf13/cobra" + + "github.com/dapr/cli/pkg/print" + "github.com/dapr/cli/pkg/scheduler" + "github.com/dapr/kit/signals" +) + +var ( + schedulerExportFile string +) + +var SchedulerExportCmd = &cobra.Command{ + Use: "export", + Short: "Export all jobs and actor reminders to a binary file, including the tracked count.", + Long: `Export jobs and actor reminders which are scheduled in Scheduler. +Can later be imported using 'dapr scheduler import'. 
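+The output file must not already exist.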
+dapr scheduler export -o output.bin +`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := signals.Context() + + err := scheduler.Export(ctx, scheduler.ExportImportOptions{ + SchedulerNamespace: schedulerSchedulerNamespace, + KubernetesMode: kubernetesMode, + TargetFile: schedulerExportFile, + }) + if err != nil { + return err + } + + print.InfoStatusEvent(os.Stdout, "Export to '%s' complete.", schedulerExportFile) + + return nil + }, +} + +func init() { + SchedulerExportCmd.Flags().MarkHidden("namespace") + SchedulerExportCmd.Flags().StringVarP(&schedulerExportFile, "output-file", "o", "", "Output binary file to export jobs and actor reminders to.") + SchedulerExportCmd.MarkFlagRequired("output-file") + SchedulerCmd.AddCommand(SchedulerExportCmd) +} diff --git a/cmd/schedulerimport.go b/cmd/schedulerimport.go new file mode 100644 index 000000000..086d05d34 --- /dev/null +++ b/cmd/schedulerimport.go @@ -0,0 +1,58 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "os" + + "github.com/spf13/cobra" + + "github.com/dapr/cli/pkg/print" + "github.com/dapr/cli/pkg/scheduler" + "github.com/dapr/kit/signals" +) + +var ( + schedulerImportFile string +) + +var SchedulerImportCmd = &cobra.Command{ + Use: "import", + Short: "Import all jobs and actor reminders from a binary file generated by 'dapr scheduler export'.", + Long: `Import jobs and actor reminders to Scheduler from a binary file generated by 'dapr scheduler export'. +dapr scheduler import -f export.bin`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := signals.Context() + + err := scheduler.Import(ctx, scheduler.ExportImportOptions{ + SchedulerNamespace: schedulerSchedulerNamespace, + KubernetesMode: kubernetesMode, + TargetFile: schedulerImportFile, + }) + if err != nil { + return err + } + + print.InfoStatusEvent(os.Stdout, "Import from '%s' complete.", schedulerImportFile) + + return nil + }, +} + +func init() { + SchedulerImportCmd.Flags().MarkHidden("namespace") + SchedulerImportCmd.Flags().StringVarP(&schedulerImportFile, "input-file", "f", "", "Input file to import jobs and actor reminders from (required).") + SchedulerImportCmd.MarkFlagRequired("input-file") + SchedulerCmd.AddCommand(SchedulerImportCmd) +} diff --git a/cmd/schedulerlist.go b/cmd/schedulerlist.go new file mode 100644 index 000000000..d736299b7 --- /dev/null +++ b/cmd/schedulerlist.go @@ -0,0 +1,111 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package cmd
+
+import (
+	"errors"
+	"os"
+	"slices"
+
+	"github.com/gocarina/gocsv"
+	"github.com/spf13/cobra"
+
+	"github.com/dapr/cli/pkg/scheduler"
+	"github.com/dapr/cli/utils"
+	"github.com/dapr/kit/ptr"
+	"github.com/dapr/kit/signals"
+)
+
+const (
+	schedulerListOutputFormatShort = "short"
+	schedulerListOutputFormatWide  = "wide"
+	schedulerListOutputFormatYAML  = "yaml"
+	schedulerListOutputFormatJSON  = "json"
+)
+
+var (
+	schedulerListFilterType   string
+	schedulerListOutputFormat string
+)
+
+var SchedulerListCmd = &cobra.Command{
+	Use:   "list",
+	Short: "List scheduled jobs in the Scheduler",
+	RunE: func(cmd *cobra.Command, args []string) error {
+		ctx := signals.Context()
+
+		if !slices.Contains([]string{
+			scheduler.FilterJobsAll,
+			scheduler.FilterJobsJob,
+			scheduler.FilterJobsActor,
+		}, schedulerListFilterType) {
+			return errors.New("invalid value for --filter-type. Supported values are 'all', 'jobs', 'actorreminder'.")
+		}
+
+		if !slices.Contains([]string{
+			schedulerListOutputFormatShort,
+			schedulerListOutputFormatWide,
+			schedulerListOutputFormatYAML,
+			schedulerListOutputFormatJSON,
+		}, schedulerListOutputFormat) {
+			return errors.New("invalid value for --output. Supported values are 'short', 'wide', 'yaml', 'json'.")
+		}
+
+		opts := scheduler.ListJobsOptions{
+			SchedulerNamespace: schedulerSchedulerNamespace,
+			KubernetesMode:     kubernetesMode,
+			FilterJobType:      schedulerListFilterType,
+		}
+		if schedulerNamespace != "" {
+			opts.DaprNamespace = ptr.Of(schedulerNamespace)
+		}
+
+		var list any
+		var err error
+		if schedulerListOutputFormat == schedulerListOutputFormatShort {
+			list, err = scheduler.ListJobsAsOutput(ctx, opts)
+		} else {
+			list, err = scheduler.ListJobsAsOutputWide(ctx, opts)
+		}
+		if err != nil {
+			return err
+		}
+
+		switch schedulerListOutputFormat {
+		case schedulerListOutputFormatYAML:
+			err = utils.PrintDetail(os.Stdout, "yaml", list)
+		case schedulerListOutputFormatJSON:
+			err = utils.PrintDetail(os.Stdout, "json", list)
+		default:
+			table, csvErr := gocsv.MarshalString(list)
+			if csvErr != nil {
+				return csvErr
+			}
+
+			utils.PrintTable(table)
+		}
+
+		if err != nil {
+			return err
+		}
+
+		return nil
+	},
+}
+
+func init() {
+	SchedulerListCmd.Flags().StringVar(&schedulerListFilterType, "filter-type", scheduler.FilterJobsAll, "Filter jobs by type. Supported values are 'all', 'jobs', 'actorreminder'")
+	SchedulerListCmd.Flags().StringVarP(&schedulerListOutputFormat, "output", "o", schedulerListOutputFormatShort, "Output format. 
One of 'short', 'wide', 'yaml', 'json'") + SchedulerCmd.AddCommand(SchedulerListCmd) +} diff --git a/go.mod b/go.mod index 040b186fa..6cee22c95 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,10 @@ require ( github.com/Masterminds/semver/v3 v3.3.0 github.com/Pallinder/sillyname-go v0.0.0-20130730142914-97aeae9e6ba1 github.com/briandowns/spinner v1.19.0 - github.com/dapr/dapr v1.16.0-rc.7 - github.com/dapr/go-sdk v1.11.0 + github.com/dapr/dapr v1.16.0 + github.com/dapr/go-sdk v1.13.0 github.com/dapr/kit v0.16.1 + github.com/diagridio/go-etcd-cron v0.9.1 github.com/docker/docker v25.0.6+incompatible github.com/evanphx/json-patch/v5 v5.9.0 github.com/fatih/color v1.17.0 @@ -27,8 +28,10 @@ require ( github.com/spf13/pflag v1.0.6 github.com/spf13/viper v1.13.0 github.com/stretchr/testify v1.10.0 + go.etcd.io/etcd/client/v3 v3.5.21 golang.org/x/mod v0.25.0 golang.org/x/sys v0.33.0 + google.golang.org/protobuf v1.36.6 gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.17.1 k8s.io/api v0.32.1 @@ -72,9 +75,11 @@ require ( github.com/containerd/errdefs v0.3.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/containerd/platforms v0.2.1 // indirect + github.com/coreos/go-semver v0.3.1 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cyphar/filepath-securejoin v0.3.6 // indirect - github.com/dapr/components-contrib v1.16.0-rc.6 // indirect - github.com/dapr/durabletask-go v0.9.0 // indirect + github.com/dapr/components-contrib v1.16.0 // indirect + github.com/dapr/durabletask-go v0.10.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/distribution/reference v0.6.0 // indirect @@ -201,20 +206,23 @@ require ( github.com/xlab/treeprint v1.2.0 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect github.com/zeebo/errs v1.4.0 // indirect + go.etcd.io/etcd/api/v3 v3.5.21 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.21 // indirect go.mongodb.org/mongo-driver v1.14.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect - go.opentelemetry.io/otel v1.35.0 // indirect + go.opentelemetry.io/otel v1.36.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect go.opentelemetry.io/otel/exporters/zipkin v1.34.0 // indirect - go.opentelemetry.io/otel/metric v1.35.0 // indirect + go.opentelemetry.io/otel/metric v1.36.0 // indirect go.opentelemetry.io/otel/sdk v1.35.0 // indirect - go.opentelemetry.io/otel/trace v1.35.0 // indirect + go.opentelemetry.io/otel/trace v1.36.0 // indirect go.opentelemetry.io/proto/otlp v1.6.0 // indirect go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.39.0 // indirect golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect golang.org/x/net v0.41.0 // indirect @@ -226,7 +234,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20250512202823-5a2f75b736a9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect google.golang.org/grpc v1.73.0 // indirect - google.golang.org/protobuf v1.36.6 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // 
indirect diff --git a/go.sum b/go.sum index f2a774da2..fc262641b 100644 --- a/go.sum +++ b/go.sum @@ -142,20 +142,24 @@ github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw= +github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= +github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/cyphar/filepath-securejoin v0.3.6 h1:4d9N5ykBnSp5Xn2JkhocYDkOpURL/18CYMpo6xB9uWM= github.com/cyphar/filepath-securejoin v0.3.6/go.mod h1:Sdj7gXlvMcPZsbhwhQ33GguGLDGQL7h7bg04C/+u9jI= -github.com/dapr/components-contrib v1.16.0-rc.6 h1:AB04RdqmWdZ84PBaHcUQFMHHl7XaIgOl67BeHKZzqM4= -github.com/dapr/components-contrib v1.16.0-rc.6/go.mod h1:mPT6lNeoQxJoJ0y9HyOGWozscSmTdf4yVJTTJMjSOJE= -github.com/dapr/dapr v1.16.0-rc.7 h1:VC38W4sJq/sZeBltY0SxdIy1QhuDXrOibY++PMU8YDY= -github.com/dapr/dapr v1.16.0-rc.7/go.mod h1:jKw+BrQap6qjqHe++BMwzDN5ORABrhZiaqNvG06yizI= -github.com/dapr/durabletask-go v0.9.0 h1:b2/aNOJau7VS639JodSES/+momwnjjrroAtbn7rp1PI= -github.com/dapr/durabletask-go v0.9.0/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= -github.com/dapr/go-sdk v1.11.0 h1:clANpOQd6MsfvSa6snaX8MVk6eRx26Vsj5GxGdQ6mpE= -github.com/dapr/go-sdk v1.11.0/go.mod h1:btZ/tX8eYnx0fg3HiJUku8J5QBRXHsp3kAB1BUiTxXY= +github.com/dapr/components-contrib v1.16.0 h1:kUif6UyxtRz6tXnkuIjbx6z+VLMfc6y+SIYa9T7J3eA= +github.com/dapr/components-contrib v1.16.0/go.mod h1:1AufCWqZwBj//UkyS7FesOEmp5/E6Xgy1tyCn8peiR4= +github.com/dapr/dapr v1.16.0 h1:la2WLZM8Myr2Pq3cyrFjHKWDSPYLzGZCs3p502TwBjI= +github.com/dapr/dapr v1.16.0/go.mod h1:ln/mxvNOeqklaDmic4ppsxmnjl2D/oZGKaJy24IwaEY= +github.com/dapr/durabletask-go v0.10.0 h1:vfIivPl4JYd55xZTslDwhA6p6F8ipcNxBtMaupxArr8= +github.com/dapr/durabletask-go v0.10.0/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= +github.com/dapr/go-sdk v1.13.0 h1:Qw2BmUonClQ9yK/rrEEaFL1PyDgq616RrvYj0CT67Lk= +github.com/dapr/go-sdk v1.13.0/go.mod h1:RsffVNZitDApmQqoS68tNKGMXDZUjTviAbKZupJSzts= github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4= github.com/dapr/kit v0.16.1/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -164,6 +168,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/diagridio/go-etcd-cron v0.9.1 h1:KUfcceDtypL8s3hL0jD2ZoiIzjjXY6xDQ4kT1DJF4Ws= 
+github.com/diagridio/go-etcd-cron v0.9.1/go.mod h1:CSzuxoCDFu+Gbds0RO73GE8CnmL5t85axiPLptsej3I= github.com/distribution/distribution/v3 v3.0.0-20221208165359-362910506bc2 h1:aBfCb7iqHmDEIp6fBvC/hQUddQfg+3qdYjwzaiP9Hnc= github.com/distribution/distribution/v3 v3.0.0-20221208165359-362910506bc2/go.mod h1:WHNsWjnIn2V1LYOrME7e8KxSeKunYHsxEm4am0BUtcI= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= @@ -271,6 +277,7 @@ github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25 h1:wxgEEZvsnOTrDO2n github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -670,6 +677,12 @@ github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f h1 github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg= github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM= github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= +go.etcd.io/etcd/api/v3 v3.5.21 h1:A6O2/JDb3tvHhiIz3xf9nJ7REHvtEFJJ3veW3FbCnS8= +go.etcd.io/etcd/api/v3 v3.5.21/go.mod h1:c3aH5wcvXv/9dqIw2Y810LDXJfhSYdHQ0vxmP3CCHVY= +go.etcd.io/etcd/client/pkg/v3 v3.5.21 h1:lPBu71Y7osQmzlflM9OfeIV2JlmpBjqBNlLtcoBqUTc= +go.etcd.io/etcd/client/pkg/v3 v3.5.21/go.mod h1:BgqT/IXPjK9NkeSDjbzwsHySX3yIle2+ndz28nVsjUs= +go.etcd.io/etcd/client/v3 v3.5.21 h1:T6b1Ow6fNjOLOtM0xSoKNQt1ASPCLWrF9XMHcH9pEyY= +go.etcd.io/etcd/client/v3 v3.5.21/go.mod h1:mFYy67IOqmbRf/kRUvsHixzo3iG+1OF2W2+jVIQRAnU= go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -684,8 +697,8 @@ go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJyS go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0/go.mod h1:69uWxva0WgAA/4bu2Yy70SLDBwZXuQ6PbBpbsa5iZrQ= -go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= -go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= +go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0fikVry1IsiUnXjf7QFvoNN3Xw= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0 h1:m639+BofXTvcY1q8CGs4ItwQarYtJPOWmVobfM1HpVI= @@ -694,14 +707,14 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 
h1:xJ2qH go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk= go.opentelemetry.io/otel/exporters/zipkin v1.34.0 h1:GSjCkoYqsnvUMCjxF18j2tCWH8fhGZYjH3iYgechPTI= go.opentelemetry.io/otel/exporters/zipkin v1.34.0/go.mod h1:h830hluwAqgSNnZbxL2rJhmAlE7/0SF9esoHVLU04Gc= -go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= -go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= +go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= -go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= -go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= +go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= go.opentelemetry.io/proto/otlp v1.6.0 h1:jQjP+AQyTf+Fe7OKj/MfkDrmK4MNVtw2NpXsf9fefDI= go.opentelemetry.io/proto/otlp v1.6.0/go.mod h1:cicgGehlFuNdgZkcALOCh3VE6K/u2tAjzlRhDwmVpZc= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/pkg/kubernetes/components_test.go b/pkg/kubernetes/components_test.go index 1568a9a8d..dd7dc3880 100644 --- a/pkg/kubernetes/components_test.go +++ b/pkg/kubernetes/components_test.go @@ -40,7 +40,7 @@ func TestComponents(t *testing.T) { name: "List one config", configName: "", outputFormat: "", - expectedOutput: " NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \n default appConfig state.redis v1 " + formattedNow + " 0s \n", + expectedOutput: "NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \ndefault appConfig state.redis v1 " + formattedNow + " 0s \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Component{ @@ -70,7 +70,7 @@ func TestComponents(t *testing.T) { name: "Filters out daprsystem", configName: "", outputFormat: "", - expectedOutput: " NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \n default appConfig state.redis v1 " + formattedNow + " 0s \n", + expectedOutput: "NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \ndefault appConfig state.redis v1 " + formattedNow + " 0s \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Component{ @@ -98,7 +98,7 @@ func TestComponents(t *testing.T) { name: "Name does match", configName: "appConfig", outputFormat: "list", - expectedOutput: " NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \n default appConfig state.redis v1 " + formattedNow + " 0s \n", + expectedOutput: "NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \ndefault appConfig state.redis v1 " + formattedNow + " 0s \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Component{ @@ -119,7 +119,7 @@ func TestComponents(t *testing.T) { name: "Name does not match", configName: "appConfig", outputFormat: "list", - expectedOutput: " NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \n", + expectedOutput: "NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \n", errString: "", 
errorExpected: false, k8sConfig: []v1alpha1.Component{ diff --git a/pkg/kubernetes/configurations_test.go b/pkg/kubernetes/configurations_test.go index a48d655bb..8d62a3f51 100644 --- a/pkg/kubernetes/configurations_test.go +++ b/pkg/kubernetes/configurations_test.go @@ -41,7 +41,7 @@ func TestConfigurations(t *testing.T) { name: "List one config", configName: "", outputFormat: "", - expectedOutput: " NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \n default appConfig false false 0s " + formattedNow + " \n", + expectedOutput: "NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \ndefault appConfig false false 0s " + formattedNow + " \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Configuration{ @@ -68,7 +68,7 @@ func TestConfigurations(t *testing.T) { name: "Filters out daprsystem", configName: "", outputFormat: "", - expectedOutput: " NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \n default appConfig false false 0s " + formattedNow + " \n", + expectedOutput: "NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \ndefault appConfig false false 0s " + formattedNow + " \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Configuration{ @@ -94,7 +94,7 @@ func TestConfigurations(t *testing.T) { name: "Name does match", configName: "appConfig", outputFormat: "list", - expectedOutput: " NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \n default appConfig false false 0s " + formattedNow + " \n", + expectedOutput: "NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \ndefault appConfig false false 0s " + formattedNow + " \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Configuration{ @@ -112,7 +112,7 @@ func TestConfigurations(t *testing.T) { name: "Name does not match", configName: "appConfig", outputFormat: "list", - expectedOutput: " NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \n", + expectedOutput: "NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Configuration{ diff --git a/pkg/scheduler/delete.go b/pkg/scheduler/delete.go new file mode 100644 index 000000000..8b8d7d7e1 --- /dev/null +++ b/pkg/scheduler/delete.go @@ -0,0 +1,143 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "context" + "fmt" + "os" + "strings" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/dapr/cli/pkg/print" +) + +type DeleteOptions struct { + SchedulerNamespace string + DaprNamespace string + KubernetesMode bool +} + +func DeleteAll(ctx context.Context, opts DeleteOptions) error { + etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + if err != nil { + return err + } + defer cancel() + + resp, err := etcdClient.Txn(ctx).Then( + clientv3.OpDelete(fmt.Sprintf("dapr/jobs/app||%s||", opts.DaprNamespace), clientv3.WithPrefix()), + clientv3.OpDelete(fmt.Sprintf("dapr/jobs/actorreminder||%s||", opts.DaprNamespace), clientv3.WithPrefix()), + clientv3.OpDelete(fmt.Sprintf("dapr/counters/app||%s||", opts.DaprNamespace), clientv3.WithPrefix()), + clientv3.OpDelete(fmt.Sprintf("dapr/counters/actorreminder||%s||", opts.DaprNamespace), clientv3.WithPrefix()), + ).Commit() + if err != nil { + return err + } + + // Only count actual jobs, not counters. + var deleted int64 + for _, resp := range resp.Responses[:2] { + deleted += resp.GetResponseDeleteRange().Deleted + } + + print.InfoStatusEvent(os.Stdout, "Deleted %d jobs in namespace '%s'.", deleted, opts.DaprNamespace) + + return nil +} + +func Delete(ctx context.Context, key string, opts DeleteOptions) error { + split := strings.Split(key, "/") + if len(split) < 2 { + return fmt.Errorf("failed to parse job key, expecting '{target type}/{identifier}', got '%s'", key) + } + + etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + if err != nil { + return err + } + defer cancel() + + switch split[0] { + case "job": + if len(split) != 3 { + return fmt.Errorf("expecting job key to be in format 'job/{app ID}/{job name}', got '%s'", key) + } + return deleteJob(ctx, etcdClient, split[1], split[2], opts) + case "actorreminder": + if len(split) != 2 { + return fmt.Errorf("expecting actor reminder key to be in format 'actorreminder/{actor type}||{actor id}||{reminder name}', got '%s'", key) + } + actorSplit := strings.Split(split[1], "||") + if len(actorSplit) != 3 { + return fmt.Errorf( + "failed to parse actor reminder key, expecting 'actorreminder/{actor type}||{actor id}||{reminder name}', got '%s'", + key, + ) + } + + return deleteActorReminder(ctx, etcdClient, actorSplit[0], actorSplit[1], actorSplit[2], opts) + default: + return fmt.Errorf("unsupported job target type '%s', accepts 'job' and 'actorreminder'", split[0]) + } + + return nil +} + +func deleteJob(ctx context.Context, + client *clientv3.Client, + appID, name string, + opts DeleteOptions, +) error { + return deleteKeys(ctx, + client, + fmt.Sprintf("dapr/jobs/app||%s||%s||%s", opts.DaprNamespace, appID, name), + fmt.Sprintf("dapr/counters/app||%s||%s||%s", opts.DaprNamespace, appID, name), + opts, + ) +} + +func deleteActorReminder(ctx context.Context, + client *clientv3.Client, + actorType, actorID, name string, + opts DeleteOptions, +) error { + return deleteKeys(ctx, + client, + fmt.Sprintf("dapr/jobs/actorreminder||%s||%s||%s||%s", + opts.DaprNamespace, actorType, actorID, name, + ), + fmt.Sprintf("dapr/counters/actorreminder||%s||%s||%s||%s", + opts.DaprNamespace, actorType, actorID, name, + ), + opts, + ) +} + +func deleteKeys(ctx context.Context, client *clientv3.Client, key1, key2 string, opts DeleteOptions) error { + resp, err := client.Txn(ctx).Then( + clientv3.OpDelete(key1), + clientv3.OpDelete(key2), + ).Commit() + if err != nil { + return err + } + + if len(resp.Responses) 
== 0 || resp.Responses[0].GetResponseDeleteRange().Deleted == 0 { + return fmt.Errorf("no job with key '%s' found in namespace '%s'", key1, opts.DaprNamespace) + } + + return nil +} diff --git a/pkg/scheduler/exportimport.go b/pkg/scheduler/exportimport.go new file mode 100644 index 000000000..721983151 --- /dev/null +++ b/pkg/scheduler/exportimport.go @@ -0,0 +1,152 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "context" + "encoding/gob" + "errors" + "fmt" + "os" + + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/protobuf/proto" + + "github.com/dapr/cli/pkg/print" + "github.com/dapr/cli/pkg/scheduler/stored" +) + +type ExportImportOptions struct { + SchedulerNamespace string + KubernetesMode bool + TargetFile string +} + +type ExportFile struct { + Jobs map[string][]byte + Counters map[string][]byte +} + +func Export(ctx context.Context, opts ExportImportOptions) error { + if _, err := os.Stat(opts.TargetFile); !errors.Is(err, os.ErrNotExist) { + if err == nil { + return fmt.Errorf("file '%s' already exists", opts.TargetFile) + } + return err + } + + client, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + if err != nil { + return err + } + defer cancel() + + jobs, err := listJobs(ctx, client) + if err != nil { + return err + } + counters, err := listCounters(ctx, client) + if err != nil { + return err + } + + out := ExportFile{ + Jobs: make(map[string][]byte, len(jobs)), + Counters: make(map[string][]byte, len(counters)), + } + + for k, j := range jobs { + b, err := proto.Marshal(j) + if err != nil { + return fmt.Errorf("marshal job %q: %w", k, err) + } + out.Jobs[k] = b + } + for k, c := range counters { + b, err := proto.Marshal(c) + if err != nil { + return fmt.Errorf("marshal counter %q: %w", k, err) + } + out.Counters[k] = b + } + + f, err := os.OpenFile(opts.TargetFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) + if err != nil { + return fmt.Errorf("open %s: %w", opts.TargetFile, err) + } + + if err := gob.NewEncoder(f).Encode(&out); err != nil { + _ = f.Close() + _ = os.Remove(opts.TargetFile) + return fmt.Errorf("encode export file: %w", err) + } + + f.Close() + print.InfoStatusEvent(os.Stdout, "Exported %d jobs and %d counters.", len(out.Jobs), len(out.Counters)) + return nil +} + +func Import(ctx context.Context, opts ExportImportOptions) error { + client, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + if err != nil { + return err + } + defer cancel() + + f, err := os.OpenFile(opts.TargetFile, os.O_RDONLY, 0o600) + if err != nil { + return fmt.Errorf("open %s: %w", opts.TargetFile, err) + } + defer f.Close() + + var in ExportFile + if err := gob.NewDecoder(f).Decode(&in); err != nil { + return fmt.Errorf("decode import file: %w", err) + } + + ops := make([]clientv3.Op, 0, len(in.Jobs)+len(in.Counters)) + + for key, b := range in.Jobs { + // Optional: verify bytes are valid before writing + var j stored.Job + if err := proto.Unmarshal(b, &j); err != nil { + return 
fmt.Errorf("unmarshal job %q: %w", key, err) + } + ops = append(ops, clientv3.OpPut(key, string(b))) + } + + for key, b := range in.Counters { + var c stored.Counter + if err := proto.Unmarshal(b, &c); err != nil { + return fmt.Errorf("unmarshal counter %q: %w", key, err) + } + ops = append(ops, clientv3.OpPut(key, string(b))) + } + + for i := 0; i < len(ops); i += 128 { + txn := client.Txn(ctx) + end := i + 128 + if end > len(ops) { + end = len(ops) + } + txn.Then(ops[i:end]...) + if _, err := txn.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + + print.InfoStatusEvent(os.Stdout, "Imported %d items.", end) + } + + return nil +} diff --git a/pkg/scheduler/list.go b/pkg/scheduler/list.go new file mode 100644 index 000000000..27b2c30d3 --- /dev/null +++ b/pkg/scheduler/list.go @@ -0,0 +1,259 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/protobuf/proto" + + "github.com/dapr/cli/pkg/scheduler/stored" + schedulerv1 "github.com/dapr/dapr/pkg/proto/scheduler/v1" + "github.com/dapr/kit/ptr" +) + +const ( + FilterJobsAll = "all" + FilterJobsJob = "jobs" + FilterJobsActor = "actorreminder" +) + +type ListJobsOptions struct { + SchedulerNamespace string + DaprNamespace *string + KubernetesMode bool + FilterJobType string +} + +type ListOutputWide struct { + Namespace string `csv:"NAMESPACE" json:"namespace" yaml:"namespace"` + AppID string `csv:"APP ID" json:"appId" yaml:"appId"` + Name string `csv:"NAME" json:"name" yaml:"name"` + Target string `csv:"TARGET" json:"target" yaml:"target"` + Begin time.Time `csv:"BEGIN" json:"begin" yaml:"begin,omitempty"` + Expiration *time.Time `csv:"EXPIRATION" json:"expiration" yaml:"expiration,omitempty"` + Schedule *string `csv:"SCHEDULE" json:"schedule" yaml:"schedule,omitempty"` + DueTime *string `csv:"DUE TIME" json:"dueTime" yaml:"dueTime,omitempty"` + TTL *string `csv:"TTL" json:"ttl" yaml:"ttl,omitempty"` + Repeats *uint32 `csv:"REPEATS" json:"repeats" yaml:"repeats,omitempty"` + Count uint32 `csv:"Count" json:"count" yaml:"count,omitempty"` + LastTrigger *time.Time `csv:"LAST TRIGGER" json:"lastTrigger" yaml:"lastTrigger"` +} + +type ListOutput struct { + Namespace string `csv:"NAMESPACE" json:"namespace" yaml:"namespace"` + AppID string `csv:"APP ID" json:"appId" yaml:"appId"` + Name string `csv:"NAME" json:"name" yaml:"name"` + Target string `csv:"TARGET" json:"target" yaml:"target"` + Begin time.Time `csv:"BEGIN" json:"begin" yaml:"begin,omitempty"` + Count uint32 `csv:"Count" json:"count" yaml:"count,omitempty"` + LastTrigger *time.Time `csv:"LAST TRIGGER" json:"lastTrigger" yaml:"lastTrigger"` +} + +type JobCount struct { + Key string + Job *stored.Job + Counter *stored.Counter +} + +func ListJobsAsOutput(ctx context.Context, opts ListJobsOptions) ([]ListOutput, error) { + listWide, err := ListJobsAsOutputWide(ctx, opts) + if err != nil { + return nil, err + } + 
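+	// Project each wide row down to the short column set used by the default output.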
+	list := make([]ListOutput, 0, len(listWide))
+	for _, item := range listWide {
+		list = append(list, ListOutput{
+			Namespace:   item.Namespace,
+			AppID:       item.AppID,
+			Name:        item.Name,
+			Target:      item.Target,
+			Begin:       item.Begin,
+			Count:       item.Count,
+			LastTrigger: item.LastTrigger,
+		})
+	}
+
+	return list, nil
+}
+
+func ListJobsAsOutputWide(ctx context.Context, opts ListJobsOptions) ([]ListOutputWide, error) {
+	jobCounters, err := ListJobs(ctx, opts)
+	if err != nil {
+		return nil, err
+	}
+
+	var list []ListOutputWide
+	for _, jobCounter := range jobCounters {
+		var meta schedulerv1.JobMetadata
+		if err = jobCounter.Job.GetJob().GetMetadata().UnmarshalTo(&meta); err != nil {
+			return nil, err
+		}
+
+		if opts.FilterJobType != FilterJobsAll {
+			switch meta.GetTarget().GetType().(type) {
+			case *schedulerv1.JobTargetMetadata_Job:
+				if opts.FilterJobType != FilterJobsJob {
+					continue
+				}
+			case *schedulerv1.JobTargetMetadata_Actor:
+				if opts.FilterJobType != FilterJobsActor {
+					continue
+				}
+			}
+		}
+
+		if opts.DaprNamespace != nil && meta.GetNamespace() != *opts.DaprNamespace {
+			continue
+		}
+
+		listoutput := ListOutputWide{
+			Name:      jobCounter.Key[(strings.LastIndex(jobCounter.Key, "||") + 2):],
+			Namespace: meta.GetNamespace(),
+			AppID:     meta.GetAppId(),
+			Schedule:  jobCounter.Job.GetJob().Schedule,
+			DueTime:   jobCounter.Job.GetJob().DueTime,
+			TTL:       jobCounter.Job.GetJob().Ttl,
+			Repeats:   jobCounter.Job.GetJob().Repeats,
+		}
+
+		switch meta.GetTarget().GetType().(type) {
+		case *schedulerv1.JobTargetMetadata_Job:
+			listoutput.Target = "job"
+		case *schedulerv1.JobTargetMetadata_Actor:
+			listoutput.Target = fmt.Sprintf("%s||%s",
+				meta.GetTarget().GetActor().GetType(),
+				meta.GetTarget().GetActor().GetId(),
+			)
+		}
+
+		switch t := jobCounter.Job.GetBegin().(type) {
+		case *stored.Job_DueTime:
+			listoutput.Begin = t.DueTime.AsTime().Truncate(time.Second)
+		case *stored.Job_Start:
+			listoutput.Begin = t.Start.AsTime().Truncate(time.Second)
+		}
+
+		if jobCounter.Job.Expiration != nil {
+			listoutput.Expiration = ptr.Of(jobCounter.Job.Expiration.AsTime().Truncate(time.Second))
+		}
+
+		if jobCounter.Counter != nil {
+			listoutput.Count = jobCounter.Counter.Count
+			if jobCounter.Counter.LastTrigger != nil {
+				listoutput.LastTrigger = ptr.Of(jobCounter.Counter.LastTrigger.AsTime().Truncate(time.Second))
+			}
+		}
+
+		list = append(list, listoutput)
+	}
+
+	sort.SliceStable(list, func(i, j int) bool {
+		if list[i].Namespace != list[j].Namespace {
+			return list[i].Namespace < list[j].Namespace
+		}
+		if list[i].AppID != list[j].AppID {
+			return list[i].AppID < list[j].AppID
+		}
+		return list[i].Name < list[j].Name
+	})
+
+	return list, nil
+}
+
+func ListJobs(ctx context.Context, opts ListJobsOptions) ([]*JobCount, error) {
+	etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
+	if err != nil {
+		return nil, err
+	}
+	defer cancel()
+
+	jobs, err := listJobs(ctx, etcdClient)
+	if err != nil {
+		return nil, err
+	}
+
+	counters, err := listCounters(ctx, etcdClient)
+	if err != nil {
+		return nil, err
+	}
+
+	jobCounts := make([]*JobCount, 0, len(jobs))
+	for key, job := range jobs {
+		jobCount := &JobCount{
+			Key: key,
+			Job: job,
+		}
+
+		counter, ok := counters[strings.ReplaceAll(key, "dapr/jobs/", "dapr/counters/")]
+		if ok {
+			jobCount.Counter = counter
+		}
+
+		jobCounts = append(jobCounts, jobCount)
+	}
+
+	return jobCounts, nil
+}
+
+func listJobs(ctx context.Context, client *clientv3.Client) (map[string]*stored.Job, error) {
+	// 
dapr/jobs/actorreminder||default||dapr.internal.default.workflow-stress2.activity||e42d7040-e8e6-46d0-93be-d173ef8fd7d1-66::0::1||run-activity + // dapr/jobs/app||default||foobar + + resp, err := client.Get(ctx, + "dapr/jobs/", + clientv3.WithPrefix(), + clientv3.WithLimit(0), + ) + if err != nil { + return nil, err + } + + jobs := make(map[string]*stored.Job) + for _, kv := range resp.Kvs { + var stored stored.Job + if err := proto.Unmarshal(kv.Value, &stored); err != nil { + return nil, fmt.Errorf("failed to unmarshal job %s: %w", kv.Key, err) + } + + jobs[string(kv.Key)] = &stored + } + + return jobs, nil +} + +func listCounters(ctx context.Context, client *clientv3.Client) (map[string]*stored.Counter, error) { + resp, err := client.Get(ctx, + "dapr/counters/", + clientv3.WithPrefix(), + clientv3.WithLimit(0), + ) + if err != nil { + return nil, err + } + + counters := make(map[string]*stored.Counter) + for _, kv := range resp.Kvs { + var stored stored.Counter + if err := proto.Unmarshal(kv.Value, &stored); err != nil { + return nil, fmt.Errorf("failed to unmarshal counter %s: %w", kv.Key, err) + } + + counters[string(kv.Key)] = &stored + } + + return counters, nil +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go new file mode 100644 index 000000000..5496350a6 --- /dev/null +++ b/pkg/scheduler/scheduler.go @@ -0,0 +1,84 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "context" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/dapr/cli/pkg/kubernetes" +) + +func etcdClient(kubernetesMode bool, schedulerNamespace string) (*clientv3.Client, context.CancelFunc, error) { + var etcdClient *clientv3.Client + var err error + if kubernetesMode { + var cancel context.CancelFunc + etcdClient, cancel, err = etcdClientKubernetes(schedulerNamespace) + if err != nil { + return nil, nil, err + } + return etcdClient, cancel, nil + } else { + etcdClient, err = getEtcdClient("localhost:2379") + if err != nil { + return nil, nil, err + } + } + + return etcdClient, func() {}, nil +} + +func getEtcdClient(host string) (*clientv3.Client, error) { + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{host}, + }) + if err != nil { + return nil, err + } + + return client, nil +} + +func etcdClientKubernetes(namespace string) (*clientv3.Client, context.CancelFunc, error) { + config, _, err := kubernetes.GetKubeConfigClient() + if err != nil { + return nil, nil, err + } + + portForward, err := kubernetes.NewPortForward( + config, + namespace, + "dapr-scheduler-server-0", + "localhost", + 2379, + 2379, + false, + ) + if err != nil { + return nil, nil, err + } + + if err = portForward.Init(); err != nil { + return nil, nil, err + } + + client, err := getEtcdClient("localhost:2379") + if err != nil { + return nil, nil, err + } + + return client, portForward.Stop, nil +} diff --git a/pkg/scheduler/stored/counter.pb.go b/pkg/scheduler/stored/counter.pb.go new file mode 100644 index 000000000..10ced51ea --- /dev/null +++ b/pkg/scheduler/stored/counter.pb.go @@ -0,0 +1,197 @@ +// +//Copyright (c) 2024 Diagrid Inc. +//Licensed under the MIT License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.29.3 +// source: proto/stored/counter.proto + +package stored + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Counter holds counter information for a given job. +type Counter struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // job_partition_id is the parition_id of the job this counter belongs to. + // Prevents an updated job from inheriting the counter of a previous job with + // the same name. + // Doesn't need to be globally unique. + JobPartitionId uint64 `protobuf:"varint,1,opt,name=job_partition_id,json=jobPartitionId,proto3" json:"job_partition_id,omitempty"` + // count is the number of times the job has been triggered. + Count uint32 `protobuf:"varint,2,opt,name=count,proto3" json:"count,omitempty"` + // last_trigger is the timestamp the job was last triggered. Used to + // determine the next time the job should be triggered. + LastTrigger *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=last_trigger,json=lastTrigger,proto3" json:"last_trigger,omitempty"` + // attempts is the number of times the job has been attempted to be triggered + // at this count. Used by failure policy to track how many times the Job + // trigger should be retried. 
+ Attempts uint32 `protobuf:"varint,4,opt,name=attempts,proto3" json:"attempts,omitempty"` +} + +func (x *Counter) Reset() { + *x = Counter{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stored_counter_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Counter) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Counter) ProtoMessage() {} + +func (x *Counter) ProtoReflect() protoreflect.Message { + mi := &file_proto_stored_counter_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Counter.ProtoReflect.Descriptor instead. +func (*Counter) Descriptor() ([]byte, []int) { + return file_proto_stored_counter_proto_rawDescGZIP(), []int{0} +} + +func (x *Counter) GetJobPartitionId() uint64 { + if x != nil { + return x.JobPartitionId + } + return 0 +} + +func (x *Counter) GetCount() uint32 { + if x != nil { + return x.Count + } + return 0 +} + +func (x *Counter) GetLastTrigger() *timestamppb.Timestamp { + if x != nil { + return x.LastTrigger + } + return nil +} + +func (x *Counter) GetAttempts() uint32 { + if x != nil { + return x.Attempts + } + return 0 +} + +var File_proto_stored_counter_proto protoreflect.FileDescriptor + +var file_proto_stored_counter_proto_rawDesc = []byte{ + 0x0a, 0x1a, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x2f, 0x63, + 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x64, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa4, 0x01, 0x0a, 0x07, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x65, + 0x72, 0x12, 0x28, 0x0a, 0x10, 0x6a, 0x6f, 0x62, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0e, 0x6a, 0x6f, 0x62, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x63, + 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, + 0x74, 0x12, 0x3d, 0x0a, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, + 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, + 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x42, 0x37, 0x5a, 0x35, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, + 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, + 0x6e, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, + 0x74, 0x6f, 0x72, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_stored_counter_proto_rawDescOnce sync.Once + file_proto_stored_counter_proto_rawDescData = file_proto_stored_counter_proto_rawDesc +) + +func file_proto_stored_counter_proto_rawDescGZIP() []byte { + 
file_proto_stored_counter_proto_rawDescOnce.Do(func() { + file_proto_stored_counter_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_stored_counter_proto_rawDescData) + }) + return file_proto_stored_counter_proto_rawDescData +} + +var file_proto_stored_counter_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_proto_stored_counter_proto_goTypes = []interface{}{ + (*Counter)(nil), // 0: stored.Counter + (*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp +} +var file_proto_stored_counter_proto_depIdxs = []int32{ + 1, // 0: stored.Counter.last_trigger:type_name -> google.protobuf.Timestamp + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_proto_stored_counter_proto_init() } +func file_proto_stored_counter_proto_init() { + if File_proto_stored_counter_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_stored_counter_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Counter); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_stored_counter_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_stored_counter_proto_goTypes, + DependencyIndexes: file_proto_stored_counter_proto_depIdxs, + MessageInfos: file_proto_stored_counter_proto_msgTypes, + }.Build() + File_proto_stored_counter_proto = out.File + file_proto_stored_counter_proto_rawDesc = nil + file_proto_stored_counter_proto_goTypes = nil + file_proto_stored_counter_proto_depIdxs = nil +} diff --git a/pkg/scheduler/stored/job.pb.go b/pkg/scheduler/stored/job.pb.go new file mode 100644 index 000000000..3d4637b8e --- /dev/null +++ b/pkg/scheduler/stored/job.pb.go @@ -0,0 +1,253 @@ +// +//Copyright (c) 2024 Diagrid Inc. +//Licensed under the MIT License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.29.3 +// source: proto/stored/job.proto + +package stored + +import ( + api "github.com/diagridio/go-etcd-cron/api" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Job is the wrapped stored version of a Job which has a partition_id +// associated. +type Job struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // partion_id is an identifier for the job, used for distinguishing jobs with + // the same name and assigning the job to a partition. + // Doesn't need to be globally unique. + PartitionId uint64 `protobuf:"varint,1,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` + // begin is the beginning time of the job. 
+ // + // Types that are assignable to Begin: + // + // *Job_Start + // *Job_DueTime + Begin isJob_Begin `protobuf_oneof:"begin"` + // expiration is the optional time at which the job should no longer be + // scheduled and will be ignored and garbage collected thereafter. + // A job may be removed earlier if repeats are exhausted or schedule doesn't + // permit. + Expiration *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=expiration,proto3,oneof" json:"expiration,omitempty"` + // job is the job spec. + Job *api.Job `protobuf:"bytes,5,opt,name=job,proto3" json:"job,omitempty"` +} + +func (x *Job) Reset() { + *x = Job{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stored_job_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Job) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Job) ProtoMessage() {} + +func (x *Job) ProtoReflect() protoreflect.Message { + mi := &file_proto_stored_job_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Job.ProtoReflect.Descriptor instead. +func (*Job) Descriptor() ([]byte, []int) { + return file_proto_stored_job_proto_rawDescGZIP(), []int{0} +} + +func (x *Job) GetPartitionId() uint64 { + if x != nil { + return x.PartitionId + } + return 0 +} + +func (m *Job) GetBegin() isJob_Begin { + if m != nil { + return m.Begin + } + return nil +} + +func (x *Job) GetStart() *timestamppb.Timestamp { + if x, ok := x.GetBegin().(*Job_Start); ok { + return x.Start + } + return nil +} + +func (x *Job) GetDueTime() *timestamppb.Timestamp { + if x, ok := x.GetBegin().(*Job_DueTime); ok { + return x.DueTime + } + return nil +} + +func (x *Job) GetExpiration() *timestamppb.Timestamp { + if x != nil { + return x.Expiration + } + return nil +} + +func (x *Job) GetJob() *api.Job { + if x != nil { + return x.Job + } + return nil +} + +type isJob_Begin interface { + isJob_Begin() +} + +type Job_Start struct { + // start is the epoch time of the job whereby the clock starts on the + // schedule. The job _will not_ trigger at this time. + Start *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=start,proto3,oneof"` +} + +type Job_DueTime struct { + // due_time is the epoch time of the job whereby the clock starts on the + // schedule. The job _will_ trigger at this time. 
+ DueTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=due_time,json=dueTime,proto3,oneof"` +} + +func (*Job_Start) isJob_Begin() {} + +func (*Job_DueTime) isJob_Begin() {} + +var File_proto_stored_job_proto protoreflect.FileDescriptor + +var file_proto_stored_job_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x2f, 0x6a, + 0x6f, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, + 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x1a, 0x13, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6a, 0x6f, 0x62, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8a, 0x02, 0x0a, 0x03, 0x4a, 0x6f, 0x62, 0x12, 0x21, + 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, + 0x64, 0x12, 0x32, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x00, 0x52, 0x05, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x37, 0x0a, 0x08, 0x64, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x48, 0x00, 0x52, 0x07, 0x64, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x3f, + 0x0a, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x01, + 0x52, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, + 0x1a, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x61, + 0x70, 0x69, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x42, 0x07, 0x0a, 0x05, 0x62, + 0x65, 0x67, 0x69, 0x6e, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, + 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_stored_job_proto_rawDescOnce sync.Once + file_proto_stored_job_proto_rawDescData = file_proto_stored_job_proto_rawDesc +) + +func file_proto_stored_job_proto_rawDescGZIP() []byte { + file_proto_stored_job_proto_rawDescOnce.Do(func() { + file_proto_stored_job_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_stored_job_proto_rawDescData) + }) + return file_proto_stored_job_proto_rawDescData +} + +var file_proto_stored_job_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_proto_stored_job_proto_goTypes = []interface{}{ + (*Job)(nil), // 0: stored.Job + (*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp + (*api.Job)(nil), // 2: 
api.Job +} +var file_proto_stored_job_proto_depIdxs = []int32{ + 1, // 0: stored.Job.start:type_name -> google.protobuf.Timestamp + 1, // 1: stored.Job.due_time:type_name -> google.protobuf.Timestamp + 1, // 2: stored.Job.expiration:type_name -> google.protobuf.Timestamp + 2, // 3: stored.Job.job:type_name -> api.Job + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_proto_stored_job_proto_init() } +func file_proto_stored_job_proto_init() { + if File_proto_stored_job_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_stored_job_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Job); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_proto_stored_job_proto_msgTypes[0].OneofWrappers = []interface{}{ + (*Job_Start)(nil), + (*Job_DueTime)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_stored_job_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_stored_job_proto_goTypes, + DependencyIndexes: file_proto_stored_job_proto_depIdxs, + MessageInfos: file_proto_stored_job_proto_msgTypes, + }.Build() + File_proto_stored_job_proto = out.File + file_proto_stored_job_proto_rawDesc = nil + file_proto_stored_job_proto_goTypes = nil + file_proto_stored_job_proto_depIdxs = nil +} diff --git a/pkg/scheduler/stored/leadership.pb.go b/pkg/scheduler/stored/leadership.pb.go new file mode 100644 index 000000000..772970a63 --- /dev/null +++ b/pkg/scheduler/stored/leadership.pb.go @@ -0,0 +1,186 @@ +// +//Copyright (c) 2024 Diagrid Inc. +//Licensed under the MIT License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.29.3 +// source: proto/stored/leadership.proto + +package stored + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + anypb "google.golang.org/protobuf/types/known/anypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Leadership is the message written to the leadership table when the replica +// gains ownership of the leader key. +type Leadership struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // total is this replicas understanding of the total number of partition + // replicas. + Total uint64 `protobuf:"varint,1,opt,name=total,proto3" json:"total,omitempty"` + // uid is a unique identifier for this replica. Ensures a single replica + // is the leader for a given partition. + Uid uint64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"` + // replica_data is custom data that is associated with this leader (replica). + // All leader data will be sent to library consumer on leadership table + // updates. 
+ ReplicaData *anypb.Any `protobuf:"bytes,3,opt,name=replica_data,json=replicaData,proto3,oneof" json:"replica_data,omitempty"` +} + +func (x *Leadership) Reset() { + *x = Leadership{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stored_leadership_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Leadership) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Leadership) ProtoMessage() {} + +func (x *Leadership) ProtoReflect() protoreflect.Message { + mi := &file_proto_stored_leadership_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Leadership.ProtoReflect.Descriptor instead. +func (*Leadership) Descriptor() ([]byte, []int) { + return file_proto_stored_leadership_proto_rawDescGZIP(), []int{0} +} + +func (x *Leadership) GetTotal() uint64 { + if x != nil { + return x.Total + } + return 0 +} + +func (x *Leadership) GetUid() uint64 { + if x != nil { + return x.Uid + } + return 0 +} + +func (x *Leadership) GetReplicaData() *anypb.Any { + if x != nil { + return x.ReplicaData + } + return nil +} + +var File_proto_stored_leadership_proto protoreflect.FileDescriptor + +var file_proto_stored_leadership_proto_rawDesc = []byte{ + 0x0a, 0x1d, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x2f, 0x6c, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x0b, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x1a, 0x19, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, + 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x83, 0x01, 0x0a, 0x0a, 0x4c, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x10, 0x0a, 0x03, + 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x3c, + 0x0a, 0x0c, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x48, 0x00, 0x52, 0x0b, 0x72, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x44, 0x61, 0x74, 0x61, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, + 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x42, 0x37, 0x5a, + 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, + 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, + 0x6f, 0x6e, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x61, 0x70, 0x69, 0x2f, + 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_stored_leadership_proto_rawDescOnce sync.Once + file_proto_stored_leadership_proto_rawDescData = file_proto_stored_leadership_proto_rawDesc +) + +func file_proto_stored_leadership_proto_rawDescGZIP() []byte { + file_proto_stored_leadership_proto_rawDescOnce.Do(func() { + file_proto_stored_leadership_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_stored_leadership_proto_rawDescData) + }) + return 
file_proto_stored_leadership_proto_rawDescData +} + +var file_proto_stored_leadership_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_proto_stored_leadership_proto_goTypes = []interface{}{ + (*Leadership)(nil), // 0: cron.stored.Leadership + (*anypb.Any)(nil), // 1: google.protobuf.Any +} +var file_proto_stored_leadership_proto_depIdxs = []int32{ + 1, // 0: cron.stored.Leadership.replica_data:type_name -> google.protobuf.Any + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_proto_stored_leadership_proto_init() } +func file_proto_stored_leadership_proto_init() { + if File_proto_stored_leadership_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_stored_leadership_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Leadership); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_proto_stored_leadership_proto_msgTypes[0].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_stored_leadership_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_stored_leadership_proto_goTypes, + DependencyIndexes: file_proto_stored_leadership_proto_depIdxs, + MessageInfos: file_proto_stored_leadership_proto_msgTypes, + }.Build() + File_proto_stored_leadership_proto = out.File + file_proto_stored_leadership_proto_rawDesc = nil + file_proto_stored_leadership_proto_goTypes = nil + file_proto_stored_leadership_proto_depIdxs = nil +} diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 86d23dd7a..83682a62e 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -88,7 +88,7 @@ const ( schedulerHealthPort = 58081 schedulerMetricPort = 59091 - schedulerEtcdPort = 52379 + schedulerEtcdPort = 2379 daprVersionsWithScheduler = ">= 1.14.x" ) diff --git a/tests/apps/jobs/app.go b/tests/apps/jobs/app.go new file mode 100644 index 000000000..939d6dfb6 --- /dev/null +++ b/tests/apps/jobs/app.go @@ -0,0 +1,133 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "context" + "fmt" + "log" + "net" + "net/http" + "time" + + "github.com/dapr/go-sdk/client" + "github.com/dapr/kit/ptr" + "github.com/dapr/kit/signals" +) + +func main() { + const port = 9084 + + ctx := signals.Context() + + fmt.Printf("Starting server in port %v...\n", port) + + regCh := make(chan struct{}) + mux := http.NewServeMux() + mux.HandleFunc("/dapr/config", func(w http.ResponseWriter, r *http.Request) { + close(regCh) + w.Write([]byte(`{"entities": ["myactortype"]}`)) + }) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {}) + + go func() { + <-regCh + register(ctx) + }() + + StartServer(ctx, port, mux) +} + +func register(ctx context.Context) { + log.Printf("Registering jobs & reminders") + + cl, err := client.NewClient() + if err != nil { + log.Fatal(err) + } + + ds := time.Now().Format(time.RFC3339) + + if err = cl.ScheduleJobAlpha1(ctx, &client.Job{ + Name: "test1", + Schedule: ptr.Of("@every 100m"), + Repeats: ptr.Of(uint32(1234)), + DueTime: ptr.Of(ds), + }); err != nil { + log.Fatal(err) + } + + if err = cl.ScheduleJobAlpha1(ctx, &client.Job{ + Name: "test2", + Schedule: ptr.Of("@every 100m"), + Repeats: ptr.Of(uint32(56788)), + DueTime: ptr.Of(ds), + TTL: ptr.Of("10000s"), + }); err != nil { + log.Fatal(err) + } + + if err = cl.RegisterActorReminder(ctx, &client.RegisterActorReminderRequest{ + ActorType: "myactortype", + ActorID: "actorid1", + Name: "test1", + DueTime: ds, + Period: "R100/PT10000S", + }); err != nil { + log.Fatal(err) + } + + if err = cl.RegisterActorReminder(ctx, &client.RegisterActorReminderRequest{ + ActorType: "myactortype", + ActorID: "actorid2", + Name: "test2", + DueTime: ds, + Period: "R100/PT10000S", + }); err != nil { + log.Fatal(err) + } +} + +// StartServer starts a HTTP or HTTP2 server +func StartServer(ctx context.Context, port int, handler http.Handler) { + // Create a listener + addr := fmt.Sprintf(":%d", port) + ln, err := net.Listen("tcp", addr) + if err != nil { + log.Fatalf("Failed to create listener: %v", err) + } + + //nolint:gosec + server := &http.Server{ + Addr: addr, + Handler: handler, + } + + go func() { + // Wait for cancelation signal + <-ctx.Done() + log.Println("Shutdown signal received") + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + server.Shutdown(ctx) + }() + + err = server.Serve(ln) + + if err != http.ErrServerClosed { + log.Fatalf("Failed to run server: %v", err) + } + + log.Println("Server shut down") +} diff --git a/tests/apps/jobs/go.mod b/tests/apps/jobs/go.mod new file mode 100644 index 000000000..500af9d12 --- /dev/null +++ b/tests/apps/jobs/go.mod @@ -0,0 +1,29 @@ +module jobs + +go 1.24.7 + +require ( + github.com/dapr/go-sdk v1.13.0 + github.com/dapr/kit v0.16.1 +) + +require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/dapr/dapr v1.16.0 // indirect + github.com/dapr/durabletask-go v0.10.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel v1.36.0 // indirect + go.opentelemetry.io/otel/metric v1.36.0 // indirect + go.opentelemetry.io/otel/trace v1.36.0 // indirect + golang.org/x/net v0.41.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // 
indirect + google.golang.org/grpc v1.73.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/tests/apps/jobs/go.sum b/tests/apps/jobs/go.sum new file mode 100644 index 000000000..deeb81574 --- /dev/null +++ b/tests/apps/jobs/go.sum @@ -0,0 +1,71 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/dapr/dapr v1.16.0 h1:la2WLZM8Myr2Pq3cyrFjHKWDSPYLzGZCs3p502TwBjI= +github.com/dapr/dapr v1.16.0/go.mod h1:ln/mxvNOeqklaDmic4ppsxmnjl2D/oZGKaJy24IwaEY= +github.com/dapr/durabletask-go v0.10.0 h1:vfIivPl4JYd55xZTslDwhA6p6F8ipcNxBtMaupxArr8= +github.com/dapr/durabletask-go v0.10.0/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= +github.com/dapr/go-sdk v1.13.0 h1:Qw2BmUonClQ9yK/rrEEaFL1PyDgq616RrvYj0CT67Lk= +github.com/dapr/go-sdk v1.13.0/go.mod h1:RsffVNZitDApmQqoS68tNKGMXDZUjTviAbKZupJSzts= +github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4= +github.com/dapr/kit v0.16.1/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx 
v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= +go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= +go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= +go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= +go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 h1:fc6jSaCT0vBduLYZHYrBBNY4dsWuvgyff9noRNDdBeE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= +google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tests/e2e/standalone/commands.go b/tests/e2e/standalone/commands.go index c75bc7171..8a84df07a 100644 --- a/tests/e2e/standalone/commands.go +++ b/tests/e2e/standalone/commands.go @@ -219,3 +219,35 @@ func cmdVersion(output string, args ...string) (string, error) { return spawn.Command(common.GetDaprPath(), verArgs...) 
} + +func cmdSchedulerList(args ...string) (string, error) { + listArgs := []string{"scheduler", "list"} + + listArgs = append(listArgs, args...) + + return spawn.Command(common.GetDaprPath(), listArgs...) +} + +func cmdSchedulerDelete(args ...string) (string, error) { + deleteArgs := []string{"scheduler", "delete"} + + deleteArgs = append(deleteArgs, args...) + + return spawn.Command(common.GetDaprPath(), deleteArgs...) +} + +func cmdSchedulerExport(args ...string) (string, error) { + exportArgs := []string{"scheduler", "export"} + + exportArgs = append(exportArgs, args...) + + return spawn.Command(common.GetDaprPath(), exportArgs...) +} + +func cmdSchedulerImport(args ...string) (string, error) { + importArgs := []string{"scheduler", "import"} + + importArgs = append(importArgs, args...) + + return spawn.Command(common.GetDaprPath(), importArgs...) +} diff --git a/tests/e2e/standalone/scheduler_test.go b/tests/e2e/standalone/scheduler_test.go new file mode 100644 index 000000000..482aeb339 --- /dev/null +++ b/tests/e2e/standalone/scheduler_test.go @@ -0,0 +1,298 @@ +//go:build !windows && (e2e || template) + +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package standalone_test + +import ( + "encoding/json" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/dapr/cli/pkg/scheduler" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestSchedulerList(t *testing.T) { + cleanUpLogs() + ensureDaprInstallation(t) + t.Cleanup(func() { + // remove dapr installation after all tests in this function. + must(t, cmdUninstall, "failed to uninstall Dapr") + }) + + runFilePath := "../testdata/run-template-files/jobs.yaml" + t.Cleanup(func() { + // assumption in the test is that there is only one set of app and daprd logs in the logs directory. + cleanUpLogs() + waitAppsToBeStopped() + }) + args := []string{ + "-f", runFilePath, + } + + go cmdRunWithContext(t.Context(), "", args...) 
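+
+	// Note (descriptive comment, not in the original patch): the jobs test app
+	// (tests/apps/jobs) schedules two jobs and registers two actor reminders once its
+	// /dapr/config endpoint is called, so the poll below waits until
+	// 'dapr scheduler list' prints a header row plus those four entries.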
+ + require.EventuallyWithT(t, func(c *assert.CollectT) { + output, err := cmdSchedulerList() + require.NoError(t, err) + assert.Len(c, strings.Lines(output), 5) + }, time.Second*10, time.Millisecond*10) + + t.Run("short", func(t *testing.T) { + output, err := cmdSchedulerList() + require.NoError(t, err) + lines := strings.Split(output, "\n") + require.Len(t, lines, 5) + + require.Equal(t, []string{ + "NAMESPACE", + "APP ID", + "NAME", + "TARGET", + "BEGIN", + "COUNT", + "LAST", + "TRIGGER", + }, strings.Fields(lines[0])) + + expNames := []string{"test1", "test2", "test1", "test2"} + expTargets := []string{"jobs", "jobs", "myactortype||actorid1", "myactortype||actorid2"} + for i, line := range lines[1:] { + assert.Equal(t, "default", strings.Fields(line)[0]) + + assert.Equal(t, "jobs", strings.Fields(line)[1]) + + assert.Equal(t, expNames[i], strings.Fields(line)[2]) + + assert.Equal(t, expTargets[i], strings.Fields(line)[3]) + + assert.NotEmpty(t, strings.Fields(line)[4]) + + count, err := strconv.Atoi(strings.Fields(line)[5]) + require.NoError(t, err) + assert.Equal(t, 1, count) + + assert.NotEmpty(t, strings.Fields(line)[6]) + } + }) + + t.Run("wide", func(t *testing.T) { + output, err := cmdSchedulerList("-o", "wide") + require.NoError(t, err) + lines := strings.Split(output, "\n") + require.Len(t, lines, 5) + + require.Equal(t, []string{ + "NAMESPACE", + "APP ID", + "NAME", + "TARGET", + "BEGIN", + "EXPIRATION", + "SCHEDULE", + "DUE TIME", + "TTL", + "REPEATS", + "COUNT", + "LAST", + "TRIGGER", + }, strings.Fields(lines[0])) + }) + + t.Run("yaml", func(t *testing.T) { + output, err := cmdSchedulerList("-o", "yaml") + require.NoError(t, err) + + var list []scheduler.ListOutputWide + require.NoError(t, yaml.Unmarshal([]byte(output), &list)) + assert.Len(t, list, 4) + }) + + t.Run("json", func(t *testing.T) { + output, err := cmdSchedulerList("-o", "json") + require.NoError(t, err) + + var list []scheduler.ListOutputWide + require.NoError(t, json.Unmarshal([]byte(output), &list)) + assert.Len(t, list, 4) + }) + + t.Run("filter", func(t *testing.T) { + output, err := cmdSchedulerList("-n", "foo") + require.NoError(t, err) + assert.Len(t, strings.Lines(output), 1) + + output, err = cmdSchedulerList("--filter-type", "all") + require.NoError(t, err) + assert.Len(t, strings.Lines(output), 5) + + output, err = cmdSchedulerList("--filter-type", "jobs") + require.NoError(t, err) + assert.Len(t, strings.Lines(output), 3) + + output, err = cmdSchedulerList("--filter-type", "actorreminder") + require.NoError(t, err) + assert.Len(t, strings.Lines(output), 3) + }) +} + +func TestSchedulerDelete(t *testing.T) { + cleanUpLogs() + ensureDaprInstallation(t) + t.Cleanup(func() { + // remove dapr installation after all tests in this function. + must(t, cmdUninstall, "failed to uninstall Dapr") + }) + + runFilePath := "../testdata/run-template-files/jobs.yaml" + t.Cleanup(func() { + // assumption in the test is that there is only one set of app and daprd logs in the logs directory. + cleanUpLogs() + waitAppsToBeStopped() + }) + args := []string{ + "-f", runFilePath, + } + + go cmdRunWithContext(t.Context(), "", args...) 
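+
+	// Deletion targets are addressed as job/<app-id>/<job-name> for jobs and
+	// actorreminder/<actor-type>||<actor-id>||<reminder-name> for reminders;
+	// each successful delete should drop one row from 'dapr scheduler list'.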
+ + require.EventuallyWithT(t, func(c *assert.CollectT) { + output, err := cmdSchedulerList() + require.NoError(t, err) + assert.Len(c, strings.Lines(output), 5) + }, time.Second*10, time.Millisecond*10) + + // No namespace + _, err := cmdSchedulerDelete("job/jobs/test1") + require.NoError(t, err) + + var output string + output, err = cmdSchedulerList() + require.NoError(t, err) + assert.Len(t, strings.Lines(output), 5) + + _, err = cmdSchedulerDelete("job/jobs/test1", "-n", "default") + require.NoError(t, err) + output, err = cmdSchedulerList() + require.NoError(t, err) + assert.Len(t, strings.Lines(output), 4) + + _, err = cmdSchedulerDelete("-n", "default", "actorreminder/myactortype||actor1||test1") + require.NoError(t, err) + output, err = cmdSchedulerList() + require.NoError(t, err) + assert.Len(t, strings.Lines(output), 3) + + _, err = cmdSchedulerDelete("-n", "default", + "job/jobs/test2", + "actorreminder/myactortype||actor2||test2", + ) + require.NoError(t, err) + output, err = cmdSchedulerList() + require.NoError(t, err) + assert.Len(t, strings.Lines(output), 1) +} + +func TestSchedulerDeleteAll(t *testing.T) { + cleanUpLogs() + ensureDaprInstallation(t) + t.Cleanup(func() { + // remove dapr installation after all tests in this function. + must(t, cmdUninstall, "failed to uninstall Dapr") + }) + + runFilePath := "../testdata/run-template-files/jobs.yaml" + t.Cleanup(func() { + // assumption in the test is that there is only one set of app and daprd logs in the logs directory. + cleanUpLogs() + waitAppsToBeStopped() + }) + args := []string{ + "-f", runFilePath, + } + + go cmdRunWithContext(t.Context(), "", args...) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + output, err := cmdSchedulerList() + require.NoError(t, err) + assert.Len(c, strings.Lines(output), 5) + }, time.Second*10, time.Millisecond*10) + + // No namespace + _, err := cmdSchedulerDelete("job/jobs/test1") + require.NoError(t, err) + + var output string + output, err = cmdSchedulerList() + require.NoError(t, err) + assert.Len(t, strings.Lines(output), 5) + + _, err = cmdSchedulerDelete("--delete-all-yes-i-know-what-i-am-doing") + require.NoError(t, err) + output, err = cmdSchedulerList() + require.NoError(t, err) + assert.Len(t, strings.Lines(output), 1) +} + +func TestSchedulerExportImport(t *testing.T) { + cleanUpLogs() + ensureDaprInstallation(t) + t.Cleanup(func() { + // remove dapr installation after all tests in this function. + must(t, cmdUninstall, "failed to uninstall Dapr") + }) + + runFilePath := "../testdata/run-template-files/jobs.yaml" + t.Cleanup(func() { + // assumption in the test is that there is only one set of app and daprd logs in the logs directory. + cleanUpLogs() + waitAppsToBeStopped() + }) + args := []string{ + "-f", runFilePath, + } + + go cmdRunWithContext(t.Context(), "", args...) 
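+
+	// Round-trip check: export everything to a temporary file, delete all jobs and
+	// reminders, then import the file and expect the original entries to be listed again.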
+
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		output, err := cmdSchedulerList()
+		require.NoError(t, err)
+		assert.Len(c, strings.Lines(output), 5)
+	}, time.Second*10, time.Millisecond*10)
+
+	f := filepath.Join(t.TempDir(), "foo")
+	_, err := cmdSchedulerExport("-o", f)
+	require.NoError(t, err)
+
+	_, err = cmdSchedulerDelete("--delete-all-yes-i-know-what-i-am-doing")
+	require.NoError(t, err)
+	output, err := cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Lines(output), 1)
+
+	_, err = cmdSchedulerImport("-f", f)
+	require.NoError(t, err)
+	output, err = cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Lines(output), 5)
+}
diff --git a/tests/e2e/testdata/run-template-files/jobs.yaml b/tests/e2e/testdata/run-template-files/jobs.yaml
new file mode 100644
index 000000000..9fdb223b4
--- /dev/null
+++ b/tests/e2e/testdata/run-template-files/jobs.yaml
@@ -0,0 +1,8 @@
+version: 1
+apps:
+- appID: jobs
+  appDirPath: ../../../apps/jobs/
+  appPort: 9084
+  daprHTTPPort: 3510
+  command: ["go","run", "app.go"]
+  appLogDestination: console
diff --git a/utils/utils.go b/utils/utils.go
index 483bf85f7..b410ef03c 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -15,6 +15,7 @@ package utils
 
 import (
 	"bufio"
+	"bytes"
 	"context"
 	"encoding/json"
 	"errors"
@@ -92,12 +93,18 @@ func PrintTable(csvContent string) {
 
 // WriteTable writes the csv table to writer.
 func WriteTable(writer io.Writer, csvContent string) {
-	table := tablewriter.NewWriter(writer)
+	var output bytes.Buffer
+
+	table := tablewriter.NewWriter(&output)
 	table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
-	table.SetBorder(false)
+	//table.SetBorder(false)
 	table.SetHeaderLine(false)
-	table.SetRowLine(false)
-	table.SetCenterSeparator("")
+	//table.SetRowLine(false)
+	table.SetBorders(tablewriter.Border{
+		Top:    false,
+		Bottom: false,
+	})
+	table.SetTablePadding("")
 	table.SetRowSeparator("")
 	table.SetColumnSeparator("")
 	table.SetAlignment(tablewriter.ALIGN_LEFT)
@@ -116,6 +123,12 @@ func WriteTable(writer io.Writer, csvContent string) {
 	table.Render()
+
+	b := bufio.NewScanner(&output)
+	for b.Scan() {
+		writer.Write(bytes.TrimLeft(b.Bytes(), " "))
+		writer.Write([]byte("\n"))
+	}
 }
 
 func TruncateString(str string, maxLength int) string {