reduce迭代器

前言

为了处理成千上万的数据,Hadoop在设计的时候对性能要求极高,所以不同于一般的Java程序。从reduce阶段的对象重用便可管中窥豹,可见一斑。

Hadoop中的排序

在hadoop中,Mapper和reduce均会按照key对数据进行排序,这个操作是MR框架的默认行为。

在MapReduce中,主要用到的排序有: 快速排序 和 基于堆实现的优先级队列。

Mapper阶段

从Map输出到环形缓冲区 (一种用于表示一个固定尺寸、头尾相连的缓冲区的数据结构,适合缓存数据流) 的数据会被排序(MR改良的快速排序), 这个排序涉及 partition 和 key, 当缓冲区容量占用到80%, 会split数据到磁盘, 生成IFile文件, Map结束后, 会将IFile文件排序合并成一个大文件(基于堆实现的优先级队列), 以供不同的reduce来拉取相应的数据。

reduce阶段

从Mapper段取回来的数据已经是部分有序,Reduce Task 只需要进行一次归并排序即可保证数据整体有序。为了提高效率,Hadoop将sorting阶段和reduce阶段并行化,在sort阶段,Reduce Task为了内存和磁盘中的文件建立了小顶堆,保存了指向该小顶堆的节点的迭代器,并不断的移动迭代器,以将key相同的数据顺次交给reduce()函数处理,期间移动迭代器的过程实际上就是不断调整小顶堆的过程(建堆–>取堆顶元素–>重新建堆–>取堆顶元素), 这样,sort和reduce可以并行进行。

额外说明

需要说明的是,求 Top k,更简单的方法可以直接用内置的 TreeMap 或者 TreeSet,这两者是基于红黑树的一种数据结构,内部维持 key 的次序,但每次添加新元素,其排序的开销要大于堆调整的开销。例如要找最大的10个元素,那么创建的是小根堆。小根堆的特性是根节点是最小元素。不需要对堆进行再排序,当堆的根节点被替换成新的元素时,需要进行堆化,以保持小根堆的特性。

reduce的Iterator工作特点

通过上面对MapReduce排序的说明, 我们很容易明白reduce处理的是数据流,而不是数据集合。所以Reduce会用到对象重用。

对象重用

reduce方法的javadoc中已经说明了会出现的问题:

The framework calls this method for each pair in the grouped inputs. Output values must be of the same type as input values. Input keys must not be altered. The framework will reuse the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of.

也就是说虽然reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。所以如果要保存key或者value的结果,只能将其中的值取出另存或者重新clone一个对象(例如Text store = new Text(value) 或者 String a = value.toString()),而不能直接赋引用。因为引用从始至终都是指向同一个对象,你如果直接保存它们,那最后它们都指向最后一个输入记录。会影响最终计算结果而出错。

一次性迭代器

The Iterator you receive from that Iterable’s iterator() method is special. The values may not all be in memory; Hadoop may be streaming them from disk. They aren’t really backed by a Collection, so it’s nontrivial to allow multiple iterations.

bug

bug分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//can you use priorityQueue to rank topN n-gram, then write out to hdfs?
PriorityQueue<Text> pq = new PriorityQueue<Text>(n, new Comparator<Text>() {
public int compare(Text o1, Text o2) {
int val1 = Integer.parseInt(o1.toString().split("=")[1].trim());
int val2 = Integer.parseInt(o2.toString().split("=")[1].trim());
//小根堆
return val1 - val2;
}
});
for (Text value : values) {
pq.offer(value);
if (pq.size() > n) {
pq.poll();
}
}
while (!pq.isEmpty()) {
String[] wordAndCount = pq.poll().toString().split("=");
int count = Integer.parseInt(wordAndCount[1].trim());
String followingWord = wordAndCount[0].trim();
context.write(new DBOutputWritable(key.toString(), followingWord, count), NullWritable.get());
}
}

上面我们说过, reduce中只有key和value两个对象。这两个对象会随着迭代而不断被Reduce重写,就像两岸猿声啼不住,轻舟已过万重山(滑稽~),迭代器这艘船,随着数据流的流动,两岸的风景也就不同了。所以,在上面代码中,不断把迭代出来的对象Text放进队列中,等到迭代到最后的时候,队列中所有的Text都已经悄然变化。那么队列中的这些Text的值都变成了什么呢?根据Hadoop中对象重用的原理,我们可以得出,这些Text都是同一个对象的引用,而这个一个对象被写成了最后迭代出来的值。可以说,把这些Text引用放进队列中,等到最后拿出来再使用,无异于刻舟求剑。

上面代码跑出来的结果如下:

bug解决

正确的做法应该是把每个Text的内容都存进队列中而不是把这个引用存进队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//this -> <is=1000, is book=10>
PriorityQueue<String> pq = new PriorityQueue<String>(n, new Comparator<String>() {
public int compare(String o1, String o2) {
int val1 = Integer.parseInt(o1.split("=")[1].trim());
int val2 = Integer.parseInt(o2.split("=")[1].trim());
return val1 - val2;
}
});
for (Text value : values) {
String valueToString = value.toString();
pq.offer(valueToString);
if (pq.size() > n) {
pq.poll();
}
}
while (!pq.isEmpty()) {
String[] wordAndCount = pq.poll().split("=");
int count = Integer.parseInt(wordAndCount[1].trim());
String followingWord = wordAndCount[0].trim();
context.write(new DBOutputWritable(key.toString(), followingWord, count), NullWritable.get());
}
}

跑出来的结果如下: