Definition
Secondary sort orders key-value pairs first by key and then, within each key, by value.
Solution 1: a pure in-Java approach. Inside the Reducer, sort all values for a given key directly (e.g., collect every value belonging to one key into an Array or List, then sort it). Because all values for a key must be buffered, a key with many values can exhaust the Reducer's memory.
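The buffering step of solution 1 can be sketched in plain Java (class and method names here are illustrative, not part of any Hadoop API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal sketch of solution 1: buffer every value seen for one key
// into a list, then sort in memory. The buffer grows with the number
// of values per key, which is exactly what risks an out-of-memory
// error on heavily skewed keys.
public class InReducerSort {

    public static List<Integer> sortValues(Iterable<Integer> values) {
        List<Integer> buffer = new ArrayList<>();
        for (Integer v : values) {
            buffer.add(v);          // every value for the key is held in memory
        }
        Collections.sort(buffer);   // sort only after iteration completes
        return buffer;
    }
}
```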
Solution 2: implement the ordering with the MapReduce framework itself. In a MapReduce program, the key-value pairs emitted by the Mapper pass through the shuffle before they reach the Reducer, and within the shuffle they go through three stages: partition -> sort -> group. The sort stage can be customized, so it suffices to extend the key format with the field that the secondary ordering needs and override the key class's compareTo (or supply a Comparator class).
The flow of solution 2 is shown in the figure below, where the dashed box marks the shuffle:
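The ordering that the shuffle's sort stage produces for such a composite key can be sketched outside Hadoop in plain Java (names here are illustrative):

```java
import java.util.Arrays;
import java.util.Comparator;

// Illustrative sketch: sort (firstKey, secondKey) pairs with a
// comparator that orders by the first field and breaks ties on the
// second -- the same ordering a composite key's compareTo hands to
// the shuffle's sort stage.
public class CompositeKeySort {

    public static int[][] sort(int[][] pairs) {
        int[][] sorted = pairs.clone();
        Arrays.sort(sorted, Comparator
                .<int[]>comparingInt(p -> p[0])   // primary: firstKey
                .thenComparingInt(p -> p[1]));    // secondary: secondKey
        return sorted;
    }
}
```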
Implementation
Custom key:
In the Mapper, the key must be redefined: after a record is read in, emit it as <(firstKey, secondKey), value> and override the comparator of this key class.
Implementation: implement WritableComparable.
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PairWritable implements WritableComparable<PairWritable> {

    private int first;
    private int second;

    public PairWritable() {
    }

    public void set(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    // Deserialize the two int fields in the order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.setFirst(in.readInt());
        this.setSecond(in.readInt());
    }

    // Serialize the two int fields.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.getFirst());
        out.writeInt(this.getSecond());
    }

    // Order by first field, breaking ties on the second field.
    @Override
    public int compareTo(PairWritable o) {
        int compare = Integer.compare(this.getFirst(), o.getFirst());
        if (compare != 0) {
            return compare;
        }
        return Integer.compare(this.getSecond(), o.getSecond());
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        PairWritable other = (PairWritable) obj;
        return first == other.first && second == other.second;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
```
Custom Partitioner class:
By default, the partitioner routes each record to a Reducer based on the key emitted by the Map. After the key is redefined, it carries both of the fields used for sorting, but secondary sort requires that all records with the same firstKey reach the same Reducer so the group can be sorted together. The partitioner therefore has to be overridden to partition on the first field of PairWritable only:
```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner extends Partitioner<PairWritable, IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
        // Partition on the first field only; masking with Integer.MAX_VALUE
        // clears the sign bit so the modulo result is never negative.
        return (String.valueOf(key.getFirst()).hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```
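The partition arithmetic can be checked outside Hadoop. The stand-alone sketch below (class name illustrative) reproduces the same expression and shows that the result is a stable, non-negative partition index determined by the first field alone:

```java
// Illustrative stand-alone copy of the expression in FirstPartitioner:
// hash the first field, mask off the sign bit so the value is
// non-negative, then reduce modulo the number of partitions.
public class PartitionDemo {

    public static int partitionFor(int firstKey, int numPartitions) {
        return (String.valueOf(firstKey).hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

Because only firstKey enters the hash, every (firstKey, secondKey) record with the same first field lands in the same partition.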
The partitioner then has to be registered in main:
```java
import org.apache.hadoop.mapreduce.Job;

// ...
Job job = ...;
// ...
job.setPartitionerClass(FirstPartitioner.class);
```
Custom Comparator (grouping) class:
A single partition may still receive records with several different firstKey values, so grouping must likewise be done by firstKey:
```java
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class FirstGroupingComparator implements RawComparator<PairWritable> {

    // Group records by the first field only.
    @Override
    public int compare(PairWritable o1, PairWritable o2) {
        return Integer.compare(o1.getFirst(), o2.getFirst());
    }

    // Raw-byte version: compare only the serialized first field (the
    // leading int of each key), skipping the trailing 4 bytes that hold
    // the second field.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
```
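What the grouping comparator achieves can be sketched in plain Java (illustrative names): after the composite-key sort, one reduce group spans a run of consecutive records sharing the same first field.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of grouping: walk records already sorted by
// (first, second) and start a new group whenever the first field
// changes -- the same boundary the grouping comparator defines for
// each reduce() call.
public class GroupingDemo {

    public static List<List<int[]>> group(int[][] sortedPairs) {
        List<List<int[]>> groups = new ArrayList<>();
        List<int[]> current = null;
        int lastFirst = 0;
        for (int[] p : sortedPairs) {
            if (current == null || p[0] != lastFirst) {
                current = new ArrayList<>();   // first field changed: new group
                groups.add(current);
            }
            current.add(p);
            lastFirst = p[0];
        }
        return groups;
    }
}
```

Within each group the values arrive already sorted by the second field, which is exactly what the Reducer relies on.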
Map key code:
```java
private PairWritable mapOutKey = new PairWritable();
private IntWritable mapOutValue = new IntWritable();

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String lineValue = value.toString();
    String[] strs = lineValue.split("\t");
    // Both fields go into the composite key; the second field is also
    // emitted as the value.
    mapOutKey.set(Integer.valueOf(strs[0]), Integer.valueOf(strs[1]));
    mapOutValue.set(Integer.valueOf(strs[1]));
    context.write(mapOutKey, mapOutValue);
}
```
Reduce key code: no extra work is needed; just iterate over the shuffled records and write them out.
```java
private Text outPutKey = new Text();

@Override
public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    for (IntWritable value : values) {
        outPutKey.set(String.valueOf(key.getFirst()));
        context.write(outPutKey, value);
    }
}
```
```java
public static void main(String[] args)
        throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "secondarysort");
    // ...
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);
    // ...
}
```