连接apache spark数据框中的列
我们如何在数据框中连接2列? 有没有在火花SQL中的任何function,我们可以用来连接一个DF表中的2列。
使用原始的SQL你可以使用CONCAT
:
-
在Python中
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v")) df.registerTempTable("df") sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
-
在斯卡拉
import sqlContext.implicits._ val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v") df.registerTempTable("df") sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
自Spark 1.5.0以来,您可以使用带有DataFrame API的concat
函数:
-
在Python中:
from pyspark.sql.functions import concat, col, lit df.select(concat(col("k"), lit(" "), col("v")))
-
在Scala中:
import org.apache.spark.sql.functions.{concat, lit} df.select(concat($"k", lit(" "), $"v"))
还有一个concat_ws
函数,它将string分隔符作为第一个参数。
如果你想使用DF,你可以使用udf来添加一个基于现有列的新列。
val sqlContext = new SQLContext(sc) case class MyDf(col1: String, col2: String) //here is our dataframe val df = sqlContext.createDataFrame(sc.parallelize( Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F")) )) //Define a udf to concatenate two passed in string values val getConcatenated = udf( (first: String, second: String) => { first + " " + second } ) //use withColumn method to add a new column called newColName df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()
以下是如何做自定义命名
import pyspark from pyspark.sql import functions as sf sc = pyspark.SparkContext() sqlc = pyspark.SQLContext(sc) df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2']) df.show()
给,
+--------+--------+ |colname1|colname2| +--------+--------+ | row11| row12| | row21| row22| +--------+--------+
通过连接创build新列:
df = df.withColumn('joined_column', sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2'))) df.show() +--------+--------+-------------+ |colname1|colname2|joined_column| +--------+--------+-------------+ | row11| row12| row11_row12| | row21| row22| row21_row22| +--------+--------+-------------+
这是另一种做这个pyspark的方法:
#import concat and lit functions from pyspark.sql.functions from pyspark.sql.functions import concat, lit #Create your data frame countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa']) #Use select, concat, and lit functions to do the concatenation personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African')) #Show the new data frame personDF.show() ----------RESULT------------------------- 84 +------------+ |East African| +------------+ | Ethiopian| | Kenyan| | Ugandan| | Rwandan| +------------+
当您不知道数据框中列的编号或名称时,build议您提供一个build议。
val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
pySpark使用sqlContext的另一种方法…
#Suppose we have a dataframe: df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2']) # Now we can concatenate columns and assign the new column a name df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))