diff --git a/api-reference/go/workflows/Jobs.Submit.mdx b/api-reference/go/workflows/Jobs.Submit.mdx index 4af4e16..c587a72 100644 --- a/api-reference/go/workflows/Jobs.Submit.mdx +++ b/api-reference/go/workflows/Jobs.Submit.mdx @@ -8,7 +8,6 @@ icon: diagram-project func (*JobClient) Submit( ctx context.Context, jobName string, - cluster *workflows.Cluster, tasks []workflows.Task, options ...job.SubmitOption ) (*workflows.Job, error) @@ -21,9 +20,6 @@ Submit a job. The name of the job - - The [cluster](/workflows/concepts/clusters#managing-clusters) to run the root task on - The root task for the job. This task is executed first and can submit subtasks to manage the entire workflow. A job can have optionally consist of multiple root tasks. @@ -36,6 +32,9 @@ Submit a job. Set the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case it fails + + The [cluster](/workflows/concepts/clusters#managing-clusters) to run the root task on. If not provided, the default cluster is used. + ## Returns @@ -45,7 +44,6 @@ A job object. ```go Go job, err := client.Jobs.Submit(ctx, "My job", - cluster, []workflows.Task{rootTask}, ) ``` diff --git a/api-reference/go/workflows/NewTaskRunner.mdx b/api-reference/go/workflows/NewTaskRunner.mdx index fbac3c6..2e68399 100644 --- a/api-reference/go/workflows/NewTaskRunner.mdx +++ b/api-reference/go/workflows/NewTaskRunner.mdx @@ -6,7 +6,7 @@ icon: gear-code ```go func (*Client) NewTaskRunner( - cluster *workflows.Cluster, + ctx context.Context, options ...runner.Option, ) (*workflows.TaskRunner, error) ``` @@ -15,15 +15,15 @@ Initialize a task runner. ## Parameters - - The [cluster](/workflows/concepts/clusters#managing-clusters) to connect to - Options for initializing the task runner ## Options + + The [cluster](/workflows/concepts/clusters#managing-clusters) to connect to. If not provided, the default cluster is used. 
+ Set the logger to use for the task runner @@ -37,6 +37,6 @@ The created task runner object. ```go Go -runner, err := client.NewTaskRunner() +runner, err := client.NewTaskRunner(ctx) ``` diff --git a/api-reference/python/tilebox.workflows/Client.mdx b/api-reference/python/tilebox.workflows/Client.mdx index f8902f1..2d21185 100644 --- a/api-reference/python/tilebox.workflows/Client.mdx +++ b/api-reference/python/tilebox.workflows/Client.mdx @@ -68,6 +68,6 @@ cluster_client = client.clusters() automation_client = client.automations() # or instantiate a task runner -runner = client.runner("dev-cluster", tasks=[...]) +runner = client.runner(tasks=[...]) ``` diff --git a/api-reference/python/tilebox.workflows/Client.runner.mdx b/api-reference/python/tilebox.workflows/Client.runner.mdx index e38a654..b150a98 100644 --- a/api-reference/python/tilebox.workflows/Client.runner.mdx +++ b/api-reference/python/tilebox.workflows/Client.runner.mdx @@ -5,7 +5,7 @@ icon: laptop-code ```python def Client.runner( - cluster: ClusterSlugLike, + cluster: ClusterSlugLike | None = None, tasks: list[type[Task]], cache: JobCache | None = None ) -> TaskRunner @@ -15,8 +15,9 @@ Initialize a task runner. ## Parameters - + The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster associated with this task runner. + If not provided, the default cluster is used. 
@@ -34,8 +35,7 @@ from tilebox.workflows.cache import LocalFileSystemCache client = Client() runner = client.runner( - "my-cluster-EdsdUozYprBJDL", - [MyFirstTask, MySubtask], + tasks=[MyFirstTask, MySubtask], # optional: cache=LocalFileSystemCache("cache_directory"), ) diff --git a/api-reference/python/tilebox.workflows/JobClient.submit.mdx b/api-reference/python/tilebox.workflows/JobClient.submit.mdx index 1c5039c..aba2ab9 100644 --- a/api-reference/python/tilebox.workflows/JobClient.submit.mdx +++ b/api-reference/python/tilebox.workflows/JobClient.submit.mdx @@ -7,7 +7,7 @@ icon: diagram-project def JobClient.submit( job_name: str, root_task_or_tasks: Task | Iterable[Task], - cluster: str | Cluster | Iterable[str | Cluster], + cluster: str | Cluster | Iterable[str | Cluster] | None = None, max_retries: int = 0 ) -> Job ``` @@ -24,8 +24,9 @@ Submit a job. The root task for the job. This task is executed first and can submit subtasks to manage the entire workflow. A job can have optionally consist of multiple root tasks. - + The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster to run the root task on. In case of multiple root tasks, a list of cluster slugs can be provided. + If not provided, the default cluster is used. 
@@ -43,8 +44,6 @@ job = job_client.submit( value=42, data={"key": "value"} ), - "my-cluster-EdsdUozYprBJDL", - max_retries=0, ) ``` diff --git a/guides/workflows/multi-language.mdx b/guides/workflows/multi-language.mdx index 13f7088..1b774e4 100644 --- a/guides/workflows/multi-language.mdx +++ b/guides/workflows/multi-language.mdx @@ -96,22 +96,15 @@ Write a simple HTTP tasking server in Go with a `/submit` endpoint that accepts ```go Go func main() { - ctx := context.Background() - client := workflows.NewClient() - - cluster, err := client.Clusters.Get(ctx, "test-cluster-tZD9Ca2qsqt4V") - if err != nil { - log.Fatal(err) - } - - http.HandleFunc("/submit", submitHandler(client, cluster)) - log.Println("Server starting on http://localhost:8080") + + client := workflows.NewClient() + http.HandleFunc("/submit", submitHandler(client)) log.Fatal(http.ListenAndServe(":8080", nil)) } // Submit a job based on some query parameters -func submitHandler(client *workflows.Client, cluster *workflows.Cluster) http.HandlerFunc { +func submitHandler(client *workflows.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { latArg := r.URL.Query().Get("lat") lonArg := r.URL.Query().Get("lon") @@ -145,7 +138,7 @@ func submitHandler(client *workflows.Client, cluster *workflows.Cluster) http.Ha spectralBands = append(spectralBands, band) } - job, err := client.Jobs.Submit(r.Context(), "Schedule Image capture", cluster, + job, err := client.Jobs.Submit(r.Context(), "Schedule Image capture", []workflows.Task{ &ScheduleImageCapture{ Location: [2]float64{latFloat, lonFloat}, @@ -177,12 +170,7 @@ from tilebox.workflows import Client def main(): client = Client() - runner = client.runner( - "test-cluster-tZD9Ca2qsqt4V", - tasks=[ - ScheduleImageCapture, - ], - ) + runner = client.runner(tasks=[ScheduleImageCapture]) runner.run_forever() if __name__ == "__main__": diff --git a/quickstart.mdx b/quickstart.mdx index 3b946bb..219146d 100644 --- a/quickstart.mdx +++ 
b/quickstart.mdx @@ -49,16 +49,6 @@ If you prefer to work locally, follow these steps to get started. Copy the API key and keep it somewhere safe. You will need it to authenticate your requests. - - Create a cluster by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Workflows -> Clusters](https://console.tilebox.com/workflows/clusters), and clicking the "Create cluster" button. - - - Tilebox Console - Tilebox Console - - - Copy the cluster slug, you will need it to run your workflows. - Use the datasets client to query data from a dataset. @@ -82,8 +72,7 @@ If you prefer to work locally, follow these steps to get started. ```python Python from tilebox.workflows import Client, Task - # Replace with your actual cluster and token - cluster = "YOUR_COMPUTE_CLUSTER" + # Replace with your actual token client = Client(token="YOUR_TILEBOX_API_KEY") class HelloWorldTask(Task): @@ -102,10 +91,10 @@ If you prefer to work locally, follow these steps to get started. # Initiate the job jobs = client.jobs() - jobs.submit("parameterized-hello-world", HelloWorldTask(greeting="Greetings", name="Universe"), cluster) + jobs.submit("parameterized-hello-world", HelloWorldTask(greeting="Greetings", name="Universe")) # Run the tasks - runner = client.runner(cluster, tasks=[HelloWorldTask, HelloSubtask]) + runner = client.runner(tasks=[HelloWorldTask, HelloSubtask]) runner.run_all() ``` @@ -158,16 +147,6 @@ If you prefer to work locally, follow these steps to get started. Copy the API key and keep it somewhere safe. You will need it to authenticate your requests. - - Create a cluster by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Workflows -> Clusters](https://console.tilebox.com/workflows/clusters), and clicking the "Create cluster" button. - - - Tilebox Console - Tilebox Console - - - Copy the cluster slug, you will need it to run your workflows. 
- Run [tilebox-generate](https://github.com/tilebox/tilebox-generate) in the root directory of your Go project. It generates the dataset type for Sentinel-2 MSI dataset. It will generate a `./protogen/tilebox/v1/sentinel2_msi.pb.go` file. @@ -285,11 +264,10 @@ If you prefer to work locally, follow these steps to get started. func main() { ctx := context.Background() - // Replace with your actual cluster and token - clusterSlug := "YOUR_COMPUTE_CLUSTER" + // Replace with your actual token client := workflows.NewClient() - job, err := client.Jobs.Submit(ctx, "hello-world", clusterSlug, + job, err := client.Jobs.Submit(ctx, "hello-world", []workflows.Task{ &HelloTask{ Greeting: "Greetings", @@ -304,9 +282,7 @@ If you prefer to work locally, follow these steps to get started. slog.InfoContext(ctx, "Job submitted", slog.String("job_id", job.ID.String())) - runner, err := client.NewTaskRunner( - workflows.WithCluster(clusterSlug), - ) + runner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return @@ -321,7 +297,7 @@ If you prefer to work locally, follow these steps to get started. 
return } - runner.Run(context.Background()) + runner.Run(ctx) } ``` diff --git a/workflows/caches.mdx b/workflows/caches.mdx index 005b7da..e4b36d5 100644 --- a/workflows/caches.mdx +++ b/workflows/caches.mdx @@ -25,7 +25,6 @@ You can configure a cache while creating a task runner by passing a cache instan client = Client() runner = client.runner( - "dev-cluster", tasks=[...], cache=InMemoryCache(), ) @@ -63,7 +62,6 @@ bucket = storage_client.bucket("cache-bucket") client = Client() runner = client.runner( - "dev-cluster", tasks=[...], cache=GoogleStorageCache(bucket, prefix="jobs"), ) @@ -87,7 +85,6 @@ from tilebox.workflows.cache import AmazonS3Cache client = Client() runner = client.runner( - "dev-cluster", tasks=[...], cache=AmazonS3Cache("my-bucket-name", prefix="jobs") ) @@ -109,7 +106,6 @@ from tilebox.workflows.cache import LocalFileSystemCache client = Client() runner = client.runner( - "dev-cluster", tasks=[...], cache=LocalFileSystemCache("/path/to/cache/directory"), ) @@ -131,7 +127,6 @@ from tilebox.workflows.cache import InMemoryCache client = Client() runner = client.runner( - "dev-cluster", tasks=[...], cache=InMemoryCache(), ) @@ -179,11 +174,10 @@ To test the workflow, you can start a local task runner using the `InMemoryCache ```python Python # submit a job to test our workflow job_client = client.jobs() - job_client.submit("testing-cache-access", ProducerTask(), cluster="dev-cluster") + job_client.submit("testing-cache-access", ProducerTask()) # start a runner to execute it runner = client.runner( - "dev-cluster", tasks=[ProducerTask, ConsumerTask], cache=LocalFileSystemCache("/path/to/cache/directory"), ) @@ -260,11 +254,10 @@ Submitting a job of the `CacheGroupDemo` and running it with a task runner can b ```python Python # submit a job to test our workflow job_client = client.jobs() - job_client.submit("cache-groups", CacheGroupDemo(5), cluster="dev-cluster") + job_client.submit("cache-groups", CacheGroupDemo(5)) # start a runner to execute 
it runner = client.runner( - "dev-cluster", tasks=[CacheGroupDemo, ProduceRandomNumbers, ProduceRandomNumber, PrintSum], cache=LocalFileSystemCache("/path/to/cache/directory"), ) diff --git a/workflows/concepts/clusters.mdx b/workflows/concepts/clusters.mdx index 6180b05..a6f445b 100644 --- a/workflows/concepts/clusters.mdx +++ b/workflows/concepts/clusters.mdx @@ -25,6 +25,12 @@ If multiple task runners have the same set of registered tasks, you can assign t You can add task runners to a cluster by specifying the [cluster's slug](#cluster-slug) when [registering a task runner](/workflows/concepts/task-runners). Each task runner must always be assigned to a cluster. +## Default Cluster + +Each team has a default cluster that is automatically created for them. +This cluster is used when no cluster is specified when [registering a task runner](/workflows/concepts/task-runners) or [submitting a job](/workflows/concepts/jobs). +This is useful when you are just getting started and don't need to create any custom clusters yet. + ## Managing Clusters Before registering a task runner or submitting a job, you must create a cluster. You can also list, fetch, and delete clusters as needed. The following sections explain how to do this. @@ -231,10 +237,10 @@ func main() { _, _ = client.Jobs.Submit( ctx, "my-job", - "testing-CvufcSxcC9SKfe", []workflows.Task{ &MultiCluster{}, }, + job.WithClusterSlug("testing-CvufcSxcC9SKfe"), ) } ``` diff --git a/workflows/concepts/jobs.mdx b/workflows/concepts/jobs.mdx index f3a5b35..b05e01c 100644 --- a/workflows/concepts/jobs.mdx +++ b/workflows/concepts/jobs.mdx @@ -30,24 +30,17 @@ jobClient := client.Jobs ``` -After obtaining a job client, submit a job using the [submit](/api-reference/python/tilebox.workflows/JobClient.submit) method. You need to provide a name for the job, an instance of the root [task](/workflows/concepts/tasks), and a [cluster](/workflows/concepts/clusters) to execute the root task on. 
+After obtaining a job client, submit a job using the [submit](/api-reference/python/tilebox.workflows/JobClient.submit) method. You need to provide a name for the job, an instance of the root [task](/workflows/concepts/tasks), and an optional [cluster](/workflows/concepts/clusters) to execute the root task on. ```python Python # import your own workflow from my_workflow import MyTask -cluster = "dev-cluster" -job = job_client.submit('my-job', MyTask("some", "parameters"), cluster) +job = job_client.submit('my-job', MyTask("some", "parameters")) ``` ```go Go -cluster, err := client.Clusters.Get(ctx, "dev-cluster") -if err != nil { - slog.Error("failed to get cluster", slog.Any("error", err)) - return -} - -job, err := client.Jobs.Submit(ctx, "my-job", cluster, +job, err := client.Jobs.Submit(ctx, "my-job", []workflows.Task{ &MyTask{ Some: "parameters", @@ -71,11 +64,10 @@ Once a job is submitted, it's immediately scheduled for execution. The root task ```python Python from my_workflow import MyFlakyTask -cluster = "dev-cluster" -job = job_client.submit('my-job', MyFlakyTask(), cluster, max_retries=5) +job = job_client.submit('my-job', MyFlakyTask(), max_retries=5) ``` ```go Go -myJob, err := client.Jobs.Submit(ctx, "my-job", cluster, +myJob, err := client.Jobs.Submit(ctx, "my-job", []workflows.Task{ &MyFlakyTask{}, }, @@ -86,6 +78,29 @@ myJob, err := client.Jobs.Submit(ctx, "my-job", cluster, In this example, if `MyFlakyTask` fails, it will be retried up to five times before being marked as failed. +## Submitting to a specific cluster + +Jobs default to running on the [default cluster](/workflows/concepts/clusters#default-cluster). +You can specify another cluster to run the root task on using the `cluster` argument of the `submit` method. 
+ + +```python Python +from my_workflow import MyFlakyTask + +job = job_client.submit('my-job', MyFlakyTask(), cluster="dev-cluster") +``` +```go Go +myJob, err := client.Jobs.Submit(ctx, "my-job", + []workflows.Task{ + &MyFlakyTask{}, + }, + job.WithClusterSlug("dev-cluster"), +) +``` + + +Only runners listening on the specified cluster can pick up the task. + ## Querying jobs You can query jobs in a given time range using the `query` method on the job client. @@ -130,14 +145,14 @@ You can use the `find` method on the job client to get a job by its ID. ```python Python -job = job_client.submit('my-job', MyTask("some", "parameters"), cluster) +job = job_client.submit('my-job', MyTask("some", "parameters")) print(job.id) # 018dd029-58ca-74e5-8b58-b4f99d610f9a # Later, in another process or machine, retrieve job info job = job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a") ``` ```go Go -myJob, err := client.Jobs.Submit(ctx, "my-job", cluster, +myJob, err := client.Jobs.Submit(ctx, "my-job", []workflows.Task{ &helloworld.HelloTask{ Some: "parameters", @@ -264,7 +279,7 @@ class SubTask(Task): def execute(self, context: ExecutionContext): context.current_task.display = f"Leaf Nr. {self.index}" -job = job_client.submit('custom-display-names', RootTask(3), "dev-cluster") +job = job_client.submit('custom-display-names', RootTask(3)) job_client.display(job) ``` ```go Go @@ -300,7 +315,7 @@ func (t *SubTask) Execute(ctx context.Context) error { } // in main -job, err := client.Jobs.Submit(ctx, "custom-display-names", cluster, +job, err := client.Jobs.Submit(ctx, "custom-display-names", []workflows.Task{&RootTask{ NumSubtasks: 3, }}, @@ -321,12 +336,12 @@ Use the `cancel` method on the job client to cancel a job. 
```python Python -job = job_client.submit('my-job', MyTask(), "dev-cluster") +job = job_client.submit('my-job', MyTask()) # After a short while, the job gets canceled job_client.cancel(job) ``` ```go Go -job, err := client.Jobs.Submit(ctx, "my-job", cluster, +job, err := client.Jobs.Submit(ctx, "my-job", []workflows.Task{&MyTask{}}, ) if err != nil { @@ -456,12 +471,12 @@ job = job_client.submit('movies-stats', MoviesStats([ "Shrek 2", "Tilebox - The Movie", "The Avengers", -]), "dev-cluster") +])) job_client.display(job) ``` ```go Go -job, err := client.Jobs.Submit(ctx, "movies-stats", cluster, +job, err := client.Jobs.Submit(ctx, "movies-stats", []workflows.Task{&MoviesStats{ Titles: []string{ "The Matrix", diff --git a/workflows/concepts/task-runners.mdx b/workflows/concepts/task-runners.mdx index af42fe1..35b4853 100644 --- a/workflows/concepts/task-runners.mdx +++ b/workflows/concepts/task-runners.mdx @@ -19,8 +19,9 @@ To create and start a task runner, follow these steps: Instantiate a client connected to the Tilebox Workflows API. - + Select or create a [cluster](/workflows/concepts/clusters) and specify its slug when creating a task runner. + If no cluster is specified, the task runner will use the default cluster. Register tasks by specifying the task classes that the task runner can execute as a list to the `runner` method. @@ -40,9 +41,8 @@ from my_workflow import MyTask, OtherTask def main(): client = Client() # 1. connect to the Tilebox Workflows API - cluster = "dev-cluster" # 2. select a cluster to join runner = client.runner( - cluster, + cluster="dev-cluster", # 2. select a cluster to join (optional, omit to use the default cluster) tasks=[MyTask, OtherTask] # 3. register tasks ) runner.run_forever() # 4. 
listen for new tasks to execute @@ -58,16 +58,19 @@ import ( "log/slog" "github.com/tilebox/tilebox-go/workflows/v1" + "github.com/tilebox/tilebox-go/workflows/v1/runner" // your own workflow: "github.com/my_org/myworkflow" ) func main() { + ctx := context.Background() + // 1. connect to the Tilebox Workflows API client := workflows.NewClient() - // 2. select a cluster to join - runner, err := client.NewTaskRunner("dev-cluster") + // 2. select a cluster to join (optional, omit to use the default cluster) + runner, err := client.NewTaskRunner(ctx, runner.WithClusterSlug("dev-cluster")) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return @@ -84,7 +87,7 @@ func main() { } // 4. listen for new tasks to execute - runner.Run(context.Background()) + runner.Run(ctx) } ``` @@ -163,7 +166,7 @@ Here's an example of a distributed workflow: ```python Python from tilebox.workflows import Task, ExecutionContext - class Distributed(Task): + class DistributedWorkflow(Task): def execute(self, context: ExecutionContext) -> None: download_task = context.submit_subtask(DownloadData()) process_task = context.submit_subtask( @@ -198,9 +201,9 @@ import ( "github.com/tilebox/tilebox-go/workflows/v1/subtask" ) -type Distributed struct{} +type DistributedWorkflow struct{} -func (t *Distributed) Execute(ctx context.Context) error { +func (t *DistributedWorkflow) Execute(ctx context.Context) error { downloadTask, err := workflows.SubmitSubtask(ctx, &DownloadData{}) if err != nil { return fmt.Errorf("failed to submit download subtask: %w", err) @@ -232,7 +235,10 @@ func (t *ProcessData) Execute(ctx context.Context) error { ``` -To achieve distributed execution for this workflow, no single task runner capable of executing all three of the tasks is set up. Instead, two task runners, each capable of executing one of the tasks are set up: one in a high-speed network environment and the other with GPU access. 
When the distributed workflow runs, the first task runner picks up the `DownloadData` task, while the second picks up the `ProcessData` task. The `Distributed` task does not require specific hardware, so it can be registered with both runners and executed by either one. +To achieve distributed execution for this workflow, no single task runner capable of executing all three of the tasks is set up. +Instead, two task runners, each capable of executing one of the tasks are set up: one in a high-speed network environment and the other with GPU access. +When the distributed workflow runs, the first task runner picks up the `DownloadData` task, while the second picks up the `ProcessData` task. +The `DistributedWorkflow` does not require specific hardware, so it can be registered with both runners and executed by either one. @@ -242,8 +248,7 @@ from tilebox.workflows import Client client = Client() high_network_speed_runner = client.runner( - "dev-cluster", - tasks=[DownloadData, Distributed] + tasks=[DownloadData, DistributedWorkflow] ) high_network_speed_runner.run_forever() ``` @@ -258,9 +263,10 @@ import ( ) func main() { + ctx := context.Background() client := workflows.NewClient() - highNetworkSpeedRunner, err := client.NewTaskRunner("dev-cluster") + highNetworkSpeedRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return @@ -268,14 +274,14 @@ func main() { err = highNetworkSpeedRunner.RegisterTasks( &DownloadData{}, - &Distributed{}, + &DistributedWorkflow{}, ) if err != nil { slog.Error("failed to register tasks", slog.Any("error", err)) return } - highNetworkSpeedRunner.RunForever(context.Background()) + highNetworkSpeedRunner.RunForever(ctx) } ``` @@ -288,8 +294,7 @@ from tilebox.workflows import Client client = Client() gpu_runner = client.runner( - "dev-cluster", - tasks=[ProcessData, Distributed] + tasks=[ProcessData, DistributedWorkflow] ) gpu_runner.run_forever() ``` @@ -304,9 
+309,10 @@ import ( ) func main() { + ctx := context.Background() client := workflows.NewClient() - gpuRunner, err := client.NewTaskRunner("dev-cluster") + gpuRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return @@ -314,21 +320,21 @@ err = gpuRunner.RegisterTasks( &ProcessData{}, - &Distributed{}, + &DistributedWorkflow{}, ) if err != nil { slog.Error("failed to register tasks", slog.Any("error", err)) return } - gpuRunner.RunForever(context.Background()) + gpuRunner.RunForever(ctx) } ``` -Now, both `download_task_runner.py` and `gpu_task_runner.py` are started, in parallel, on different machines with the required hardware for each. When `Distributed` is submitted, it executes on one of the two runners, and it's submitted sub-tasks are handled by the appropriate runner. +Now, both `download_task_runner.py` and `gpu_task_runner.py` are started, in parallel, on different machines with the required hardware for each. When `DistributedWorkflow` is submitted, it executes on one of the two runners, and its submitted sub-tasks are handled by the appropriate runner. In this case, since `ProcessData` depends on `DownloadData`, the GPU task runner remains idle until the download completion, then picks up the processing task. 
diff --git a/workflows/concepts/tasks.mdx b/workflows/concepts/tasks.mdx index b2332bc..23508ae 100644 --- a/workflows/concepts/tasks.mdx +++ b/workflows/concepts/tasks.mdx @@ -307,7 +307,6 @@ jobs = client.jobs() job = jobs.submit( "download-dog-images", DownloadRandomDogImages(5), - "dev-cluster", ) # now our deployed task runners will pick up the task and execute it @@ -318,7 +317,7 @@ jobs.display(job) ctx := context.Background() client := workflows.NewClient() -job, err := client.Jobs.Submit(ctx, "download-dog-images", "dev-cluster", +job, err := client.Jobs.Submit(ctx, "download-dog-images", []workflows.Task{ &helloworld.DownloadRandomDogImages{ NumImages: 5, @@ -672,7 +671,6 @@ A practical example is a workflow that fetches news articles from an API and pro # now submit a job, and then visualize it job = job_client.submit("process-news", NewsWorkflow(category="science", max_articles=5), - "dev-cluster" ) ``` ```go Go @@ -814,7 +812,7 @@ func (t *MostFrequentAuthors) Execute(context.Context) error { // in main now submit a job, and then visualize it /* -job, err := client.Jobs.Submit(ctx, "process-news", "dev-cluster", +job, err := client.Jobs.Submit(ctx, "process-news", []workflows.Task{ &NewsWorkflow{ Category: "science", diff --git a/workflows/observability/logging.mdx b/workflows/observability/logging.mdx index 981ac34..3dbf2be 100644 --- a/workflows/observability/logging.mdx +++ b/workflows/observability/logging.mdx @@ -40,7 +40,7 @@ The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. 
To e # the task runner will export logs from # the executed tasks to the specified dataset client = Client() - runner = client.runner("dev-cluster", tasks=[MyTask]) + runner = client.runner(tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": @@ -83,13 +83,7 @@ func main() { client := workflows.NewClient() - cluster, err := client.Clusters.Get(ctx, "dev-cluster") - if err != nil { - slog.Error("failed to get cluster", slog.Any("error", err)) - return - } - - taskRunner, err := client.NewTaskRunner(cluster) + taskRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return @@ -134,7 +128,7 @@ func main() { # the task runner will export logs from # the executed tasks to the specified endpoint client = Client() - runner = client.runner("dev-cluster", tasks=[MyTask]) + runner = client.runner(tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": @@ -183,13 +177,7 @@ func main() { client := workflows.NewClient() - cluster, err := client.Clusters.Get(ctx, "dev-cluster") - if err != nil { - slog.Error("failed to get cluster", slog.Any("error", err)) - return - } - - taskRunner, err := client.NewTaskRunner(cluster) + taskRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return @@ -227,7 +215,7 @@ func main() { # the task runner will print log messages from # the executed tasks to the console client = Client() - runner = client.runner("dev-cluster", tasks=[MyTask]) + runner = client.runner(tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": @@ -253,13 +241,7 @@ func main() { client := workflows.NewClient() - cluster, err := client.Clusters.Get(ctx, "dev-cluster") - if err != nil { - slog.Error("failed to get cluster", slog.Any("error", err)) - return - } - - taskRunner, err := client.NewTaskRunner(cluster) + taskRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task 
runner", slog.Any("error", err)) return @@ -341,6 +323,6 @@ client.configure_logger(get_logger(level=logging.INFO)) # now the task runner inherits this logger and uses # it to emit its own internal log messages as well -runner = client.runner("dev-cluster", tasks=[MyTask]) +runner = client.runner(tasks=[MyTask]) runner.run_forever() ``` diff --git a/workflows/observability/tracing.mdx b/workflows/observability/tracing.mdx index e0199a4..bebb2d2 100644 --- a/workflows/observability/tracing.mdx +++ b/workflows/observability/tracing.mdx @@ -50,7 +50,7 @@ The Tilebox workflow SDKs have built-in support for exporting OpenTelemetry trac # the following task runner generates traces for executed tasks and # exports trace and span data to the specified Axiom dataset client = Client() - runner = client.runner("dev-cluster", tasks=[MyTask]) + runner = client.runner(tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": @@ -88,13 +88,7 @@ func main() { client := workflows.NewClient() - cluster, err := client.Clusters.Get(ctx, "dev-cluster") - if err != nil { - slog.Error("failed to get cluster", slog.Any("error", err)) - return - } - - taskRunner, err := client.NewTaskRunner(cluster) + taskRunner, err := client.NewTaskRunner(ctx) if err != nil { slog.Error("failed to create task runner", slog.Any("error", err)) return @@ -140,7 +134,7 @@ func main() { # the following task runner generates traces for executed tasks and # exports trace and span data to the specified endpoint client = Client() - runner = client.runner("dev-cluster", tasks=[MyTask]) + runner = client.runner(tasks=[MyTask]) runner.run_forever() if __name__ == "__main__": @@ -185,13 +179,7 @@ func main() { client := workflows.NewClient() - cluster, err := client.Clusters.Get(ctx, "dev-cluster") - if err != nil { - slog.Error("failed to get cluster", slog.Any("error", err)) - return - } - - taskRunner, err := client.NewTaskRunner(cluster) + taskRunner, err := client.NewTaskRunner(ctx) if err != nil { 
slog.Error("failed to create task runner", slog.Any("error", err)) return