-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSequenceFileWriterApp.java
More file actions
67 lines (54 loc) · 2.01 KB
/
SequenceFileWriterApp.java
File metadata and controls
67 lines (54 loc) · 2.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package examples.cn.crxy.offline5.mr.sequencefile;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.SequenceFile.Writer.Option;
/**
 * Command-line tool that packs the regular files of a local directory into one Hadoop
 * {@link SequenceFile}. Each file becomes one record whose key is the file name and whose
 * value is the file content (both stored as {@link Text}).
 *
 * <p>Usage: {@code SequenceFileWriterApp <inputDir> <outputPath> <compressionType>}
 */
public class SequenceFileWriterApp {

    /**
     * Entry point.
     *
     * @param args exactly three arguments:
     *             {@code args[0]} local input directory,
     *             {@code args[1]} output path of the sequence file,
     *             {@code args[2]} compression type selector (see
     *             {@link #getCompressionType(String)})
     * @throws IllegalArgumentException if the input path cannot be listed as a directory
     * @throws Exception if reading an input file or writing the sequence file fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("参数必须是3个,分别是 inputDir outputPath compressionType");
            System.exit(-1);
        }
        String inputDir = args[0];
        Path outputPath = new Path(args[1]);
        // Compression type selector: "2" means BLOCK, "3" means RECORD,
        // any other value (including "1") means NONE.
        String compType = args[2];

        // Validate the input BEFORE deleting the previous output: File.listFiles() returns
        // null when the path does not exist, is not a directory, or cannot be read.
        File[] listFiles = new File(inputDir).listFiles();
        if (listFiles == null) {
            throw new IllegalArgumentException(
                    "Input path is not a readable directory: " + inputDir);
        }

        Configuration conf = new Configuration();
        // Remove any previous output (recursively) so the writer starts from a clean path.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        // GzipCodec is Configurable; a codec created with "new" has no Configuration until
        // setConf is called, so hand it the same conf the writer uses.
        GzipCodec codec = new GzipCodec();
        codec.setConf(conf);
        CompressionType compressionType = getCompressionType(compType);

        Option[] opts = {
            Writer.file(outputPath),
            Writer.keyClass(Text.class),
            Writer.valueClass(Text.class),
            Writer.compression(compressionType, codec)
        };

        Writer writer = SequenceFile.createWriter(conf, opts);
        try {
            // Reuse the same Text instances for every record.
            Text key = new Text();
            Text val = new Text();
            for (File file : listFiles) {
                // Sub-directories and other non-regular entries cannot be read as a string.
                if (!file.isFile()) {
                    continue;
                }
                key.set(file.getName());
                // NOTE(review): content is decoded with the platform default charset —
                // confirm this matches the encoding of the input files.
                val.set(FileUtils.readFileToString(file));
                writer.append(key, val);
            }
        } finally {
            // Always close so buffered records are flushed and the handle is released,
            // even when reading or appending fails part-way through.
            writer.close();
        }
    }

    /**
     * Maps the command-line selector to a SequenceFile compression type.
     *
     * @param compType selector string; may be {@code null}
     * @return {@link CompressionType#BLOCK} for {@code "2"}, {@link CompressionType#RECORD}
     *         for {@code "3"}, and {@link CompressionType#NONE} for every other value
     */
    public static CompressionType getCompressionType(String compType) {
        if ("2".equals(compType)) {
            return CompressionType.BLOCK;
        } else if ("3".equals(compType)) {
            return CompressionType.RECORD;
        } else {
            return CompressionType.NONE;
        }
    }
}