MapReduce应用开发

前言

MapReduce编程遵循一个特定的流程。首先写map 函数和reduce函数,最好使用单元测试来确保函数的运行符合预期。然后,写一个驱动程序来运行作业,看这个驱动程序是否可以正确运行,可以先从本地IDE中用一个小的數掘集采送行它。如果驱动程序不能正确运行,就用本地 IDE 調試器来找出问题根源。根据这些调试信息,可以通过扩展单元测试来覆盖这一测试用例,从而改进mapper或reducer,使其能正确处理类似输入。

用于配置的API

Hadoop 中的组件是通过 Hadoop 自己的配置 API 来配置的。一个Configuration类的实例代表配置属性及其取值的一个集合。

资源合并

1
2
3
Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
conf.addResource("configuration-2.xml");

后采添加到资源文件的属性全覆盖(override)之前定义的属性。

可变的拓展

系统属性的优先级高于资源文件中定义的属性,该特性特别适用于在命令行方式下使用JVM参数 -Dproperty=value来覆盖属性。
注意,虽然配置属性可以通过系统属性来定义,但除非系统属性使用配置属性重新定义,否则,它们是无法通过配置API进行访问的。

配置开发环境

Maven-POM可说明编译和测试MapReduce程序是需要的一依赖项。要想构建MapReduce作业,只需要有hadoop-core依赖,它包含所有的Hadoop类。单元测试需要使用junit类以及两个辅助库。
很多IDE可以直接读Maven POM,因此只需要在包含pom.xml文件的目录中指向这些Maven POM即可。也可以使用Maven 为IDE生成配置文件。

管理配置

开发 Hadoop应用时,经常需要在本地运行和集群运行之间进行切换。事实比,可能在几个集群上工作,也可能在本地“伪分布式”集群上测试。应对这些变化的一种方法是使 Hadoop配置文件包含每个集群的连接设置, 并且在运行 Hadoop 应用或工具时指定使用哪个连接设置。

辅助类GenericOptionsParser, Tool和ToolRunner

为了简化命令行方式运行作业, Hadoop自带了一些辅助类,GenerOptionsParser是一个类,用类解释常用的Hadoop命令行选项。

本地运行测试数据

现在 mapper 和 reducer 已经能够在受控的输入上进行工作了,下一步是写一个作业驱动程序(job driver),然后在开发机器上使用测试数据运行它。

在本地作业运行器上运行作业

通过Tool接口可以轻松启动写一个MapReduce作业的启动程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class MaxTemperatureDriver extends Configured implements Tool {
@Override
public int run (String) args) throws Exception {
if (args.length != 2) {
System.err.printf["Usage: %s generic options] <input> < output>\n",
getClass(), getSimpleName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
Job job = new Job(getConf(), "Max temperature");
job.SetJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper. Class); job. setCombinerClass (Max Temperature Reducer. class);
job.setReducerClass(Max Temperature Reducer. class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main (String) args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperature Driver(), args) ;
System.exit(exitCode);
}
}

测试驱动程序

使用本地作业运行器在本地文件系统的测试文件上运行作业
使用一个mini集群来运行它

在集群上运行

打包作业

本地作业运行器使用单JVM运行一个作业,只要作业需要的所有类都在类路径(classpath)上,那么作业就可以正常执行。
在分布式的环境中,情况稍微复杂一些。开始的时候作业的类必须打包进作业的JAR文件中并发送给集群。
Hadoop通过搜索驱动程序的类的路径自动找到作业的JAR文件,该类路径包含了JobConf或Job上的setJarByClass()方法中设置的类.另一种方法,如果你想通过文件路径指定的Jar文件,可以使用setJar()方法.

如果每个JAR文件都有一个作业,可以在JAR文件的manifest中指定要运行的main类.如果 main 类不在 manifest 中,则必须在命令行指定。任何非独立的JAR文件应该打包到JAR文件的/lib子目录中。当然也有其他的方法将依赖包含进来.

客户端的类路径

由 hadoop jar 设置的用户客户端类路径包括以下几个组成部分:

  1. 作业的jar文件
  2. 作业jar文件的lib目录中的所有jar文件以及类目录(如果定义)
  3. HADOOP_CLASSPH定义的类路径(如果已经设置)

任务的类路径

打包依赖

给定这些不同的方法来控制客户端和类路径上的内容,也有相应的操作处理作业的库依赖:

  1. 将库解包和重新打包到作业的jar
  2. 将作业的Jar的目录中的库打包
  3. 保持库与作业的Jar分开, 并且通过HADOOP_CLASSPATH将他们添加到客户端的类路径,通过-libjars将它们添加到任务的类路径

任务类路径的优先权

启动作业

Job上的waitForCompletion()方法启动作业并检查进展情况.

注: 作业 任务 和 任务尝试 都有唯一的ID

MapReduce的Web界面

Hadoop的Web界面用来浏览作业信息,对于跟踪作业运行进度, 查找做哦也完成后的统计信息和日志非常有用.
http://jobtracker-host:50030/

jobtracker页面

作业历史

作业页面

获取结果

reduce的输出分区文件是无序的(使用hash partitioner的缘故)

作业调试

事实上,调试一个作业的时候,应当总想是否能够使用计数器来获得需要找出事件发生来源的相关信息.即使需要使用日志或状态信息,但是用计数器来衡量问题的严重程度仍然也是有帮助的.

Hadoop日志

远程调试

作业调优


分析任务

本地作业运行器是一个与集群完全不同的环境,并且数据流模式也也截然不同.如果MapReduce作业是I/O密集型的(很多作业都属于此类),那么优化代码的CPU性能是没有意义的.

HPROF分析工具

其他分析工具

MapReduce的工作流

将问题分解成MapReduce作业

Mapper一般执行输入格式解释,投影(选择相关的字段)和过滤(去掉无关记录).
可以将这些函数分割到不同的Mapper中,然后使用hadoop中自带的ChainMapper类库将它们连接成一个Mapper.

关于jobControl

如何管理作业按顺序执行,有几个方法,其中主要考虑是否有一个线性的作业链或者一个更复杂的作业有向无环图.
对于线性链表,最简单的方法是一个接一个的运行作业,等前一个作业运行结束后再运行下一个
JobClient.runJob(conf1);
JobClient.runJob(conf2);

  1. debug
  2. jobtacker
  3. web
  4. Driver conbine 技巧
    5 。。。