DataFrame(Spark 2.0中的DataSet )和Spark中的RDD之间的区别
我只是想知道在Apache Spark中RDD
和DataFrame
(Spark 2.0.0 DataFrame是Dataset[Row]
types别名)之间的区别是什么?
你可以转换一个到另一个?
一个DataFrame
被很好的定义为“DataFrame definition”的谷歌search:
一个数据框是一个表,或者是一个二维的类似数组的结构,其中每一列包含一个variables的测量值,每一行包含一个个案。
因此, DataFrame
由于其表格格式而具有额外的元数据,这允许Spark在最终查询上运行某些优化。
另一方面, RDD
仅仅是一个数据的黑盒子,它不能被优化,因为对它的操作可以不受限制。
但是,您可以通过其rdd
方法从DataFrame转到RDD
,并且可以通过toDF
方法从RDD
转到DataFrame
(如果RDD采用表格格式)
一般来说 ,由于内置的查询优化,build议尽可能使用DataFrame
。
首先是
DataFrame
从SchemaRDD
发展而来。
是的.. Dataframe
和RDD
之间的转换是绝对有可能的。
以下是一些示例代码片段。
-
df.rdd
是RDD[Row]
以下是一些创build数据框的选项。
-
1)
yourrddOffrow.toDF
转换为DataFrame
。 -
2)使用sql上下文的
createDataFrame
val df = spark.createDataFrame(rddOfRow, schema)
其中架构可以从下面的一些选项,如SO SO所述..
从scala case类和scalareflectionapiimport org.apache.spark.sql.catalyst.ScalaReflection val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]
或使用
Encoders
import org.apache.spark.sql.Encoders val mySchema = Encoders.product[MyCaseClass].schema
如Schema所描述的,也可以使用
StructType
和StructField
来创buildval schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("col1", DoubleType, true)) .add(StructField("col2", DoubleType, true)) etc...
实际上现在有3个Apache Spark API ..
-
RDD
API:
自1.0版本以来,
RDD
(弹性分布式数据集)API已经在Spark中。
RDD
API提供了许多转换方法,例如map
(),filter
()和reduce
(),用于对数据执行计算。 这些方法中的每一个都会产生一个代表转换数据的新RDD
。 但是,这些方法只是定义要执行的操作,并且在调用操作方法之前不会执行转换。 动作方法的例子是collect
()和saveAsObjectFile
()。
RDD示例:
rdd.filter(_.age > 21) // transformation .map(_.last)// transformation .saveAsObjectFile("under21.bin") // action
示例:使用RDD按属性过滤
rdd.filter(_.age > 21)
-
DataFrame
API
Spark 1.3引入了一个新的
DataFrame
API,作为Project Tungsten计划的一部分,旨在提高Spark的性能和可扩展性。DataFrame
API引入了用于描述数据的模式概念,允许Sparkpipe理模式并仅在节点之间传递数据,而不是使用Java序列化。
DataFrame
API与RDD
API截然不同,因为它是构buildSpark的Catalyst优化器可以执行的关系查询计划的API。 对于熟悉构build查询计划的开发人员来说,API是很自然的
示例SQL风格:
df.filter("age > 21");
限制:因为代码是按名称引用数据属性的,所以编译器不可能捕获任何错误。 如果属性名称不正确,则仅在创build查询计划时才会在运行时检测到错误。
DataFrame
API的另一个缺点是它非常以Scala为中心,虽然它支持Java,但支持是有限的。
例如,当从Java对象的现有RDD
创buildDataFrame
时,Spark的Catalyst优化器无法推断该模式,并假定DataFrame中的任何对象都实现了scala.Product
接口。 因为他们实现了这个接口,所以Scala case class
解决这个问题。
-
Dataset
API
在Spark 1.6中作为API预览版发布的
Dataset
API旨在提供两全其美的解决scheme; 熟悉RDD
API的面向对象编程风格和编译时types安全性,但具有Catalyst查询优化器的性能优势。 数据集也使用与DataFrame
API相同的高效堆外存储机制。在序列化数据时,
Dataset
API具有在JVM表示(对象)和Spark内部二进制格式之间进行转换的编码器的概念。 Spark内置的编码器非常先进,它们生成的字节码与堆外数据进行交互,并提供对各个属性的按需访问,而无需对整个对象进行反序列化。 Spark尚未提供用于实现自定义编码器的API,但是计划在未来版本中使用。另外,
Dataset
API被devise为与Java和Scala一样好。 使用Java对象时,重要的是它们完全符合bean的要求。
示例Dataset
API SQL样式:
dataset.filter(_.age < 21);
评估差异。 DataFrame
和DataSet
之间:
RDD
Spark提供的主要抽象是一个弹性分布式数据集(RDD),它是在集群节点间进行分区的元素集合,可以并行操作。
RDD特点: –
-
分布式集合:
RDD使用广泛采用的MapReduce操作,在集群上处理和生成具有并行分布式algorithm的大型数据集。 它允许用户使用一组高级操作符编写并行计算,而不必担心工作分配和容错。 -
不可变:由分区logging集合组成的RDD。 分区是RDD中并行性的基本单位,每个分区是一个逻辑分区,通过对现有分区进行一些转换来创build,是不可变的。数据的不可分性有助于实现计算的一致性。
-
容错:在我们丢失RDD的一些分区的情况下,我们可以重放在分区中的分区上的转换以实现相同的计算,而不是在多个节点上进行数据复制。这个特性是RDD的最大益处,因为它节省了在数据pipe理和复制方面做了很多努力,从而实现了更快的计算。
-
懒惰评估: Spark中的所有转换都是懒惰的,因为它们不会马上计算结果。 相反,他们只记得应用于某些基础数据集的转换。 只有在动作需要将结果返回给驱动程序时才会计算转换。
-
function转换: RDD支持两种types的操作:转换(从现有的转换创build一个新的数据集)和动作(在数据集上运行计算后将值返回给驱动程序)。
-
数据处理格式:
它可以轻松高效地处理结构化数据和非结构化数据。
- 编程语言支持:
RDD API可用于Java,Scala,Python和R.
RDD限制: –
-
没有内置的优化引擎:在处理结构化数据时,RDD不能利用Spark的高级优化器,包括催化剂优化器和钨执行引擎。 开发人员需要根据其属性来优化每个RDD。
-
处理结构化数据:与数据框和数据集不同,RDD不推断摄取数据的模式,并要求用户指定它。
Dataframes
Spark在Spark 1.3中引入了Dataframes。 数据框架克服了RDD所面临的主要挑战。
DataFrame是分布式数据集合,组织到命名列中。 它在概念上等同于关系数据库中的表或R / Python数据框。 除了Dataframe之外,Spark还引入了催化剂优化器,该优化器利用高级编程function构build可扩展的查询优化器。
dataframefunction: –
-
行对象的分布式集合: DataFrame是组织成命名列的分布式数据集合。 它在概念上等同于关系数据库中的表格,但在引擎盖下具有更丰富的优化。
-
数据处理:处理结构化和非结构化数据格式(Avro,CSV,弹性search和Cassandra)和存储系统(HDFS,HIVE表,MySQL等)。 它可以读取和写入所有这些不同的数据源。
-
使用催化剂优化器进行优化:它支持SQL查询和DataFrame API。 Dataframe分四个阶段使用催化剂树转换框架,
1.Analyzing a logical plan to resolve references 2.Logical plan optimization 3.Physical planning 4.Code generation to compile parts of the query to Java bytecode.
-
Hive兼容性:使用Spark SQL,您可以在您现有的Hive仓库上运行未经修改的Hive查询。 它重用了Hive前端和MetaStore,使您能够与现有的Hive数据,查询和UDF完全兼容。
-
钨:钨提供了一个物理执行后端,明确地pipe理内存和dynamic生成字节码进行expression式评估。
-
编程语言支持:
Dataframe API可用于Java,Scala,Python和R.
数据框限制: –
- 编译时types安全性:如前所述,Dataframe API不支持编译时安全性,这会限制您在结构不知情时操作数据。 下面的例子在编译期间工作。 但是,执行此代码时,您将得到一个运行时exception。
例:
case class Person(name : String , age : Int) val dataframe = sqlContect.read.json("people.json") dataframe.filter("salary > 10000").show => throws Exception : cannot resolve 'salary' given input age , name
当您正在进行多个转换和聚合步骤时,这尤其具有挑战性。
- 无法在域对象上运行(丢失域对象):将域对象转换为数据框后,无法从域中重新生成域对象。 在下面的例子中,一旦我们从personRDD创buildpersonDF,我们将不会恢复Person类(RDD [Person])的原始RDD。
例:
case class Person(name : String , age : Int) val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20))) val personDF = sqlContect.createDataframe(personRDD) personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
数据集API
数据集API是DataFrame的扩展,它提供了一个types安全的,面向对象的编程接口。 它是映射到关系模式的强types,不可变对象集合。
在数据集的核心,API是一个新的概念,称为编码器,它负责在JVM对象和表格表示之间进行转换。 表格表示使用Spark内部的钨二进制格式存储,允许对串行数据进行操作并提高内存利用率。 Spark 1.6支持自动生成各种types的编码器,包括原始types(如String,Integer,Long),Scala case类和Java Beans。
数据集特点: –
-
提供RDD和Dataframe的最佳function: RDD(函数式编程,types安全),DataFrame(关系模型,查询优化,钨执行,sorting和混洗)
-
编码器:使用编码器,很容易将任何JVM对象转换为数据集,使用户可以像Dataframe一样处理结构化数据和非结构化数据。
-
支持编程语言:数据集API目前仅在Scala和Java中可用。 Python和R目前在版本1.6中不受支持。 Python的支持是2.0版本。
-
types安全性:数据集API提供编译时安全性,在Dataframes中不可用。 在下面的例子中,我们可以看到Dataset如何在具有编译lambda函数的域对象上运行。
例:
case class Person(name : String , age : Int) val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20))) val personDF = sqlContect.createDataframe(personRDD) val ds:Dataset[Person] = personDF.as[Person] ds.filter(p => p.age > 25) ds.filter(p => p.salary > 25) // error : value salary is not a member of person ds.rdd // returns RDD[Person]
- 互操作性:数据集使您可以轻松地将现有的RDD和数据框转换为数据集,而无需模板代码。
数据集API限制: –
- 需要将types转换为string:从数据集中查询数据当前需要我们将类中的字段指定为string。 一旦我们查询了数据,我们不得不将数据列转换为所需的数据types。 另一方面,如果我们在数据集上使用映射操作,则不会使用Catalyst优化器。
例:
ds.select(col("name").as[String], $"age".as[Int]).collect()
不支持Python和R:从1.6版开始,Datasets只支持Scala和Java。 Python支持将在Spark 2.0中引入。
与现有的RDD和Dataframe API相比,Datasets API具有更好的types安全性和function性编程的优点。面对API中types转换需求的挑战,您仍然不需要所需的types安全性,并且会使代码变得脆弱。
简单RDD
是核心组件,但DataFrame
是spark 1.30中引入的API。
RDD
数据分区的集合称为RDD
。 这些RDD
必须遵循以下几个属性:
- 一成不变的,
- 容错,
- 分散式,
- 更多。
这里RDD
是结构化的或非结构化的。
dataframe
DataFrame
是Scala,Java,Python和R中可用的API。它允许处理任何types的结构化和半结构化数据。 要定义DataFrame
,分布式数据的一个集合被组织DataFrame
为DataFrame
命名列。 您可以轻松优化RDDs
中的DataFrame
。 您可以使用DataFrame
一次处理JSON数据, DataFrame
数据,HiveQL数据。
val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json") val sample_DF = sampleRDD.toDF()
这里的Sample_DF考虑为DataFrame
。 sampleRDD
是(原始数据),称为RDD
。
大部分答案都是正确的,只想在这里加一点
在Spark 2.0中,两个API(DataFrame + DataSet)将统一在一个API中。
“统一数据框和数据集:在Scala和Java中,DataFrame和Dataset是统一的,即DataFrame只是Row的Datasettypes的别名,在Python和R中,由于缺lesstypes安全性,DataFrame是主要的编程接口。
数据集类似于RDD,但是,不使用Java序列化或Kryo,而是使用专门的编码器对对象进行序列化以便通过networking进行处理或传输。
Spark SQL支持将现有RDD转换为Datasets的两种不同方法。 第一种方法使用reflection来推断包含特定types对象的RDD的模式。 这种基于reflection的方法导致更简洁的代码,并且在编写Spark应用程序时已经知道模式的情况下运行良好。
创build数据集的第二种方法是通过编程接口,允许您构build模式,然后将其应用于现有的RDD。 虽然这个方法比较冗长,但是它允许你在构造数据集的时候直到运行时才知道列和它们的types。
在这里你可以findRDDdataframe对话的答案
如何将rdd对象转换为spark中的dataframe
DataFrame相当于RDBMS中的表格,也可以以类似的方式操作RDD中的“本地”分布式集合。 与RDD不同的是,Dataframes跟踪模式并支持导致更优化执行的各种关系操作。 每个DataFrame对象都表示一个逻辑计划,但由于其“懒惰”性质,直到用户调用特定的“输出操作”才会执行。
全部(RDD,DataFrame和DataSet)于一体
图像学分
RDD
RDD
是可以并行操作的容错元素集合。
dataframe
DataFrame
是一个数据集组织DataFrame
名列。 它在概念上等同于关系数据库中的表格或R / Python中的数据框架, 但具有更丰富的优化内容 。
数据集
Dataset
是分布式数据集合。 数据集是Spark 1.6中添加的一个新接口,它提供了RDD (强打字,使用强大的lambda函数的function)的好处以及Spark SQL优化执行引擎的优点 。
注意:
Scala / Java 中行 数据集 (
Dataset[Row]
)通常被称为DataFrame 。
把所有的代码与代码片段进行比较
资源
问:你能把一个转换成另一个,比如RDD转换成DataFrame,反之亦然?
是的,两者都是可能的
1.使用.toDF()
RDD
为DataFrame
val rowsRdd: RDD[Row] = sc.parallelize( Seq( Row("first", 2.0, 7.0), Row("second", 3.5, 2.5), Row("third", 7.0, 5.9) ) ) val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2") df.show() +------+----+----+ | id|val1|val2| +------+----+----+ | first| 2.0| 7.0| |second| 3.5| 2.5| | third| 7.0| 5.9| +------+----+----+
更多方法: 将RDD对象转换为Spark中的Dataframe
2.使用.rdd()
方法将DataFrame
/ DataSet
DataFrame
到RDD
val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD
Dataframe是Row对象的RDD,每个都代表一条logging。 数据框也知道其行的模式(即数据字段)。 虽然数据框看起来像普通的RDD,但在内部,它们以更高效的方式存储数据,并利用其模式。 另外,它们提供RDD上不可用的新操作,例如运行SQL查询的能力。 可以从外部数据源,查询结果或常规RDD创build数据框。
参考文献:Zaharia M.,et al。 学习Spark(O'Reilly,2015)
从使用的angular度来看,RDD和DataFrame有一些分析:
- RDD是惊人的! 因为它们给我们所有的灵活性来处理几乎任何types的数据; 非结构化,半结构化和结构化的数据。 由于很多时候数据还没有准备好适应DataFrame(甚至是JSON),所以RDD可以用来对数据进行预处理,使其适合dataframe。 RDD是Spark中的核心数据抽象。
- 不是所有可能在RDD上进行的转换都可以在DataFrame上进行,例如subtract()用于RDD,except()用于DataFrame。
- 由于DataFrames就像一个关系表,它们在使用set / relational转换时遵循严格的规则,例如,如果你想联合两个dataframe,那么要求两个dfs具有相同数量的列和相关的列数据types。 列名可以不同。 这些规则不适用于RDD。 这是一个很好的教程,解释这些事实。
- 使用DataFrame时性能有所提高,正如其他人已经深入解释的那样。
- 使用DataFrames,您不需要像使用RDD编程时那样传递任意函数。
- 您需要使用SQLContext / HiveContext来编写数据框,因为它们位于Spark生态系统的SparkSQL区域中,但对于RDD,只需要位于Spark Core库中的SparkContext / JavaSparkContext。
- 如果您可以为其定义架构,则可以从RDD创build一个df。
- 您也可以将df转换为rdd并将rdd转换为df。
我希望它有帮助!