尝试将dataframe行映射到更新的行时发生编码器错误
当我尝试在我的代码中做同样的事情,如下所述
dataframe.map(row => { val row1 = row.getAs[String](1) val make = if (row1.toLowerCase == "tesla") "S" else row1 Row(row(0),make,row(2)) })
我从这里采取上述参考: 斯卡拉:我怎样才能在Dataframs中使用scalareplace值但是我得到编码器错误
无法find存储在数据集中的types的编码器。 原始types(Int,S tring等)和产品types(case类)通过导入spark.im plicits._支持序列化其他types将在未来版本中添加。
注意:我正在使用spark 2.0!
这里没有什么意外的。 您正试图使用Spark 1.x编写的代码,并且不再支持Spark 2.0:
- 在1.x
DataFrame.map
是((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
- 2.x
Dataset[Row].map
是((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
说实话,在1.x中也没什么意义。 独立于版本,您可以简单地使用DataFrame
API:
import org.apache.spark.sql.functions.{when, lower} val df = Seq( (2012, "Tesla", "S"), (1997, "Ford", "E350"), (2015, "Chevy", "Volt") ).toDF("year", "make", "model") df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
如果你真的想使用map
你应该使用静态types的Dataset
:
import spark.implicits._ case class Record(year: Int, make: String, model: String) df.as[Record].map { case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S") case rec => rec }
或者至less返回一个将有隐式编码器的对象:
df.map { case Row(year: Int, make: String, model: String) => (year, if(make.toLowerCase == "tesla") "S" else make, model) }
最后,如果出于某种完全疯狂的原因,您确实想要通过Dataset[Row]
映射,则必须提供所需的编码器:
import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types._ import org.apache.spark.sql.Row // Yup, it would be possible to reuse df.schema here val schema = StructType(Seq( StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType) )) val encoder = RowEncoder(schema) df.map { case Row(year, make: String, model) if make.toLowerCase == "tesla" => Row(year, "S", model) case row => row } (encoder)