网站建设 岗位,可建网站,广州网站优化,wordpress获取作者的权限要求
多个小文件合并，要求将文件合并到SequenceFile中
SequenceFile对外是一个整体，对内还是一个个的文件
期望结果是：
key：每一个小文件的带路径的文件名；value：每一个小文件的文件内容
第一步：自定义RecordReader类
多个小文件合并要求将文件合并到SequenceFile中
SequenceFile对外是一个整体对内还是一个个的文件
期望结果是
key每一个小文件的带路径的文件名value每一个小文件的文件内容
第一步自定义RecordReader类
public class FileCombineRecordReader extends RecordReaderText, BytesWritable {//每一个切片小文件调用一次这个类private FileSplit split;private Configuration cfg;private boolean isProcess false;private Text key new Text();private BytesWritable value new BytesWritable();Overridepublic void initialize(InputSplit inputSplit, TaskAttemptContext Context) {this.split (FileSplit) inputSplit;cfg Context.getConfiguration();}Override//核心业务逻辑public boolean nextKeyValue() throws IOException {//一次读取一个完整的文件并封装到KV中if (!isProcess) {byte[] buf new byte[(int) split.getLength()]; //1.根据切片长度定义缓冲区Path path split.getPath();//2.获得路径FileSystem fs path.getFileSystem(cfg); //3.通过路径获得文件系统FSDataInputStream fis fs.open(path); //4.通过文件系统获得输入流IOUtils.readFully(fis, buf, 0, buf.length); //5.拷贝流key.set(split.getPath().toString());//设置key值为文件的路径名称value.set(buf, 0, buf.length);//将buf中的内容输出到value中IOUtils.closeStream(fis);IOUtils.closeStream(fs);//6.关闭流isProcess true;//读完之后结束return true;}return false;}Overridepublic Text getCurrentKey() {//获取当前的keyreturn key;}Overridepublic BytesWritable getCurrentValue() {//获取当前的valuereturn value;}Overridepublic float getProgress() {//获取正在处理的进度return 0;}Overridepublic void close() {}
}第二步自定义InputFromat
public class FileCombineInputFormat extends FileInputFormatText, BytesWritable {Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false; //原文件不可切割}Overridepublic RecordReaderText, BytesWritable createRecordReader(InputSplit split, TaskAttemptContext context) {FileCombineRecordReader recordReader new FileCombineRecordReader();//自定义RecordReader对象并初始化recordReader.initialize(split,context);return recordReader;}
}第三步编写Mapper类
public class FileCombineMapper extends MapperText, BytesWritable, Text, BytesWritable {Overrideprotected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(key, value);}
}第四步编写Reducer类
public class FileCombineReducer extends ReducerText, BytesWritable, Text, BytesWritable {Overrideprotected void reduce(Text key, IterableBytesWritable values, Context context) throws IOException, InterruptedException {//循环写出for(BytesWritable value : values){context.write(key, value);}}
}第五步编写SequenceFileDriver类
public class FileCombineDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 数据输入路径和输出路径args new String[2];args[0] src/main/resources/aai/;args[1] src/main/resources/aao;Configuration cfg new Configuration();//设置本地模式运行即使项目类路径下core-site.xml文件依然采用本地模式cfg.set(mapreduce.framework.name, local);cfg.set(fs.defaultFS, file:///);Job job Job.getInstance(cfg);job.setJarByClass(FileCombineDriver.class);job.setMapperClass(FileCombineMapper.class);job.setReducerClass(FileCombineReducer.class);//设置inputFormat为自定义的FileCombileInputFormatjob.setInputFormatClass(FileCombineInputFormat.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);//设置输出的outputFormatjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean b job.waitForCompletion(true);System.out.println(b);}
}