通过按键Spark – 一个Spark作业写入多个输出

如何在单个Job中使用Spark写入多个依赖于键的输出。 我当然可以使用.filter来处理所有可能的密钥,但这是一个非常糟糕的黑客攻击,它会触发许多作业, 并且需要对数据集进行多次迭代,即使缓存也是如此

相关: 通过键Scalding Hadoop(一个MapReduce作业)写入多个输出

例如

 sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .writeAsMultiple(prefix, compressionCodecOption) 

将确保cat prefix/1

 a b 

cat prefix/2将是

 c 

我知道这个问题的答案将涉及在Hadoop中使用MultipleOutputFormat。

更新:请不要对结果输出进行任何限制,例如提供文件数量固定的解决方案,或者必须事先知道密钥数量的地方,或者压缩类型有限的地方。

更新:在烫伤这现在是超级简单感谢TemplatedTsv我想要一个答案就是这样!

如果您使用Spark 1.4+,则由于DataFrame API ,这变得非常容易。 (DataFrames是在Spark 1.3中引入的,但我们需要的partitionBy()是在1.4中引入的 。)

如果您刚开始使用RDD,则需要先将其转换为DataFrame:

 val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie"))) val people_df = people_rdd.toDF("number", "name") 

在Python中,这个相同的代码是:

 people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")]) people_df = people_rdd.toDF(["number", "name"]) 

一旦你有一个DataFrame,根据一个特定的密钥写入多个输出是很简单的。 更重要的是 – 这就是DataFrame API的美妙之处 – Python,Scala,Java和R中的代码几乎相同:

 people_df.write.partitionBy("number").text("people") 

如果你想要的话,你可以很容易地使用其他输出格式:

 people_df.write.partitionBy("number").json("people-json") people_df.write.partitionBy("number").parquet("people-parquet") 

在这些例子中的每一个中,Spark都会为我们在其上分割DataFrame的每个键创建一个子目录:

 people/ _SUCCESS number=1/ part-abcd part-efgh number=2/ part-abcd part-efgh 

我会这样做,这是可扩展的

 import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String] } object Split { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Split" + args(1)) val sc = new SparkContext(conf) sc.textFile("input/path") .map(a => (k, v)) // Your own implementation .partitionBy(new HashPartitioner(num)) .saveAsHadoopFile("output/path", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) spark.stop() } } 

刚才看到类似的答案,但实际上我们不需要定制的分区。 MultipleTextOutputFormat将为每个键创建文件。 多个具有相同密钥的记录可以归入同一个分区。

新的HashPartitioner(num),其中num是你想要的分区号。 如果你有很多不同的密钥,你可以将数字设置为大。 在这种情况下,每个分区不会打开太多的hdfs文件处理程序。

如果您可能有一个给定的密钥许多值,我认为可扩展的解决方案是写出每个分区每个密钥一个文件。 不幸的是,在Spark中没有内置的支持,但我们可以掀起一些东西。

 sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .mapPartitionsWithIndex { (p, it) => val outputs = new MultiWriter(p.toString) for ((k, v) <- it) { outputs.write(k.toString, v) } outputs.close Nil.iterator } .foreach((x: Nothing) => ()) // To trigger the job. // This one is Local, but you could write one for HDFS class MultiWriter(suffix: String) { private val writers = collection.mutable.Map[String, java.io.PrintWriter]() def write(key: String, value: Any) = { if (!writers.contains(key)) { val f = new java.io.File("output/" + key + "/" + suffix) f.getParentFile.mkdirs writers(key) = new java.io.PrintWriter(f) } writers(key).println(value) } def close = writers.values.foreach(_.close) } 

(用您选择的分布式文件系统操作替换PrintWriter 。)

这在RDD上进行一次传递并且不执行随机播放。 它为每个键提供一个目录,每个目录中包含多个文件。

我有类似的需求,找到了一个方法。 但是它有一个缺点(这对我的情况不是问题):您需要为每个输出文件使用一个分区重新分区数据。

要以这种方式进行分区,通常需要事先知道作业将输出多少个文件,并找到将每个键映射到每个分区的函数。

首先让我们创建基于MultipleTextOutputFormat的类:

 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] { override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = { key.toString } override protected def generateActualKey(key: T, value: V) = { null } } 

