diff --git a/README.md b/README.md
index f54e6fa..d9d0ee0 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,22 @@
 # RAPIDS, Dask, and XGBoost
-## Dask
-[Dask](https://dask.org "Dask: Scalable Analytics in Python") is a framework for flexibly scaling computation in Python.
+Dask and XGBoost are two of the most popular open source packages for data
+processing and machine learning, respectively. RAPIDS augments them to support
+accelerated computation across many GPUs on one or several nodes.
 
-It implements a distributed Pandas DataFrame to elastically scale over many workers.
+RAPIDS integrates cuDF, a GPU-based dataframe implementation, with a CUDA-aware
+version of Dask to accelerate data loading and preparation. These can pass data
+seamlessly into XGBoost for model training, while reducing the amount of
+unnecessary data copying.
+
+## Dask
+
+[Dask](https://dask.org "Dask: Scalable Analytics in Python") is a
+framework for flexibly scaling computational graphs in Python on a single node
+or across a cluster.
+
+It implements several distributed data structures - most importantly a
+distributed, Pandas-style DataFrame that elastically scales over many workers.
 
 ```python
 import dask.dataframe as dd
@@ -15,16 +28,14 @@ df.head()
 1  2  b
 2  3  c
 3  4  a
-4  5  b
-5  6  c
 
 df2 = df[df.y == 'a'].x + 1
 ```
 
 ## cuDF
-[RAPIDS](https://rapids.ai "RAPIDS: Open GPU Data Science") is a collection of open source software libraries aimed at accelerating data science applications end-to-end.
+[RAPIDS](https://rapids.ai "RAPIDS: Open GPU Data Science") is a collection of open source software libraries aimed at accelerating data science applications end-to-end with GPUs.
 
-Much like Pandas, users can read in data, and perform various analytical tasks in parity with Pandas.
+Using a Pandas-compatible API, users can read in data and perform various analytical tasks directly on the GPU.
 ```python
 import cudf
@@ -40,10 +51,24 @@ for column in gdf.columns:
 The [RAPIDS Fork of XGBoost](https://github.com/rapidsai/xgboost "RAPIDS XGBoost") enables XGBoost with cuDF: a user may directly pass a cuDF object into XGBoost for training, prediction, etc.
 
 ## Dask-XGBoost
-The [RAPIDS Fork of Dask-XGBoost](https://github.com/rapidsai/dask-xgboost/ "RAPIDS Dask-XGBoost") enables XGBoost with the distributed CUDA DataFrame via Dask-cuDF. A user may pass Dask-XGBoost a reference to a distributed cuDF object, and start a training session over an entire cluster from Python.
-# Using Dask-cuDF
-A user may instantiate a Dask-cuDF cluster like this:
+The [RAPIDS Fork of Dask-XGBoost](https://github.com/rapidsai/dask-xgboost/
+"RAPIDS Dask-XGBoost") enables XGBoost with the distributed CUDA DataFrame via
+Dask-cuDF. A user may pass Dask-XGBoost a reference to a distributed cuDF
+object, and start a training session over an entire cluster from Python.
+
+# Building a Dask-cuDF-XGBoost pipeline
+
+Training an XGBoost model across multiple GPUs with Dask-XGBoost requires three main steps:
+
+1. Create a CUDA-aware Dask instance, with one worker per GPU
+2. Load data into a Dask-cuDF dataframe, distributed across GPUs
+3. Call XGBoost's distributed training functions
+
+## (1) Create a CUDA-aware Dask instance
+
+Setting up the Dask instance to use all GPUs on a single machine is
+straightforward:
 
 ```python
 import cudf
@@ -56,36 +81,45 @@ from dask_cuda import LocalCUDACluster
 import subprocess
 
+# First, find our host's IP address
 cmd = "hostname --all-ip-addresses"
 process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
 output, error = process.communicate()
 IPADDR = str(output.decode()).split()[0]
 
+# Now launch a Dask-CUDA cluster on the local host
 cluster = LocalCUDACluster(ip=IPADDR)
 client = Client(cluster)
 client
 ```
 
-Note the use of `from dask_cuda import LocalCUDACluster`. [Dask-CUDA](https://github.com/rapidsai/dask-cuda) is a lightweight set of utilities useful for setting up a Dask cluster. These calls instantiate a Dask-cuDF cluster in a single node environment. To instantiate a multi-node Dask-cuDF cluster, a user must use `dask-scheduler` and `dask-cuda-worker`.
+Note the use of `from dask_cuda import LocalCUDACluster`. [Dask-CUDA](https://github.com/rapidsai/dask-cuda) is a lightweight set of utilities useful for setting up a Dask cluster. These calls instantiate a Dask-cuDF cluster in a single-node environment. To instantiate a multi-node Dask-cuDF cluster, a user must use `dask-scheduler` and `dask-cuda-worker`. See the [Dask distributed documentation](http://distributed.dask.org/en/latest/setup.html) for more details.
 
-Once a `client` is available, a user may commission the client to perform tasks:
+Once a `client` is available, a user may use the client to run tasks across the workers:
 
 ```python
-def foo():
-    return
+def do_computation():
+    return 1
 
-client.run(foo)
+client.run(do_computation)
 ```
 
-Alternatively, a user may employ `dask_cudf`:
+Alternatively, a user may employ `dask_cudf` to distribute both data and
+computation over the workers. Dask-cuDF splits a dataframe into `npartitions`
+partitions, each itself a cuDF dataframe resident in the memory of a single
+GPU worker; Dask's scheduler then runs each task on the workers holding the
+relevant partitions:
 
 ```python
 pdf = pd.DataFrame(
-    {"x": np.random.randint(0, 5, size=10000), "y": np.random.normal(size=10000)})
+    {"x": np.random.randint(0, 5, size=10000),
+     "y": np.random.normal(size=10000)})  # Construct Pandas structure in CPU memory
 
-gdf = cudf.DataFrame.from_pandas(pdf)
-
-ddf = dask_cudf.from_cudf(gdf, npartitions=5)
+gdf = cudf.DataFrame.from_pandas(pdf)  # cuDF copies the data to local GPU memory
+ddf = dask_cudf.from_cudf(gdf, npartitions=5)  # Dask-cuDF distributes the partitions across the GPU workers
 
 ddf = ddf.groupby("x").mean()
 ```
 
@@ -96,7 +130,7 @@ There are two functions of interest with Dask-XGBoost:
 1. `dask_xgboost.train`
 2. `dask_xgboost.predict`
 
-The documentation for `dask_xgboost.train` is this:
+The documentation for `dask_xgboost.train` provides the clearest explanation:
 
 ```python
 help(dask_xgboost.train)
@@ -140,7 +174,7 @@ params = {
   'num_rounds': 100,
   'max_depth': 8,
   'max_leaves': 2**8,
-  'n_gpus': 1,
+  'n_gpus': 1,  # Sets the number of GPUs used per worker
   'tree_method': 'gpu_hist',
   'objective': 'reg:squarederror',
   'grow_policy': 'lossguide'
@@ -203,4 +237,8 @@ rmse = np.sqrt(test.squared_error.mean().compute())
 2. `bst`: the Booster produced by the XGBoost training session
 3. `x_test`: an instance of `dask_cudf.DataFrame` containing the data to be inferenced (acquire predictions)
 
-`pred` will be an instance of `dask_cudf.Series`. We can use `dask.dataframe.multi.concat` to construct a `dask_cudf.DataFrame` by concatenating the list of `dask_cudf.Series` instances (`[pred]`). `test` is a `dask_cudf.DataFrame` object with a single column named `0` (e.g.) `test[0]` returns `pred`. Additionally, the root-mean-squared-error (RMSE) can be computed by constructing a new column and assigning to it the value of the difference between predicted and labeled values squared. This is encoded in the assignment `test['squared_error'] = (test[0] - y_test['x'])**2`. Finally, the mean can be computed by using an aggregator from the `dask_cudf` API. The entire computation is initiated via `.compute()`; finally, we take the square-root of the result, leaving us with `rmse = np.sqrt(test.squared_error.mean().compute())`. Note: `.squared_error` is an accessor for `test[squared_error]`.
\ No newline at end of file
+`pred` will be an instance of `dask_cudf.Series`. We can use `dask.dataframe.multi.concat` to construct a `dask_cudf.DataFrame` by concatenating the list of `dask_cudf.Series` instances (`[pred]`); `test` is then a `dask_cudf.DataFrame` object with a single column named `0`, so `test[0]` returns `pred`.
+
+Additionally, the root mean squared error (RMSE) can be computed by constructing a new column and assigning to it the squared difference between the predicted and labeled values. This is encoded in the assignment `test['squared_error'] = (test[0] - y_test['x'])**2`. The mean can then be computed using an aggregator from the `dask_cudf` API. Like all Dask operations, these calls only build a task graph; the actual computation is triggered by `.compute()`. Finally, we take the square root of the result, leaving us with `rmse = np.sqrt(test.squared_error.mean().compute())`. Note: `.squared_error` is an accessor for `test['squared_error']`.
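
The RMSE arithmetic in that last paragraph can be sanity-checked on the CPU with plain pandas, whose column and accessor behaviour `dask_cudf` mirrors. This is only a sketch: the labels and predictions below are made-up stand-ins for `y_test` and `pred`, and pandas evaluates eagerly, so no `.compute()` is needed.

```python
import numpy as np
import pandas as pd

# Stand-ins for the distributed objects: in the real pipeline "pred" would be
# a dask_cudf.Series of predictions and "y_test" a dask_cudf.DataFrame of labels.
y_test = pd.DataFrame({"x": [3.0, -1.0, 2.0, 0.0]})  # true labels
pred = pd.Series([2.5, -0.5, 2.0, 1.0])              # model predictions

# Concatenating the list [pred] yields a DataFrame with a single column
# named 0, mirroring dask.dataframe.multi.concat([pred])
test = pd.concat([pred], axis=1)

# Squared error column, exactly as in the text
test["squared_error"] = (test[0] - y_test["x"]) ** 2

# With dask_cudf this aggregation would be lazy until .compute(); pandas is eager
rmse = np.sqrt(test.squared_error.mean())
print(rmse)  # 0.6123724356957945
```

Note that `test[0]` returns the prediction column and `test.squared_error` is the attribute-style accessor for `test["squared_error"]`, just as described above.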