MapReduce 的 map和reduce函数的输入和输出是键/值对(key/value pair) 形式的数据处理模型。
+
MapReduce 的类型
+
Hadoop1.x MapReduce 有2套API.旧api偏向与接口,新api偏向与抽象类,如无特殊默认列举为旧的api作讨论.
+
在Hadoop的MapReduce中,map和reduce函数遵循如下格式:
+
+- map(K1, V1) –> list (K2, V2) // map:对输入分片数据进行过滤数据,组织 key/value 对等操作
+- combine(K2, list(V2)) –> list(K2, V2) // 在map端对输出进行预处理,类似 reduce。combine 不一定适用任何情况,如:对总和求平均数。选用。
+- partition(K2, V2) –> integer // 将中间键值对划分到一个 reduce 分区,返回分区索引号。实际上,分区单独由键决定(值是被忽略的),分区内的键会排序,相同的键的所有值会合成一个组(list(V2))
+- reduce(K2, list(V2)) –> list(K3, V3) // 每个 reduce 会处理具有某些特性的键,每个键上都有值的序列,是通过对所有 map 输出的值进行统计得来的,reduce 根据所有map传来的结果,最后进行统计合并操作,并输出结果。
+
+
旧api类代码
+
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
+ void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException;
+}
+//
+public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
+ void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter) throws IOException;
+}
+//
+public interface Partitioner<K2, V2> extends JobConfigurable {
+ int getPartition(K2 key, V2 value, int numPartitions);
+}
+新api类代码
+
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+… …
+ protected void map(KEYIN key, VALUEIN value,
+ Context context) throws IOException, InterruptedException {
+ context.write((KEYOUT) key, (VALUEOUT) value);
+ }
+… …
+}
+//
+public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
+… …
+ protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
+ ) throws IOException, InterruptedException {
+ for(VALUEIN value: values) {
+ context.write((KEYOUT) key, (VALUEOUT) value);
+ }
+ }
+… …
+}
+//
+public interface Partitioner<K2, V2> extends JobConfigurable {
+ int getPartition(K2 key, V2 value, int numPartitions);
+}
+默认的 partitioner 是 HashPartitioner,对键进行哈希操作以决定该记录属于哪个分区让 reduce 处理,每个分区对应一个 reducer 任务。总槽数 solt=集群中节点数 * 每个节点的任务槽。实际值应该比理论值要小,以空闲一部分在错误容忍是备用。
+
HashPartitioner的实现
+
public class HashPartitioner<K, V> extends Partitioner<K, V> {
+ public int getPartition(K key, V value, int numReduceTasks) {
+ return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+ }
+}
+hadooop1.x 版本中
+
+- 旧的api,map 默认的 IdentityMapper, reduce 默认的是 IdentityReducer
+- 新的api,map 默认的 Mapper, reduce 默认的是 Reducer
+
+
默认MapReduce函数实例程序
+
public class MinimalMapReduceWithDefaults extends Configured implements Tool {
+ @Override
+ public int run(String[] args) throws Exception {
+ Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
+ if (job == null) {
+ return -1;
+ }
+ //
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapperClass(Mapper.class);
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setPartitionerClass(HashPartitioner.class);
+ job.setNumReduceTasks(1);
+ job.setReducerClass(Reducer.class);
+ job.setOutputKeyClass(LongWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+ //
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
+ System.exit(exitCode);
+ }
+}
+输入格式
+
输入分片与记录
+
一个输入分片(input split)是由单个 map 处理的输入块,即每一个 map 只处理一个输入分片,每个分片被划分为若干个记录( records ),每条记录就是一个 key/value 对,map 一个接一个的处理每条记录,输入分片和记录都是逻辑的,不必将他们对应到文件上。数据分片由数据块大小决定的。
+
注意,一个分片不包含数据本身,而是指向数据的引用( reference )。
+
输入分片在Java中被表示为InputSplit抽象类
+
public interface InputSplit extends Writable {
+ long getLength() throws IOException;
+ String[] getLocations() throws IOException;
+}
+InputFormat负责创建输入分片并将它们分割成记录,抽象类如下:
+
public interface InputFormat<K, V> {
+ InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
+ RecordReader<K, V> getRecordReader(InputSplit split,
+ JobConf job,
+ Reporter reporter) throws IOException;
+}
+客户端通过调用 getSpilts() 方法获得分片数目(怎么调到的?),在 TaskTracker 或 NodeManager上,MapTask 会将分片信息传给 InputFormat 的
+createRecordReader() 方法,进而这个方法来获得这个分片的 RecordReader,RecordReader 基本就是记录上的迭代器,MapTask 用一个 RecordReader 来生成记录的 key/value 对,然后再传递给 map 函数,如下步骤:
+
+- jobClient调用getSpilts()方法获得分片数目,将numSplits作为参数传入,以参考。InputFomat实现有自己的getSplits()方法。
+- 客户端将他们发送到jobtracker
+- jobtracker使用其存储位置信息来调度map任务从而在tasktracker上处理分片数据
+- 在tasktracker上,map任务把输入分片传给InputFormat上的getRecordReader()方法,来获取分片的RecordReader。
+- map 用一个RecordReader来生成纪录的键值对。
+- RecordReader的next()方法被调用,知道返回false。map任务结束。
+
+
MapRunner 类部分代码(旧api)
+
public class MapRunner<K1, V1, K2, V2>
+ implements MapRunnable<K1, V1, K2, V2> {
+… …
+ public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
+ Reporter reporter)
+ throws IOException {
+ try {
+ // allocate key & value instances that are re-used for all entries
+ K1 key = input.createKey();
+ V1 value = input.createValue();
+ //
+ while (input.next(key, value)) {
+ // map pair to output
+ mapper.map(key, value, output, reporter);
+ if(incrProcCount) {
+ reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
+ SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
+ }
+ }
+ } finally {
+ mapper.close();
+ }
+ }
+……
+}
+FileInputFormat类
+
FileInputFormat是所有使用文件为数据源的InputFormat实现的基类,它提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现,把分片割成记录的作业由其子类来完成。
+
下图为InputFormat类的层次结构:
+
+
FileInputFormat 类输入路径
+
FileInputFormat 提供四种静态方法来设定 Job 的输入路径,其中下面的 addInputPath() 方法 addInputPaths() 方法可以将一个或多个路径加入路径列表,setInputPaths() 方法一次设定完整的路径列表(可以替换前面所设路 径)
+
public static void addInputPath(Job job, Path path);
+public static void addInputPaths(Job job, String commaSeparatedPaths);
+public static void setInputPaths(Job job, Path... inputPaths);
+public static void setInputPaths(Job job, String commaSeparatedPaths);
+
如果需要排除特定文件,可以使用 FileInputFormat 的 setInputPathFilter() 设置一个过滤器:
+public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter);
+它默认过滤隐藏文件中以”_“和”.“开头的文件
+
private static final PathFilter hiddenFileFilter = new PathFilter(){
+ public boolean accept(Path p){
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+FileInputFormat 类的输入分片
+
FileInputFormat 类一般分割超过 HDFS 块大小的文件。通常分片与 HDFS 块大小一样,然后分片大小也可以改变的,下面展示了控制分片大小的属性:
+
待补。 TODO
+
FileInputFormat computeSplitSize(long goalSize, long minSize,long blockSize) {
+ return Math.max(minSize, Math.min(goalSize, blockSize));
+}
+即:
+minimumSize < blockSize < maximumSize 分片的大小即为块大小。
+
重载 FileInputFormat 的 isSplitable() =false 可以避免 mapreduce 输入文件被分割。
+
小文件与CombineFileInputFormat
+
+CombineFileInputFormat 是针对小文件设计的,CombineFileInputFormat 会把多个文件打包到一个分片中,以便每个 mapper 可以处理更多的数据;减少大量小文件的另一种方法可以使用 SequenceFile 将这些小文件合并成一个或者多个大文件。
+
+CombineFileInputFormat 不仅对于处理小文件实际上对于处理大文件也有好处,本质上,CombineFileInputFormat 使 map 操作中处理的数据量与 HDFS 中文件的块大小之间的耦合度降低了
+
+CombineFileInputFormat 是一个抽象类,没有提供实体类,所以需要实现一个CombineFileInputFormat 具体
+类和 getRecordReader() 方法(旧的接口是这个方法,新的接口InputFormat中则是createRecordReader())
+
+
+
把整个文件作为一条记录处理
+
有时,mapper 需要访问问一个文件中的全部内容。即使不分割文件,仍然需要一个 RecordReader 来读取文件内容为 record 的值,下面给出实现这个功能的完整程序,详细解释见《Hadoop权威指南》。
+
文本处理
+
+TextInputFileFormat 是默认的 InputFormat,每一行就是一个纪录
+
+TextInputFileFormat 的 key 是 LongWritable 类型,存储该行在整个文件的偏移量,value 是每行的数据内容,不包括任何终止符(换行符和回车符),它是Text类型.
+如下例
+On the top of the Crumpetty Tree
+
+The Quangle Wangle sat,
+But his face you could not see,
+On account of his Beaver Hat.
+每条记录表示以下key/value对
+(0, On the top of the Crumpetty Tree)
+(33, The Quangle Wangle sat,)
+(57, But his face you could not see,)
+(89, On account of his Beaver Hat.
+
+输入分片与 HDFS 块之间的关系:TextInputFormat 每一条纪录就是一行,很可能某一行跨数据库存放。
+
+
+

+
+KeyValueTextInputFormat。对下面的文本,KeyValueTextInputFormat 比较适合处理,其中可以通过
+mapreduce.input.keyvaluelinerecordreader.key.value.separator 属性设置指定分隔符,默认
+值为制表符,以下指定”→“为分隔符
+
+line1→On the top of the Crumpetty Tree
+line2→The Quangle Wangle sat,
+line3→But his face you could not see,
+line4→On account of his Beaver Hat.
+
+NLineInputFormat。如果希望 mapper 收到固定行数的输入,需要使用 NLineInputFormat 作为 InputFormat 。与 TextInputFormat 一样,key是文件中行的字节偏移量,值是行本身。
+
+
+
N 是每个 mapper 收到的输入行数,默认时 N=1,每个 mapper 会正好收到一行输入,mapreduce.input.lineinputformat.linespermap 属性控制 N 的值。以刚才的文本为例。
+如果N=2,则每个输入分片包括两行。第一个 mapper 会收到前两行 key/value 对:
+
(0, On the top of the Crumpetty Tree)
+(33, The Quangle Wangle sat,)
+另一个mapper则收到:
+(57, But his face you could not see,)
+(89, On account of his Beaver Hat.)
+
二进制输入
+
SequenceFileInputFormat
+如果要用顺序文件数据作为 MapReduce 的输入,应用 SequenceFileInputFormat。key 和 value 顺序文件,所以要保证map输入的类型匹配
+
SequenceFileInputFormat 可以读 MapFile 和 SequenceFile,如果在处理顺序文件时遇到目录,SequenceFileInputFormat 类会认为值正在读 MapFile 数据文件。
+
SequenceFileAsTextInputFormat 是 SequenceFileInputFormat 的变体。将顺序文件(其实就是SequenceFile)的 key 和 value 转成 Text 对象
+
SequenceFileAsBinaryInputFormat是 SequenceFileInputFormat 的变体。将顺序文件的key和value作为二进制对象
+
多种输入
+
对于不同格式,不同表示的文本文件输出的处理,可以用 MultipleInputs 类里处理,它允许为每条输入路径指定 InputFormat 和 Mapper。
+
MultipleInputs 类有一个重载版本的 addInputPath()方法:
+
+- 旧api列举
public static void addInputPath(JobConf conf, Path path, Class<? extends InputFormat> inputFormatClass)
+
+- 新api列举
public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass)
+
在有多种输入格式只有一个mapper时候(调用Job的setMapperClass()方法),这个方法会很有用。
+
+
DBInputFormat
+
JDBC从关系数据库中读取数据的输入格式(参见权威指南)
+
输出格式
+
OutputFormat类的层次结构
+

+
文本输出
+
默认输出格式是 TextOutputFormat,它本每条记录写成文本行,key/value 任意,这里 key和value 可以用制表符分割,用 mapreduce.output.textoutputformat.separator 书信可以改变制表符,与TextOutputFormat 对应的输入格式是 KeyValueTextInputFormat。
+
可以使用 NullWritable 来省略输出的 key 和 value。
+
二进制输出
+
+- SequenceFileOutputFormat 将它的输出写为一个顺序文件,因为它的格式紧凑,很容易被压缩,所以易于作为 MapReduce 的输入
+- 把key/value对作为二进制格式写到一个 SequenceFile 容器中
+- MapFileOutputFormat 把 MapFile 作为输出,MapFile 中的 key 必需顺序添加,所以必须确保 reducer 输出的 key 已经排好序。
+
+
多个输出
+
+MultipleOutputFormat 类可以将数据写到多个文件中,这些文件名称源于输出的键和值。MultipleOutputFormat是个抽象类,它有两个子类:MultipleTextOutputFormat 和 MultipleSequenceFileOutputFormat 。它们是 TextOutputFormat 的和 SequenceOutputFormat 的多版本。
+
+MultipleOutputs 类
+用于生成多个输出的库,可以为不同的输出产生不同的类型,无法控制输出的命名。它用于在原有输出基础上附加输出。输出是制定名称的。
+
+
+
MultipleOutputFormat和MultipleOutputs的区别
+
这两个类库的功能几乎相同。MultipleOutputs 功能更齐全,但 MultipleOutputFormat 对 目录结构和文件命令更多de控制。
+
+
+
+
+ | 特征 |
+ MultipleOutputFormat |
+ MultipleOutputs |
+
+
+ | 完全控制文件名和目录名 |
+ 是 |
+ 否 |
+
+
+ | 不同输出有不同的键和值类型 |
+ 否 |
+ 是 |
+
+
+ | 从同一作业的map和reduce使用 |
+ 否 |
+ 是 |
+
+
+ | 每个纪录多个输出 |
+ 否 |
+ 是 |
+
+
+ | 与任意OutputFormat一起使用 |
+ 否,需要子类 |
+ 是 |
+
+
+
+
延时输出
+
有些文件应用倾向于不创建空文件,此时就可以利用 LazyOutputFormat (Hadoop 0.21.0版本之后开始提供),它是一个封装输出格式,可以保证指定分区第一条记录输出时才真正的创建文件,要使用它,用JobConf和相关输出格式作为参数来调用 setOutputFormatClass() 方法.
+
Streaming 和 Pigs 支持 -LazyOutput 选项来启用 LazyOutputFormat功能。
+
数据库输出
+
参见 关系数据和 HBase的输出格式。
+
练习代码
+
代码路径
+https://github.com/kangfoo/hadoop1.study/blob/master/kangfoo/study.hdfs/src/main/java/com/kangfoo/study/hadoop1/mp/typeformat
+使用 maven 打包之后用 hadoop jar 命令执行
+步骤同 Hadoop example jar 类
+
+使用 TextInputFormat 类型测试 wordcount
+TestMapreduceInputFormat
+上传一个文件
+$ ./bin/hadoop fs -mkdir /test/input1
+$ ./bin/hadoop fs -put ./wordcount.txt /test/input1
+
使用maven 打包 或者用eclipse hadoop 插件, 执行主函数时设置如下参数
+hdfs://master11:9000/test/input1/wordcount.txt hdfs://master11:9000/numbers.seq hdfs://master11:9000/test/output5
+
没改过端口默认 namenode RPC 交互端口 8020 将上述的 9000 改成你自己的端口即可。
+部分日志
+## 准备运行程序和测试数据
+lrwxrwxrwx. 1 hadoop hadoop 86 2月 17 21:02 study.hdfs-0.0.1-SNAPSHOT.jar -> /home/hadoop/env/kangfoo.study/kangfoo/study.hdfs/target/study.hdfs-0.0.1-SNAPSHOT.jar
+-rw-rw-r--. 1 hadoop hadoop 1983 2月 17 20:18 wordcount.txt
+##执行
+$ ./bin/hadoop jar study.hdfs-0.0.1-SNAPSHOT.jar TestMapreduceInputFormat /test/input1/wordcount.txt /test/output1
+
+使用SequenceInputFormat类型测试wordcound
+使用Hadoop权威指南中的示例创建 /numbers.seq 文件
+$ ./bin/hadoop fs -text /numbers.seq
+$ ./bin/hadoop jar study.hdfs-0.0.1-SNAPSHOT.jar TestMapreduceSequenceInputFormat /numbers.seq /test/output2
+
+多文件输入
+$ ./bin/hadoop jar study.hdfs-0.0.1-SNAPSHOT.jar TestMapreduceMultipleInputs /test/input1/wordcount.txt /numbers.seq /test/output3
+
+
+
博客参考
+
淘宝博客
+
+