如何更改Spark SQL的DataFrame中的列types?
假设我正在做这样的事情:
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true")) df.printSchema() root |-- year: string (nullable = true) |-- make: string (nullable = true) |-- model: string (nullable = true) |-- comment: string (nullable = true) |-- blank: string (nullable = true) df.show() year make model comment blank 2012 Tesla S No comment 1997 Ford E350 Go get one now th...
但我真的希望year
为Int
(也许改变一些其他栏)。
我能想到的最好的是
df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank) org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]
这有点复杂。
我来自R,而且我习惯于写作,例如
df2 <- df %>% mutate(year = year %>% as.integer, make = make %>% toupper)
我很可能错过了一些东西,因为在spark / scala中应该有更好的方法来做到这一点。
[编辑:2016年3月:感谢您的投票! 虽然这确实不是最好的答案,但我认为,由梅尔曼,马丁·塞内等人提出的以列为基础的解决scheme,以withColumn
withColumnRenamed
和方法更简单,更简洁。
我认为你的方法是可以的,回想一下,Spark DataFrame
是DataFrame
(不可变的)RDD,所以我们从来没有真正replace过列,只是每次使用新的模式创build新的DataFrame
。
假设你有一个原始的DF,下面的模式:
scala> df.printSchema root |-- Year: string (nullable = true) |-- Month: string (nullable = true) |-- DayofMonth: string (nullable = true) |-- DayOfWeek: string (nullable = true) |-- DepDelay: string (nullable = true) |-- Distance: string (nullable = true) |-- CRSDepTime: string (nullable = true)
一些UDF定义在一个或几个列上:
import org.apache.spark.sql.functions._ val toInt = udf[Int, String]( _.toInt) val toDouble = udf[Double, String]( _.toDouble) val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) val days_since_nearest_holidays = udf( (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12 )
改变列types,甚至build立一个新的DataFrame可以这样写:
val featureDf = df .withColumn("departureDelay", toDouble(df("DepDelay"))) .withColumn("departureHour", toHour(df("CRSDepTime"))) .withColumn("dayOfWeek", toInt(df("DayOfWeek"))) .withColumn("dayOfMonth", toInt(df("DayofMonth"))) .withColumn("month", toInt(df("Month"))) .withColumn("distance", toDouble(df("Distance"))) .withColumn("nearestHoliday", days_since_nearest_holidays( df("Year"), df("Month"), df("DayofMonth")) ) .select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", "month", "distance", "nearestHoliday")
这产生:
scala> df.printSchema root |-- departureDelay: double (nullable = true) |-- departureHour: integer (nullable = true) |-- dayOfWeek: integer (nullable = true) |-- dayOfMonth: integer (nullable = true) |-- month: integer (nullable = true) |-- distance: double (nullable = true) |-- nearestHoliday: integer (nullable = true)
这非常接近你自己的解决scheme。 简单地说,将types更改和其他转换保持为单独的udf val
使代码更具可读性和可重用性。
由于Spark版本1.4,你可以在列上应用带有DataType的转换方法:
import org.apache.spark.sql.types.IntegerType val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType)) .drop("year") .withColumnRenamed("yearTmp", "year")
如果你正在使用sqlexpression式,你也可以这样做:
val df2 = df.selectExpr("cast(year as int) year", "make", "model", "comment", "blank")
有关更多信息,请查看文档: http : //spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame
由于Spark Column
的cast
操作是可用的(因为在这一点上我个人不赞成udf
提出的Svend
),那么如何:
df.select( df("year").cast(IntegerType).as("year"), ... )
投到要求的types? 作为一个整洁的副作用,在这个意义上不可转换/“可转换”的值将变为null
。
如果你需要这个辅助方法 ,使用:
object DFHelper{ def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = { df.withColumn( cn, df(cn).cast(tpe) ) } }
这是使用像:
import DFHelper._ val df2 = castColumnTo( df, "year", IntegerType )
首先如果你想投型
import org.apache.spark.sql df.withColumn("year", $"year".cast(sql.types.IntegerType))
使用相同的列名称,该列将被replace为新列,您不需要添加和删除。
其次,关于Scala vs R.与我可以实现的最相似的Scala代码:
val df2 = df.select( df.columns.map { case year @ "year" => df(year).cast(IntegerType).as(year) case make @ "make" => functions.upper(df(make)).as(make) case other => df(other) }: _* )
虽然长度比R的长一点。 请注意, mutate
是R数据框的一个函数,所以Scala在performance力方面非常好,不需要特殊的function。
( df.columns
令人惊讶的是Array [String]而不是Array [Column],也许他们希望它看起来像Pythonpandas的数据框。
你可以使用selectExpr
来使它更清洁一些:
df.selectExpr("cast(year as int) as year", "upper(make) as make", "model", "comment", "blank")
要将年从string转换为int,可以将以下选项添加到csv阅读器:“inferSchema” – >“true”,请参阅DataBricks文档
所以这只有当你有问题保存到像sqlserver这样的jdbc驱动程序时才有效,但是对于语法和types会遇到的错误是非常有帮助的。
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect} import org.apache.spark.sql.jdbc.JdbcType val SQLServerDialect = new JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver") override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR)) case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT)) case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT)) case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE)) case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL)) case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY)) case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE)) case DateType => Some(JdbcType("DATE", java.sql.Types.DATE)) // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC)) case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL)) case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC") } } JdbcDialects.registerDialect(SQLServerDialect)
用于将DataFrame的数据types从String修改为Integer的Java代码
df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))
它将简单地将现有的(String数据types)转换为Integer。
df.select($"long_col".cast(IntegerType).as("int_col"))
build议使用cast的答案,FYI,火花1.4.1中的cast方法被打破。
例如,对于bigint而言,具有值为“8182175552014127960”的string列的dataframe具有值“8182175552014128100”
df.show +-------------------+ | a| +-------------------+ |8182175552014127960| +-------------------+ df.selectExpr("cast(a as bigint) a").show +-------------------+ | a| +-------------------+ |8182175552014128100| +-------------------+
在发现这个bug之前,我们不得不面对很多问题,因为我们在生产中有bigint列。
该方法将删除旧列,并创build具有相同值和新数据types的新列。 DataFrame创build时的原始数据types是: –
root |-- id: integer (nullable = true) |-- flag1: string (nullable = true) |-- flag2: string (nullable = true) |-- name: string (nullable = true) |-- flag3: string (nullable = true)
之后,我跑了下面的代码来改变数据types:
df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3 df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)
在这之后我的结果是:
root |-- id: integer (nullable = true) |-- flag2: string (nullable = true) |-- name: string (nullable = true) |-- flag1: boolean (nullable = true) |-- flag3: boolean (nullable = true)
val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd //Schema to be applied to the table val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType) val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()
可以通过在spark sql中使用强制转换来更改列的数据types。 表名是表,它有两列只有column1和column2和column1的数据types是要改变的。 ex-spark.sql(“select cast(column1 as Double)column1NewName,column2 from table”)代替double写入数据types。