Original article; when reposting please credit the source: 慢慢的回味
I. Theoretical Analysis
Large-scale Parallel Collaborative Filtering for the Netflix Prize
Let U (users × features) and M (movies × features) denote the user and movie feature matrices, with u_i the feature vector of user i and m_j that of movie j. Define the loss function

\[ f(U, M) = \sum_{(i,j) \in I} \big( r_{ij} - \langle u_i, m_j \rangle \big)^2 \]

where r_{ij} is the observed rating, \langle u_i, m_j \rangle is the value predicted from the user and movie matrices we are solving for, and I is the set of observed (user, movie) pairs. Clearly, the U and M that minimize this loss are the matrices we want.

However, the number of observed ratings is far smaller than the number of parameters in U and M, so the problem is underdetermined. ALS-WR therefore adds Tikhonov regularization, weighted by how many ratings each user and movie has:

\[ f(U, M) = \sum_{(i,j) \in I} \big( r_{ij} - u_i^{T} m_j \big)^2 + \lambda \Big( \sum_i n_{u_i} \lVert u_i \rVert^2 + \sum_j n_{m_j} \lVert m_j \rVert^2 \Big) \]

where n_{u_i} is the number of movies rated by user i and n_{m_j} is the number of users who rated movie j.

Differentiating f with respect to each component u_{ki} and setting the result to zero yields, for each user i,

\[ \big( M_{I_i} M_{I_i}^{T} + \lambda\, n_{u_i} E \big)\, u_i = M_{I_i} R^{T}(i, I_i), \qquad \text{i.e.} \qquad u_i = A_i^{-1} V_i \]

where A_i = M_{I_i} M_{I_i}^T + \lambda n_{u_i} E, V_i = M_{I_i} R^T(i, I_i), I_i is the set of movies rated by user i, M_{I_i} is the sub-matrix of M restricted to those movies, R(i, I_i) is the row of ratings user i gave them, and E is the identity matrix.

By symmetry, for each movie j,

\[ \big( U_{I_j} U_{I_j}^{T} + \lambda\, n_{m_j} E \big)\, m_j = U_{I_j} R(I_j, j), \qquad \text{i.e.} \qquad m_j = A_j^{-1} V_j. \]

Alternating between these two updates (fix M and solve every u_i, then fix U and solve every m_j) and iterating drives U and M toward matrices whose product approximates R.
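One u_i update can be made concrete with a tiny self-contained sketch (plain Java, not Mahout code; the feature vectors and ratings are made-up numbers) for a user who rated two items, with two features. It builds A_i and V_i exactly as in the update equation and solves the 2×2 system directly; the real implementation uses a QR decomposition instead.

```java
public class AlsUpdateSketch {
    public static void main(String[] args) {
        double lambda = 0.065;
        // M_Ii: feature vectors of the two items the user rated (row k = feature k)
        double[][] M = { {1.0, 0.5},
                         {0.2, 1.0} };
        double[] r = {4.0, 5.0};  // the user's ratings of those two items
        int nui = r.length;       // n_ui: number of items this user rated

        // A_i = M_Ii * M_Ii^T + lambda * n_ui * E   (2x2)
        double[][] A = new double[2][2];
        for (int i = 0; i < 2; i++) {
            for (int j = 0; j < 2; j++) {
                for (int k = 0; k < nui; k++) {
                    A[i][j] += M[i][k] * M[j][k];
                }
                if (i == j) {
                    A[i][j] += lambda * nui;
                }
            }
        }

        // V_i = M_Ii * R^T(i, I_i)   (2x1)
        double[] V = new double[2];
        for (int i = 0; i < 2; i++) {
            for (int k = 0; k < nui; k++) {
                V[i] += M[i][k] * r[k];
            }
        }

        // Solve A_i * u_i = V_i by Cramer's rule (fine for 2x2; Mahout uses QR)
        double det = A[0][0] * A[1][1] - A[0][1] * A[1][0];
        double[] u = { (V[0] * A[1][1] - A[0][1] * V[1]) / det,
                       (A[0][0] * V[1] - V[0] * A[1][0]) / det };
        System.out.printf("u_i = [%.4f, %.4f]%n", u[0], u[1]);  // u_i = [3.1522, 3.0713]
    }
}
```

Fixing M makes each user's subproblem an independent regularized least-squares solve, which is exactly why the M and U recomputations parallelize so well.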
II. Code Analysis
1 Split ratings.csv: 90% becomes the training set, the rest the probe (test) set
$MAHOUT splitDataset --input ${WORK_DIR}/movielens/ratings.csv --output ${WORK_DIR}/dataset \
    --trainingPercentage 0.9 --probePercentage 0.1 --tempDir ${WORK_DIR}/dataset/tmp
2 Run the distributed ALS-WR algorithm on the training set to obtain the user feature matrix and the item feature matrix
$MAHOUT parallelALS --input ${WORK_DIR}/dataset/trainingSet/ --output ${WORK_DIR}/als/out \
    --tempDir ${WORK_DIR}/als/tmp --numFeatures 20 --numIterations 10 --lambda 0.065 --numThreadsPerSolver 1
We start from the factorization A = U M', where A (users × items) is the rating matrix built from the training set, U (users × features) is the user feature matrix we need to solve for, and M (items × features) is the item feature matrix we need to solve for. Each row of U and M is recovered by solving a small linear system via QR decomposition, as shown below.
Job itemRatings = prepareJob(getInputPath(), pathToItemRatings(),
    TextInputFormat.class, ItemRatingVectorsMapper.class, IntWritable.class, VectorWritable.class,
    VectorSumReducer.class, IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);
itemRatings.setCombinerClass(VectorSumCombiner.class);
itemRatings.getConfiguration().set(USES_LONG_IDS, String.valueOf(usesLongIDs));
The Job above produces the matrix A' (items × users); the output looks like {item1, {user1:rating1, …}, …}.
Job averageItemRatings = prepareJob(pathToItemRatings(), getTempPath("averageRatings"),
    AverageRatingMapper.class, IntWritable.class, VectorWritable.class,
    MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
averageItemRatings.setCombinerClass(MergeVectorsCombiner.class);
This computes each item's average rating; the output looks like {0, [item1:avg1, …]}.
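What this job computes can be sketched in memory (the real computation is a Hadoop map-reduce over the A' rows; the class and method names here are illustrative, not Mahout's):

```java
import java.util.HashMap;
import java.util.Map;

public class AverageRatings {
    // Per-item average over rows of the form {item -> {user: rating}},
    // mirroring what AverageRatingMapper + MergeVectorsReducer compute.
    static Map<Integer, Double> averages(Map<Integer, Map<Integer, Double>> itemRatings) {
        Map<Integer, Double> avg = new HashMap<>();
        for (Map.Entry<Integer, Map<Integer, Double>> e : itemRatings.entrySet()) {
            double sum = 0;
            for (double r : e.getValue().values()) {
                sum += r;
            }
            avg.put(e.getKey(), sum / e.getValue().size());
        }
        return avg;
    }

    public static void main(String[] args) {
        Map<Integer, Map<Integer, Double>> a = new HashMap<>();
        a.put(1, Map.of(10, 5.0, 11, 3.0));  // item 1 rated by users 10 and 11
        a.put(2, Map.of(10, 4.0));           // item 2 rated by user 10 only
        System.out.println(averages(a));     // {1=4.0, 2=4.0}
    }
}
```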
initializeM(averageRatings);
This yields the initial matrix M_-1 (written to the M--1 directory): its first row holds each item's average rating, and the remaining rows hold small random values.
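A minimal sketch of that initialization, with M laid out as items × features as in the text (the 0.1 scale of the random values is an assumption for illustration, not necessarily Mahout's exact scale):

```java
import java.util.Random;

public class InitM {
    // Feature 0 of each item = its average rating; the rest = small random values.
    static double[][] initializeM(double[] averageRatings, int numFeatures, long seed) {
        Random random = new Random(seed);
        double[][] m = new double[averageRatings.length][numFeatures];
        for (int item = 0; item < averageRatings.length; item++) {
            m[item][0] = averageRatings[item];
            for (int f = 1; f < numFeatures; f++) {
                m[item][f] = random.nextDouble() * 0.1;  // assumed scale
            }
        }
        return m;
    }

    public static void main(String[] args) {
        double[][] m = initializeM(new double[] {3.5, 4.2}, 4, 42L);
        System.out.println(m[0][0] + " " + m[1][0]);
    }
}
```

Seeding M with the item averages gives the very first U solve something meaningful to fit against, rather than pure noise.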
for (int currentIteration = 0; currentIteration < numIterations; currentIteration++) {
  /* broadcast M, read A row-wise, recompute U row-wise */
  log.info("Recomputing U (iteration {}/{})", currentIteration, numIterations);
  /* compute U_0 from M_-1; the output looks like
   * [user1, {feature0:0.8817690514198447, feature1:-0.21707282987696083, ..., feature19:0.23423786158394766}, ...] */
  runSolver(pathToUserRatings(), pathToU(currentIteration), pathToM(currentIteration - 1),
      currentIteration, "U", numItems);
  /* broadcast U, read A' row-wise, recompute M row-wise */
  log.info("Recomputing M (iteration {}/{})", currentIteration, numIterations);
  /* compute M_0 from U_0 */
  runSolver(pathToItemRatings(), pathToM(currentIteration), pathToU(currentIteration),
      currentIteration, "M", numUsers);
}

/* Example: computing U_0 from M_-1. The analysis below follows this case; the others are analogous. */
private void runSolver(Path ratings, Path output, Path pathToUorM, int currentIteration,
    String matrixName, int numEntities) throws ClassNotFoundException, IOException, InterruptedException {
  // necessary for local execution in the same JVM only
  SharingMapper.reset();

  int iterationNumber = currentIteration + 1;
  Class<? extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable>> solverMapperClassInternal;
  String name;

  /* implicitFeedback = false here */
  if (implicitFeedback) {
    solverMapperClassInternal = SolveImplicitFeedbackMapper.class;
    name = "Recompute " + matrixName + ", iteration (" + (iterationNumber + 1) + '/' + numIterations + "), "
        + '(' + numThreadsPerSolver + " threads, " + numFeatures + " features, implicit feedback)";
  } else {
    solverMapperClassInternal = SolveExplicitFeedbackMapper.class;
    name = "Recompute " + matrixName + ", iteration (" + (iterationNumber + 1) + '/' + numIterations + "), "
        + '(' + numThreadsPerSolver + " threads, " + numFeatures + " features, explicit feedback)";
  }

  /* MultithreadedSharingMapper is a Job that runs the mapper class with multiple threads */
  Job solverForUorI = prepareJob(ratings, output, SequenceFileInputFormat.class,
      MultithreadedSharingMapper.class, IntWritable.class, VectorWritable.class,
      SequenceFileOutputFormat.class, name);
  Configuration solverConf = solverForUorI.getConfiguration();
  solverConf.set(LAMBDA, String.valueOf(lambda));
  solverConf.set(ALPHA, String.valueOf(alpha));
  solverConf.setInt(NUM_FEATURES, numFeatures);
  solverConf.set(NUM_ENTITIES, String.valueOf(numEntities));

  FileSystem fs = FileSystem.get(pathToUorM.toUri(), solverConf);
  FileStatus[] parts = fs.listStatus(pathToUorM, PathFilters.partFilter());
  for (FileStatus part : parts) {
    if (log.isDebugEnabled()) {
      log.debug("Adding {} to distributed cache", part.getPath().toString());
    }
    /* the cache file here is mahout-workdir-factorize-movielens/als/tmp/M--1/part-m-00000 */
    DistributedCache.addCacheFile(part.getPath().toUri(), solverConf);
  }

  MultithreadedMapper.setMapperClass(solverForUorI, solverMapperClassInternal);
  MultithreadedMapper.setNumberOfThreads(solverForUorI, numThreadsPerSolver);

  boolean succeeded = solverForUorI.waitForCompletion(true);
  if (!succeeded) {
    throw new IllegalStateException("Job failed!");
  }
}

/* The map method of the SolveExplicitFeedbackMapper used above.
 * userOrItemID is the user id here.
 * ratingsWritable is {item1:5.0, item48:5.0, item150:5.0, item260:4.0, item527:5.0, ...} */
@Override
protected void map(IntWritable userOrItemID, VectorWritable ratingsWritable, Context ctx)
    throws IOException, InterruptedException {
  /* loaded from the cache file above; uOrM is a hash map holding M:
   * [item1 -> {feature0:4.158833063209069, feature1:0.8746388048206997, ..., feature19:0.8728730662850301},
   *  item2 -> {feature0:3.2028985507246315, feature1:0.289635256964619, ..., feature19:0.6068967014493216},
   *  ...,
   *  item3695 -> {...}] */
  OpenIntObjectHashMap<Vector> uOrM = getSharedInstance();
  /* compute the feature vector u_i of the current user (row i of U) */
  uiOrmj.set(ALS.solveExplicit(ratingsWritable, uOrM, lambda, numFeatures));
  ctx.write(userOrItemID, uiOrmj);
}

public static Vector solveExplicit(VectorWritable ratingsWritable, OpenIntObjectHashMap<Vector> uOrM,
    double lambda, int numFeatures) {
  Vector ratings = ratingsWritable.get();
  List<Vector> featureVectors = Lists.newArrayListWithCapacity(ratings.getNumNondefaultElements());
  for (Vector.Element e : ratings.nonZeroes()) {
    int index = e.index();
    featureVectors.add(uOrM.get(index));
  }
  /* user i's ratings are {item1:5.0, item48:5.0, ..., item3408:4.0};
   * the matching item featureVectors are
   * [item1   -> {0:4.158833063209069, 1:0.8746388048206997, ..., 19:0.8728730662850301},
   *  item48  -> {0:3.0174418604651128, 1:0.05897693253591574, ..., 19:0.9637219102684911},
   *  ...,
   *  item3408 -> {0:3.8755221386800365, 1:0.9981447681344258, ..., 19:0.11514498636620973}];
   * lambda is 0.065 and numFeatures is 20 */
  return AlternatingLeastSquaresSolver.solve(featureVectors, ratings, lambda, numFeatures);
}

/* The solve method of AlternatingLeastSquaresSolver */
public static Vector solve(Iterable<Vector> featureVectors, Vector ratingVector, double lambda, int numFeatures) {
  Preconditions.checkNotNull(featureVectors, "Feature vectors cannot be null");
  Preconditions.checkArgument(!Iterables.isEmpty(featureVectors));
  Preconditions.checkNotNull(ratingVector, "rating vector cannot be null");
  Preconditions.checkArgument(ratingVector.getNumNondefaultElements() > 0, "Rating vector cannot be empty");
  Preconditions.checkArgument(Iterables.size(featureVectors) == ratingVector.getNumNondefaultElements());

  /* nui = 48: the number of items user i has rated */
  int nui = ratingVector.getNumNondefaultElements();

  /* MiIi is the 20x48 (features x rated items) feature matrix of the items rated by user i:
   * { feature0  => {item0:4.158833063209069, item1:3.0174418604651128, ..., item47:3.8755221386800365}
   *   feature1  => {item0:0.8746388048206997, item1:0.05897693253591574, ..., item47:0.9981447681344258}
   *   ...
   *   feature19 => {item0:0.8728730662850301, item1:0.9637219102684911, ..., item47:0.11514498636620973} } */
  Matrix MiIi = createMiIi(featureVectors, numFeatures);

  /* RiIiMaybeTransposed holds user i's actual ratings of those items as a 48x1 matrix:
   * { item0  => {feature0:5.0}
   *   item1  => {feature0:5.0}
   *   ...
   *   item47 => {feature0:4.0} } */
  Matrix RiIiMaybeTransposed = createRiIiMaybeTransposed(ratingVector);

  /* compute Ai = MiIi * t(MiIi) + lambda * nui * E;
   * Ai is a 20x20 (features x features) matrix:
   * { 0  => {0:735.9857104327824, 1:102.81718116978466, ..., 19:89.241608292493}
   *   1  => {0:102.81718116978466, 1:21.631573826528825, ..., 19:11.550196616709504}
   *   ...
   *   19 => {0:89.241608292493, 1:11.550196616709504, ..., 19:19.040173127545934} } */
  Matrix Ai = miTimesMiTransposePlusLambdaTimesNuiTimesE(MiIi, lambda, nui);

  /* compute Vi = MiIi * t(R(i,Ii)): the feature matrix times the actual ratings, a 20x1 matrix:
   * { 0  => {0:778.765601112045}
   *   1  => {0:106.72208299946884}
   *   ...
   *   19 => {0:97.00844504677671} } */
  Matrix Vi = MiIi.times(RiIiMaybeTransposed);

  /* solve Ai * ui = Vi: a QR decomposition yields user i's 20x1 feature vector,
   * {0:0.8817690514198447, 1:-0.21707282987696083, ..., 19:0.23423786158394766} */
  return solve(Ai, Vi);
}

private static Vector solve(Matrix Ai, Matrix Vi) {
  return new QRDecomposition(Ai).solve(Vi).viewColumn(0);
}
This solves for M and U exactly as prescribed by the update equations in the theory section.
3 Run the recommender
$MAHOUT recommendfactorized --input ${WORK_DIR}/als/out/userRatings/ --output ${WORK_DIR}/recommendations/ \
    --userFeatures ${WORK_DIR}/als/out/U/ --itemFeatures ${WORK_DIR}/als/out/M/ \
    --numRecommendations 6 --maxRating 5 --numThreads 2
protected void map(IntWritable userIndexWritable, VectorWritable ratingsWritable, Context ctx)
    throws IOException, InterruptedException {
  Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>> uAndM = getSharedInstance();
  OpenIntObjectHashMap<Vector> U = uAndM.getFirst();
  OpenIntObjectHashMap<Vector> M = uAndM.getSecond();

  Vector ratings = ratingsWritable.get();
  int userIndex = userIndexWritable.get();
  final OpenIntHashSet alreadyRatedItems = new OpenIntHashSet(ratings.getNumNondefaultElements());
  for (Vector.Element e : ratings.nonZeroes()) {
    alreadyRatedItems.add(e.index());
  }

  final TopItemsQueue topItemsQueue = new TopItemsQueue(recommendationsPerUser);
  final Vector userFeatures = U.get(userIndex);

  M.forEachPair(new IntObjectProcedure<Vector>() {
    @Override
    public boolean apply(int itemID, Vector itemFeatures) {
      if (!alreadyRatedItems.contains(itemID)) {
        double predictedRating = userFeatures.dot(itemFeatures);
        MutableRecommendedItem top = topItemsQueue.top();
        if (predictedRating > top.getValue()) {
          top.set(itemID, (float) predictedRating);
          topItemsQueue.updateTop();
        }
      }
      return true;
    }
  });

  List<RecommendedItem> recommendedItems = topItemsQueue.getTopItems();
  if (!recommendedItems.isEmpty()) {
    // cap predictions to maxRating
    for (RecommendedItem topItem : recommendedItems) {
      ((MutableRecommendedItem) topItem).capToMaxValue(maxRating);
    }
    if (usesLongIDs) {
      long userID = userIDIndex.get(userIndex);
      userIDWritable.set(userID);
      for (RecommendedItem topItem : recommendedItems) {
        // remap item IDs
        long itemID = itemIDIndex.get((int) topItem.getItemID());
        ((MutableRecommendedItem) topItem).setItemID(itemID);
      }
    } else {
      userIDWritable.set(userIndex);
    }
    recommendations.set(recommendedItems);
    ctx.write(userIDWritable, recommendations);
  }
}
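Stripped of the Hadoop and priority-queue machinery, the scoring logic reduces to a dot product per unrated item plus a top-N selection. A minimal self-contained sketch (plain Java; the class and data are illustrative, and a sort stands in for the TopItemsQueue):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class RecommendSketch {
    // Predicted rating = dot product of user and item feature vectors.
    static double dot(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) {
            s += a[i] * b[i];
        }
        return s;
    }

    // Score every item the user has not rated yet and return the n best item ids.
    static List<Integer> recommend(double[] userFeatures, double[][] itemFeatures,
                                   Set<Integer> alreadyRated, int n) {
        List<Integer> candidates = new ArrayList<>();
        for (int item = 0; item < itemFeatures.length; item++) {
            if (!alreadyRated.contains(item)) {
                candidates.add(item);
            }
        }
        // Sort by predicted rating, highest first (Mahout keeps a bounded queue instead).
        candidates.sort((a, b) -> Double.compare(dot(userFeatures, itemFeatures[b]),
                                                 dot(userFeatures, itemFeatures[a])));
        return candidates.subList(0, Math.min(n, candidates.size()));
    }

    public static void main(String[] args) {
        double[] user = {1.0, 2.0};
        double[][] items = { {1.0, 0.0}, {0.0, 1.0}, {1.0, 1.0} };  // items 0..2
        List<Integer> top = recommend(user, items, Collections.singleton(0), 2);
        System.out.println(top);  // [2, 1]: item 2 scores 3.0, item 1 scores 2.0
    }
}
```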
This is the map method of PredictionMapper: it takes user i's feature vector and computes its dot product with the feature vector of every movie j that user i has not already rated, giving a predictedRating for each such movie; sorting these scores yields the list of recommended movies.

This work is licensed under the Creative Commons Attribution 4.0 International License.