有效计数与Apache Spark不同
在一些网站(比如100个网站)上,有1亿个用户点击了1000亿次。 点击stream在大数据集中可用。
使用Apache Spark的抽象,每个网站统计不同访问者的最有效方法是什么?
visitors.distinct().count()
将是明显的方式,在不同的第一种方式,你可以指定并行的水平,也看到速度的提高。 如果可以将访问者设置为stream并使用Dstream,则可以实时进行计数。 您可以直接从一个目录进行stream式处理,并使用与RDD相同的方法,如:
val file = ssc.textFileStream("...") file.distinct().count()
最后一个选项是使用def countApproxDistinct(relativeSD: Double = 0.05): Long
但是这被标记为实验性的,但是如果relativeSD
(标准偏差)更高,则会比计数快得多。
编辑:因为你想每个网站的计数,你可以减less网站ID,这可以做到有效(与组合),因为计数是聚合。 如果你有一个RDD的网站名称用户ID元组,你可以做。 visitors.countDistinctByKey()
或visitors.countApproxDistinctByKey()
,再次大概是实验性的。 要使用大约不同的密钥,你需要一个PairRDD
有趣的一面是,如果你对近似值是好的,并希望得到快速的结果,你可能想看看由火花放大器实验室同样的人做的blinkDB 。
我不得不做类似的事情,你可以做一个有效的事情(这不是真正的火花)是映射您的vistor ID字节列表而不是GUIDstring,你可以节省4倍空间然后(因为2个字符是hex编码一个字节,一个字符在一个string中使用2个字节)。
// Inventing these custom types purely for this question - don't do this in real life! type VistorID = List[Byte] type WebsiteID = Int val visitors: RDD[(WebsiteID, VisitorID)] = ??? visitors.distinct().mapValues(_ => 1).reduceByKey(_ + _)
注意你也可以这样做:
visitors.distinct().map(_._1).countByValue()
但是这并没有扩展。
我注意到,在RDD上运行时,基本独立函数的速度要比在DataFrame集合上运行速度快得多。 例如:
DataFrame df = sqlContext.load(...) df.distinct.count // 0.8 s df.rdd.distinct.count // 0.2 s
如果data
是(站点,访问者)对的RDD
,则data.countApproxDistinctByKey(0.05)
将为您提供(站点,计数)的RDD
。 参数可以减less,以更多的处理为代价获得更高的精度。
Spark 2.0在数据框和SQL API中添加了ApproxCountDistinct:
如果你想每个网页,然后visitors.distinct()...
是低效的。 如果有很多的访问者和很多的网页,那么你正在区分大量的(webpage, visitor)
组合,这可能会压倒内存。
这是另一种方式:
visitors.groupByKey().map { case (webpage, visitor_iterable) => (webpage, visitor_iterable.toArray.distinct.length) }
这要求访问单个网页的访问者适合内存,因此在所有情况下可能都不是最好的。