Hadoop DistributedCache已被弃用 – 首选的API是什么?

我的地图任务需要一些configuration数据,我想通过分布式caching进行分发。

Hadoop MapReduce教程显示了DistributedCache类的用法 ,大致如下:

// In the driver JobConf conf = new JobConf(getConf(), WordCount.class); ... DistributedCache.addCacheFile(new Path(filename).toUri(), conf); // In the mapper Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job); ... 

但是, DistributedCache在Hadoop 2.2.0中被标记为已弃用 。

什么是实现这个新的首选方式? 是否有涵盖此API的最新示例或教程?

分布式caching的API可以在Job类中find。 检查这里的文档: http : //hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html代码应该是这样的

 Job job = new Job(); ... job.addCacheFile(new Path(filename).toUri()); 

在你的映射器代码中:

 Path[] localPaths = context.getLocalCacheFiles(); ... 

为了扩展@jtravaglini,在YARN / MapReduce 2中使用DistributedCache的首选方法如下:

在你的驱动程序中,使用Job.addCacheFile()

 public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = Job.getInstance(conf, "MyJob"); job.setMapperClass(MyMapper.class); // ... // Mind the # sign after the absolute file location. // You will be using the name after the # sign as your // file name in your Mapper/Reducer job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some")); job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other")); return job.waitForCompletion(true) ? 0 : 1; } 

在你的Mapper / Reducer中,重写setup(Context context)方法:

 @Override protected void setup( Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) { File some_file = new File("./some"); File other_file = new File("./other"); // Do things to these two files, like read them // or parse as JSON or whatever. } super.setup(context); } 

YARN / MR2的新分布式cachingAPI可以在org.apache.hadoop.mapreduce.Job类中find。

  Job.addCacheFile() 

不幸的是,目前还没有很多全面的教程式的例子。

http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Job.html#addCacheFile%28java.net.URI%29

我没有使用job.addCacheFile()。 相反,我使用了像-files /path/to/myfile.txt#myfile这样的-files选项。 然后在映射器或reducer代码中使用下面的方法:

 /** * This method can be used with local execution or HDFS execution. * * @param context * @param symLink * @param throwExceptionIfNotFound * @return * @throws IOException */ public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException { URI[] uris = context.getCacheFiles(); if(uris==null||uris.length==0) { if(throwExceptionIfNotFound) throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache"); return null; } URI symlinkUri = null; for(URI uri: uris) { if(symLink.equals(uri.getFragment())) { symlinkUri = uri; break; } } if(symlinkUri==null) { if(throwExceptionIfNotFound) throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache"); return null; } //if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink); } 

然后在mapper / reducer中:

 @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true); ... do work ... } 

请注意,如果我直接使用“-files /path/to/myfile.txt”,那么我需要使用“myfile.txt”来访问该文件,因为这是默认的符号链接名称。

没有提到的解决scheme完全为我工作。 这可能是因为Hadoop版本不断变化我正在使用hadoop 2.6.4。 本质上,分布式caching已被弃用,所以我不想使用它。 正如一些postbuild议我们使用addCacheFile(),但它已经改变了一点。 这是如何为我工作

 job.addCacheFile(new URI("hdfs://XXXX:9000/EnglishStop.txt#EnglishStop.txt")); 

这里XXXX可以是主IP地址或本地主机。 EnglishStop.txt存储在HDFS的/位置。

 hadoop fs -ls / 

输出是

 -rw-r--r-- 3 centos supergroup 1833 2016-03-12 20:24 /EnglishStop.txt drwxr-xr-x - centos supergroup 0 2016-03-12 19:46 /test 

有趣但方便,#EnglishStop.txt意味着现在我们可以在映射器中以“EnglishStop.txt”的forms访问它。 这里是相同的代码

 public void setup(Context context) throws IOException, InterruptedException { File stopwordFile = new File("EnglishStop.txt"); FileInputStream fis = new FileInputStream(stopwordFile); BufferedReader reader = new BufferedReader(new InputStreamReader(fis)); while ((stopWord = reader.readLine()) != null) { // stopWord is a word read from Cache } } 

这只是为我工作。 您可以读取存储在HDFS中的文件的行

我有同样的问题。 而且不仅是分散式的,而且还有getLocalCacheFiles和“new Job”。 所以对我有用的是以下几点:

司机:

 Configuration conf = getConf(); Job job = Job.getInstance(conf); ... job.addCacheFile(new Path(filename).toUri()); 

在Mapper / Reducer设置中:

 @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); URI[] files = context.getCacheFiles(); // getCacheFiles returns null Path file1path = new Path(files[0]) ... }