斯卡拉vs Python的Spark性能
我比Python更喜欢Python。 但是,由于Spark本身就是用Scala编写的,所以我期望我的代码在Scala中运行得比Python版本更快,原因很明显。
有了这个假设,我想学习和写一些非常普通的预处理代码的Scala版本的一些1 GB的数据。 数据来自Kaggle的SpringLeaf竞赛。 只是给出了数据的概述(它包含1936年的维度和145232行)。 数据由各种types组成,如int,float,string,boolean。 我正在使用6个核心中的8个进行Spark处理; 这就是为什么我使用minPartitions=6
以便每个核心都有可处理的东西。
斯卡拉代码
val input = sc.textFile("train.csv", minPartitions=6) val input2 = input.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } val delim1 = "\001" def separateCols(line: String): Array[String] = { val line2 = line.replaceAll("true", "1") val line3 = line2.replaceAll("false", "0") val vals: Array[String] = line3.split(",") for((x,i) <- vals.view.zipWithIndex) { vals(i) = "VAR_%04d".format(i) + delim1 + x } vals } val input3 = input2.flatMap(separateCols) def toKeyVal(line: String): (String, String) = { val vals = line.split(delim1) (vals(0), vals(1)) } val input4 = input3.map(toKeyVal) def valsConcat(val1: String, val2: String): String = { val1 + "," + val2 } val input5 = input4.reduceByKey(valsConcat) input5.saveAsTextFile("output")
Python代码
input = sc.textFile('train.csv', minPartitions=6) DELIM_1 = '\001' def drop_first_line(index, itr): if index == 0: return iter(list(itr)[1:]) else: return itr input2 = input.mapPartitionsWithIndex(drop_first_line) def separate_cols(line): line = line.replace('true', '1').replace('false', '0') vals = line.split(',') vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"')) for e, val in enumerate(vals)] return vals2 input3 = input2.flatMap(separate_cols) def to_key_val(kv): key, val = kv.split(DELIM_1) return (key, val) input4 = input3.map(to_key_val) def vals_concat(v1, v2): return v1 + ',' + v2 input5 = input4.reduceByKey(vals_concat) input5.saveAsTextFile('output')
斯卡拉性能阶段0(38分钟),阶段1(18秒)
Python性能阶段0(11分钟),阶段1(7秒)
两者都产生不同的DAG可视化图(由于这两个图片显示Scala( map
)和Python( reduceByKey
)的不同阶段0函数)
但是,实质上这两个代码都试图将数据转换为(dimension_id,值列表的string)RDD并保存到磁盘。 输出将用于计算每个维度的各种统计数据。
性能方面,像这样的真实数据的Scala代码似乎比Python版本运行速度慢4倍 。 对我来说好消息是,它给了我很好的留在Python的动力。 坏消息是我不明白为什么?
讨论代码的原始答案可以在下面find。
首先,您必须区分不同types的API,每个API都有自己的性能考虑因素。
RDD API
(具有基于JVM的业务stream程的纯Python结构)
这是最受Python代码性能和PySpark实现细节影响的组件。 虽然Python性能不太可能成为问题,但至less有几个因素需要考虑:
- JVM通信的开销。 实际上所有来自Python执行程序的数据必须通过套接字和JVM工作程序传递。 虽然这是一个相对有效的本地通信,但它仍然不是免费的。
-
基于进程的执行程序(Python)与基于线程的(单个JVMmultithreading)执行程序(Scala)。 每个Python执行者都在自己的进程中运行。 作为一个副作用,它提供了比它的JVM对手更强的隔离性,以及对执行程序生命周期的一些控制,但可能会显着增加内存使用量:
- 解释器内存占用
- 加载库的占用空间
- 效率较低的广播(每个过程需要自己的广播副本)
-
Python代码本身的性能。 一般来说,Scala比Python更快,但是对于任务而言,它会有所不同。 此外,你有多种select,包括像Numba ,C扩展( Cython )或像Theano专业图书馆的JIT 。 最后,
如果你不使用ML / MLlib(或简单的NumPy堆栈),可以考虑使用PyPy作为替代解释器。 参见SPARK-3094 。 - PySparkconfiguration提供了
spark.python.worker.reuse
选项,可以用来select为每个任务spark.python.worker.reuse
Python进程和重用现有进程。 后一种select似乎有助于避免昂贵的垃圾收集(这比系统testing的结果更为显着),而前者(默认)对于昂贵的广播和import而言是最佳的。 - 引用计数作为CPython中的第一行垃圾回收方法,与典型的Spark工作负载(类stream处理,无参考周期)相当好,可以减lessGC停顿的风险。
MLlib
(混合Python和JVM执行)
基本考虑因素与以前几乎一样,还有一些其他问题。 虽然用于MLlib的基本结构是普通的Python RDD对象,但是所有的algorithm都是直接使用Scala来执行的。
这意味着将Python对象转换为Scala对象的附加成本,反过来,增加的内存使用和一些额外的限制,我们稍后会介绍。
截至目前(Spark 2.x),基于RDD的API处于维护模式, 计划在Spark 3.0中删除 。
DataFrame API和Spark ML
(使用限于驱动程序的Python代码执行JVM)
这些可能是标准数据处理任务的最佳select。 由于Python代码大多局限于驱动程序的高级逻辑操作,因此Python和Scala之间应该没有性能差异。
一个例外就是使用行式的Python UDF,它比Scala等效的效率低得多。 虽然有一些改进的机会(Spark 2.0.0已经有了大量的发展),但是最大的限制是内部表示(JVM)和Python解释器之间的全面往返。 如果可能的话,你应该赞成内buildexpression式的组合( 例如 ,Python 2.0的行为在Spark 2.0.0中已经得到了改进,但是与本地执行相比,它仍然是不理想的,随着向量化UDF的引入, (SPARK-21190) 。
还要确保避免DataFrames
和RDDs
之间不必要的传递数据。 这需要昂贵的序列化和反序列化,更不用说向Python解释器传输数据。
值得注意的是,Py4J的调用具有很高的延迟。 这包括简单的调用,如:
from pyspark.sql.functions import col col("foo")
通常,这应该不重要(开销是恒定的,不依赖于数据量),但在软实时应用程序的情况下,可以考虑caching/重用Java包装。
GraphX和Spark数据集
至于现在(Spark 1.6 2.1),没有一个提供PySpark API,所以你可以说PySpark比Scala差得多。
GraphX
实际上,GraphX开发几乎完全停止,项目目前处于维护模式, closures的JIRA门票closures,因为无法修复 。 GraphFrames库提供了一个替代的graphics处理库与Python绑定。
数据集
主观上来说,在Python中静态typesDatasets
没有太多的地方,即使当前的Scala实现过于简单,也不能提供与DataFrame
相同的性能优势。
stream
从我目前看到的,我强烈推荐使用Python的Scala。 如果PySpark获得对结构化stream的支持,未来可能会发生变化,但现在Scala API似乎更加健壮,全面和高效。 我的经验是相当有限的。
Spark 2.x中的结构化stream式处理似乎减less了语言之间的差距,但现在它还处于早期阶段。 不过,基于RDD的API在Databricks文档 (访问date2017-03-03)中已经被引用为“传统stream式传输”),所以期待进一步的统一工作是合理的。
非性能方面的考虑
function奇偶
并非所有的Sparkfunction都通过PySpark API公开。 请务必检查您所需的部件是否已经实施,并尝试了解可能的限制。
当使用MLlib和类似的混合上下文时,这一点尤为重要(请参阅从任务调用Java / Scala函数 )。 公平地说PySpark API的某些部分,比如mllib.linalg
,提供了比Scala更为全面的方法。
APIdevise
PySpark API密切反映了它的Scala对应,因此不完全是Pythonic。 这意味着在语言之间进行映射是非常容易的,但同时Python代码可能会更难以理解。
复杂的build筑
与纯JVM执行相比,PySpark数据stream相对复杂。 PySpark程序或debugging的理由要困难得多。 此外,至less对Scala和JVM的基本理解是非常必要的。
Spark 2.x及更高版本
对Dataset
API的持续转变,以及冻结的RDD API给Python用户带来了机遇和挑战。 虽然API的高级部分在Python中更容易公开,但更高级的function几乎不可能直接使用。
此外,本地Pythonfunction仍然是SQL世界中的二等公民。 希望Apache Arrow序列化在未来会有所改进( 目前的数据collection
工作目标是数据collection
但UDF serde是一个长期目标 )。
对于强烈依赖于Python代码库的项目,纯Python替代品(如Dask或Ray )可能是一个有趣的select。
它不一定是一个对另一个
Spark DataFrame(SQL,Dataset)API提供了一个在PySpark应用程序中集成Scala / Java代码的方法。 您可以使用DataFrames
将数据公开到本机JVM代码并读取结果。 我已经在其他地方解释了一些选项,你可以在Pyspark里面find如何使用Scala类的Python-Scala往返实例 。
可以通过引入用户定义的types来进一步增强它(请参阅如何在Spark SQL中定义自定义types的模式? )。
问题中提供的代码有什么问题
(免责声明:Pythonista的观点,很可能我错过了一些Scala技巧)
首先,你的代码中有一部分根本没有意义。 如果你已经有使用zipWithIndex
创build的(key, value)
对,或者enumerate
创buildstring的目的是什么? flatMap
不能recursion地工作,所以你可以简单地产生元组并跳过下面的map
。
另一个我觉得有问题的部分是reduceByKey
。 一般来说,如果应用聚合函数可以减less必须进行混洗的数据量, reduceByKey
非常有用。 既然你简单地连接string,这里没有任何东西可以获得。 忽略低级别的东西,比如引用的数量,你必须传输的数据量与groupByKey
完全一样。
通常我不会详述,但据我所知,这是Scala代码中的一个瓶颈。 在JVM上连接string是一个相当昂贵的操作(例如,请参阅scala中的string连接和Java中的代价一样昂贵 )。 这意味着像这样的_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
这是相当于input4.reduceByKey(valsConcat)
在你的代码是不是一个好主意。
如果你想避免使用groupByKey
你可以尝试在StringBuilder
使用aggregateByKey
。 类似的东西应该做的诀窍:
rdd.aggregateByKey(new StringBuilder)( (acc, e) => { if(!acc.isEmpty) acc.append(",").append(e) else acc.append(e) }, (acc1, acc2) => { if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2) else acc1.append(",").addString(acc2) } )
但我怀疑这是值得所有的大惊小怪。
牢记上面的内容,我已经重写了你的代码,如下所示:
斯卡拉 :
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{ (idx, iter) => if (idx == 0) iter.drop(1) else iter } val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{ case ("true", i) => (i, "1") case ("false", i) => (i, "0") case p => p.swap }) val result = pairs.groupByKey.map{ case (k, vals) => { val valsString = vals.mkString(",") s"$k,$valsString" } } result.saveAsTextFile("scalaout")
Python :
def drop_first_line(index, itr): if index == 0: return iter(list(itr)[1:]) else: return itr def separate_cols(line): line = line.replace('true', '1').replace('false', '0') vals = line.split(',') for (i, x) in enumerate(vals): yield (i, x) input = (sc .textFile('train.csv', minPartitions=6) .mapPartitionsWithIndex(drop_first_line)) pairs = input.flatMap(separate_cols) result = (pairs .groupByKey() .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1])))) result.saveAsTextFile("pythonout")
结果
在local[6]
模式下(Intel(R)Xeon(R)CPU E3-1245 V2 @ 3.40GHz),每个执行程序4GB内存,需要(n = 3):
- 斯卡拉 – 平均值:250.00s,stdev:12.49
- Python – 意思是:246.66s,stdev:1.15
我很确定,大部分时间都花在洗牌,序列化,反序列化和其他次要任务上。 只是为了好玩,这里是Python中朴素的单线程代码,在不到一分钟的时间内在这台机器上执行相同的任务:
def go(): with open("train.csv") as fr: lines = [ line.replace('true', '1').replace('false', '0').split(",") for line in fr] return zip(*lines[1:])