结构化流教程
传感器、物联网设备、社交网络和在线交易所有生成的数据需要不断监控,迅速采取行动。因此,需要大规模、实时流处理比以往任何时候都更加突出。本教程模块介绍了结构化流,主要处理流数据集模型在Apache火花。在结构化流,数据流被视为一个不断附加的表。这导致的流处理模型非常类似于批处理模式。你表达你的流计算标准批查询在一个静态的表,但火花运行作为一个增量查询无界输入表。
考虑输入数据流输入表。每个数据项都是到达流就像一个新行被添加到输入表。
输入的查询生成一个结果表。在每一个触发间隔(说,每1秒),新行添加到输入表,最终更新结果表。每当更新结果表,改变结果行写入外部下沉。被定义为输出写入到外部存储。输出可以配置在不同的模式:
完整的模式:整个结果表写入外部存储更新。由存储连接器来决定如何处理整个表的编写。
Append模式在结果表:只有新行附加自上次触发被写入外部存储。这只适用于现有的查询结果表中的行是不会改变的。
更新模式:只有结果表中的行,更新自上次触发被写入外部存储。这不同于完整的模式,更新模式输出的行改变了自从上次触发。如果查询不包含聚合,它相当于Append模式。
在本教程中,您将学习如何:
我们也提供一个样的笔记本你可以导入访问和运行的所有代码示例包含在模块。
加载示例数据
开始使用结构化流的最简单方法是使用一个砖中可用数据集的例子/ databricks-datasets
在砖工作区文件夹访问。砖示例事件数据文件/ / databricks-datasets / structured-streaming /事件
使用构建结构化的流媒体应用程序。让我们看一看这个目录的内容。
文件中的每一行包含一个JSON记录两个字段:时间
和行动
。
{“时间”:1469501675,“行动”:“开放”}{“时间”:1469501678,“行动”:“关闭”}{“时间”:1469501680,“行动”:“开放”}{“时间”:1469501685,“行动”:“开放”}{“时间”:1469501686,“行动”:“开放”}{“时间”:1469501689,“行动”:“开放”}{“时间”:1469501691,“行动”:“开放”}{“时间”:1469501694,“行动”:“开放”}{“时间”:1469501696,“行动”:“关闭”}{“时间”:1469501702,“行动”:“开放”}{“时间”:1469501703,“行动”:“开放”}{“时间”:1469501704,“行动”:“开放”}
初始化流
由于样本数据只是一个静态的文件集,您可以模拟流从他们通过读取一个文件,他们创建的顺序。
从pyspark.sql.types进口*从pyspark.sql.functions进口*inputPath=“/ databricks-datasets / structured-streaming /事件/”#定义模式加快处理jsonSchema=StructType([StructField(“时间”,TimestampType(),真正的),StructField(“行动”,StringType(),真正的)])streamingInputDF=(火花。readStream。模式(jsonSchema)#设置JSON数据的模式。选项(“maxFilesPerTrigger”,1)#治疗一系列的文件作为流一次通过选择一个文件。格式(“json”)。负载(inputPath))streamingCountsDF=(streamingInputDF。groupBy(streamingInputDF。行动,窗口(streamingInputDF。时间,“1小时”))。数())
开始流媒体工作
你开始流计算通过定义一个水槽和启动它。在我们的例子中,交互查询项,设置完整的组1小时数在内存中的表。
查询=(streamingCountsDF。writeStream。格式(“记忆”)#内存=存储内存表(仅用于测试)。queryName(“计数”)# =内存表的名称。outputMode(“完整的”)#完成=表中所有重要的应该。开始())
查询
是一个处理流查询命名计数
这是在后台运行。这个查询不断拿起文件和更新窗口的数量。
命令窗口流的状态报告:
当你扩张计数
,你会得到一个仪表板的数量记录处理,批统计数据,和聚合的状态:
交互式查询流
我们可以定期查询计数
聚合:
%sql选择行动,date_format(窗口。结束,“MMM-dd HH: mm”)作为时间,数从计数订单通过时间,行动
从这一系列的截图可以看到,每次执行查询变化以反映行动计算基于数据的输入流。
笔记本
要访问这些以及更多的代码示例,进口以下笔记本。更加结构化的流的例子,请参阅Apache火花结构化流是什么?。