1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/** * Created by yanzhe on 2017/8/18. */ public class App { public static void main(String[] args) throws Exception {
args = new String[]{"d:/java/mr/data/data.txt", "d:/java/mr/out"} ;
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf) ;
Path outPath = new Path(args[1]) ; if (fs.exists(outPath)){ fs.delete(outPath,true) ; }
Job job = Job.getInstance(conf) ;
ChainMapper.addMapper(job,Mapper1.class, LongWritable.class, Text.class, Text.class, IntWritable.class,job.getConfiguration());
ChainMapper.addMapper(job,Mapper2.class, Text.class,IntWritable.class, Text.class, IntWritable.class,job.getConfiguration());
ChainMapper.addMapper(job,Mapper3.class, Text.class,IntWritable.class, Text.class, IntWritable.class,job.getConfiguration());
ChainReducer.setReducer(job, Reducer1.class, Text.class, IntWritable.class, Text.class, IntWritable.class,job.getConfiguration());
ChainReducer.addMapper(job, ReducerMapper1.class, Text.class, IntWritable.class, Text.class, IntWritable.class, job.getConfiguration());
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,outPath);
job.setNumReduceTasks(2); job.setCombinerClass(Combiner1.class); job.setPartitionerClass(MyPartitioner.class);
job.waitForCompletion(true) ;
} public class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { System.out.println("map1===========" + value.toString()); String line = value.toString() ; String[] strArr = line.split(" ") ;
for (String w: strArr) { context.write(new Text(w), new IntWritable(1)); } } } public static class Mapper2 extends Mapper<Text, IntWritable, Text, IntWritable> {
@Override protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException { System.out.println("map2==================" + key.toString() + ":" + value.toString()); //过滤单词'of' if (! key.toString().equals("of")){ context.write(key, value); } } }
public static class Mapper3 extends Mapper<Text, IntWritable, Text, IntWritable> { @Override protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException { System.out.println("map3==================" + key.toString() + ":" + value.toString()); //过滤单词'google' if (! key.toString().equals("xxx")){ context.write(key, value); } } } public static class Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0 ; for (IntWritable iw: values) { count += iw.get(); } context.write(key, new IntWritable(count)); System.out.println("reduce=========" + key.toString() + ":" + count); } }
public static class ReducerMapper1 extends Mapper<Text, IntWritable, Text, IntWritable> { @Override protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException { if (value.get() > 5) context.write(key, value);
System.out.println("reduceMap======" + key.toString() + ":" + value.toString()); } } }
|