有了这个类,Spark会从分区中获得一个密钥(我想是第一个/最后一个),并用这个密钥命名文件,所以在同一分区上混合多个密钥并不好。

对于你的例子,你将需要一个自定义的分区。 这将完成这项工作:

 import org.apache.spark.Partitioner class IdentityIntPartitioner(maxKey: Int) extends Partitioner { def numPartitions = maxKey def getPartition(key: Any): Int = key match { case i: Int if i < maxKey => i } } 

现在我们把所有东西放在一起:

 val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e"))) // You need to know the max number of partitions (files) beforehand // In this case we want one partition per key and we have 3 keys, // with the biggest key being 7, so 10 will be large enough val partitioner = new IdentityIntPartitioner(10) val prefix = "hdfs://.../prefix" val partitionedRDD = rdd.partitionBy(partitioner) partitionedRDD.saveAsHadoopFile(prefix, classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]]) 

这将在前缀(命名为1,2和7)下生成3个文件,一次处理所有内容。

正如你所看到的,你需要一些关于你的密钥的知识才能使用这个解决方案。

对我来说,这很容易,因为我需要一个输出文件的每个键哈希和文件的数量在我的控制下,所以我可以使用股票HashPartitioner做的伎俩。

saveAsText()和saveAsHadoop(…)是基于RDD数据实现的,特别是通过以下方法实现的: PairRDD.saveAsHadoopDataset ,它从执行PairRdd的数据中获取数据。 我看到两种可能的选择:如果数据的大小相对较小,则可以通过在RDD上进行分组来节省一些实施时间,从每个集合创建一个新的RDD并使用该RDD来写入数据。 像这样的东西:

 val byKey = dataRDD.groupByKey().collect() val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)} val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k} 

请注意,它不适用于大数据集b / c v.toSeq迭代器的v.toSeq可能不适合内存。

我看到的另一个选项,实际上我推荐的方法是:直接调用hadoop / hdfs api。

以下是我在研究这个问题时开始的一个讨论: 如何从另一个RDD创建RDD?

