DataFrame组通过行为/优化
假设我们有DataFrame df
由以下列组成:
名称,姓氏,大小,宽度,长度,称重
现在我们要执行一些操作,例如我们想创build一些包含大小和宽度数据的DataFrame。
val df1 = df.groupBy("surname").agg( sum("size") ) val df2 = df.groupBy("surname").agg( sum("width") )
正如您可以注意到的那样,其他列(如Length)不会在任何地方使用。 Spark是否足够聪明,可以在混洗阶段之前删除多余的列,或者他们是否随身携带? Wil跑步:
val dfBasic = df.select("surname", "size", "width")
之前分组莫名其妙地影响性能?
是的,这是“ 足够聪明 ”。 groupBy
在groupBy
执行的操作与在普通RDD上执行的DataFrame
操作groupBy
。 在你描述的场景中,根本不需要移动原始数据。 我们来创build一个小例子来说明:
val df = sc.parallelize(Seq( ("a", "foo", 1), ("a", "foo", 3), ("b", "bar", 5), ("b", "bar", 1) )).toDF("x", "y", "z") df.groupBy("x").agg(sum($"z")).explain // == Physical Plan == // TungstenAggregate(key=[x#3], functions=[(sum(cast(z#5 as bigint)),mode=Final,isDistinct=false)], output=[x#3,sum(z)#11L]) // TungstenExchange hashpartitioning(x#3) // TungstenAggregate(key=[x#3], functions=[(sum(cast(z#5 as bigint)),mode=Partial,isDistinct=false)], output=[x#3,currentSum#20L]) // TungstenProject [_1#0 AS x#3,_3#2 AS z#5] // Scan PhysicalRDD[_1#0,_2#1,_3#2]
正如你可以第一个阶段是一个投影,只保留所需的列。 接下来的数据在本地进行汇总,最后在全球进行汇总和汇总。 如果使用Spark <= 1.4,则会得到一些不同的答案输出,但一般结构应该完全相同。
最后是一个DAG可视化,显示以上描述描述了实际的工作: