Definition
The goal is to discover, across a massive corpus of documents, how often word a and word b appear together within a fixed window (e.g. within 5 words, within a sentence, or even within a paragraph), and to build a word co-occurrence matrix from those counts. (The matrix can be symmetric or asymmetric (emphasizing word order), depending on the application.)
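Before bringing in MapReduce, the core counting idea can be shown in a minimal, framework-free sketch (the sample sentence and window size of 2 are made up for illustration); each pair is stored in canonical sorted order, which makes the resulting matrix symmetric:

```java
import java.util.HashMap;
import java.util.Map;

public class CooccurrenceDemo {
    // builds symmetric co-occurrence counts for word pairs at most `window` positions apart
    static Map<String, Integer> cooccurrences(String[] words, int window) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < words.length; i++) {
            for (int j = i + 1; j <= i + window && j < words.length; j++) {
                String a = words[i], b = words[j];
                // canonical (sorted) order, so (a,b) and (b,a) merge into one cell
                String key = a.compareTo(b) <= 0 ? a + "," + b : b + "," + a;
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] words = "the quick brown fox jumps over the lazy dog".split(" ");
        System.out.println(cooccurrences(words, 2));
    }
}
```

The MapReduce version below distributes exactly this counting: the Mapper emits each pair with a count of 1, and the Reducer sums the counts per pair.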
Word Co-occurrence Algorithm Implementation
The pseudocode for implementing word co-occurrence with MapReduce is as follows:
- Mapper: iterate over the word pairs in the window and emit them. When the window reaches the end of the document, slide it by shrinking from the head, stopping once the window size drops to 2.
```
class Mapper
    method Map(docid id, doc d)
        for all word w in d
            for all word u in Window(w)
                // emit a count of 1 for the co-occurrence
                Emit(pair(w, u), 1)
```
```
class Reducer
    method Reduce(pair p, countlist [c1, c2, ...])
        sum = 0
        for all count c in countlist [c1, c2, ...]
            sum += c
        Emit(pair p, count sum)
```
Java MapReduce Implementation
Custom Key

- Implement the key class WordPair by implementing the WritableComparable interface.
- To guarantee that all identical word pairs are sent to the same Reducer, override hashCode() so that equal word pairs (ignoring order) are routed to the same Reducer during the shuffle.
- Override compareTo() and equals() so that keys can be compared and sorted.
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class WordPair implements WritableComparable<WordPair> {
    private String wordA;
    private String wordB;

    public WordPair() {
    }

    public WordPair(String wordA, String wordB) {
        this.wordA = wordA;
        this.wordB = wordB;
    }

    public String getWordA() {
        return this.wordA;
    }

    public String getWordB() {
        return this.wordB;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(wordA);
        out.writeUTF(wordB);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        wordA = in.readUTF();
        wordB = in.readUTF();
    }

    @Override
    public String toString() {
        return wordA + "," + wordB;
    }

    @Override
    public int compareTo(WordPair o) {
        if (this.equals(o))
            return 0;
        // compare the pairs word by word in canonical (sorted) order;
        // naively comparing the concatenation wordA + wordB would wrongly
        // treat distinct pairs such as ("ab","c") and ("a","bc") as equal
        String thisMin = wordA.compareTo(wordB) <= 0 ? wordA : wordB;
        String thisMax = wordA.compareTo(wordB) <= 0 ? wordB : wordA;
        String oMin = o.wordA.compareTo(o.wordB) <= 0 ? o.wordA : o.wordB;
        String oMax = o.wordA.compareTo(o.wordB) <= 0 ? o.wordB : o.wordA;
        int cmp = thisMin.compareTo(oMin);
        return cmp != 0 ? cmp : thisMax.compareTo(oMax);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WordPair))
            return false;
        WordPair w = (WordPair) o;
        // order-insensitive: (a, b) equals (b, a)
        return (this.wordA.equals(w.wordA) && this.wordB.equals(w.wordB))
            || (this.wordB.equals(w.wordA) && this.wordA.equals(w.wordB));
    }

    @Override
    public int hashCode() {
        // addition is commutative, so (a, b) and (b, a) get the same hash
        return (wordA.hashCode() + wordB.hashCode()) * 17;
    }
}
```
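The order-insensitivity of equals() and hashCode() can be checked without a Hadoop dependency; the following is a stripped-down copy of just those two methods (the class name WordPairCheck is made up for this check):

```java
public class WordPairCheck {
    // same order-insensitive logic as WordPair, minus the Hadoop interface
    final String wordA, wordB;

    WordPairCheck(String wordA, String wordB) {
        this.wordA = wordA;
        this.wordB = wordB;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WordPairCheck))
            return false;
        WordPairCheck w = (WordPairCheck) o;
        return (wordA.equals(w.wordA) && wordB.equals(w.wordB))
            || (wordA.equals(w.wordB) && wordB.equals(w.wordA));
    }

    @Override
    public int hashCode() {
        // addition is commutative, so both orders hash identically
        return (wordA.hashCode() + wordB.hashCode()) * 17;
    }

    public static void main(String[] args) {
        WordPairCheck p = new WordPairCheck("hello", "world");
        WordPairCheck q = new WordPairCheck("world", "hello");
        System.out.println(p.equals(q) && p.hashCode() == q.hashCode()); // true
    }
}
```

Equal hashes only guarantee that the default HashPartitioner sends both orders of a pair to the same Reducer; equals() and compareTo() are what actually group them into one reduce call.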
Map-side Implementation
```java
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordConcurrnce {
    private static final int MAX_WINDOW = 20;
    private static final String wordRegex = "([a-zA-Z]+)";
    private static final Pattern wordPattern = Pattern.compile(wordRegex);
    private static final IntWritable one = new IntWritable(1);

    public static class WordConcurrenceMapper
            extends Mapper<Text, BytesWritable, WordPair, IntWritable> {
        private int windowSize;
        private Queue<String> windowQueue = new LinkedList<String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            windowSize = Math.min(context.getConfiguration().getInt("window", 2), MAX_WINDOW);
        }

        @Override
        public void map(Text docName, BytesWritable docContent, Context context)
                throws IOException, InterruptedException {
            // getBytes() may return a padded buffer, so honor getLength()
            Matcher matcher = wordPattern.matcher(
                    new String(docContent.getBytes(), 0, docContent.getLength(), "UTF-8"));
            while (matcher.find()) {
                windowQueue.add(matcher.group());
                if (windowQueue.size() >= windowSize) {
                    // pair the head word with every other word in the window,
                    // then slide the window forward by dropping the head
                    Iterator<String> it = windowQueue.iterator();
                    String w1 = it.next();
                    while (it.hasNext()) {
                        context.write(new WordPair(w1, it.next()), one);
                    }
                    windowQueue.remove();
                }
            }
            // drain the tail: keep shrinking from the head until one word remains,
            // as described above, so the last words of the document are paired too
            while (windowQueue.size() > 1) {
                Iterator<String> it = windowQueue.iterator();
                String w1 = it.next();
                while (it.hasNext()) {
                    context.write(new WordPair(w1, it.next()), one);
                }
                windowQueue.remove();
            }
        }
    }
}
```
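The original only shows the Map side; a minimal Reduce-side sketch matching the pseudocode above (the class name WordConcurrenceReducer is an assumption, but the summation is exactly the pseudocode's):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class WordConcurrenceReducer
        extends Reducer<WordPair, IntWritable, WordPair, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(WordPair key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum all the 1s emitted for this word pair
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
```

Because the reduce function is associative and commutative, the same class could also be registered as a Combiner to cut shuffle traffic.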
Because we need to count relationships between words within each individual article, the input must be read one whole file at a time, so that a document reaches a Map task without being split and all of its internal word pairs stay together (concrete implementation to be updated…)
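Until that update, one common way to achieve this (a sketch, not the author's implementation; class names are assumptions) is a FileInputFormat that refuses to split files, paired with a RecordReader that reads the entire file as a single Text/BytesWritable record matching the Mapper's input signature:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false; // never split: each map task sees one complete document
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new WholeFileRecordReader();
    }

    static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
        private FileSplit split;
        private TaskAttemptContext context;
        private final Text key = new Text();
        private final BytesWritable value = new BytesWritable();
        private boolean processed = false;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) {
            this.split = (FileSplit) split;
            this.context = context;
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            if (processed)
                return false;
            Path file = split.getPath();
            key.set(file.getName()); // the document name becomes the map key
            byte[] contents = new byte[(int) split.getLength()];
            FSDataInputStream in = null;
            try {
                in = file.getFileSystem(context.getConfiguration()).open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }

        @Override public Text getCurrentKey() { return key; }
        @Override public BytesWritable getCurrentValue() { return value; }
        @Override public float getProgress() { return processed ? 1.0f : 0.0f; }
        @Override public void close() { }
    }
}
```

Note this approach loads each file fully into memory, so it only suits corpora of many small documents.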