如何将COGROUP用于大型数据集

我有两个rdd's val tab_a: RDD[(String, String)]val tab_b: RDD[(String, String)]我正在使用cogroup的数据集,如:

 val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } } 

我正在使用tab_c cogrouped的值映射函数,它适用于小数据集,但在大数据集的情况下,它会抛出Out Of Memory exception

我已经尝试将最终值转换为RDD,但没有运气相同的错误

 val newcos = spark.sparkContext.parallelize(tab_c) 

1.如何将Cogroup用于大型数据集?

我们能坚持这个价值吗?

  val source_primary_key = source.map(rec => (rec.split(",")(0), rec)) source_primary_key.persist(StorageLevel.DISK_ONLY) val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec)) destination_primary_key.persist(StorageLevel.DISK_ONLY) val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect() var srcmis: Array[String] = new Array[String](0) var destmis: Array[String] = new Array[String](0) var extrainsrc: Array[String] = new Array[String](0) var extraindest: Array[String] = new Array[String](0) var srcs: String = Seq("")(0) var destt: String = Seq("")(0) val updated = cos.map { x => { val key = x._1 val value = x._2 srcs = value._1.mkString(",") destt = value._2.mkString(",") if (srcs.equalsIgnoreCase(destt) == false && destt != "") { srcmis :+= srcs destmis :+= destt } if (srcs == "") { extraindest :+= destt.mkString("") } if (destt == "") { extrainsrc :+= srcs.mkString("") } } } 

代码更新:

  val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2) // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)} {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}.. 

错误:

  ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693) ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 

谢谢

当你使用collect()你基本上是在告诉spark把所有的结果数据移回主节点,这很容易产生一个瓶颈。 在这一点上,你不再使用Spark,而只是一台机器上的普通数组。

为了触发计算,只需使用需要每个节点上的数据的东西,这就是执行程序位于分布式文件系统之上的原因。 例如saveAsTextFile()

这里有一些基本的例子。

请记住,这里的整个目标(也就是说,如果你有大数据)是将代码移到你的数据并在那里计算,而不是把所有的数据带到计算中去。

TL; DR不要collect

为了安全地运行这些代码,没有额外的假设(对于工作节点的平均需求可能要小得多),每个节点(驱动程序和每个执行程序)都要求内存显着超过所有数据的总内存要求。

如果你要在Spark外运行,你只需要一个节点。 所以Spark在这里没有任何好处。

但是,如果您跳过collect.toArray并对数据分发做出一些假设,那么可以运行它。