Reading a file as a single record in Hadoop

I have a large number of small files, and I want to use CombineFileInputFormat to merge them so that each file's data becomes a single record in my MR job. I have followed http://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html and tried to convert it to the new API.

I am facing two problems:

a) I tested with just 2 small files, yet 2 mappers are fired. I expected 1.

b) Each line comes through as a single record; I want the whole file as a single record.

It may be painful, but please look at the code below. I am still a novice in Hadoop.

The driver class:

    public class MRDriver extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            FileSystem fs = new Path(".").getFileSystem(getConf());
            fs.printStatistics();

            Job job = new Job(getConf());
            job.setJobName("Enron MR");
            job.setMapperClass(EnronMailReadMapper.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setNumReduceTasks(0);
            job.setJarByClass(EnronMailReadMapper.class);
            RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(args[1]));
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new MRDriver(), args);
            System.exit(exitCode);
        }
    }

The class below is mostly a copy-paste of LineRecordReader, with the initialize() and nextKeyValue() methods modified:

    public class SingleFileRecordReader extends RecordReader<LongWritable, Text> {
        private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

        private long start;
        private long pos;
        private long end;
        private LineReader in;
        private int maxLineLength;
        private LongWritable key = null;
        private Text value = null;

        public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
            start = split.getStart();
            end = start + split.getLength();
            final Path file = split.getPath();

            // open the file and seek to the start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            fileIn.seek(start);
            in = new LineReader(fileIn, job);
            // If this is not the first split, we always throw away first record
            // because we always (except the last split) read one extra line in
            // next() method.
            if (start != 0) {
                start += in.readLine(new Text(), 0, maxBytesToConsume(start));
            }
            this.pos = start;
        }

        private int maxBytesToConsume(long pos) {
            return (int) Math.min(Integer.MAX_VALUE, end - pos);
        }

        private long getFilePosition() throws IOException {
            long retVal = pos;
            return retVal;
        }

        public boolean nextKeyValue() throws IOException {
            if (key == null) {
                key = new LongWritable();
            }
            key.set(pos);
            if (value == null) {
                value = new Text();
            }
            int newSize = 0;
            StringBuffer totalValue = new StringBuffer();
            // We always read one extra line, which lies outside the upper
            // split limit ie (end - 1)
            while (getFilePosition() <= end) {
                newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength));
                if (newSize == 0) {
                    break;
                }
                totalValue.append(value.toString() + "\n");
                pos += newSize;
                if (newSize < maxLineLength) {
                    break;
                }
                // line too long. try again
                LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            } else {
                value = new Text(totalValue.toString());
                return true;
            }
        }

        @Override
        public LongWritable getCurrentKey() {
            return key;
        }

        @Override
        public Text getCurrentValue() {
            return value;
        }

        /**
         * Get the progress within the split
         */
        public float getProgress() throws IOException {
            if (start == end) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
            }
        }

        public synchronized void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }
    }

The other files:

    public class RawCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException {
            return new CombineFileRecordReader<LongWritable, Text>(
                    (CombineFileSplit) split, context, MultiFileRecordReader.class);
        }
    }

    public class MultiFileRecordReader extends RecordReader<LongWritable, Text> {
        private CombineFileSplit split;
        private TaskAttemptContext context;
        private int index;
        private RecordReader<LongWritable, Text> rr;

        public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
            this.split = split;
            this.context = context;
            this.index = index;
            this.rr = new SingleFileRecordReader();
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            this.split = (CombineFileSplit) split;
            this.context = context;
            if (null == rr) {
                rr = new SingleFileRecordReader();
            }
            FileSplit fileSplit = new FileSplit(this.split.getPath(index),
                                                this.split.getOffset(index),
                                                this.split.getLength(index),
                                                this.split.getLocations());
            this.rr.initialize(fileSplit, this.context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return this.rr.nextKeyValue();
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return this.rr.getCurrentKey();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return this.rr.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return this.rr.getProgress();
        }

        @Override
        public void close() throws IOException {
            if (rr != null) {
                rr.close();
                rr = null;
            }
        }
    }

Have a look at this input format. It is an input format for reading multiple files in a single map task: exactly one (unsplit) file is read per record passed to the mapper. The WholeFileRecordReader takes care of sending one file's contents as one value. The key returned is NullWritable, and the value is the contents of the whole file. Now run your MapReduce job with it, see how many mappers actually run, and check that the output you get is correct.

The records are built from WholeFileRecordReaders.

    public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }

        /**
         * Creates a CombineFileRecordReader to read each file assigned to this InputSplit.
         * Note, that unlike ordinary InputSplits, split must be a CombineFileSplit, and therefore
         * is expected to specify multiple files.
         *
         * @param split The InputSplit to read. Throws an IllegalArgumentException if this is
         *        not a CombineFileSplit.
         * @param context The context for this task.
         * @return a CombineFileRecordReader to process each file in split.
         *         It will read each file with a WholeFileRecordReader.
         * @throws IOException if there is an error.
         */
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException {
            if (!(split instanceof CombineFileSplit)) {
                throw new IllegalArgumentException("split must be a CombineFileSplit");
            }
            return new CombineFileRecordReader<NullWritable, Text>(
                    (CombineFileSplit) split, context, WholeFileRecordReader.class);
        }
    }
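As an aside on problem (a): how many map tasks a CombineFileInputFormat produces also depends on its split-size limits. The sketch below is a hypothetical variant, not part of the original answer; it relies on the protected setMaxSplitSize helper inherited from CombineFileInputFormat, and the 128 MB cap is an illustrative value only.

    // Hypothetical subclass that caps the combined split size so that many
    // small files are grouped into as few splits (and hence mappers) as possible.
    public class BoundedWholeFileInputFormat extends WholeFileInputFormat {
        public BoundedWholeFileInputFormat() {
            // setMaxSplitSize is inherited from CombineFileInputFormat; without
            // an upper bound, grouping is driven by node/rack locality.
            // 128 MB is illustrative.
            setMaxSplitSize(128L * 1024 * 1024);
        }
    }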

And here is the WholeFileRecordReader it uses:

    public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
        private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);

        /** The path to the file to read. */
        private final Path mFileToRead;
        /** The length of this file. */
        private final long mFileLength;
        /** The Configuration. */
        private final Configuration mConf;
        /** Whether this FileSplit has been processed. */
        private boolean mProcessed;
        /** Single Text to store the value of this file when it is read. */
        private final Text mFileText;

        /**
         * Implementation detail: This constructor is built to be called via
         * reflection from within CombineFileRecordReader.
         *
         * @param fileSplit The CombineFileSplit that this will read from.
         * @param context The context for this task.
         * @param pathToProcess The path index from the CombineFileSplit to process in this record.
         */
        public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
                Integer pathToProcess) {
            mProcessed = false;
            mFileToRead = fileSplit.getPath(pathToProcess);
            mFileLength = fileSplit.getLength(pathToProcess);
            mConf = context.getConfiguration();

            assert 0 == fileSplit.getOffset(pathToProcess);
            if (LOG.isDebugEnabled()) {
                LOG.debug("FileToRead is: " + mFileToRead.toString());
                LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths());
                try {
                    FileSystem fs = FileSystem.get(mConf);
                    assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
                } catch (IOException ioe) {
                    // oh well, I was just testing.
                }
            }

            mFileText = new Text();
        }

        /** {@inheritDoc} */
        @Override
        public void close() throws IOException {
            mFileText.clear();
        }

        /**
         * Returns NullWritable; this reader produces no meaningful key.
         */
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }

        /**
         * <p>Returns the current value. If the file has been read with a call to
         * nextKeyValue(), this returns the contents of the file as Text.
         * Otherwise, it returns an empty Text.</p>
         *
         * @return A Text containing the contents of the file to read.
         */
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return mFileText;
        }

        /**
         * Returns whether the file has been processed or not. Since only one record
         * will be generated for a file, progress is 0.0 if it has not been processed
         * and 1.0 if it has.
         */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return mProcessed ? 1.0f : 0.0f;
        }

        /**
         * All of the internal state is already set on instantiation. This is a no-op.
         */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // no-op.
        }

        /**
         * <p>If the file has not already been read, this reads it into memory, so that
         * a call to getCurrentValue() will return the entire contents of this file as
         * Text, then returns true. If it has already been read, returns false without
         * updating any internal state.</p>
         *
         * @return Whether the file was read or not.
         * @throws IOException if there is an error reading the file.
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!mProcessed) {
                if (mFileLength > (long) Integer.MAX_VALUE) {
                    throw new IOException("File is longer than Integer.MAX_VALUE.");
                }
                byte[] contents = new byte[(int) mFileLength];
                FileSystem fs = mFileToRead.getFileSystem(mConf);
                FSDataInputStream in = null;
                try {
                    // Set the contents of this file.
                    in = fs.open(mFileToRead);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    mFileText.set(contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                mProcessed = true;
                return true;
            }
            return false;
        }
    }

And here is the driver code:

    public int run(String[] arg) throws Exception {
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);
        // estimate reducers

        Job job = new Job(conf);
        job.setJarByClass(WholeFileDriver.class);
        job.setJobName("WholeFile");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setMapperClass(WholeFileMapper.class);
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(arg[0]));
        Path output = new Path(arg[1]);
        try {
            fs.delete(output, true);
        } catch (IOException e) {
            LOG.warn("Failed to delete temporary path", e);
        }
        FileOutputFormat.setOutputPath(job, output);

        boolean ret = job.waitForCompletion(true);
        if (!ret) {
            throw new Exception("Job Failed");
        }
        return 0;
    }
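The driver references a WholeFileMapper that is not shown. A minimal pass-through sketch, assuming the (NullWritable, Text) record types produced by WholeFileInputFormat; the placeholder key below is an assumption, since the reader emits no meaningful key:

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical mapper: map() is called once per whole input file, with the
    // file's entire contents in 'value'.
    public class WholeFileMapper extends Mapper<NullWritable, Text, Text, Text> {
        private final Text outKey = new Text("file");  // placeholder key

        @Override
        protected void map(NullWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Real per-file logic (e.g. parsing an Enron mail) would go here;
            // this simply writes the contents back out under the placeholder key.
            context.write(outKey, value);
        }
    }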