MapReduce Join

概述

在传统数据库(如:MySql)中,JOIN操作常常是非常耗时的。而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧。下面分别介绍MapReduce中的几种常见join,比如有最常见的 map side join,reduce side join,semi join(这些在Hive中都有) 等。Map side join在处理多个小表关联大表时非常有用,而 reduce join 在处理多表关联时是比较麻烦的,会造成大量的网络IO,效率低下,但在有些时候也是非常有用的。

常见的join方法介绍

map side join

Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。

为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:

(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。

(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

补充:

旧版本的DistributedCache和getLocalCacheFiles已经被注解为过时,本人下面案例中是job.addCacheFile(new URI(SmallTable))指定要复制的文件,context.getCacheFiles()获取文件目录。和getLocalCacheFiles不同的是,getCacheFiles得到的路径是HDFS上的文件路径,如果使用这个方法,那么程序中读取的就不再试缓存在各个节点上的数据了,相当于共同访问HDFS上的同一个文件。

main方法中设置缓存文件,而且Map Join不需要Reduce阶段,设置Reduce Task数量为0

1
2
job.addCacheFile(new URI("file:/你的文件路径));
job.setNumReduceTasks(0);

或者是HDFS上的URI

1
job.addCacheFile(new URI("hdfs://url:port/filename"));

在Mapper类的setup方法中加载缓存文件,setup方法,在maptask运行前只调用一次,可进行初始化工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected void setup(Context context)throws IOException, InterruptedException {

//获取缓存文件路径
URI[] cacheFiles = context.getCacheFiles();
//新的检索缓存文件的API是 context.getCacheFiles() ,而 context.getLocalCacheFiles() 被弃用
//然而 context.getCacheFiles() 返回的是 HDFS 路径; context.getLocalCacheFiles() 返回的才是本地路径
String path = cacheFiles[0].getPath();

//读文件
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path)));

String line;
while(StringUtils.isNotEmpty(line = reader.readLine())) {
String[] splits = line.split("\t");
cache.put(splits[0].trim(), splits[1].trim());
}

IOUtils.closeStream(reader);
}

特点:

  • Join操作在map task中完成,因此无需启动reduce task
  • 适合一个大表,一个小表的连接操作

局限性:

  • 有一份数据比较小,在map端,能够把它加载在内存,并进行join操作。

reduce side join

reduce side join是一种最简单的join方式, 之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。

假设要进行join的数据分别来自File1和File2.

在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。

在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。

特点:

  • Join操作在reduce task中完成
  • 适合两个大表的连接操作

局限性:

  • map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。
  • reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。

Semi Join

SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。

实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reducee side join相同。

reduce side join + BloomFilter

在某些情况下,Semi Join抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。

BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。

因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。

Hadoop面试的时候也会问到 Hadoop上Join的实现,几乎是一道必问的问题,而极个别公司还会涉及到DistributedCache原理以及怎样利用DistributedCache进行Join操作。

案例分析

数据准备

data/input/join/emp.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
7369    SMITH  CLERK  7902   1980-12-17 800.00    20
7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-4-2 2975.00 20
7654 MARTIN SALESMAN 7698 1981-9-28 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-5-1 2850.00 30
7782 CLARK MANAGER 7839 1981-6-9 2450.00 10
7788 SCOTT ANALYST 7566 1987-4-19 3000.00 20
7839 KING PRESIDENT 1981-11-17 5000.00 10
7844 TURNER SALESMAN 7698 1981-9-8 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-5-23 1100.00 20
7900 JAMES CLERK 7698 1981-12-3 950.00 30
7902 FORD ANALYST 7566 1981-12-3 3000.00 20
7934 MILLER CLERK 7782 1982-1-23 1300.00 10

data/input/join/dept.txt

1
2
3
4
10	ACCOUNTING	NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON

自定义序列化类

info.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* join结果最终需要的字段
*/
public class Info implements Writable {

private int empno;
private String ename;
private int deptno;
private String dname;
private int flag; // 标志位:用来区分数据来自于哪个表.mapjoin不需要flag,需要注释掉flag

public void write(DataOutput out) throws IOException {
out.writeInt(empno);
out.writeUTF(ename);
out.writeInt(deptno);
out.writeUTF(dname);
out.writeInt(flag);
}

public void readFields(DataInput in) throws IOException {
this.empno = in.readInt();
this.ename = in.readUTF();
this.deptno = in.readInt();
this.dname = in.readUTF();
this.flag = in.readInt();
}

public Info() {
}

public Info(int empno, String ename, int deptno, String dname) {
this.empno = empno;
this.ename = ename;
this.deptno = deptno;
this.dname = dname;
}

public Info(int empno, String ename, int deptno, String dname, int flag) {
this.empno = empno;
this.ename = ename;
this.deptno = deptno;
this.dname = dname;
this.flag = flag;
}

@Override
public String toString() {
return empno + "\t" + ename + "\t" + deptno + "\t" + dname;
}

public int getEmpno() {
return empno;
}

public void setEmpno(int empno) {
this.empno = empno;
}

public String getEname() {
return ename;
}

public void setEname(String ename) {
this.ename = ename;
}

public int getDeptno() {
return deptno;
}

public void setDeptno(int deptno) {
this.deptno = deptno;
}

public String getDname() {
return dname;
}

public void setDname(String dname) {
this.dname = dname;
}

public int getFlag() {
return flag;
}

public void setFlag(int flag) {
this.flag = flag;
}
}

Map Join

main():

  • 加载缓存数据:job.addCacheFile(new URI("file:/你的文件路径))或者job.addCacheFile(new URI("hdfs://url:port/filename"));
  • 设置Reduce Task数量为0:job.setNumReduceTasks(0);

setup():

  1. 获取缓存的文件
  2. 循环读取缓存文件的每一行
  3. 切割
  4. 缓存数据到集合hashtable
  5. 关闭资源

map():

  1. 获取一行数据
  2. 截取
  3. 根据公共字段获取数据
  4. 拼接
  5. 写出

MapJoinDriver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;


public class MapJoinDriver {
public static void main(String[] args) throws Exception {
String BigTable = "data/input/join/emp.txt";
String SmallTable = "data/input/join/dept.txt";
String output = "out";

Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(MapJoinDriver.class);
job.setMapperClass(MyMapper.class);

//MapJoin需要把ReduceTasks数量设置为0
job.setNumReduceTasks(0);

// reduce的个数决定了最终文件输出的个数,如果没有reduce,那么就由map个数决定
job.setMapOutputKeyClass(Info.class);
job.setMapOutputValueClass(NullWritable.class);

// 小表加入到缓存中
job.addCacheFile(new URI(SmallTable));

FileInputFormat.setInputPaths(job, new Path(BigTable));
FileOutputFormat.setOutputPath(job, new Path(output));
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}

public static class MyMapper extends Mapper<LongWritable, Text, Info, NullWritable> {

Map<String,String> cache = new HashMap<String, String>();

@Override
protected void setup(Context context) throws IOException, InterruptedException {

//获取到缓存文件,是一个 URI 的数组
URI[] cacheFiles = context.getCacheFiles();

//此处应该遍历cacheFiles获取缓存的文件,但此案例中只有一个文件,所以不影响。
//由于只有一个缓存文件dept.txt,我们这里只需要拿到第一个元素即可,获取到缓存文件的路径
String path = cacheFiles[0].getPath();

//获取到bufferedReader对象(缓冲字符流)
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path)));

// 10 ACCOUNTING NEW YORK
//对每行数据做迭代,进行切割,切割后的数据放入到map中
String line;
while(StringUtils.isNotEmpty(line = reader.readLine())) {
String[] splits = line.split("\t");
cache.put(splits[0].trim(), splits[1].trim());
}

//关闭资源
IOUtils.closeStream(reader);
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//读取emp.txt 的每行数据,并进行切割
String[] splits = value.toString().split("\t");

//获取 deptno 公共字段
int empno = Integer.parseInt(splits[0].trim());
String ename = splits[1].trim();
int deptno = Integer.parseInt(splits[7].trim());

Info info = new Info();
info.setEmpno(empno);
info.setEname(ename);
info.setDeptno(deptno);

//大的数据每读一条,就和内存中的小的数据做比对
//根据deptno从map中获取到pname
info.setDname(cache.get(deptno+""));

context.write(info, NullWritable.get());
}
}

}

