在Spark DataFrame中查找每个组的最大行数
我试图使用Spark数据框而不是RDDs,因为它们看起来比RDD更高级,并且倾向于生成更多可读的代码,但是我将非常乐意获得关于手头任务更为惯用的build议。
在一个14节点的Google Dataproc集群中,我有大约6百万个名字被两个不同的系统翻译成ID: sa
和sb
。 每Row
包含name
, id_sa
和id_sb
。 我的目标是产生从id_sa
到id_sb
的映射,使得对于每个id_sa
,相应的id_sb
是与id_sb
相连的所有名称中最频繁的id。
我们试着用一个例子来澄清一下。 如果我有以下几行:
[Row(name='n1', id_sa='a1', id_sb='b1'), Row(name='n2', id_sa='a1', id_sb='b2'), Row(name='n3', id_sa='a1', id_sb='b2'), Row(name='n4', id_sa='a2', id_sb='b2')]
我的目标是产生从a1
到b2
的映射。 实际上,与a1
相关的名称分别是n1
, n2
和n3
,它们分别映射到b1
, b2
和b2
,所以b2
是与a1
相关联的名称中最频繁的映射。 以同样的方式, a2
将被映射到b2
。 假设永远有一个赢家是没有问题的:不需要打破关系。
我希望我可以在我的数据groupBy(df.id_sa)
上使用groupBy(df.id_sa)
,但是我不知道下一步该怎么做。 我希望有一个聚合,最后可以产生以下几行:
[Row(id_sa=a1, max_id_sb=b2), Row(id_sa=a2, max_id_sb=b2)]
但也许我试图使用错误的工具,我应该回去使用RDD。
使用join
(如果join
,会导致组中有多行):
import pyspark.sql.functions as F from pyspark.sql.functions import count, col cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts") maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs") cnts.join(maxs, (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa")) ).select(col("cnts.id_sa"), col("cnts.id_sb"))
使用窗口函数(将放弃关系):
from pyspark.sql.functions import rowNumber from pyspark.sql.window import Window w = Window().partitionBy("id_sa").orderBy(col("cnt").desc()) (cnts .withColumn("rn", rowNumber().over(w)) .where(col("rn") == 1) .select("id_sa", "id_sb"))
使用struct
sorting:
from pyspark.sql.functions import struct (cnts .groupBy("id_sa") .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max")) .select(col("id_sa"), col("max.id_sb")))
另请参阅SPARK DataFrame:select每个组的第一行
我想你可能正在寻找窗口函数: http : //spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight= window# pyspark.sql.Window
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
下面是Scala中的一个例子(我现在没有Hive可用的Spark Shell,所以我无法testing代码,但我认为它应该可以工作):
case class MyRow(name: String, id_sa: String, id_sb: String) val myDF = sc.parallelize(Array( MyRow("n1", "a1", "b1"), MyRow("n2", "a1", "b2"), MyRow("n3", "a1", "b2"), MyRow("n1", "a2", "b2") )).toDF("name", "id_sa", "id_sb") import org.apache.spark.sql.expressions.Window val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc) myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb")
有可能更有效的方法来实现与Window函数相同的结果,但我希望这指出你在正确的方向。