有效计数与Apache Spark不同

在一些网站(比如100个网站)上,有1亿个用户点击了1000亿次。 点击stream在大数据集中可用。

使用Apache Spark的抽象,每个网站统计不同访问者的最有效方法是什么?

visitors.distinct().count()将是明显的方式,在不同的第一种方式,你可以指定并行的水平,也看到速度的提高。 如果可以将访问者设置为stream并使用Dstream,则可以实时进行计数。 您可以直接从一个目录进行stream式处理,并使用与RDD相同的方法,如:

val file = ssc.textFileStream("...") file.distinct().count()

最后一个选项是使用def countApproxDistinct(relativeSD: Double = 0.05): Long但是这被标记为实验性的,但是如果relativeSD (标准偏差)更高,则会比计数快得多。

编辑:因为你想每个网站的计数,你可以减less网站ID,这可以做到有效(与组合),因为计数是聚合。 如果你有一个RDD的网站名称用户ID元组,你可以做。 visitors.countDistinctByKey()visitors.countApproxDistinctByKey() ,再次大概是实验性的。 要使用大约不同的密钥,你需要一个PairRDD

有趣的一面是,如果你对近似值是好的,并希望得到快速的结果,你可能想看看由火花放大器实验室同样的人做的blinkDB 。

我不得不做类似的事情,你可以做一个有效的事情(这不是真正的火花)是映射您的vistor ID字节列表而不是GUIDstring,你可以节省4倍空间然后(因为2个字符是hex编码一个字节,一个字符在一个string中使用2个字节)。

 // Inventing these custom types purely for this question - don't do this in real life! type VistorID = List[Byte] type WebsiteID = Int val visitors: RDD[(WebsiteID, VisitorID)] = ??? visitors.distinct().mapValues(_ => 1).reduceByKey(_ + _) 

注意你也可以这样做:

 visitors.distinct().map(_._1).countByValue() 

但是这并没有扩展。

我注意到,在RDD上运行时,基本独立函数的速度要比在DataFrame集合上运行速度快得多。 例如:

 DataFrame df = sqlContext.load(...) df.distinct.count // 0.8 s df.rdd.distinct.count // 0.2 s 

如果data是(站点,访问者)对的RDD ,则data.countApproxDistinctByKey(0.05)将为您提供(站点,计数)的RDD 。 参数可以减less,以更多的处理为代价获得更高的精度。

如果你想每个网页,然后visitors.distinct()...是低效的。 如果有很多的访问者和很多的网页,那么你正在区分大量的(webpage, visitor)组合,这可能会压倒内存。

这是另一种方式:

 visitors.groupByKey().map { case (webpage, visitor_iterable) => (webpage, visitor_iterable.toArray.distinct.length) } 

这要求访问单个网页的访问者适合内存,因此在所有情况下可能都不是最好的。

Interesting Posts