Hadoop中的block Size和split Size是什么关系
问题
hadoop的split size 和 block size 是什么关系? 是否 split size 应该 n倍于 block size ?
概念
在 hdfs 架构中,存在 blocks 的概念。 通常来说,hdfs中的一个block 是 64MB 。 当我们把一个大文件导入hdfs中的时候,文件会按 64MB 每个block来分割(版本不同,默认配置可能不同)。 如果你有1GB的文件要存入HDFS中, 1GB/64MB = 1024MB / 64MB = 16 个blocks 会被分割到不同的datanode上。
目的
数据分割(data splitting )策略是基于文件偏移进行的。文件分割的目的是有利于数据并行处理 ,以及便于数据容灾恢复。
区别
split 是逻辑意义上的split。 输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
通常在 M/R 程序或者其他数据处理技术上用到。根据你处理的数据量的情况,split size是允许用户自定义的。
split size 定义好了之后,可以控制 M/R 中 Mapper 的数量。如果M/R中没有定义 split size , 就用默认的HDFS配置作为 input split。
其中有俩个配置文件(如下):
1 | --minsize 默认大小为1 |
1.如果blockSize小于maxSize && blockSize 大于 minSize之间,那么split就是blockSize;
2.如果blockSize小于maxSize && blockSize 小于 minSize之间,那么split就是minSize;
3.如果blockSize大于maxSize && blockSize 大于 minSize之间,那么split就是maxSize;
举例
案例1:
你有个100MB的文件,block size 是 64MB , 那么就会被split成 2 块。这时如果你你没有指定 input split , 你的M/R程序就会按2个input split 来处理 , 并分配两个mapper来完成这个job。
但如果你把 split size 指定为 100MB(split.minsize=100MB),那么M/R程序就会把数据处理成一个 split,这时只用分配一个mapper 就可以了。
但如果你把 split size 指定为 25MB(split.maxsize=25MB),M/R就会将数据分成4个split,分配4个mapper来处理这个job。
案例2:
1 | [test@dw01 ~]$ hadoop fs -dus -h /tmp/wordcount/input/* |
在本例子中,mapreduce.input.fileinputformat.split.maxsize=104857440 (100MB),mapred.split.zero.file.skip=true,所有文件的blockSize 大小都是256MB,故splitSize=100MB
1 | hdfs://namenode.test.net:9000/tmp/wordcount/input/part-r-00000 划分成3个split |
总结
- block是物理上的数据分割,而split是逻辑上的分割。split是mapreduce中的概念,而block是hdfs中切块的大小。
- 如果没有特别指定,split size 就等于 HDFS 的 block size 。
- 用户可以在M/R 程序中自定义split size。
- 一个split 可以包含多个blocks,也可以把一个block应用多个split操作。
- 一个split不会包含两个File的Block,不会跨越File边界
- 有多少个split,就有多少个mapper。
补充
性能
一般来说,block size 和 split size 设置成一致,性能较好。
FileInputFormat generates splits in such a way that each split is all or part of a single file. 所以 hadoop处理大文件比处理小文件来得效率高得多。
如何避免切片:
将切片的最小值设置为大于文件的大小
使用FileInputFormat的具体子类,重写isSplitable()方法,将返回值设置为false。
参考:
https://blog.csdn.net/qq_20641565/article/details/53457622
https://blog.csdn.net/wisgood/article/details/79178663
MapReduce Input Split(输入分/切片)详解
输入分片(Input Split):在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
分片大小范围可以在mapred-site.xml中设置,mapred.min.split.size mapred.max.split.size,
minSplitSize大小默认为1B,maxSplitSize大小默认为Long.MAX_VALUE = 9223372036854775807
那么分片到底是多大呢?
1 | long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); |
*所以在我们没有设置分片的范围的时候,分片大小是由block块大小决定的,和它的大小一样。比如把一个258MB的文件上传到HDFS上,假设block块大小是128MB,那么它就会被分成三个block块,与之对应产生三个split***,所以最终会产生三个map task。我又发现了另一个问题,第三个block块里存的文件大小只有2MB,而它的block块大小是128MB,那它实际占用Linux file system的多大空间?
答案是实际的文件大小,而非一个块的大小。有大神已经验证这个答案了:http://blog.csdn.net/samhacker/article/details/23089157
如果hdfs占用Linux file system的磁盘空间按实际文件大小算,那么这个”块大小“有必要存在吗?
其实块大小还是必要的,一个显而易见的作用就是当文件通过append操作不断增长的过程中,可以通过来block size决定何时split文件。以下是Hadoop Community的专家给我的回复:
“The block size is a meta attribute. If you append tothe file later, it still needs to know when to split further - so it keeps that value as a mere metadata it can use to advise itself on write boundaries.”
补充:
一个split的大小是由goalSize, minSize, blockSize这三个值决定的。computeSplitSize的逻辑是,先从goalSize和blockSize两个值中选出最小的那个(比如一般不设置map数,这时blockSize为当前文件的块size,而goalSize是“InputFile大小”/“我们在配置文件中定义的mapred.map.tasks”值,如果没设置的话,默认是1)。
goalSize这个计算意味着:
- 拆分不会小于文件中的剩余数据或
minSize
。 - 分割不会大于
goalSize
和blockSize
中的较小者。
(个人疑问,好像自己翻源码没有goalSize这个参数,都是用maxSize的表示的)
1 | protected long computeSplitSize(long blockSize, long minSize, long maxSize) { |
hadooop提供了一个设置map个数的参数mapred.map.tasks,我们可以通过这个参数来控制map的个数。但是通过这种方式设置map的个数,并不是每次都有效的。原因是mapred.map.tasks只是一个hadoop的参考数值,最终map的个数,还取决于其他的因素。
为了方便介绍,先来看几个名词:
block_size : hdfs的文件块大小,默认为64M,可以通过参数dfs.block.size设置
total_size : 输入文件整体的大小
input_file_num : 输入文件的个数
默认map个数
如果不进行任何设置,默认的map个数是和blcok_size相关的。
default_num = total_size / block_size;
期望大小
可以通过参数 mapred.map.tasks来设置程序员期望的map个数,但是这个个数只有在大于default_num的时候,才会生效。
goal_num = mapred.map.tasks;
设置处理的文件大小
可以通过mapred.min.split.size 设置每个task处理的文件大小,但是这个大小只有在大于 block_size的时候才会生效。
split_size = max( mapred.min.split.size, block_size );
split_num = total_size / split_size;
计算的map个数
compute_map_num = min(split_num, max(default_num, goal_num))
除了这些配置以外,mapreduce还要遵循一些原则。 mapreduce的每一个map处理的数据是不能跨越文件的,也就是说min_map_num >= input_file_num。 所以,最终的map个数应该为:
final_map_num = max(compute_map_num, input_file_num)
经过以上的分析,在设置map个数的时候,可以简单的总结为以下几点:
(1)如果想增加map个数,则设置mapred.map.tasks 为一个较大的值。
(2)如果想减小map个数,则设置mapred.min.split.size 为一个较大的值。
(3)如果输入中有很多小文件,依然想减少map个数,则需要将小文件merger为大文件,然后使用准则2。
参考:
https://blog.csdn.net/Dr_Guo/article/details/51150278
https://blog.csdn.net/lylcore/article/details/9136555
mapreduce中split划分分析(新版api)
计算splitsize
minSize :每个split的最小值,默认为1.getFormatMinSplitSize()为代码中写死,固定返回1,除非修改了hadoop的源代码.getMinSplitSize(job)取决于参数mapreduce.input.fileinputformat.split.minsize,如果没有设置该参数,返回1.故minSize默认为1.
maxSize:每个split的最大值,如果设置了mapreduce.input.fileinputformat.split.maxsize,则为该值,否则为Long的最大值。
blockSize :默认为HDFS设置的文件存储BLOCK大小。注意:该值并不一定是唯一固定不变的。HDFS上不同的文件该值可能不同。故将文件划分成split的时候,对于每个不同的文件,需要获取该文件的blocksize。
splitSize :根据公式,默认为blockSize 。
getSplits()方法在 FileInputFormat.addInputPath(job, path)中
遍历输入目录中的每个文件,拿到该文件
计算文件长度,A:如果文件长度为0,如果mapred.split.zero.file.skip=true,则不划分split ; 如果mapred.split.zero.file.skip为false,生成一个length=0的split .B:如果长度不为0,跳到步骤3
判断该文件是否支持split :如果支持,跳到步骤4;如果不支持,该文件不切分,生成1个split,split的length等于文件长度。
根据当前文件,计算splitSize,本文中为100M
判断剩余待切分文件大小/splitsize是否大于SPLIT_SLOP(该值为1.1,代码中写死了) 如果true,切分成一个split,待切分文件大小更新为当前值-splitsize ,再次切分。生成的split的length等于splitsize; 如果false 将剩余的切到一个split里,生成的split length等于剩余待切分的文件大小。之所以需要判断剩余待切分文件大小/splitsize,主要是为了避免过多的小的split。比如文件中有100个109M大小的文件,如果splitSize=100M,如果不判断剩余待切分文件大小/splitsize,将会生成200个split,其中100个split的size为100M,而其中100个只有9M,存在100个过小的split。MapReduce首选的是处理大文件,过多的小split会影响性能。
/** * Generate the list of files and make them into FileSplits. * @param job the job context * @throws IOException */ public List<InputSplit> getSplits(JobContext job) throws IOException { //用于记录分片开始的时间,最后会得到一个分片总用时,时间单位是纳秒 StopWatch sw = new StopWatch().start(); //用来计算分片大小 //minSize 就是 1 //maxSize 追到最下面可以发现其实就是long的最大值 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); //存放切片对象 List<InputSplit> splits = new ArrayList<InputSplit>(); //得到路径下的所有文件 List<FileStatus> files = listStatus(job); //遍历得到的文件 for (FileStatus file: files) { //得到文件路径 Path path = file.getPath(); //获取文件大小 long length = file.getLen(); //如果文件大小不为0的话 if (length != 0) { //定义块数组,存放块在datanode上的位置 BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); //如果这个文件可以分片的话进行分片,zip、视频等不能进行分片 if (isSplitable(job, path)) { //获取块大小,hadoop1默认是64M hadoop2默认是128M hadoop3默认是256M long blockSize = file.getBlockSize(); //得到片大小 //--> 最终决定出切片的大小(128M) --> blockSize值 //Math.max(minSize, Math.min(max, blockSize));这是实现 long splitSize = computeSplitSize(blockSize, minSize, maxSize); //获取文件大小 long bytesRemaining = length; //文件大小/片大小>1.1 开始分片 //例如 文件大小为260M 260/128=2.03>1.1 进入循环开始分片 //132/128 <1.1 不再进行分片,循环结束 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); //文件大小 = 原文件大小 - 当前分片大小 //260 -128 = 132 现在文件大小是132 MB bytesRemaining -= splitSize; } //循环结束之后,只要文件大小不等于0 此时也会在切一个片 if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //为零长度文件创建空主机数组 splits.add(makeSplit(path, 0, length, new String[0])); } } // 保存文件数 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); sw.stop(); //返回携带着切片文件的集合 return splits; }
Hadoop map和reduce数量估算
Hadoop在运行一个mapreduce job之前,需要估算这个job的maptask数和reducetask数。
map task数量
首先分析一下job的maptask数,当一个job提交时,jobclient首先分析job被拆分的split数量,然后吧job.split文件放置在HDFS中,一个job的MapTask数量就等于split的个数。
job.split中包含split的个数由FileInputFormat.getSplits计算出,方法的逻辑如下:
读取参数mapred.map.tasks,这个参数默认设置为0,生产系统中很少修改。
计算input文件的总字节数,总字节数/(mapred.map.tasks==0 ? 1: mapred.map.tasks )=goalsize
每个split的最小值minSize由mapred.min.split.size参数设置,这个参数默认设置为0,生产系统中很少修改。
调用computeSplitSize方法,计算出splitsize= Math.max(minSize, Math.min(goalSize, blockSize)),通常这个值=blockSize,输入的文件较小,文件字节数之和小于blocksize时,splitsize=输入文件字节数之和。
对于input的每个文件,计算split的个数。
a) 文件大小/splitsize>1.1,创建一个split,这个split的字节数=splitsize,文件剩余字节数=文件大小-splitsize
b) 文件剩余字节数/splitsize<1.1,剩余的部分作为一个split
举例说明:
input只有一个文件,大小为100M,splitsize=blocksize,则split数为2,第一个split为64M,第二个为36M
input只有一个文件,大小为65M,splitsize=blocksize,则split数为1,split大小为65M
input只有一个文件,大小为129M,splitsize=blocksize,则split数为2,第一个split为64M,第二个为65M(最后一个split的大小可能超过splitsize)
input只有一个文件,大小为20M ,splitsize=blocksize,则split数为1,split大小为20M
input有两个文件,大小为100M和20M,splitsize=blocksize,则split数为3,第一个文件分为两个split,第一个split为64M,第二个为36M,第二个文件为一个split,大小为20M
input有两个文件,大小为25M和20M,splitsize=blocksize,则split数为2,第一个文件为一个split,大小为25M,第二个文件为一个split,大小为20M
假设一个job的input大小固定为100M,当只包含一个文件时,split个数为2,maptask数为2,但当包含10个10M的文件时,maptask数为10。
reduce task数量
下面来分析reducetask,纯粹的mapreduce task的reduce task数很简单,就是参数mapred.reduce.tasks的值,hadoop-site.xml文件中和mapreduce job运行时不设置的话默认为1。
在HIVE中运行sql的情况又不同,hive会估算reduce task的数量,估算方法如下:
通常是ceil(input文件大小/1024*1024*1024)
,每1GB大小的输入文件对应一个reduce task。
特殊的情况是当sql只查询count(*)时,reduce task数被设置成1。
总结:
通过map和reduce task数量的分析可以看出,hadoop/hive估算的map和reduce task数可能和实际情况相差甚远。假定某个job的input数据量庞大,reduce task数量也会随之变大,而通过join和group by,实际output的数据可能不多,但reduce会输出大量的小文件,这个job的下游任务将会启动同样多的map来处理前面reduce产生的大量文件。在生产环境中每个user group有一个map task数的限额,一个job启动大量的map task很显然会造成其他job等待释放资源。
Hive对于上面描述的情况有一种补救措施,参数hive.merge.smallfiles.avgsize控制hive对output小文件的合并,当hiveoutput的文件的平均大小小于hive.merge.smallfiles.avgsize-默认为16MB左右,hive启动一个附加的mapreducejob合并小文件,合并后文件大小不超过hive.merge.size.per.task-默认为256MB。
尽管Hive可以启动小文件合并的过程,但会消耗掉额外的计算资源,控制单个reduce task的输出大小>64MB才是最好的解决办法。
map数据计算示例
hive> set dfs.block.size;
dfs.block.size=268435456
hive> set mapred.map.tasks;
mapred.map.tasks=2
文件块大小为256MB,map.tasks为2
查看文件大小和文件数:(共4539.059804MB,18个文件)
1 | [dwapp@dw-yuntigw-63 hadoop]$ hadoop dfs -ls /group/alibaba-dw-icbu/hive/bdl_en12_pageview_fatdt0_d/hp_stat_date=2012-11-25; |
文件: | 大小Bytes | 大小MB | splitsize(MB) | 每个文件需要的map数量 | |
---|---|---|---|---|---|
文件1 | 290700555 | 277.2336531 | 256 | 1.082943957 | |
文件2 | 290695945 | 277.2292566 | 256 | 1.082926784 | |
文件3 | 290182606 | 276.7396984 | 256 | 1.081014447 | |
文件4 | 271979933 | 259.3802767 | 256 | 1.013204206 | |
文件5 | 258448208 | 246.4754181 | 256 | 0.962794602 | |
文件6 | 258440338 | 246.4679127 | 256 | 0.962765284 | |
文件7 | 258419852 | 246.4483757 | 256 | 0.962688968 | |
文件8 | 258347423 | 246.379302 | 256 | 0.962419149 | |
文件9 | 258349480 | 246.3812637 | 256 | 0.962426811 | |
文件10 | 258301657 | 246.3356562 | 256 | 0.962248657 | |
文件11 | 258270954 | 246.3063755 | 256 | 0.962134279 | |
文件12 | 258266805 | 246.3024187 | 256 | 0.962118823 | |
文件13 | 258253133 | 246.2893801 | 256 | 0.962067891 | |
文件14 | 258236047 | 246.2730856 | 256 | 0.962004241 | |
文件15 | 258239072 | 246.2759705 | 256 | 0.96201551 | |
文件16 | 258170671 | 246.2107382 | 256 | 0.961760696 | |
文件17 | 258160711 | 246.2012396 | 256 | 0.961723592 | |
文件18 | 258085783 | 246.1297827 | 256 | 0.961444464 | |
总文件大小: | 4759549173 | 4539.059804 |
goalSize = 4539.059804 (文件总大小)/ mapred.map.tasks(2) = 2269.529902MB。
分片大小:Math.max(minSize, Math.min(goalSize, blockSize)):blockSize
因此splitsize取值为256MB,所以一共分配18个map。
修改map.tasks参数为32
set mapred.map.tasks = 32;
文件: | 大小Bytes | 大小MB | splitsize(MB) | 每个文件需要的map数量 | |
---|---|---|---|---|---|
文件1 | 290700555 | 277.2336531 | 141.8 | 1.955103336 | |
文件2 | 290695945 | 277.2292566 | 141.8 | 1.955072332 | |
文件3 | 290182606 | 276.7396984 | 141.8 | 1.951619876 | |
文件4 | 271979933 | 259.3802767 | 141.8 | 1.829198002 | |
文件5 | 258448208 | 246.4754181 | 141.8 | 1.738190537 | |
文件6 | 258440338 | 246.4679127 | 141.8 | 1.738137607 | |
文件7 | 258419852 | 246.4483757 | 141.8 | 1.737999829 | |
文件8 | 258347423 | 246.379302 | 141.8 | 1.737512708 | |
文件9 | 258349480 | 246.3812637 | 141.8 | 1.737526543 | |
文件10 | 258301657 | 246.3356562 | 141.8 | 1.737204909 | |
文件11 | 258270954 | 246.3063755 | 141.8 | 1.736998417 | |
文件12 | 258266805 | 246.3024187 | 141.8 | 1.736970513 | |
文件13 | 258253133 | 246.2893801 | 141.8 | 1.736878562 | |
文件14 | 258236047 | 246.2730856 | 141.8 | 1.73676365 | |
文件15 | 258239072 | 246.2759705 | 141.8 | 1.736783995 | |
文件16 | 258170671 | 246.2107382 | 141.8 | 1.736323965 | |
文件17 | 258160711 | 246.2012396 | 141.8 | 1.736256979 | |
文件18 | 258085783 | 246.1297827 | 141.8 | 1.735753051 | |
总文件大小: | 4759549173 | 4539.059804 |
goalSize = 4539.059804 / mapred.map.tasks(32) = 141.8456189
分片大小:Math.max(minSize, Math.min(goalSize, blockSize)):goalSize
因此splitsize取值为141.8MB,所以一共分配36个map。
原文地址: