使用Databricks Delta构建手机游戏事件数据管道
如何用结构化流构建端到端数据管道
在Databricks中试试这个笔记本
手机游戏世界是一个快节奏的世界,需要开发者能够快速扩展。全世界有数百万用户每秒通过游戏玩法产生数百万个事件,你需要计算关键参数(游戏邦注:如分数调整、游戏内购买、游戏内行动等)实时。同样重要的是,一款受欢迎的游戏发行或功能会增加活动流量,你需要基础设施来处理这种快速的规模。
由于低延迟洞察力和快速可扩展基础设施的复杂性,为手机游戏分析等大容量流用例构建数据管道可能会变得复杂和令人困惑。负责这项工作的开发人员将遇到许多架构问题。
- 首先,他们应该考虑哪些技术可以减少他们的学习曲线,并且集成得很好?
- 第二,构建时架构的可扩展性如何?
- 最后,组织中的不同角色将如何协作?
最终,他们将需要构建一个端到端的数据管道,该管道由以下三个功能组件组成:数据摄取/流;数据转换;数据分析和可视化。
解决这些问题的一种方法是选择一个提供这些功能的统一平台。bob体育客户端下载Databricks提供了bob体育亚洲版统一分析平台bob体育客户端下载它将大数据和人工智能结合在一起,允许组织中的不同角色聚集在一起,在一个工作空间中进行协作。
在这篇博客中,我们将探讨如何:
- 使用AWS服务(如API Gateway、Lambda和Kinesis Streams)构建手机游戏数据管道
- 使用Spark Structured Streaming构建一个流摄取服务
- 使用三角洲湖作为流媒体操作的接收器
- 探索如何在此表上直接执行分析,从而最大限度地减少数据延迟
- 说明Databricks Delta如何解决流数据的传统问题
高级基础设施组件
构建手机游戏数据管道非常复杂,因为你需要快速扩展基础设施来处理数百万用户的数百万个事件,并实时获得可操作的见解。这就是用AWS和Databricks构建数据管道的美妙之处。Kinesis分片可以动态地重新配置以处理增加的负载,Databricks可以自动扩展集群以处理数据的增加。
在我们的示例中,我们使用事件生成器模拟来自移动用户的游戏事件。这些事件被推送到一个REST端点,并通过摄取到Databricks Delta表,遵循我们的数据管道。可以找到此事件生成器的代码在这里。
Amazon API Gateway, Lambda和Kinesis Streams
对于本例,我们使用Amazon API Gateway构建一个REST端点。到达此端点的事件会自动触发无服务器lambda函数,该函数将这些事件输送到Kinesis流中供我们使用。您将需要将lambda集成与端点设置为自动触发,并调用将这些事件写入kinesis的函数。
像这样设置一个Python lambda函数:
进口json进口boto3进口随机进口base64进口时间deflambda_handler(事件中,上下文):打印"收到的事件:{}"。格式(事件)stream_name =“streamdemo_incoming”Record = json.loads(event[事件])“身体”])记录(“eventTime”] =int(time.time ())事件(“身体”] =记录Client = boto3.client()“运动”)客户端。put_record(StreamName = stream_name, Data = json.dumps(event), PartitionKey = . Data)str(random.randint (1,One hundred.)))返回没有一个
kineesis流是根据吞吐量提供的,因此您可以根据需要提供尽可能多的分片来处理预期的数据吞吐量。每个分片提供的写吞吐量为1mb /秒,读吞吐量为2MB/秒,或者每秒最多1000条记录。有关Kinesis流吞吐量的更多信息,请查看文档。随机PartitionKeys对于均匀分布很重要,如果您有多个分片。
使用结构化流从Kinesis摄取
从Kinesis流中摄取数据是直接的。在生产环境中,您需要设置适当的IAM角色策略以确保您的集群可以访问您的kinesstream。它的最小权限如下所示:
{“版本”:“2012-10-17”,“声明”:【{“效应”:“允许”,“行动”:【“运动:DescribeStream”,“运动:GetRecords”,“运动:GetShardIterator”],“资源”:“ARN_FOR_YOUR_STREAM”}]}
或者,您还可以使用AWS访问密钥和将它们作为选项传递但是,IAM角色是生产用例的最佳实践方法。在本例中,我们假设集群具有适当的IAM角色设置。
首先像这样创建一个DataFrame:
kinesisDataFrame = spark \.readStream \。格式(“运动”) \.option (“streamName”,“MY_KINESIS_STREAM_NAME”) \.option (“initialPosition”,“STREAM_POSITION”) \.option (“地区”,“KINESIS_REGION”) \.load ()
您还需要定义传入数据的模式。kineesis的数据是这样包装的:
kinesisSchema=StructType () \。添加(“身体”, StringType()) \。添加(“资源”, StringType()) \。添加(“requestContext”StringType ()) \。添加(“queryStringParameters”, StringType()) \。添加(“httpMethod”, StringType()) \。添加(“pathParameters”, StringType()) \。添加(“头”, StringType()) \。添加(“stageVariables”, StringType()) \。添加(“路径”, StringType()) \。添加(“isBase64Encoded”StringType ())eventSchema=StructType()。添加(“eventName”, StringType()) \。添加(“eventTime”, timestamp ()) \。添加(“eventParams”, StructType(。添加(“game_keyword”, StringType()) \。添加(“app_name”, StringType()) \。添加(“scoreAdjustment”, IntegerType()) \。添加(“bob体育客户端下载平台”, StringType()) \。添加(“app_version”, StringType()) \。添加(“device_id”, StringType()) \。添加(“client_event_time”, timestamp ()) \。添加(“数量”倍增式()))
在这个演示中,我们只对kinesisSchema
,其中将包含我们在eventSchema
。
someEventDF=kinesisDataFrame。选择Expr("cast (data as STRING) jsonData") \。选择(from_json (“jsonData”kinesisSchema) .alias (“requestBody”)) \。选择(from_json (“requestBody.body”eventSchema) .alias (“身体”)) \。选择(“body.attr1”,“body.attr2”,“body.etc”)
使用Databricks Delta的实时数据管道
现在我们已经定义了流数据框架,让我们继续进行一些简单的转换。事件数据通常是基于时间序列的,因此最好根据事件日期之类的东西进行分区。但是,我们的传入流没有事件日期参数,因此我们将通过转换事件日期参数来创建自己的事件日期参数eventTime
列。我们还会给你一张支票,以确保eventTime
不为空:
base_path =“/道路/ / mobile_events_stream /”eventsStream = gamingEventDF。过滤器(gamingEventDF.eventTime.isNotNull ()) .withColumn (“eventDate”, to_date(gamingEventDF.eventTime)) \.writeStream \.partitionBy (“eventDate”) \。格式(“δ”) \.option (“checkpointLocation”, base_path +' / _checkpoint ') \.start (base_path)
让我们利用这个机会定义我们的表位置。
创建表格如果不存在mobile_events_delta_raw使用δ位置“/道路/ / mobile_events_stream /”;
实时分析、kpi和可视化
现在我们已经有了实时数据流到Databricks Delta表中,我们可以继续查看一些kpi。传统上,公司只会每天查看这些数据,但有了结构化流媒体和Databricks Delta,你就可以在Databricks笔记本上实时可视化这些数据。
让我们从一个简单的开始。这一小时我看了多少场比赛?
countsDF=gamingEventDF。withWatermark("eventTime", "180分钟").groupBy(窗口("eventTime", "60分钟"))。数()countsQuery=countsDF。writeStream \.format (“记忆”) \.queryName (“incoming_events_counts”) \。开始()
然后我们可以在我们的笔记本上把它想象成一个条形图:
也许我们可以让事情变得更有趣。我在过去一个小时里赚了多少钱?让我们检查一下预订情况。了解每小时的预订量是一个重要的指标,因为它可以指示我们的应用程序/生产系统是如何工作的。例如,如果在新游戏补丁发布后,预订量突然下降,我们马上就知道出了问题。
我们可以取相同的数据帧,但对所有数据帧进行过滤purchaseEvents
,按60分钟的窗口分组。
bookingsDF=gamingEventDF。withWatermark("eventTime", "180 minutes")。过滤器(gamingEventDF.eventName==“purchaseEvent”) .groupBy (窗口("eventTime", "60分钟"))。总和(“eventParams.amount”)bookingsQuery=bookingsDF。writeStream \.format (“记忆”) \.queryName (“incoming_events_bookings”) \。开始()
让我们选择一个线形图来形象化:
对于SQL爱好者,您可以直接查询Databricks Delta表。让我们来看一个简单的查询,以显示当前的日活跃用户(DAU)。我知道我们实际上是在查看设备id,因为我们的样本集不包含用户id,所以为了便于示例,让我们假设用户和设备之间存在1-1映射(尽管在现实世界中并非总是如此)。
选择数(截然不同的eventParams.device_id)作为道从mobile_events_delta_raw在哪里to_date (eventTime)=当前日期;
用Databricks Delta解决传统的流媒体“小文件”问题
流媒体面临的一个共同挑战是经典的“小文件”问题。根据触发写操作的频率和接收的流量大小,最终可能会得到许多大小不一的文件,其中许多文件太小,无法提高操作效率。
Databricks Delta通过引入优化
命令。该命令有效地对这些文件执行压缩,从而获得更大(最多1GiB)的文件。
优化“/道路/ / mobile_events_stream /”
但是,您会注意到,仍然有一些小文件。这是因为Databricks Delta管理事务。在压缩完成后,可能有查询或运行时间较长的进程仍在访问旧文件。此时提交的任何新查询或作业最终都将访问更新的、更大的文件,但任何现有作业仍将查询较旧的文件。
可以定期调用真空
命令。
真空“/ mnt / syu / mobile_events_stream /”;
其结果很简单:
默认情况下真空
删除超过7天的文件。但是,您可以通过指定a来手动设置自己的保留保留
像这样的从句:
真空“/道路/ / mobile_events_stream /”保留12小时;
强烈建议不要将保留时间设置为0小时,除非您绝对确定没有其他进程正在写入或读取您的表。
总结
最后,我们演示了如何使用控件构建数据管道的三个功能组件Databricks统bob体育亚洲版一分析平台bob体育客户端下载: Spark结构化流,Databricks Delta和Databricks笔记本。我们已经说明了从实时流数据推断关键性能指标的不同方法,以及解决传统上与流相关的问题。Spark Structured Streaming和Databricks Delta的结合减少了数据的整体端到端延迟和可用性,使数据工程、数据分析和数据科学团队能够快速响应诸如预订量突然下降或错误消息事件增加等对收入有直接影响的事件。此外,通过消除通常与此类管道相关的数据工程复杂性Databricks统bob体育亚洲版一分析平台bob体育客户端下载,这使得数据工程团队能够专注于更高价值的项目。
为了更好地理解这个特定的例子,我在下面提供了一些参考资料,以及一个笔记本供您自己尝试。
阅读更多
有关Databricks Delta、结构化流和笔记本的更多信息,请阅读这些来源
访问三角洲湖在线中心要了解BOB低频彩更多信息,请下载最新代码并加入Delta Lake社区。