合并输出文件后缩小阶段

在mapreduce中,每个reduce任务将其输出写入名为part-r-nnnnn的文件,其中nnnnn是与reduce任务关联的分区ID。 映射/减less合并这些文件? 如果是的话,怎么样?

您可以通过调用下面的代码将合并的reduce输出文件委托给hadoop,而不是单独进行文件合并:

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt 

不,这些文件不会被Hadoop合并。 您获得的文件数与减less任务的数量相同。

如果你需要这个作为下一个工作的input,那么不要担心有单独的文件。 只需指定整个目录作为下一个作业的input。

如果您确实需要集群外部的数据,那么我通常会在将数据从集群中拉出时将其合并到接收端。

即这样的事情:

 hadoop fs -cat /some/where/on/hdfs/job-output/part-r-* > TheCombinedResultOfTheJob.txt 

这是您可以用来在HDFS中合并文件的function

 public boolean getMergeInHdfs(String src, String dest) throws IllegalArgumentException, IOException { FileSystem fs = FileSystem.get(config); Path srcPath = new Path(src); Path dstPath = new Path(dest); // Check if the path already exists if (!(fs.exists(srcPath))) { logger.info("Path " + src + " does not exists!"); return false; } if (!(fs.exists(dstPath))) { logger.info("Path " + dest + " does not exists!"); return false; } return FileUtil.copyMerge(fs, srcPath, fs, dstPath, false, config, null); } 

对于仅文本文件和HDFS作为源和目标,请使用以下命令:

hadoop fs -cat /input_hdfs_dir/* | hadoop fs -put - /output_hdfs_file

这将连接input_hdfs_dir所有文件, input_hdfs_dir输出写回HDFS的output_hdfs_file 。 请记住,所有的数据将被带回到本地系统,然后再次上传到hdfs,虽然没有临时文件被创build,这在使用UNIX pe的飞行中发生。

此外,这不适用于诸如Avro,ORC等非文本文件。

对于二进制文件,你可以做这样的事情(如果你有Hive表映射到目录):

insert overwrite table tbl select * from tbl

根据您的configuration,这也可能创build多个文件。 要创build单个文件,请使用mapreduce.job.reduces=1显式设置reducer的数量为mapreduce.job.reduces=1或者将hive属性设置为hive.merge.mapredfiles=true

您可以运行其他map / reduce任务,map和reduce不更改数据,partitioner将所有数据分配给一个reducer。

part-r-nnnnn文件是在由“r”指定的缩小阶段之后生成的。 现在事实是,如果你有一个reducer运行,你将有一个输出文件,如part-r-00000。 如果reducer的数量是2那么你将有part-r-00000和part-r-00001等等。 看,如果输出文件太大,无法放入机器内存,因为hadoop框架已经devise为在商品机器上运行,那么文件就会被分割。 按照MRv1,你有20个减速器的限制来处理你的逻辑。 您可能在configuration文件mapred-site.xml中有更多但需要自定义的相同需求。 谈论你的问题; 您可以使用getmerge,也可以通过将以下语句embedded到驱动程序代码中,将reducer的数量设置为1

 job.setNumReduceTasks(1); 

希望这回答你的问题。

除了我以前的答案,我还有一个答案是你在几分钟前尝试的。 你可以使用CustomOutputFormat ,它看起来像下面给出的代码

 public class VictorOutputFormat extends FileOutputFormat<StudentKey,PassValue> { @Override public RecordWriter<StudentKey,PassValue> getRecordWriter( TaskAttemptContext tac) throws IOException, InterruptedException { //step 1: GET THE CURRENT PATH Path currPath=FileOutputFormat.getOutputPath(tac); //Create the full path Path fullPath=new Path(currPath,"Aniruddha.txt"); //create the file in the file system FileSystem fs=currPath.getFileSystem(tac.getConfiguration()); FSDataOutputStream fileOut=fs.create(fullPath,tac); return new VictorRecordWriter(fileOut); } } 

只是,看看最后的第四行。 我用我自己的名字作为输出文件名,我用15个减速器testing了这个程序。 仍然该文件保持不变。 因此,获取单个输出文件而不是两个或更多可能是非常清楚的,输出文件的大小不能超过主存储器的大小,即输出文件必须适合商品机器的存储器,否则可能存在输出文件拆分的问题。 谢谢!!

为什么不使用像这样的猪脚本来合并分区文件:

 stuff = load "/path/to/dir/*" store stuff into "/path/to/mergedir" 

如果这些文件有头文件,可以这样做:

 hadoop fs -cat /path/to/hdfs/job-output/part-* | grep -v "header" > output.csv 

然后为output.csv手动添加标题

。 映射/减less合并这些文件?

不,它不合并。

您可以使用IdentityReducer来实现您的目标。

不减less,直接将所有input值写入输出。

 public void reduce(K key, Iterator<V> values, OutputCollector<K,V> output, Reporter reporter) throws IOException 

将所有的键和值直接写入输出。

看看相关的SEpost:

hadoop:0还原剂和身份还原剂之间的区别?