斯卡拉vs Python的Spark性能

我比Python更喜欢Python。 但是,由于Spark本身就是用Scala编写的,所以我期望我的代码在Scala中运行得比Python版本更快,原因很明显。

有了这个假设,我想学习和写一些非常普通的预处理代码的Scala版本的一些1 GB的数据。 数据来自Kaggle的SpringLeaf竞赛。 只是给出了数据的概述(它包含1936年的维度和145232行)。 数据由各种types组成,如int,float,string,boolean。 我正在使用6个核心中的8个进行Spark处理; 这就是为什么我使用minPartitions=6以便每个核心都有可处理的东西。

斯卡拉代码

 val input = sc.textFile("train.csv", minPartitions=6) val input2 = input.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } val delim1 = "\001" def separateCols(line: String): Array[String] = { val line2 = line.replaceAll("true", "1") val line3 = line2.replaceAll("false", "0") val vals: Array[String] = line3.split(",") for((x,i) <- vals.view.zipWithIndex) { vals(i) = "VAR_%04d".format(i) + delim1 + x } vals } val input3 = input2.flatMap(separateCols) def toKeyVal(line: String): (String, String) = { val vals = line.split(delim1) (vals(0), vals(1)) } val input4 = input3.map(toKeyVal) def valsConcat(val1: String, val2: String): String = { val1 + "," + val2 } val input5 = input4.reduceByKey(valsConcat) input5.saveAsTextFile("output") 

Python代码

 input = sc.textFile('train.csv', minPartitions=6) DELIM_1 = '\001' def drop_first_line(index, itr): if index == 0: return iter(list(itr)[1:]) else: return itr input2 = input.mapPartitionsWithIndex(drop_first_line) def separate_cols(line): line = line.replace('true', '1').replace('false', '0') vals = line.split(',') vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"')) for e, val in enumerate(vals)] return vals2 input3 = input2.flatMap(separate_cols) def to_key_val(kv): key, val = kv.split(DELIM_1) return (key, val) input4 = input3.map(to_key_val) def vals_concat(v1, v2): return v1 + ',' + v2 input5 = input4.reduceByKey(vals_concat) input5.saveAsTextFile('output') 

斯卡拉性能阶段0(38分钟),阶段1(18秒) 在这里输入图像描述

Python性能阶段0(11分钟),阶段1(7秒) 在这里输入图像描述

两者都产生不同的DAG可视化图(由于这两个图片显示Scala( map )和Python( reduceByKey )的不同阶段0函数)

但是,实质上这两个代码都试图将数据转换为(dimension_id,值列表的string)RDD并保存到磁盘。 输出将用于计算每个维度的各种统计数据。

性能方面,像这样的真实数据的Scala代码似乎比Python版本运行速度慢4倍 。 对我来说好消息是,它给了我很好的留在Python的动力。 坏消息是我不明白为什么?


讨论代码的原始答​​案可以在下面find。


首先,您必须区分不同types的API,每个API都有自己的性能考虑因素。

RDD API

(具有基于JVM的业务stream程的纯Python结构)

这是最受Python代码性能和PySpark实现细节影响的组件。 虽然Python性能不太可能成为问题,但至less有几个因素需要考虑:

  • JVM通信的开销。 实际上所有来自Python执行程序的数据必须通过套接字和JVM工作程序传递。 虽然这是一个相对有效的本地通信,但它仍然不是免费的。
  • 基于进程的执行程序(Python)与基于线程的(单个JVMmultithreading)执行程序(Scala)。 每个Python执行者都在自己的进程中运行。 作为一个副作用,它提供了比它的JVM对手更强的隔离性,以及对执行程序生命周期的一些控制,但可能会显着增加内存使用量:

    • 解释器内存占用
    • 加载库的占用空间
    • 效率较低的广播(每个过程需要自己的广播副本)
  • Python代码本身的性能。 一般来说,Scala比Python更快,但是对于任务而言,它会有所不同。 此外,你有多种select,包括像Numba ,C扩展( Cython )或像Theano专业图书馆的JIT 。 最后, 如果你不使用ML / MLlib(或简单的NumPy堆栈) ,可以考虑使用PyPy作为替代解释器。 参见SPARK-3094 。

  • PySparkconfiguration提供了spark.python.worker.reuse选项,可以用来select为每个任务spark.python.worker.reuse Python进程和重用现有进程。 后一种select似乎有助于避免昂贵的垃圾收集(这比系统testing的结果更为显着),而前者(默认)对于昂贵的广播和import而言是最佳的。
  • 引用计数作为CPython中的第一行垃圾回收方法,与典型的Spark工作负载(类stream处理,无参考周期)相当好,可以减lessGC停顿的风险。

