Apache Spark移动平均

我在HDFS有一个巨大的文件,具有时间序列数据点(雅虎股票价格)。

我想要find时间序列的移动平均线,我该如何着手编写Apache Spark作业来做到这一点。

您可以使用MLLIB中的滑动function,这可能与Daniel的答案相同。 在使用滑动function之前,您将不得不按照时间对数据进行sorting。

import org.apache.spark.mllib.rdd.RDDFunctions._ sc.parallelize(1 to 100, 10) .sliding(3) .map(curSlice => (curSlice.sum / curSlice.size)) .collect() 

移动平均值对于Spark和任何分布式系统来说都是一个棘手的问题。 当数据分布在多台机器上时,会有一些跨越分区的时间窗口。 我们必须在分区开始时复制数据,以便计算每个分区的移动平均值可以提供完整的覆盖率。

这是一个在Spark中做到这一点的方法。 示例数据:

 val ts = sc.parallelize(0 to 100, 10) val window = 3 

一个简单的分区程序,把每一行放在我们用键指定的分区中:

 class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner { def numPartitions = p def getPartition(key: Any) = key.asInstanceOf[Int] } 

用第一个window - 1创build数据window - 1行复制到前一个分区:

 val partitioned = ts.mapPartitionsWithIndex((i, p) => { val overlap = p.take(window - 1).toArray val spill = overlap.iterator.map((i - 1, _)) val keep = (overlap.iterator ++ p).map((i, _)) if (i == 0) keep else keep ++ spill }).partitionBy(new StraightPartitioner(ts.partitions.length)).values 

只需计算每个分区的移动平均值即可:

 val movingAverage = partitioned.mapPartitions(p => { val sorted = p.toSeq.sorted val olds = sorted.iterator val news = sorted.iterator var sum = news.take(window - 1).sum (olds zip news).map({ case (o, n) => { sum += n val v = sum sum -= o v }}) }) 

由于重复的部分,这将没有覆盖的空白。

 scala> movingAverage.collect.sameElements(3 to 297 by 3) res0: Boolean = true 

Spark 1.4引入了窗口函数 ,这意味着你可以按如下方式做移动平均调整窗口的行数在两者之间

 val schema = Seq("id", "cykle", "value") val data = Seq( (1, 1, 1), (1, 2, 11), (1, 3, 1), (1, 4, 11), (1, 5, 1), (1, 6, 11), (2, 1, 1), (2, 2, 11), (2, 3, 1), (2, 4, 11), (2, 5, 1), (2, 6, 11) ) val dft = sc.parallelize(data).toDF(schema: _*) dft.select('*).show // PARTITION BY id ORDER BY cykle ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING (5) val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(-2, 2) val x = dft.select($"id",$"cykle",avg($"value").over(w)) x.show 

输出(在齐柏林飞艇):

 schema: Seq[String] = List(id, cykle, value) data: Seq[(Int, Int, Int)] = List((1,1,1), (1,2,11), (1,3,1), (1,4,11), (1,5,1), (1,6,11), (2,1,1), (2,2,11), (2,3,1), (2,4,11), (2,5,1), (2,6,11)) dft: org.apache.spark.sql.DataFrame = [id: int, cykle: int, value: int] +---+-----+-----+ | id|cykle|value| +---+-----+-----+ | 1| 1| 1| | 1| 2| 11| | 1| 3| 1| | 1| 4| 11| | 1| 5| 1| | 1| 6| 11| | 2| 1| 1| | 2| 2| 11| | 2| 3| 1| | 2| 4| 11| | 2| 5| 1| | 2| 6| 11| +---+-----+-----+ w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@55cd666f x: org.apache.spark.sql.DataFrame = [id: int, cykle: int, 'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: double] +---+-----+-------------------------------------------------------------------------+ | id|cykle|'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING| +---+-----+-------------------------------------------------------------------------+ | 1| 1| 4.333333333333333| | 1| 2| 6.0| | 1| 3| 5.0| | 1| 4| 7.0| | 1| 5| 6.0| | 1| 6| 7.666666666666667| | 2| 1| 4.333333333333333| | 2| 2| 6.0| | 2| 3| 5.0| | 2| 4| 7.0| | 2| 5| 6.0| | 2| 6| 7.666666666666667| +---+-----+————————————————————————————————————+