网站建设需求说明书举例,新公司如何做推广,wordpress 4.6 中文版,微网站 pc网站同步MapReduce编程模型
理解MapReduce编程模型独立完成一个MapReduce程序并运行成功了解MapReduce工程流程掌握并描述出shuffle全过程#xff08;面试#xff09;独立编写课堂及作业中的MR程序理解并解决数据倾斜
1. MapReduce编程模型 Hadoop架构图 Hadoop由HDFS分布式存储、M…MapReduce编程模型
理解MapReduce编程模型独立完成一个MapReduce程序并运行成功了解MapReduce工程流程掌握并描述出shuffle全过程面试独立编写课堂及作业中的MR程序理解并解决数据倾斜
1. MapReduce编程模型 Hadoop架构图 Hadoop由HDFS分布式存储、MapReduce分布式计算、Yarn资源调度三部分组成 MapReduce是采用一种分而治之的思想设计出来的分布式计算框架 MapReduce由两个阶段组成 Map阶段切分成一个个小的任务Reduce阶段汇总小任务的结果 那什么是分而治之呢 比如一复杂、计算量大、耗时长的的任务暂且称为“大任务”此时使用单台服务器无法计算或较短时间内计算出结果时可将此大任务切分成一个个小的任务小任务分别在不同的服务器上并行的执行最终再汇总每个小任务的结果 1.1 Map阶段
map阶段有一个关键的map()函数此函数的输入是键值对输出是一系列键值对输出写入本地磁盘。
1.2 Reduce阶段 reduce阶段有一个关键的函数reduce()函数 此函数的输入也是键值对即map的输出kv对 输出也是一系列键值对结果最终写入HDFS
1.3 MapReduce 2. MapReduce编程示例
以MapReduce的词频统计为例统计一批英文文章当中每个单词出现的总次数
2.1 MapReduce原理图 Map阶段 假设MR的输入文件“Gone With The Wind”有三个blockblock1、block2、block3MR编程时每个block对应一个分片split每一个split对应一个map任务map task如图共3个map任务map1、map2、map3这3个任务的逻辑一样所以以第一个map任务map1为例分析map1读取block1的数据一次读取block1的一行数据 产生键值对(key/value)作为map()的参数传入调用map()假设当前所读行是第一行将当前所读行的行首相对于当前block开始处的字节偏移量作为key0当前行的内容作为valueDear Bear River map()内 (按需求写业务代码)将value当前行内容按空格切分得到三个单词Dear | Bear | River将每个单词变成键值对输出出去(Dear, 1) | (Bear, 1) | (River, 1)最终结果写入map任务所在节点的本地磁盘中内里还有细节讲到shuffle时再细细展开block的第一行的数据被处理完后接着处理第二行逻辑同上当map任务将当前block中所有的数据全部处理完后此map任务即运行结束 其它的每一个map任务都是如上逻辑不再赘述 Reduce阶段 reduce任务reduce task的个数由自己写的程序编程指定main()内的job.setNumReduceTasks(4)指定reduce任务是4个reduce1、reduce2、reduce3、reduce4每一个reduce任务的逻辑一样所以以第一个reduce任务reduce1为例分析map1任务完成后reduce1通过网络连接到map1将map1输出结果中属于reduce1的分区的数据通过网络获取到reduce1端拷贝阶段同样也如此连接到map2、map3获取结果最终reduce1端获得4个(Dear, 1)键值对由于key键相同它们分到同一组4个(Dear, 1)键值对转换成[Dear, Iterable(1, 1, 1, )]作为两个参数传入reduce()在reduce()内部计算Dear的总数为4并将(Dear, 4)作为键值对输出每个reduce任务最终输出文件内里还有细节讲到shuffle时再细细展开文件写入到HDFS
2.2 MR中key的作用 MapReduce编程中key有特殊的作用 ①数据中若要针对某个值进行分组、聚合时需将此值作为MR中的reduce的输入的key 如当前的词频统计例子按单词进行分组每组中对出现次数做聚合计算总和所以需要将每个单词作为reduce输入的keyMapReduce框架自动按照单词分组进而求出每组即每个单词的总次数 ②另外key还具有可排序的特性因为MR中的key类需要实现WritableComparable接口而此接口又继承Comparable接口可查看源码 MR编程时要充分利用以上两点结合实际业务需求设置合适的key
2.3 创建MAVEN工程 所有编程操作在hadoop集群某节点的IDEA中完成 使用IDEA创建maven工程pom文件参考提供的pom.xml主要用到的dependencies有 propertiescdh.version2.6.0-cdh5.14.2/cdh.version/propertiesrepositoriesrepositoryidcloudera/idurlhttps://repository.cloudera.com/artifactory/cloudera-repos//url/repository/repositoriesdependenciesdependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion2.6.0-mr1-cdh5.14.2/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-common/artifactIdversion${cdh.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-hdfs/artifactIdversion${cdh.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-mapreduce-client-core/artifactIdversion${cdh.version}/version/dependency!-- https://mvnrepository.com/artifact/junit/junit --dependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.11/versionscopetest/scope/dependencydependencygroupIdorg.testng/groupIdartifactIdtestng/artifactIdversionRELEASE/versionscopetest/scope/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.38/versionscopecompile/scope/dependency/dependencies2.4 MR参考代码 创建包com.kaikeba.hadoop.wordcount 在包中创建自定义mapper类、自定义reducer类、包含main类
2.4.1 Mapper代码
package com.kaikeba.hadoop.wordcount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 类MapperLongWritable, Text, Text, IntWritable的四个泛型分别表示* map方法的输入的键的类型kin、值的类型vin输出的键的类型kout、输出的值的类型vout* kin指的是当前所读行行首相对于split分片开头的字节偏移量,所以是long类型对应序列化类型LongWritable* vin指的是当前所读行类型是String对应序列化类型Text* kout根据需求输出键指的是单词类型是String对应序列化类型是Text* vout根据需求输出值指的是单词的个数1类型是int对应序列化类型是IntWritable**/
public class WordCountMap extends MapperLongWritable, Text, Text, IntWritable {/*** 处理分片split中的每一行的数据针对每行数据会调用一次map方法* 在一次map方法调用时从一行数据中获得一个个单词word再将每个单词word变成键值对形式(word, 1)输出出去* 输出的值最终写到本地磁盘中* param key 当前所读行行首相对于split分片开头的字节偏移量* param value 当前所读行* param context* throws IOException* throws InterruptedException*/public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//当前行的示例数据(单词间空格分割)Dear Bear River//取得当前行的数据String line value.toString();//按照\t进行分割得到当前行所有单词String[] words line.split(\t);for (String word : words) {//将每个单词word变成键值对形式(word, 1)输出出去//同样输出前要将kout, vout包装成对应的可序列化类型如String对应Textint对应IntWritablecontext.write(new Text(word), new IntWritable(1));}}
}
2.4.2 Reducer代码
package com.kaikeba.hadoop.wordcount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/**** ReducerText, IntWritable, Text, IntWritable的四个泛型分别表示* reduce方法的输入的键的类型kin、输入值的类型vin输出的键的类型kout、输出的值的类型vout* 注意因为map的输出作为reduce的输入所以此处的kin、vin类型分别与map的输出的键类型、值类型相同* kout根据需求输出键指的是单词类型是String对应序列化类型是Text* vout根据需求输出值指的是每个单词的总个数类型是int对应序列化类型是IntWritable**/
public class WordCountReduce extends ReducerText, IntWritable, Text, IntWritable {/**** key相同的一组kv对会调用一次reduce方法* 如reduce task汇聚了众多的键值对有key是hello的键值对也有key是spark的键值对如下* (hello, 1)* (hello, 1)* (hello, 1)* (hello, 1)* ...* (spark, 1)* (spark, 1)* (spark, 1)** 其中key是hello的键值对被分成一组merge成[hello, Iterable(1,1,1,1)]调用一次reduce方法* 同样key是spark的键值对被分成一组merge成[spark, Iterable(1,1,1)]再调用一次reduce方法** param key 当前组的key* param values 当前组中所有value组成的可迭代集和* param context reduce上下文环境对象* throws IOException* throws InterruptedException*/public void reduce(Text key, IterableIntWritable values,Context context) throws IOException, InterruptedException {//定义变量用于累计当前单词出现的次数int sum 0;for (IntWritable count : values) {//从count中获得值累加到sum中sum count.get();}//将单词、单词次数分别作为键值对输出context.write(key, new IntWritable(sum));// 输出最终结果};
}2.4.3 Main程序入口
package com.kaikeba.hadoop.wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;/**** MapReduce程序入口* 注意* 导包时不要导错了* 另外map\reduce相关的类使用mapreduce包下的是新API如org.apache.hadoop.mapreduce.Job;*/
public class WordCountMain {//若在IDEA中本地执行MR程序需要将mapred-site.xml中的mapreduce.framework.name值修改成local//参数 c:/test/README.txt c:/test/wcpublic static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {//判断一下输入参数是否是两个分别表示输入路径、输出路径if (args.length ! 2 || args null) {System.out.println(please input Path!);System.exit(0);}Configuration configuration new Configuration();//configuration.set(mapreduce.framework.name,local);//告诉程序要运行的jar包在哪//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//调用getInstance方法生成job实例Job job Job.getInstance(configuration, WordCountMain.class.getSimpleName());//设置job的jar包如果参数指定的类包含在一个jar包中则此jar包作为job的jar包 参数class跟主类在一个工程即可一般设置成主类
// job.setJarByClass(WordCountMain.class);job.setJarByClass(WordCountMain.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat输出格式是TextOutputFormat所以下两行可以注释掉
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(WordCountMap.class);//设置map combine类减少网路传出量job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(WordCountReduce.class);//注意如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map, reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(IntWritable.class);//设置reduce task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 提交作业job.waitForCompletion(true);}
}程序运行有两种方式分别是windows本地运行、集群运行依次演示 2.5 本地运行 在windows的IDEA中运行 2.5.1 初次运行WordCountMain先设置main方法参数根据图示操作即可 弹出窗口中设置包含main方法的类 设置main方法在参数 注意两个参数间有一个英文空格表示两个参数 c:/test/README.txt c:/test/wc2.5.3 本地运行程序
在WordCountMain代码上点击鼠标右键运行程序
2.5.4 查看结果
①成功标识文件
②结果文件 2.6 集群运行
用maven插件打jar包①点击Maven②双击package打包 控制台打印结果①表示打包成功②是生成的jar所在路径 将jar包上传到node01用户主目录/home/hadoop下 用hadoop jar命令运行mr程序
[hadoopnode01 ~]$ cd
[hadoopnode01 ~]$ hadoop jar com.kaikeba.hadoop-1.0-SNAPSHOT.jar com.kaikeba.hadoop.wordcount.WordCountMain /README.txt /wordcount01说明 com.kaikeba.hadoop-1.0-SNAPSHOT.jar是jar包名 com.kaikeba.hadoop.wordcount.WordCountMain是包含main方法的类的全限定名 /NOTICE.txt和/wordcount是main方法的两个参数表示输入路径、输出路径 确认结果
[hadoopnode01 ~]$ hadoop fs -ls /wordcount012.7 总结
MR分为两个阶段map阶段、reduce阶段MR输入的文件有几个block就会生成几个map任务MR的reduce任务的个数由程序中编程指定job.setNumReduceTasks(4)map任务 map任务中map()一次读取block的一行数据以kv对的形式输入map()map()的输出作为reduce()的输入 reduce任务 reduce任务通过网络将各执行完成的map任务输出结果中属于自己的数据取过来key相同的键值对作为一组调用一次reduce()reduce任务生成一个结果文件文件写入HDFS
3. WEB UI查看结果
3.1 Yarn node01是resourcemanager所在节点主机名根据自己的实际情况修改主机名 浏览器访问url地址http://node01:8088 3.2 HDFS结果
浏览器输入URLhttp://node01:50070
①点击下拉框②浏览文件系统③输入根目录查看hdfs根路径中的内容
4. MapReduce编程了解一下就ok海量数据清洗 mapreduce在企业中可以用于对海量数据的数据清洗当然随着新一代大数据框架的出现也可以使用spark、flink等框架做数据清洗 4.1 需求
现有一批日志文件日志来源于用户使用搜狗搜索引擎搜索新闻并点击查看搜索结果过程但是日志中有一些记录损坏现需要使用MapReduce来将这些损坏记录如记录中少字段、多字段从日志文件中删除此过程就是传说中的数据清洗。并且在清洗时要统计损坏的记录数。
4.2 数据结构 日志格式每行记录有6个字段分别表示时间datetime、用户ID userid、新闻搜索关键字searchkwd、当前记录在返回列表中的序号retorder、用户点击链接的顺序cliorder、点击的URL连接cliurl 关于retorder、cliorder说明
4.3 逻辑分析 MapReduce程序一般分为map阶段将任务分而治之 reduce阶段将map阶段的结果进行聚合 而有些mapreduce应用不需要数据聚合的操作也就是说不需要reduce阶段。即编程时不需要编写自定义的reducer类在main()中调用job.setNumReduceTasks(0)设置 而本例的数据清洗就是属于此种情况 统计损坏的记录数可使用自定义计数器的方式进行 map方法的逻辑取得每一行数据与每条记录的固定格式比对是否符合 若符合则是完好的记录否则是损坏的记录。并对自定义计数器累加
4.4 MR代码 若要集群运行需先将sogou.50w.utf8上传到HDFS根目录 [hadoopnode01 soft]$ pwd
/kkb/soft
[hadoopnode01 soft]$ hadoop fs -put sogou.50w.utf8 /运行等操作与上边类似不再赘述 4.4.1 Mapper类 具体逻辑可详见代码注释 注意实际工作中写良好的代码注释也是基本的职业素养 package com.kaikeba.hadoop.dataclean;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/**** 现对sogou日志数据做数据清洗将不符合格式要求的数据删除* 每行记录有6个字段* 分别表示时间datetime、用户ID userid、新闻搜索关键字searchkwd、当前记录在返回列表中的序号retorder、用户点击链接的顺序cliorder、点击的URL连接cliurl* 日志样本* 20111230111308 0bf5778fc7ba35e657ee88b25984c6e9 nba直播 4 1 http://www.hoopchina.com/tv**/
public class DataClean {/**** 基本上大部分MR程序的main方法逻辑大同小异将其他MR程序的main方法代码拷贝过来稍做修改即可* 实际开发中也会有很多的复制、粘贴、修改** 注意若要IDEA中本地运行MR程序需要将resources/mapred-site.xml中的mapreduce.framework.name属性值设置成local* param args*/public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//判断一下输入参数是否是两个分别表示输入路径、输出路径if (args.length ! 2 || args null) {System.out.println(please input Path!);System.exit(0);}Configuration configuration new Configuration();//调用getInstance方法生成job实例Job job Job.getInstance(configuration, DataClean.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(DataClean.class);//设置输入/输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(DataCleanMapper.class);//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//注意因为不需要reduce聚合阶段所以需要显示设置reduce task个数是0job.setNumReduceTasks(0);// 提交作业job.waitForCompletion(true);}/*** * 自定义mapper类* 注意若自定义的mapper类与main方法在同一个类中需要将自定义mapper类声明成static的*/public static class DataCleanMapper extends MapperLongWritable, Text, Text, NullWritable {//为了提高程序的效率避免创建大量短周期的对象出发频繁GC此处生成一个对象共用NullWritable nullValue NullWritable.get();Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//自定义计数器用于记录残缺记录数Counter counter context.getCounter(DataCleaning, damagedRecord);//获得当前行数据//样例数据20111230111645 169796ae819ae8b32668662bb99b6c2d 塘承高速公路规划线路图 1 1 http://auto.ifeng.com/roll/20111212/729164.shtmlString line value.toString();//将行数据按照记录中字段分隔符切分String[] fields line.split(\t);//判断字段数组长度是否为6if(fields.length ! 6) {//若不是则不输出并递增自定义计数器counter.increment(1L);} else {//若是6则原样输出context.write(value, nullValue);}}}
}4.4.2 运行结果 仅以本地运行演示 运行 ①reduce 0%job就已经successfully表示此MR程序没有reduce阶段 ②DataCleaning是自定义计数器组名damagedRecord是自定义的计数器值为6表示有6条损坏记录 图中part-m-00000中的m表示此文件是由map任务生成
4.5 总结 MR可用于数据清洗另外也可以使用Spark、Flink等组件做数据清洗 可使用自定义计数器记录符合特定条件的记录数用于统计
5. MapReduce编程用户搜索次数了解一下就ok
5.1 需求
使用MR编程统计sogou日志数据中每个用户搜索的次数结果写入HDFS
5.2 数据结构 数据来源自“MapReduce编程数据清洗”中的输出结果 仍然是sogou日志数据不再赘述 5.3 逻辑分析
还记得之前提到的MR中key的作用吗MR编程时若要针对某个值对数据进行分组、聚合时如当前的词频统计例子需要将每个单词作为reduce输入的key从而按照单词分组进而求出每组即每个单词的总次数那么此例也是类似的。 统计每个用户的搜索次数将userid放到reduce输入的key的位置对userid进行分组进而统计每个用户的搜索次数
5.4 MR代码 给MR程序在IDEA中设置参数运行等操作与上边类似不再赘述 此处MR程序的输入文件是“MapReduce编程数据清洗”中的输出结果文件 package com.kaikeba.hadoop.searchcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/**** 本MR示例用于统计每个用户搜索并查看URL链接的次数*/
public class UserSearchCount {/**** param args C:\test\datacleanresult c:\test\usersearch* throws IOException* throws ClassNotFoundException* throws InterruptedException*/public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//判断一下输入参数是否是两个分别表示输入路径、输出路径if (args.length ! 2 || args null) {System.out.println(please input Path!);System.exit(0);}Configuration configuration new Configuration();//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//调用getInstance方法生成job实例Job job Job.getInstance(configuration, UserSearchCount.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(UserSearchCount.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat输出格式是TextOutputFormat所以下两行可以注释掉
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(SearchCountMapper.class);//设置map combine类减少网路传出量//job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(SearchCountReducer.class);//如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map, reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(IntWritable.class);//设置reduce task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//提交作业job.waitForCompletion(true);}public static class SearchCountMapper extends MapperLongWritable, Text, Text, IntWritable {//定义共用的对象减少GC压力Text userIdKOut new Text();IntWritable vOut new IntWritable(1);Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获得当前行的数据//样例数据20111230111645 169796ae819ae8b32668662bb99b6c2d 塘承高速公路规划线路图 1 1 http://auto.ifeng.com/roll/20111212/729164.shtmlString line value.toString();//切分获得各字段组成的数组String[] fields line.split(\t);//因为要统计每个user搜索并查看URL的次数所以将userid放到输出key的位置//注意MR编程中根据业务需求设计key是很重要的能力String userid fields[1];//设置输出的key的值userIdKOut.set(userid);//输出结果context.write(userIdKOut, vOut);}}public static class SearchCountReducer extends ReducerText, IntWritable, Text, IntWritable {//定义共用的对象减少GC压力IntWritable totalNumVOut new IntWritable();Overrideprotected void reduce(Text key, IterableIntWritable values, Context context) throws IOException, InterruptedException {int sum 0;for(IntWritable value: values) {sum value.get();}//设置当前user搜索并查看总次数totalNumVOut.set(sum);context.write(key, totalNumVOut);}}
}5.4.1 结果
运行参数
C:\test\datacleanresult c:\test\usersearch运行结果 5.5 总结
结合本例子的需求设计MR程序因为要统计每个用户的搜索次数所以最终userid作为reduce的输出的keyMR编程能够根据业务需求设计合适的key是一个很重要的能力而这是需要建立在自己地MR框架原理有清晰认识的基础之上的
6. Shuffle重点 洗牌
shuffle主要指的是map端的输出作为reduce端输入的过程
6.1 shuffle简图 6.2 shuffle细节图 分区用到了分区器默认分区器是HashPartitioner 源码 public class HashPartitionerK2, V2 implements PartitionerK2, V2 {public void configure(JobConf job) {}/** Use {link Object#hashCode()} to partition. */public int getPartition(K2 key, V2 value,int numReduceTasks) {return (key.hashCode() Integer.MAX_VALUE) % numReduceTasks;}}6.3 map端
每个map任务都有一个对应的环形内存缓冲区输出是kv对先写入到环形缓冲区默认大小100M当内容占据80%缓冲区空间后由一个后台线程将缓冲区中的数据溢出写到一个磁盘文件在溢出写的过程中map任务可以继续向环形缓冲区写入数据但是若写入速度大于溢出写的速度最终造成100m占满后map任务会暂停向环形缓冲区中写数据的过程只执行溢出写的过程直到环形缓冲区的数据全部溢出写到磁盘才恢复向缓冲区写入后台线程溢写磁盘过程有以下几个步骤 先对每个溢写的kv对做分区分区的个数由MR程序的reduce任务数决定默认使用HashPartitioner计算当前kv对属于哪个分区计算公式(key.hashCode() Integer.MAX_VALUE) % numReduceTasks每个分区中根据kv对的key做内存中排序若设置了map端本地聚合combiner则对每个分区中排好序的数据做combine操作若设置了对map输出压缩的功能会对溢写数据压缩 随着不断的向环形缓冲区中写入数据会多次触发溢写每当环形缓冲区写满100m本地磁盘最终会生成多个溢出文件合并溢写文件在map task完成之前所有溢出文件会被合并成一个大的溢出文件且是已分区、已排序的输出文件小细节 在合并溢写文件时如果至少有3个溢写文件并且设置了map端combine的话会在合并的过程中触发combine操作但是若只有2个或1个溢写文件则不触发combine操作因为combine操作本质上是一个reduce需要启动JVM虚拟机有一定的开销
6.4 reduce端 reduce task会在每个map task运行完成后通过HTTP获得map task输出中属于自己的分区数据许多kv对 如果map输出数据比较小先保存在reduce的jvm内存中否则直接写入reduce磁盘 一旦内存缓冲区达到阈值默认0.66或map输出数的阈值默认1000则触发归并merge结果写到本地磁盘 若MR编程指定了combine在归并过程中会执行combine操作 随着溢出写的文件的增多后台线程会将它们合并大的、排好序的文件 reduce task将所有map task复制完后将合并磁盘上所有的溢出文件 默认一次合并10个 最后一批合并部分数据来自内存部分来自磁盘上的文件 进入“归并、排序、分组阶段” 每组数据调用一次reduce方法
6.5 总结
map端 map()输出结果先写入环形缓冲区缓冲区100M写满80M后开始溢出写磁盘文件此过程中会进行分区、排序、combine可选、压缩可选map任务完成前会将多个小的溢出文件合并成一个大的溢出文件已分区、排序 reduce端 拷贝阶段reduce任务通过http将map任务属于自己的分区数据拉取过来开始merge及溢出写磁盘文件所有map任务的分区全部拷贝过来后进行阶段合并、排序、分组阶段每组数据调用一次reduce()结果写入HDFS
7. 自定义分区重点
7.1 分区原理 根据之前讲的shuffle我们知道在map任务中从环形缓冲区溢出写磁盘时会先对kv对数据进行分区操作 分区操作是由MR中的分区器负责的 MapReduce有自带的默认分区器 HashPartitioner关键方法getPartition返回当前键值对的分区索引(partition index) public class HashPartitionerK2, V2 implements PartitionerK2, V2 {public void configure(JobConf job) {}/** Use {link Object#hashCode()} to partition. */public int getPartition(K2 key, V2 value, int numReduceTasks) {return (key.hashCode() Integer.MAX_VALUE) % numReduceTasks;}
}环形缓冲区溢出写磁盘前将每个kv对作为getPartition()的参数传入 先对键值对中的key求hash值int类型与MAX_VALUE按位与再模上reduce task个数假设reduce task个数设置为4可在程序中使用job.setNumReduceTasks(4)指定reduce task个数为4 那么map任务溢出文件有4个分区分区index分别是0、1、2、3getPartition()结果有四种0、1、2、3根据计算结果决定当前kv对落入哪个分区如结果是0则当前kv对落入溢出文件的0分区中最终被相应的reduce task通过http获得 若是MR默认分区器不满足需求可根据业务逻辑设计自定义分区器比如实现图上的功能
7.2 默认分区 程序执行略 代码详见工程com.kaikeba.hadoop.partitioner包 MR读取三个文件part1.txt、part2.txt、part3.txt三个文件放到HDFS目录/customParttitioner中 part1.txt内容如下 Dear Bear River
Dear Carpart2.txt内容如下 Car Car River
Dear Bearpart3.txt内容如下 Dear Car Bear
Car Car默认HashPartitioner分区时查看结果看代码 运行参数
/customParttitioner /cp01打jar包运行结果如下 只有part-r-00001、part-r-00003有数据另外两个没有数据 HashPartitioner将Bear分到index1的分区将Car|Dear|River分到index3分区 7.3 自定义分区
7.3.1 需求
自定义分区使得文件中分别以Dear、Bear、River、Car为键的键值对分别落到index是0、1、2、3的分区中
7.3.2 逻辑分析
若要实现以上的分区策略需要自定义分区类 此类实现Partitioner接口在getPartition()中实现分区逻辑 main方法中 设定reduce个数为4设置自定义的分区类调用job.setPartitionerClass方法
7.3.3 MR代码 完整代码见代码工程 自定义分区类如下
package com.kaikeba.hadoop.partitioner;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;import java.util.HashMap;public class CustomPartitioner extends PartitionerText, IntWritable {public static HashMapString, Integer dict new HashMapString, Integer();//定义每个键对应的分区index使用map数据结构完成static{dict.put(Dear, 0);dict.put(Bear, 1);dict.put(River, 2);dict.put(Car, 3);}public int getPartition(Text text, IntWritable intWritable, int i) {//int partitionIndex dict.get(text.toString());return partitionIndex;}
}运行结果 结果满足需求 7.4 总结
如果默认分区器不满足业务需求可以自定义分区器 自定义分区器的类继承Partitioner类覆写getPartition()在方法中定义自己的分区策略在main()方法中调用job.setPartitionerClass()main()中设置reduce任务数
8. 自定义Combiner重点本质是reduce,Map端聚合
8.1 需求 普通的MR是reduce通过http取得map任务的分区结果具体的聚合出结果是在reduce端进行的 以单词计数为例 下图中的第一个map任务(map1)本地磁盘中的结果有5个键值对(Dear, 1)、(Bear, 1)、(River, 1)、(Dear, 1)、(Car, 1)其中map1中的两个相同的键值对(Dear, 1)、(Dear, 1)会被第一个reduce任务(reduce1)通过网络拉取到reduce1端那么假设map1中(Dear, 1)有1亿个呢按原思路map1端需要存储1亿个(Dear, 1)再将1亿个(Dear, 1)通过网络被reduce1获得然后再在reduce1端汇总这样做map端本地磁盘IO、数据从map端到reduce端传输的网络IO比较大那么想能不能在reduce1从map1拉取1亿个(Dear, 1)之前在map端就提前先做下reduce汇总得到结果(Dear, 100000000)然后再将这个结果一个键值对传输到reduce1呢答案是可以的我们称之为combine操作 map端combine本地聚合本质是reduce
8.2 逻辑分析 注意 不论运行多少次Combine操作都不能影响最终的结果 并非所有的mr都适合combine操作比如求平均值 参考《并非所有MR都适合combine.txt》 原理图 看原图 当每个map任务的环形缓冲区添满80%开始溢写磁盘文件 此过程会分区、每个分区内按键排序、再combine操作若设置了combine的话、若设置map输出压缩的话则再压缩 在合并溢写文件时如果至少有3个溢写文件并且设置了map端combine的话会在合并的过程中触发combine操作但是若只有2个或1个溢写文件则不触发combine操作因为combine操作本质上是一个reduce需要启动JVM虚拟机有一定的开销 combine本质上也是reduce因为自定义的combine类继承自Reducer父类 map: (K1, V1) - list(K2, V2) combiner: (K2, list(V2)) - (K2, V2) reduce: (K2, list(V2)) - (K3, V3) reduce函数与combine函数通常是一样的K3与K2类型相同V3与V2类型相同即reduce的输入的kv类型分别与输出的kv类型相同
8.3 MR代码 对原词频统计代码做修改 详细代码见代码工程 WordCountMap、WordCountReduce代码保持不变唯一需要做的修改是在WordCountMain中增加job.setCombinerClass(WordCountReduce.class);修改如下 8.4 小结
使用combine时首先考虑当前MR是否适合combine总原则是不论使不使用combine不能影响最终的结果在MR时发生数据倾斜且可以使用combine时可以使用combine缓解数据倾斜
9. MR压缩
9.1 需求
作用在MR中为了减少磁盘IO及网络IO可考虑在map端、reduce端设置压缩功能给“MapReduce编程用户搜索次数”代码增加压缩功能
9.2 逻辑分析
那么如何设置压缩功能呢只需在main方法中给Configuration对象增加如下设置即可
//开启map输出进行压缩的功能
configuration.set(mapreduce.map.output.compress, true);
//设置map输出的压缩算法是BZip2Codec它是hadoop默认支持的压缩算法且支持切分
configuration.set(mapreduce.map.output.compress.codec, org.apache.hadoop.io.compress.BZip2Codec);
//开启job输出压缩功能
configuration.set(mapreduce.output.fileoutputformat.compress, true);
//指定job输出使用的压缩算法
configuration.set(mapreduce.output.fileoutputformat.compress.codec, org.apache.hadoop.io.compress.BZip2Codec);9.3 MR代码 给“MapReduce编程用户搜索次数”代码增加压缩功能代码如下 如何打jar包已演示过此处不再赘述
package com.kaikeba.hadoop.mrcompress;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** 本MR示例用于统计每个用户搜索并查看URL链接的次数*/
public class UserSearchCount {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//判断以下输入参数是否是两个分别表示输入路径、输出路径if (args.length ! 2 || args null) {System.out.println(please input Path!);System.exit(0);}Configuration configuration new Configuration();//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//开启map输出进行压缩的功能configuration.set(mapreduce.map.output.compress, true);//设置map输出的压缩算法是BZip2Codec它是hadoop默认支持的压缩算法且支持切分configuration.set(mapreduce.map.output.compress.codec, org.apache.hadoop.io.compress.BZip2Codec);//开启job输出压缩功能configuration.set(mapreduce.output.fileoutputformat.compress, true);//指定job输出使用的压缩算法configuration.set(mapreduce.output.fileoutputformat.compress.codec, org.apache.hadoop.io.compress.BZip2Codec);//调用getInstance方法生成job实例Job job Job.getInstance(configuration, UserSearchCount.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(UserSearchCount.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat所以下两行可以注释掉
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);//设置处理Map阶段的自定义的类job.setMapperClass(SearchCountMapper.class);//设置map combine类减少网路传出量//job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(SearchCountReducer.class);//如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map, reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(IntWritable.class);//设置reduce task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 提交作业job.waitForCompletion(true);}public static class SearchCountMapper extends MapperLongWritable, Text, Text, IntWritable {//定义共用的对象减少GC压力Text userIdKOut new Text();IntWritable vOut new IntWritable(1);Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获得当前行的数据//样例数据20111230111645 169796ae819ae8b32668662bb99b6c2d 塘承高速公路规划线路图 1 1 http://auto.ifeng.com/roll/20111212/729164.shtmlString line value.toString();//切分获得各字段组成的数组String[] fields line.split(\t);//因为要统计每个user搜索并查看URL的次数所以将userid放到输出key的位置//注意MR编程中根据业务需求设计key是很重要的能力String userid fields[1];//设置输出的key的值userIdKOut.set(userid);//输出结果context.write(userIdKOut, vOut);}}public static class SearchCountReducer extends ReducerText, IntWritable, Text, IntWritable {//定义共用的对象减少GC压力IntWritable totalNumVOut new IntWritable();Overrideprotected void reduce(Text key, IterableIntWritable values, Context context) throws IOException, InterruptedException {int sum 0;for(IntWritable value: values) {sum value.get();}//设置当前user搜索并查看总次数totalNumVOut.set(sum);context.write(key, totalNumVOut);}}
}生成jar包并运行jar包
[hadoopnode01 target]$ hadoop jar com.kaikeba.hadoop-1.0-SNAPSHOT.jar com.kaikeba.hadoop.mrcompress.UserSearchCount /sogou.2w.utf8 /compressed查看结果 可增加数据量查看使用压缩算法前后的系统各计数器的数据量变化
[hadoopnode01 target]$ hadoop fs -ls -h /compressed9.4 总结
MR过程中使用压缩可减少数据量进而减少磁盘IO、网络IO数据量可设置map端输出的压缩可设置job最终结果的压缩通过相应的配置项即可实现
10. 自定义InputFormat难点
10.1 MapReduce执行过程 上图也描述了mapreduce的一个完整的过程我们主要看map任务是如何从hdfs读取分片数据的部分 涉及3个关键的类 ①InputFormat输入格式类 ②InputSplit输入分片类getSplits() InputFormat输入格式类将输入文件分成一个个分片InputSplit每个Map任务对应一个split分片 ③RecordReader记录读取器类createRecordReader() RecordReader记录读取器读取分片数据一行记录生成一个键值对传入map任务的map()方法调用map() 所以如果需要根据自己的业务情况自定义输入的话需要自定义两个类 InputFormat类RecordReader类 详细流程 客户端调用InputFormat的**getSplits()**方法获得输入文件的分片信息 针对每个MR job会生成一个相应的app master负责map\reduce任务的调度及监控执行情况 将分片信息传递给MR job的app master app master根据分片信息尽量将map任务尽量调度在split分片数据所在节点移动计算不移动数据 有几个分片就生成几个map任务 每个map任务将split分片传递给createRecordReader()方法生成此分片对应的RecordReader RecordReader用来读取分片的数据生成记录的键值对 nextKeyValue()判断是否有下一个键值对如果有返回true否则返回false如果返回true调用getCurrentKey()获得当前的键调用getCurrentValue()获得当前的值 map任务运行过程 map任务运行时会调用run() 首先运行一次setup()方法只在map任务启动时运行一次一些初始化的工作可以在setup方法中完成如要连接数据库之类的操作 while循环调用context.nextKeyValue()会委托给RecordRecord的nextKeyValue()判断是否有下一个键值对 如果有下一个键值对调用context.getCurrentKey()、context.getCurrentValue()获得当前的键、值的值也是调用RecordReader的同名方法 作为参数传入map(key, value, context)调用一次map() 当读取分片尾context.nextKeyValue()返回false退出循环 调用cleanup()方法只在map任务结束之前调用一次所以一些回收资源的工作可在此方法中实现如关闭数据库连接
10.2 需求
无论hdfs还是mapreduce处理小文件都有损效率实践中又难免面临处理大量小文件的场景此时就需要有相应解决方案
10.3 逻辑分析
小文件的优化无非以下几种方式 在数据采集的时候就将小文件或小批数据合成大文件再上传HDFS(SequenceFile方案)在业务处理之前在HDFS上使用mapreduce程序对小文件进行合并可使用自定义InputFormat实现在mapreduce处理时可采用CombineFileInputFormat提高效率 本例使用第二种方案自定义输入格式
10.4 MR代码 自定义InputFormat package com.kaikeba.hadoop.inputformat;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;/*** 自定义InputFormat类* 泛型* 键因为不需要使用键所以设置为NullWritable* 值值用于保存小文件的内容此处使用BytesWritable*/
public class WholeFileInputFormat extends FileInputFormatNullWritable, BytesWritable {/**** 返回false表示输入文件不可切割* param context* param file* return*/Overrideprotected boolean isSplitable(JobContext context, Path file) {return false;}/*** 生成读取分片split的RecordReader* param split* param context* return* throws IOException* throws InterruptedException*/Overridepublic RecordReaderNullWritable, BytesWritable createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException {//使用自定义的RecordReader类WholeFileRecordReader reader new WholeFileRecordReader();//初始化RecordReaderreader.initialize(split, context);return reader;}
}自定义RecordReader 实现6个相关方法 package com.kaikeba.hadoop.inputformat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;/**** RecordReader的核心工作逻辑* 通过nextKeyValue()方法去读取数据构造将返回的key value* 通过getCurrentKey 和 getCurrentValue来返回上面构造好的key和value** author*/
public class WholeFileRecordReader extends RecordReaderNullWritable, BytesWritable {//要读取的分片private FileSplit fileSplit;private Configuration conf;//读取的value数据private BytesWritable value new BytesWritable();/**** 标识变量分片是否已被读取过因为小文件设置成了不可切分所以一个小文件只有一个分片* 而这一个分片的数据只读取一次一次读完所有数据* 所以设置此标识*/private boolean processed false;/*** 初始化* param split* param context* throws IOException* throws InterruptedException*/Overridepublic void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {this.fileSplit (FileSplit) split;this.conf context.getConfiguration();}/*** 判断是否有下一个键值对。若有则读取分片中的所有的数据* return* throws IOException* throws InterruptedException*/Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!processed) {byte[] contents new byte[(int) fileSplit.getLength()];Path file fileSplit.getPath();FileSystem fs file.getFileSystem(conf);FSDataInputStream in null;try {in fs.open(file);IOUtils.readFully(in, contents, 0, contents.length);value.set(contents, 0, contents.length);} finally {IOUtils.closeStream(in);}processed true;return true;}return false;}/*** 获得当前的key* return* throws IOException* throws InterruptedException*/Overridepublic NullWritable getCurrentKey() throws IOException,InterruptedException {return NullWritable.get();}/*** 获得当前的value* return* throws IOException* throws InterruptedException*/Overridepublic BytesWritable getCurrentValue() throws IOException,InterruptedException {return value;}/*** 获得分片读取的百分比因为如果读取分片数据的话会一次性的读取完所以进度要么是1要么是0* return* throws IOException*/Overridepublic float getProgress() throws IOException {//因为一个文件作为一个整体处理所以如果processed为true表示已经处理过了进度为1否则为0return processed ? 1.0f : 0.0f;}Overridepublic void close() throws IOException {}
}main方法 package com.kaikeba.hadoop.inputformat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.io.IOException;/*** 让主类继承Configured类实现Tool接口* 实现run()方法* 将以前main()方法中的逻辑放到run()中* 在main()中调用ToolRunner.run()方法第一个参数是当前对象第二个参数是输入、输出*/
public class SmallFiles2SequenceFile extends Configured implements Tool {/*** 自定义Mapper类* mapper类的输入键值对类型与自定义InputFormat的输入键值对保持一致* mapper类的输出的键值对类型分别是文件名、文件内容*/static class SequenceFileMapper extendsMapperNullWritable, BytesWritable, Text, BytesWritable {private Text filenameKey;/*** 取得文件名* param context* throws IOException* throws InterruptedException*/Overrideprotected void setup(Context context) throws IOException,InterruptedException {InputSplit split context.getInputSplit();//获得当前文件路径Path path ((FileSplit) split).getPath();filenameKey new Text(path.toString());}Overrideprotected void map(NullWritable key, BytesWritable value,Context context) throws IOException, InterruptedException {context.write(filenameKey, value);}}public int run(String[] args) throws Exception {Configuration conf new Configuration();Job job Job.getInstance(conf,combine small files to sequencefile);job.setJarByClass(SmallFiles2SequenceFile.class);//设置自定义输入格式job.setInputFormatClass(WholeFileInputFormat.class);WholeFileInputFormat.addInputPath(job,new Path(args[0]));//设置输出格式SequenceFileOutputFormat及输出路径job.setOutputFormatClass(SequenceFileOutputFormat.class);SequenceFileOutputFormat.setOutputPath(job,new Path(args[1]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);job.setMapperClass(SequenceFileMapper.class);return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception {int exitCode ToolRunner.run(new SmallFiles2SequenceFile(),args);System.exit(exitCode);}
}10.5 总结
若要自定义InputFormat的话 需要自定义InputFormat类并覆写getRecordReader()方法自定义RecordReader类实现方法 initialize()nextKeyValue()getCurrentKey()getCurrentValue()getProgress()close()
11. 自定义OutputFormat
11.1 需求 现在有一些订单的评论数据要将订单的好评与其它级别的评论中评、差评进行区分开来将最终的数据分开到不同的文件夹下面去 数据第九个字段表示评分等级0 好评1 中评2 差评
11.2 逻辑分析
程序的关键点是在一个mapreduce程序中根据数据的不同(好评的评级不同)输出两类结果到不同目录这类灵活的输出需求通过自定义OutputFormat来实现
11.3 实现要点
在mapreduce中访问外部资源自定义OutputFormat类覆写getRecordWriter()方法自定义RecordWriter类覆写具体输出数据的方法write()
11.4 MR代码
自定义OutputFormat
package com.kaikeba.hadoop.outputformat;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/**** 本例使用框架默认的Reducer它将Mapper输入的kv对原样输出所以reduce输出的kv类型分别是Text, NullWritable* 自定义OutputFormat的类泛型表示reduce输出的键值对类型要保持一致;* map--(kv)--reduce--(kv)--OutputFormat*/
public class MyOutPutFormat extends FileOutputFormatText, NullWritable {/*** 两个输出文件;* good用于保存好评文件其它评级保存到bad中* 根据实际情况修改path;node01及端口号8020*/String bad hdfs://node01:8020/outputformat/bad/r.txt;String good hdfs://node01:8020/outputformat/good/r.txt;/**** param context* return* throws IOException* throws InterruptedException*/Overridepublic RecordWriterText, NullWritable getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {//获得文件系统对象FileSystem fs FileSystem.get(context.getConfiguration());//两个输出文件路径Path badPath new Path(bad);Path goodPath new Path(good);FSDataOutputStream badOut fs.create(badPath);FSDataOutputStream goodOut fs.create(goodPath);return new MyRecordWriter(badOut,goodOut);}/*** 泛型表示reduce输出的键值对类型要保持一致*/static class MyRecordWriter extends RecordWriterText, NullWritable{FSDataOutputStream badOut null;FSDataOutputStream goodOut null;public MyRecordWriter(FSDataOutputStream badOut, FSDataOutputStream goodOut) {this.badOut badOut;this.goodOut goodOut;}/*** 自定义输出kv对逻辑* param key* param value* throws IOException* throws InterruptedException*/Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {if (key.toString().split(\t)[9].equals(0)){//好评goodOut.write(key.toString().getBytes());goodOut.write(\r\n.getBytes());}else{//其它评级badOut.write(key.toString().getBytes());badOut.write(\r\n.getBytes());}}/*** 关闭流* param context* throws IOException* throws InterruptedException*/Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {if(goodOut !null){goodOut.close();}if(badOut !null){badOut.close();}}}
}main方法
package com.kaikeba.hadoop.outputformat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.io.IOException;public class MyOwnOutputFormatMain extends Configured implements Tool {public int run(String[] args) throws Exception {Configuration conf super.getConf();Job job Job.getInstance(conf, MyOwnOutputFormatMain.class.getSimpleName());job.setJarByClass(MyOwnOutputFormatMain.class);//默认项可以省略或者写出也可以//job.setInputFormatClass(TextInputFormat.class);//设置输入文件TextInputFormat.addInputPath(job, new Path(args[0]));job.setMapperClass(MyMapper.class);//job.setMapOutputKeyClass(Text.class);//job.setMapOutputValueClass(NullWritable.class);//设置自定义的输出类job.setOutputFormatClass(MyOutPutFormat.class);//设置一个输出目录这个目录会输出一个success的成功标志的文件MyOutPutFormat.setOutputPath(job, new Path(args[1]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//默认项即默认有一个reduce任务所以可以省略
// job.setNumReduceTasks(1);
// //Reducer将输入的键值对原样输出
// job.setReducerClass(Reducer.class);boolean b job.waitForCompletion(true);return b ? 0: 1;}/**** Mapper输出的key、value类型* 文件每行的内容作为输出的key对应Text类型* 输出的value为null对应NullWritable*/public static class MyMapper extends MapperLongWritable, Text, Text, NullWritable {Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//把当前行内容作为key输出value为nullcontext.write(value, NullWritable.get());}}/**** param args /ordercomment.csv /ofo* throws Exception*/public static void main(String[] args) throws Exception {Configuration configuration new Configuration();ToolRunner.run(configuration, new MyOwnOutputFormatMain(), args);}
}11.5 总结 自定义outputformat 泛型与reduce输出的键值对类型保持一致覆写getRecordWriter()方法 自定义RecordWriter 泛型与reduce输出的键值对类型保持一致覆写具体输出数据的方法write()、close() main方法 job.setOutputFormatClass使用自定义在输出类
12. 二次排序重点
12.1 需求 数据有一个简单的关于员工工资的记录文件salary.txt 每条记录如下有3个字段分别表示name、age、salary nancy 22 8000 使用MR处理记录实现结果中 按照工资从高到低的降序排序若工资相同则按年龄升序排序
12.2 逻辑分析 利用MR中key具有可比较的特点 MapReduce中根据key进行分区、排序、分组 有些MR的输出的key可以直接使用hadoop框架的可序列化可比较类型表示如Text、IntWritable等等而这些类型本身是可比较的如IntWritable默认升序排序 但有时使用MR编程输出的key若使用hadoop自带的key类型无法满足需求 此时需要自定义的key类型包含的是非单一信息如此例包含工资、年龄并且也得是**可序列化、可比较的** 需要自定义key定义排序规则 实现按照人的salary降序排序若相同则再按age升序排序若salary、age相同则放入同一组
12.3 MR代码
详见工程代码自定义key类型Person类
package com.kaikeba.hadoop.secondarysort;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;//根据输入文件格式定义JavaBean作为MR时Map的输出key类型要求此类可序列化、可比较
public class Person implements WritableComparablePerson {private String name;private int age;private int salary;public Person() {}public Person(String name, int age, int salary) {//super();this.name name;this.age age;this.salary salary;}public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}public void setAge(int age) {this.age age;}public int getSalary() {return salary;}public void setSalary(int salary) {this.salary salary;}Overridepublic String toString() {return this.salary this.age this.name;}//两个Person对象的比较规则①先比较salary高的排序在前②若相同age小的在前public int compareTo(Person other) {int compareResult this.salary - other.salary;if(compareResult ! 0) {//若两个人工资不同//工资降序排序即工资高的排在前边return -compareResult;} else {//若工资相同//年龄升序排序即年龄小的排在前边return this.age - other.age;}}//序列化将NewKey转化成使用流传送的二进制public void write(DataOutput dataOutput) throws IOException {//注意①使用正确的write方法②记住此时的序列化的顺序name、age、salarydataOutput.writeUTF(name);dataOutput.writeInt(age);dataOutput.writeInt(salary);}//使用in读字段的顺序要与write方法中写的顺序保持一致name、age、salarypublic void readFields(DataInput dataInput) throws IOException {//read string//注意①使用正确的read方法②读取顺序与write()中序列化的顺序保持一致this.name dataInput.readUTF();this.age dataInput.readInt();this.salary dataInput.readInt();}
}main类、mapper、reducer
package com.kaikeba.hadoop.secondarysort;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;public class SecondarySort {/**** param args /salary.txt /secondarysort* throws Exception*/public static void main(String[] args) throws Exception {Configuration configuration new Configuration();//configuration.set(mapreduce.job.jar,/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);Job job Job.getInstance(configuration, SecondarySort.class.getSimpleName());FileSystem fileSystem FileSystem.get(URI.create(args[1]), configuration);//生产中慎用if (fileSystem.exists(new Path(args[1]))) {fileSystem.delete(new Path(args[1]), true);}FileInputFormat.setInputPaths(job, new Path(args[0]));job.setMapperClass(MyMap.class);//由于mapper与reducer输出的kv类型分别相同所以下两行可以省略
// job.setMapOutputKeyClass(Person.class);
// job.setMapOutputValueClass(NullWritable.class);//设置reduce的个数;默认为1//job.setNumReduceTasks(1);job.setReducerClass(MyReduce.class);job.setOutputKeyClass(Person.class);job.setOutputValueClass(NullWritable.class);FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);}//MyMap的输出key用自定义的Person类型输出的value为nullpublic static class MyMap extends MapperLongWritable, Text, Person, NullWritable {Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] fields value.toString().split(\t);String name fields[0];int age Integer.parseInt(fields[1]);int salary Integer.parseInt(fields[2]);//在自定义类中进行比较Person person new Person(name, age, salary);//person对象作为输出的keycontext.write(person, NullWritable.get());}}public static class MyReduce extends ReducerPerson, NullWritable, Person, NullWritable {Overrideprotected void reduce(Person key, IterableNullWritable values, Context context) throws IOException, InterruptedException {//输入的kv对原样输出context.write(key, NullWritable.get());}}
}12.4 总结
如果MR时key的排序规则比较复杂比如需要根据字段1排序若字段1相同则需要根据字段2排序…此时可以使用自定义key实现将自定义的key作为MR中map输出的key的类型reduce输入的类型自定义的key 实现WritableComparable接口实现compareTo比较方法实现write序列化方法实现readFields反序列化方法
13. 自定义分组求topN重难点
13.1 需求 现有一个淘宝用户订单历史记录文件每条记录有6个字段分别表示 userid、datetime、title商品标题、unitPrice商品单价、purchaseNum购买量、productId商品ID 现使用MR编程求出每个用户、每个月消费金额最多的两笔订单花了多少钱 所以得相同用户、同一个年月的数据分到同一组
13.2 逻辑分析
根据文件格式自定义JavaBean类OrderBean 实现WritableComparable接口包含6个字段分别对应文件中的6个字段重点实现compareTo方法 先比较userid是否相等若不相等则userid升序排序若相等比较两个Bean的日期是否相等若不相等则日期升序排序若相等再比较总开销降序排序 实现序列化方法write()实现反序列化方法readFields() 自定义分区类 继承Partitioner类getPartiton()实现userid相同的处于同一个分区 自定义Mapper类 输出key是当前记录对应的Bean对象输出的value对应当前订单的总开销 自定义分组类 决定userid相同、日期年月相同的记录分到同一组中调用一次reduce() 自定义Reduce类 reduce()中求出当前一组数据中开销头两笔的信息 main方法 job.setMapperClassjob.setPartitionerClassjob.setReducerClassjob.setGroupingComparatorClass
13.3 MR代码 详细代码见代码工程 OrderBean
package com.kaikeba.hadoop.grouping;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;//实现WritableComparable接口
public class OrderBean implements WritableComparableOrderBean {//用户IDprivate String userid;//年月//yearmonth - 201408private String datetime;//标题private String title;//单价private double unitPrice;//购买量private int purchaseNum;//商品IDprivate String produceId;public OrderBean() {}public OrderBean(String userid, String datetime, String title, double unitPrice, int purchaseNum, String produceId) {super();this.userid userid;this.datetime datetime;this.title title;this.unitPrice unitPrice;this.purchaseNum purchaseNum;this.produceId produceId;}//key的比较规则public int compareTo(OrderBean other) {//OrderBean作为MR中的key如果对象中的userid相同即ret1为0就表示两个对象是同一个用户int ret1 this.userid.compareTo(other.userid);if (ret1 0) {//如果userid相同比较年月String thisYearMonth this.getDatetime();String otherYearMonth other.getDatetime();int ret2 thisYearMonth.compareTo(otherYearMonth);if(ret2 0) {//若datetime相同//如果userid、年月都相同比较单笔订单的总开销Double thisTotalPrice this.getPurchaseNum()*this.getUnitPrice();Double oTotalPrice other.getPurchaseNum()*other.getUnitPrice();//总花销降序排序即总花销高的排在前边return -thisTotalPrice.compareTo(oTotalPrice);} else {//若datatime不同按照datetime升序排序return ret2;}} else {//按照userid升序排序return ret1;}}/*** 序列化* param dataOutput* throws IOException*/public void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(userid);dataOutput.writeUTF(datetime);dataOutput.writeUTF(title);dataOutput.writeDouble(unitPrice);dataOutput.writeInt(purchaseNum);dataOutput.writeUTF(produceId);}/*** 反序列化* param dataInput* throws IOException*/public void readFields(DataInput dataInput) throws IOException {this.userid dataInput.readUTF();this.datetime dataInput.readUTF();this.title dataInput.readUTF();this.unitPrice dataInput.readDouble();this.purchaseNum dataInput.readInt();this.produceId dataInput.readUTF();}/*** 使用默认分区器那么userid相同的落入同一分区* 另外一个方案此处不覆写hashCode方法而是自定义分区器getPartition方法中对OrderBean的userid求hashCode值%reduce任务数* return*/
// Override
// public int hashCode() {
// return this.userid.hashCode();
// }Overridepublic String toString() {return OrderBean{ userid userid \ , datetime datetime \ , title title \ , unitPrice unitPrice , purchaseNum purchaseNum , produceId produceId \ };}public String getUserid() {return userid;}public void setUserid(String userid) {this.userid userid;}public String getDatetime() {return datetime;}public void setDatetime(String datetime) {this.datetime datetime;}public String getTitle() {return title;}public void setTitle(String title) {this.title title;}public double getUnitPrice() {return unitPrice;}public void setUnitPrice(double unitPrice) {this.unitPrice unitPrice;}public int getPurchaseNum() {return purchaseNum;}public void setPurchaseNum(int purchaseNum) {this.purchaseNum purchaseNum;}public String getProduceId() {return produceId;}public void setProduceId(String produceId) {this.produceId produceId;}
}
MyPartitioner
package com.kaikeba.hadoop.grouping;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;//mapper的输出key类型是自定义的key类型OrderBean输出value类型是单笔订单的总开销double - DoubleWritable
public class MyPartitioner extends PartitionerOrderBean, DoubleWritable {Overridepublic int getPartition(OrderBean orderBean, DoubleWritable doubleWritable, int numReduceTasks) {//userid相同的落入同一分区return (orderBean.getUserid().hashCode() Integer.MAX_VALUE) % numReduceTasks;}
}
MyMapper
package com.kaikeba.hadoop.grouping;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 输出kv分别是OrderBean、用户每次下单的总开销*/
public class MyMapper extends MapperLongWritable, Text, OrderBean, DoubleWritable {DoubleWritable valueOut new DoubleWritable();DateUtils dateUtils new DateUtils();Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//13764633023 2014-12-01 02:20:42.000 全视目Allseelook 原宿风暴显色美瞳彩色隐形艺术眼镜1片 拍2包邮 33.6 2 18067781305String record value.toString();String[] fields record.split(\t);if(fields.length 6) {String userid fields[0];String datetime fields[1];String yearMonth dateUtils.getYearMonthString(datetime);String title fields[2];double unitPrice Double.parseDouble(fields[3]);int purchaseNum Integer.parseInt(fields[4]);String produceId fields[5];//生成OrderBean对象OrderBean orderBean new OrderBean(userid, yearMonth, title, unitPrice, purchaseNum, produceId);//此订单的总开销double totalPrice unitPrice * purchaseNum;valueOut.set(totalPrice);context.write(orderBean, valueOut);}}
}
MyReducer
package com.kaikeba.hadoop.grouping;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;//输出的key为userid拼接上年月的字符串对应Text输出的value对应单笔订单的金额
public class MyReducer extends ReducerOrderBean, DoubleWritable, Text, DoubleWritable {/*** ①由于自定义分组逻辑相同用户、相同年月的订单是一组调用一次reduce()* ②由于自定义的key类OrderBean中比较规则compareTo规定相同用户、相同年月的订单按总金额降序排序* 所以取出头两笔就实现需求* param key* param values* param context* throws IOException* throws InterruptedException*/Overrideprotected void reduce(OrderBean key, IterableDoubleWritable values, Context context) throws IOException, InterruptedException {//求每个用户、每个月、消费金额最多的两笔多少钱int num 0;for(DoubleWritable value: values) {if(num 2) {String keyOut key.getUserid() key.getDatetime();context.write(new Text(keyOut), value);num;} else {break;}}}
}MyGroup
package com.kaikeba.hadoop.grouping;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;//自定义分组类reduce端调用reduce()前对数据做分组每组数据调用一次reduce()
public class MyGroup extends WritableComparator {public MyGroup() {//第一个参数表示key classsuper(OrderBean.class, true);}//分组逻辑Overridepublic int compare(WritableComparable a, WritableComparable b) {//userid相同且同一月的分成一组OrderBean aOrderBean (OrderBean)a;OrderBean bOrderBean (OrderBean)b;String aUserId aOrderBean.getUserid();String bUserId bOrderBean.getUserid();//userid、年、月相同的作为一组int ret1 aUserId.compareTo(bUserId);if(ret1 0) {//同一用户//年月也相同返回0在同一组return aOrderBean.getDatetime().compareTo(bOrderBean.getDatetime());} else {return ret1;}}
}
CustomGroupingMain
package com.kaikeba.hadoop.grouping;import com.kaikeba.hadoop.wordcount.WordCountMain;
import com.kaikeba.hadoop.wordcount.WordCountMap;
import com.kaikeba.hadoop.wordcount.WordCountReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.io.IOException;public class CustomGroupingMain extends Configured implements Tool {///tmall-201412-test.csv /cgopublic static void main(String[] args) throws Exception {int exitCode ToolRunner.run(new CustomGroupingMain(), args);System.exit(exitCode);}Overridepublic int run(String[] args) throws Exception {//判断以下输入参数是否是两个分别表示输入路径、输出路径if (args.length ! 2 || args null) {System.out.println(please input Path!);System.exit(0);}Configuration configuration new Configuration();//告诉程序要运行的jar包在哪//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//调用getInstance方法生成job实例Job job Job.getInstance(configuration, CustomGroupingMain.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(CustomGroupingMain.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat所以下两行可以注释掉
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(MyMapper.class);//设置map combine类减少网路传出量//job.setCombinerClass(MyReducer.class);job.setPartitionerClass(MyPartitioner.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(MyReducer.class);job.setGroupingComparatorClass(MyGroup.class);//如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map, reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错job.setMapOutputKeyClass(OrderBean.class);job.setMapOutputValueClass(DoubleWritable.class);//设置reduce task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(DoubleWritable.class);// 提交作业return job.waitForCompletion(true) ? 0 : 1;}
}
DateUtils
package com.kaikeba.hadoop.grouping;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;public class DateUtils {public static void main(String[] args) {//test1
// String str1 13764633024 2014-10-01 02:20:42.000;
// String str2 13764633023 2014-11-01 02:20:42.000;
// System.out.println(str1.compareTo(str2));//test2
// String datetime 2014-12-01 02:20:42.000;
// LocalDateTime localDateTime parseDateTime(datetime);
// int year localDateTime.getYear();
// int month localDateTime.getMonthValue();
// int day localDateTime.getDayOfMonth();
// System.out.println(year- year ; month - month ; day - day);//test3
// String datetime 2014-12-01 02:20:42.000;
// System.out.println(getYearMonthString(datetime));}public LocalDateTime parseDateTime(String dateTime) {DateTimeFormatter formatter DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss.SSS);LocalDateTime localDateTime LocalDateTime.parse(dateTime, formatter);return localDateTime;}//日期格式转换工具类将2014-12-14 20:42:14.000转换成201412public String getYearMonthString(String dateTime) {DateTimeFormatter formatter DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss.SSS);LocalDateTime localDateTime LocalDateTime.parse(dateTime, formatter);int year localDateTime.getYear();int month localDateTime.getMonthValue();return year month;}}
13.4 总结
要实现自定义分组逻辑 一般会自定义JavaBean作为map输出的key 实现其中的compareTo方法设置key的比较逻辑实现序列化方法write()实现反序列化方法readFields() 自定义mapper类、reducer类自定义partition类getPartition方法决定哪些key落入哪些分区自定义group分组类决定reduce阶段哪些kv对落入同一组调用一次reduce()写main方法设置自定义的类 job.setMapperClassjob.setPartitionerClassjob.setReducerClassjob.setGroupingComparatorClass
14. MapReduce数据倾斜经常被问到 什么是数据倾斜 数据中不可避免地会出现离群值outlier并导致数据倾斜。这些离群值会显著地拖慢MapReduce的执行。 常见的数据倾斜有以下几类 数据频率倾斜——某一个区域的数据量要远远大于其他区域。比如某一个key对应的键值对远远大于其他键的键值对。数据大小倾斜——部分记录的大小远远大于平均值。 在map端和reduce端都有可能发生数据倾斜 在map端的数据倾斜可以考虑使用combine在reduce端的数据倾斜常常来源于MapReduce的默认分区器 数据倾斜会导致map和reduce的任务执行时间大为延长也会让需要缓存数据集的操作消耗更多的内存资源
14.1 如何诊断是否存在数据倾斜
如何诊断哪些键存在数据倾斜 发现倾斜数据之后有必要诊断造成数据倾斜的那些键。有一个简便方法就是在代码里实现追踪每个键的最大值。为了减少追踪量可以设置数据量阀值只追踪那些数据量大于阀值的键并输出到日志中。实现代码如下运行作业后就可以从日志中判断发生倾斜的键以及倾斜程度跟踪倾斜数据是了解数据的重要一步也是设计MapReduce作业的重要基础 package com.kaikeba.hadoop.dataskew;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import org.apache.log4j.Logger;import java.io.IOException;public class WordCountReduce extends ReducerText, IntWritable, Text, IntWritable {private int maxValueThreshold;//日志类private static final Logger LOGGER Logger.getLogger(WordCountReduce.class);Overrideprotected void setup(Context context) throws IOException, InterruptedException {//一个键达到多少后会做数据倾斜记录maxValueThreshold 10000;}/*(hello, 1)(hello, 1)(hello, 1)...(spark, 1)key: hellovalue: List(1, 1, 1)*/public void reduce(Text key, IterableIntWritable values,Context context) throws IOException, InterruptedException {int sum 0;//用于记录键出现的次数int i 0;for (IntWritable count : values) {sum count.get();i;}//如果当前键超过10000个则打印日志if(i maxValueThreshold) {LOGGER.info(Received i values for key key);}context.write(key, new IntWritable(sum));// 输出最终结果};}14.2 减缓数据倾斜 Reduce数据倾斜一般是指map的输出数据中存在数据频率倾斜的状况即部分输出键的数据量远远大于其它的输出键 如何减小reduce端数据倾斜的性能损失常用方式有 一、自定义分区 基于输出键的背景知识进行自定义分区。 例如如果map输出键的单词来源于一本书。其中大部分必然是省略词stopword。那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。而将其他的都发送给剩余的reduce实例。 二、Combine 使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。combine的目的就是聚合并精简数据。 三、抽样和范围分区 Hadoop默认的分区器是HashPartitioner基于map输出键的哈希值分区。这仅在数据分布比较均匀时比较好。在有数据倾斜时就很有问题。 使用分区器需要首先了解数据的特性。TotalOrderPartitioner中可以通过对原始数据进行抽样得到的结果集来预设分区边界值。 TotalOrderPartitioner中的范围分区器可以通过预设的分区边界值进行分区。因此它也可以很好地用在矫正数据中的部分键的数据倾斜问题。 四、数据大小倾斜的自定义策略 在map端或reduce端的数据大小倾斜都会对缓存造成较大的影响乃至导致OutOfMemoryError异常。处理这种情况并不容易。可以参考以下方法。 设置mapreduce.input.linerecordreader.line.maxlength来限制RecordReader读取的最大长度。 RecordReader在TextInputFormat和KeyValueTextInputFormat类中使用。默认长度没有上限。
15. MR调优
有调优专题
16. 抽样、范围分区
16.1 数据 数据气象站气象数据来源美国国家气候数据中心NCDC1901-2001年数据每年一个文件 气候数据record的格式如下 16.2 需求
对气象数据按照气温进行排序气温符合正太分布
16.3 实现方案 三种实现思路 方案一 设置一个分区即一个reduce任务在一个reduce中对结果进行排序失去了MR框架并行计算的优势 方案二 自定义分区人为指定各温度区间的记录落入哪个分区如分区温度边界值分别是-15、0、20共4个分区但由于对整个数据集的气温分布不了解可能某些分区的数据量大其它的分区小数据倾斜 方案三 通过对键空间采样只查看一小部分键获得键的近似分布好温度的近似分布进而据此结果创建分区实现尽可能的均匀的划分数据集Hadoop内置了采样器InputSampler
16.4 MR代码 分两大步 一、先将数据按气温对天气数据集排序。结果存储为sequencefile文件气温作为输出键数据行作为输出值 代码 此代码处理原始日志文件 结果用SequenceFile格式存储 温度作为SequenceFile的key记录作为value
package com.kaikeba.hadoop.totalorder;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import java.io.IOException;/*** 此代码处理原始日志文件 1901* 结果用SequenceFile格式存储* 温度作为SequenceFile的key记录作为value*/
public class SortDataPreprocessor {//输出的key\value分别是气温、记录static class CleanerMapper extends MapperLongWritable, Text, IntWritable, Text {private NcdcRecordParser parser new NcdcRecordParser();private IntWritable temperature new IntWritable();Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//002902907099999190101010600464333023450FM-12000599999V0202701N015919999999N0000001N9-0078199999102001ADDGF108991999999999999999999parser.parse(value);if (parser.isValidTemperature()) {//是否是有效的记录temperature.set(parser.getAirTemperature());context.write(temperature, value);}}}//两个参数/ncdc/input /ncdc/sfoutputpublic static void main(String[] args) throws Exception {if (args.length ! 2) {System.out.println(input output);}Configuration conf new Configuration();Job job Job.getInstance(conf, SortDataPreprocessor.class.getSimpleName());job.setJarByClass(SortDataPreprocessor.class);//FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setMapperClass(CleanerMapper.class);//最终输出的键、值类型job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(Text.class);//reduce个数为0job.setNumReduceTasks(0);//以sequencefile的格式输出job.setOutputFormatClass(SequenceFileOutputFormat.class);//开启job输出压缩功能//方案一conf.set(mapreduce.output.fileoutputformat.compress, true);conf.set(mapreduce.output.fileoutputformat.compress.type,RECORD);//指定job输出使用的压缩算法conf.set(mapreduce.output.fileoutputformat.compress.codec, org.apache.hadoop.io.compress.BZip2Codec);//方案二//设置sequencefile的压缩、压缩算法、sequencefile文件压缩格式block//SequenceFileOutputFormat.setCompressOutput(job, true);//SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);//SequenceFileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);//SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}二、全局排序 使用全排序分区器TotalOrderPartitioner
package com.kaikeba.hadoop.totalorder;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;import java.net.URI;/*** 使用TotalOrderPartitioner全局排序一个SequenceFile文件的内容* 此文件是SortDataPreprocessor的输出文件* key是IntWritble气象记录中的温度*/
public class SortByTemperatureUsingTotalOrderPartitioner{/*** 两个参数/ncdc/sfoutput /ncdc/totalorder* 第一个参数是SortDataPreprocessor的输出文件*/public static void main(String[] args) throws Exception {if (args.length ! 2) {System.out.println(input output);}Configuration conf new Configuration();Job job Job.getInstance(conf, SortByTemperatureUsingTotalOrderPartitioner.class.getSimpleName());job.setJarByClass(SortByTemperatureUsingTotalOrderPartitioner.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//输入文件是SequenceFilejob.setInputFormatClass(SequenceFileInputFormat.class);//Hadoop提供的方法来实现全局排序要求Mapper的输入、输出的key必须保持类型一致job.setOutputKeyClass(IntWritable.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);//分区器全局排序分区器job.setPartitionerClass(TotalOrderPartitioner.class);//分了3个区且分区i-1中的key小于i分区中所有的键job.setNumReduceTasks(3);/*** 随机采样器从所有的分片中采样* 每一个参数采样率* 第二个参数总的采样数* 第三个参数采样的最大分区数* 只要numSamples和maxSplitSampled第二、第三参数任一条件满足则停止采样*/InputSampler.SamplerIntWritable, Text sampler new InputSampler.RandomSamplerIntWritable, Text(0.1, 5000, 10);
// TotalOrderPartitioner.setPartitionFile();/*** 存储定义分区的键即整个数据集中温度的大致分布情况* 由TotalOrderPartitioner读取作为全排序的分区依据让每个分区中的数据量近似*/InputSampler.writePartitionFile(job, sampler);//根据上边的SequenceFile文件包含键的近似分布情况创建分区String partitionFile TotalOrderPartitioner.getPartitionFile(job.getConfiguration());URI partitionUri new URI(partitionFile);// JobConf jobConf new JobConf();//与所有map任务共享此文件添加到分布式缓存中DistributedCache.addCacheFile(partitionUri, job.getConfiguration());
// job.addCacheFile(partitionUri);//方案一输出的文件RECORD级别使用BZip2Codec进行压缩conf.set(mapreduce.output.fileoutputformat.compress, true);conf.set(mapreduce.output.fileoutputformat.compress.type,RECORD);//指定job输出使用的压缩算法conf.set(mapreduce.output.fileoutputformat.compress.codec, org.apache.hadoop.io.compress.BZip2Codec);//方案二//SequenceFileOutputFormat.setCompressOutput(job, true);//SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);//SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
17 总结拓展 对大量数据进行全局排序 先使用InputSampler.Sampler采样器对整个key空间进行采样得到key的近似分布 保存到key分布情况文件中 使用TotalOrderPartitioner利用上边的key分布情况文件进行分区每个分区的数据量近似从而防止数据倾斜 扩展阅读《Hadoop权威指南第4版》 7.3小节 - shuffle和排序 8.2 输入格式——MR中还有一些自带的输入格式 9.2.3 全排序 描述MR的shuffle全流程面试谈谈什么是数据倾斜什么情况会造成数据倾斜面试对MR数据倾斜如何解决面试补充图解》》》》》》》》》》