MapReduce class: extends Configured implements Tool (code)

When running a jar on HDFS to execute a MapReduce program, the driver class should implement the Tool interface. This post records a MapReduce program that implements that interface, for my own future reference:

  1. extends Configured implements Tool
  2. Put the job setup in run(); the MapReduce code is below
  3. Call run() from the main method via ToolRunner

The code is as follows:

// Outline of the overall flow; the mapper and reducer bodies are filled in
// with the standard word-count logic.
// The Tool interface supports handling of generic command-line options;
// it is a standard interface available to every MapReduce program.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Let ToolRunner run the job:
        // ToolRunner.run(new Configuration(), new ClassName(), args)
        System.exit(ToolRunner.run(new Configuration(), new WordCount(), args));
    }

    @Override
    public int run(String[] strings) throws Exception {
        // ToolRunner parses the generic command-line options into the
        // Configuration passed to ToolRunner.run; getConf() returns it here
        Configuration conf = getConf();
        // Set additional configuration as needed:
        // conf.set(key, value);

        // 1 Get the job
        Job job = Job.getInstance(conf, "word count");

        // 2 Set the jar
        job.setJarByClass(WordCount.class);

        // 3 Set the mapper and reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // 4 Set the map output types
        job.setMapOutputKeyClass(Text.class);          // key1
        job.setMapOutputValueClass(IntWritable.class); // value1

        // 5 Set the reduce output types
        job.setOutputKeyClass(Text.class);             // key2
        job.setOutputValueClass(IntWritable.class);    // value2

        // Set a custom partitioner (optional)
        // job.setPartitionerClass(MyPartitioner.class);
        // job.setNumReduceTasks(4);

        // 6 Set the input and output paths; after the generic options are
        //   stripped, the remaining arguments are passed to run()
        FileInputFormat.setInputPaths(job, new Path(strings[0]));
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));

        // 7 Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    // Mapper<key-in, value-in, key1, value1>
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line into words and emit (word, 1) for each one
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one); // context.write(key1, value1)
            }
        }
    }

    // Reducer<key1, value1, key2, value2>
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts for this word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result); // context.write(key2, value2)
        }
    }
}
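Because the driver goes through ToolRunner, the generic options parser consumes flags such as -D, -files, and -libjars before the remaining arguments reach run(). A hypothetical invocation (the jar name and paths are illustrative, not from the original post):

    hadoop jar wordcount.jar WordCount -D mapreduce.job.reduces=2 /input /output

Here -D mapreduce.job.reduces=2 ends up in the Configuration returned by getConf(), while /input and /output arrive in run() as strings[0] and strings[1].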
  1. Addendum: custom serialization (Writable) class

    Steps to implement a custom Writable class (see the sketch below):
    * 1) implements Writable
    * 2) it must have a no-argument constructor
    * 3) implement the write and readFields methods
    * 4) the field order in these two methods must be identical
    * 5) optional: toString
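
A minimal sketch following the five steps above. The class name Access echoes the type referenced in the original driver snippet; the ip and traffic fields are illustrative assumptions, not from the original post:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// 1) implements Writable
public class Access implements Writable {
    private String ip;    // illustrative field (assumption)
    private long traffic; // illustrative field (assumption)

    // 2) A no-argument constructor is required: the framework creates
    //    instances by reflection before calling readFields
    public Access() {
    }

    public Access(String ip, long traffic) {
        this.ip = ip;
        this.traffic = traffic;
    }

    // 3) Serialize the fields to the output stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(ip);
        out.writeLong(traffic);
    }

    // 3) Deserialize the fields from the input stream
    // 4) The read order here must exactly match the write order above
    @Override
    public void readFields(DataInput in) throws IOException {
        this.ip = in.readUTF();
        this.traffic = in.readLong();
    }

    // 5) Optional: toString determines how the record is rendered
    //    when written out by TextOutputFormat
    @Override
    public String toString() {
        return ip + "\t" + traffic;
    }
}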

Reference:

https://dzone.com/articles/using-libjars-option-hadoop