如何在Spark中转置RDD
我有这样的RDD:
1 2 3 4 5 6 7 8 9
这是一个matrix。 现在我想转置这样的RDD:
1 4 7 2 5 8 3 6 9
我该怎么做?
假设你有一个N×M的matrix。
如果N和M都很小,以至于可以在内存中保存N×M项目,那么使用RDD就没什么意义了。 但是转移它很容易:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))) val transposed = sc.parallelize(rdd.collect.toSeq.transpose)
如果N或M如此之大以至于无法在内存中保存N或M个条目,则不能拥有此大小的RDD线。 在这种情况下,原始matrix或转置matrix是不可能表示的。
N和M可以是中等大小:您可以在内存中保存N或M个条目,但不能保存N×M个条目。 在这种情况下,你必须炸毁matrix并重新组合在一起:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))) // Split the matrix into one number per line. val byColumnAndRow = rdd.zipWithIndex.flatMap { case (row, rowIndex) => row.zipWithIndex.map { case (number, columnIndex) => columnIndex -> (rowIndex, number) } } // Build up the transposed matrix. Group and sort by column index first. val byColumn = byColumnAndRow.groupByKey.sortByKey().values // Then sort by row index. val transposed = byColumn.map { indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2) }
没有使用collect()的初稿,所以一切都运行在工作人员的一方,没有任何事情在驱动程序上完成:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))) rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position .map(v => (v._2, v._1)) // key by column position .groupByKey.sortByKey // regroup on column position, thus all elements from the first column will be in the first row .map(_._2) // discard the key, keep only value
这个解决scheme的问题是,如果在分布式系统中执行操作,转置matrix中的列将最终被洗牌。 会想到一个改进的版本
我的想法是,除了将“列号”附加到matrix的每个元素之外,我们还附加“行号”。 所以我们可以按列的位置键和按键重新分组,然后我们可以对行号中的每一行重新sorting,然后从结果中去掉行/列号。 在将文件导入RDD时,我只是无法知道行号。
你可能会认为把一个列和一个行号附加到每个matrix元素上是很重的,但是我想这是付出的代价来有可能以分布式的方式处理你的input,从而处理巨大的matrix。
当我findsorting问题的解决scheme时,将更新答案。
从Spark 1.6开始,可以使用DataFrame上的pivot操作 ,具体取决于数据的实际形状,如果将其放入DF中,则可以将列转换为行,但下面的databricks博客非常有用,因为它详细描述了一个数字使用代码示例来转换用例