MLlib

(混合Python和JVM执行)

基本考虑因素与以前几乎一样,还有一些其他问题。 虽然用于MLlib的基本结构是普通的Python RDD对象,但是所有的algorithm都是直接使用Scala来执行的。

这意味着将Python对象转换为Scala对象的附加成本,反过来,增加的内存使用和一些额外的限制,我们稍后会介绍。

截至目前(Spark 2.x),基于RDD的API处于维护模式, 计划在Spark 3.0中删除 。

DataFrame API和Spark ML

(使用限于驱动程序的Python代码执行JVM)

这些可能是标准数据处理任务的最佳select。 由于Python代码大多局限于驱动程序的高级逻辑操作,因此Python和Scala之间应该没有性能差异。

一个例外就是使用行式的Python UDF,它比Scala等效的效率低得多。 虽然有一些改进的机会(Spark 2.0.0已经有了大量的发展),但是最大的限制是内部表示(JVM)和Python解释器之间的全面往返。 如果可能的话,你应该赞成内buildexpression式的组合( 例如 ,Python 2.0的行为在Spark 2.0.0中已经得到了改进,但是与本地执行相比,它仍然是不理想的,随着向量化UDF的引入, (SPARK-21190) 。

还要确保避免DataFramesRDDs之间不必要的传递数据。 这需要昂贵的序列化和反序列化,更不用说向Python解释器传输数据。

值得注意的是,Py4J的调用具有很高的延迟。 这包括简单的调用,如:

 from pyspark.sql.functions import col col("foo") 

通常,这应该不重要(开销是恒定的,不依赖于数据量),但在软实时应用程序的情况下,可以考虑caching/重用Java包装。

GraphX和Spark数据集

至于现在(Spark 1.6 2.1),没有一个提供PySpark API,所以你可以说PySpark比Scala差得多。

GraphX

实际上,GraphX开发几乎完全停止,项目目前处于维护模式, closures的JIRA门票closures,因为无法修复 。 GraphFrames库提供了一个替代的graphics处理库与Python绑定。

数据集

主观上来说,在Python中静态typesDatasets没有太多的地方,即使当前的Scala实现过于简单,也不能提供与DataFrame相同的性能优势。

stream

从我目前看到的,我强烈推荐使用Python的Scala。 如果PySpark获得对结构化stream的支持,未来可能会发生变化,但现在Scala API似乎更加健壮,全面和高效。 我的经验是相当有限的。

Spark 2.x中的结构化stream式处理似乎减less了语言之间的差距,但现在它还处于早期阶段。 不过,基于RDD的API在Databricks文档 (访问date2017-03-03)中已经被引用为“传统stream式传输”),所以期待进一步的统一工作是合理的。

非性能方面的考虑

function奇偶

并非所有的Sparkfunction都通过PySpark API公开。 请务必检查您所需的部件是否已经实施,并尝试了解可能的限制。

当使用MLlib和类似的混合上下文时,这一点尤为重要(请参阅从任务调用Java / Scala函数 )。 公平地说PySpark API的某些部分,比如mllib.linalg ,提供了比Scala更为全面的方法。

APIdevise

PySpark API密切反映了它的Scala对应,因此不完全是Pythonic。 这意味着在语言之间进行映射是非常容易的,但同时Python代码可能会更难以理解。

复杂的build筑

与纯JVM执行相比,PySpark数据stream相对复杂。 PySpark程序或debugging的理由要困难得多。 此外,至less对Scala和JVM的基本理解是非常必要的。

