From a23bd4b974242739607b79d47cad177241754a46 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 31 Mar 2026 16:30:53 -0500 Subject: [PATCH 1/4] Fix a few typos and ``.rst`` link syntax (#1973) I ran across a few typos and link formatting issues (`.rst` vs. `.md` syntax) while reading through the docs recently. This PR contains fixes for those and a few other typos that claude found. Authors: - James Bourbeau (https://github.com/jrbourbeau) Approvers: - Corey J. Nolet (https://github.com/cjnolet) URL: https://github.com/rapidsai/cuvs/pull/1973 --- docs/source/api_basics.rst | 2 +- docs/source/api_interoperability.rst | 2 +- docs/source/cuvs_bench/index.rst | 4 ++-- docs/source/cuvs_bench/param_tuning.rst | 2 +- docs/source/developer_guide.md | 4 ++-- docs/source/filtering.rst | 2 +- docs/source/getting_started.rst | 6 +++--- docs/source/neighbors/cagra.rst | 4 ++-- docs/source/tuning_guide.rst | 6 +++--- 9 files changed, 16 insertions(+), 16 deletions(-) diff --git a/docs/source/api_basics.rst b/docs/source/api_basics.rst index b085d22de3..f2c6a7d08a 100644 --- a/docs/source/api_basics.rst +++ b/docs/source/api_basics.rst @@ -7,7 +7,7 @@ cuVS API Basics Memory management ----------------- -Centralized memory management allows flexible configuration of allocation strategies, such as sharing the same CUDA memory pool across library boundaries. cuVS uses the [RMM](https://github.com/rapidsai/rmm) library, which eases the burden of configuring different allocation strategies globally across GPU-accelerated libraries. +Centralized memory management allows flexible configuration of allocation strategies, such as sharing the same CUDA memory pool across library boundaries. cuVS uses the `RMM `_ library, which eases the burden of configuring different allocation strategies globally across GPU-accelerated libraries. RMM currently has APIs for C++ and Python. diff --git a/docs/source/api_interoperability.rst b/docs/source/api_interoperability.rst index 92e19de1fa..097025aee7 100644 --- a/docs/source/api_interoperability.rst +++ b/docs/source/api_interoperability.rst @@ -4,7 +4,7 @@ Interoperability DLPack (C) ^^^^^^^^^^ -Approximate nearest neighbor (ANN) indexes provide an interface to build and search an index via a C API. [DLPack v0.8](https://github.com/dmlc/dlpack/blob/main/README.md), a tensor interface framework, is used as the standard to interact with our C API. +Approximate nearest neighbor (ANN) indexes provide an interface to build and search an index via a C API. `DLPack v0.8 `_, a tensor interface framework, is used as the standard to interact with our C API. Representing a tensor with DLPack is simple, as it is a POD struct that stores information about the tensor at runtime. At the moment, `DLManagedTensor` from DLPack v0.8 is compatible with out C API however we will soon upgrade to `DLManagedTensorVersioned` from DLPack v1.0 as it will help us maintain ABI and API compatibility. diff --git a/docs/source/cuvs_bench/index.rst b/docs/source/cuvs_bench/index.rst index 7caec33892..7176b9401a 100644 --- a/docs/source/cuvs_bench/index.rst +++ b/docs/source/cuvs_bench/index.rst @@ -54,7 +54,7 @@ Installing the benchmarks There are two main ways pre-compiled benchmarks are distributed: - `Conda`_ For users not using containers but want an easy to install and use Python package. Pip wheels are planned to be added as an alternative for users that cannot use conda and prefer to not use containers. -- `Docker`_ Only needs docker and [NVIDIA docker](https://github.com/NVIDIA/nvidia-docker) to use. Provides a single docker run command for basic dataset benchmarking, as well as all the functionality of the conda solution inside the containers. +- `Docker`_ Only needs docker and `NVIDIA docker `_ to use. Provides a single docker run command for basic dataset benchmarking, as well as all the functionality of the conda solution inside the containers. Conda ----- @@ -297,7 +297,7 @@ All of the `cuvs-bench` images contain the Conda packages, so they can be used d -v $DATA_FOLDER:/data/benchmarks \ rapidsai/cuvs-bench:26.04-cuda12.9-py3.13 -This will drop you into a command line in the container, with the `cuvs-bench` python package ready to use, as described in the [Running the benchmarks](#running-the-benchmarks) section above: +This will drop you into a command line in the container, with the `cuvs-bench` python package ready to use, as described in the `Running the benchmarks`_ section above: .. code-block:: bash diff --git a/docs/source/cuvs_bench/param_tuning.rst b/docs/source/cuvs_bench/param_tuning.rst index f559e3fcc7..04d12282d8 100644 --- a/docs/source/cuvs_bench/param_tuning.rst +++ b/docs/source/cuvs_bench/param_tuning.rst @@ -533,7 +533,7 @@ IVF-pq is an inverted-file index, which partitions the vectors into a series of - Y - Positive integer. Power of 2 [8-64] - - - Ratio of numbeer of chunks or subquantizers for each vector. Computed by `dims` / `M_ratio` + - Ratio of number of chunks or subquantizers for each vector. Computed by `dims` / `M_ratio` * - `usePrecomputed` - `build` diff --git a/docs/source/developer_guide.md b/docs/source/developer_guide.md index ef1988cbd0..5c33b85c07 100644 --- a/docs/source/developer_guide.md +++ b/docs/source/developer_guide.md @@ -12,7 +12,7 @@ Please start by reading the [Contributor Guide](contributing.md). Developing features and fixing bugs for the RAFT library itself is straightforward and only requires building and installing the relevant RAFT artifacts. -The process for working on a CUDA/C++ feature which might span RAFT and one or more consuming libraries can vary slightly depending on whether the consuming project relies on a source build (as outlined in the [BUILD](BUILD.md#install_header_only_cpp) docs). In such a case, the option `CPM_raft_SOURCE=/path/to/raft/source` can be passed to the cmake of the consuming project in order to build the local RAFT from source. The PR with relevant changes to the consuming project can also pin the RAFT version temporarily by explicitly changing the `FORK` and `PINNED_TAG` arguments to the RAFT branch containing their changes when invoking `find_and_configure_raft`. The pin should be reverted after the changed is merged to the RAFT project and before it is merged to the dependent project(s) downstream. +The process for working on a CUDA/C++ feature which might span RAFT and one or more consuming libraries can vary slightly depending on whether the consuming project relies on a source build (as outlined in the [BUILD](BUILD.md#install_header_only_cpp) docs). In such a case, the option `CPM_raft_SOURCE=/path/to/raft/source` can be passed to the cmake of the consuming project in order to build the local RAFT from source. The PR with relevant changes to the consuming project can also pin the RAFT version temporarily by explicitly changing the `FORK` and `PINNED_TAG` arguments to the RAFT branch containing their changes when invoking `find_and_configure_raft`. The pin should be reverted after the change is merged to the RAFT project and before it is merged to the dependent project(s) downstream. If building a feature which spans projects and not using the source build in cmake, the RAFT changes (both C++ and Python) will need to be installed into the environment of the consuming project before they can be used. The ideal integration of RAFT into consuming projects will enable both the source build in the consuming project only for this case but also rely on a more stable packaging (such as conda packaging) otherwise. @@ -390,7 +390,7 @@ int main(int argc, char * argv[]) ``` A RAFT developer can assume the following: -* A instance of `raft::comms::comms_t` was correctly initialized. +* An instance of `raft::comms::comms_t` was correctly initialized. * All processes that are part of `raft::comms::comms_t` call into the RAFT algorithm cooperatively. The initialized instance of `raft::comms::comms_t` can be accessed from the `raft::resources` instance: diff --git a/docs/source/filtering.rst b/docs/source/filtering.rst index 7a62270264..cb168f94c8 100644 --- a/docs/source/filtering.rst +++ b/docs/source/filtering.rst @@ -5,7 +5,7 @@ Filtering vector indexes ~~~~~~~~~~~~~~~~~~~~~~~~ cuVS supports different type of filtering depending on the vector index being used. The main method used in all of the vector indexes -is pre-filtering, which is a technique that will into account the filtering of the vectors before computing it's closest neighbors, saving +is pre-filtering, which is a technique that will take into account the filtering of the vectors before computing its closest neighbors, saving some computation from calculating distances. Bitset diff --git a/docs/source/getting_started.rst b/docs/source/getting_started.rst index 89be0f1e5b..8475f89cb4 100644 --- a/docs/source/getting_started.rst +++ b/docs/source/getting_started.rst @@ -43,9 +43,9 @@ Getting Started New to vector search? ===================== -If you are unfamiliar with the basics of vector search or how vector search differs from vector databases, then :doc:`this primer on vector search guide ` should provide some good insight. Another good resource for the uninitiated is our :doc:`vector databases vs vector search ` guide. As outlined in the primer, vector search as used in vector databases is often closer to machine learning than to traditional databases. This means that while traditional databases can often be slow without any performance tuning, they will usually still yield the correct results. Unfortunately, vector search indexes, like other machine learning models, can yield garbage results of not tuned correctly. +If you are unfamiliar with the basics of vector search or how vector search differs from vector databases, then :doc:`this primer on vector search guide ` should provide some good insight. Another good resource for the uninitiated is our :doc:`vector databases vs vector search ` guide. As outlined in the primer, vector search as used in vector databases is often closer to machine learning than to traditional databases. This means that while traditional databases can often be slow without any performance tuning, they will usually still yield the correct results. Unfortunately, vector search indexes, like other machine learning models, can yield garbage results if not tuned correctly. -Fortunately, this opens up the whole world of hyperparamer optimization to improve vector search performance and quality. Please see our :doc:`index tuning guide ` for more information. +Fortunately, this opens up the whole world of hyperparameter optimization to improve vector search performance and quality. Please see our :doc:`index tuning guide ` for more information. When comparing the performance of vector search indexes, it is important that considerations are made with respect to three main dimensions: @@ -58,7 +58,7 @@ Please see the :doc:`primer on comparing vector search index performance ` for to learn more about each individual index type, when they can be useful on the GPU, the tuning knobs they offer to trade off performance and quality. +cuVS supports many of the standard index types with the list continuing to grow and stay current with the state-of-the-art. Please refer to our :doc:`vector search index guide ` to learn more about each individual index type, when they can be useful on the GPU, the tuning knobs they offer to trade off performance and quality. The primary goal of cuVS is to enable speed, scale, and flexibility (in that order)- and one of the important value propositions is to enhance existing software deployments with extensible GPU capabilities to improve pain points while not interrupting parts of the system that work well today with CPU. diff --git a/docs/source/neighbors/cagra.rst b/docs/source/neighbors/cagra.rst index e62274787a..cc7bc675f1 100644 --- a/docs/source/neighbors/cagra.rst +++ b/docs/source/neighbors/cagra.rst @@ -26,7 +26,7 @@ Filtering considerations CAGRA supports filtered search and has improved multi-CTA algorithm in branch-25.02 to provide reasonable recall and performance for filtering rate as high as 90% or more. -To obtain an appropriate recall in filtered search, it is necessary to set search parameters according to the filtering rate, but since it is difficult for users to to this, CAGRA automatically adjusts `itopk_size` internally according to the filtering rate on a heuristic basis. If you want to disable this automatic adjustment, set `filtering_rate`, one of the search parameters, to `0.0`, and `itopk_size` will not be adjusted automatically. +To obtain an appropriate recall in filtered search, it is necessary to set search parameters according to the filtering rate, but since it is difficult for users to do this, CAGRA automatically adjusts `itopk_size` internally according to the filtering rate on a heuristic basis. If you want to disable this automatic adjustment, set `filtering_rate`, one of the search parameters, to `0.0`, and `itopk_size` will not be adjusted automatically. Configuration parameters ------------------------ @@ -127,7 +127,7 @@ Optimize: formula for peak memory usage (device): :math:`n\_vectors * (4 + (size Build with out-of-core IVF-PQ peak memory usage: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Out-of-core CAGA build consists of IVF-PQ build, IVF-PQ search, CAGRA optimization. Note that these steps are performed sequentially, so they are not additive. +Out-of-core CAGRA build consists of IVF-PQ build, IVF-PQ search, CAGRA optimization. Note that these steps are performed sequentially, so they are not additive. IVF-PQ Build: diff --git a/docs/source/tuning_guide.rst b/docs/source/tuning_guide.rst index 0f49802c86..fd54fc42ae 100644 --- a/docs/source/tuning_guide.rst +++ b/docs/source/tuning_guide.rst @@ -23,14 +23,14 @@ But how would this work when we have an index that's massively large- like 1TB? One benefit to locally indexed vector databases is that they often scale by breaking the larger set of vectors down into a smaller set by uniformly random subsampling and training smaller vector search index models on the sub-samples. Most often, the same set of tuning parameters are applied to all of the smaller sub-index models, rather than trying to set them individually for each one. During search, the query vectors are often sent to all of the sub-indexes and the resulting neighbors list reduced down to `k` based on the closest distances (or similarities). -Because many databases use this sub-sampling trick, it's possible to perform an automated parameter tuning on the larger index just by randomly samplnig some number of vectors from it, splitting them into disjoint train/test/eval datasets, computing ground truth with brute-force, and then performing a hyper-parameter optimization on it. This procedure can also be repeated multiple times to simulate a monte-carlo cross validation. +Because many databases use this sub-sampling trick, it's possible to perform an automated parameter tuning on the larger index just by randomly sampling some number of vectors from it, splitting them into disjoint train/test/eval datasets, computing ground truth with brute-force, and then performing a hyper-parameter optimization on it. This procedure can also be repeated multiple times to simulate a monte-carlo cross validation. GPUs are naturally great at performing massively parallel tasks, especially when they are largely independent tasks, such as training and evaluating models with different hyper-parameter settings in parallel. Hyper-parameter optimization also lends itself well to distributed processing, such as multi-node multi-GPU operation. Steps to achieve automated tuning ================================= -More formally, an automated parameter tuning workflow with monte-carlo cross-validaton looks likes something like this: +More formally, an automated parameter tuning workflow with monte-carlo cross-validation looks something like this: #. Ingest a large dataset into the vector database of your choice @@ -42,7 +42,7 @@ More formally, an automated parameter tuning workflow with monte-carlo cross-val #. Use the test set to compute ground truth on the vectors from prior step against all vectors in the training set. -#. Start the HPO tuning process for the training set, using the test vectors for the query set. It's important to make sure your HPO is multi-objective and optimizes for: a) low build time, b) high throughput or low latency sarch (depending on needs), and c) acceptable recall. +#. Start the HPO tuning process for the training set, using the test vectors for the query set. It's important to make sure your HPO is multi-objective and optimizes for: a) low build time, b) high throughput or low latency search (depending on needs), and c) acceptable recall. #. Use the evaluation dataset to test that the optimal hyper-parameters generalize to unseen points that were not used in the optimization process. From c02afc9ca784a601582c08b1af3a81df8c06390f Mon Sep 17 00:00:00 2001 From: Tarang Jain <40517122+tarang-jain@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:46:01 -0700 Subject: [PATCH 2/4] [FEA] Add Batching to KMeans (#1886) Merge after #1880 This PR adds support for streaming out of core (dataset on host) kmeans clustering. The idea is simple: Batched accumulation of centroid updates: Data is processed in batches and batch-wise means and cluster counts are accumulated until all the batches i.e., the full dataset pass has completed. This PR just brings a batch-size parameter to load and compute cluster assignments and (weighted) centroid adjustments on batches of the dataset. The final centroid 'updates' i.e. a single kmeans iteration only completes when all these accumulated sums are averaged once the whole dataset pass has completed. Authors: - Tarang Jain (https://github.com/tarang-jain) Approvers: - Victor Lafargue (https://github.com/viclafargue) - Anupam (https://github.com/aamijar) - Micka (https://github.com/lowener) - Jinsol Park (https://github.com/jinsolp) - Ben Frederickson (https://github.com/benfred) URL: https://github.com/rapidsai/cuvs/pull/1886 --- c/include/cuvs/cluster/kmeans.h | 19 +- c/src/cluster/kmeans.cpp | 103 +++-- cpp/include/cuvs/cluster/kmeans.hpp | 94 +++- cpp/src/cluster/detail/kmeans.cuh | 130 +----- cpp/src/cluster/detail/kmeans_batched.cuh | 510 +++++++++++++++++++++ cpp/src/cluster/detail/kmeans_common.cuh | 129 ++++++ cpp/src/cluster/kmeans_fit_double.cu | 16 +- cpp/src/cluster/kmeans_fit_float.cu | 16 +- cpp/tests/cluster/kmeans.cu | 252 +++++++++- python/cuvs/cuvs/cluster/kmeans/kmeans.pxd | 3 +- python/cuvs/cuvs/cluster/kmeans/kmeans.pyx | 117 ++++- python/cuvs/cuvs/tests/test_kmeans.py | 74 ++- 12 files changed, 1306 insertions(+), 157 deletions(-) create mode 100644 cpp/src/cluster/detail/kmeans_batched.cuh diff --git a/c/include/cuvs/cluster/kmeans.h b/c/include/cuvs/cluster/kmeans.h index 0bb9591f63..8f55edb925 100644 --- a/c/include/cuvs/cluster/kmeans.h +++ b/c/include/cuvs/cluster/kmeans.h @@ -36,6 +36,7 @@ typedef enum { Array = 2 } cuvsKMeansInitMethod; + /** * @brief Hyper-parameters for the kmeans algorithm */ @@ -90,6 +91,7 @@ struct cuvsKMeansParams { */ int batch_centroids; + /** Check inertia during iterations for early convergence. */ bool inertia_check; /** @@ -101,6 +103,12 @@ struct cuvsKMeansParams { * For hierarchical k-means , defines the number of training iterations */ int hierarchical_n_iters; + + /** + * Number of samples to process per GPU batch for the batched (host-data) API. + * When set to 0, defaults to n_samples (process all at once). + */ + int64_t streaming_batch_size; }; typedef struct cuvsKMeansParams* cuvsKMeansParams_t; @@ -142,18 +150,24 @@ typedef enum { CUVS_KMEANS_TYPE_KMEANS = 0, CUVS_KMEANS_TYPE_KMEANS_BALANCED = 1 * clusters are reinitialized by choosing new centroids with * k-means++ algorithm. * + * X may reside on either host (CPU) or device (GPU) memory. + * When X is on the host the data is streamed to the GPU in + * batches controlled by params->streaming_batch_size. + * * @param[in] res opaque C handle * @param[in] params Parameters for KMeans model. * @param[in] X Training instances to cluster. The data must - * be in row-major format. + * be in row-major format. May be on host or + * device memory. * [dim = n_samples x n_features] * @param[in] sample_weight Optional weights for each observation in X. + * Must be on the same memory space as X. * [len = n_samples] * @param[inout] centroids [in] When init is InitMethod::Array, use * centroids as the initial cluster centers. * [out] The generated centroids from the * kmeans algorithm are stored at the address - * pointed by 'centroids'. + * pointed by 'centroids'. Must be on device. * [dim = n_clusters x n_features] * @param[out] inertia Sum of squared distances of samples to their * closest cluster center. @@ -212,6 +226,7 @@ cuvsError_t cuvsKMeansClusterCost(cuvsResources_t res, DLManagedTensor* X, DLManagedTensor* centroids, double* cost); + /** * @} */ diff --git a/c/src/cluster/kmeans.cpp b/c/src/cluster/kmeans.cpp index 57b6282c20..a84cd50259 100644 --- a/c/src/cluster/kmeans.cpp +++ b/c/src/cluster/kmeans.cpp @@ -4,6 +4,7 @@ */ #include + #include #include @@ -17,16 +18,18 @@ namespace { cuvs::cluster::kmeans::params convert_params(const cuvsKMeansParams& params) { - auto kmeans_params = cuvs::cluster::kmeans::params(); - kmeans_params.metric = static_cast(params.metric); - kmeans_params.init = static_cast(params.init); - kmeans_params.n_clusters = params.n_clusters; - kmeans_params.max_iter = params.max_iter; - kmeans_params.tol = params.tol; + auto kmeans_params = cuvs::cluster::kmeans::params(); + kmeans_params.metric = static_cast(params.metric); + kmeans_params.init = static_cast(params.init); + kmeans_params.n_clusters = params.n_clusters; + kmeans_params.max_iter = params.max_iter; + kmeans_params.tol = params.tol; + kmeans_params.n_init = params.n_init; kmeans_params.oversampling_factor = params.oversampling_factor; kmeans_params.batch_samples = params.batch_samples; kmeans_params.batch_centroids = params.batch_centroids; kmeans_params.inertia_check = params.inertia_check; + kmeans_params.streaming_batch_size = params.streaming_batch_size; return kmeans_params; } @@ -38,7 +41,7 @@ cuvs::cluster::kmeans::balanced_params convert_balanced_params(const cuvsKMeansP return kmeans_params; } -template +template void _fit(cuvsResources_t res, const cuvsKMeansParams& params, DLManagedTensor* X_tensor, @@ -50,7 +53,51 @@ void _fit(cuvsResources_t res, auto X = X_tensor->dl_tensor; auto res_ptr = reinterpret_cast(res); - if (cuvs::core::is_dlpack_device_compatible(X)) { + if (!cuvs::core::is_dlpack_device_compatible(X)) { + auto n_samples = static_cast(X.shape[0]); + auto n_features = static_cast(X.shape[1]); + + if (params.hierarchical) { + RAFT_FAIL("hierarchical kmeans is not supported with host data"); + } + + auto centroids_dl = centroids_tensor->dl_tensor; + if (!cuvs::core::is_dlpack_device_compatible(centroids_dl)) { + RAFT_FAIL("centroids must be on device memory"); + } + + auto X_view = raft::make_host_matrix_view( + reinterpret_cast(X.data), n_samples, n_features); + auto centroids_view = + cuvs::core::from_dlpack>( + centroids_tensor); + + std::optional> sample_weight; + if (sample_weight_tensor != NULL) { + auto sw = sample_weight_tensor->dl_tensor; + if (!cuvs::core::is_dlpack_host_compatible(sw)) { + RAFT_FAIL("sample_weight must be host accessible when X is on host"); + } + sample_weight = raft::make_host_vector_view( + reinterpret_cast(sw.data), n_samples); + } + + T inertia_temp; + IdxT n_iter_temp; + + auto kmeans_params = convert_params(params); + cuvs::cluster::kmeans::fit(*res_ptr, + kmeans_params, + X_view, + sample_weight, + centroids_view, + raft::make_host_scalar_view(&inertia_temp), + raft::make_host_scalar_view(&n_iter_temp)); + + *inertia = inertia_temp; + *n_iter = n_iter_temp; + + } else { using const_mdspan_type = raft::device_matrix_view; using mdspan_type = raft::device_matrix_view; @@ -85,13 +132,11 @@ void _fit(cuvsResources_t res, cuvs::core::from_dlpack(X_tensor), sample_weight, cuvs::core::from_dlpack(centroids_tensor), - raft::make_host_scalar_view(&inertia_temp), - raft::make_host_scalar_view(&n_iter_temp)); + raft::make_host_scalar_view(&inertia_temp), + raft::make_host_scalar_view(&n_iter_temp)); *inertia = inertia_temp; *n_iter = n_iter_temp; } - } else { - RAFT_FAIL("X dataset must be accessible on device memory"); } } @@ -143,7 +188,7 @@ void _predict(cuvsResources_t res, cuvs::core::from_dlpack(centroids_tensor), cuvs::core::from_dlpack(labels_tensor), normalize_weight, - raft::make_host_scalar_view(&inertia_temp)); + raft::make_host_scalar_view(&inertia_temp)); *inertia = inertia_temp; } } else { @@ -168,7 +213,7 @@ void _cluster_cost(cuvsResources_t res, cuvs::cluster::kmeans::cluster_cost(*res_ptr, cuvs::core::from_dlpack(X_tensor), cuvs::core::from_dlpack(centroids_tensor), - raft::make_host_scalar_view(&cost_temp)); + raft::make_host_scalar_view(&cost_temp)); } else { RAFT_FAIL("X dataset must be accessible on device memory"); } @@ -182,17 +227,20 @@ extern "C" cuvsError_t cuvsKMeansParamsCreate(cuvsKMeansParams_t* params) return cuvs::core::translate_exceptions([=] { cuvs::cluster::kmeans::params cpp_params; cuvs::cluster::kmeans::balanced_params cpp_balanced_params; - *params = - new cuvsKMeansParams{.metric = static_cast(cpp_params.metric), - .n_clusters = cpp_params.n_clusters, - .init = static_cast(cpp_params.init), - .max_iter = cpp_params.max_iter, - .tol = cpp_params.tol, - .oversampling_factor = cpp_params.oversampling_factor, - .batch_samples = cpp_params.batch_samples, - .inertia_check = cpp_params.inertia_check, - .hierarchical = false, - .hierarchical_n_iters = static_cast(cpp_balanced_params.n_iters)}; + *params = new cuvsKMeansParams{ + .metric = static_cast(cpp_params.metric), + .n_clusters = cpp_params.n_clusters, + .init = static_cast(cpp_params.init), + .max_iter = cpp_params.max_iter, + .tol = cpp_params.tol, + .n_init = cpp_params.n_init, + .oversampling_factor = cpp_params.oversampling_factor, + .batch_samples = cpp_params.batch_samples, + .batch_centroids = cpp_params.batch_centroids, + .inertia_check = cpp_params.inertia_check, + .hierarchical = false, + .hierarchical_n_iters = static_cast(cpp_balanced_params.n_iters), + .streaming_batch_size = cpp_params.streaming_batch_size}; }); } @@ -235,10 +283,9 @@ extern "C" cuvsError_t cuvsKMeansPredict(cuvsResources_t res, return cuvs::core::translate_exceptions([=] { auto dataset = X->dl_tensor; if (dataset.dtype.code == kDLFloat && dataset.dtype.bits == 32) { - _predict(res, *params, X, sample_weight, centroids, labels, normalize_weight, inertia); + _predict(res, *params, X, sample_weight, centroids, labels, normalize_weight, inertia); } else if (dataset.dtype.code == kDLFloat && dataset.dtype.bits == 64) { - _predict( - res, *params, X, sample_weight, centroids, labels, normalize_weight, inertia); + _predict(res, *params, X, sample_weight, centroids, labels, normalize_weight, inertia); } else { RAFT_FAIL("Unsupported dataset DLtensor dtype: %d and bits: %d", dataset.dtype.code, diff --git a/cpp/include/cuvs/cluster/kmeans.hpp b/cpp/include/cuvs/cluster/kmeans.hpp index a839cecf56..6c351010ae 100644 --- a/cpp/include/cuvs/cluster/kmeans.hpp +++ b/cpp/include/cuvs/cluster/kmeans.hpp @@ -100,15 +100,31 @@ struct params : base_params { * useful to optimize/control the memory footprint * Default tile is [batch_samples x n_clusters] i.e. when batch_centroids is 0 * then don't tile the centroids + * + * NB: These parameters are unrelated to streaming_batch_size, which controls how many + * samples to transfer from host to device per batch when processing out-of-core + * data. */ int batch_samples = 1 << 15; /** * if 0 then batch_centroids = n_clusters */ - int batch_centroids = 0; // + int batch_centroids = 0; + /** + * If true, check inertia during iterations for early convergence. + */ bool inertia_check = false; + + /** + * Number of samples to process per GPU batch when fitting with host data. + * When set to 0, defaults to n_samples (process all at once). + * Only used by the batched (host-data) code path and ignored by device-data + * overloads. + * Default: 0 (process all data at once). + */ + int64_t streaming_batch_size = 0; }; /** @@ -141,6 +157,82 @@ enum class kmeans_type { KMeans = 0, KMeansBalanced = 1 }; * @{ */ +/** + * @brief Find clusters with k-means algorithm using batched processing of host data. + * + * TODO: Evaluate replacing the extent type with int64_t. Reference issue: + * https://github.com/rapidsai/cuvs/issues/1961 + * + * This overload supports out-of-core computation where the dataset resides + * on the host. Data is processed in GPU-sized batches, streaming from host to device. + * The batch size is controlled by params.streaming_batch_size. + * + * @code{.cpp} + * #include + * #include + * using namespace cuvs::cluster; + * ... + * raft::resources handle; + * cuvs::cluster::kmeans::params params; + * params.n_clusters = 100; + * params.streaming_batch_size = 100000; + * float inertia; + * int64_t n_iter; + * + * // Data on host + * std::vector h_X(n_samples * n_features); + * auto X = raft::make_host_matrix_view(h_X.data(), n_samples, n_features); + * + * // Centroids on device + * auto centroids = raft::make_device_matrix(handle, params.n_clusters, + * n_features); + * + * kmeans::fit(handle, + * params, + * X, + * std::nullopt, + * centroids.view(), + * raft::make_host_scalar_view(&inertia), + * raft::make_host_scalar_view(&n_iter)); + * @endcode + * + * @param[in] handle The raft handle. + * @param[in] params Parameters for KMeans model. Batch size is read from + * params.streaming_batch_size. + * @param[in] X Training instances on HOST memory. The data must + * be in row-major format. + * [dim = n_samples x n_features] + * @param[in] sample_weight Optional weights for each observation in X (on host). + * [len = n_samples] + * @param[inout] centroids [in] When init is InitMethod::Array, use + * centroids as the initial cluster centers. + * [out] The generated centroids from the + * kmeans algorithm are stored at the address + * pointed by 'centroids'. + * [dim = n_clusters x n_features] + * @param[out] inertia Sum of squared distances of samples to their + * closest cluster center. + * @param[out] n_iter Number of iterations run. + */ +void fit(raft::resources const& handle, + const cuvs::cluster::kmeans::params& params, + raft::host_matrix_view X, + std::optional> sample_weight, + raft::device_matrix_view centroids, + raft::host_scalar_view inertia, + raft::host_scalar_view n_iter); + +/** + * @brief Find clusters with k-means algorithm using batched processing of host data. + */ +void fit(raft::resources const& handle, + const cuvs::cluster::kmeans::params& params, + raft::host_matrix_view X, + std::optional> sample_weight, + raft::device_matrix_view centroids, + raft::host_scalar_view inertia, + raft::host_scalar_view n_iter); + /** * @brief Find clusters with k-means algorithm. * Initial centroids are chosen with k-means++ algorithm. Empty diff --git a/cpp/src/cluster/detail/kmeans.cuh b/cpp/src/cluster/detail/kmeans.cuh index 6e7bff8450..5a35f203b3 100644 --- a/cpp/src/cluster/detail/kmeans.cuh +++ b/cpp/src/cluster/detail/kmeans.cuh @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -287,61 +286,21 @@ void update_centroids(raft::resources const& handle, rmm::device_uvector& workspace) { auto n_clusters = centroids.extent(0); - auto n_samples = X.extent(0); - - workspace.resize(n_samples, raft::resource::get_cuda_stream(handle)); - - // Calculates weighted sum of all the samples assigned to cluster-i and stores the - // result in new_centroids[i] - raft::linalg::reduce_rows_by_key((DataT*)X.data_handle(), - X.extent(1), - cluster_labels, - sample_weights.data_handle(), - workspace.data(), - X.extent(0), - X.extent(1), - n_clusters, - new_centroids.data_handle(), - raft::resource::get_cuda_stream(handle)); - - // Reduce weights by key to compute weight in each cluster - raft::linalg::reduce_cols_by_key(sample_weights.data_handle(), - cluster_labels, - weight_per_cluster.data_handle(), - (IndexT)1, - (IndexT)sample_weights.extent(0), - (IndexT)n_clusters, - raft::resource::get_cuda_stream(handle)); - - // Computes new_centroids[i] = new_centroids[i]/weight_per_cluster[i] where - // new_centroids[n_clusters x n_features] - 2D array, new_centroids[i] has sum of all the - // samples assigned to cluster-i - // weight_per_cluster[n_clusters] - 1D array, weight_per_cluster[i] contains sum of weights in - // cluster-i. - // Note - when weight_per_cluster[i] is 0, new_centroids[i] is reset to 0 - raft::linalg::matrix_vector_op( - handle, - raft::make_const_mdspan(new_centroids), - raft::make_const_mdspan(weight_per_cluster), - new_centroids, - raft::div_checkzero_op{}); - - // copy centroids[i] to new_centroids[i] when weight_per_cluster[i] is 0 - cub::ArgIndexInputIterator itr_wt(weight_per_cluster.data_handle()); - raft::matrix::gather_if( - const_cast(centroids.data_handle()), - static_cast(centroids.extent(1)), - static_cast(centroids.extent(0)), - itr_wt, - itr_wt, - static_cast(weight_per_cluster.size()), - new_centroids.data_handle(), - [=] __device__(raft::KeyValuePair map) { // predicate - // copy when the sum of weights in the cluster is 0 - return map.value == 0; - }, - raft::key_op{}, - raft::resource::get_cuda_stream(handle)); + + cuvs::cluster::kmeans::detail::compute_centroid_adjustments(handle, + X, + sample_weights, + cluster_labels, + static_cast(n_clusters), + new_centroids, + weight_per_cluster, + workspace); + + cuvs::cluster::kmeans::detail::finalize_centroids(handle, + raft::make_const_mdspan(new_centroids), + raft::make_const_mdspan(weight_per_cluster), + centroids, + new_centroids); } // TODO: Resizing is needed to use mdarray instead of rmm::device_uvector @@ -437,18 +396,9 @@ void kmeans_fit_main(raft::resources const& handle, newCentroids.view(), workspace); - // compute the squared norm between the newCentroids and the original - // centroids, destructor releases the resource - auto sqrdNorm = raft::make_device_scalar(handle, DataT(0)); - raft::linalg::mapThenSumReduce(sqrdNorm.data_handle(), - newCentroids.size(), - raft::sqdiff_op{}, - stream, - centroids.data_handle(), - newCentroids.data_handle()); - - DataT sqrdNormError = 0; - raft::copy(handle, raft::make_host_scalar_view(&sqrdNormError), sqrdNorm.view()); + // Compute how much centroids shifted + DataT sqrdNormError = compute_centroid_shift( + handle, raft::make_const_mdspan(centroids), raft::make_const_mdspan(newCentroids.view())); raft::copy(handle, raft::make_device_vector_view(centroidsRawData.data_handle(), newCentroids.size()), @@ -478,7 +428,6 @@ void kmeans_fit_main(raft::resources const& handle, priorClusteringCost = curClusteringCost; } - raft::resource::sync_stream(handle, stream); if (sqrdNormError < params.tol) done = true; if (done) { @@ -487,43 +436,12 @@ void kmeans_fit_main(raft::resources const& handle, } } - auto centroids = raft::make_device_matrix_view( - centroidsRawData.data_handle(), n_clusters, n_features); - - cuvs::cluster::kmeans::detail::minClusterAndDistanceCompute( - handle, - X, - centroids, - minClusterAndDistance.view(), - l2normx_view, - L2NormBuf_OR_DistBuf, - params.metric, - params.batch_samples, - params.batch_centroids, - workspace); - - raft::linalg::map( - handle, - minClusterAndDistance.view(), - [=] __device__(const raft::KeyValuePair kvp, DataT wt) { - raft::KeyValuePair res; - res.value = kvp.value * wt; - res.key = kvp.key; - return res; - }, - raft::make_const_mdspan(minClusterAndDistance.view()), - raft::make_const_mdspan(weight)); - - // calculate cluster cost phi_x(C) - cuvs::cluster::kmeans::detail::computeClusterCost( - handle, - minClusterAndDistance.view(), - workspace, - raft::make_device_scalar_view(clusterCostD.data()), - raft::value_op{}, - raft::add_op{}); - - inertia[0] = clusterCostD.value(stream); + cuvs::cluster::kmeans::cluster_cost(handle, + X, + raft::make_device_matrix_view( + centroidsRawData.data_handle(), n_clusters, n_features), + inertia, + std::make_optional(weight)); RAFT_LOG_DEBUG("KMeans.fit: completed after %d iterations with %f inertia[0] ", n_iter[0] > params.max_iter ? n_iter[0] - 1 : n_iter[0], diff --git a/cpp/src/cluster/detail/kmeans_batched.cuh b/cpp/src/cluster/detail/kmeans_batched.cuh new file mode 100644 index 0000000000..e2fc8d334f --- /dev/null +++ b/cpp/src/cluster/detail/kmeans_batched.cuh @@ -0,0 +1,510 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include "kmeans.cuh" +#include "kmeans_common.cuh" + +#include "../../neighbors/detail/ann_utils.cuh" +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +namespace cuvs::cluster::kmeans::detail { + +/** + * @brief Initialize centroids from host data + * + * @tparam T Input data type + * @tparam IdxT Index type + */ +template +void init_centroids_from_host_sample(raft::resources const& handle, + const cuvs::cluster::kmeans::params& params, + IdxT streaming_batch_size, + raft::host_matrix_view X, + raft::device_matrix_view centroids, + rmm::device_uvector& workspace) +{ + cudaStream_t stream = raft::resource::get_cuda_stream(handle); + auto n_samples = X.extent(0); + auto n_features = X.extent(1); + auto n_clusters = params.n_clusters; + + if (params.init == cuvs::cluster::kmeans::params::InitMethod::KMeansPlusPlus) { + // this is a heuristic to speed up the initialization + IdxT init_sample_size = 3 * streaming_batch_size; + if (init_sample_size < n_clusters) { init_sample_size = 3 * n_clusters; } + init_sample_size = std::min(init_sample_size, n_samples); + + auto init_sample = raft::make_device_matrix(handle, init_sample_size, n_features); + raft::random::RngState random_state(params.rng_state.seed); + raft::matrix::sample_rows(handle, random_state, X, init_sample.view()); + + if (params.oversampling_factor == 0) { + cuvs::cluster::kmeans::detail::kmeansPlusPlus( + handle, params, raft::make_const_mdspan(init_sample.view()), centroids, workspace); + } else { + cuvs::cluster::kmeans::detail::initScalableKMeansPlusPlus( + handle, params, raft::make_const_mdspan(init_sample.view()), centroids, workspace); + } + } else if (params.init == cuvs::cluster::kmeans::params::InitMethod::Random) { + raft::random::RngState random_state(params.rng_state.seed); + raft::matrix::sample_rows(handle, random_state, X, centroids); + } else if (params.init == cuvs::cluster::kmeans::params::InitMethod::Array) { + // already provided + } else { + RAFT_FAIL("Unknown initialization method"); + } +} + +/** + * @brief Compute the weight normalization scale factor for host sample weights. Weights are + * normalized to sum to n_samples. Returns the scale factor to apply to each weight. + * + * @param[in] sample_weight Optional host vector of sample weights + * @param[in] n_samples Number of samples + * @return Scale factor (1.0 if no weights or already normalized) + */ +template +T compute_host_weight_scale( + const std::optional>& sample_weight, IdxT n_samples) +{ + if (!sample_weight.has_value()) { return T{1}; } + + T wt_sum = T{0}; + const T* sw_ptr = sample_weight->data_handle(); + for (IdxT i = 0; i < n_samples; ++i) { + wt_sum += sw_ptr[i]; + } + if (wt_sum == static_cast(n_samples)) { return T{1}; } + + RAFT_LOG_DEBUG( + "[Warning!] KMeans: normalizing the user provided sample weight to " + "sum up to %zu samples (scale=%f)", + static_cast(n_samples), + static_cast(static_cast(n_samples) / wt_sum)); + return static_cast(n_samples) / wt_sum; +} + +/** + * @brief Copy host sample weights to device and apply normalization scale. + * + * When sample_weight is provided, copies the batch slice to the device buffer + * and applies the normalization scale factor. When not provided, the device + * buffer is assumed to already be filled with 1.0. + * + * @param[in] handle RAFT resources handle + * @param[in] sample_weight Optional host weights + * @param[in] batch_offset Offset into the host weights for this batch + * @param[in] batch_size Number of elements in this batch + * @param[in] weight_scale Scale factor from compute_host_weight_scale + * @param[inout] batch_weights Device buffer to write normalized weights into + */ +template +void copy_and_scale_batch_weights( + raft::resources const& handle, + const std::optional>& sample_weight, + size_t batch_offset, + IdxT batch_size, + T weight_scale, + raft::device_vector& batch_weights) +{ + if (!sample_weight.has_value()) { return; } + + cudaStream_t stream = raft::resource::get_cuda_stream(handle); + raft::copy( + batch_weights.data_handle(), sample_weight->data_handle() + batch_offset, batch_size, stream); + + if (weight_scale != T{1}) { + auto batch_weights_view = + raft::make_device_vector_view(batch_weights.data_handle(), batch_size); + raft::linalg::map(handle, + batch_weights_view, + raft::mul_const_op{weight_scale}, + raft::make_const_mdspan(batch_weights_view)); + } +} + +/** + * @brief Accumulate partial centroid sums and counts from a batch + * + * This function adds the partial sums from a batch to the running accumulators. + * It does NOT divide - that happens once at the end of all batches. + */ +template +void accumulate_batch_centroids( + raft::resources const& handle, + raft::device_matrix_view batch_data, + raft::device_vector_view, IdxT> minClusterAndDistance, + raft::device_vector_view sample_weights, + raft::device_matrix_view centroid_sums, + raft::device_vector_view cluster_counts, + raft::device_matrix_view batch_sums, + raft::device_vector_view batch_counts) +{ + cudaStream_t stream = raft::resource::get_cuda_stream(handle); + + auto workspace = rmm::device_uvector( + batch_data.extent(0), stream, raft::resource::get_workspace_resource(handle)); + + cuvs::cluster::kmeans::detail::KeyValueIndexOp conversion_op; + thrust::transform_iterator, + const raft::KeyValuePair*> + labels_itr(minClusterAndDistance.data_handle(), conversion_op); + + cuvs::cluster::kmeans::detail::compute_centroid_adjustments( + handle, + batch_data, + sample_weights, + labels_itr, + static_cast(centroid_sums.extent(0)), + batch_sums, + batch_counts, + workspace); + + raft::linalg::add(centroid_sums.data_handle(), + centroid_sums.data_handle(), + batch_sums.data_handle(), + centroid_sums.size(), + stream); + + raft::linalg::add(cluster_counts.data_handle(), + cluster_counts.data_handle(), + batch_counts.data_handle(), + cluster_counts.size(), + stream); +} + +/** + * @brief Main fit function for batched k-means with host data (full-batch / Lloyd's algorithm). + * + * Processes host data in GPU-sized batches per iteration, accumulating partial centroid + * sums and counts, then finalizes centroids at the end of each iteration. + * + * @tparam T Input data type (float, double) + * @tparam IdxT Index type (int, int64_t) + * + * @param[in] handle RAFT resources handle + * @param[in] params K-means parameters + * @param[in] X Input data on HOST [n_samples x n_features] + * @param[in] sample_weight Optional weights per sample (on host) + * @param[inout] centroids Initial/output cluster centers [n_clusters x n_features] + * @param[out] inertia Sum of squared distances to nearest centroid + * @param[out] n_iter Number of iterations run + */ +template +void fit(raft::resources const& handle, + const cuvs::cluster::kmeans::params& params, + raft::host_matrix_view X, + std::optional> sample_weight, + raft::device_matrix_view centroids, + raft::host_scalar_view inertia, + raft::host_scalar_view n_iter) +{ + cudaStream_t stream = raft::resource::get_cuda_stream(handle); + auto n_samples = X.extent(0); + auto n_features = X.extent(1); + auto n_clusters = params.n_clusters; + auto metric = params.metric; + + IdxT streaming_batch_size = static_cast(params.streaming_batch_size); + + if (params.streaming_batch_size == 0) { + streaming_batch_size = static_cast(n_samples); + } else if (params.streaming_batch_size < 0 || params.streaming_batch_size > n_samples) { + RAFT_LOG_WARN("streaming_batch_size must be >= 0 and <= n_samples, using n_samples=%zu", + static_cast(n_samples)); + streaming_batch_size = static_cast(n_samples); + } + + RAFT_EXPECTS(n_clusters > 0, "n_clusters must be positive"); + RAFT_EXPECTS(static_cast(centroids.extent(0)) == n_clusters, + "centroids.extent(0) must equal n_clusters"); + RAFT_EXPECTS(centroids.extent(1) == n_features, "centroids.extent(1) must equal n_features"); + + RAFT_LOG_DEBUG( + "KMeans batched fit: n_samples=%zu, n_features=%zu, n_clusters=%d, streaming_batch_size=%zu", + static_cast(n_samples), + static_cast(n_features), + n_clusters, + static_cast(streaming_batch_size)); + + rmm::device_uvector workspace(0, stream); + + auto n_init = params.n_init; + if (params.init == cuvs::cluster::kmeans::params::InitMethod::Array && n_init != 1) { + RAFT_LOG_DEBUG( + "Explicit initial center position passed: performing only one init in " + "k-means instead of n_init=%d", + n_init); + n_init = 1; + } + + auto best_centroids = n_init > 1 + ? raft::make_device_matrix(handle, n_clusters, n_features) + : raft::make_device_matrix(handle, 0, 0); + T best_inertia = std::numeric_limits::max(); + IdxT best_n_iter = 0; + + std::mt19937 gen(params.rng_state.seed); + + // ----- Allocate reusable work buffers (shared across n_init iterations) ----- + auto batch_weights = raft::make_device_vector(handle, streaming_batch_size); + auto minClusterAndDistance = + raft::make_device_vector, IdxT>(handle, streaming_batch_size); + auto L2NormBatch = raft::make_device_vector(handle, streaming_batch_size); + rmm::device_uvector L2NormBuf_OR_DistBuf(0, stream); + + auto centroid_sums = raft::make_device_matrix(handle, n_clusters, n_features); + auto weight_per_cluster = raft::make_device_vector(handle, n_clusters); + auto new_centroids = raft::make_device_matrix(handle, n_clusters, n_features); + auto clustering_cost = raft::make_device_vector(handle, 1); + auto batch_clustering_cost = raft::make_device_vector(handle, 1); + auto batch_sums = raft::make_device_matrix(handle, n_clusters, n_features); + auto batch_counts = raft::make_device_vector(handle, n_clusters); + + // Compute weight normalization (matches checkWeight in regular kmeans) + T weight_scale = compute_host_weight_scale(sample_weight, n_samples); + + // ---- Main n_init loop ---- + for (int seed_iter = 0; seed_iter < n_init; ++seed_iter) { + cuvs::cluster::kmeans::params iter_params = params; + iter_params.rng_state.seed = gen(); + + RAFT_LOG_DEBUG("KMeans batched fit: n_init iteration %d/%d (seed=%llu)", + seed_iter + 1, + n_init, + (unsigned long long)iter_params.rng_state.seed); + + if (iter_params.init != cuvs::cluster::kmeans::params::InitMethod::Array) { + init_centroids_from_host_sample( + handle, iter_params, streaming_batch_size, X, centroids, workspace); + } + + if (!sample_weight.has_value()) { raft::matrix::fill(handle, batch_weights.view(), T{1}); } + + // Reset per-iteration state + T prior_cluster_cost = 0; + + cuvs::spatial::knn::detail::utils::batch_load_iterator data_batches( + X.data_handle(), n_samples, n_features, streaming_batch_size, stream); + + for (n_iter[0] = 1; n_iter[0] <= iter_params.max_iter; ++n_iter[0]) { + RAFT_LOG_DEBUG("KMeans batched: Iteration %d", n_iter[0]); + + raft::matrix::fill(handle, centroid_sums.view(), T{0}); + raft::matrix::fill(handle, weight_per_cluster.view(), T{0}); + + raft::matrix::fill(handle, clustering_cost.view(), T{0}); + + auto centroids_const = raft::make_const_mdspan(centroids); + + for (const auto& data_batch : data_batches) { + IdxT current_batch_size = static_cast(data_batch.size()); + raft::matrix::fill(handle, batch_clustering_cost.view(), T{0}); + + auto batch_data_view = raft::make_device_matrix_view( + data_batch.data(), current_batch_size, n_features); + + copy_and_scale_batch_weights(handle, + sample_weight, + data_batch.offset(), + current_batch_size, + weight_scale, + batch_weights); + + auto batch_weights_view = raft::make_device_vector_view( + batch_weights.data_handle(), current_batch_size); + + auto L2NormBatch_view = + raft::make_device_vector_view(L2NormBatch.data_handle(), current_batch_size); + + if (metric == cuvs::distance::DistanceType::L2Expanded || + metric == cuvs::distance::DistanceType::L2SqrtExpanded) { + raft::linalg::norm( + handle, + raft::make_device_matrix_view( + data_batch.data(), current_batch_size, n_features), + L2NormBatch_view); + } + + auto L2NormBatch_const = raft::make_const_mdspan(L2NormBatch_view); + + auto minClusterAndDistance_view = + raft::make_device_vector_view, IdxT>( + minClusterAndDistance.data_handle(), current_batch_size); + + cuvs::cluster::kmeans::detail::minClusterAndDistanceCompute( + handle, + batch_data_view, + centroids_const, + minClusterAndDistance_view, + L2NormBatch_const, + L2NormBuf_OR_DistBuf, + metric, + params.batch_samples, + params.batch_centroids, + workspace); + + auto minClusterAndDistance_const = raft::make_const_mdspan(minClusterAndDistance_view); + + accumulate_batch_centroids(handle, + batch_data_view, + minClusterAndDistance_const, + batch_weights_view, + centroid_sums.view(), + weight_per_cluster.view(), + batch_sums.view(), + batch_counts.view()); + + if (params.inertia_check) { + raft::linalg::map( + handle, + minClusterAndDistance_view, + [=] __device__(const raft::KeyValuePair kvp, T wt) { + raft::KeyValuePair res; + res.value = kvp.value * wt; + res.key = kvp.key; + return res; + }, + raft::make_const_mdspan(minClusterAndDistance_view), + batch_weights_view); + + cuvs::cluster::kmeans::detail::computeClusterCost( + handle, + minClusterAndDistance_view, + workspace, + raft::make_device_scalar_view(batch_clustering_cost.data_handle()), + raft::value_op{}, + raft::add_op{}); + raft::linalg::add(handle, + raft::make_const_mdspan(clustering_cost.view()), + raft::make_const_mdspan(batch_clustering_cost.view()), + clustering_cost.view()); + } + } + + auto centroid_sums_const = raft::make_device_matrix_view( + centroid_sums.data_handle(), n_clusters, n_features); + auto weight_per_cluster_const = + raft::make_device_vector_view(weight_per_cluster.data_handle(), n_clusters); + + finalize_centroids(handle, + centroid_sums_const, + weight_per_cluster_const, + centroids_const, + new_centroids.view()); + + T sqrdNormError = compute_centroid_shift( + handle, raft::make_const_mdspan(centroids), raft::make_const_mdspan(new_centroids.view())); + + raft::copy(handle, centroids, new_centroids.view()); + + bool done = false; + if (params.inertia_check) { + raft::copy(inertia.data_handle(), clustering_cost.data_handle(), 1, stream); + raft::resource::sync_stream(handle); + ASSERT(inertia[0] != (T)0.0, + "Too few points and centroids being found is getting 0 cost from " + "centers"); + if (n_iter[0] > 1) { + T delta = inertia[0] / prior_cluster_cost; + if (delta > 1 - params.tol) done = true; + } + prior_cluster_cost = inertia[0]; + } + + if (sqrdNormError < params.tol) done = true; + + if (done) { + RAFT_LOG_DEBUG("Threshold triggered after %d iterations. Terminating early.", n_iter[0]); + break; + } + } + + // Recompute final weighted inertia with the converged centroids. + { + auto centroids_const_view = raft::make_device_matrix_view( + centroids.data_handle(), n_clusters, n_features); + + inertia[0] = T{0}; + for (const auto& data_batch : data_batches) { + IdxT current_batch_size = static_cast(data_batch.size()); + + auto batch_data_view = raft::make_device_matrix_view( + data_batch.data(), current_batch_size, n_features); + + std::optional> batch_sw = std::nullopt; + if (sample_weight.has_value()) { + copy_and_scale_batch_weights(handle, + sample_weight, + data_batch.offset(), + current_batch_size, + weight_scale, + batch_weights); + batch_sw = raft::make_device_vector_view(batch_weights.data_handle(), + current_batch_size); + } + + T batch_cost = T{0}; + cuvs::cluster::kmeans::cluster_cost(handle, + batch_data_view, + centroids_const_view, + raft::make_host_scalar_view(&batch_cost), + batch_sw); + + inertia[0] += batch_cost; + } + } + + RAFT_LOG_DEBUG("KMeans batched: n_init %d/%d completed with inertia=%f", + seed_iter + 1, + n_init, + static_cast(inertia[0])); + + if (n_init > 1 && inertia[0] < best_inertia) { + best_inertia = inertia[0]; + best_n_iter = n_iter[0]; + raft::copy(best_centroids.data_handle(), centroids.data_handle(), centroids.size(), stream); + } + } + if (n_init > 1) { + inertia[0] = best_inertia; + n_iter[0] = best_n_iter; + raft::copy(handle, centroids, best_centroids.view()); + RAFT_LOG_DEBUG("KMeans batched: Best of %d runs: inertia=%f, n_iter=%d", + n_init, + static_cast(best_inertia), + best_n_iter); + } +} + +} // namespace cuvs::cluster::kmeans::detail diff --git a/cpp/src/cluster/detail/kmeans_common.cuh b/cpp/src/cluster/detail/kmeans_common.cuh index 4e2a41b26a..250563dd12 100644 --- a/cpp/src/cluster/detail/kmeans_common.cuh +++ b/cpp/src/cluster/detail/kmeans_common.cuh @@ -22,7 +22,10 @@ #include #include #include +#include +#include #include +#include #include #include #include @@ -469,4 +472,130 @@ void countSamplesInCluster(raft::resources const& handle, (IndexT)n_clusters, workspace); } + +/** + * @brief Compute centroid adjustments (weighted sums and counts per cluster) + * + * This helper function computes: + * 1. Weighted sum of samples per cluster using reduce_rows_by_key + * 2. Sum of weights per cluster using reduce_cols_by_key + * + * @tparam DataT Data type for samples and weights + * @tparam IndexT Index type + * @tparam LabelsIterator Iterator type for cluster labels + * + * @param[in] handle RAFT resources handle + * @param[in] X Input samples [n_samples x n_features] + * @param[in] sample_weights Weights for each sample [n_samples] + * @param[in] cluster_labels Cluster assignment for each sample (iterator) + * @param[in] n_clusters Number of clusters + * @param[out] centroid_sums Output weighted sum per cluster [n_clusters x n_features] + * @param[out] weight_per_cluster Output sum of weights per cluster [n_clusters] + * @param[inout] workspace Workspace buffer for intermediate operations + */ +template +void compute_centroid_adjustments( + raft::resources const& handle, + raft::device_matrix_view X, + raft::device_vector_view sample_weights, + LabelsIterator cluster_labels, + IndexT n_clusters, + raft::device_matrix_view centroid_sums, + raft::device_vector_view weight_per_cluster, + rmm::device_uvector& workspace) +{ + cudaStream_t stream = raft::resource::get_cuda_stream(handle); + auto n_samples = X.extent(0); + + workspace.resize(n_samples, stream); + + raft::linalg::reduce_rows_by_key(X.data_handle(), + X.extent(1), + cluster_labels, + sample_weights.data_handle(), + workspace.data(), + X.extent(0), + X.extent(1), + n_clusters, + centroid_sums.data_handle(), + stream); + + raft::linalg::reduce_cols_by_key(sample_weights.data_handle(), + cluster_labels, + weight_per_cluster.data_handle(), + static_cast(1), + static_cast(n_samples), + n_clusters, + stream); +} +/** + * @brief Finalize centroids by dividing accumulated sums by counts. + * + * For clusters with zero count, the old centroid is preserved. + * + * @tparam DataT Data type + * @tparam IndexT Index type + * + * @param[in] handle RAFT resources handle + * @param[in] centroid_sums Accumulated weighted sums per cluster [n_clusters x n_features] + * @param[in] weight_per_cluster Sum of weights per cluster [n_clusters] + * @param[in] old_centroids Previous centroids (used for empty clusters) [n_clusters x + * n_features] + * @param[out] new_centroids Output centroids [n_clusters x n_features] + */ +template +void finalize_centroids(raft::resources const& handle, + raft::device_matrix_view centroid_sums, + raft::device_vector_view weight_per_cluster, + raft::device_matrix_view old_centroids, + raft::device_matrix_view new_centroids) +{ + cudaStream_t stream = raft::resource::get_cuda_stream(handle); + + raft::linalg::matrix_vector_op(handle, + raft::make_const_mdspan(centroid_sums), + weight_per_cluster, + new_centroids, + raft::div_checkzero_op{}); + + // For empty clusters (count == 0), copy old centroid back + cub::ArgIndexInputIterator itr_wt(weight_per_cluster.data_handle()); + raft::matrix::gather_if( + old_centroids.data_handle(), + static_cast(old_centroids.extent(1)), + static_cast(old_centroids.extent(0)), + itr_wt, + itr_wt, + static_cast(weight_per_cluster.size()), + new_centroids.data_handle(), + [=] __device__(raft::KeyValuePair map) { return map.value == DataT{0}; }, + raft::key_op{}, + stream); +} + +/** + * @brief Compute the squared norm difference between two centroid sets. + * + * Returns sum((old_centroids - new_centroids)^2). + * Used for convergence checking. + */ +template +DataT compute_centroid_shift(raft::resources const& handle, + raft::device_matrix_view old_centroids, + raft::device_matrix_view new_centroids) +{ + cudaStream_t stream = raft::resource::get_cuda_stream(handle); + auto sqrdNorm = raft::make_device_scalar(handle, DataT{0}); + raft::linalg::mapThenSumReduce(sqrdNorm.data_handle(), + old_centroids.size(), + raft::sqdiff_op{}, + stream, + old_centroids.data_handle(), + new_centroids.data_handle()); + DataT result = 0; + raft::copy(&result, sqrdNorm.data_handle(), 1, stream); + raft::resource::sync_stream(handle, stream); + return result; +} + } // namespace cuvs::cluster::kmeans::detail diff --git a/cpp/src/cluster/kmeans_fit_double.cu b/cpp/src/cluster/kmeans_fit_double.cu index 43f457a29a..d7e4748e33 100644 --- a/cpp/src/cluster/kmeans_fit_double.cu +++ b/cpp/src/cluster/kmeans_fit_double.cu @@ -1,8 +1,9 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ +#include "detail/kmeans_batched.cuh" #include "kmeans.cuh" #include "kmeans_impl.cuh" #include @@ -62,4 +63,17 @@ void fit(raft::resources const& handle, cuvs::cluster::kmeans::fit( handle, params, X, sample_weight, centroids, inertia, n_iter); } + +void fit(raft::resources const& handle, + const cuvs::cluster::kmeans::params& params, + raft::host_matrix_view X, + std::optional> sample_weight, + raft::device_matrix_view centroids, + raft::host_scalar_view inertia, + raft::host_scalar_view n_iter) +{ + cuvs::cluster::kmeans::detail::fit( + handle, params, X, sample_weight, centroids, inertia, n_iter); +} + } // namespace cuvs::cluster::kmeans diff --git a/cpp/src/cluster/kmeans_fit_float.cu b/cpp/src/cluster/kmeans_fit_float.cu index 5624151943..f86fabcfbd 100644 --- a/cpp/src/cluster/kmeans_fit_float.cu +++ b/cpp/src/cluster/kmeans_fit_float.cu @@ -1,8 +1,9 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ +#include "detail/kmeans_batched.cuh" #include "kmeans.cuh" #include "kmeans_impl.cuh" #include @@ -62,4 +63,17 @@ void fit(raft::resources const& handle, cuvs::cluster::kmeans::fit( handle, params, X, sample_weight, centroids, inertia, n_iter); } + +void fit(raft::resources const& handle, + const cuvs::cluster::kmeans::params& params, + raft::host_matrix_view X, + std::optional> sample_weight, + raft::device_matrix_view centroids, + raft::host_scalar_view inertia, + raft::host_scalar_view n_iter) +{ + cuvs::cluster::kmeans::detail::fit( + handle, params, X, sample_weight, centroids, inertia, n_iter); +} + } // namespace cuvs::cluster::kmeans diff --git a/cpp/tests/cluster/kmeans.cu b/cpp/tests/cluster/kmeans.cu index 576e6c1a48..1ef8d07623 100644 --- a/cpp/tests/cluster/kmeans.cu +++ b/cpp/tests/cluster/kmeans.cu @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -16,7 +17,6 @@ #include -#include #include #include @@ -263,10 +263,8 @@ class KmeansTest : public ::testing::TestWithParam> { d_sample_weight.resize(n_samples, stream); d_sw = std::make_optional( raft::make_device_vector_view(d_sample_weight.data(), n_samples)); - thrust::fill(thrust::cuda::par.on(stream), - d_sample_weight.data(), - d_sample_weight.data() + n_samples, - 1); + raft::matrix::fill( + handle, raft::make_device_vector_view(d_sample_weight.data(), n_samples), T(1)); } raft::copy(d_labels_ref.data(), labels.data_handle(), n_samples, stream); @@ -346,4 +344,248 @@ TEST_P(KmeansTestF, Result) { ASSERT_TRUE(score == 1.0); } INSTANTIATE_TEST_CASE_P(KmeansTests, KmeansTestF, ::testing::ValuesIn(inputsf2)); +// ============================================================================ +// Batched KMeans Tests (fit + predict with host data) +// ============================================================================ + +template +struct KmeansBatchedInputs { + int n_row; + int n_col; + int n_clusters; + T tol; + bool weighted; + int streaming_batch_size; +}; + +template +class KmeansFitBatchedTest : public ::testing::TestWithParam> { + protected: + KmeansFitBatchedTest() + : d_labels(0, raft::resource::get_cuda_stream(handle)), + d_labels_ref(0, raft::resource::get_cuda_stream(handle)), + d_centroids(0, raft::resource::get_cuda_stream(handle)), + d_centroids_ref(0, raft::resource::get_cuda_stream(handle)) + { + } + + void fitBatchedTest() + { + testparams = ::testing::TestWithParam>::GetParam(); + + int n_samples = testparams.n_row; + int n_features = testparams.n_col; + params.n_clusters = testparams.n_clusters; + params.tol = testparams.tol; + params.rng_state.seed = 1; + params.oversampling_factor = 0; + + auto stream = raft::resource::get_cuda_stream(handle); + auto X = raft::make_device_matrix(handle, n_samples, n_features); + auto labels = raft::make_device_vector(handle, n_samples); + + raft::random::make_blobs(X.data_handle(), + labels.data_handle(), + n_samples, + n_features, + params.n_clusters, + stream, + true, + nullptr, + nullptr, + T(1.0), + false, + (T)-10.0f, + (T)10.0f, + (uint64_t)1234); + + // Copy X to host for batched API + std::vector h_X(n_samples * n_features); + raft::update_host(h_X.data(), X.data_handle(), n_samples * n_features, stream); + raft::resource::sync_stream(handle, stream); + + d_labels.resize(n_samples, stream); + d_labels_ref.resize(n_samples, stream); + d_centroids.resize(params.n_clusters * n_features, stream); + d_centroids_ref.resize(params.n_clusters * n_features, stream); + raft::copy(d_labels_ref.data(), labels.data_handle(), n_samples, stream); + + raft::random::RngState rng(params.rng_state.seed); + raft::random::uniform( + handle, rng, d_centroids.data(), params.n_clusters * n_features, T(-1), T(1)); + raft::copy(d_centroids_ref.data(), d_centroids.data(), params.n_clusters * n_features, stream); + + auto h_X_view = + raft::make_host_matrix_view(h_X.data(), n_samples, n_features); + auto d_centroids_view = + raft::make_device_matrix_view(d_centroids.data(), params.n_clusters, n_features); + + std::optional> d_sw = std::nullopt; + rmm::device_uvector d_sample_weight(0, stream); + if (testparams.weighted) { + d_sample_weight.resize(n_samples, stream); + d_sw = std::make_optional( + raft::make_device_vector_view(d_sample_weight.data(), n_samples)); + raft::matrix::fill( + handle, raft::make_device_vector_view(d_sample_weight.data(), n_samples), T(1)); + } + + auto d_centroids_ref_view = + raft::make_device_matrix_view(d_centroids_ref.data(), params.n_clusters, n_features); + + params.init = cuvs::cluster::kmeans::params::Array; + params.inertia_check = true; + params.max_iter = 20; + + T ref_inertia = 0; + int ref_n_iter = 0; + cuvs::cluster::kmeans::fit(handle, + params, + raft::make_const_mdspan(X.view()), + d_sw, + d_centroids_ref_view, + raft::make_host_scalar_view(&ref_inertia), + raft::make_host_scalar_view(&ref_n_iter)); + + cuvs::cluster::kmeans::params batched_params = params; + batched_params.inertia_check = true; + batched_params.streaming_batch_size = testparams.streaming_batch_size; + + std::optional> h_sw = std::nullopt; + std::vector h_sample_weight; + if (testparams.weighted) { + h_sample_weight.resize(n_samples, T(1)); + h_sw = std::make_optional( + raft::make_host_vector_view(h_sample_weight.data(), n_samples)); + } + + T inertia = 0; + int64_t n_iter = 0; + + cuvs::cluster::kmeans::fit(handle, + batched_params, + h_X_view, + h_sw, + d_centroids_view, + raft::make_host_scalar_view(&inertia), + raft::make_host_scalar_view(&n_iter)); + + raft::resource::sync_stream(handle, stream); + + centroids_match = devArrMatch(d_centroids_ref.data(), + d_centroids.data(), + params.n_clusters, + n_features, + CompareApprox(T(1e-2)), + stream); + + T ref_pred_inertia = 0; + cuvs::cluster::kmeans::predict( + handle, + params, + raft::make_const_mdspan(X.view()), + d_sw, + raft::make_device_matrix_view( + d_centroids_ref.data(), params.n_clusters, n_features), + raft::make_device_vector_view(d_labels_ref.data(), n_samples), + true, + raft::make_host_scalar_view(&ref_pred_inertia)); + + T pred_inertia = 0; + cuvs::cluster::kmeans::predict( + handle, + params, + raft::make_const_mdspan(X.view()), + d_sw, + raft::make_device_matrix_view( + d_centroids.data(), params.n_clusters, n_features), + raft::make_device_vector_view(d_labels.data(), n_samples), + true, + raft::make_host_scalar_view(&pred_inertia)); + + raft::resource::sync_stream(handle, stream); + + score = raft::stats::adjusted_rand_index( + d_labels_ref.data(), d_labels.data(), n_samples, raft::resource::get_cuda_stream(handle)); + + if (score < 0.99) { + std::stringstream ss; + ss << "Expected: " << raft::arr2Str(d_labels_ref.data(), 25, "d_labels_ref", stream); + std::cout << (ss.str().c_str()) << '\n'; + ss.str(std::string()); + ss << "Actual: " << raft::arr2Str(d_labels.data(), 25, "d_labels", stream); + std::cout << (ss.str().c_str()) << '\n'; + std::cout << "Score = " << score << '\n'; + } + + inertia_match = (std::abs(ref_inertia - inertia) < T(1e-2) * std::abs(ref_inertia)); + + if (!inertia_match) { + std::cout << "Inertia mismatch: ref=" << ref_inertia << " batched=" << inertia << '\n'; + } + } + + void SetUp() override { fitBatchedTest(); } + + protected: + raft::resources handle; + KmeansBatchedInputs testparams; + rmm::device_uvector d_labels; + rmm::device_uvector d_labels_ref; + rmm::device_uvector d_centroids; + rmm::device_uvector d_centroids_ref; + double score; + testing::AssertionResult centroids_match = testing::AssertionSuccess(); + bool inertia_match = false; + cuvs::cluster::kmeans::params params; +}; + +// ============================================================================ +// Test inputs for batched tests +// ============================================================================ + +const std::vector> batched_inputsf2 = { + {1000, 32, 5, 0.0001f, true, 256}, + {1000, 64, 5, 0.0001f, false, 500}, + {1000, 100, 20, 0.0001f, true, 30}, + {1000, 10, 20, 0.0001f, false, 30}, + {10000, 16, 10, 0.0001f, true, 1000}, + {10000, 96, 10, 0.0001f, false, 10000}, +}; + +const std::vector> batched_inputsd2 = { + {1000, 32, 5, 0.0001, true, 256}, + {1000, 64, 5, 0.0001, false, 500}, + {1000, 100, 20, 0.0001, true, 30}, + {1000, 10, 20, 0.0001, false, 30}, + {10000, 16, 10, 0.0001, true, 1000}, + {10000, 96, 10, 0.0001, false, 10000}, +}; + +// ============================================================================ +// fit (host/batched) tests +// ============================================================================ +typedef KmeansFitBatchedTest KmeansFitBatchedTestF; +typedef KmeansFitBatchedTest KmeansFitBatchedTestD; + +TEST_P(KmeansFitBatchedTestF, Result) +{ + ASSERT_TRUE(centroids_match); + ASSERT_TRUE(score >= 0.99); + ASSERT_TRUE(inertia_match); +} + +TEST_P(KmeansFitBatchedTestD, Result) +{ + ASSERT_TRUE(centroids_match); + ASSERT_TRUE(score >= 0.99); + ASSERT_TRUE(inertia_match); +} + +INSTANTIATE_TEST_CASE_P(KmeansFitBatchedTests, + KmeansFitBatchedTestF, + ::testing::ValuesIn(batched_inputsf2)); +INSTANTIATE_TEST_CASE_P(KmeansFitBatchedTests, + KmeansFitBatchedTestD, + ::testing::ValuesIn(batched_inputsd2)); } // namespace cuvs diff --git a/python/cuvs/cuvs/cluster/kmeans/kmeans.pxd b/python/cuvs/cuvs/cluster/kmeans/kmeans.pxd index 9f16d46c4d..6d0c878660 100644 --- a/python/cuvs/cuvs/cluster/kmeans/kmeans.pxd +++ b/python/cuvs/cuvs/cluster/kmeans/kmeans.pxd @@ -4,7 +4,7 @@ # # cython: language_level=3 -from libc.stdint cimport uintptr_t +from libc.stdint cimport int64_t, uintptr_t from libcpp cimport bool from cuvs.common.c_api cimport cuvsError_t, cuvsResources_t @@ -33,6 +33,7 @@ cdef extern from "cuvs/cluster/kmeans.h" nogil: int batch_samples, int batch_centroids, bool inertia_check, + int64_t streaming_batch_size, bool hierarchical, int hierarchical_n_iters diff --git a/python/cuvs/cuvs/cluster/kmeans/kmeans.pyx b/python/cuvs/cuvs/cluster/kmeans/kmeans.pyx index 489d983ac7..b267c908c9 100644 --- a/python/cuvs/cuvs/cluster/kmeans/kmeans.pyx +++ b/python/cuvs/cuvs/cluster/kmeans/kmeans.pyx @@ -1,5 +1,5 @@ # -# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 # # cython: language_level=3 @@ -70,6 +70,23 @@ cdef class KMeansParams: Number of instance k-means algorithm will be run with different seeds oversampling_factor : double Oversampling factor for use in the k-means|| algorithm + batch_samples : int + Number of samples to process in each batch for tiled 1NN computation. + Useful to optimize/control memory footprint. Default tile is + [batch_samples x n_clusters]. + batch_centroids : int + Number of centroids to process in each batch. If 0, uses n_clusters. + inertia_check : bool + If True, check inertia during iterations for early convergence. + streaming_batch_size : int + Number of samples to process per GPU batch when fitting with host + (numpy) data. When set to 0, defaults to n_samples (process all + at once). Only used by the batched (host-data) code path. Reducing + streaming_batch_size can help reduce GPU memory pressure but increases + overhead as the number of times centroid adjustments are computed + increases. + + Default: 0 (process all data at once). hierarchical : bool Whether to use hierarchical (balanced) kmeans or not hierarchical_n_iters : int @@ -92,6 +109,10 @@ cdef class KMeansParams: tol=None, n_init=None, oversampling_factor=None, + batch_samples=None, + batch_centroids=None, + inertia_check=None, + streaming_batch_size=None, hierarchical=None, hierarchical_n_iters=None): if metric is not None: @@ -109,6 +130,14 @@ cdef class KMeansParams: self.params.n_init = n_init if oversampling_factor is not None: self.params.oversampling_factor = oversampling_factor + if batch_samples is not None: + self.params.batch_samples = batch_samples + if batch_centroids is not None: + self.params.batch_centroids = batch_centroids + if inertia_check is not None: + self.params.inertia_check = inertia_check + if streaming_batch_size is not None: + self.params.streaming_batch_size = streaming_batch_size if hierarchical is not None: self.params.hierarchical = hierarchical if hierarchical_n_iters is not None: @@ -145,6 +174,22 @@ cdef class KMeansParams: def oversampling_factor(self): return self.params.oversampling_factor + @property + def batch_samples(self): + return self.params.batch_samples + + @property + def batch_centroids(self): + return self.params.batch_centroids + + @property + def inertia_check(self): + return self.params.inertia_check + + @property + def streaming_batch_size(self): + return self.params.streaming_batch_size + @property def hierarchical(self): return self.params.hierarchical @@ -165,16 +210,27 @@ def fit( """ Find clusters with the k-means algorithm + When X is a device array (CUDA array interface), standard on-device + k-means is used. When X is a host array (numpy ndarray or + ``__array_interface__``), data is streamed to the GPU in batches + controlled by ``params.streaming_batch_size``. For large host datasets, consider + reducing ``streaming_batch_size`` to reduce GPU memory usage. + Parameters ---------- params : KMeansParams - Parameters to use to fit KMeans model - X : Input CUDA array interface compliant matrix shape (m, k) + Parameters to use to fit KMeans model. For host data, + ``params.streaming_batch_size`` controls how many samples are sent to the + GPU per batch. + X : array-like + Training instances, shape (m, k). Accepts both device arrays + (cupy / CUDA array interface) and host arrays (numpy). centroids : Optional writable CUDA array interface compliant matrix shape (n_clusters, k) - sample_weights : Optional input CUDA array interface compliant matrix shape - (n_clusters, 1) default: None + sample_weights : Optional weights per observation. Must reside on + the same memory space as X (device or host). + default: None {resources_docstring} Returns @@ -202,10 +258,49 @@ def fit( >>> params = KMeansParams(n_clusters=n_clusters) >>> centroids, inertia, n_iter = fit(params, X) + + Host-data (batched) example: + + >>> import numpy as np + >>> X_host = np.random.random((10_000_000, 128)).astype(np.float32) + >>> params = KMeansParams(n_clusters=1000, streaming_batch_size=1_000_000) + >>> centroids, inertia, n_iter = fit(params, X_host) """ + is_host = isinstance(X, np.ndarray) or ( + hasattr(X, '__array_interface__') and + not hasattr(X, '__cuda_array_interface__') + ) + + if sample_weights is not None: + is_sample_weight_host = isinstance(sample_weights, np.ndarray) or ( + hasattr(sample_weights, '__array_interface__') and + not hasattr(sample_weights, '__cuda_array_interface__') + ) + if is_host != is_sample_weight_host: + raise ValueError( + "X and sample_weights must have the same memory residency " + "(both host or both device). X is {}, sample_weights is {}.".format( + "host" if is_host else "device", + "host" if is_sample_weight_host else "device" + ) + ) + + if is_host: + if not isinstance(X, np.ndarray): + X = np.asarray(X) + if not X.flags['C_CONTIGUOUS']: + raise ValueError("X must have C contiguous layout") + if sample_weights is not None: + if not isinstance(sample_weights, np.ndarray): + sample_weights = np.asarray(sample_weights) + if not sample_weights.flags['C_CONTIGUOUS']: + raise ValueError("sample_weights must have C contiguous layout") + x_ai = wrap_array(X) - _check_input_array(x_ai, [np.dtype('float32'), np.dtype('float64')]) + _check_input_array( + x_ai, [np.dtype('float32'), np.dtype('float64')] + ) cdef cydlpack.DLManagedTensor* x_dlpack = cydlpack.dlpack_c(x_ai) cdef cydlpack.DLManagedTensor* sample_weight_dlpack = NULL @@ -216,15 +311,17 @@ def fit( cdef int n_iter = 0 if centroids is None: - centroids = device_ndarray.empty((params.n_clusters, x_ai.shape[1]), - dtype=x_ai.dtype) + centroids = device_ndarray.empty( + (params.n_clusters, x_ai.shape[1]), dtype=x_ai.dtype + ) centroids_ai = wrap_array(centroids) - cdef cydlpack.DLManagedTensor * centroids_dlpack = \ + cdef cydlpack.DLManagedTensor* centroids_dlpack = \ cydlpack.dlpack_c(centroids_ai) if sample_weights is not None: - sample_weight_dlpack = cydlpack.dlpack_c(wrap_array(sample_weights)) + sample_weight_dlpack = \ + cydlpack.dlpack_c(wrap_array(sample_weights)) with cuda_interruptible(): check_cuvs(cuvsKMeansFit( diff --git a/python/cuvs/cuvs/tests/test_kmeans.py b/python/cuvs/cuvs/tests/test_kmeans.py index 6f18137b13..210ae06b80 100644 --- a/python/cuvs/cuvs/tests/test_kmeans.py +++ b/python/cuvs/cuvs/tests/test_kmeans.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 # @@ -6,7 +6,12 @@ import pytest from pylibraft.common import device_ndarray -from cuvs.cluster.kmeans import KMeansParams, cluster_cost, fit, predict +from cuvs.cluster.kmeans import ( + KMeansParams, + cluster_cost, + fit, + predict, +) from cuvs.distance import pairwise_distance @@ -69,3 +74,68 @@ def test_cluster_cost(n_rows, n_cols, n_clusters, dtype): # need reduced tolerance for float32 tol = 1e-3 if dtype == np.float32 else 1e-6 assert np.allclose(inertia, sum(cluster_distances), rtol=tol, atol=tol) + + +@pytest.mark.parametrize("n_rows", [1000, 5000]) +@pytest.mark.parametrize("n_cols", [10, 100]) +@pytest.mark.parametrize("n_clusters", [8, 16]) +@pytest.mark.parametrize("streaming_batch_size", [0, 100, 239, 500]) +@pytest.mark.parametrize("dtype", [np.float64]) +@pytest.mark.parametrize("weighted", [False, True]) +def test_fit_host_matches_fit_device( + n_rows, n_cols, n_clusters, streaming_batch_size, dtype, weighted +): + """ + Test that fit() with host (numpy) data produces the same centroids as + fit() with device data, when given the same initial centroids. + Optionally tests with non-uniform sample weights. + """ + rng = np.random.default_rng(99) + X_host = rng.random((n_rows, n_cols)).astype(dtype) + + centroid_indices = rng.choice(n_rows, size=n_clusters, replace=False) + initial_centroids_host = X_host[centroid_indices].copy() + + if weighted: + sample_weights_host = rng.uniform(0.5, 2.0, size=n_rows).astype(dtype) + sample_weights_device = device_ndarray(sample_weights_host) + else: + sample_weights_host = None + sample_weights_device = None + + params_device = KMeansParams( + n_clusters=n_clusters, + init_method="Array", + max_iter=20, + tol=1e-10, + ) + centroids_regular, inertia_regular, _ = fit( + params_device, + device_ndarray(X_host), + device_ndarray(initial_centroids_host.copy()), + sample_weights=sample_weights_device, + ) + centroids_regular = centroids_regular.copy_to_host() + + params_host = KMeansParams( + n_clusters=n_clusters, + init_method="Array", + max_iter=20, + tol=1e-10, + streaming_batch_size=streaming_batch_size, + ) + centroids_batched, inertia_batched, _ = fit( + params_host, + X_host, + centroids=device_ndarray(initial_centroids_host.copy()), + sample_weights=sample_weights_host, + ) + centroids_batched = centroids_batched.copy_to_host() + + assert np.allclose( + centroids_regular, centroids_batched, rtol=1e-3, atol=1e-3 + ), f"max diff: {np.max(np.abs(centroids_regular - centroids_batched))}" + + assert np.allclose( + inertia_regular, inertia_batched, rtol=1e-3, atol=1e-3 + ), f"max diff: {np.max(np.abs(inertia_regular - inertia_batched))}" From 6e688b35150e485fae994bbac0c65ad7b81bf6e4 Mon Sep 17 00:00:00 2001 From: Divye Gala Date: Wed, 1 Apr 2026 02:08:04 +0200 Subject: [PATCH 3/4] Null JIT kernel launch config (#1974) This is to ensure the config does not accidentally get corrupted or start with garbage values. Furthermore, the CUDA [docs](https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EXECUTION.html#group__CUDART__EXECUTION_1ge236ecdbbaf7cf47a806bba71c1d03c4) recommend setting `config.attrs = nullptr` even if `config.numAttrs = 0`. The need of this update comes from https://github.com/rapidsai/cuml/issues/7906, where in cuML nightly wheel tests we are intermittently observing CUDA context corruption from the JIT path. While I am not sure if this PR will resolve them, it is still a step in the right direction. Authors: - Divye Gala (https://github.com/divyegala) Approvers: - Dante Gama Dessavre (https://github.com/dantegd) URL: https://github.com/rapidsai/cuvs/pull/1974 --- cpp/src/detail/jit_lto/AlgorithmLauncher.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/detail/jit_lto/AlgorithmLauncher.cpp b/cpp/src/detail/jit_lto/AlgorithmLauncher.cpp index 55273979ad..ed1853db93 100644 --- a/cpp/src/detail/jit_lto/AlgorithmLauncher.cpp +++ b/cpp/src/detail/jit_lto/AlgorithmLauncher.cpp @@ -37,12 +37,13 @@ AlgorithmLauncher& AlgorithmLauncher::operator=(AlgorithmLauncher&& other) noexc void AlgorithmLauncher::call( cudaStream_t stream, dim3 grid, dim3 block, std::size_t shared_mem, void** kernel_args) { - cudaLaunchConfig_t config; + cudaLaunchConfig_t config{}; config.gridDim = grid; config.blockDim = block; config.stream = stream; config.dynamicSmemBytes = shared_mem; config.numAttrs = 0; + config.attrs = NULL; RAFT_CUDA_TRY(cudaLaunchKernelExC(&config, kernel, kernel_args)); } From 745191e8dfd28f2d5b5a991c67a2e68ef48a1123 Mon Sep 17 00:00:00 2001 From: Tarang Jain <40517122+tarang-jain@users.noreply.github.com> Date: Tue, 31 Mar 2026 23:11:13 -0700 Subject: [PATCH 4/4] [BUG] Fix Vamana Serialization (#1966) - Add sync_stream for D2H copies of the graph - strided copy to copy the strided dataset Authors: - Tarang Jain (https://github.com/tarang-jain) Approvers: - Ben Karsin (https://github.com/bkarsin) - Jinsol Park (https://github.com/jinsolp) - Corey J. Nolet (https://github.com/cjnolet) URL: https://github.com/rapidsai/cuvs/pull/1966 --- .../detail/vamana/vamana_serialize.cuh | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/cpp/src/neighbors/detail/vamana/vamana_serialize.cuh b/cpp/src/neighbors/detail/vamana/vamana_serialize.cuh index 887c9eb448..68ce334cd2 100644 --- a/cpp/src/neighbors/detail/vamana/vamana_serialize.cuh +++ b/cpp/src/neighbors/detail/vamana/vamana_serialize.cuh @@ -16,6 +16,7 @@ #include #include #include +#include #include "../dataset_serialize.hpp" @@ -65,13 +66,19 @@ void serialize_dataset(raft::resources const& res, const auto* strided_dataset = dynamic_cast*>(dataset); if (strided_dataset) { - auto h_dataset = - raft::make_host_matrix(strided_dataset->n_rows(), strided_dataset->dim()); - raft::copy(res, - raft::make_host_vector_view(h_dataset.data_handle(), - strided_dataset->n_rows() * strided_dataset->dim()), - raft::make_device_vector_view(strided_dataset->view().data_handle(), - strided_dataset->n_rows() * strided_dataset->dim())); + auto nrows = strided_dataset->n_rows(); + auto dim = strided_dataset->dim(); + auto stride = strided_dataset->stride(); + auto d_data = strided_dataset->view(); + auto h_dataset = raft::make_host_matrix(nrows, dim); + raft::copy_matrix(h_dataset.data_handle(), + dim, + d_data.data_handle(), + stride, + dim, + nrows, + raft::resource::get_cuda_stream(res)); + raft::resource::sync_stream(res); to_file(dataset_base_file, h_dataset); } else { RAFT_LOG_DEBUG("dynamic_cast to strided_dataset failed"); @@ -91,6 +98,7 @@ void serialize_dataset(raft::resources const& res, try { auto h_dataset = raft::make_host_matrix(dataset.extent(0), dataset.extent(1)); raft::copy(res, h_dataset.view(), dataset); + raft::resource::sync_stream(res); to_file(dataset_base_file, h_dataset); } catch (std::bad_alloc& e) { RAFT_LOG_INFO("Failed to serialize dataset");