Iterating MapReduce Jobs: JobControl Design and Usage

JobControl Design Analysis

JobControl consists of two classes: Job and JobControl. The Job class wraps a MapReduce job together with its dependencies, and is mainly responsible for monitoring the state of the jobs it depends on and updating its own state accordingly; its state transition diagram is shown in the figure below. A job starts out in the WAITING state. If it has no depending jobs, or all of its depending jobs have finished, it moves to the READY state. Once a job is READY it can be submitted to the Hadoop cluster, at which point it enters the RUNNING state. From RUNNING it moves to either SUCCESS or FAILED, depending on how the run goes. Note that if any job it depends on fails, the job itself fails as well, triggering a "domino effect" in which all downstream jobs fail too.

(Figure: state transition diagram of a controlled Job)
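
The transition rule just described can be sketched in a few lines of Java. The names below (JobStateSketch, nextState) are purely illustrative and are not Hadoop's actual ControlledJob code; the sketch only captures the rule that a WAITING job becomes READY once every dependency has succeeded, and fails as soon as any dependency fails:

import java.util.List;

// Illustrative sketch only (assumed names, not the Hadoop source) of the
// state update described above.
public class JobStateSketch {
    enum State { WAITING, READY, RUNNING, SUCCESS, FAILED }

    static State nextState(State current, List<State> dependencies) {
        if (current != State.WAITING) {
            return current;                  // only WAITING jobs are re-evaluated here
        }
        boolean allSucceeded = true;
        for (State dep : dependencies) {
            if (dep == State.FAILED) {
                return State.FAILED;         // "domino effect": a failed dependency fails this job too
            }
            if (dep != State.SUCCESS) {
                allSucceeded = false;        // a dependency is still WAITING, READY or RUNNING
            }
        }
        return allSucceeded ? State.READY : State.WAITING;
    }
}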

JobControl wraps a set of MapReduce jobs and the dependencies between them. It keeps jobs in different hash tables according to their state and moves them between those tables following the state transitions shown above, until every job has finished. Internally, JobControl runs a thread that periodically monitors and updates the state of each job, schedules jobs whose depending jobs have all completed, and submits jobs that are in the READY state. It also exposes APIs to suspend, resume, and stop that thread.
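
Below is a rough, assumed sketch of what such a monitoring thread does. The interface and class names are invented for illustration and simplify the real JobControl (which keeps separate job tables per state), but the loop structure is the same: refresh each job's state, submit jobs that have become READY, and poll again after a pause.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only (assumed names, not the Hadoop source) of the
// monitoring loop described above.
public class JobControlLoopSketch implements Runnable {

    // Minimal view of a controlled job, as seen by the monitoring loop
    interface TrackedJob {
        void refreshState();   // re-check dependencies / cluster status and update the state
        boolean isReady();     // state == READY
        boolean isFinished();  // state == SUCCESS or FAILED
        void submit();         // submit to the cluster; state becomes RUNNING
    }

    private final List<TrackedJob> jobs = new ArrayList<TrackedJob>();
    private volatile boolean stopped = false;

    public void addJob(TrackedJob job) { jobs.add(job); }
    public void stop()                 { stopped = true; }

    @Override
    public void run() {
        while (!stopped) {
            boolean allFinished = true;
            for (TrackedJob job : jobs) {
                job.refreshState();          // update WAITING/READY/RUNNING/SUCCESS/FAILED
                if (job.isReady()) {
                    job.submit();            // READY jobs are submitted and start RUNNING
                }
                if (!job.isFinished()) {
                    allFinished = false;
                }
            }
            if (allFinished) {
                return;                      // every job reached SUCCESS or FAILED
            }
            try {
                Thread.sleep(5000);          // poll periodically instead of spinning
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}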

JobControl Code Implementation

import java.io.File;
import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class JobControlDemo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherargs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherargs.length != 3) {
            System.err.println("Usage: JobControlDemo <InputPath1> <InputPath2> <OutPath>");
            System.exit(2);
        }

        // Create the three underlying MapReduce jobs
        Job job1 = Job.getInstance(conf, JobControlDemo.class.getSimpleName() + "1");
        Job job2 = Job.getInstance(conf, JobControlDemo.class.getSimpleName() + "2");
        Job job3 = Job.getInstance(conf, JobControlDemo.class.getSimpleName() + "3");

        // Configure job1
        job1.setJarByClass(JobControlDemo.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        job1.setMapperClass(MyMapper1.class);
        job1.setReducerClass(MyReducer1.class);
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job1, new Path(otherargs[0]));
        FileOutputFormat.setOutputPath(job1, new Path(otherargs[2] + File.separator + "mid1"));

        // Configure job2
        job2.setJarByClass(JobControlDemo.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setMapperClass(MyMapper2.class);
        job2.setReducerClass(MyReducer2.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job2, new Path(otherargs[1]));
        FileOutputFormat.setOutputPath(job2, new Path(otherargs[2] + File.separator + "mid2"));

        // Configure job3, which reads the output of job1 and job2
        job3.setJarByClass(JobControlDemo.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        job3.setMapperClass(MyMapper3.class);
        job3.setReducerClass(MyReducer3.class);
        job3.setInputFormatClass(KeyValueTextInputFormat.class);
        job3.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job3, new Path(otherargs[2] + File.separator + "mid1"));
        FileInputFormat.addInputPath(job3, new Path(otherargs[2] + File.separator + "mid2"));
        FileOutputFormat.setOutputPath(job3, new Path(otherargs[2] + File.separator + "result"));

        // Create the controlled jobs
        ControlledJob cjob1 = new ControlledJob(conf);
        ControlledJob cjob2 = new ControlledJob(conf);
        ControlledJob cjob3 = new ControlledJob(conf);

        // Wrap the plain jobs as controlled jobs
        cjob1.setJob(job1);
        cjob2.setJob(job2);
        cjob3.setJob(job3);

        // Declare the dependencies: job3 runs only after job1 and job2 succeed
        //cjob2.addDependingJob(cjob1);
        cjob3.addDependingJob(cjob1);
        cjob3.addDependingJob(cjob2);

        // Create the job controller
        JobControl jc = new JobControl("My control job");

        // Add the controlled jobs to the controller
        jc.addJob(cjob1);
        jc.addJob(cjob2);
        jc.addJob(cjob3);

        /**
         * Hadoop's JobControl class implements the Runnable interface, so we have to
         * start it in its own thread. Calling JobControl's run() method directly would
         * block forever, because that thread never terminates on its own.
         */
        //jc.run();

        Thread jcThread = new Thread(jc);
        jcThread.start();
        while (true) {
            if (jc.allFinished()) {
                System.out.println(jc.getSuccessfulJobList());
                jc.stop();
                System.exit(0);
            }
            if (jc.getFailedJobList().size() > 0) {
                System.out.println(jc.getFailedJobList());
                jc.stop();
                System.exit(1);
            }
            Thread.sleep(1000); // poll periodically instead of busy-waiting
        }
    }

    /**
     * Mapper and Reducer for the first job
     */
    public static class MyMapper1 extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] spl1 = value.toString().split("\t");
            if (spl1.length == 2) {
                context.write(new Text(spl1[0].trim()), new Text(spl1[1].trim()));
            }
        }
    }

    public static class MyReducer1 extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                context.write(k2, v2);
            }
        }
    }

    /**
     * Mapper and Reducer for the second job
     */
    public static class MyMapper2 extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] spl2 = value.toString().split("\t");
            if (spl2.length == 2) {
                context.write(new Text(spl2[0].trim()), new Text(spl2[1].trim()));
            }
        }
    }

    public static class MyReducer2 extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text k3, Iterable<Text> v3s, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            for (Text v3 : v3s) {
                context.write(k3, v3);
            }
        }
    }

    /**
     * Mapper and Reducer for the third job
     */
    public static class MyMapper3 extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class MyReducer3 extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text k4, Iterable<Text> v4s, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Collect the distinct values for this key ("a" from input1, "b" from input2)
            HashSet<String> hashSet = new HashSet<String>();
            for (Text v4 : v4s) {
                hashSet.add(v4.toString().trim());
            }
            // Keys that appear in both input files carry two distinct values
            if (hashSet.size() >= 2) {
                context.write(k4, new Text("OK"));
            }
        }
    }
}

Test input data:

hdfs dfs -text /user/jiuqian/libin/input/inputpath1.txt
hadoop a
spark a
hive a
hbase a
tachyon a
storm a
redis a
hdfs dfs -text /user/jiuqian/libin/input/inputpath2.txt
hadoop b
spark b
kafka b
tachyon b
oozie b
flume b
sqoop b
solr b

Test output data (only keys that appear in both input files collect two distinct values in the third job's reducer, so only they are emitted with "OK"):

hdfs dfs -text /user/jiuqian/libin/input/inputpathmerge2.txt/result/*
hadoop OK
spark OK
tachyon OK

Runtime output:

[sshexec] cmd : bash -c 'source  /home/jiuqian/.bashrc; /home/hduser/hadoop/bin/hadoop jar  /home/jiuqian/blb/JobControlDemo.jar -D mapreduce.map.java.opts=-Xmx2048m -D mapreduce.input.fileinputformat.split.minsize=1 -Dmapreduce.input.fileinputformat.split.maxsize=512000000 -D mapred.linerecordreader.maxlength=32768 /user/jiuqian/libin/input/inputpath1.txt /user/jiuqian/libin/input/inputpath2.txt /user/jiuqian/libin/input/inputpathmerge2.txt'
16/02/27 12:37:45 INFO client.RMProxy: Connecting to ResourceManager at sh-rslog1/192.168.1.2:8032
16/02/27 12:37:46 INFO input.FileInputFormat: Total input paths to process : 1
16/02/27 12:37:46 INFO mapreduce.JobSubmitter: number of splits:1
16/02/27 12:37:46 INFO Configuration.deprecation: mapred.linerecordreader.maxlength is deprecated. Instead, use mapreduce.input.linerecordreader.line.maxlength
16/02/27 12:37:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1446086163035_17037
16/02/27 12:37:47 INFO impl.YarnClientImpl: Submitted application application_1446086163035_17037
16/02/27 12:37:47 INFO mapreduce.Job: The url to track the job: http://sh-rslog1:8088/proxy/application_1446086163035_17037/
16/02/27 12:37:47 INFO client.RMProxy: Connecting to ResourceManager at sh-rslog1/27.115.29.102:8032
16/02/27 12:37:47 INFO input.FileInputFormat: Total input paths to process : 1
16/02/27 12:37:47 INFO mapreduce.JobSubmitter: number of splits:1
16/02/27 12:37:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1446086163035_17038
16/02/27 12:37:47 INFO impl.YarnClientImpl: Submitted application application_1446086163035_17038
16/02/27 12:37:47 INFO mapreduce.Job: The url to track the job: http://sh-rslog1:8088/proxy/application_1446086163035_17038/
16/02/27 12:38:13 INFO client.RMProxy: Connecting to ResourceManager at sh-rslog1/27.115.29.102:8032
16/02/27 12:38:13 INFO input.FileInputFormat: Total input paths to process : 2
16/02/27 12:38:13 INFO mapreduce.JobSubmitter: number of splits:2
16/02/27 12:38:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1446086163035_17039
16/02/27 12:38:13 INFO impl.YarnClientImpl: Submitted application application_1446086163035_17039
16/02/27 12:38:13 INFO mapreduce.Job: The url to track the job: http://sh-rslog1:8088/proxy/application_1446086163035_17039/
[job name: JobControlDemo1
job id: My control job0
job state: SUCCESS
job mapred id: job_1446086163035_17037
job message: just initialized
job has no depending job:
, job name: JobControlDemo2
job id: My control job1
job state: SUCCESS
job mapred id: job_1446086163035_17038
job message: just initialized
job has no depending job:
, job name: JobControlDemo3
job id: My control job2
job state: SUCCESS
job mapred id: job_1446086163035_17039
job message: just initialized
job has 2 dependeng jobs:
depending job 0: JobControlDemo1
depending job 1: JobControlDemo2
]
[INFO] Executed tasks
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

References:

https://hadoop.apache.org/docs/r3.2.2/api/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.html

https://blog.csdn.net/baolibin528/article/details/50754753

https://www.cnblogs.com/wuyudong/p/hadoop-jobcontrol.html