Spark 2.x及更高版本

Dataset API的持续转变,以及冻结的RDD API给Python用户带来了机遇和挑战。 虽然API的高级部分在Python中更容易公开,但更高级的function几乎不可能直接使用。

此外,本地Pythonfunction仍然是SQL世界中的二等公民。 希望Apache Arrow序列化在未来会有所改进( 目前的数据collection工作目标是数据collection但UDF serde是一个长期目标 )。

对于强烈依赖于Python代码库的项目,纯Python替代品(如Dask或Ray )可能是一个有趣的select。

它不一定是一个对另一个

Spark DataFrame(SQL,Dataset)API提供了一个在PySpark应用程序中集成Scala / Java代码的方法。 您可以使用DataFrames将数据公开到本机JVM代码并读取结果。 我已经在其他地方解释了一些选项,你可以在Pyspark里面find如何使用Scala类的Python-Scala往返实例 。

可以通过引入用户定义的types来进一步增强它(请参阅如何在Spark SQL中定义自定义types的模式? )。


问题中提供的代码有什么问题

(免责声明:Pythonista的观点,很可能我错过了一些Scala技巧)

首先,你的代码中有一部分根本没有意义。 如果你已经有使用zipWithIndex创build的(key, value)对,或者enumerate创buildstring的目的是什么? flatMap不能recursion地工作,所以你可以简单地产生元组并跳过下面的map

另一个我觉得有问题的部分是reduceByKey 。 一般来说,如果应用聚合函数可以减less必须进行混洗的数据量, reduceByKey非常有用。 既然你简单地连接string,这里没有任何东西可以获得。 忽略低级别的东西,比如引用的数量,你必须传输的数据量与groupByKey完全一样。

通常我不会详述,但据我所知,这是Scala代码中的一个瓶颈。 在JVM上连接string是一个相当昂贵的操作(例如,请参阅scala中的string连接和Java中的代价一样昂贵 )。 这意味着像这样的_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)这是相当于input4.reduceByKey(valsConcat)在你的代码是不是一个好主意。

如果你想避免使用groupByKey你可以尝试在StringBuilder使用aggregateByKey 。 类似的东西应该做的诀窍:

 rdd.aggregateByKey(new StringBuilder)( (acc, e) => { if(!acc.isEmpty) acc.append(",").append(e) else acc.append(e) }, (acc1, acc2) => { if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2) else acc1.append(",").addString(acc2) } ) 

但我怀疑这是值得所有的大惊小怪。

牢记上面的内容,我已经重写了你的代码,如下所示:

斯卡拉

 val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{ (idx, iter) => if (idx == 0) iter.drop(1) else iter } val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{ case ("true", i) => (i, "1") case ("false", i) => (i, "0") case p => p.swap }) val result = pairs.groupByKey.map{ case (k, vals) => { val valsString = vals.mkString(",") s"$k,$valsString" } } result.saveAsTextFile("scalaout") 

Python

 def drop_first_line(index, itr): if index == 0: return iter(list(itr)[1:]) else: return itr def separate_cols(line): line = line.replace('true', '1').replace('false', '0') vals = line.split(',') for (i, x) in enumerate(vals): yield (i, x) input = (sc .textFile('train.csv', minPartitions=6) .mapPartitionsWithIndex(drop_first_line)) pairs = input.flatMap(separate_cols) result = (pairs .groupByKey() .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1])))) result.saveAsTextFile("pythonout") 

结果

local[6]模式下(Intel(R)Xeon(R)CPU E3-1245 V2 @ 3.40GHz),每个执行程序4GB内存,需要(n = 3):

  • 斯卡拉 – 平均值:250.00s,stdev:12.49
  • Python – 意思是:246.66s,stdev:1.15

我很确定,大部分时间都花在洗牌,序列化,反序列化和其他次要任务上。 只是为了好玩,这里是Python中朴素的单线程代码,在不到一分钟的时间内在这台机器上执行相同的任务:

 def go(): with open("train.csv") as fr: lines = [ line.replace('true', '1').replace('false', '0').split(",") for line in fr] return zip(*lines[1:])