有人可以向我解释一下map和flatMap之间的区别,以及每个用例是什么?
有人可以向我解释一下map和flatMap之间的区别,以及每个用例是什么?
什么“平坦的结果”是什么意思? 到底有什么好处呢?
下面是一个例子,作为一个spark-shell
会话:
首先,一些数据 – 两行文字:
val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue")) // lines rdd.collect res0: Array[String] = Array("Roses are red", "Violets are blue")
现在, map
将长度为N的RDD转换成长度为N的另一个RDD。
例如,它从两行映射成两行:
rdd.map(_.length).collect res1: Array[Int] = Array(13, 16)
但flatMap
(松散地说)将长度为N的RDD转换为N个集合的集合,然后将这些集合平面化为单个RDD的结果。
rdd.flatMap(_.split(" ")).collect res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")
我们每行有多个单词,还有多行,但是我们最终得到单个输出的单词数组
为了说明这一点,从线集合到单词集合的flatMapping如下所示:
["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]
因此,对于flatMap
,input和输出RDD通常将具有不同的大小
一般我们在hadoop中使用字数统计的例子。 我将采取相同的用例,并将使用map
和flatMap
,我们将看到它是如何处理数据的差异。
以下是示例数据文件。
hadoop is fast hive is sql on hdfs spark is superfast spark is awesome
上面的文件将使用map
和flatMap
进行parsing。
使用map
>>> wc = data.map(lambda line:line.split(" ")); >>> wc.collect() [u'hadoop is fast', u'hive is sql on hdfs', u'spark is superfast', u'spark is awesome']
input有4行,输出大小也是4,即N个元素==> N个元素。
使用flatMap
>>> fm = data.flatMap(lambda line:line.split(" ")); >>> fm.collect() [u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome']
输出与地图不同。
让我们为每个键分配1来获得字数。
-
fm
:使用flatMap
创build的RDD -
wc
:使用map
创build的RDD
>>> fm.map(lambda word : (word,1)).collect() [(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)]
而RDD上的map
wc
会给下面的不希望的输出:
>>> wc.flatMap(lambda word : (word,1)).collect() [[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1]
如果使用map
而不是flatMap
则无法获得字数。
根据定义, map
和flatMap
区别是:
map
:通过对RDD的每个元素应用给定的函数,它返回一个新的RDD。map
函数只返回一个项目。
flatMap
:与map
类似,它通过对RDD的每个元素应用一个函数来返回一个新的RDD,但是输出是平坦的。
如果您在Spark中询问RDD.map和RDD.flatMap之间的区别,则map会将大小为N的RDD转换为大小为N的另一个RDD。 例如。
myRDD.map(x => x*2)
例如,如果myRDD由双精度组成。
flatMap可以将RDD转换成另一种不同大小的格式,例如:
myRDD.flatMap(x =>new Seq(2*x,3*x))
这将返回大小为2 * N或者的RDD
myRDD.flatMap(x =>if x<10 new Seq(2*x,3*x) else new Seq(x) )
以test.md
为例:
➜ spark-1.6.1 cat test.md This is the first line; This is the second line; This is the last line. scala> val textFile = sc.textFile("test.md") scala> textFile.map(line => line.split(" ")).count() res2: Long = 3 scala> textFile.flatMap(line => line.split(" ")).count() res3: Long = 15 scala> textFile.map(line => line.split(" ")).collect() res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.)) scala> textFile.flatMap(line => line.split(" ")).collect() res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)
如果你使用map
方法,你会得到test.md
的行,对于flatMap
方法,你会得到的字数。
map
方法类似于flatMap
,它们都返回一个新的RDD。 常常使用map
方法返回一个新的RDD, flatMap
方法经常使用分词。
map和flatMap是类似的,从某种意义上说,它们从inputRDD中取出一行,并在其上应用一个函数。 它们的不同之处在于map中的函数只返回一个元素,而flatMap中的函数可以返回一个元素列表(0或更多)作为迭代器。
而且,flatMap的输出也是平坦的。 虽然flatMap中的函数返回一个元素列表,但flatMap返回一个RDD,其中包含列表中的所有元素(不是列表)。
map
返回的元素个数相等,而flatMap
可能不会。
flatMap
的示例用例过滤掉丢失或不正确的数据。
map
一个示例用例在各种各样的情况下,input和输出的元素数量是相同的。
number.csv
1 2 3 - 4 - 5
map.py在add.csv中添加所有数字。
from operator import * def f(row): try: return float(row) except Exception: return 0 rdd = sc.textFile('a.csv').map(f) print(rdd.count()) # 7 print(rdd.reduce(add)) # 15.0
flatMap.py使用flatMap
在添加之前过滤掉丢失的数据。 与以前的版本相比,添加了更less的数字。
from operator import * def f(row): try: return [float(row)] except Exception: return [] rdd = sc.textFile('a.csv').flatMap(f) print(rdd.count()) # 5 print(rdd.reduce(add)) # 15.0
Flatmap和Map都转换集合。
区别:
地图(FUNC)
通过函数func传递源的每个元素来形成一个新的分布式数据集。
flatMap(FUNC)
类似于map,但是每个input项可以映射到0个或更多个输出项(所以func应该返回一个Seq而不是单个项)。
转换function:
map :一个元素在 – >一个元素之外。
flatMap :一个元素在 – > 0个或多个元素之外(一个集合)。
对于所有想要PySpark相关的人:
示例转换:flatMap
>>> a="hello what are you doing" >>> a.split()
['hello','what','are','you','doing']
>>> b=["hello what are you doing","this is rak"] >>> b.split()
Traceback(最近一次调用的最后一个):AttributeError中的文件“”,第1行:“list”对象没有属性“split”
>>> rline=sc.parallelize(b) >>> type(rline)
>>> def fwords(x): ... return x.split() >>> rword=rline.map(fwords) >>> rword.collect()
[['hello','what','are','you','doing'],['this','是','rak']]
>>> rwordflat=rline.flatMap(fwords) >>> rwordflat.collect()
['hello','what','are','you','doing','this','是','rak']
希望能帮助到你 :)
从下面的示例pyspark代码可以看出差异:
rdd = sc.parallelize([2, 3, 4]) rdd.flatMap(lambda x: range(1, x)).collect() Output: [1, 1, 2, 1, 2, 3] rdd.map(lambda x: range(1, x)).collect() Output: [[1], [1, 2], [1, 2, 3]]
地图和flatMap的输出差异:
1. flatMap
val a = sc.parallelize(1 to 10, 5) a.flatMap(1 to _).collect()
输出:
1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
2. map
:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length).collect()
输出:
3 6 6 3 8