Hadoop进程logging如何跨越块边界进行拆分?

根据Hadoop - The Definitive Guide

FileInputFormats定义的逻辑logging通常不适合HDFS。 例如,一个TextInputFormat的逻辑logging是行,这将越过HDFS边界。 这对你的程序的function没有任何影响 – 例如,行不会被遗漏或损坏,但值得了解,因为它意味着数据本地映射(即在同一主机上运行的映射input数据)将执行一些远程读取。 这造成的轻微的开销通常并不重要。

假设一条logging线分成两个块(b1和b2)。 处理第一个块(b1)的映射器将注意到最后一行没有EOL分隔符,并从下一个数据块(b2)中提取剩余的行。

映射器如何处理第二个块(b2),确定第一个logging是不完整的,并且应该从块(b2)中的第二个logging开始处理?

6 Solutions collect form web for “Hadoop进程logging如何跨越块边界进行拆分?”

有趣的问题,我花了一些时间看细节的代码,这是我的想法。 分割由客户端通过InputFormat.getSplits来处理,所以看一下FileInputFormat会给出以下信息:

  • 对于每个input文件,获取文件长度,块大小,并计算分割大小为max(minSize, min(maxSize, blockSize)) ,其中maxSize对应于mapred.max.split.sizeminSizemapred.min.split.size
  • 根据上面计算的拆分大小将文件分成不同的FileSplit 。 这里最重要的是每个FileSplit都用一个对应于input文件偏移的start参数进行初始化 。 目前还没有处理线路。 代码的相关部分如下所示:

     while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } 

之后,如果您查看由TextInputFormat定义的LineRecordReader ,则这是处理行的位置:

  • 当你初始化你的LineRecordReader它会尝试实例化一个LineReader ,它是一个抽象,能够通过FSDataInputStream读取行。 有两种情况:
  • 如果定义了CompressionCodec ,那么这个编解码器负责处理边界。 可能与您的问题不相关。
  • 如果没有编解码器,那么这就是有趣的地方:如果InputSplitstart点不是0,那么你回溯1个字符,然后跳过由\ n或\ r \ n(Windows)标识的第一行 ! 回溯很重要,因为如果您的行边界与分割边界相同,则可以确保您不会跳过有效行。 这是相关的代码:

     if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start; 

因此,由于在客户端计算分割,映射器不需要依次运行,所以每个映射器都知道是否需要丢弃第一行。

所以基本上如果你在同一个文件中每个100Mb有两行,并且简单的说我们可以说分割大小是64Mb。 然后当计算input分裂时,我们将有以下情况:

  • 将包含path和主机的分割1包含到此块中。 初始化200-200 = 0Mb,长度64Mb。
  • Split 2初始化200-200 + 64 = 64Mb,长度64Mb。
  • Split 3初始化200-200 + 128 = 128Mb,长度64Mb。
  • 分割4初始化200-200 + 192 = 192Mb,长度8Mb。
  • 映射器A将处理拆分1,开始为0,所以不要跳过第一行,并读取超出64Mb限制的完整行,因此需要远程读取。
  • 映射器B将处理split 2,start是!= 0,所以跳过64Mb-1byte之后的第一行,这对应于仍在split 2中的100Mb的行1的结尾,我们在split 2中有28Mb的行远程读取剩余的72Mb。
  • Mapper C会处理split 3,start是!= 0,所以在128Mb-1byte之后跳过第一行,这对应于200Mb的第二行的结尾,这是文件的结尾,所以什么也不做。
  • 除了在192Mb-1字节后寻找换行符之外,Mapper D与映射器C相同。

Map Reduecealgorithm在文件的物理块上不起作用。 它适用于逻辑input分割。 input拆分取决于logging写入的位置。 一个logging可能跨越两个Mappers。

HDFS的设置方式是将非常大的文件分解为大块(例如,大小为128MB),并将这些块的三个副本存储在群集中的不同节点上。

HDFS没有意识到这些文件的内容。 logging可能已经在Block-a中开始,但该logging的结尾可能存在于Block-b中

为了解决这个问题,Hadoop使用存储在文件块中的数据的逻辑表示,称为input拆分。 当MapReduce作业客户端计算input分割时它会计算块中第一个完整logging的开始位置以及块中最后一条logging的结束位置

关键点:

在块中最后一个logging不完整的情况下,input拆分包括下一个块的位置信息和完成logging所需的数据的字节偏移。

看下面的图。

在这里输入图像描述

看看这篇文章和相关的SE问题: 关于Hadoop / HDFS文件拆分

更多细节可以从文档中读取

Map-Reduce框架依赖于作业的InputFormat来:

  1. validation作业的input规范。
  2. 将input文件拆分为逻辑InputSplits,然后将每个文件分配给一个单独的Mapper。
  3. 然后将每个InputSplit分配给一个单独的Mapper进行处理。 分裂可能是元组InputSplit[] getSplits(JobConf job,int numSplits )是处理这些事情的API。

FileInputFormat ,它扩展了InputFormat实现的getSplits ()方法。 看看这个方法的内部在grepcode上

我将其看作如下:InputFormat负责将数据拆分为逻辑分割,同时考虑到数据的性质。
没有什么可以阻止它这样做,尽pipe它可以增加工作的显着延迟 – 所有的逻辑和围绕期望的分割大小边界的阅读将发生在jobtracker。
最简单的logging感知input格式是TextInputFormat。 它正在如下工作(据我所知,从代码) – input格式创build拆分的大小,无论线,但LineRecordReader总是:
a)如果不是第一次分割,则跳过分割中的第一行(或其中的一部分)
b)在分割的边界之后读取一行(如果数据可用,则不是最后的分割)。

从我所理解的,当FileSplit初始化为第一个块时,默认的构造函数被调用。 因此,开始和长度的值初始为零。 在第一个块的处理结束时,如果最后一行不完整,那么长度的值将大于分割的长度,并且它将读取下一个块的第一行。 由于这个原因,第一个块的LineRecordReader将大于零,在这种情况下, LineRecordReader将跳过第二个块的第一行。 (看来源 )

如果第一个块的最后一行完成,那么长度的值将等于第一个块的长度,第二个块的开始的值将为零。 在这种情况下, LineRecordReader将不会跳过第一行并LineRecordReader开始读第二个块。

说得通?

制图人员不必交stream。 文件块在HDFS中,当前的映射器(RecordReader)可以读取包含该行剩余部分的块。 这发生在幕后。

从LineRecordReader.java的hadoop源代码构造函数:我find一些评论:

 // If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; 

从这个我相信hadoop将读取一个额外的行每个拆分(在当前拆分结束,读下一行在下一个拆分),如果不是第一次拆分,第一行将被扔掉。 这样就不会有线路logging丢失和不完整的情况

  • 名称节点处于安全模式。 不能离开
  • 从简单的Java程序调用mapreduce作业
  • MapReduce的简单解释?
  • 好MapReduce的例子
  • 使用mapred或mapreduce包来创buildHadoop作业会更好吗?
  • hadoop map减less二次sorting
  • 有没有一个.NET相当于Apache Hadoop?
  • Mongodb聚合框架比map / reduce更快吗?
  • Mapreduce初学者程序实例
  • Hadoop DistributedCache已被弃用 - 首选的API是什么?
  • 检查数组中的每个元素是否匹配条件