diff --git a/mongodb-with-coiled/dask-mongo.ipynb b/mongodb-with-coiled/dask-mongo.ipynb index 0d36dec..f6c58ad 100644 --- a/mongodb-with-coiled/dask-mongo.ipynb +++ b/mongodb-with-coiled/dask-mongo.ipynb @@ -55,30 +55,16 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "fa1c3b19-feff-4f92-8752-079e277949a7", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Creating new software environment\n", - "Creating new ecr build\n", - "STEP 1: FROM coiled/default:sha-6b4e896\n", - "STEP 2: COPY environment.yml environment.yml\n", - "--> 7cf6c61c926\n", - "STEP 3: RUN conda env update -n coiled -f environment.yml && rm environment.yml && conda clean --all -y && echo \"conda activate coiled\" >> ~/.bashrc\n", - "Collecting package metadata (repodata.json): ...working... done\n" - ] - } - ], + "outputs": [], "source": [ - "coiled.create_software_environment(\n", - " account=\"coiled-examples\",\n", - " name=\"dask-mongo\",\n", - " conda=\"/Users/rpelgrim/Documents/git/coiled-resources/mongodb-with-coiled/environment.yml\",\n", - ")" + "# coiled.create_software_environment(\n", + "# account=\"coiled-examples\",\n", + "# name=\"dask-mongo\",\n", + "# conda=\"/Users/rpelgrim/Documents/git/coiled-resources/mongodb-with-coiled/environment.yml\",\n", + "# )" ] }, { @@ -90,7 +76,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "1085c0ea11494440badf92e75b1859a0", + "model_id": "3f4a93d1d7864512998f389eb2818cf2", "version_major": 2, "version_minor": 0 }, @@ -152,7 +138,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 5, "id": "4f2669ed-1fc2-42fa-87dd-2f191f804d32", "metadata": {}, "outputs": [ @@ -171,7 +157,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 6, "id": "7fff2141-43d8-4aa5-8aad-169cbc55d301", "metadata": {}, "outputs": [], @@ -192,7 +178,7 @@ }, { "cell_type": "code", - "execution_count": 9, + 
"execution_count": 7, "id": "eaa90416-45bd-4965-8770-f3219e7882c2", "metadata": {}, "outputs": [], @@ -209,7 +195,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 8, "id": "d321e022-f68b-4842-b28b-ef930e7593c5", "metadata": {}, "outputs": [ @@ -219,7 +205,7 @@ "dask.bag" ] }, - "execution_count": 10, + "execution_count": 8, "metadata": {}, "output_type": "execute_result" } @@ -241,7 +227,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 9, "id": "59fed42a-b8d0-4a2e-8c38-88231d8a56fa", "metadata": {}, "outputs": [ @@ -665,7 +651,7 @@ " 'comments': \"The house was extremely well located and Ana was able to give us some really great tips on locations to have lunch and eat out. The house was perfectly clean and the easily able to accommodate 6 people despite only having one bathroom. The beds and living room were comfortable. \\n\\nHowever, we always felt somewhat on edge in the house due to the number of signs posted around the kitchen, bedrooms and bathroom about being charged 15€ for all sorts of extras like not washing up or using extra towels and bed linen. Not that this would be particularly unreasonable but it made us feel like we were walking on egg shells in and around the house. \\n\\nThe hosts were aware that we were a group of six yet one of the beds was not prepared and we ran out of toilet paper well before we were due to check out despite only being there 2 nights. It really wasn't the end of the world but the shower head does not have a wall fitting meaning you had to hold it yourself if you wanted to stand underneath it.\"}]},)" ] }, - "execution_count": 11, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } @@ -681,15 +667,15 @@ "source": [ "This outputs a very large amount of data, including the listing itself as well as all the available reviews. The data is unwieldy and not in a format we can input to a regular machine learning model. 
\n", "\n", - "### Subset Data to Build a Machine Learning Pipeline\n", - "Let’s simplify this into a Machine Learning prediction problem. Let’s use the text in the Description field to predict the Review Rating and limit ourselves to listings of type “Apartment” only.\n", + "### Subset Data \n", + "Let’s use the text in the Description field to predict the Review Rating and limit ourselves to listings of type “Apartment” only.\n", "\n", "To do this, we’ll have to flatten the semi-structured JSON into a tabular form. We’ll use the processing function defined below to extract only the relevant information from all records. We'll then filter out only the Apartment listings, flatten the data structure and turn it into a Dask Dataframe." ] }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 10, "id": "b5002201-31ba-4e64-9961-4f37359e560a", "metadata": {}, "outputs": [], @@ -710,7 +696,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 11, "id": "31fff276-d1d8-468d-b893-4835ce26ae09", "metadata": {}, "outputs": [], @@ -725,7 +711,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 12, "id": "bbd3b6d6-f22e-40de-af79-9dcc2b95d9dc", "metadata": {}, "outputs": [ @@ -738,7 +724,7 @@ " 'review_rating': 94})" ] }, - "execution_count": 14, + "execution_count": 12, "metadata": {}, "output_type": "execute_result" } @@ -761,7 +747,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 13, "id": "ca7492b7-bf82-4111-b12a-5b8b101afd63", "metadata": {}, "outputs": [], @@ -773,7 +759,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 14, "id": "54347e2e-6b91-440d-9e87-28c6a4c26a32", "metadata": {}, "outputs": [], @@ -784,7 +770,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 15, "id": "090b7238-6ddc-45ba-bf1a-7eb9fb0ba96b", "metadata": {}, "outputs": [], @@ -803,7 +789,7 @@ }, { "cell_type": "code", - "execution_count": 17, + 
"execution_count": 16, "id": "6d717e86-aa92-4183-b464-a5666c9e4ee8", "metadata": {}, "outputs": [ @@ -853,7 +839,7 @@ "1 Murphy bed, optional second bedroom available.... 94" ] }, - "execution_count": 17, + "execution_count": 16, "metadata": {}, "output_type": "execute_result" } @@ -873,7 +859,7 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": 17, "id": "6572a460-b3e3-4c44-9e71-2e4573c6fe2c", "metadata": {}, "outputs": [ @@ -923,7 +909,7 @@ "1 murphy bed, optional second bedroom available.... 94" ] }, - "execution_count": 18, + "execution_count": 17, "metadata": {}, "output_type": "execute_result" } @@ -945,7 +931,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": 18, "id": "ceb3f5b5-c41a-4d67-8c9a-352b9321ef42", "metadata": {}, "outputs": [ @@ -1013,7 +999,7 @@ "4 [clean, fully, furnish, spacious, 1, bedroom, ... 100" ] }, - "execution_count": 19, + "execution_count": 18, "metadata": {}, "output_type": "execute_result" } @@ -1048,7 +1034,7 @@ }, { "cell_type": "code", - "execution_count": 20, + "execution_count": 19, "id": "7c50c7c7-b515-4e51-955d-8e6de32a4e12", "metadata": {}, "outputs": [], @@ -1056,6 +1042,18 @@ "import spacy" ] }, + { + "cell_type": "code", + "execution_count": 20, + "id": "cd15b2cb-db7c-49e1-b383-b453d17485e6", + "metadata": {}, + "outputs": [], + "source": [ + "# # run this cell if you've not used spacy in this env before\n", + "# # this will download the lexicon files\n", + "# ! 
python -m spacy download en" + ] + }, { "cell_type": "code", "execution_count": 21, @@ -1195,35 +1193,189 @@ "ddf.head()" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "b37cfc3f-e463-4873-9687-c29f7bf644ba", + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "markdown", - "id": "99975628-6064-4115-a6c1-87716044fc03", - "metadata": { - "tags": [] - }, + "id": "53e13148-8f9b-4363-89e7-0ddb8f32b904", + "metadata": {}, "source": [ - "## Vectorization with Dask-ML\n", - "We can now transform our lemmatized tokens into numerical representations (vectors) that can be input to a machine learning algorithm.\n", + "## 5. Write Data Back to MongoDB\n", + "\n", + "Now that we have processed our raw JSON NLP data into a neat tabular format, we'll want to store this for future use. You can use the `to_mongo` method to write data to your MongoDB database.\n", "\n", - "To do this, we’ll first create our predictor and target features and our train and test splits:" + "You'll first need to convert your Dask DataFrame to a Dask Bag:" ] }, { "cell_type": "code", - "execution_count": 15, - "id": "d126152c-d608-4ccf-996a-e2073ee49bbb", + "execution_count": 26, + "id": "b5577f05-f8fa-4257-a443-f07ae285b792", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "({'description': ['here',\n", + " 'exist',\n", + " 'a',\n", + " 'very',\n", + " 'cozy',\n", + " 'room',\n", + " 'for',\n", + " 'rent',\n", + " 'in',\n", + " 'a',\n", + " 'share',\n", + " '4',\n", + " 'bedroom',\n", + " 'apartment',\n", + " 'it',\n", + " 'be',\n", + " 'locate',\n", + " 'one',\n", + " 'block',\n", + " 'off',\n", + " 'of',\n", + " 'the',\n", + " 'jmz',\n", + " 'at',\n", + " 'myrtle',\n", + " 'broadway',\n", + " 'the',\n", + " 'neighborhood',\n", + " 'be',\n", + " 'diverse',\n", + " 'and',\n", + " 'appeal',\n", + " 'to',\n", + " 'a',\n", + " 'variety',\n", + " 'of',\n", + " 'people'],\n", + " 'review_rating': 100},)" + ] + }, + "execution_count": 26, + 
"metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import dask.bag as db\n", + "\n", + "# convert dask dataframe back to Dask bag\n", + "new_bag = db.from_delayed(\n", + " ddf.map_partitions(lambda x: x.to_dict(orient=\"records\")).to_delayed()\n", + ")\n", + "\n", + "new_bag.take(1)" + ] + }, + { + "cell_type": "markdown", + "id": "937cac09-2b12-4548-ab58-a46583179ccc", + "metadata": {}, + "source": [ + "And then use `to_mongo` to write the data to MongoDB:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "784ceae8-ce82-481b-a78f-c8b046863dfa", "metadata": {}, "outputs": [], "source": [ - "# insert b_flattened here to bypass custom tokenization\n", - "ddf = b_flattened.to_dataframe()\n", - "X = ddf['description'].to_dask_array(lengths=True)\n", - "y = ddf['review_rating'].to_dask_array(lengths=True)" + "from dask_mongo import to_mongo\n", + "\n", + "to_mongo(\n", + " new_bag,\n", + " database=\"\",\n", + " collection=\"\",\n", + " connection_kwargs={\"host\": host_uri},\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "82f59260-c06e-419f-881b-ae5bd6e1f24c", + "metadata": {}, + "source": [ + "*Note that because the AirBnB sample data is located on a low-resource Free Tier cluster, the read/write speeds may be suboptimal. Consider upgrading your cluster to improve your query performance.*" + ] + }, + { + "cell_type": "markdown", + "id": "f634c3c6-a035-4544-aa1e-cfbda6026af9", + "metadata": {}, + "source": [ + "## Summary\n", + "- You can use the dask-mongo connector to read and write files in parallel between your local Python session and a MongoDB Atlas cluster.\n", + "- You can use Dask and MongoDB to build an NLP pipeline at scale.\n", + "- You can run Dask computations in the cloud using a Coiled cluster. 
Running Coiled and MongoDB together can lead to performance gains on your queries and analyses.\n" + ] + }, + { + "cell_type": "markdown", + "id": "0d0b0e9d-9bbf-4724-a629-9db8d7aed9ce", + "metadata": {}, + "source": [ + "## Get Started with dask-mongo\n", + "To get started with the dask-mongo connector, install it on your local machine using pip or conda.\n", + "\n", + "`pip install dask-mongo`\n", + "\n", + "`conda install dask-mongo -c conda-forge`\n", + "\n", + "You can then run the code in the accompanying notebook yourself to test-drive dask-mongo. For more information, we recommend taking a look at [the Coiled documentation](https://docs.coiled.io/user_guide/examples/mongodb.html). \n", + "\n", + "Let us know how you get on by tweeting to the development team at [@CoiledHQ](https://twitter.com/CoiledHQ)!" ] }, { "cell_type": "code", - "execution_count": 16, + "execution_count": null, + "id": "70ddaa3a-0012-445d-ba7f-512df3c58369", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bffebbdf-1c9e-4f64-a14f-cac827e1a7c3", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0f509099-716e-406a-ab0d-e158ad762977", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 27, + "id": "4428342f-81db-4d4c-aea4-9d44f9efff22", + "metadata": {}, + "outputs": [], + "source": [ + "from dask_ml.model_selection import train_test_split" + ] + }, + { + "cell_type": "code", + "execution_count": 28, "id": "222c57c9-c493-41b8-ac18-18e25b50c63f", "metadata": {}, "outputs": [], @@ -1248,12 +1400,12 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 29, "id": "92bc2fc9-cd96-415a-b088-9f9bcf479e00", "metadata": {}, "outputs": [], "source": [ - "# vectorize (NOTE: removed custom preprocessing bypasses)\n", + "# vectorize \n", "from dask_ml.feature_extraction.text import 
HashingVectorizer\n", "vect = HashingVectorizer()\n", "X_train_vect = vect.fit_transform(X_train)" @@ -1264,12 +1416,12 @@ "id": "fbfd8c78-e5be-4880-9d03-c134378c938e", "metadata": {}, "source": [ - "Note that this Vectorizer comes with its own built-in preprocessing functions. We'll override these since we have already done our own customized preprocessing above." + "Note that this Vectorizer comes with its own built-in preprocessing functions. We could override these since we have already done our own customized preprocessing above." ] }, { "cell_type": "code", - "execution_count": 18, + "execution_count": 30, "id": "5f6a755a-2721-4150-9723-e7a409a3e8e5", "metadata": {}, "outputs": [ @@ -1344,7 +1496,7 @@ "dask.array<_transformer, shape=(2139, 1048576), dtype=float64, chunksize=(211, 1048576), chunktype=scipy.csr_matrix>" ] }, - "execution_count": 18, + "execution_count": 30, "metadata": {}, "output_type": "execute_result" } @@ -1356,357 +1508,112 @@ }, { "cell_type": "code", - "execution_count": 19, - "id": "d559a8b2-4888-45db-a33e-235091ea8699", - "metadata": {}, - "outputs": [], - "source": [ - "X_train_vect = X_train_vect.persist()" - ] - }, - { - "cell_type": "markdown", - "id": "1a44d89a-2dc1-49ed-bfd7-6d2062b0d773", - "metadata": { - "tags": [] - }, - "source": [ - "## ML Classification with XGBoost\n", - "Now we're all set to feed our data into the XGBoost Classifier and run our predictions. We are predicting the Review Rating based on the text in the Description field." - ] - }, - { - "cell_type": "code", - "execution_count": 20, - "id": "2dfab100-29f6-4809-8f2f-fa43b6cf9b15", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/rpelgrim/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/xgboost/compat.py:36: FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. 
Use pandas.Index with the appropriate dtype instead.\n", - " from pandas import MultiIndex, Int64Index\n" - ] - } - ], - "source": [ - "import xgboost as xgb\n", - "clf = xgb.dask.DaskXGBClassifier()" - ] - }, - { - "cell_type": "code", - "execution_count": 21, - "id": "db1bca5a-4bf7-4054-9263-d89bfb61bc78", - "metadata": {}, - "outputs": [ - { - "ename": "KeyboardInterrupt", - "evalue": "", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)", - "File \u001b[0;32m:2\u001b[0m, in \u001b[0;36m\u001b[0;34m\u001b[0m\n", - "File \u001b[0;32m~/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/xgboost/dask.py:1817\u001b[0m, in \u001b[0;36mDaskXGBClassifier.fit\u001b[0;34m(self, X, y, sample_weight, base_margin, eval_set, eval_metric, early_stopping_rounds, verbose, xgb_model, sample_weight_eval_set, base_margin_eval_set, feature_weights, callbacks)\u001b[0m\n\u001b[1;32m 1815\u001b[0m _assert_dask_support()\n\u001b[1;32m 1816\u001b[0m args \u001b[38;5;241m=\u001b[39m {k: v \u001b[38;5;28;01mfor\u001b[39;00m k, v \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mlocals\u001b[39m()\u001b[38;5;241m.\u001b[39mitems() \u001b[38;5;28;01mif\u001b[39;00m k \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;129;01min\u001b[39;00m (\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mself\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m__class__\u001b[39m\u001b[38;5;124m\"\u001b[39m)}\n\u001b[0;32m-> 1817\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_client_sync\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_fit_async\u001b[49m\u001b[43m,\u001b[49m\u001b[43m 
\u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m)\u001b[49m\n", - "File \u001b[0;32m~/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/xgboost/dask.py:1623\u001b[0m, in \u001b[0;36mDaskScikitLearnBase._client_sync\u001b[0;34m(self, func, **kwargs)\u001b[0m\n\u001b[1;32m 1620\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m ret\n\u001b[1;32m 1621\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m ret\n\u001b[0;32m-> 1623\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mclient\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m)\u001b[49m\n", - "File \u001b[0;32m~/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/distributed/utils.py:309\u001b[0m, in \u001b[0;36mSyncMethodMixin.sync\u001b[0;34m(self, func, asynchronous, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 307\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m future\n\u001b[1;32m 308\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m--> 309\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 310\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloop\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcallback_timeout\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcallback_timeout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m 
\u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\n\u001b[1;32m 311\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n", - "File \u001b[0;32m~/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/distributed/utils.py:372\u001b[0m, in \u001b[0;36msync\u001b[0;34m(loop, func, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 370\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 371\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m e\u001b[38;5;241m.\u001b[39mis_set():\n\u001b[0;32m--> 372\u001b[0m \u001b[43mwait\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m10\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[1;32m 374\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m error:\n\u001b[1;32m 375\u001b[0m typ, exc, tb \u001b[38;5;241m=\u001b[39m error\n", - "File \u001b[0;32m~/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/distributed/utils.py:361\u001b[0m, in \u001b[0;36msync..wait\u001b[0;34m(timeout)\u001b[0m\n\u001b[1;32m 359\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mwait\u001b[39m(timeout):\n\u001b[1;32m 360\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 361\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43me\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwait\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 362\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mKeyboardInterrupt\u001b[39;00m:\n\u001b[1;32m 363\u001b[0m loop\u001b[38;5;241m.\u001b[39madd_callback(cancel)\n", - "File \u001b[0;32m~/mambaforge/envs/dask-mongo/lib/python3.9/threading.py:574\u001b[0m, in \u001b[0;36mEvent.wait\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 572\u001b[0m signaled \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_flag\n\u001b[1;32m 573\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m 
signaled:\n\u001b[0;32m--> 574\u001b[0m signaled \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_cond\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwait\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 575\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m signaled\n", - "File \u001b[0;32m~/mambaforge/envs/dask-mongo/lib/python3.9/threading.py:316\u001b[0m, in \u001b[0;36mCondition.wait\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 314\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 315\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m timeout \u001b[38;5;241m>\u001b[39m \u001b[38;5;241m0\u001b[39m:\n\u001b[0;32m--> 316\u001b[0m gotit \u001b[38;5;241m=\u001b[39m \u001b[43mwaiter\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43macquire\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43;01mTrue\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 317\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 318\u001b[0m gotit \u001b[38;5;241m=\u001b[39m waiter\u001b[38;5;241m.\u001b[39macquire(\u001b[38;5;28;01mFalse\u001b[39;00m)\n", - "\u001b[0;31mKeyboardInterrupt\u001b[0m: " - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client\n", - "distributed.deploy.cluster - WARNING - Failed to sync cluster info multiple times - perhaps there's a connection issue? 
Error:\n", - "Traceback (most recent call last):\n", - " File \"/Users/rpelgrim/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/distributed/comm/tcp.py\", line 426, in connect\n", - " stream = await self.client.connect(\n", - " File \"/Users/rpelgrim/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/tornado/tcpclient.py\", line 275, in connect\n", - " af, addr, stream = await connector.start(connect_timeout=timeout)\n", - "asyncio.exceptions.CancelledError\n", - "\n", - "During handling of the above exception, another exception occurred:\n", - "\n", - "Traceback (most recent call last):\n", - " File \"/Users/rpelgrim/mambaforge/envs/dask-mongo/lib/python3.9/asyncio/tasks.py\", line 490, in wait_for\n", - " return fut.result()\n", - "asyncio.exceptions.CancelledError\n", - "\n", - "The above exception was the direct cause of the following exception:\n", - "\n", - "Traceback (most recent call last):\n", - " File \"/Users/rpelgrim/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/distributed/comm/core.py\", line 289, in connect\n", - " comm = await asyncio.wait_for(\n", - " File \"/Users/rpelgrim/mambaforge/envs/dask-mongo/lib/python3.9/asyncio/tasks.py\", line 492, in wait_for\n", - " raise exceptions.TimeoutError() from exc\n", - "asyncio.exceptions.TimeoutError\n", - "\n", - "The above exception was the direct cause of the following exception:\n", - "\n", - "Traceback (most recent call last):\n", - " File \"/Users/rpelgrim/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/distributed/deploy/cluster.py\", line 131, in _sync_cluster_info\n", - " await self.scheduler_comm.set_metadata(\n", - " File \"/Users/rpelgrim/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/distributed/core.py\", line 827, in send_recv_from_rpc\n", - " comm = await self.live_comm()\n", - " File \"/Users/rpelgrim/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/distributed/core.py\", line 784, in live_comm\n", - " comm = await connect(\n", - " File 
\"/Users/rpelgrim/mambaforge/envs/dask-mongo/lib/python3.9/site-packages/distributed/comm/core.py\", line 315, in connect\n", - " raise OSError(\n", - "OSError: Timed out trying to connect to tls://3.80.159.200:8786 after 30 s\n" - ] - } - ], - "source": [ - "%%time\n", - "# train classifier\n", - "clf.fit(X_train_vect, y_train)" - ] - }, - { - "cell_type": "markdown", - "id": "313679f9-1172-4325-9ce8-fcf1daeff104", - "metadata": {}, - "source": [ - "**NOTE:** this is taking WAY too long to start running." - ] - }, - { - "cell_type": "code", - "execution_count": 49, - "id": "18cb7b93-6a76-434b-a071-4e7ada9dc90c", + "execution_count": 31, + "id": "3c67234c-75c3-4491-ab40-e13a093367a8", "metadata": {}, "outputs": [ { - "ename": "NameError", - "evalue": "name 'xgb' is not defined", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)", - "Input \u001b[0;32mIn [49]\u001b[0m, in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[38;5;66;03m# predict probabilities\u001b[39;00m\n\u001b[0;32m----> 2\u001b[0m proba \u001b[38;5;241m=\u001b[39m \u001b[43mxgb\u001b[49m\u001b[38;5;241m.\u001b[39mpredict_proba(X_test)\n", - "\u001b[0;31mNameError\u001b[0m: name 'xgb' is not defined" - ] + "data": { + "text/html": [ + "\n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Array Chunk
Bytes 16.71 kiB 1.65 kiB
Shape (2139,) (211,)
Count 96 Tasks 12 Chunks
Type int64 numpy.ndarray
\n", + "
\n", + " \n", + "\n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + "\n", + " \n", + " 2139\n", + " 1\n", + "\n", + "
" + ], + "text/plain": [ + "dask.array" + ] + }, + "execution_count": 31, + "metadata": {}, + "output_type": "execute_result" } ], "source": [ - "# predict probabilities\n", - "proba = xgb.predict_proba(X_test)" + "# omit?\n", + "y_train.compute_chunk_sizes()" ] }, { "cell_type": "code", - "execution_count": null, - "id": "a591fcf0-776b-4aaf-b609-60861b8b2b6a", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "17b57714-f31c-4090-8517-16bf84e6a5bf", + "execution_count": 32, + "id": "c4c76ce9-a2c0-435f-9833-08c58d061e5b", "metadata": {}, "outputs": [], - "source": [] - }, - { - "cell_type": "markdown", - "id": "23b900b3-6b48-4fb6-b816-077360aeda8e", - "metadata": { - "tags": [] - }, "source": [ - "## Write Back to MongoDB" + "# omit?\n", + "y_train = y_train.reshape(2139,1)" ] }, { "cell_type": "code", - "execution_count": null, - "id": "f1501521-29d0-40d5-81c0-df38ceaedcb5", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "375da03a-e1c3-4a0b-9922-97b086a2b198", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "markdown", - "id": "94d9e401-cc0c-4e15-8e92-566dfeb2bd66", - "metadata": { - "tags": [] - }, - "source": [ - "## How Does it Work\n", - "The dask-mongo connector connects to the pymongo driver and..." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c5e3db2d-1408-4f0d-99c4-17837e252dbb", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "markdown", - "id": "dc10efe7-b72f-4a6a-b824-f6f2e69c0728", - "metadata": { - "tags": [] - }, - "source": [ - "## Conclusion" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "789a5231-20c7-4527-9cad-9339c5f4c285", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "345ed7dc-fd60-4a5a-8867-65e7dddb377e", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "aac33f8f-f709-49cf-bc98-7e4a0d3c7dbd", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "markdown", - "id": "eed82700-2a37-4cd9-8cff-14a6f83372be", - "metadata": { - "tags": [] - }, - "source": [ - "# OLD STUFF" - ] - }, - { - "cell_type": "markdown", - "id": "62970030-962c-4a95-9898-85801dd3d78a", - "metadata": { - "jp-MarkdownHeadingCollapsed": true, - "tags": [] - }, - "source": [ - "### Create train/test split" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "803c7850-964c-4406-80e4-e62e8524ebbd", - "metadata": {}, - "outputs": [], - "source": [ - "from dask_ml.model_selection import train_test_split" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ebd3927f-045c-4085-b0fa-4489c547ac4e", - "metadata": {}, - "outputs": [], - "source": [ - "X = ddf['description'].to_dask_array(lengths=True)\n", - "y = ddf['review_rating'].to_dask_array(lengths=True)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0ea3b799-f210-40d8-9092-893f3701f420", - "metadata": {}, - "outputs": [], - "source": [ - "X_train, X_test, y_train, y_test = train_test_split(\n", - " X, \n", - " y, \n", - " test_size=0.20, \n", - " random_state=40\n", - ")" - ] - }, - { - "cell_type": "markdown", - "id": 
"1398654f-7fa8-4e76-a1ae-1fb1ef1cb4de", - "metadata": { - "jp-MarkdownHeadingCollapsed": true, - "tags": [] - }, - "source": [ - "### Vectorize" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "dae1e939-af01-453b-b329-874b23ebe18b", - "metadata": {}, - "outputs": [], - "source": [ - "from dask_ml.feature_extraction.text import HashingVectorizer" - ] - }, - { - "cell_type": "markdown", - "id": "1cbb872e-7b3b-46de-a588-dd054b56c21d", - "metadata": {}, - "source": [ - "HashingVectorizer has some built-in tokenization and preprocessing capabilities we could explore.\n", - "\n", - "We'll just use it out-of-the-box for now." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "81fa8ee5-8a8e-439a-b3b0-8f75c2353f82", - "metadata": {}, - "outputs": [], - "source": [ - "vect = HashingVectorizer()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "29d35d96-9d6c-4031-a264-dc34596ce37d", - "metadata": {}, - "outputs": [], - "source": [ - "X_train_vect = vect.fit_transform(X_train)" - ] - }, - { - "cell_type": "code", - "execution_count": 35, - "id": "bb19e54e-14f2-4c54-ac40-8b6242a7d1be", + "execution_count": 33, + "id": "2e022881-65bd-4936-868e-03e4be6d8d6f", "metadata": {}, "outputs": [ { @@ -1726,9 +1633,15 @@ " \n", " \n", " \n", + " Bytes \n", + " 16.71 kiB \n", + " 1.65 kiB \n", + " \n", + " \n", + " \n", " Shape \n", - " (nan, 1048576) \n", - " (nan, 1048576) \n", + " (2139, 1) \n", + " (211, 1) \n", " \n", " \n", " Count \n", @@ -1737,31 +1650,102 @@ " \n", " \n", " Type \n", - " float64 \n", - " scipy.sparse._csr.csr_matrix \n", + " int64 \n", + " numpy.ndarray \n", " \n", " \n", " \n", " \n", " \n", - " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + "\n", + " \n", + " 1\n", + " 2139\n", + "\n", " \n", " \n", "" ], "text/plain": [ - 
"dask.array<_transformer, shape=(nan, 1048576), dtype=float64, chunksize=(nan, 1048576), chunktype=scipy.csr_matrix>" + "dask.array" ] }, - "execution_count": 35, + "execution_count": 33, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "X_train_vect" + "# omit?\n", + "y_train" ] }, + { + "cell_type": "code", + "execution_count": 34, + "id": "d559a8b2-4888-45db-a33e-235091ea8699", + "metadata": {}, + "outputs": [], + "source": [ + "X_train_vect = X_train_vect.persist()\n", + "y_train = y_train.persist()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "97e08030-9d46-439c-830c-8aa34f43d6e1", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2f5aab69-4788-41a8-b588-5c288b3dc757", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5fadeb7c-efca-4978-ac13-6606049e90b5", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fdce62a6-4bd7-4719-aa4d-2debc1adcde3", + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "markdown", "id": "5d2f8cd8-2209-4561-a69e-917e56067279", @@ -1909,16 +1893,6 @@ "Now use scipy.sparse matrix as input for distributed XGBoostClassifier." ] }, - { - "cell_type": "markdown", - "id": "829cea36-2e64-49e8-8b6d-428786c3cc00", - "metadata": { - "tags": [] - }, - "source": [ - "## 5. 
Train XGBoost Model" - ] - }, { "cell_type": "code", "execution_count": 39, @@ -2221,17 +2195,6 @@ "outputs": [], "source": [] }, - { - "cell_type": "markdown", - "id": "cc4b0fac-4dcf-4a94-89b1-01b37b6fd339", - "metadata": { - "tags": [] - }, - "source": [ - "# TRYING OUT\n", - "Let's try to map the preprocessing functions over `b_flattened` without splitting the `description` from the `ratings`:" - ] - }, { "cell_type": "code", "execution_count": 84, diff --git a/mongodb-with-coiled/environment.yml b/mongodb-with-coiled/environment.yml index 0b7d098..d156717 100644 --- a/mongodb-with-coiled/environment.yml +++ b/mongodb-with-coiled/environment.yml @@ -6,7 +6,7 @@ dependencies: - python==3.9.10 - coiled==0.2.6 - pandas==1.4.1 - - dask[complete]==2022.02.0 + - dask[complete]==2022.05.0 - nltk==3.6.5 - spacy==3.2.0 - pyarrow @@ -23,6 +23,7 @@ dependencies: - gensim[distributed] - dask-ml - xgboost==1.5.1 + - cloudpickle==2.1.0 - pip - pip: - pymongo[srv]