DataFrame(Spark 2.0中的DataSet )和Spark中的RDD之间的区别

我只是想知道在Apache Spark中RDDDataFrame (Spark 2.0.0 DataFrame是Dataset[Row]types别名)之间的区别是什么?

你可以转换一个到另一个?

一个DataFrame被很好的定义为“DataFrame definition”的谷歌search:

一个数据框是一个表,或者是一个二维的类似数组的结构,其中每一列包含一个variables的测量值,每一行包含一个个案。

因此, DataFrame由于其表格格式而具有额外的元数据,这允许Spark在最终查询上运行某些优化。

另一方面, RDD仅仅是一个数据的黑盒子,它不能被优化,因为对它的操作可以不受限制。

但是,您可以通过其rdd方法从DataFrame转到RDD ,并且可以通过toDF方法从RDD转到DataFrame (如果RDD采用表格格式)

一般来说 ,由于内置的​​查询优化,build议尽可能使用DataFrame

首先是DataFrameSchemaRDD发展而来。

方法toSchemaRDD

是的.. DataframeRDD之间的转换是绝对有可能的。

以下是一些示例代码片段。

  • df.rddRDD[Row]

以下是一些创build数据框的选项。

  • 1) yourrddOffrow.toDF转换为DataFrame

  • 2)使用sql上下文的createDataFrame

    val df = spark.createDataFrame(rddOfRow, schema)

其中架构可以从下面的一些选项,如SO SO所述..
从scala case类和scalareflectionapi

 import org.apache.spark.sql.catalyst.ScalaReflection val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType] 

或使用Encoders

 import org.apache.spark.sql.Encoders val mySchema = Encoders.product[MyCaseClass].schema 

如Schema所描述的,也可以使用StructTypeStructField来创build

 val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("col1", DoubleType, true)) .add(StructField("col2", DoubleType, true)) etc... 

图片描述

实际上现在有3个Apache Spark API ..

在这里输入图像说明

  1. RDD API:

自1.0版本以来, RDD (弹性分布式数据集)API已经在Spark中。

RDD API提供了许多转换方法,例如map (), filter ()和reduce (),用于对数据执行计算。 这些方法中的每一个都会产生一个代表转换数据的新RDD 。 但是,这些方法只是定义要执行的操作,并且在调用操作方法之前不会执行转换。 动作方法的例子是collect ()和saveAsObjectFile ()。

RDD示例:

 rdd.filter(_.age > 21) // transformation .map(_.last)// transformation .saveAsObjectFile("under21.bin") // action 

示例:使用RDD按属性过滤

 rdd.filter(_.age > 21) 
  1. DataFrame API

Spark 1.3引入了一个新的DataFrame API,作为Project Tungsten计划的一部分,旨在提高Spark的性能和可扩展性。 DataFrame API引入了用于描述数据的模式概念,允许Sparkpipe理模式并仅在节点之间传递数据,而不是使用Java序列化。

DataFrame API与RDD API截然不同,因为它是构buildSpark的Catalyst优化器可以执行的关系查询计划的API。 对于熟悉构build查询计划的开发人员来说,API是很自然的

示例SQL风格:

df.filter("age > 21");

限制:因为代码是按名称引用数据属性的,所以编译器不可能捕获任何错误。 如果属性名称不正确,则仅在创build查询计划时才会在运行时检测到错误。

DataFrame API的另一个缺点是它非常以Scala为中心,虽然它支持Java,但支持是有限的。

例如,当从Java对象的现有RDD创buildDataFrame时,Spark的Catalyst优化器无法推断该模式,并假定DataFrame中的任何对象都实现了scala.Product接口。 因为他们实现了这个接口,所以Scala case class解决这个问题。

  1. Dataset API

在Spark 1.6中作为API预览版发布的Dataset API旨在提供两全其美的解决scheme; 熟悉RDD API的面向对象编程风格和编译时types安全性,但具有Catalyst查询优化器的性能优势。 数据集也使用与DataFrame API相同的高效堆外存储机制。

在序列化数据时, Dataset API具有在JVM表示(对象)和Spark内部二进制格式之间进行转换的编码器的概念。 Spark内置的编码器非常先进,它们生成的字节码与堆外数据进行交互,并提供对各个属性的按需访问,而无需对整个对象进行反序列化。 Spark尚未提供用于实现自定义编码器的API,但是计划在未来版本中使用。

