观众互动与Apache火花和HyperLogLog分析
2015年10月13日 在公司博客上
这是一个客人从尤金Zhulenev博客他与工程机器学习的经历和观众在集体建模。
在集体,我们不仅工作很酷的东西像机器学习和预测建模还报告可以乏味和无聊。然而在我们的规模甚至简单的报告应用程序可以成为一个具有挑战性的工程问题。这篇文章是基于我在说话NY-Scala聚会。
示例应用程序可以在github:https://github.com/collectivemedia/spark-hyperloglog
印象日志
我们正在建设的报告应用程序是基于日志的印象。它不是完全的方式得到数据从合作伙伴,通过广告进行聚合,网站,饼干。bob体育外网下载甚至在这个格式进行聚合需要每天数百gb HDFS。
广告|网站|饼干|印象|点击|段- - - - - - - - - - - - - | - - - - - - - - - - - - - - - - | - - - - - - - - - - - - - - - - - - | | - - - - - - - - - - - - - - - - - - - - | - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -bmw_X5|forbes.com|13 e835610ff0d95|10|1|(一个。m b。rk, c。rh, d。sn,……)mercedes_2015|forbes.com|13 e8360c8e1233d|5|0|(一个。f, b。rk, c。海关,d。先生,……)诺基亚|gizmodo.com|13 e3c97d526839c|8|0|(一个。m b。tk, c。海关,d。sn,……)apple_music|reddit.com|1357年a253f00c0ac|3|1|(一个。m b。rk, d。锡、e。gh,…]诺基亚|cnn.com|13b23555294aced|2|1|(一个。f, b。tk, c。rh, d。sn,……)apple_music|facebook.com|13 e8333d16d723d|9|1|(一个。m, d。sn, g。gh, s。人力资源,……)
每个饼干id分配段,只是4 - 6字母代码,代表一些关于饼干的信息,我们从第三方数据提供者等Blukai。
——一个。m:男——一个。f:女性- b。tk:年收入75 k - 100 k美元- b。rk:年收入100 k - 150 k美元- - - - - - c。海关:高中- - - - - - c。rh:大学- d。sn:单- d。先生:结婚
例如,如果已经指定了一个饼干点
段,这意味着我们认为(实际上是数据提供者认为)这饼干属于男性。一样的年收入水平和其他人口统计信息。
我们没有精确的信息,究竟是一个特定的饼干是也没有什么是他们真正的年收入水平。这些片段基本上是概率,不过我们可以从这个数据非常有趣的见解。
我们能做这个数据
使用这种印象日志我们可以回答一些有趣的问题
- 我们可以计算给定组的流行运动的观众,如。什么角色男性在优化的观众固特异轮胎运动吗?
- 是什么男性/女性对于那些看过比例bmw_X5广告forbes.com
- 收入分配的人看到一个苹果音乐广告
- 诺基亚的点击分布在不同的教育水平
使用这些基本的问题我们可以创建一个“观众概要”,描述了什么类型的观众的一个优化运动或合作伙伴的网站。
蓝色酒吧意味着特定部分倾向于认为广告/访问网站超过平均和红酒吧意味着更少的。例如固特异轮胎我们希望看到更多男性观众比女。
解决问题与SQL
SQL看起来像一个简单的选择这个问题,但是我已经说过我们每天有几百千兆字节的数据,我们需要数字基于1年历史在几秒钟内。蜂巢/黑斑羚不能解决这个问题。
从印象选择计数(cookie_id截然不同)
在网站= ' forbes.com '
和广告= ' bmw_X5 '
和部分包含“点”
不幸的是,我们有几乎无限的组合过滤,用户可以定义,这不是可行的提前生成所有可能的报告。用户可以使用任意广告,网站活动,过滤器的组合,可能想知道观众与任何段交集。
观众与HyperLogLog基数近似
我们想出了一个不同的解决方案;而不是为每个查询提供精确的结果,我们提供近似数字非常高的精度。通常,出错率2%左右,对于这个特定的应用程序非常好。我们不需要知道精确的男性/女性饼干的观众。能够说出观众是主流,近似数字是绰绰有余的。
我们使用HyperLogLogcount-distinct问题,这是一个算法,近似的数量不同的元素(基数)。它使用有限的空间和具有可配置的精度。它能够估计基数> 10 ^ 9一个典型的准确性为2%,使用1.5 kb的内存。
特质HyperLogLog {
def添加(cookieId: String):单位
/ / | |
def基数():长
/ / | |∪B
def合并(其他:HyperLogLog): HyperLogLog
/ / | |∩B = | | + | | - - - | A∪|,
def相交(其他:HyperLogLog):长
}
这是一个粗略的API提供的HyperLogLog
。你可以添加一个新的cookieId,基数估计已经添加到它的独特的饼干,合并与另一个HyperLogLog
,最后得到一个十字路口。重要的是要注意,之后相交
操作,HyperLogLog
对象丢失,你只有近似交点基数。因此,通常HyperLogLog
计算交点是最后一步。
我建议你看可怕的谈话Avi科比他在那里讨论不仅HyperLogLog很多其他近似数据结构,可以用于大数据分析:https://www.infoq.com/presentations/abstract-algebra-analytics/。
从饼干HyperLogLog
我们分离出原始印象登录两个表。
对于广告表,我们删除部分信息和总饼干,印象和点击广告和网站。HyperLogLog
可以用在一个聚合函数类似于使用如何总和
操作。零是一个空HyperLogLog
而加操作合并
(顺便说一句,它正是所需的属性独异点
)
段表,我们删除广告和网站信息,和聚合数据段。
大学和高中教育的百分比宝马运动
如果你可以想象,我们可以将这些表加载到Seq
变成了一个非常简单的任务,那么观众十字路口,可以解决几行scala功能操作。
case类观众(广告:字符串,站点:字符串,hll: HyperLogLog,小鬼:长,clk:长)
case类段(名称:字符串,hll: HyperLogLog,小鬼:长,clk:长)
val adImpressions: Seq[观众]=…
val segmentImpressions: Seq[段]=…
val bmwCookies: HyperLogLog = adImpressions
.filter (_。广告= " bmw_X5”)
. map (_.hll)。减少(_ merge _)
val educatedCookies: HyperLogLog = segmentImpressions
.filter (_。段Seq(“大学”,“高中”))
. map (_.hll)。减少( _ merge _)
val p = (bmwCookies相交educatedCookies) / bmwCookies.count ()
Apache火花DataFrames HyperLogLog
显然我们不能所有的数据加载到一个scalaSeq
在单独的机器,因为它是巨大的。即使删除cookie数据,并将其转变为水平HyperLogLog
对象,它是在为一天1 - 2 gb的数据。
所以我们必须使用一些分布式数据处理框架来解决这个问题,我们选择了火花。
是什么引发DataFrames
- 灵感来自R data.frame和Python /熊猫DataFrame
- 分布式行组织成命名列的集合
- 曾经是SchemaRDD火花
高层DataFrame操作
- 选择所需的列
- 过滤
- 加入不同的数据集
- 聚合(数、总和、平均值,等)
您可以首先指的是火花DataFrame指南或砖的博客。
广告和DataFrames段
我们将我们所有的数据存储在HDFS使用镶花的数据格式,这样看起来加载到火花DataFrames之后。
val adImpressions: DataFrame = sqlContext.parquetFile (“/ aa /观众”)
adImpressions.printSchema ()
/ /根
/ / |——广告:字符串(nullable = true)
/ / |——网站:字符串(nullable = true)
/ / |——hll:二进制(nullable = true)
/ / |——印象:长(可空= true)
/ / |——点击:长(可空= true)
val segmentImpressions: DataFrame = sqlContext.parquetFile (“/ aa /段”)
segmentImpressions.printSchema ()
/ /根
/ / |——段:字符串(nullable = true)
/ / |——hll:二进制(nullable = true)
/ / |——印象:长(可空= true)
/ / |——点击:长(可空= true)
HyperLogLog
本质上是一个巨大的数组(字节)
用一些聪明的哈希和数学,这是简单的以连载的形式存储在HDFS。
与火花DataFrame合作
我们想知道这个问题的答案:“大学和高中教育的百分比在宝马运动”。
进口org.apache.spark.sql.functions._
进口org.apache.spark.sql.HLLFunctions._
val bmwCookies: HyperLogLog = adImpressions
.filter (col(“广告”)= = =“bmw_X5”)
.select (mergeHll(坳(hll))当代()/ /——总和(点击)
val educatedCookies: HyperLogLog = hllSegments
.filter(坳(Seq“段”)(“大学”、“高中”))
.select (mergeHll(坳(hll))当代()
val p = (bmwCookies相交educatedCookies) / bmwCookies.count ()
它看起来很熟悉,从不远的例子基于scalaSeq
。只有一个不寻常的操作,您可能会注意到如果你有一些经验与火花mergeHLL
。这不是在火花在默认情况下,这是一个习俗PartialAggregate
序列化函数可以计算总量HyperLogLog
对象。
编写自己的引发聚合函数
编写你自己的聚合函数,您需要定义一个函数,它将被应用到每一行抽样
分区,在这个例子中它叫MergeHLLPartition
。然后你需要定义的函数将结果从不同的分区和合并在一起,HyperLogLog
它被称为MergeHLLMerge
。最后你需要告诉引发你想要把你的计算抽样
(DataFrame是支持的抽样(行)
)
case类MergeHLLPartition(孩子:表达式)
扩展AggregateExpression与树木。UnaryNode(表达式){…}
case类MergeHLLMerge(孩子:表达式)
扩展AggregateExpression与树木。UnaryNode(表达式){…}
case类MergeHLL(孩子:表达式)
扩展PartialAggregate与树木。UnaryNode(表达式){
覆盖def asPartial: SplitEvaluation = {
val部分=别名(MergeHLLPartition(孩子),“PartialMergeHLL”) ()
SplitEvaluation (
MergeHLLMerge (partial.toAttribute),
部分:零
)
}
}
def mergeHLL (e:列):列= mergeHLL (e.expr)
之后,写作聚合成为一个非常简单的任务,和你的表情看起来像“本地”DataFrame代码,这是非常好的,和超级容易阅读和思考。
也工作速度远远超过scala转换的基础上解决这个问题抽样(行)
,引发催化剂优化器可以执行一个优化计划,减少的数据量需要火花节点之间。
最后,它更容易管理可变状态。火花鼓励您使用不可变的转换,这是很酷的,直到你需要代码的极端表现。例如,如果您使用的是类似的减少
或aggregateByKey
你不知道何时何地函数实例化,当它完成抽样
分区,也当结果被转移到另一个火花节点合并操作。与AggregateExpression
你有明确的控制可变状态,它是完全安全的执行期间积累可变状态为单个分区。最后当你需要发送数据到另一个节点,您可以创建不可变的副本。
在这种特殊情况下,使用一个可变的HyperLogLog
合并的实现有助于加快计算*近10倍。为每个分区HyperLogLog
积累了在单可变状态数组(字节)
最后当需要将数据转移到别的地方合并与另一个分区,创建了一个不变的副本。
一些花哨的骨料DataFrame API
您可以编写更复杂的聚合函数,例如,计算总基于多个列。这是一个代码示例从我们的观众分析项目。
case类SegmentEstimate (cookieHLL: HyperLogLog clickHLL: HyperLogLog)
字符串类型SegmentName =
val dailyEstimates:抽样(SegmentName地图[LocalDate SegmentEstimate])) =
segments.groupBy segment_name .agg (
segment_name,
mergeDailySegmentEstimates (
mkDailySegmentEstimate(/ /——地图(LocalDate, SegmentEstimate)
dt,
mkSegmentEstimate (/ /——SegmentEstimate (cookieHLL clickHLL)
cookie_hll,
click_hll)
)
)
)
这段代码计算每日观众聚合段。使用火花PartialAggregate
函数可以节省大量网络流量和最大限度地减少分布式洗牌的大小。
此聚合是可能的,因为好的属性独异点
HyperLogLog
是一个独异点
(已零
和+
操作)SegmentEstimate
是一个独异点
(两个独异点的元组)地图(K, SegmentEstimate)
是一个独异点
(地图价值独异点价值类型是独异点本身)
自定义聚合函数的问题
- 现在,它是一个封闭的API,因此你需要把所有的代码下
org.apache.spark.sql
包中。 - 不能保证它将在未来引发释放工作。
- 如果你想尝试的话,我建议你首先
org.apache.spark.sql.catalyst.expressions.Sum
为例。
引发内存中的一个SQL数据库
我们使用火花作为内存数据库为SQL(由DataFrame API)查询。
人们倾向于思考的火花与面向批处理的心态。开始纱火花集群,集群计算,杀死。提交你的应用程序集群独立的火花(便),杀死它。这种方法的最大问题在于,应用程序完成之后,JVM被杀,SparkContext
丢失,甚至如果您正在运行火花在独立模式下,所有的数据缓存应用程序丢失。
我们使用火花以完全不同的方式。我们开始引发聚集在纱、负载从HDFS数据,在内存中缓存,不关闭它。我们保持JVM中运行,它持有的引用SparkContext
工作者节点上,让所有的数据在内存中。
我们的后端应用程序本质上是很简单的REST / JSON服务器建立喷雾,保存SparkContext
参考,通过URL参数接收请求,在火花,运行查询并返回JSON响应。
现在(2015年7月)我们有数据从4月开始,并在100年40 g缓存节点。我们需要保持1年历史,所以我们不要期望超过500 g。我们非常有信心,我们可以扩展水平没有严重影响性能。现在平均请求响应时间是1 - 2秒这是很好的为我们的用例。
火花的最佳实践
这里有配置选项,我发现真正有用的对我们的特定任务。你可以找到更多关于每个细节的火花指南。
- spark.scheduler.mode =公平
——spark.yarn.executor.memoryOverhead = 4000
——spark.sql。autoBroadcastJoinThreshold = 300000000 / / ~ 300 mb
- spark.serializer = org.apache.spark.serializer.KryoSerializer
——spark.speculation = true
同时,我发现真的很重要如果你要进行重新分区数据集缓存查询和使用。optimalmal的分区数量大约是4 - 6为每个执行者核心,与40个节点和6执行人核我们使用1000个分区为最佳性能。
如果你有太多的分区火花会花太多的时间协调,和接收所有分区的结果。如果太小,你可能有太大问题块在洗牌,可以杀死不仅性能但所有集群:火星- 1476
其他选项
在开始这个项目之前,我们是评估其他选项
蜂巢
显然是太慢的交互式UI的后端,但我们发现这对批量数据处理非常有用。我们用它来处理原始日志和构建聚合表HyperLogLog
在里面。
黑斑羚
黑斑羚的良好性能,您需要编写c++用户定义函数,它并不是我想做的任务。也,我不相信,即使有定制c++函数黑斑羚可以显示我们需要的性能。
德鲁伊
德鲁伊是一个非常有趣的项目,它在另一个项目集体用于一个稍微不同的问题,但它没有在生产。
- 集群管理单独的德鲁伊,这不是我想做的任务
- 我们已经批量过程——和德鲁伊摄入是基于流数据
- 坏支持某些类型的查询,我们需要——如果我需要知道的一些特定的广告片段,德鲁伊的这将是10 k(段)查询,并将显然无法在1 - 2秒内完成
- 尚不清楚如何让数据从德鲁伊——很难从德鲁伊获得数据之后,如果它会解决不了问题
结论
火花是很棒的!我没有任何重大问题,和它的作品!新的DataFrame API是惊人的,我们要建立很多很酷的新项目在集体火花MLLib GraphX,我敢肯定他们会成功。