Hadoop进程logging如何跨越块边界进行拆分?
根据Hadoop - The Definitive Guide
FileInputFormats定义的逻辑logging通常不适合HDFS块。 例如,一个TextInputFormat的逻辑logging是行,这将越过HDFS边界。 这对你的程序的function没有任何影响 – 例如,行不会被遗漏或损坏,但值得了解,因为它意味着数据本地映射(即在同一主机上运行的映射input数据)将执行一些远程读取。 这造成的轻微的开销通常并不重要。
假设一条logging线分成两个块(b1和b2)。 处理第一个块(b1)的映射器将注意到最后一行没有EOL分隔符,并从下一个数据块(b2)中提取剩余的行。
映射器如何处理第二个块(b2),确定第一个logging是不完整的,并且应该从块(b2)中的第二个logging开始处理?
有趣的问题,我花了一些时间看细节的代码,这是我的想法。 分割由客户端通过InputFormat.getSplits
来处理,所以看一下FileInputFormat会给出以下信息:
- 对于每个input文件,获取文件长度,块大小,并计算分割大小为
max(minSize, min(maxSize, blockSize))
,其中maxSize
对应于mapred.max.split.size
,minSize
是mapred.min.split.size
。 -
根据上面计算的拆分大小将文件分成不同的
FileSplit
。 这里最重要的是每个FileSplit
都用一个对应于input文件偏移的start
参数进行初始化 。 目前还没有处理线路。 代码的相关部分如下所示:while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; }
之后,如果您查看由TextInputFormat
定义的LineRecordReader
,则这是处理行的位置:
- 当你初始化你的
LineRecordReader
它会尝试实例化一个LineReader
,它是一个抽象,能够通过FSDataInputStream
读取行。 有两种情况: - 如果定义了
CompressionCodec
,那么这个编解码器负责处理边界。 可能与您的问题不相关。 -
如果没有编解码器,那么这就是有趣的地方:如果
InputSplit
的start
点不是0,那么你回溯1个字符,然后跳过由\ n或\ r \ n(Windows)标识的第一行 ! 回溯很重要,因为如果您的行边界与分割边界相同,则可以确保您不会跳过有效行。 这是相关的代码:if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start;
因此,由于在客户端计算分割,映射器不需要依次运行,所以每个映射器都知道是否需要丢弃第一行。
所以基本上如果你在同一个文件中每个100Mb有两行,并且简单的说我们可以说分割大小是64Mb。 然后当计算input分裂时,我们将有以下情况:
- 将包含path和主机的分割1包含到此块中。 初始化200-200 = 0Mb,长度64Mb。
- Split 2初始化200-200 + 64 = 64Mb,长度64Mb。
- Split 3初始化200-200 + 128 = 128Mb,长度64Mb。
- 分割4初始化200-200 + 192 = 192Mb,长度8Mb。
- 映射器A将处理拆分1,开始为0,所以不要跳过第一行,并读取超出64Mb限制的完整行,因此需要远程读取。
- 映射器B将处理split 2,start是!= 0,所以跳过64Mb-1byte之后的第一行,这对应于仍在split 2中的100Mb的行1的结尾,我们在split 2中有28Mb的行远程读取剩余的72Mb。
- Mapper C会处理split 3,start是!= 0,所以在128Mb-1byte之后跳过第一行,这对应于200Mb的第二行的结尾,这是文件的结尾,所以什么也不做。
- 除了在192Mb-1字节后寻找换行符之外,Mapper D与映射器C相同。
Map Reduecealgorithm在文件的物理块上不起作用。 它适用于逻辑input分割。 input拆分取决于logging写入的位置。 一个logging可能跨越两个Mappers。
HDFS的设置方式是将非常大的文件分解为大块(例如,大小为128MB),并将这些块的三个副本存储在群集中的不同节点上。
HDFS没有意识到这些文件的内容。 logging可能已经在Block-a中开始,但该logging的结尾可能存在于Block-b中 。
为了解决这个问题,Hadoop使用存储在文件块中的数据的逻辑表示,称为input拆分。 当MapReduce作业客户端计算input分割时 , 它会计算块中第一个完整logging的开始位置以及块中最后一条logging的结束位置 。
关键点:
在块中最后一个logging不完整的情况下,input拆分包括下一个块的位置信息和完成logging所需的数据的字节偏移。
看下面的图。
看看这篇文章和相关的SE问题: 关于Hadoop / HDFS文件拆分
更多细节可以从文档中读取
Map-Reduce框架依赖于作业的InputFormat来:
- validation作业的input规范。
- 将input文件拆分为逻辑InputSplits,然后将每个文件分配给一个单独的Mapper。
- 然后将每个InputSplit分配给一个单独的Mapper进行处理。 分裂可能是元组 。
InputSplit[] getSplits(JobConf job,int numSplits
)是处理这些事情的API。
FileInputFormat ,它扩展了InputFormat
实现的getSplits
()方法。 看看这个方法的内部在grepcode上
我将其看作如下:InputFormat负责将数据拆分为逻辑分割,同时考虑到数据的性质。
没有什么可以阻止它这样做,尽pipe它可以增加工作的显着延迟 – 所有的逻辑和围绕期望的分割大小边界的阅读将发生在jobtracker。
最简单的logging感知input格式是TextInputFormat。 它正在如下工作(据我所知,从代码) – input格式创build拆分的大小,无论线,但LineRecordReader总是:
a)如果不是第一次分割,则跳过分割中的第一行(或其中的一部分)
b)在分割的边界之后读取一行(如果数据可用,则不是最后的分割)。
从我所理解的,当FileSplit
初始化为第一个块时,默认的构造函数被调用。 因此,开始和长度的值初始为零。 在第一个块的处理结束时,如果最后一行不完整,那么长度的值将大于分割的长度,并且它将读取下一个块的第一行。 由于这个原因,第一个块的LineRecordReader
将大于零,在这种情况下, LineRecordReader
将跳过第二个块的第一行。 (看来源 )
如果第一个块的最后一行完成,那么长度的值将等于第一个块的长度,第二个块的开始的值将为零。 在这种情况下, LineRecordReader
将不会跳过第一行并LineRecordReader
开始读第二个块。
说得通?
制图人员不必交stream。 文件块在HDFS中,当前的映射器(RecordReader)可以读取包含该行剩余部分的块。 这发生在幕后。
从LineRecordReader.java的hadoop源代码构造函数:我find一些评论:
// If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start;
从这个我相信hadoop将读取一个额外的行每个拆分(在当前拆分结束,读下一行在下一个拆分),如果不是第一次拆分,第一行将被扔掉。 这样就不会有线路logging丢失和不完整的情况