另外, Dataset API被devise为与Java和Scala一样好。 使用Java对象时,重要的是它们完全符合bean的要求。

示例Dataset API SQL样式:

 dataset.filter(_.age < 21); 

评估差异。 DataFrameDataSet之间: 在这里输入图像说明

RDD

Spark提供的主要抽象是一个弹性分布式数据集(RDD),它是在集群节点间进行分区的元素集合,可以并行操作。

RDD特点: –

  • 分布式集合:
    RDD使用广泛采用的MapReduce操作,在集群上处理和生成具有并行分布式algorithm的大型数据集。 它允许用户使用一组高级操作符编写并行计算,而不必担心工作分配和容错。

  • 不可变:由分区logging集合组成的RDD。 分区是RDD中并行性的基本单位,每个分区是一个逻辑分区,通过对现有分区进行一些转换来创build,是不可变的。数据的不可分性有助于实现计算的一致性。

  • 容错:在我们丢失RDD的一些分区的情况下,我们可以重放在分区中的分区上的转换以实现相同的计算,而不是在多个节点上进行数据复制。这个特性是RDD的最大益处,因为它节省了在数据pipe理和复制方面做了很多努力,从而实现了更快的计算。

  • 懒惰评估: Spark中的所有转换都是懒惰的,因为它们不会马上计算结果。 相反,他们只记得应用于某些基础数据集的转换。 只有在动作需要将结果返回给驱动程序时才会计算转换。

  • function转换: RDD支持两种types的操作:转换(从现有的转换创build一个新的数据集)和动作(在数据集上运行计算后将值返回给驱动程序)。

  • 数据处理格式:

它可以轻松高效地处理结构化数据和非结构化数据。

  • 编程语言支持:
    RDD API可用于Java,Scala,Python和R.

RDD限制: –

  • 没有内置的优化引擎:在处理结构化数据时,RDD不能利用Spark的高级优化器,包括催化剂优化器和钨执行引擎。 开发人员需要根据其属性来优化每个RDD。

  • 处理结构化数据:数据框和数据集不同,RDD不推断摄取数据的模式,并要求用户指定它。

Dataframes

Spark在Spark 1.3中引入了Dataframes。 数据框架克服了RDD所面临的主要挑战。

DataFrame是分布式数据集合,组织到命名列中。 它在概念上等同于关系数据库中的表或R / Python数据框。 除了Dataframe之外,Spark还引入了催化剂优化器,该优化器利用高级编程function构build可扩展的查询优化器。

dataframefunction: –

  • 行对象的分布式集合: DataFrame是组织成命名列的分布式数据集合。 它在概念上等同于关系数据库中的表格,但在引擎盖下具有更丰富的优化。

  • 数据处理:处理结构化和非结构化数据格式(Avro,CSV,弹性search和Cassandra)和存储系统(HDFS,HIVE表,MySQL等)。 它可以读取和写入所有这些不同的数据源。

  • 使用催化剂优化器进行优化:它支持SQL查询和DataFrame API。 Dataframe分四个阶段使用催化剂树转换框架,

     1.Analyzing a logical plan to resolve references 2.Logical plan optimization 3.Physical planning 4.Code generation to compile parts of the query to Java bytecode. 
  • Hive兼容性:使用Spark SQL,您可以在您现有的Hive仓库上运行未经修改的Hive查询。 它重用了Hive前端和MetaStore,使您能够与现有的Hive数据,查询和UDF完全兼容。

  • 钨:钨提供了一个物理执行后端,明确地pipe理内存和dynamic生成字节码进行expression式评估。

  • 编程语言支持:
    Dataframe API可用于Java,Scala,Python和R.

数据框限制: –

  • 编译时types安全性:如前所述,Dataframe API不支持编译时安全性,这会限制您在结构不知情时操作数据。 下面的例子在编译期间工作。 但是,执行此代码时,您将得到一个运行时exception。

例:

 case class Person(name : String , age : Int) val dataframe = sqlContect.read.json("people.json") dataframe.filter("salary > 10000").show => throws Exception : cannot resolve 'salary' given input age , name 

