前言
为了处理成千上万的数据,Hadoop在设计的时候对性能要求极高,所以不同于一般的Java程序。从reduce阶段的对象重用便可管中窥豹,可见一斑。
Hadoop中的排序
在hadoop中,Mapper和reduce均会按照key对数据进行排序,这个操作是MR框架的默认行为。
在MapReduce中,主要用到的排序有: 快速排序 和 基于堆实现的优先级队列。
Mapper阶段
从Map输出到环形缓冲区 (一种用于表示一个固定尺寸、头尾相连的缓冲区的数据结构,适合缓存数据流) 的数据会被排序(MR改良的快速排序), 这个排序涉及 partition 和 key, 当缓冲区容量占用到80%, 会split数据到磁盘, 生成IFile文件, Map结束后, 会将IFile文件排序合并成一个大文件(基于堆实现的优先级队列), 以供不同的reduce来拉取相应的数据。
reduce阶段
从Mapper段取回来的数据已经是部分有序,Reduce Task 只需要进行一次归并排序即可保证数据整体有序。为了提高效率,Hadoop将sorting阶段和reduce阶段并行化,在sort阶段,Reduce Task为了内存和磁盘中的文件建立了小顶堆,保存了指向该小顶堆的节点的迭代器,并不断的移动迭代器,以将key相同的数据顺次交给reduce()函数处理,期间移动迭代器的过程实际上就是不断调整小顶堆的过程(建堆–>取堆顶元素–>重新建堆–>取堆顶元素), 这样,sort和reduce可以并行进行。
额外说明
需要说明的是,求 Top k,更简单的方法可以直接用内置的 TreeMap 或者 TreeSet,这两者是基于红黑树的一种数据结构,内部维持 key 的次序,但每次添加新元素,其排序的开销要大于堆调整的开销。例如要找最大的10个元素,那么创建的是小根堆。小根堆的特性是根节点是最小元素。不需要对堆进行再排序,当堆的根节点被替换成新的元素时,需要进行堆化,以保持小根堆的特性。
reduce的Iterator工作特点
通过上面对MapReduce排序的说明, 我们很容易明白reduce处理的是数据流,而不是数据集合。所以Reduce会用到对象重用。
对象重用
reduce方法的javadoc中已经说明了会出现的问题:
The framework calls this method for each
也就是说虽然reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。所以如果要保存key或者value的结果,只能将其中的值取出另存或者重新clone一个对象(例如Text store = new Text(value) 或者 String a = value.toString()),而不能直接赋引用。因为引用从始至终都是指向同一个对象,你如果直接保存它们,那最后它们都指向最后一个输入记录。会影响最终计算结果而出错。
一次性迭代器
The Iterator you receive from that Iterable’s iterator() method is special. The values may not all be in memory; Hadoop may be streaming them from disk. They aren’t really backed by a Collection, so it’s nontrivial to allow multiple iterations.
bug
bug分析
|
|
上面我们说过, reduce中只有key和value两个对象。这两个对象会随着迭代而不断被Reduce重写,就像两岸猿声啼不住,轻舟已过万重山(滑稽~),迭代器这艘船,随着数据流的流动,两岸的风景也就不同了。所以,在上面代码中,不断把迭代出来的对象Text放进队列中,等到迭代到最后的时候,队列中所有的Text都已经悄然变化。那么队列中的这些Text的值都变成了什么呢?根据Hadoop中对象重用的原理,我们可以得出,这些Text都是同一个对象的引用,而这个一个对象被写成了最后迭代出来的值。可以说,把这些Text引用放进队列中,等到最后拿出来再使用,无异于刻舟求剑。
上面代码跑出来的结果如下:
bug解决
正确的做法应该是把每个Text的内容都存进队列中而不是把这个引用存进队列中。
|
|
跑出来的结果如下: