
Using CombineFileInputFormat in Hadoop


Tags: hadoop hdfs mapreduce bigdata

Small files and CombineFileInputFormat. Hadoop is better suited to a small number of large files than to a large number of small files. The reason is that FileInputFormat generates one InputSplit per file (or per part of a file). If the files are small (much smaller than an HDFS block) and there are many of them, each map task processes very little input, and every task adds scheduling and startup overhead. Compare a single 1 GB file split into sixteen 64 MB blocks with 10,000 files of roughly 100 KB each: the 10,000 files each need their own map task, and the job can run tens or even hundreds of times slower than the same job using 16 map tasks over the one file. (As a rough illustration with assumed numbers: at even one second of task startup overhead per map, 10,000 maps spend about 10,000 task-seconds on overhead alone, versus about 16 for the single large file.) CombineFileInputFormat alleviates this problem. Designed specifically for small files, it packs multiple files into each split so that every mapper has more data to process. CombineFileInputFormat is an abstract class and ships without a concrete implementation, so using it takes some extra work: you must create a concrete subclass, implementing getRecordReader() in the old API or createRecordReader() in the new API. The example below implements the new API's createRecordReader().
The FileLineWritable class:

package com.duplicate.self;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// Composite key: the name of the file a line came from, plus the line's byte offset.
public class FileLineWritable implements WritableComparable<FileLineWritable> {

        public long offset;
        public String fileName;

        public void readFields(DataInput in) throws IOException {
                this.offset = in.readLong();
                this.fileName = Text.readString(in);
        }

        public void write(DataOutput out) throws IOException {
                out.writeLong(this.offset);
                Text.writeString(out, this.fileName);
        }

        // Order by file name first, then by offset within the file.
        public int compareTo(FileLineWritable that) {
                int cmp = this.fileName.compareTo(that.fileName);
                if (cmp != 0) {
                        return cmp;
                }
                return (int) Math.signum((double) (this.offset - that.offset));
        }

        @Override
        public int hashCode() {
                final int prime = 31;
                int result = 1;
                result = prime * result + ((fileName == null) ? 0 : this.fileName.hashCode());
                result = prime * result + (int) (this.offset ^ (this.offset >>> 32)); // standard long-to-int hash
                return result;
        }

        @Override
        public boolean equals(Object obj) {
                if (this == obj) {
                        return true;
                }
                if (obj == null || this.getClass() != obj.getClass()) {
                        return false;
                }
                FileLineWritable other = (FileLineWritable) obj;
                if (this.fileName == null) {
                        if (other.fileName != null) {
                                return false;
                        }
                } else if (!this.fileName.equals(other.fileName)) {
                        return false;
                }
                return this.offset == other.offset;
        }
}

The MyRecordReader class:

package com.duplicate.self;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

// Reads one file within a CombineFileSplit. CombineFileRecordReader creates
// one instance per file in the split, passing that file's index.
public class MyRecordReader extends RecordReader<FileLineWritable, Text> {

        private long startOffset;
        private long end;
        private long pos;
        private FileSystem fs;
        private Path path;
        private FileLineWritable key;
        private Text value;

        private FSDataInputStream fileIn;
        private LineReader reader;

        public MyRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException {
                this.path = split.getPath(index);
                this.fs = this.path.getFileSystem(context.getConfiguration());
                this.startOffset = split.getOffset(index);
                this.end = this.startOffset + split.getLength(index); // length of this file, not of the whole split

                fileIn = this.fs.open(this.path);
                reader = new LineReader(fileIn);
                this.pos = this.startOffset;
        }

        @Override
        public void close() throws IOException {
                if (reader != null) {
                        reader.close(); // also closes the underlying stream
                }
        }

        @Override
        public FileLineWritable getCurrentKey() throws IOException,
                        InterruptedException {
                return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
                return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
                if (this.startOffset == this.end) {
                        return 0;
                }
                return Math.min(1.0f, (this.pos - this.startOffset) / (float) (this.end - this.startOffset));
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                        throws IOException, InterruptedException {
                // Nothing to do here: all setup happens in the constructor,
                // which is how CombineFileRecordReader wires up per-file readers.
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
                if (key == null) {
                        key = new FileLineWritable();
                        key.fileName = path.getName();
                }
                key.offset = pos;
                if (null == value) {
                        value = new Text();
                }

                int newSize = 0;
                if (pos < end) {
                        newSize = reader.readLine(value);
                        pos += newSize;
                }

                if (newSize == 0) {
                        key = null;
                        value = null;
                        return false;
                } else {
                        return true;
                }
        }
}
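Because isSplitable() below returns false, every file arrives in one piece and startOffset is always 0, so the constructor can simply start reading from the top of the file. If you ever allowed files to be split, the constructor would also need to seek to the offset and discard the partial first line. The following sketch is modeled on Hadoop's own LineRecordReader and is an assumption, not part of the original:

fileIn = fs.open(path);
if (startOffset != 0) {
        fileIn.seek(startOffset); // jump to this split's region of the file
}
reader = new LineReader(fileIn);
if (startOffset != 0) {
        // the partial first line belongs to the previous split; skip it
        startOffset += reader.readLine(new Text());
}
pos = startOffset;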

The MyCombineFileInputFormat class:

package com.duplicate.self;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class MyCombineFileInputFormat extends CombineFileInputFormat<FileLineWritable, Text> {

        @Override
        public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split,
                        TaskAttemptContext context) throws IOException {
                // CombineFileRecordReader delegates to one MyRecordReader per file in the split.
                return new CombineFileRecordReader<FileLineWritable, Text>(
                                (CombineFileSplit) split, context, MyRecordReader.class);
        }

        @Override
        protected boolean isSplitable(JobContext context, Path file) {
                // Never split the small files themselves; only combine them.
                return false;
        }
}
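With the input format in place, a mapper receives (FileLineWritable, Text) pairs. Here is a minimal sketch; the class name and output types are illustrative, not from the original:

package com.duplicate.self;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits each line keyed by the name of the small file it came from.
public class MyMapper extends Mapper<FileLineWritable, Text, Text, Text> {

        @Override
        protected void map(FileLineWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                context.write(new Text(key.fileName), value);
        }
}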

Job configuration:

import org.apache.hadoop.mapreduce.Job;
// standard hadoop conf
Job job = new Job(getConf());
job.setInputFormatClass(MyCombineFileInputFormat.class);
job.setMapperClass(Mapper.class); // or a custom mapper, such as the sketch above
job.setNumReduceTasks(0); // map only
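One caveat worth knowing (not covered in the original): without an upper bound on split size, CombineFileInputFormat may pack a very large amount of input into very few splits. You can cap the combined split size with the standard FileInputFormat setting; the 64 MB figure below is only an example value:

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Cap each combined split at roughly one HDFS block (example value)
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);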