当您正在进行多个转换和聚合步骤时,这尤其具有挑战性。

  • 无法在域对象上运行(丢失域对象):将域对象转换为数据框后,无法从域中重新生成域对象。 在下面的例子中,一旦我们从personRDD创buildpersonDF,我们将不会恢复Person类(RDD [Person])的原始RDD。

例:

 case class Person(name : String , age : Int) val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20))) val personDF = sqlContect.createDataframe(personRDD) personDF.rdd // returns RDD[Row] , does not returns RDD[Person] 

数据集API

数据集API是DataFrame的扩展,它提供了一个types安全的,面向对象的编程接口。 它是映射到关系模式的强types,不可变对象集合。

在数据集的核心,API是一个新的概念,称为编码器,它负责在JVM对象和表格表示之间进行转换。 表格表示使用Spark内部的钨二进制格式存储,允许对串行数据进行操作并提高内存利用率。 Spark 1.6支持自动生成各种types的编码器,包括原始types(如String,Integer,Long),Scala case类和Java Beans。

数据集特点: –

  • 提供RDD和Dataframe的最佳function RDD(函数式编程,types安全),DataFrame(关系模型,查询优化,钨执行,sorting和混洗)

  • 编码器:使用编码器,很容易将任何JVM对象转换为数据集,使用户可以像Dataframe一样处理结构化数据和非结构化数据。

  • 支持编程语言:数据集API目前仅在Scala和Java中可用。 Python和R目前在版本1.6中不受支持。 Python的支持是2.0版本。

  • types安全性:数据集API提供编译时安全性,在Dataframes中不可用。 在下面的例子中,我们可以看到Dataset如何在具有编译lambda函数的域对象上运行。

例:

 case class Person(name : String , age : Int) val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20))) val personDF = sqlContect.createDataframe(personRDD) val ds:Dataset[Person] = personDF.as[Person] ds.filter(p => p.age > 25) ds.filter(p => p.salary > 25) // error : value salary is not a member of person ds.rdd // returns RDD[Person] 
  • 互操作性:数据集使您可以轻松地将现有的RDD和数据框转换为数据集,而无需模板代码。

数据集API限制: –

  • 需要将types转换为string:从数据集中查询数据当前需要我们将类中的字段指定为string。 一旦我们查询了数据,我们不得不将数据列转换为所需的数据types。 另一方面,如果我们在数据集上使用映射操作,则不会使用Catalyst优化器。

例:

 ds.select(col("name").as[String], $"age".as[Int]).collect() 

不支持Python和R:从1.6版开始,Datasets只支持Scala和Java。 Python支持将在Spark 2.0中引入。

与现有的RDD和Dataframe API相比,Datasets API具有更好的types安全性和function性编程的优点。面对API中types转换需求的挑战,您仍然不需要所需的types安全性,并且会使代码变得脆弱。

简单RDD是核心组件,但DataFrame是spark 1.30中引入的API。

RDD

数据分区的集合称为RDD 。 这些RDD必须遵循以下几个属性:

  • 一成不变的,
  • 容错,
  • 分散式,
  • 更多。

这里RDD是结构化的或非结构化的。

dataframe

DataFrame是Scala,Java,Python和R中可用的API。它允许处理任何types的结构化和半结构化数据。 要定义DataFrame ,分布式数据的一个集合被组织DataFrameDataFrame命名列。 您可以轻松优化RDDs中的DataFrame 。 您可以使用DataFrame一次处理JSON数据, DataFrame数据,HiveQL数据。

 val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json") val sample_DF = sampleRDD.toDF() 

这里的Sample_DF考虑为DataFramesampleRDD是(原始数据),称为RDD

大部分答案都是正确的,只想在这里加一点

在Spark 2.0中,两个API(DataFrame + DataSet)将统一在一个API中。

“统一数据框和数据集:在Scala和Java中,DataFrame和Dataset是统一的,即DataFrame只是Row的Datasettypes的别名,在Python和R中,由于缺lesstypes安全性,DataFrame是主要的编程接口。

数据集类似于RDD,但是,不使用Java序列化或Kryo,而是使用专门的编码器对对象进行序列化以便通过networking进行处理或传输。

