Spark:如何将Python与Scala或Java用户定义函数进行映射?

比方说,我们的团队已经selectPython作为用Spark开发的参考语言。 但后来出于性能方面的原因,我们想开发特定的Scala或Java特定的库,以便将它们与我们的Python代码(类似于具有Scala或Java框架的Python存根)进行映射。

难道你不觉得是否有可能通过一些Scala或Java用户定义函数来接口新的自定义Python方法?

Spark 2.1+

您可以使用SQLContext.registerJavaFunction

注册一个Java UDF,以便在SQL语句中使用它。

它需要一个name ,Java类的完全限定名,以及可选的返回types。 不幸的是,现在它只能用在SQL语句(或expr / selectExpr )中,并且需要Java org.apache.spark.sql.api.java.UDF*

 scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "2.1.0" ) 
 package com.example.spark.udfs import org.apache.spark.sql.api.java.UDF1 class addOne extends UDF1[Integer, Integer] { def call(x: Integer) = x + 1 } 
 sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne") sqlContext.sql("SELECT add_one(1)").show() ## +------+ ## |UDF(1)| ## +------+ ## | 2| ## +------+ 

版本独立

我不会说得到支持,但肯定是可以的。 目前在PySpark中可用的所有SQL函数只是一个围绕Scala API的包装器。

让我们假设我想重用GroupConcat UDAF我已经创build了作为SPARK SQLreplacemysql GROUP_CONCAT聚合函数的答案,它位于一个包com.example.udaf

 from pyspark.sql.column import Column, _to_java_column, _to_seq from pyspark.sql import Row row = Row("k", "v") df = sc.parallelize([ row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF() def groupConcat(col): """Group and concatenate values for a given column >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v")) >>> df.select(groupConcat("v").alias("vs")) [Row(vs=u'foo,bar')] """ sc = SparkContext._active_spark_context # It is possible to use java_import to avoid full package path _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply # Converting to Seq to match apply(exprs: Column*) return Column(_groupConcat(_to_seq(sc, [col], _to_java_column))) df.groupBy("k").agg(groupConcat("v").alias("vs")).show() ## +---+---------+ ## | k| vs| ## +---+---------+ ## | 1|foo1,foo2| ## | 2|bar1,bar2| ## +---+---------+ 

我的口味中有太多的主要下划线,但你可以看到它可以完成。

相关:

  • 从任务调用Java / Scala函数
  • 如何在Pyspark内使用Scala类
  • 用Scala转换PySpark RDD