我在Java中需要同样的东西。 将我的翻译张湛的Scala答案发布给Spark Java API用户:

 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> { @Override protected String generateFileNameForKeyValue(A key, B value, String name) { return key.toString(); } } public class Main { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("Split Job") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"}; sc.parallelize(Arrays.asList(strings)) // The first character of the string is the key .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s)) .saveAsHadoopFile("output/", String.class, String.class, RDDMultipleTextOutputFormat.class); sc.stop(); } } 

我有一个类似的用例,我把Hadoop HDFS上的输入文件分割成多个基于一个键(每个键一个文件)的文件。 这是我的斯卡拉火花代码

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; val hadoopconf = new Configuration(); val fs = FileSystem.get(hadoopconf); @serializable object processGroup { def apply(groupName:String, records:Iterable[String]): Unit = { val outFileStream = fs.create(new Path("/output_dir/"+groupName)) for( line <- records ) { outFileStream.writeUTF(line+"\n") } outFileStream.close() } } val infile = sc.textFile("input_file") val dateGrouped = infile.groupBy( _.split(",")(0)) dateGrouped.foreach( (x) => processGroup(x._1, x._2)) 

我已经根据密钥对记录进行了分组。 每个键的值被写入单独的文件。

python用户的好消息,如果你有多列,你想保存所有其他列没有分区的CSV格式,如果你使用“文本”方法作为尼克Chammas的建议,将失败。

 people_df.write.partitionBy("number").text("people") 

错误信息是“AnalysisException:u'Text数据源只支持单列,而且你有2列;'”

在spark 2.0.0中(我的测试环境是hdp的spark 2.0.0)现在已经集成了“com.databricks.spark.csv”包,它允许我们保存仅由一列进行分区的文本文件,请参阅示例:

 people_rdd = sc.parallelize([(1,"2016-12-26", "alice"), (1,"2016-12-25", "alice"), (1,"2016-12-25", "tom"), (1, "2016-12-25","bob"), (2,"2016-12-26" ,"charlie")]) df = people_rdd.toDF(["number", "date","name"]) df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people") [root@namenode people]# tree . ├── number=1 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv ├── number=2 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv └── _SUCCESS [root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,alice 2016-12-25,alice 2016-12-25,tom 2016-12-25,bob [root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,charlie 

在我的spark 1.6.1环境中,代码没有抛出任何错误,但是这只是生成一个文件。 它不是由两个文件夹分区。

希望这可以帮助。

我有一个类似的用例。 我通过编写两个实现MultipleTextOutputFormatRecordWriter自定义类来解决它在Java中的RecordWriter

我的输入是一个JavaPairRDD<String, List<String>> ,我想存储在一个文件中,其中包含所有的行。

这是我的MultipleTextOutputFormat实现的代码

 class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> { @Override protected String generateFileNameForKeyValue(K key, V value, String name) { return key.toString(); //The return will be used as file name } /** The following 4 functions are only for visibility purposes (they are used in the class MyRecordWriter) **/ protected String generateLeafFileName(String name) { return super.generateLeafFileName(name); } protected V generateActualValue(K key, V value) { return super.generateActualValue(key, value); } protected String getInputFileBasedOutputFileName(JobConf job, String name) { return super.getInputFileBasedOutputFileName(job, name); } protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException { return super.getBaseRecordWriter(fs, job, name, arg3); } /** Use my custom RecordWriter **/ @Override RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException { final String myName = this.generateLeafFileName(name); return new MyRecordWriter<K, V>(this, fs, job, arg3, myName); } } 

这是我的RecordWriter实现的代码。

 class MyRecordWriter<K, V> implements RecordWriter<K, V> { private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat; private final FileSystem fs; private final JobConf job; private final Progressable arg3; private String myName; TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap(); MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) { this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat; this.fs = fs; this.job = job; this.arg3 = arg3; this.myName = myName; } @Override void write(K key, V value) throws IOException { String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName); String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath); Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value); RecordWriter rw = this.recordWriters.get(finalPath); if(rw == null) { rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3); this.recordWriters.put(finalPath, rw); } List<String> lines = (List<String>) actualValue; for (String line : lines) { rw.write(null, line); } } @Override void close(Reporter reporter) throws IOException { Iterator keys = this.recordWriters.keySet().iterator(); while(keys.hasNext()) { RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next()); rw.close(reporter); } this.recordWriters.clear(); } } 

大部分代码与FileOutputFormat完全相同。 唯一的区别是那几行

 List<String> lines = (List<String>) actualValue; for (String line : lines) { rw.write(null, line); } 

这些行允许我在文件中写入我的输入List<String>每一行。 write函数的第一个参数设置为null ,以避免在每一行上write密钥。

要完成,我只需要做这个调用来写我的文件

 javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class); 

这包括所要求的编解码器,必要的进口和按要求的皮条客。

 import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) { def writeAsMultiple(prefix: String, codec: String, keyName: String = "key") (implicit sqlContext: SQLContext): Unit = { import sqlContext.implicits._ rdd.toDF(keyName, "_2").write.partitionBy(keyName) .format("text").option("codec", codec).save(prefix) } } val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec") 

OP的一个细微差别是它将前缀<keyName>=作为目录名称。 例如

 myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec") 

会给:

 prefix/key=1/part-00000 prefix/key=2/part-00000 

其中prefix/my_number=1/part-00000将包含行abprefix/my_number=2/part-00000将包含行c

 myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo") 

会给:

 prefix/foo=1/part-00000 prefix/foo=2/part-00000 

应该清楚如何编辑parquet

最后,下面是Dataset一个例子,使用元组也许更好。

 implicit class PimpedDataset[T](dataset: Dataset[T]) { def writeAsMultiple(prefix: String, codec: String, field: String): Unit = { dataset.write.partitionBy(field) .format("text").option("codec", codec).save(prefix) } }