Spark SQL支持将现有RDD转换为Datasets的两种不同方法。 第一种方法使用reflection来推断包含特定types对象的RDD的模式。 这种基于reflection的方法导致更简洁的代码,并且在编写Spark应用程序时已经知道模式的情况下运行良好。

创build数据集的第二种方法是通过编程接口,允许您构build模式,然后将其应用于现有的RDD。 虽然这个方法比较冗长,但是它允许你在构造数据集的时候直到运行时才知道列和它们的types。

在这里你可以findRDDdataframe对话的答案

如何将rdd对象转换为spark中的dataframe

DataFrame相当于RDBMS中的表格,也可以以类似的方式操作RDD中的“本地”分布式集合。 与RDD不同的是,Dataframes跟踪模式并支持导致更优化执行的各种关系操作。 每个DataFrame对象都表示一个逻辑计划,但由于其“懒惰”性质,直到用户调用特定的“输出操作”才会执行。

全部(RDD,DataFrame和DataSet)于一体

RDD vs DataFrame vs DataSet

图像学分

RDD

RDD是可以并行操作的容错元素集合。

dataframe

DataFrame是一个数据集组织DataFrame名列。 它在概念上等同于关系数据库中的表格或R / Python中的数据框架, 但具有更丰富的优化内容

数据集

Dataset是分布式数据集合。 数据集是Spark 1.6中添加的一个新接口,它提供了RDD (强打字,使用强大的lambda函数的function)的好处以及Spark SQL优化执行引擎优点


注意:

Scala / Java 中行 数据集Dataset[Row] )通常被称为DataFrame


把所有的代码与代码片段进行比较

RDD与DataFrame与DataSet与代码

资源


问:你能把一个转换成另一个,比如RDD转换成DataFrame,反之亦然?

是的,两者都是可能的

1.使用.toDF() RDDDataFrame

 val rowsRdd: RDD[Row] = sc.parallelize( Seq( Row("first", 2.0, 7.0), Row("second", 3.5, 2.5), Row("third", 7.0, 5.9) ) ) val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2") df.show() +------+----+----+ | id|val1|val2| +------+----+----+ | first| 2.0| 7.0| |second| 3.5| 2.5| | third| 7.0| 5.9| +------+----+----+ 

更多方法: 将RDD对象转换为Spark中的Dataframe

2.使用.rdd()方法将DataFrame / DataSet DataFrameRDD

 val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD 

Dataframe是Row对象的RDD,每个都代表一条logging。 数据框也知道其行的模式(即数据字段)。 虽然数据框看起来像普通的RDD,但在内部,它们以更高效的方式存储数据,并利用其模式。 另外,它们提供RDD上不可用的新操作,例如运行SQL查询的能力。 可以从外部数据源,查询结果或常规RDD创build数据框。

参考文献:Zaharia M.,et al。 学习Spark(O'Reilly,2015)

从使用的angular度来看,RDD和DataFrame有一些分析:

  1. RDD是惊人的! 因为它们给我们所有的灵活性来处理几乎任何types的数据; 非结构化,半结构化和结构化的数据。 由于很多时候数据还没有准备好适应DataFrame(甚至是JSON),所以RDD可以用来对数据进行预处理,使其适合dataframe。 RDD是Spark中的核心数据抽象。
  2. 不是所有可能在RDD上进行的转换都可以在DataFrame上进行,例如subtract()用于RDD,except()用于DataFrame。
  3. 由于DataFrames就像一个关系表,它们在使用set / relational转换时遵循严格的规则,例如,如果你想联合两个dataframe,那么要求两个dfs具有相同数量的列和相关的列数据types。 列名可以不同。 这些规则不适用于RDD。 这是一个很好的教程,解释这些事实。
  4. 使用DataFrame时性能有所提高,正如其他人已经深入解释的那样。
  5. 使用DataFrames,您不需要像使用RDD编程时那样传递任意函数。
  6. 您需要使用SQLContext / HiveContext来编写数据框,因为它们位于Spark生态系统的SparkSQL区域中,但对于RDD,只需要位于Spark Core库中的SparkContext / JavaSparkContext。
  7. 如果您可以为其定义架构,则可以从RDD创build一个df。
  8. 您也可以将df转换为rdd并将rdd转换为df。

我希望它有帮助!