Hadoop MapReduce: 带词频属性的文档倒排索引
问题描述
倒排索引(英语:Inverted index),也常被称为反向索引、置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。它是文档检索系统中最常用的数据结构。
例如,下面是要被索引的文本:
- T0 = "这 是 一本 书"
- T1 = "这 一本 书 多少 钱"
- T2 = "一本 数学 书 要 二十 元 钱"
则其倒排索引为
- "这": {0, 1}
- "是": {0}
- "一本": {0, 1, 2}
- "书": {0, 1, 2}
- "多少": {1}
- "钱": {1, 2}
- "数学": {2}
- "要": {2}
- "二十": {2}
- "元": {2}
带词频统计的文档倒排索引,主要功能是在实现每个单词的倒排索引基础上,统计出单词在每篇文档中出现的次数,并且按文档顺序排序。此外,还要求计算每个单词的“平均出现次数”(平均出现次数=词语在全部文档中出现的频数总和/包含该词语的文档数)。
输入
多个已分词的文本文件,两个词之间使用空格分隔。
输出
对每个词语,输出一个键值对,键值对格式如:
[词语]\TAB 平均出现次数, 小说1:词频; 小说2:词频; ... 小说N:词频
输出的小说名需要去除文件名后缀。
解决方法
思路
此问题可以仅用一个MapReduce任务完成,除必须的Mapper和Reducer外,还需要重载Combiner和Partitioner。具体各部分的作用如下所述。
Map
1. Mapper
输入:key: 文件当前行偏移位置,value: 文件当前行内容
输出:key: word#filename,value: 1
获取当前处理文件名filename,对value值进行切分得到多个word值,将每个word与filename拼接到一起作为输出key,其计数值为1,即value为1。
2. Combiner
输入:key: word#filename,value: [1, 1, 1, …]
输出:key: word#filename,value: 同一key下的累加和
将Mapper输出的中间结果相同key部分的value累加,减少向Reduce节点传输的数据量。
Reduce
1. Partitioner
输入:key: word#filename,value: 累加和
输出:key: word#filename,value: 累加和
这样可以保证同一word都在同一Reduce节点进行处理。
2. Reducer
输入:key: word#filename,vaule: [累加和1, 累加和2, …]
输出:key: word,value: 平均出现次数,filename:词频;filename:词频;…
利用Reduce节点输入的key值都是有序的,将key拆分,对于同一word,每次都保存其filename和词频,并统计其总出现次数和总出现文档数;当同一word处理完后,计算平均出现次数,将其与filename及其词频作为value输出。
代码
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
/**
* Mapper部分
**/
public static class InvertedIndexMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);//常量1
/**
* 对输入的Text切分为多个word,每个word作为一个key输出
* 输入:key:当前行偏移位置, value:当前行内容
* 输出:key:word#filename, value:1
*/
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName().toLowerCase();//获取文件名,为简化下一步对文件名处理,转换为小写
int pos = fileName.indexOf(".");
if (pos > 0) {
fileName = fileName.substring(0, pos);//去除文件名后缀
}
Text word = new Text();
StringTokenizer itr = new StringTokenizer(value.toString());
for (; itr.hasMoreTokens(); ) {
word.set(itr.nextToken() + "#" + fileName);
context.write(word, one);//输出 word#filename 1
}
}
}
/**
* Combiner部分
**/
public static class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
/**
* 将Mapper输出的中间结果相同key部分的value累加,减少向Reduce节点传输的数据量
* 输出:key:word#filename, value:累加和
*/
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
/**
* Partitioner部分
**/
public static class NewPartitioner extends HashPartitioner<Text, IntWritable> {
/**
* 为了将同一个word的键值对发送到同一个Reduce节点,对key进行临时处理
* 将原key的(word, filename)临时拆开,使Partitioner只按照word值进行选择Reduce节点
*/
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
String term = key.toString().split("#")[0];//获取word#filename中的word
return super.getPartition(new Text(term), value, numReduceTasks);
}
}
/**
* Reducer部分
**/
public static class InvertedIndexReducer extends Reducer<Text, IntWritable, Text, Text> {
private String term = new String();//临时存储word#filename中的word
private String last = " ";//临时存储上一个word
private int countItem;//统计word出现次数
private int countDoc;//统计word出现文件数
private StringBuilder out = new StringBuilder();//临时存储输出的value部分
private float f;//临时计算平均出现频率
/**
* 利用每个Reducer接收到的键值对中,word使排好序的
* 只需要将相同的word中,将(word,filename)拆分开,将filename与累加和拼到一起,存储到临时StringBuilder中
* 待出现word不同,则将此word作为key,存储有此word出现的全部filename及其出现次数的StringBuilder作为value输出
* 在处理相同word的同时,还会统计其出现的文件数目,总的出现次数,以此计算其平均出现频率
* 输入:key:word#filename, value:[NUM,NUM,...]
* 输出:key:word, value:平均出现频率,filename:NUM;filename:NUM;...
*/
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
term = key.toString().split("#")[0];//获取word
if (!term.equals(last)) {//此次word与上次不一样,则将上次进行处理并输出
if (!last.equals(" ")) {//避免第一次比较时出错
out.setLength(out.length() - 1);//删除value部分最后的;符号
f = (float) countItem / countDoc;//计算平均出现次数
context.write(new Text(last), new Text(String.format("%.2f,%s", f, out.toString())));//value部分拼接后输出
countItem = 0;//以下清除变量,初始化计算下一个word
countDoc = 0;
out = new StringBuilder();
}
last = term;//更新word,为下一次做准备
}
int sum = 0;//累加相同word和filename中出现次数
for (IntWritable val : values) {
sum += val.get();
}
out.append(key.toString().split("#")[1] + ":" + sum + ";");//将filename:NUM; 临时存储
countItem += sum;
countDoc += 1;
}
/**
* 上述reduce()只会在遇到新word时,处理并输出前一个word,故对于最后一个word还需要额外的处理
* 重载cleanup(),处理最后一个word并输出
*/
public void cleanup(Context context) throws IOException, InterruptedException {
out.setLength(out.length() - 1);
f = (float) countItem / countDoc;
context.write(new Text(last), new Text(String.format("%.2f,%s", f, out.toString())));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "inverted index");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setCombinerClass(SumCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setNumReduceTasks(5);//设定使用Reduce节点个数
job.setPartitionerClass(NewPartitioner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
示例
对分词后的《陆小凤》7本小说
古龙38.陆小凤01陆小凤前传(金鹏王朝).txt.segmented
古龙39.陆小凤02绣花大盗.txt.segmented
古龙40.陆小凤03决战前后.txt.segmented
古龙41.陆小凤04银钩赌坊.txt.segmented
古龙42.陆小凤05幽灵山庄.txt.segmented
古龙43.陆小凤06凤舞九天.txt.segmented
古龙44.陆小凤07剑神一笑.txt.segmented
进行带词频属性的文档倒排索引,可以得到例如如下结果
...
一副 3.00,古龙38:2;古龙39:2;古龙40:3;古龙41:2;古龙42:1;古龙43:4;古龙44:7
...
匆匆 2.60,古龙38:1;古龙39:4;古龙40:3;古龙42:3;古龙43:2
...
宁可 3.40,古龙38:1;古龙40:3;古龙42:5;古龙43:5;古龙44:3
宁愿 2.50,古龙39:2;古龙40:5;古龙41:1;古龙42:2;古龙43:3;古龙44:2
...
小结
上述代码中,Partitioner实现了同一单词下的键值对一定被分到同一个Reduce节点;依靠Reducer得到键值对时的内排序,可以用很巧妙的方法完成平均出现次数的计算。
DEMO
上述解决方法Project放在Github上,参见zhantong/Hadoop-InvertedIndex
参考
- 黄宜华, 苗凯翔. "深入理解大数据:大数据处理与编程实践". 机械工业出版社, 2014.