
【BDP】MapReduce-Secondary Sort

Definition

Secondary sort orders key-value pairs first by key and then, within each key, by value.

  • Solution 1: a pure Java approach. Sort all values of a given key directly inside the Reducer (e.g., collect every value for one key into an ArrayList, then sort it). With a large amount of data per key this can exhaust the Reducer's memory.
  • Solution 2: let the MapReduce framework do the sorting. In the shuffle between Mapper and Reducer, the emitted key-value pairs pass through three stages: partition -> sort -> group. The sort stage can be exploited by changing the key format: fold the secondary value into a composite key and override the key class's compareTo method (or supply a custom Comparator class), so the framework itself performs the secondary sort.
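To make the target ordering concrete, here is a plain-Java sketch (no Hadoop required; the class name, helper, and sample data are made up for illustration) of the order solution 2 is meant to produce: records sorted by the first key, and by the second key within ties.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SecondarySortDemo {

    // Sort by first key ascending, then by second key ascending --
    // the ordering the composite-key approach makes the shuffle produce.
    public static List<int[]> secondarySort(List<int[]> records) {
        List<int[]> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.<int[]>comparingInt(r -> r[0])
                              .thenComparingInt(r -> r[1]));
        return sorted;
    }

    public static void main(String[] args) {
        List<int[]> records = Arrays.asList(
            new int[]{3, 9}, new int[]{1, 5}, new int[]{3, 2}, new int[]{1, 7});
        for (int[] r : secondarySort(records)) {
            System.out.println(r[0] + "\t" + r[1]);
        }
    }
}
```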

The overall flow of solution 2 is shown in the figure below, where the dashed box marks the shuffle stage:

Implementation

Custom key:

The Mapper must redefine the key: read each record in as <(firstKey, secondKey), value> and override the key class's comparison logic.

  • Implementation: implement WritableComparable
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class PairWritable
        implements WritableComparable<PairWritable> {

    private int first;
    private int second;

    /**
     * Constructor
     */
    public PairWritable() { }

    public void set(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /**
     * Getters and setters
     */
    public int getFirst() {
        return first;
    }
    public void setFirst(int first) {
        this.first = first;
    }
    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }

    public void readFields(DataInput in) throws IOException {
        this.setFirst(in.readInt());
        this.setSecond(in.readInt());
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(this.getFirst());
        out.writeInt(this.getSecond());
    }

    public int compareTo(PairWritable o) {
        // Compare the first key
        int compare = Integer.compare(this.getFirst(), o.getFirst());
        if (compare != 0) {
            return compare;
        }
        // First keys are equal: fall back to the second key
        return Integer.compare(this.getSecond(), o.getSecond());
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        PairWritable other = (PairWritable) obj;
        return first == other.first && second == other.second;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
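The byte-level grouping comparator later depends on the exact serialization layout that write() produces. The following standalone sketch (hypothetical helper names, no Hadoop on the classpath) mirrors write()/readFields() with plain java.io streams to show that the serialized composite key is exactly two 4-byte big-endian ints, 8 bytes in total.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class PairLayoutDemo {

    // Mirrors PairWritable.write(): first key in bytes 0-3, second in bytes 4-7.
    public static byte[] serialize(int first, int second) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(first);
        out.writeInt(second);
        return bos.toByteArray();
    }

    // Mirrors PairWritable.readFields(): read the two ints back.
    public static int[] deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return new int[]{in.readInt(), in.readInt()};
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = serialize(42, 7);
        System.out.println("serialized length = " + raw.length); // 8 bytes
        int[] pair = deserialize(raw);
        System.out.println(pair[0] + "\t" + pair[1]);
    }
}
```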

Custom Partitioner:

By default the partitioner routes records to reducers based on the entire map output key. After the key is redefined it contains both sort fields, but secondary sort requires that all records sharing the same firstKey reach the same Reducer so they can be sorted within the group. The partitioner therefore has to be overridden to partition on PairWritable's first field only.

  • Implementation: extend Partitioner
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner
        extends Partitioner<PairWritable, IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value,
                            int numPartitions) {
        // default: (key.hashCode() & Integer.MAX_VALUE) % numPartitions
        return (String.valueOf(key.getFirst()).hashCode() & Integer.MAX_VALUE)
                % numPartitions;
    }
}
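A quick standalone check of that partition formula (the class and helper name here are hypothetical): because only the first key feeds the hash, two records with the same first key always land in the same partition, no matter what their second keys are.

```java
public class FirstPartitionDemo {

    // Same formula as FirstPartitioner.getPartition: hash only the first key,
    // mask off the sign bit, then take the remainder by the partition count.
    public static int partitionFor(int firstKey, int numPartitions) {
        return (String.valueOf(firstKey).hashCode() & Integer.MAX_VALUE)
                % numPartitions;
    }

    public static void main(String[] args) {
        // Two records sharing firstKey 2001 map to the same partition.
        System.out.println(partitionFor(2001, 4) == partitionFor(2001, 4));
        System.out.println("partition = " + partitionFor(2001, 4));
    }
}
```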

The custom partitioner then has to be registered in main:

import org.apache.hadoop.mapreduce.Job;
...
Job job = ...;
...
job.setPartitionerClass(FirstPartitioner.class);

Custom grouping Comparator:

Even within a single partition there may still be several different firstKey values, so grouping must likewise be done by firstKey alone.

  • Implementation: implement RawComparator
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class FirstGroupingComparator implements RawComparator<PairWritable> {

    /*
     * Object comparison: group by the first key only.
     */
    public int compare(PairWritable o1, PairWritable o2) {
        return Integer.compare(o1.getFirst(), o2.getFirst());
    }

    /*
     * Byte-level comparison.
     * b1 and b2 are the buffers holding the two serialized keys; s1/l1 and
     * s2/l2 give each key's start offset and length within its buffer.
     * Compare everything up to (but excluding) the trailing second key:
     * second is an int, so drop the last 4 bytes of each key.
     */
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
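A standalone sketch of what that byte-level comparison does, using java.util.Arrays.compare in place of WritableComparator.compareBytes (Arrays.compare treats bytes as signed while Hadoop compares them unsigned, so for the non-negative keys used here the results agree; class and helper names are made up): comparing only the first 4 bytes of each 8-byte record groups records by first key.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class GroupCompareDemo {

    // Serialize a (first, second) pair the way PairWritable.write() does.
    public static byte[] pairBytes(int first, int second) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(first);
        out.writeInt(second);
        return bos.toByteArray();
    }

    // Compare only bytes 0-3 (the serialized first key) of each record,
    // ignoring the 4 trailing second-key bytes -- the grouping rule.
    public static int compareFirstKey(byte[] b1, byte[] b2) {
        return Arrays.compare(b1, 0, 4, b2, 0, 4);
    }

    public static void main(String[] args) throws IOException {
        byte[] a = pairBytes(5, 100);
        byte[] b = pairBytes(5, 999);
        byte[] c = pairBytes(6, 0);
        System.out.println(compareFirstKey(a, b)); // 0: same group despite different seconds
        System.out.println(compareFirstKey(a, c) < 0); // different first keys: different groups
    }
}
```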

MapReduce Framework

  • Key Mapper code:
private PairWritable mapOutKey = new PairWritable();
private IntWritable mapOutValue = new IntWritable();

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String lineValue = value.toString();
    String[] strs = lineValue.split("\t");

    // Build the composite key and value ==> <(first, second), second>
    mapOutKey.set(Integer.valueOf(strs[0]), Integer.valueOf(strs[1]));
    mapOutValue.set(Integer.valueOf(strs[1]));

    context.write(mapOutKey, mapOutValue);
}
  • Key Reducer code: no extra work is needed; just iterate over the shuffled values and emit them
private Text outPutKey = new Text();

public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    // Emit each value; they arrive already sorted by the shuffle
    for (IntWritable value : values) {
        outPutKey.set(String.valueOf(key.getFirst()));
        context.write(outPutKey, value);
    }
}
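Putting the pieces together, here is a plain-Java simulation (made-up data; the class and helper names are hypothetical, not Hadoop API) of what one reducer observes after partition -> sort -> group: records sorted by the composite key, with contiguous runs sharing a first key handed over as one group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShuffleSimDemo {

    // Simulate the shuffle on one partition: sort by (first, second),
    // then group contiguous records with equal first keys, the way the
    // grouping comparator makes the reducer see them.
    public static Map<Integer, List<Integer>> shuffle(List<int[]> mapOutput) {
        List<int[]> sorted = new ArrayList<>(mapOutput);
        sorted.sort(Comparator.<int[]>comparingInt(r -> r[0])
                              .thenComparingInt(r -> r[1]));
        Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
        for (int[] r : sorted) {
            groups.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<int[]> mapOutput = Arrays.asList(
            new int[]{2, 8}, new int[]{1, 3}, new int[]{2, 1}, new int[]{1, 9});
        // Each group's value list is already in ascending order.
        shuffle(mapOutput).forEach((k, vs) -> System.out.println(k + " -> " + vs));
    }
}
```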
  • Key main() settings
public static void main(String[] args)
        throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "secondarysort");
    ...
    // shuffle settings
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);
    ...
}