import numpy as np
import pandas as pd
from scipy import sparse
from time import time
from numpy import matrix
from numpy.random import rand
import random


class MatrixFactorizationRec_v1(object):
    """Matrix-factorization recommender trained by stochastic gradient descent.

    The ratings matrix (users x items) is approximated by the product
    ``user_mat . movie_mat`` where ``user_mat`` is (n_users x n_features) and
    ``movie_mat`` is (n_features x n_items).

    Notes carried over from the original author (translated from French):
      * the training samples presented to the optimizer are drawn at random;
      * the inverse of the Hessian is introduced;
      * the learning rate is decayed.
    NOTE(review): no Hessian-based scaling is actually applied by ``fit``;
    only random sampling and learning-rate decay are implemented.
    """

    # Number of users sampled (without replacement) per optimizer iteration.
    _USERS_PER_ITERATION = 10
    # Number of ratings sampled (with replacement) per sampled user.
    _RATINGS_PER_USER = 10
    # Train/validation MSE is recomputed and printed every this many iterations.
    _REPORT_EVERY = 200
    # Coefficient of the learning-rate decay schedule used in ``fit``.
    _LEARN_RATE_DECAY = 0.7

    def __init__(self, n_features=8, learn_rate=0.005,
                 regularization_param=0.02,
                 optimizer_pct_improvement_criterion=2, max_iter=3000):
        """Store the hyper-parameters; no computation happens here.

        Args:
            n_features: number of latent features per user / item.
            learn_rate: initial SGD step size (decayed during ``fit``).
            regularization_param: weight of the shrinkage term in the update.
            optimizer_pct_improvement_criterion: threshold (in percent) on the
                validation-MSE improvement used by the stopping test.
            max_iter: iteration count used by the stopping test (see ``fit``).
        """
        self.n_features = n_features
        self.learn_rate = learn_rate
        self.regularization_param = regularization_param
        self.optimizer_pct_improvement_criterion = optimizer_pct_improvement_criterion
        self.max_iter = max_iter

    @staticmethod
    def _needs_random_init(init):
        """Return True when *init* is the "not provided" sentinel (None or all zeros)."""
        return init is None or np.max(np.abs(init)) == 0

    def _sgd_update(self, i, j, gamma):
        """Apply one SGD step for the rating of user *i* on item *j*.

        The user factors are updated first and the item factors are then
        updated using the *new* user factors, exactly as a per-feature
        sequential update would do.
        """
        user_vec = np.asarray(self.user_mat[i, :]).ravel()
        item_vec = np.asarray(self.movie_mat[:, j]).ravel()
        # Scalar prediction error (kept as a scalar rather than a 1x1 matrix).
        error = self.ratings_mat[i, j] - np.dot(user_vec, item_vec)
        reg = self.regularization_param
        new_user_vec = user_vec + gamma * (2 * error * item_vec - reg * user_vec)
        new_item_vec = item_vec + gamma * (2 * error * new_user_vec - reg * item_vec)
        self.user_mat[i, :] = new_user_vec
        self.movie_mat[:, j] = new_item_vec

    def fit(self, ratings_mat, ratings_validation, P_init=0, Q_init=0):
        """Fit the latent factor matrices by stochastic gradient descent.

        Args:
            ratings_mat: training ratings, indexable as ``[user, item]``; zero
                entries are treated as "not rated" when sampling.
            ratings_validation: validation ratings, passed to
                ``mse_sparse_with_dense`` together with the dense prediction.
            P_init: optional initial user factors (n_users x n_features).
                ``0`` (the default), ``None`` or an all-zero array means
                "initialise randomly".
            Q_init: optional initial item factors (n_features x n_items), with
                the same "not provided" convention as ``P_init``.

        Returns:
            The last training MSE computed (refreshed every 200 iterations).

        Side effects:
            Sets ``user_mat``, ``movie_mat``, ``rmse``, ``rmse_val`` and a few
            bookkeeping attributes on the instance, and prints progress.

        The loop keeps running while ``iteration < max_iter`` OR the last
        measured validation improvement exceeds
        ``optimizer_pct_improvement_criterion``; ``max_iter`` is therefore a
        lower bound on the number of iterations, not an upper bound.
        NOTE(review): ``mse_sparse_with_dense`` is not defined in this part of
        the file and is expected to be provided elsewhere.
        """
        self.ratings_mat = ratings_mat
        self.average_rating = ratings_mat.mean()
        self.n_users = ratings_mat.shape[0]
        self.n_items = ratings_mat.shape[1]
        self.n_rated = ratings_mat.nonzero()[0].size
        self.ratings_validation = ratings_validation

        # User factors are drawn before item factors so the random stream is
        # consumed in a fixed order.
        if self._needs_random_init(P_init):
            self.user_mat = matrix(rand(self.n_users, self.n_features))
        else:
            self.user_mat = np.asmatrix(P_init)
        # Each initial matrix is tested on its own argument.
        if self._needs_random_init(Q_init):
            self.movie_mat = matrix(rand(self.n_features, self.n_items))
        else:
            self.movie_mat = np.asmatrix(Q_init)

        optimizer_iteration_count = 0
        Rpred = np.dot(self.user_mat, self.movie_mat)
        # Initial errors double as the result when the loop never executes.
        mse = mse_sparse_with_dense(self.ratings_mat, Rpred)
        mse_val = mse_sparse_with_dense(self.ratings_validation, Rpred)
        old_mse_val = mse_val
        pct_improvement = 0.0
        # random.sample raises if asked for more users than exist.
        users_per_iteration = min(self._USERS_PER_ITERATION, self.n_users)

        print("Optimization Statistics")
        print("Iterations | MSE train | MSE validation | Percent Improvement")
        while ((optimizer_iteration_count < self.max_iter) or
               (pct_improvement > self.optimizer_pct_improvement_criterion)):
            # Learning rate decays hyperbolically with the iteration count.
            gamma = self.learn_rate / (
                1 + self._LEARN_RATE_DECAY * self.learn_rate * optimizer_iteration_count)
            # Draw a random mini-batch of users, then random rated items per user.
            for i in random.sample(range(self.n_users), users_per_iteration):
                # The last array returned by nonzero() holds the item (column)
                # indices for both 1-D rows (dense ndarray) and 2-D rows
                # (np.matrix / SciPy sparse).
                rated_items = self.ratings_mat[i, :].nonzero()[-1]
                if len(rated_items) > 0:
                    for j in np.random.choice(rated_items,
                                              size=self._RATINGS_PER_USER,
                                              replace=True):
                        self._sgd_update(i, j, gamma)

            if optimizer_iteration_count % self._REPORT_EVERY == 0:
                Rpred = np.dot(self.user_mat, self.movie_mat)
                mse = mse_sparse_with_dense(self.ratings_mat, Rpred)
                mse_val = mse_sparse_with_dense(self.ratings_validation, Rpred)
                pct_improvement = 100 * (old_mse_val - mse_val) / old_mse_val
                print("%d \t\t %f \t\t %f \t\t %f" % (
                    optimizer_iteration_count, mse, mse_val, pct_improvement))
                old_mse_val = mse_val
            optimizer_iteration_count += 1

        self.rmse = np.sqrt(mse)
        self.rmse_val = np.sqrt(mse_val)
        print("Fitting of latent feature matrices completed")
        return mse

    def pred_one_user(self, user_id, report_run_time=False):
        """Return the predicted ratings of every item for *user_id*.

        The result is the product of the user's factor row with ``movie_mat``;
        when the factors are ``np.matrix`` objects it is a 1 x n_items matrix.
        If *report_run_time* is true the elapsed time is printed.
        """
        start_time = time()
        out = np.dot(self.user_mat[user_id], self.movie_mat)
        if report_run_time:
            print("Execution time: %f seconds" % (time() - start_time))
        return out

    def pred_all_users(self, report_run_time=False):
        """Return the full predicted ratings matrix ``user_mat . movie_mat``.

        If *report_run_time* is true the elapsed time is printed.
        """
        start_time = time()
        out = np.dot(self.user_mat, self.movie_mat)
        if report_run_time:
            print("Execution time: %f seconds" % (time() - start_time))
        return out

    def top_n_recs(self, user_id, n):
        """Return the indices of the *n* best-predicted items not yet rated.

        Items with a non-zero entry in ``ratings_mat`` for *user_id* are
        excluded. The returned list is ordered by increasing predicted rating,
        so the best recommendation is the last element. Fewer than *n* items
        are returned when the user has fewer unrated items; ``n <= 0`` yields
        an empty list.
        """
        if n <= 0:
            # A slice of [-0:] would return every item instead of none.
            return []
        # Flatten so a 1 x n_items matrix sorts as a plain vector of items.
        pred_ratings = np.asarray(self.pred_one_user(user_id)).ravel()
        items_by_pred_rating = np.argsort(pred_ratings).tolist()
        # Last array of nonzero() = item indices for 1-D and 2-D rows alike.
        rated_items = set(self.ratings_mat[user_id].nonzero()[-1].tolist())
        unrated_items_by_pred_rating = [
            item for item in items_by_pred_rating if item not in rated_items]
        return unrated_items_by_pred_rating[-n:]


def _as_dense_array(values):
    """Return *values* as a plain ndarray, densifying SciPy sparse input."""
    if sparse.issparse(values):
        return values.toarray()
    return np.asarray(values)


def rmse(I, R, Q, P):
    """Return the root-mean-square error of ``P . Q`` over the rated entries.

    Args:
        I: indicator mask with the same shape as *R* (1 where a rating is
            taken into account, 0 elsewhere).
        R: ratings matrix (users x items); entries > 0 count as rated.
        Q: item factors (n_features x n_items).
        P: user factors (n_users x n_features).

    The squared masked errors are summed and divided by the number of
    strictly positive entries of *R*.
    """
    ratings = _as_dense_array(R)
    mask = _as_dense_array(I)
    prediction = np.asarray(np.dot(P, Q))
    squared_errors = np.power(np.multiply(mask, ratings - prediction), 2)
    return np.sqrt(np.sum(squared_errors) / np.count_nonzero(ratings > 0))