Skip to content

Commit 0f6e389

Browse files
marijaselakovicamotl
authored andcommitted
Dask: Index page and starter tutorial
1 parent ccf247f commit 0f6e389

File tree

3 files changed

+238
-37
lines changed

3 files changed

+238
-37
lines changed

docs/connect/df/index.md

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,39 +6,10 @@
66

77
How to use CrateDB together with popular open-source DataFrame libraries.
88

9-
(dask)=
109
## Dask
11-
12-
:::{rubric} About
13-
:::
14-
[Dask] is a parallel computing library for analytics with task scheduling.
15-
It is built on top of the Python programming language, making it easy to scale
16-
the Python libraries that you know and love, like NumPy, pandas, and scikit-learn.
17-
18-
```{div}
19-
:style: "float: right"
20-
[![](https://github.com/crate/crate-clients-tools/assets/453543/99bd2234-c501-479b-ade7-bcc2bfc1f288){w=180px}](https://www.dask.org/)
21-
```
22-
23-
- [Dask DataFrames] help you process large tabular data by parallelizing pandas,
24-
either on your laptop for larger-than-memory computing, or on a distributed
25-
cluster of computers.
26-
27-
- [Dask Futures], implementing a real-time task framework, allow you to scale
28-
generic Python workflows across a Dask cluster with minimal code changes,
29-
by extending Python's `concurrent.futures` interface.
30-
31-
```{div}
32-
:style: "clear: both"
33-
```
34-
35-
:::{rubric} Learn
10+
:::{seealso}
11+
Please navigate to the dedicated page about {ref}`dask`.
3612
:::
37-
- [Guide to efficient data ingestion to CrateDB with pandas and Dask]
38-
- [Efficient batch/bulk INSERT operations with pandas, Dask, and SQLAlchemy]
39-
- [Import weather data using Dask]
40-
- [Dask code examples]
41-
4213

4314
(pandas)=
4415
## pandas
@@ -93,16 +64,10 @@ Please navigate to the dedicated page about {ref}`polars`.
9364

9465

9566
[Apache Arrow]: https://arrow.apache.org/
96-
[Dask]: https://www.dask.org/
97-
[Dask DataFrames]: https://docs.dask.org/en/latest/dataframe.html
98-
[Dask Futures]: https://docs.dask.org/en/latest/futures.html
9967
[pandas]: https://pandas.pydata.org/
10068

101-
[Dask code examples]: https://github.com/crate/cratedb-examples/tree/main/by-dataframe/dask
10269
[Efficient batch/bulk INSERT operations with pandas, Dask, and SQLAlchemy]: https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/dataframe.html
10370
[From data storage to data analysis: Tutorial on CrateDB and pandas]: https://community.cratedb.com/t/from-data-storage-to-data-analysis-tutorial-on-cratedb-and-pandas/1440
10471
[Guide to efficient data ingestion to CrateDB with pandas]: https://community.cratedb.com/t/guide-to-efficient-data-ingestion-to-cratedb-with-pandas/1541
105-
[Guide to efficient data ingestion to CrateDB with pandas and Dask]: https://community.cratedb.com/t/guide-to-efficient-data-ingestion-to-cratedb-with-pandas-and-dask/1482
106-
[Import weather data using Dask]: https://github.com/crate/cratedb-examples/blob/main/topic/timeseries/dask-weather-data-import.ipynb
10772
[Importing Parquet files into CrateDB using Apache Arrow and SQLAlchemy]: https://community.cratedb.com/t/importing-parquet-files-into-cratedb-using-apache-arrow-and-sqlalchemy/1161
10873
[pandas code examples]: https://github.com/crate/cratedb-examples/tree/main/by-dataframe/pandas

docs/integrate/dask/index.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
(dask)=
2+
# Dask
3+
4+
:::{rubric} About
5+
:::
6+
[Dask] is a parallel computing library for analytics with task scheduling.
7+
It is built on top of the Python programming language, making it easy to scale
8+
the Python libraries that you know and love, like NumPy, pandas, and scikit-learn.
9+
10+
```{div}
11+
:style: "float: right"
12+
[![](https://github.com/crate/crate-clients-tools/assets/453543/99bd2234-c501-479b-ade7-bcc2bfc1f288){w=180px}](https://www.dask.org/)
13+
```
14+
15+
- [Dask DataFrames] help you process large tabular data by parallelizing pandas,
16+
either on your laptop for larger-than-memory computing, or on a distributed
17+
cluster of computers.
18+
19+
- [Dask Futures], implementing a real-time task framework, allow you to scale
20+
generic Python workflows across a Dask cluster with minimal code changes,
21+
by extending Python's `concurrent.futures` interface.
22+
23+
```{div}
24+
:style: "clear: both"
25+
```
26+
27+
:::{rubric} Learn
28+
:::
29+
- {ref}`dask-tutorial`
30+
- [Efficient batch/bulk INSERT operations with pandas, Dask, and SQLAlchemy]
31+
- [Import weather data using Dask]
32+
- [Dask code examples]
33+
34+
35+
:::{toctree}
36+
:maxdepth: 1
37+
:hidden:
38+
Tutorial <tutorial>
39+
:::
40+
41+
42+
[Dask]: https://www.dask.org/
43+
[Dask code examples]: https://github.com/crate/cratedb-examples/tree/main/by-dataframe/dask
44+
[Dask DataFrames]: https://docs.dask.org/en/latest/dataframe.html
45+
[Dask Futures]: https://docs.dask.org/en/latest/futures.html
46+
[Efficient batch/bulk INSERT operations with pandas, Dask, and SQLAlchemy]: https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/dataframe.html
47+
[Import weather data using Dask]: https://github.com/crate/cratedb-examples/blob/main/topic/timeseries/dask-weather-data-import.ipynb

