Apache Spark的主键
我有一个与Apache Spark和PostgreSQL的JDBC连接,我想插入一些数据到我的数据库。 当我使用append
模式时,我需要为每个DataFrame.Row
指定id
。 有什么办法让Spark创build主键?
斯卡拉 :
如果您只需要唯一的数字,您可以使用zipWithUniqueId
并重新创buildDataFrame。 首先是一些import和虚拟数据:
import sqlContext.implicits._ import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType, StructField, LongType} val df = sc.parallelize(Seq( ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
提取模式以进一步使用:
val schema = df.schema
添加ID字段:
val rows = df.rdd.zipWithUniqueId.map{ case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
创build数据框:
val dfWithPK = sqlContext.createDataFrame( rows, StructType(StructField("id", LongType, false) +: schema.fields))
在Python中同样的事情:
from pyspark.sql import Row from pyspark.sql.types import StructField, StructType, LongType row = Row("foo", "bar") row_with_index = Row(*["id"] + df.columns) df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF() def make_row(columns): def _make_row(row, uid): row_dict = row.asDict() return row_with_index(*[uid] + [row_dict.get(c) for c in columns]) return _make_row f = make_row(df.columns) df_with_pk = (df.rdd .zipWithUniqueId() .map(lambda x: f(*x)) .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
如果你更喜欢连续的数字,你可以使用zipWithUniqueId
来代替zipWithUniqueId
,但是这样会更贵一些。
直接使用DataFrame
API :
(通用Scala,Python,Java,R几乎相同的语法)
以前,我已经错过了monotonicallyIncreasingId
函数应该工作得很好,只要你不需要连续的数字:
import org.apache.spark.sql.functions.monotonicallyIncreasingId df.withColumn("id", monotonicallyIncreasingId).show() // +---+----+-----------+ // |foo| bar| id| // +---+----+-----------+ // | a|-1.0|17179869184| // | b|-2.0|42949672960| // | c|-3.0|60129542144| // +---+----+-----------+
虽然有用的monotonicallyIncreasingId
是非确定性的。 不仅id可能不同于执行到执行,而且当后续操作包含filter时,不能用于标识行。
注意 :
也可以使用rowNumber
窗口函数:
from pyspark.sql.window import Window from pyspark.sql.functions import rowNumber w = Window().orderBy() df.withColumn("id", rowNumber().over(w)).show()
不幸:
警告窗口:没有为窗口操作定义分区! 将所有数据移动到一个分区,这可能会导致严重的性能下降。
所以除非你有一个自然的方式来分割你的数据,并确保唯一性在这个时刻不是特别有用。
from pyspark.sql.functions import monotonically_increasing_id df.withColumn("id", monotonically_increasing_id()).show()
请注意,df.withColumn的第二个参数是monotonically_increasing_id()而不是monotonically_increasing_id。
对于zipWithIndex()是所需的行为,即对于那些希望连续的整数的情况,我发现以下解决scheme相对简单。
在这种情况下,我们使用pyspark并依靠字典理解将原始行对象映射到适合包括唯一索引的新模式的新字典。
# read the initial dataframe without index dfNoIndex = sqlContext.read.parquet(dataframePath) # Need to zip together with a unique integer # First create a new schema with uuid field appended newSchema = StructType([StructField("uuid", IntegerType(), False)] + dfNoIndex.schema.fields) # zip with the index, map it to a dictionary which includes new field df = dfNoIndex.rdd.zipWithIndex()\ .map(lambda (row, id): {k:v for k, v in row.asDict().items() + [("uuid", id)]})\ .toDF(newSchema)