有人可以向我解释一下map和flatMap之间的区别,以及每个用例是什么?

有人可以向我解释一下map和flatMap之间的区别,以及每个用例是什么?

什么“平坦的结果”是什么意思? 到底有什么好处呢?

下面是一个例子,作为一个spark-shell会话:

首先,一些数据 – 两行文字:

 val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue")) // lines rdd.collect res0: Array[String] = Array("Roses are red", "Violets are blue") 

现在, map将长度为N的RDD转换成长度为N的另一个RDD。

例如,它从两行映射成两行:

 rdd.map(_.length).collect res1: Array[Int] = Array(13, 16) 

flatMap (松散地说)将长度为N的RDD转换为N个集合的集合,然后将这些集合平面化为单个RDD的结果。

 rdd.flatMap(_.split(" ")).collect res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue") 

我们每行有多个单词,还有多行,但是我们最终得到单个输出的单词数组

为了说明这一点,从线集合到单词集合的flatMapping如下所示:

 ["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"] 

因此,对于flatMap ,input和输出RDD通常将具有不同的大小

一般我们在hadoop中使用字数统计的例子。 我将采取相同的用例,并将使用mapflatMap ,我们将看到它是如何处理数据的差异。

以下是示例数据文件。

 hadoop is fast hive is sql on hdfs spark is superfast spark is awesome 

上面的文件将使用mapflatMap进行parsing。

使用map

 >>> wc = data.map(lambda line:line.split(" ")); >>> wc.collect() [u'hadoop is fast', u'hive is sql on hdfs', u'spark is superfast', u'spark is awesome'] 

input有4行,输出大小也是4,即N个元素==> N个元素。

使用flatMap

 >>> fm = data.flatMap(lambda line:line.split(" ")); >>> fm.collect() [u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome'] 

输出与地图不同。


让我们为每个键分配1来获得字数。

  • fm :使用flatMap创build的RDD
  • wc :使用map创build的RDD
 >>> fm.map(lambda word : (word,1)).collect() [(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)] 

而RDD上的map wc会给下面的不希望的输出:

 >>> wc.flatMap(lambda word : (word,1)).collect() [[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1] 

如果使用map而不是flatMap则无法获得字数。

根据定义, mapflatMap区别是:

map :通过对RDD的每个元素应用给定的函数,它返回一个新的RDD。 map函数只返回一个项目。

flatMap :与map类似,它通过对RDD的每个元素应用一个函数来返回一个新的RDD,但是输出是平坦的。

如果您在Spark中询问RDD.map和RDD.flatMap之间的区别,则map会将大小为N的RDD转换为大小为N的另一个RDD。 例如。

 myRDD.map(x => x*2) 

例如,如果myRDD由双精度组成。

flatMap可以将RDD转换成另一种不同大小的格式,例如:

 myRDD.flatMap(x =>new Seq(2*x,3*x)) 

这将返回大小为2 * N或者的RDD

 myRDD.flatMap(x =>if x<10 new Seq(2*x,3*x) else new Seq(x) ) 

test.md为例:

 ➜ spark-1.6.1 cat test.md This is the first line; This is the second line; This is the last line. scala> val textFile = sc.textFile("test.md") scala> textFile.map(line => line.split(" ")).count() res2: Long = 3 scala> textFile.flatMap(line => line.split(" ")).count() res3: Long = 15 scala> textFile.map(line => line.split(" ")).collect() res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.)) scala> textFile.flatMap(line => line.split(" ")).collect() res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.) 

如果你使用map方法,你会得到test.md的行,对于flatMap方法,你会得到的字数。

map方法类似于flatMap ,它们都返回一个新的RDD。 常常使用map方法返回一个新的RDD, flatMap方法经常使用分词。

map和flatMap是类似的,从某种意义上说,它们从inputRDD中取出一行,并在其上应用一个函数。 它们的不同之处在于map中的函数只返回一个元素,而flatMap中的函数可以返回一个元素列表(0或更多)作为迭代器。

而且,flatMap的输出也是平坦的。 虽然flatMap中的函数返回一个元素列表,但flatMap返回一个RDD,其中包含列表中的所有元素(不是列表)。

map返回的元素个数相等,而flatMap可能不会。

flatMap的示例用例过滤掉丢失或不正确的数据。

map一个示例用例在各种各样的情况下,input和输出的元素数量是相同的。

number.csv

 1 2 3 - 4 - 5 

map.py在add.csv中添加所有数字。

 from operator import * def f(row): try: return float(row) except Exception: return 0 rdd = sc.textFile('a.csv').map(f) print(rdd.count()) # 7 print(rdd.reduce(add)) # 15.0 

flatMap.py使用flatMap在添加之前过滤掉丢失的数据。 与以前的版本相比,添加了更less的数字。

 from operator import * def f(row): try: return [float(row)] except Exception: return [] rdd = sc.textFile('a.csv').flatMap(f) print(rdd.count()) # 5 print(rdd.reduce(add)) # 15.0 

Flatmap和Map都转换集合。

区别:

地图(FUNC)
通过函数func传递源的每个元素来形成一个新的分布式数据集。

flatMap(FUNC)
类似于map,但是每个input项可以映射到0个或更多个输出项(所以func应该返回一个Seq而不是单个项)。

转换function:
map :一个元素在 – >一个元素之外。
flatMap :一个元素在 – > 0个或多个元素之外(一个集合)。

对于所有想要PySpark相关的人:

示例转换:flatMap

 >>> a="hello what are you doing" >>> a.split() 

['hello','what','are','you','doing']

 >>> b=["hello what are you doing","this is rak"] >>> b.split() 

Traceback(最近一次调用的最后一个):AttributeError中的文件“”,第1行:“list”对象没有属性“split”

 >>> rline=sc.parallelize(b) >>> type(rline) 
 >>> def fwords(x): ... return x.split() >>> rword=rline.map(fwords) >>> rword.collect() 

[['hello','what','are','you','doing'],['this','是','rak']]

 >>> rwordflat=rline.flatMap(fwords) >>> rwordflat.collect() 

['hello','what','are','you','doing','this','是','rak']

希望能帮助到你 :)

从下面的示例pyspark代码可以看出差异:

 rdd = sc.parallelize([2, 3, 4]) rdd.flatMap(lambda x: range(1, x)).collect() Output: [1, 1, 2, 1, 2, 3] rdd.map(lambda x: range(1, x)).collect() Output: [[1], [1, 2], [1, 2, 3]] 

地图和flatMap的输出差异:

1. flatMap

 val a = sc.parallelize(1 to 10, 5) a.flatMap(1 to _).collect() 

输出:

  1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 

2. map

 val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length).collect() 

输出:

 3 6 6 3 8