docs/integrate/dask/tutorial.md

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
(dask-tutorial)=
2+
# Guide to efficient data ingestion to CrateDB with Dask
3+
4+
## Introduction
5+
Dask is a parallel computing library that enables distributed computing for tasks such as data processing and machine learning. In this tutorial, we'll explore how to leverage the power of CrateDB, a distributed SQL database, in conjunction with Dask, to perform efficient data processing and analysis tasks.
6+
7+
Prerequisites:
8+
9+
Before getting started, you should have the following installed:
10+
11+
* Python 3
12+
* CrateDB
13+
* Dask
14+
15+
You can use pip to install Dask. To make sure that you have everything required for the most common uses of Dask (e.g., Dask Dataframe, Dask Array, etc) use the following command:
16+
17+
```
18+
python -m pip install "dask[complete]"
19+
```
20+
21+
## Inserting data
22+
23+
For this tutorial, we chose to use the California housing prices dataset, also available on [Kaggle](https://www.kaggle.com/datasets/camnugent/california-housing-prices?resource=download). This dataset is a popular dataset for regression tasks, consisting of median house values in census tracts in California, making it an excellent starting point for implementing basic machine learning algorithms.
24+
25+
Before importing data, create a california_housing table in CrateDB:
26+
27+
```sql
28+
CREATE TABLE IF NOT EXISTS "doc"."california_housing" (
29+
"longitude" REAL,
30+
"latitude" REAL,
31+
"housing_median_age" REAL,
32+
"total_rooms" REAL,
33+
"total_bedrooms" REAL,
34+
"population" REAL,
35+
"households" REAL,
36+
"median_income" REAL,
37+
"median_house_value" REAL,
38+
"ocean_proximity" TEXT
39+
)
40+
```
41+
42+
Use COPY FROM command to import housing data:
43+
44+
```sql
45+
COPY "doc"."california_housing" FROM 'file:///path/to/file'
46+
```
47+
48+
## Using Dask to query the data
49+
50+
Dask provides three methods to read a SQL query or q database table into a Dataframe: read_sql, read_sql_table, and read_sql_query. The read_sql method is a convenience wrapper around the other two and it will delegate to a specific function based on the provided input. To use this method you will need the following parameters:
51+
52+
* `sql`: name of a SQL table in a database or an SQL query to be executed,
53+
* `uri`: the full sqlalchemy URI for the database connection
54+
* `index_col`: the index column. The index column is used by Dask to split up the query on multiple machines.
55+
56+
Now, let’s load the data from a California housing dataset to a Dask Dataframe:
57+
58+
```python
59+
import dask
60+
import dask.dataframe as dd
61+
URI = 'crate://localhost:4200'
62+
63+
df = dd.read_sql(table_name='california_housing', con = URI, index_col = 'total_rooms')
64+
```
65+
66+
In the above example, we read the data from california_housing dataset and use total_rooms as an index column.
67+
68+
If you want to run read_sql with a query to be executed, you will need to provide an ***SQLAlchemy Selectable*** query. The following example shows how to query several columns from california_housing table and load the result to the Dask Dataframe.
69+
70+
```python
71+
data = table("california_housing",
72+
column("longitude"),
73+
column("latitude"),
74+
column("total_rooms"),
75+
column("total_bedrooms"),
76+
column("population")
77+
)
78+
79+
df = dd.read_sql(sql=data.select(), con = URI, index_col = 'total_rooms')
80+
```
81+
82+
Now that we loaded the data we can use the df.head() to show the first n rows in the dataset:
83+
84+
```python
85+
print(df.head(n=5))
86+
```
87+
88+
```python
89+
housing_median_age longitude latitude total_rooms total_bedrooms population households median_income median_house_value ocean_proximity
90+
91+
21.0 -122.22 37.86 7099.0 1106.0 2401.0 1138.0 8.3014 358500.0 NEAR BAY
92+
52.0 -122.25 37.85 1627.0 280.0 565.0 259.0 3.8462 342200.0 NEAR BAY
93+
52.0 -122.25 37.85 919.0 213.0 413.0 193.0 4.0368 269700.0 NEAR BAY
94+
52.0 -122.26 37.85 2491.0 474.0 1098.0 468.0 3.0750 213500.0 NEAR BAY
95+
52.0 -122.26 37.85 2643.0 626.0 1212.0 620.0 1.9167 159200.0 NEAR BAY
96+
```
97+
98+
### Linear regression with Dask and CrateDB
99+
100+
In the following example, we will illustrate how to perform a linear regression task on the California housing data. We will train a machine learning model that predicts the median house value based on the other variables in the dataset. Before we start, we need to categorize the ocean_proximity column as the only non-number column:
101+
102+
```python
103+
df=df.categorize(columns='ocean_proximity')
104+
df['ocean_proximity'] = df['ocean_proximity'].cat.as_known().cat.codes
105+
```
106+
107+
The above code will transform the last column so that it contains a number representing a certain category, as illustrated below:
108+
```python
109+
print(df.compute().head(3))
110+
```
111+
```python
112+
housing_median_age longitude latitude total_rooms total_bedrooms population households median_income median_house_value ocean_proximity
113+
21.0 -122.22 37.86 7099.0 1106.0 2401.0 1138.0 8.3014 358500.0 3
114+
52.0 -122.25 37.85 1627.0 280.0 565.0 259.0 3.8462 342200.0 3
115+
52.0 -122.25 37.85 919.0 213.0 413.0 193.0 4.0368 269700.0 3
116+
```
117+
118+
The next step is to split the data into training and testing sets and for that, we can use the dask_ml library. For the linear regression estimator to work with the data we need to transform training and testing sets into Dask arrays:
119+
120+
```python
121+
from dask_ml.model_selection import train_test_split
122+
X_train, X_test, y_train, y_test = train_test_split(df.drop('median_house_value', axis=1), df['median_house_value'], test_size=0.2, shuffle=True)
123+
124+
X_train= X_train.to_dask_array(lengths=True)
125+
X_test = X_test.to_dask_array(lengths=True)
126+
y_train = y_train.to_dask_array(lengths=True)
127+
y_test = y_test.to_dask_array(lengths=True)
128+
```
129+
130+
Now we can perform a linear regression task on the data. First, we need to create a linear regression estimator and fit the estimator to the training data:
131+
132+
```python
133+
from dask_ml.linear_model import LinearRegression
134+
135+
lr = LinearRegression()
136+
lr.fit(X_train, y_train)
137+
y_pred = lr.predict(X_test)
138+
```
139+
140+
In the last line, we use the estimator to make predictions on the testing data. To evaluate the performance of our linear regression model, we can calculate the mean squared error (MSE) and the coefficient of determination (R²) on the testing data:
141+
142+
```python
143+
from dask_ml.metrics import mean_squared_error, r2_score
144+
145+
mse = mean_squared_error(y_test, y_pred)
146+
r2 = r2_score(y_test, y_pred)
147+
print(f'Mean squared error: {mse:.2f}')
148+
print(f'R² score: {r2:.2f}')
149+
```
150+
151+
The last two lines will output the mean squared error and R² score for our linear regression model.
152+
153+
## Using Dask to write to CrateDB
154+
155+
Dask also provides support for storing Dask Dataframe to a SQL table with the to_sql method. To illustrate the concurrent write of a Dask Dataframe to CrateDB we first create a Pandas DataFrame using the makeTimeDataFrame function with a frequency of one second and a total of 1,5 million periods as illustrated below.
156+
157+
```python
158+
from pandas._testing import makeTimeDataFrame
159+
df = makeTimeDataFrame(nper=1_500_000, freq="S")
160+
```
161+
162+
Then, we create the Dask Dataframe from the Pandas Dataframe and divide the data into 4 partitions, allowing for parallel processing:
163+
164+
```python
165+
ddf = dd.from_pandas(df, npartitions=4)
166+
```
167+
168+
Finally, with the to_sql() method we load the data to a CrateDB database:
169+
170+
```python
171+
ddf.to_sql("demo", uri=URI, index=False, if_exists="replace", chunksize=10000, parallel=True)
172+
```
173+
174+
The to_sql() method takes several arguments:
175+
176+
* `"demo"`: the name of the table where the data will be loaded.
177+
* `uri`: the connection string to the CrateDB database.
178+
* `index=False`: specifies that the index column in the DataFrame should not be included in the database table.
179+
* `if_exists="replace"`: specifies that if the table already exists, it should be replaced with the new data. Other possible values are 'fail', 'replace', and 'append'.
180+
* `chunksize=10000`: the number of rows to be inserted at a time. It may be helpful to experiment with different chunk sizes to find the optimal value for the specific use case.
181+
* `parallel=True`: specifies that the insertion process should be done in parallel.
182+
183+
On an M1 machine with 16 GB of RAM, the entire process of loading the 1.5 million records worth of data into the database, takes approximately 15 seconds. Without using `parallel=True`, the total runtime increases to 22 seconds, thus demonstrating that it is more efficient than running insert operations subsequently.
184+
185+
## Conclusions
186+
187+
In this tutorial, we've covered the essentials of using CrateDB with Dask for efficient data processing and analysis. By combining the distributed capabilities of CrateDB with the parallel computing power of Dask, you can unlock the potential to handle large-scale datasets, perform complex queries, and leverage advanced analytics techniques.
188+
189+
To learn more about updates, features, and other questions you might have, join our [CrateDB community](https://community.cratedb.com/).

0 commit comments

Comments
 (0)