From 1a4c3a0fa8ebd46aa5692818f0a5ea5b9d9474fe Mon Sep 17 00:00:00 2001
From: Benjamin Zaitlen
Date: Tue, 13 Jan 2026 01:20:10 +0000
Subject: [PATCH 1/7] allow use of scheduler file and existing dask cluster when using pdsh

---
 .../experimental/benchmarks/utils.py | 90 ++++++++++++++++---
 1 file changed, 76 insertions(+), 14 deletions(-)

diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
index 5bf94a9e41e..bc0a42074ef 100644
--- a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
+++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
@@ -514,26 +514,67 @@ def print_query_plan(
 
 
 def initialize_dask_cluster(run_config: RunConfig, args: argparse.Namespace):  # type: ignore[no-untyped-def]
-    """Initialize a Dask distributed cluster."""
+    """Initialize a Dask distributed cluster.
+
+    This function either creates a new LocalCUDACluster or connects to an
+    existing Dask cluster depending on the provided arguments.
+
+    Parameters
+    ----------
+    run_config : RunConfig
+        The run configuration.
+    args : argparse.Namespace
+        Parsed command line arguments. If ``args.scheduler_address`` or
+        ``args.scheduler_file`` is provided, we connect to an existing
+        cluster instead of creating a LocalCUDACluster.
+
+    Returns
+    -------
+    Client or None
+        A Dask distributed Client, or None if not using distributed mode.
+    """
     if run_config.cluster != "distributed":
         return None
 
-    from dask_cuda import LocalCUDACluster
     from distributed import Client
 
-    kwargs = {
-        "n_workers": run_config.n_workers,
-        "dashboard_address": ":8585",
-        "protocol": args.protocol,
-        "rmm_pool_size": args.rmm_pool_size,
-        "rmm_async": args.rmm_async,
-        "rmm_release_threshold": args.rmm_release_threshold,
-        "threads_per_worker": run_config.threads,
-    }
+    # Check if we should connect to an existing cluster
+    scheduler_address = getattr(args, "scheduler_address", None)
+    scheduler_file = getattr(args, "scheduler_file", None)
+
+    if scheduler_address is not None:
+        # Connect to existing cluster via scheduler address
+        breakpoint()
+        client = Client(address=scheduler_address)
+        n_workers = len(client.scheduler_info().get("workers", {}))
+        print(
+            f"Connected to existing Dask cluster at {scheduler_address} "
+            f"with {n_workers} workers"
+        )
+    elif scheduler_file is not None:
+        # Connect to existing cluster via scheduler file
+        client = Client(scheduler_file=scheduler_file)
+        n_workers = len(client.scheduler_info().get("workers", {}))
+        print(
+            f"Connected to existing Dask cluster via scheduler file: {scheduler_file} "
+            f"with {n_workers} workers"
+        )
+    else:
+        # Create a new LocalCUDACluster
+        from dask_cuda import LocalCUDACluster
+
+        kwargs = {
+            "n_workers": run_config.n_workers,
+            "dashboard_address": ":8585",
+            "protocol": args.protocol,
+            "rmm_pool_size": args.rmm_pool_size,
+            "rmm_async": args.rmm_async,
+            "rmm_release_threshold": args.rmm_release_threshold,
+            "threads_per_worker": run_config.threads,
+        }
 
-    # Avoid UVM in distributed cluster
-    client = Client(LocalCUDACluster(**kwargs))
-    client.wait_for_workers(run_config.n_workers)
+        client = Client(LocalCUDACluster(**kwargs))
+        client.wait_for_workers(run_config.n_workers)
 
     if run_config.shuffle != "tasks":
         try:
@@ -730,6 +771,27 @@ def parse_args(
         type=int,
         help="Number of Dask-CUDA workers (requires 'distributed' cluster).",
     )
+    external_cluster_group = parser.add_mutually_exclusive_group()
+    external_cluster_group.add_argument(
+        "--scheduler-address",
+        default=None,
+        type=str,
+        help=textwrap.dedent("""\
+            Scheduler address for connecting to an existing Dask cluster.
+            If provided, LocalCUDACluster is not created and worker
+            configuration options (--n-workers, --rmm-pool-size, etc.)
+            are ignored since the workers are assumed to be started separately."""),
+    )
+    external_cluster_group.add_argument(
+        "--scheduler-file",
+        default=None,
+        type=str,
+        help=textwrap.dedent("""\
+            Path to a scheduler file for connecting to an existing Dask cluster.
+            If provided, LocalCUDACluster is not created and worker
+            configuration options (--n-workers, --rmm-pool-size, etc.)
+            are ignored since the workers are assumed to be started separately."""),
+    )
     parser.add_argument(
         "--blocksize",
         default=None,

From 154401dafc5246e6dfa45a19c8621ede268e77f1 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Tue, 13 Jan 2026 06:43:53 -0800
Subject: [PATCH 2/7] Apply suggestions from code review

Co-authored-by: Lawrence Mitchell
---
 .../cudf_polars/experimental/benchmarks/utils.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
index bc0a42074ef..556df663173 100644
--- a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
+++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
@@ -544,7 +544,10 @@ def initialize_dask_cluster(run_config: RunConfig, args: argparse.Namespace): #
 
     if scheduler_address is not None:
         # Connect to existing cluster via scheduler address
-        breakpoint()
+        if scheduler_file is not None:
+            raise ValueError(
+                "Cannot specify both --scheduler-address and --scheduler-file."
+            )
         client = Client(address=scheduler_address)
         n_workers = len(client.scheduler_info().get("workers", {}))
         print(
@@ -778,7 +781,7 @@ def parse_args(
         type=str,
         help=textwrap.dedent("""\
             Scheduler address for connecting to an existing Dask cluster.
-            If provided, LocalCUDACluster is not created and worker
+            If provided, a cluster is not created and worker
             configuration options (--n-workers, --rmm-pool-size, etc.)
             are ignored since the workers are assumed to be started separately."""),
     )
@@ -788,7 +791,7 @@ def parse_args(
         type=str,
         help=textwrap.dedent("""\
             Path to a scheduler file for connecting to an existing Dask cluster.
-            If provided, LocalCUDACluster is not created and worker
+            If provided, a cluster is not created and worker
             configuration options (--n-workers, --rmm-pool-size, etc.)
             are ignored since the workers are assumed to be started separately."""),
     )

From 350455b20fb7f379bdded8b97d5cb014620b86f4 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Tue, 13 Jan 2026 06:53:27 -0800
Subject: [PATCH 3/7] Update cw date in utils.py

---
 python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
index 556df663173..1ab86a42f3d 100644
--- a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
+++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
+# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
 # SPDX-License-Identifier: Apache-2.0
 """Utility functions/classes for running the PDS-H and PDS-DS benchmarks."""
 

From 3b1bae7bc3550273f6f3b13731235b3d8ff33106 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Tue, 13 Jan 2026 07:00:35 -0800
Subject: [PATCH 4/7] Update python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py

---
 .../cudf_polars/cudf_polars/experimental/benchmarks/utils.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
index 1ab86a42f3d..f6a00ac2226 100644
--- a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
+++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
@@ -514,7 +514,8 @@ def print_query_plan(
 
 
 def initialize_dask_cluster(run_config: RunConfig, args: argparse.Namespace):  # type: ignore[no-untyped-def]
-    """Initialize a Dask distributed cluster.
+    """
+    Initialize a Dask distributed cluster.
 
     This function either creates a new LocalCUDACluster or connects to an
     existing Dask cluster depending on the provided arguments.

From 0744cb4be727d8c8ddfe4c8ed0ba02a31d3840dc Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Tue, 13 Jan 2026 07:51:05 -0800
Subject: [PATCH 5/7] Update python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py

Co-authored-by: Lawrence Mitchell
---
 .../cudf_polars/cudf_polars/experimental/benchmarks/utils.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
index f6a00ac2226..0689e52ce9c 100644
--- a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
+++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
@@ -545,10 +545,6 @@ def initialize_dask_cluster(run_config: RunConfig, args: argparse.Namespace): #
 
     if scheduler_address is not None:
         # Connect to existing cluster via scheduler address
-        if scheduler_file is not None:
-            raise ValueError(
-                "Cannot specify both --scheduler-address and --scheduler-file."
-            )
         client = Client(address=scheduler_address)
         n_workers = len(client.scheduler_info().get("workers", {}))
         print(

From fc2cab9e0b9ba5f959ed1375276dfb42c9454c35 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Tue, 13 Jan 2026 07:51:23 -0800
Subject: [PATCH 6/7] Update python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py

Co-authored-by: Lawrence Mitchell
---
 .../cudf_polars/cudf_polars/experimental/benchmarks/utils.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
index 0689e52ce9c..b1d4c94b970 100644
--- a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
+++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
@@ -540,8 +540,8 @@ def initialize_dask_cluster(run_config: RunConfig, args: argparse.Namespace): #
     from distributed import Client
 
     # Check if we should connect to an existing cluster
-    scheduler_address = getattr(args, "scheduler_address", None)
-    scheduler_file = getattr(args, "scheduler_file", None)
+    scheduler_address = args.scheduler_address
+    scheduler_file = args.scheduler_file
 
     if scheduler_address is not None:
         # Connect to existing cluster via scheduler address

From c8933c3562eabc42b43faceea379c9e3a14adbd8 Mon Sep 17 00:00:00 2001
From: Benjamin Zaitlen
Date: Fri, 16 Jan 2026 15:09:46 +0000
Subject: [PATCH 7/7] update n_workers when using an existing cluster with pdsh

---
 .../cudf_polars/cudf_polars/experimental/benchmarks/utils.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
index b1d4c94b970..4af2a73340a 100644
--- a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
+++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
@@ -989,6 +989,11 @@ def run_polars(
 
     client = initialize_dask_cluster(run_config, args)
 
+    # Update n_workers from the actual cluster when using scheduler file/address
+    if client is not None:
+        actual_n_workers = len(client.scheduler_info().get("workers", {}))
+        run_config = dataclasses.replace(run_config, n_workers=actual_n_workers)
+
     records: defaultdict[int, list[Record]] = defaultdict(list)
 
     engine: pl.GPUEngine | None = None
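
Usage note: the short Python sketch below shows the connection pattern these patches enable, i.e. attaching the benchmark process to a Dask cluster that was started separately and then sizing n_workers from the live scheduler state (as PATCH 7/7 does). The scheduler-file path and the launch commands in the comments are illustrative assumptions, not taken from the patches; the benchmark itself is pointed at the same file via the new --scheduler-file option (or at a host:port via --scheduler-address).

# Minimal sketch, assuming a scheduler and workers were started elsewhere and
# share a scheduler file, for example (hypothetical path):
#   dask scheduler --scheduler-file /tmp/dask-scheduler.json
#   dask cuda worker --scheduler-file /tmp/dask-scheduler.json
from distributed import Client

scheduler_file = "/tmp/dask-scheduler.json"  # illustrative path
client = Client(scheduler_file=scheduler_file)

# PATCH 7/7 derives the worker count from the scheduler's state rather than
# trusting the --n-workers flag when an external cluster is used:
n_workers = len(client.scheduler_info().get("workers", {}))
print(f"Connected to existing Dask cluster with {n_workers} workers")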