Reduce Join

ReduceJoinDriver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReduceJoinDriver {
public static void main(String[] args) throws Exception {
String input = "data/input/join/";
String output = "out";

Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(JoinDriver.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Info.class);
job.setOutputKeyClass(Info.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}

/**
* 我们join的条件是deptno,所以key的类型是IntWritable
*
* 我们是两个文件一起进来的
* 要区分出数据到底是emp的还是dept的
*
*在序列化类中没有的字段也要set
*/
public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Info> {

String name;//文件名称区别数据来源的

@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) context.getInputSplit();
name = fileSplit.getPath().getName();
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] splits = value.toString().split("\t");
if(name.contains("emp")) { // 来自于emp
int empno = Integer.parseInt(splits[0].trim());
String ename = splits[1].trim();
int deptno = Integer.parseInt(splits[7].trim());

Info info = new Info();
info.setEmpno(empno);
info.setEname(ename);
info.setDeptno(deptno);
info.setDname("");
info.setFlag(1); // 标志位

context.write(new IntWritable(deptno), info);
} else { // 来自于dept
int deptno = Integer.parseInt(splits[0].trim());
String dname = splits[1].trim();

Info info = new Info();
info.setEmpno(0);
info.setEname("");
info.setDeptno(deptno);
info.setDname(dname);
info.setFlag(2); // 标志位
context.write(new IntWritable(deptno), info);
}
}
}

public static class MyReducer extends Reducer<IntWritable, Info,Info, NullWritable> {

// key = deptno
@Override
protected void reduce(IntWritable key, Iterable<Info> values, Context context) throws IOException, InterruptedException {

String dname = "";

//提供一种思路:更规范的写法应该是新增一个depts,拼接时遍历两个List
//但这里一个deptno是dept表的唯一主键,只对应一个dname,所以也没有问题
List<Info> emps = new ArrayList<Info>();

for(Info info : values) {
if(info.getFlag() == 1) { // emp
Info tmp = new Info();
tmp.setEmpno(info.getEmpno());
tmp.setEname(info.getEname());
tmp.setDeptno(info.getDeptno());
emps.add(tmp);
} else if(info.getFlag() == 2) { // dept
dname = info.getDname();
}
}

for(Info bean : emps) {
bean.setDname(dname);
context.write(bean, NullWritable.get());
}
}
}
}

参考链接:

hadoop 多表join:Map side join及Reduce side join范例