概述 在传统数据库(如:MySql)中,JOIN操作常常是非常耗时的。而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧。下面分别介绍MapReduce中的几种常见join,比如有最常见的 map side join,reduce side join,semi join(这些在Hive中都有) 等。Map side join在处理多个小表关联大表时非常有用,而 reduce join 在处理多表关联时是比较麻烦的,会造成大量的网络IO,效率低下,但在有些时候也是非常有用的。
常见的join方法介绍 map side join Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
补充:
旧版本的DistributedCache和getLocalCacheFiles已经被注解为过时,本人下面案例中是job.addCacheFile(new URI(SmallTable))指定要复制的文件,context.getCacheFiles()获取文件目录。和getLocalCacheFiles不同的是,getCacheFiles得到的路径是HDFS上的文件路径,如果使用这个方法,那么程序中读取的就不再试缓存在各个节点上的数据了,相当于共同访问HDFS上的同一个文件。
main方法中设置缓存文件,而且Map Join不需要Reduce阶段,设置Reduce Task数量为0
1 2 job.addCacheFile(new URI("file:/你的文件路径)); job.setNumReduceTasks(0);
或者是HDFS上的URI
1 job.addCacheFile(new URI("hdfs://url:port/filename"));
在Mapper类的setup方法中加载缓存文件,setup方法,在maptask运行前只调用一次,可进行初始化工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected void setup(Context context)throws IOException, InterruptedException { //获取缓存文件路径 URI[] cacheFiles = context.getCacheFiles(); //新的检索缓存文件的API是 context.getCacheFiles() ,而 context.getLocalCacheFiles() 被弃用 //然而 context.getCacheFiles() 返回的是 HDFS 路径; context.getLocalCacheFiles() 返回的才是本地路径 String path = cacheFiles[0].getPath(); //读文件 BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path))); String line; while(StringUtils.isNotEmpty(line = reader.readLine())) { String[] splits = line.split("\t"); cache.put(splits[0].trim(), splits[1].trim()); } IOUtils.closeStream(reader); }
特点:
Join操作在map task中完成,因此无需启动reduce task
适合一个大表,一个小表的连接操作
局限性:
有一份数据比较小,在map端,能够把它加载在内存,并进行join操作。
reduce side join reduce side join是一种最简单的join方式, 之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。
假设要进行join的数据分别来自File1和File2.
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。
在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
特点:
Join操作在reduce task中完成
适合两个大表的连接操作
局限性:
map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。
reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。
Semi Join SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。
实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reducee side join相同。
reduce side join + BloomFilter 在某些情况下,Semi Join抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。
BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。
因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。
Hadoop面试的时候也会问到 Hadoop上Join的实现,几乎是一道必问的问题,而极个别公司还会涉及到DistributedCache原理以及怎样利用DistributedCache进行Join操作。
案例分析 数据准备 data/input/join/emp.txt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 7369 SMITH CLERK 7902 1980-12-17 800.00 20 7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30 7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 30 7566 JONES MANAGER 7839 1981-4-2 2975.00 20 7654 MARTIN SALESMAN 7698 1981-9-28 1250.00 1400.00 30 7698 BLAKE MANAGER 7839 1981-5-1 2850.00 30 7782 CLARK MANAGER 7839 1981-6-9 2450.00 10 7788 SCOTT ANALYST 7566 1987-4-19 3000.00 20 7839 KING PRESIDENT 1981-11-17 5000.00 10 7844 TURNER SALESMAN 7698 1981-9-8 1500.00 0.00 30 7876 ADAMS CLERK 7788 1987-5-23 1100.00 20 7900 JAMES CLERK 7698 1981-12-3 950.00 30 7902 FORD ANALYST 7566 1981-12-3 3000.00 20 7934 MILLER CLERK 7782 1982-1-23 1300.00 10
data/input/join/dept.txt
1 2 3 4 10 ACCOUNTING NEW YORK 20 RESEARCH DALLAS 30 SALES CHICAGO 40 OPERATIONS BOSTON
自定义序列化类 info.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * join结果最终需要的字段 */ public class Info implements Writable { private int empno; private String ename; private int deptno; private String dname; private int flag; // 标志位:用来区分数据来自于哪个表.mapjoin不需要flag,需要注释掉flag public void write(DataOutput out) throws IOException { out.writeInt(empno); out.writeUTF(ename); out.writeInt(deptno); out.writeUTF(dname); out.writeInt(flag); } public void readFields(DataInput in) throws IOException { this.empno = in.readInt(); this.ename = in.readUTF(); this.deptno = in.readInt(); this.dname = in.readUTF(); this.flag = in.readInt(); } public Info() { } public Info(int empno, String ename, int deptno, String dname) { this.empno = empno; this.ename = ename; this.deptno = deptno; this.dname = dname; } public Info(int empno, String ename, int deptno, String dname, int flag) { this.empno = empno; this.ename = ename; this.deptno = deptno; this.dname = dname; this.flag = flag; } @Override public String toString() { return empno + "\t" + ename + "\t" + deptno + "\t" + dname; } public int getEmpno() { return empno; } public void setEmpno(int empno) { this.empno = empno; } public String getEname() { return ename; } public void setEname(String ename) { this.ename = ename; } public int getDeptno() { return deptno; } public void setDeptno(int deptno) { this.deptno = deptno; } public String getDname() { return dname; } public void setDname(String dname) { this.dname = dname; } public int getFlag() { return flag; } public void setFlag(int flag) { this.flag = flag; } }
Map Join main():
加载缓存数据:job.addCacheFile(new URI("file:/你的文件路径))或者job.addCacheFile(new URI("hdfs://url:port/filename"));
设置Reduce Task数量为0:job.setNumReduceTasks(0);
setup():
获取缓存的文件
循环读取缓存文件的每一行
切割
缓存数据到集合hashtable
关闭资源
map():
获取一行数据
截取
根据公共字段获取数据
拼接
写出
MapJoinDriver.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; public class MapJoinDriver { public static void main(String[] args) throws Exception { String BigTable = "data/input/join/emp.txt"; String SmallTable = "data/input/join/dept.txt"; String output = "out"; Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(MapJoinDriver.class); job.setMapperClass(MyMapper.class); //MapJoin需要把ReduceTasks数量设置为0 job.setNumReduceTasks(0); // reduce的个数决定了最终文件输出的个数,如果没有reduce,那么就由map个数决定 job.setMapOutputKeyClass(Info.class); job.setMapOutputValueClass(NullWritable.class); // 小表加入到缓存中 job.addCacheFile(new URI(SmallTable)); FileInputFormat.setInputPaths(job, new Path(BigTable)); FileOutputFormat.setOutputPath(job, new Path(output)); boolean result = job.waitForCompletion(true); System.exit(result?0:1); } public static class MyMapper extends Mapper<LongWritable, Text, Info, NullWritable> { Map<String,String> cache = new HashMap<String, String>(); @Override protected void setup(Context context) throws IOException, InterruptedException { //获取到缓存文件,是一个 URI 的数组 URI[] cacheFiles = context.getCacheFiles(); //此处应该遍历cacheFiles获取缓存的文件,但此案例中只有一个文件,所以不影响。 //由于只有一个缓存文件dept.txt,我们这里只需要拿到第一个元素即可,获取到缓存文件的路径 String path = cacheFiles[0].getPath(); //获取到bufferedReader对象(缓冲字符流) BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path))); // 10 ACCOUNTING NEW YORK //对每行数据做迭代,进行切割,切割后的数据放入到map中 String line; while(StringUtils.isNotEmpty(line = reader.readLine())) { String[] splits = line.split("\t"); cache.put(splits[0].trim(), splits[1].trim()); } //关闭资源 IOUtils.closeStream(reader); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取emp.txt 的每行数据,并进行切割 String[] splits = value.toString().split("\t"); //获取 deptno 公共字段 int empno = Integer.parseInt(splits[0].trim()); String ename = splits[1].trim(); int deptno = Integer.parseInt(splits[7].trim()); Info info = new Info(); info.setEmpno(empno); info.setEname(ename); info.setDeptno(deptno); //大的数据每读一条,就和内存中的小的数据做比对 //根据deptno从map中获取到pname info.setDname(cache.get(deptno+"")); context.write(info, NullWritable.get()); } } }
Reduce Join ReduceJoinDriver.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class ReduceJoinDriver { public static void main(String[] args) throws Exception { String input = "data/input/join/"; String output = "out"; Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(JoinDriver.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Info.class); job.setOutputKeyClass(Info.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); boolean result = job.waitForCompletion(true); System.exit(result?0:1); } /** * 我们join的条件是deptno,所以key的类型是IntWritable * * 我们是两个文件一起进来的 * 要区分出数据到底是emp的还是dept的 * *在序列化类中没有的字段也要set */ public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Info> { String name;//文件名称区别数据来源的 @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit) context.getInputSplit(); name = fileSplit.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] splits = value.toString().split("\t"); if(name.contains("emp")) { // 来自于emp int empno = Integer.parseInt(splits[0].trim()); String ename = splits[1].trim(); int deptno = Integer.parseInt(splits[7].trim()); Info info = new Info(); info.setEmpno(empno); info.setEname(ename); info.setDeptno(deptno); info.setDname(""); info.setFlag(1); // 标志位 context.write(new IntWritable(deptno), info); } else { // 来自于dept int deptno = Integer.parseInt(splits[0].trim()); String dname = splits[1].trim(); Info info = new Info(); info.setEmpno(0); info.setEname(""); info.setDeptno(deptno); info.setDname(dname); info.setFlag(2); // 标志位 context.write(new IntWritable(deptno), info); } } } public static class MyReducer extends Reducer<IntWritable, Info,Info, NullWritable> { // key = deptno @Override protected void reduce(IntWritable key, Iterable<Info> values, Context context) throws IOException, InterruptedException { String dname = ""; //提供一种思路:更规范的写法应该是新增一个depts,拼接时遍历两个List //但这里一个deptno是dept表的唯一主键,只对应一个dname,所以也没有问题 List<Info> emps = new ArrayList<Info>(); for(Info info : values) { if(info.getFlag() == 1) { // emp Info tmp = new Info(); tmp.setEmpno(info.getEmpno()); tmp.setEname(info.getEname()); tmp.setDeptno(info.getDeptno()); emps.add(tmp); } else if(info.getFlag() == 2) { // dept dname = info.getDname(); } } for(Info bean : emps) { bean.setDname(dname); context.write(bean, NullWritable.get()); } } } }
参考链接:
hadoop 多表join:Map side join及Reduce side join范例