A Distributed Item based Recommendation Based on Mahout 25 Mar 2014
算法分析
假设存在一个网购平台,平台会记录每一个用户对商品的访问记录,假设有以下的访问记录(1-9:商品,A-F:用户)
A 1 2 3 8 9
B 2 4 7 8
C 1 3 4 5 6
D 2 4 6 8
E 1 3 5 7 9
F 1 2 3 4 5 6 7 8 9
则可以得出一个矩阵:
Items | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
1 | 4 | 2 | 4 | 2 | 3 | 2 | 2 | 2 | 3 |
2 | 2 | 4 | 2 | 3 | 1 | 2 | 2 | 4 | 2 |
3 | 4 | 2 | 4 | 2 | 3 | 2 | 2 | 2 | 3 |
4 | 2 | 3 | 2 | 4 | 2 | 3 | 2 | 3 | 1 |
5 | 3 | 1 | 3 | 2 | 3 | 2 | 2 | 1 | 2 |
6 | 2 | 2 | 2 | 3 | 2 | 3 | 1 | 2 | 1 |
7 | 2 | 2 | 2 | 2 | 2 | 1 | 3 | 2 | 2 |
8 | 2 | 4 | 2 | 3 | 1 | 2 | 2 | 4 | 2 |
9 | 3 | 2 | 3 | 1 | 2 | 1 | 2 | 2 | 3 |
对于一个用户对n个商品的浏览情况(可以将其假设为对商品的关注程度,或者是购买意向),我们将它看做是一个n维向量, 由于我们没有用户对每件商品的喜爱程度的数据,所以所有向量中的值都为0(未浏览)或1(浏览过)。比如A用户浏览记录对应的向量为[1, 1, 1, 0, 0, 0, 0, 1, 1].
针对具体的一个用户进行推荐
- 基于用户相似度:计算两个用户的相似度的问题转化为计算两个向量间的距离,距离越近,则两个用户越相似。
该图片摘自余弦相似性-维基百科
产生的相似度介于-1和1之间,-1代表截然相反,0代表相互独立,1代表相同
- 基于商品相似度:将商品相似度的矩阵与用户浏览记录的n维向量相乘,计算结果如下:
从中选出值最高的entity(不包含用户已经查看的)。 具体计算结果为[15,14,15,11,10,9,10,14,13],由于商品1,2,3,8,9,用户A都访问过(灰色),其值可以忽略,则剩下的最大值为11,即商品4推介度最高。
为什么乘积最高的项推介度最高呢?
2 * 1 + 3 * 1 + 2 * 1 + 4 * 0 + 2 * 0 + 3 * 0 + 2 * 0 + 3 * 1 + 1 * 1 = 11
第4行的值代表商品4与其他商品的共生数(与其他商品同时被多少个用户浏览),这样,如果商品4与用户A浏览过的大部分商品共生的话,那么商品4可能会是用户A比较感兴趣的。上面的算式中,商品4与其他商品的共生数和用户A对这些商品的浏览记录相重叠(如上式中第一个乘积2 * 1中,2为商品4与商品1的共生数,1为用户A浏览过商品1),当和值越大,说明包含的与商品4共生的商品也就越多。这样也就证明了矩阵乘积结果中的最大项就是最好的推介。
MapReduce算法简介
- 输入需要转换成许多key-value(K1,V1)的形式
- 实现一个map方法来处理每一个(K1,V1)键值对,并输出一个不同类型的键值对(K2,V2)
- 每一个K2下的所有V2合并(hadoop自行处理)
- 实现一个reduce方法处理每一个K2以及和它关联的所有V2,并输出一个不同类型的键值对(K3,V3),输出到HDFS。
针对上面假设的网购平台实现分布式算法, 假设输入文件是以每行一个用户来表示它的浏览记录,它的形式为:UserId:ItemID1 ItemID2 ItemID3 …,该文件放置于HDFS实例中.
生成用户向量
- 输入数据转换成(Long, String)键值对,Long键值表示文件的行数,String值是每一行的文件数据,例如 123 / C:1 3 4 5 6
- 实现一个map方法,将上述输入转换成针对每一个商品,生成一个(UserId,ItemId).例如 C / 1, C / 3等
- hadoop帮助我们将每一个用户及其对应的所有商品合并起来形成C / [1,3,4,5,6]
- 实现一个reduce方法,根据该用户下的所有商品构造一个Vector(来自于mahout),输出用户ID和用户浏览记录的Vector。例如C / {1:1, 3:1, 4:1, 5:1, 6:1}。
A mapper that parses Wikipedia link files
public class WikipediaToItemPrefsMapper
extends Mapper<LongWritable,Text,VarLongWritable,VarLongWritable> {
private static final Pattern NUMBERS = Pattern.compile("(\\d+)");
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
Matcher m = NUMBERS.matcher(line);
m.find();
VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
VarLongWritable itemID = new VarLongWritable();
while (m.find()) {
itemID.set(Long.parseLong(m.group()));
context.write(userID, itemID);
}
}
}
Reducer which produces Vectors from a user’s item preferences
public class WikipediaToUserVectorReducer extends
Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
public void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context) throws IOException, InterruptedException {
Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
for (VarLongWritable itemPref : itemPrefs) {
userVector.set((int)itemPref.get(), 1.0f);
}
context.write(userID, new VectorWritable(userVector));
}
}
计算共生
该过程的输入直接使用上一步的数据结果。
- 输入数据为用户ID和其对应的浏览记录向量的键值对,例如C / {1:1, 3:1, 4:1, 5:1, 6:1}。
- 实现一个map方法,将该用户下所有商品之间的ID相互映射,如1/3,1/4,1/5,1/6,3/4,3/5,3/6等。
- hadoop帮助我们将map方法输出的所有结果按key(第一个itemID)分类并合并,形成类似于1 / [2, 3, 8, 9, 3, 4, 5, 6, 3, 5, 7, 9, 2, 3, 4, 5, 6, 7, 8, 9]
- 实现一个reduce方法,统计与该商品下同时出现的所有商品的次数,其输出结果为商品ID与该商品和其他商品的共生次数形成的向量的键值对,例如1 / {2:2, 3:3, 4:2, 5:3, 6:2, 7:2, 8:2, 9:3}
Mapper component of co-occurrence computation
public class UserVectorToCooccurrenceMapper extends
Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {
public void map(VarLongWritable userID, VectorWritable userVector,
Context context) throws IOException, InterruptedException {
Iterator<Vector.Element> it = userVector.get().iterateNonZero();
while (it.hasNext()) {
int index1 = it.next().index();
Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
while (it2.hasNext()) {
int index2 = it2.next().index();
context.write(new IntWritable(index1), new IntWritable(index2));
}
}
}
}
Reducer component of co-occurrence computation
public class UserVectorToCooccurrenceReducer extends
Reducer<IntWritable, IntWritable, IntWritable, VectorWritable> {
public void reduce(IntWritable itemIndex1,
Iterable<IntWritable> itemIndex2s, Context context)
throws IOException, InterruptedException {
Vector cooccurrenceRow = new RandomAccessSparseVector(
Integer.MAX_VALUE, 100);
for (IntWritable intWritable : itemIndex2s) {
int itemIndex2 = intWritable.get();
cooccurrenceRow.set(itemIndex2,
cooccurrenceRow.get(itemIndex2) + 1.0);
}
context.write(itemIndex1, new VectorWritable(cooccurrenceRow));
}
}
向量相乘
在算法分析过程中,为了针对某一用户进行推荐,我们需要将共生矩阵与用户的浏览记录向量相乘,从而计算出能够表达该用户对所有商品可能感兴趣的向量,该计算过程如下:
for each row i in the co-occurrence matrix
|
循环遍历共生矩阵中每一行
|
上述过程需要涉及到两个向量的相乘,首先将它转换为Map-Reduce的形式比较有难度,另外就是效率问题。让我们再看一种正确的向量相乘的方式:
assign R to be the zero vector
for each column i in the co-occurrence matrix
|
将R设置为一个零向量
循环遍历共生矩阵的每一列
|
针对该过程,首先可以跳过向量相乘的运算,另外,当用户浏览向量的第i个元素为0时,矩阵中的第i列可忽略,这样在所有用户浏览记录较少的情况下,可以极大的提高效率。
针对第二种算法的理解,由于我们最终针对每一个用户的推荐结果是一个向量,所以我们可以将上述矩阵乘法转换成为
虽然在实际情况下,它们相乘的结果并不相同,但由于我们只关心它们相乘后得到的数列,所以此时可以如此理解
然后第二种算法其实就是矩阵相乘中的系数-向量方法。参见矩阵乘法
系数-向量方法可以理解为
循环遍历左边矩阵到第i行,
将R设置为一个零向量
循环遍历第i行中的第j个元素
将第j个元素与右边矩阵中第j行的向量相乘,形成一个新的向量(这也就是为什么上面用的是行)
上一步计算结果与R相加
将R作为计算结果的第i行
系数-向量法计算矩阵乘积
要实现上述过程,我们需要每一件商品与其共生商品的共生值组成的向量,以及对于一件商品的浏览记录(0或者1),这需要将两种不同类型的数据合并在一个计算过程中。这样的话我们建立了一个类VectorOrPrefWritable用来同时保存这两种数据,以使得这两种数据可以被一个计算过程(reduce)使用,该类结构如下
class VectorOrPrefWritable {
Vector vector; // 用来表示商品Z与其共生的商品的共生值形成的向量
long userId; // 用户ID
float value; // 用户a是否浏览过商品Z,1.0:浏览过,0.0,未浏览
}
该过程实际上包含两个map过程,而没有实际的reduce过程
- mapper1:对共生矩阵进行包装,直接将计算共生过程输出的结果使用
VectorOrPrefWritable
进行包装,同样是商品的ID以及该商品与其共生商品的共生值组成的向量的键值对 - mapper2:分割用户浏览向量,输入数据为用户ID和其对应的用户浏览向量的键值对,例如C / {1:1, 3:1, 4:1, 5:1, 6:1}
每一个非0的向量元素放入VectorOrPrefWritable
的userId
和value
属性,输出数据为以商品ID和浏览过该商品的所有用户ID与value值,如1 / [A:1.0], 1 / [C:1.0], 1 / [E:1.0], 1 / [F:1.0]
hadoop将上述两个mapper的结果通过商品ID进行合并。
Wrapping co-occurrence columns
public class CooccurrenceColumnWrapperMapper extends
Mapper<IntWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {
public void map(IntWritable key, VectorWritable value, Context context)
throws IOException, InterruptedException {
context.write(key, new VectorOrPrefWritable(value.get()));
}
}
Splitting user vectors
public class UserVectorSplitterMapper
extends
Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {
public void map(VarLongWritable key, VectorWritable value, Context context)
throws IOException, InterruptedException {
long userID = key.get();
Vector userVector = value.get();
Iterator<Vector.Element> it = userVector.iterateNonZero();
IntWritable itemIndexWritable = new IntWritable();
while (it.hasNext()) {
Vector.Element e = it.next();
int itemIndex = e.index();
float preferenceValue = (float) e.get();
itemIndexWritable.set(itemIndex);
context.write(itemIndexWritable,
new VectorOrPrefWritable(userID, preferenceValue));
}
}
}
这两个mapper(实质上是两个独立的job)执行完之后,输出的数据作为另外一个MapReduce过程的输入,该过程map不做任何操作,reduce方法将上述的结果合并在一起,以商品ID和VectorAndPrefsWritable
键值对的形式输出,这个过程mahout已经为我们实现,详见下面代码
ToVectorAndPrefReducer.java
public final class ToVectorAndPrefReducer extends
Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {
@Override
protected void reduce(VarIntWritable key,
Iterable<VectorOrPrefWritable> values,
Context context) throws IOException, InterruptedException {
List<Long> userIDs = Lists.newArrayList();
List<Float> prefValues = Lists.newArrayList();
Vector similarityMatrixColumn = null;
for (VectorOrPrefWritable value : values) {
if (value.getVector() == null) {
// Then this is a user-pref value
userIDs.add(value.getUserID());
prefValues.add(value.getValue());
} else {
// Then this is the column vector
if (similarityMatrixColumn != null) {
throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
}
similarityMatrixColumn = value.getVector();
}
}
if (similarityMatrixColumn == null) {
return;
}
VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);
context.write(key, vectorAndPrefs);
}
}
VectorAndPrefsWritable结构
class VectorAndPrefsWritable {
Vector vector; // 用来表示商品Z与其共生的商品的共生值形成的向量
List<Long> userIds; // 浏览过商品Z的所有用户ID列表
List<float> values; // 对应与用户的value值,在该例中一般为1
}
现在我们已经拥有了我们需要的数据,接下来我们将计算用户对某一件商品浏览值与该商品与其共生商品的共生值形成的向量的乘积
输入数据为ToVectorAndPrefReducer
产生的结果,商品ID和VectorAndPrefsWritable
的键值对
实现一个map方法,计算针对商品1和其共生商品的共生值形成的向量与浏览过商品1的所有用户对商品1浏览值(一般为1)的乘积,并以用户ID与该用户浏览值乘以商品1共生向量产生的结果的键值对输出,例如A / {2:2, 3:3, 4:2, 5:3, 6:2, 7:2, 8:2, 9:3}
hadoop将map输出结果通过userId合并。
实现一个reduce方法,将该用户ID下的所有向量相加,其结果就是对该用户的推荐向量(用户ID和推荐向量的键值对),例如A / {1:15, 2:14, 3:15, 4:11, 5:10, 6:9, 7:10, 8:14, 9:13}
Computing partial recommendation vectors
public class PartialMultiplyMapper extends
Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable> {
public void map(IntWritable key,
VectorAndPrefsWritable vectorAndPrefsWritable, Context context)
throws IOException, InterruptedException {
Vector cooccurrenceColumn = vectorAndPrefsWritable.getVector();
List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
List<Float> prefValues = vectorAndPrefsWritable.getValues();
for (int i = 0; i < userIDs.size(); i++) {
long userID = userIDs.get(i);
float prefValue = prefValues.get(i);
Vector partialProduct = cooccurrenceColumn.times(prefValue);
context.write(new VarLongWritable(userID),
new VectorWritable(partialProduct));
}
}
}
该map方法输出了大量的数据,对于每一个user-item关联,都要输出一个user-共生向量,然后将他们按用户ID分组,聚合,并将共生向量相加,我们可以添加一个combiner来对这个过程优化,combiner更像一个缩小版的reducer,其实combiner就是继承了Reducer,combiner在map输出仍然在内存时执行,这样在reduce过程之前,先将一部分map输出聚合,帮助我们节省了一部分I/O资源,但是,combiner并不能取代reducer,他只能将map的部分结果聚合,在reduce过程中,还需要对其结果再次进行聚合。
Combiner for partial products
public class AggregateCombiner extends
Reducer<VarLongWritable, VectorWritable, VarLongWritable, VectorWritable> {
public void reduce(VarLongWritable key, Iterable<VectorWritable> values,
Context context) throws IOException, InterruptedException {
Vector partial = null;
for (VectorWritable vectorWritable : values) {
partial = partial == null ? vectorWritable.get() : partial
.plus(vectorWritable.get());
}
context.write(key, new VectorWritable(partial));
}
}
最后,对每一个用户,分别为他们计算推荐向量
Producing recommendations from vectors
public class AggregateAndRecommendReducer
extends
Reducer<VarLongWritable, VectorWritable, VarLongWritable, RecommendedItemsWritable> {
private int recommendationsPerUser = 10;
private OpenIntLongHashMap indexItemIDMap;
static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
static final String NUM_RECOMMENDATIONS = "numRecommendations";
static final int DEFAULT_NUM_RECOMMENDATIONS = 10;
protected void setup(Context context) throws IOException {
Configuration jobConf = context.getConfiguration();
recommendationsPerUser = jobConf.getInt(NUM_RECOMMENDATIONS,
DEFAULT_NUM_RECOMMENDATIONS);
indexItemIDMap = TasteHadoopUtils.readItemIDIndexMap(
jobConf.get(ITEMID_INDEX_PATH), jobConf);
}
public void reduce(VarLongWritable key, Iterable<VectorWritable> values,
Context context) throws IOException, InterruptedException {
Vector recommendationVector = null;
for (VectorWritable vectorWritable : values) {
recommendationVector = recommendationVector == null ? vectorWritable
.get() : recommendationVector.plus(vectorWritable.get());
}
Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(
recommendationsPerUser + 1,
Collections.reverseOrder(ByValueRecommendedItemComparator
.getInstance()));
Iterator<Vector.Element> recommendationVectorIterator = recommendationVector
.iterateNonZero();
while (recommendationVectorIterator.hasNext()) {
Vector.Element element = recommendationVectorIterator.next();
int index = element.index();
float value = (float) element.get();
if (topItems.size() < recommendationsPerUser) {
topItems.add(new GenericRecommendedItem(indexItemIDMap
.get(index), value));
} else if (value > topItems.peek().getValue()) {
topItems.add(new GenericRecommendedItem(indexItemIDMap
.get(index), value));
topItems.poll();
}
}
List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(
topItems.size());
recommendations.addAll(topItems);
Collections.sort(recommendations,
ByValueRecommendedItemComparator.getInstance());
context.write(key, new RecommendedItemsWritable(recommendations));
}
}
推荐流程
源代码
以上代码实现可见商品相似度简单实现
用户使用A,B,C等字母代替, 商品以1, 2, 3代替
HouseVisitingMapper
: 用户访问日志转换为系统可理解的格式 输入&&&&&&&, 输出A/1; A/2; A/3; B/2; B/3-
HouseVisitingReducer
: 以用户cookieID或者用户名为key, 浏览记录为值, 输出A/[1, 2, 3]; B/[2,3] VisitHistoryToItemPrefsMapper
: 用户访问记录转变为用户编码为key, 商品为值的记录对, 输出A/[1, 2, 3]; B/[2,3]-
VisitHistoryToUserVectorReducer
: VisitHistoryToItemPrefsMapper的输出作为输入, 输出以用户编码为key, 商品浏览向量为值的记录对, A/{1:1, 2:1, 3:1}; B/{2:1, 3:1} UserVectorToCooccurrenceMapper
: 用户浏览向量转换为共生向量, 输出以商品ID为key, 与key被同时浏览的商品ID为value的值, 1/1; 1/2; 1/3; 2/1; 2/2; 2/3; 3/1; 3/2; 3/3; 2/2; 2/3; 3/2; 3/3; 当然为了提高推荐准确度, 可以将对自身的共生去除UserVectorToCooccurrenceReducer
: 用户浏览向量转换为共生向量, 输出以商品ID为key, 与该商品共生的商品ID以及共生量; 输入为1/[1,2,3]; 2/[1,2,3,2,3]; 3/[1,2,3,2,3], 输出为1/{1:1, 2:1, 3:1}; 2/{1:1, 2:2, 3:2}; 3/{1:1, 2:2, 3:2}
共生矩阵以及用户浏览向量数据合并
CooccurrenceColumnWrapperMapper
: 共生矩阵合并, 输出为1/{1:1, 2:1, 3:1}; 2/{1:1, 2:2, 3:2}; 3/{1:1, 2:2, 3:2}-
UserVectorSplitterMapper
: 用户浏览向量合并, 输入为A/{1:1, 2:1, 3:1}; B/{2:1, 3:1}; 输出为1/{A:1}; 2/{A:1}; 3/{A:1}; 2/{B:1}; 3/{B:1} ToVectorAndPrefReducer
: 将上面两个Mapper输出的数据合并, 输入为1/[{1:1, 2:1, 3:1}, {A:1}]; 2/[{1:1, 2:2, 3:2}, {A:1}, {B:1}]; 3/[{1:1, 2:2, 3:2}, {A:1}, {B:1}], 输出为1/{1:1, 2:1, 3:1}+[{A:1}]; 2/{1:1, 2:2, 3:2}+[{A:1}, {B:1}]; 3/{1:1, 2:2, 3:2}+[{A:1}, {B:1}]
共生矩阵与商品的用户浏览向量相乘
PartialMultiplyMapper
: 商品1的共生向量与商品1的所有用户的pref值相乘 输入为1/{1:1, 2:1, 3:1}+[{A:1}]; 2/{1:1, 2:2, 3:2}+[{A:1}, {B:1}]; 3/{1:1, 2:2, 3:2}+[{A:1}, {B:1}], 输出为A/{1:1, 2:1, 3:1}; A/{1:1, 2:2, 3:2}; B/{1:1, 2:2, 3:2}; A/{1:1, 2:2, 3:2}; B/{1:1, 2:2, 3:2}ItemRecommender
: 按用户聚合后, 叠加向量值: 输入为A/[{1:1, 2:1, 3:1}, {1:1, 2:2, 3:2}, {1:1, 2:2, 3:2}]; B/[{1:1, 2:2, 3:2}, {1:1, 2:2, 3:2}]; 输出为A/[{1:3, 2:5, 3:5}], B/[{1:2, 2:4, 3:4}], 如果给B推荐, 抛开2,3已经浏览过的, 只剩1了….
举例的数据仅供参考