Apache Spark中的案例类相等

为什么Spark中的模式匹配与Scala中的模式匹配不一样? 看下面的例子…函数f()尝试模式匹配的类,它在Scala REPL中工作,但在Spark中失败,并导致所有“???”。 f2()是一种解决方法,它使用.isInstanceOf()在Spark中获得所需的结果,但是我明白在Scala中是不好的forms。

任何帮助模式匹配在这种情况下在火花正确的方式将不胜感激。

 abstract class a extends Serializable {val a: Int} case class b(a: Int) extends a case class bNull(a: Int=0) extends a val x: List[a] = List(b(0), b(1), bNull()) val xRdd = sc.parallelize(x) 

尝试在Scala REPL中工作的模式匹配,但在Spark中失败

 def f(x: a) = x match { case b(n) => "b" case bNull(n) => "bnull" case _ => "???" } 

在Spark中起作用的解决方法,但forms不好(我认为)

 def f2(x: a) = { if (x.isInstanceOf[b]) { "b" } else if (x.isInstanceOf[bNull]) { "bnull" } else { "???" } } 

查看结果

 xRdd.map(f).collect //does not work in Spark // result: Array("???", "???", "???") xRdd.map(f2).collect // works in Spark // resut: Array("b", "b", "bnull") x.map(f(_)) // works in Scala REPL // result: List("b", "b", "bnull") 

使用的版本…在spark-shell中运行Spark结果(AWS EMR-4.3上的Spark 1.6)Scala REPL in SBT 0.13.9(Scala 2.10.5)

这是Spark REPL的一个已知问题。 您可以在SPARK-2620中find更多的细节。 它会影响Spark REPL中的多个操作,包括PairwiseRDDs上的大部分转换。 例如:

 case class Foo(x: Int) val foos = Seq(Foo(1), Foo(1), Foo(2), Foo(2)) foos.distinct.size // Int = 2 val foosRdd = sc.parallelize(foos, 4) foosRdd.distinct.count // Long = 4 foosRdd.map((_, 1)).reduceByKey(_ + _).collect // Array[(Foo, Int)] = Array((Foo(1),1), (Foo(1),1), (Foo(2),1), (Foo(2),1)) foosRdd.first == foos.head // Boolean = false Foo.unapply(foosRdd.first) == Foo.unapply(foos.head) // Boolean = true 

更糟糕的是结果取决于数据分布:

 sc.parallelize(foos, 1).distinct.count // Long = 2 sc.parallelize(foos, 1).map((_, 1)).reduceByKey(_ + _).collect // Array[(Foo, Int)] = Array((Foo(2),2), (Foo(1),2)) 

你可以做的最简单的事情是在REPL之外定义和打包所需的案例类。 任何直接使用spark-submit代码也应该可以工作。

在Scala 2.11+中,你可以使用paste -raw -raw直接在REPL中创build一个包。

 scala> :paste -raw // Entering paste mode (ctrl-D to finish) package bar case class Bar(x: Int) // Exiting paste mode, now interpreting. scala> import bar.Bar import bar.Bar scala> sc.parallelize(Seq(Bar(1), Bar(1), Bar(2), Bar(2))).distinct.collect res1: Array[bar.Bar] = Array(Bar(1), Bar(2)) 
Interesting Posts