From 6650f7f8953bad0fa7f0b9d88af37e1c1fa033f0 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Sun, 26 Jun 2022 16:14:16 -0700 Subject: [PATCH] add back als nb --- nb/internet_als.ipynb | 479 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 479 insertions(+) create mode 100644 nb/internet_als.ipynb diff --git a/nb/internet_als.ipynb b/nb/internet_als.ipynb new file mode 100644 index 0000000..c2346f6 --- /dev/null +++ b/nb/internet_als.ipynb @@ -0,0 +1,479 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "54b5b878", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import numpy as np\n", + "import scipy\n", + "import matplotlib.pyplot as plt\n", + "from numpy.linalg import inv\n", + "from sklearn.model_selection import KFold\n", + "from sklearn.linear_model import Ridge\n", + "from pyspark.ml.recommendation import ALS\n", + "from pyspark.sql import SparkSession\n", + "\n", + "from sklearn.pipeline import make_pipeline\n", + "from sklearn.preprocessing import StandardScaler\n", + "\n", + "import sys \n", + "sys.path.insert(1, \"../\")\n", + "from workloads.util import use_results, use_dataset, read_config, log_dataset\n", + "from tqdm import tqdm \n", + "\n", + "%load_ext autoreload\n", + "%autoreload 2\n", + "\n", + "%matplotlib inline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7b62534e", + "metadata": {}, + "outputs": [], + "source": [ + "dataset_dir = use_dataset(\"ml-latest-small\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a8fc5a3b", + "metadata": {}, + "outputs": [], + "source": [ + "ratings_path = f\"{dataset_dir}/ratings.csv\"\n", + "ratings_df = pd.read_csv(ratings_path)\n", + "ratings_df.columns = ['user_id', 'movie_id', 'rating', 'timestamp']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c4677c22", + "metadata": {}, + "outputs": [], + "source": [ + "def split_data(df): \n", + " start_ts = df['timestamp'].min()\n", + " med_ts = df['timestamp'].quantile(.25)\n", + " end_ts = df['timestamp'].max()\n", + " train_df = df[df['timestamp'] <= med_ts]\n", + " stream_df = df[df['timestamp'] > med_ts]\n", + " seen_movies = set(train_df['movie_id'])\n", + " print(len(seen_movies), len(set(stream_df['movie_id'])), len(stream_df))\n", + " stream_df = stream_df.drop(stream_df[stream_df['movie_id'].map(lambda x: x not in seen_movies)].index)\n", + " train_df.to_csv(f'{dataset_dir}/train.csv', header=True, index = False)\n", + " stream_df.to_csv(f'{dataset_dir}/stream.csv', header=True, index = False)\n", + " return start_ts, med_ts, end_ts" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "15892f3a", + "metadata": {}, + "outputs": [], + "source": [ + "start_ts, med_ts, end_ts = split_data(ratings_df)\n", + "train_df = pd.read_csv(f'{dataset_dir}/train.csv')\n", + "test_df = pd.read_csv(f'{dataset_dir}/stream.csv')\n", + "test_df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f4fb1a1d", + "metadata": {}, + "outputs": [], + "source": [ + "class CustomALS(object):\n", + " \"\"\"Predicts using ALS\"\"\"\n", + " \n", + " def __init__(self, k=20, n_iter=20, lambda_u=0.001,\n", + " lambda_v=0.001):\n", + " \n", + " self.k = k\n", + " self.n_iter = n_iter\n", + " self.lambda_u = lambda_u\n", + " self.lambda_v = lambda_v\n", + " \n", + " def predict(uid, mid): \n", + " return np.dot(self.U[uid,:], self.V[mid:,])\n", + " \n", + " def fit(self, R):\n", + " self.R = R.copy()\n", + " \n", + " # Convert missing entries to 0\n", + " self.R = np.nan_to_num(self.R)\n", + " \n", + " m, n = R.shape\n", + " \n", + " # Initialize\n", + " self.U = np.random.normal(loc=0., scale=0.01, size=(m, self.k))\n", + " self.V = np.random.normal(loc=0., scale=0.01, size=(n, self.k))\n", + "\n", + " I = np.eye(self.k)\n", + " Iu = self.lambda_u * I\n", + " Iv = self.lambda_v * I\n", + " \n", + " R_T = self.R.T\n", + " \n", + " #model_u = make_pipeline(StandardScaler(with_mean=False), Ridge(alpha=self.lambda_u, fit_intercept=True))\n", + " #model_v = make_pipeline(StandardScaler(with_mean=False), Ridge(alpha=self.lambda_v, fit_intercept=True))\n", + " model_u = Ridge(alpha=self.lambda_u, fit_intercept=True)\n", + " model_v = Ridge(alpha=self.lambda_v, fit_intercept=True)\n", + " \n", + "\n", + " for _ in tqdm(range(self.n_iter)):\n", + " # NOTE: This can be parallelized\n", + " for i in range(m):\n", + " model_u.fit(X=self.V,\n", + " y=R_T[:,i]) \n", + " self.U[i,:] = model_u.coef_ #model_u.named_steps['ridge'].coef_\n", + " \n", + " # NOTE: This can be parallelized\n", + " for j in range(n):\n", + " model_v.fit(X=self.U,\n", + " y=R_T[j,:]) \n", + " self.V[j,:] = model_v.coef_ #model_v.named_steps['ridge'].coef_\n", + " \n", + " self.R_hat = self.U.dot(self.V.T)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dcf34b9d", + "metadata": {}, + "outputs": [], + "source": [ + "model = CustomALS(n_iter=5)\n", + "model.fit(R_df.values)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9ff35631", + "metadata": {}, + "outputs": [], + "source": [ + "test_df = pd.read_csv(f'{dataset_dir}/stream.csv')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "45661c4b", + "metadata": {}, + "outputs": [], + "source": [ + "def predict(uid, mid): \n", + " print(model.U[uid,:])\n", + " print(model.V[mid,:].T)\n", + " return np.dot(model.U[uid,:], model.V[mid,:].T)\n", + "\n", + "model.R_hat" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b069741a", + "metadata": {}, + "outputs": [], + "source": [ + "from sklearn.metrics import mean_squared_error\n", + "\n", + "mean_squared_error(model.R, model.R_hat)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "716f30df", + "metadata": {}, + "outputs": [], + "source": [ + "spark = SparkSession.builder \\\n", + " .master(\"local\") \\\n", + " .appName(\"test\") \\\n", + " .getOrCreate()\n", + " \n", + "class SparkALS(object):\n", + " def __init__(self, k=20, n_iter=20, lambda_=0.001): \n", + " self.als = ALS(rank=k, maxIter=n_iter, regParam=lambda_)\n", + " \n", + " def fit(self, R):\n", + " R = np.nan_to_num(R)\n", + " ratings = []\n", + " for i in range(R.shape[0]):\n", + " for j in range(R.shape[1]):\n", + " ratings.append((i, j, float(R[i,j])))\n", + "\n", + " df = spark.createDataFrame(ratings,\n", + " [\"user\", \"item\", \"rating\"])\n", + " \n", + " model = self.als.fit(df)\n", + " \n", + " user_factors = model.userFactors.orderBy(\"id\").collect()\n", + " item_factors = model.itemFactors.orderBy(\"id\").collect()\n", + " \n", + " self.U = np.array([f.features for f in user_factors])\n", + " self.V = np.array([f.features for f in item_factors])\n", + " self.R_hat = self.U.dot(self.V.T)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0d9dc724", + "metadata": {}, + "outputs": [], + "source": [ + "R_df = train_df.pivot(index='user_id',\n", + " columns='movie_id',\n", + " values='rating'\n", + " ).fillna(np.nan)\n", + "R_df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "367b431a", + "metadata": {}, + "outputs": [], + "source": [ + "train_df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bdb2b59c", + "metadata": {}, + "outputs": [], + "source": [ + "def cross_val_ndcg(model, X, n_splits=2):\n", + " m, n = X.shape\n", + " \n", + " rows = list(range(m))\n", + " \n", + " # Row index\n", + " I = np.array(range(n))\n", + " \n", + " # Now, split into K folds (by users)\n", + " kf = KFold(n_splits=n_splits, shuffle=True)\n", + " scores = []\n", + " for train, test in kf.split(rows):\n", + " \n", + " # Assign test entries as undefined\n", + " X_train = X.copy()\n", + " X_test = X[test,:]\n", + " \n", + " user_indx = dict()\n", + " \n", + " # Prepare training set\n", + " for i in test: \n", + " # Indices with non-nan\n", + " pos_indx = I[~np.isnan(X[i,:])]\n", + " neg_indx = I[np.isnan(X[i,:])]\n", + " \n", + " # Shuffle indices\n", + " np.random.shuffle(pos_indx)\n", + " np.random.shuffle(neg_indx)\n", + " \n", + " pos_test, _ = np.array_split(pos_indx, 2)\n", + " neg_test, _ = np.array_split(neg_indx, 2)\n", + " \n", + " test_indx = np.append(pos_test, neg_test)\n", + " \n", + " # \"Hide\" entries for this person\n", + " X_train[i, pos_test] = np.nan\n", + " \n", + " # Remember what indices to\n", + " # use during testing\n", + " user_indx[i] = test_indx\n", + " \n", + " # Train\n", + " model.fit(X_train)\n", + " \n", + " R_hat = model.R_hat\n", + " \n", + " for i in test:\n", + " test_indx = user_indx[i]\n", + " \n", + " # Need to rank these\n", + " # according to our algorithm\n", + " values = X[i, test_indx]\n", + " \n", + " # Replace missing entries with 0\n", + " I_ = np.array(range(len(values)))\n", + " neg_indx = I_[np.isnan(values)]\n", + " values[neg_indx] = 0.\n", + " \n", + " # These are the predicted values\n", + " pred = R_hat[i, test_indx]\n", + " \n", + " # Get sorted index position\n", + " sort_indx = np.argsort(pred)[::-1]\n", + " \n", + " values = values[sort_indx]\n", + " \n", + " # Now, order pred by holdout values\n", + " ndcg = ndcg_at_k(values, k=20)\n", + " \n", + " scores.append(ndcg)\n", + " \n", + " return scores" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "375cf9cb", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "31ede398", + "metadata": {}, + "outputs": [], + "source": [ + "def ndcg_at_k(x, k):\n", + " \n", + " if k == 0:\n", + " return .0\n", + " elif k < 0:\n", + " raise ValueError('k cannot be negative')\n", + " \n", + " # 1, 2, ..., k\n", + " i = np.arange(1, k + 1)\n", + " \n", + " # Discount factor\n", + " d = 1. / np.log2(i + 1)\n", + " \n", + " # Sorted for best possible scores\n", + " x_best = np.sort(x)[::-1]\n", + " \n", + " # Compute normalization constant\n", + " N = np.sum(d * x_best[:k])\n", + " n = np.sum(d * x[:k])\n", + " \n", + " return n / N" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f001a374", + "metadata": {}, + "outputs": [], + "source": [ + "als_scores = cross_val_ndcg(CustomALS(), R_df.values, n_splits=2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3a6c0954", + "metadata": {}, + "outputs": [], + "source": [ + "als_scores" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "79929922", + "metadata": {}, + "outputs": [], + "source": [ + "spark_als_scores = cross_val_ndcg(SparkALS(), R_df.values, n_splits=2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "03ea9a20", + "metadata": {}, + "outputs": [], + "source": [ + "for v in scores:\n", + " mu = round(np.mean(v['scores']),2)\n", + " std = round(np.std(v['scores']),2)\n", + " \n", + " plt.hist(v['scores'], color=v['color'],\n", + " bins=50, alpha=0.4,\n", + " label='{}, $\\mu$={}, $\\sigma$={}'.format(v['name'], mu, std))\n", + "\n", + "plt.ylabel('Count')\n", + "plt.xlabel('NDCG@20')\n", + "plt.title('2 Fold Cross Validation')\n", + "plt.legend(loc='upper right')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3755df7", + "metadata": {}, + "outputs": [], + "source": [ + "model = CustomALS()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "04a0f660", + "metadata": {}, + "outputs": [], + "source": [ + "model.fit(R_df.values)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d2652684", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}