如何在数据集中存储自定义对象?
据介绍Spark数据集 :
当我们期待Spark 2.0时,我们计划对数据集进行一些令人振奋的改进,特别是:…自定义编码器 – 当我们目前自动生成各种类型的编码器时,我们希望打开自定义对象的API。
并尝试将自定义类型存储在Dataset
集中导致出现以下错误:
无法找到存储在数据集中的类型的编码器。 通过导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类)。支持序列化其他类型将在未来版本中添加
要么:
Java.lang.UnsupportedOperationException:没有找到编码器….
是否有任何现有的解决方法?
请注意,此问题仅作为社区Wiki答案的入口点存在。 随时更新/改善问题和答案。
更新
这个答案仍然有效和信息丰富,尽管现在2.2 / 2.3版本已经更好了,它增加了对Set
, Seq
, Map
, Date
, Timestamp
和BigDecimal
内置编码器支持。 如果你坚持仅仅使用case类和通常的Scala类型进行类型化,那么你应该只用SQLImplicits
的隐式SQLImplicits
。
不幸的是,几乎没有添加任何帮助。 在Encoders.scala
或SQLImplicits.scala
搜索@since 2.0.0
主要是为了处理基本类型(以及一些case类的调整)。 所以,首先要说的是,目前对定制类编码器没有真正的好的支持 。 这样一来,接下来就是一些我们可以期待的工作,因为我们现在掌握了这些工作。 作为一个前面的免责声明:这不会完美的工作,我会尽我所能,使所有的限制清晰和前期。
究竟是什么问题
当你想创建一个数据集时,Spark需要一个编码器(将T类型的JVM对象转换为内部的Spark SQL表示形式),这通常是通过SparkSession
implicits自动创建的,或者可以通过调用static Encoders
方法“(取自createDataset
上的文档 )。 编码器将采用Encoder[T]
的形式,其中T
是您正在编码的类型。 第一个建议是添加import spark.implicits._
(给你这些隐式编码器),第二个建议是使用这组编码器相关函数明确地传入隐式编码器。
没有编码器可用于常规课程,所以
import spark.implicits._ class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
会给你下面隐含的相关编译时错误:
无法找到存储在数据集中的类型的编码器。 通过导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类)。支持序列化其他类型将在未来版本中添加
但是,如果您将任何仅用于在扩展Product
类中获得上述错误的类型进行封装,则会将错误混淆地延迟到运行时,所以
import spark.implicits._ case class Wrap[T](unwrap: T) class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
编译得很好,但在运行时失败
java.lang.UnsupportedOperationException:没有找到MyObj的编码器
其原因是Spark创建的编码器实际上只是在运行时(通过scala重新获得)创建的。 在这种情况下,所有的Spark检查在编译时是最外面的类扩展Product
(所有的case类),并且只在运行时意识到它仍然不知道如何处理MyObj
(如果我尝试了同样的问题创建一个Dataset[(Int,MyObj)]
– Spark等待直到运行时MyObj
上的MyObj
)。 这些都是亟需解决的核心问题:
- 一些扩展了
Product
编译的类,尽管在运行时总是崩溃 - 没有办法传递嵌套类型的自定义编码器(我没有办法给Spark一个编码器只为
MyObj
这样它就知道如何编码Wrap[MyObj]
或(Int,MyObj)
)。
只要用kryo
大家建议的解决方案是使用kryo
编码器。
import spark.implicits._ class MyObj(val i: Int) implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj] // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
这虽然很快,但非常繁琐。 特别是如果你的代码正在操纵各种数据集,加入,分组等等,你最终会得到一堆额外的暗示。 那么,为什么不只是自动地做一个隐含的事呢?
import scala.reflect.ClassTag implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct)
现在,似乎我几乎可以做任何我想要的东西(下面的例子不会在spark.implicits._
被自动导入的spark-shell
中工作)
class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and .. val d3 = d1.map(d => (di, d)).alias("d3") // .. deals with the new type val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
或几乎。 问题是使用kryo
导致Spark只是将数据集中的每一行都存储为一个扁平的二进制对象。 对于map
, filter
, foreach
就足够了,但对于像join
这样的操作,Spark确实需要将这些分隔成列。 检查d2
或d3
的模式,你会发现只有一个二进制列:
d2.printSchema // root // |-- value: binary (nullable = true)
元组的部分解决方案
所以,利用Scala中蕴含的魔力(更多的是在6.26.3重载解析中 ),我可以给自己一系列的含义 ,尽可能地做好工作,至少对于元组来说,并且能够很好地处理现有的含义:
import org.apache.spark.sql.{Encoder,Encoders} import scala.reflect.ClassTag import spark.implicits._ // we can still take advantage of all the old implicits implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c) implicit def tuple2[A1, A2]( implicit e1: Encoder[A1], e2: Encoder[A2] ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) implicit def tuple3[A1, A2, A3]( implicit e1: Encoder[A1], e2: Encoder[A2], e3: Encoder[A3] ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3) // ... you can keep making these
然后,带着这些暗示,我可以让我的例子工作,虽然有一些专栏更名
class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2") val d3 = d1.map(d => (di ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3") val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
我还没有想出如何获得预期的元组名( _1
, _2
,…)默认情况下不重命名 – 如果别人想要玩这个, 这是名称"value"
得到引入和这是通常添加元组名称的地方。 但关键是我现在有一个很好的结构化模式:
d4.printSchema // root // |-- _1: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) // |-- _2: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true)
所以,总之,这个解决方法:
- 允许我们为元组获得单独的列(所以我们可以再次加入元组,)
- 我们可以再次依靠这些暗示(所以不需要
kryo
所有的地方) - 几乎完全向后兼容
import spark.implicits._
(涉及一些重命名) - 不让我们加入
kyro
序列化的二进制列,更不用说那些可能有的字段 - 有一些令人不愉快的副作用,将某些元组列重命名为“value”(如果有必要,可以通过转换
.toDF
,指定新的列名并转换回数据集来取消这个操作),并且模式名似乎被保留通过连接,他们最需要的地方)。
一般的班级部分解决方案
这一个是不愉快的,没有好的解决方案。 然而,现在我们已经有了上面的元组解决方案,我从另一个答案中得到了一个隐式的转换解决方案,因为你可以把更复杂的类转换成元组,所以也不会那么痛苦。 然后,在创建数据集之后,您可能会使用数据框方法重命名这些列。 如果一切顺利的话,这是一个真正的改进,因为我现在可以在我的类的领域进行连接。 如果我刚刚使用了一个kryo
平面二进制kryo
序列化器。
下面是一个例子,它做了一些事情:我有一个MyObj
类,它有Int
, java.util.UUID
和Set[String]
类型的字段。 第一个照顾自己。 第二,虽然我可以使用kryo
序列化将更有用,如果存储为一个String
(因为UUID
通常是我想要加入)。 第三个真的只属于一个二元列。
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String]) // alias for the type to convert to and from type MyObjEncoded = (Int, String, Set[String]) // implicit conversions implicit def toEncoded(o: MyObj): MyObjEncoded = (oi, outoString, os) implicit def fromEncoded(e: MyObjEncoded): MyObj = new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
现在,我可以使用这个机制创建一个具有良好模式的数据集:
val d = spark.createDataset(Seq[MyObjEncoded]( new MyObj(1, java.util.UUID.randomUUID, Set("foo")), new MyObj(2, java.util.UUID.randomUUID, Set("bar")) )).toDF("i","u","s").as[MyObjEncoded]
模式向我展示了我有正确名字的专栏和前两个我可以加入的东西。
d.printSchema // root // |-- i: integer (nullable = false) // |-- u: string (nullable = true) // |-- s: binary (nullable = true)
-
使用通用编码器。
现在有两个通用的编码器可用于
kryo
和javaSerialization
,其中后者被明确描述为:效率极低,只能作为最后的手段。
假设下面的课
class Bar(i: Int) { override def toString = s"bar $i" def bar = i }
您可以通过添加隐式编码器来使用这些编码器:
object BarEncoders { implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = org.apache.spark.sql.Encoders.kryo[Bar] }
它们可以一起使用如下:
object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarEncoders._ val ds = Seq(new Bar(1)).toDS ds.show sc.stop() } }
它将对象存储为
binary
列,因此在转换为DataFrame
您将获得以下模式:root |-- value: binary (nullable = true)
也可以使用
kryo
编码器对特定字段进行元组编码:val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar]) spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder) // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
请注意,我们在这里不依赖于隐式编码器,而是明确地传递编码器,所以这很可能不适用于
toDS
方法。 -
使用隐式转换:
在可以被编码的表示和自定义类之间提供隐式转换,例如:
object BarConversions { implicit def toInt(bar: Bar): Int = bar.bar implicit def toBar(i: Int): Bar = new Bar(i) } object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarConversions._ type EncodedBar = Int val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1))) val barsDS = bars.toDS barsDS.show barsDS.map(_.bar).show sc.stop() } }
相关问题:
- 如何为选项类型构造函数创建编码器,例如Option [Int]?
编码器在Spark2.0
工作或多或少相同。 而Kryo
仍然是推荐的serialization
选择。
你可以看下面的例子与火花外壳
scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders scala> case class NormalPerson(name: String, age: Int) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class NormalPerson scala> case class ReversePerson(name: Int, age: String) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class ReversePerson scala> val normalPersons = Seq( | NormalPerson("Superman", 25), | NormalPerson("Spiderman", 17), | NormalPerson("Ironman", 29) | ) normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29)) scala> val ds1 = sc.parallelize(normalPersons).toDS ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int] scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds1.show() +---------+---+ | name|age| +---------+---+ | Superman| 25| |Spiderman| 17| | Ironman| 29| +---------+---+ scala> ds2.show() +----+---------+ |name| age| +----+---------+ | 25| Superman| | 17|Spiderman| | 29| Ironman| +----+---------+ scala> ds1.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Superman. I am 25 years old. I am Spiderman. I am 17 years old. scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds2.foreach(p => println(p.aboutMe)) I am 17. I am Spiderman years old. I am 25. I am Superman years old. I am 29. I am Ironman years old.
直到现在]在当前范围内没有appropriate encoders
,所以我们的人员不被编码为binary
值。 但是一旦我们提供了一些使用Kryo
序列化的implicit
编码器,这将会改变。
// Provide Encoders scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson] normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary] scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson] reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary] // Ecoders will be used since they are now present in Scope scala> val ds3 = sc.parallelize(normalPersons).toDS ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary] scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name)) ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary] // now all our persons show up as binary values scala> ds3.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ scala> ds4.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ // Our instances still work as expected scala> ds3.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Spiderman. I am 17 years old. I am Superman. I am 25 years old. scala> ds4.foreach(p => println(p.aboutMe)) I am 25. I am Superman years old. I am 29. I am Ironman years old. I am 17. I am Spiderman years old.
在Java Bean类的情况下,这可能是有用的
import spark.sqlContext.implicits._ import org.apache.spark.sql.Encoders implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])
现在,您可以简单地将dataFrame读取为自定义的DataFrame
dataFrame.as[MyClass]
这将创建一个自定义类编码器,而不是一个二进制。
我的例子将在Java中,但我不认为适应Scala很困难。
使用spark.createDataset和Encoders.bean ,我相当成功地将RDD<Fruit>
转换为Dataset<Fruit>
,只要Fruit
是一个简单的Java Bean 。
第1步:创建简单的Java Bean。
public class Fruit implements Serializable { private String name = "default-fruit"; private String color = "default-color"; // AllArgsConstructor public Fruit(String name, String color) { this.name = name; this.color = color; } // NoArgsConstructor public Fruit() { this("default-fruit", "default-color"); } // ...create getters and setters for above fields // you figure it out }
在DataBricks人们加强编码器之前,我会坚持使用原始类型和字符串作为字段。 如果您有一个带有嵌套对象的类,则可以创建另一个简单的Java Bean,并将其所有字段放平,以便可以使用RDD转换将复杂类型映射到更简单的类型。 当然这是一个额外的工作,但是我认为这对于使用平面模式的性能有很大的帮助。
第2步:从RDD获取数据集
SparkSession spark = SparkSession.builder().getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(); List<Fruit> fruitList = ImmutableList.of( new Fruit("apple", "red"), new Fruit("orange", "orange"), new Fruit("grape", "purple")); JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList); RDD<Fruit> fruitRDD = fruitJavaRDD.rdd(); Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class); Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);
瞧! 泡,冲洗,重复。