跳转到主要内容
工程的博客

火花结构化流

分享这篇文章

Apache火花的第一个版本2.0增加了一个新的更高级的API,结构化流,为构建连续应用程序。的主要目标是使其更容易构建端到端流媒体应用程序集成存储,服务系统和批处理作业一致和容错。在这篇文章中,我们解释了为什么这是很难做到与当前分布式流媒体引擎,并介绍结构化流。

为什么流是困难的

乍一看,构建一个分布式流媒体引擎可能看起来简单推出一组服务器和推动它们之间的数据。不幸的是,分布式流处理遇到的多种并发症不影响简单计算就像批处理作业。

首先,考虑一个简单的应用程序:我们收到(phone_id、时间、动作)事件从一个移动应用程序,并且想要数数有多少行动每小时发生的每种类型,然后在MySQL存储结果。如果我们运行这个应用程序作为批处理作业,有一个表的所有输入事件,我们可以将其表示为以下SQL查询:

选择行动,窗口(时间“1小时”),(*)事件集团通过行动,窗口(时间,“1小时”)

在分布式流媒体引擎,我们可以设置节点处理数据在“使用映射-规约模式”模式,如下所示。第一层中的每个节点读取输入数据的分区(说,手机的流从一组),然后散列的事件(行动,小时)将它们发送到减速机节点,追踪集团的统计,定期更新MySQL。

image00

不幸的是,这种类型的设计可以带来不少挑战:

  1. 一致性:这个分布式设计会导致记录处理系统的一部分之前,他们在另一个处理,导致荒谬的结果。例如,假设应用程序发送一个“开放”的事件,当用户打开它,和一个“关闭”事件关闭。如果减速机节点负责“开放”是低于一个“关闭”,我们可能会看到一个“关闭”的总数量高于在MySQL中“打开”,这将没有意义。上图实际上显示了一个这样的例子。
  2. 容错:如果一个映射器或还原剂失败?减速机不应该算一个动作在MySQL中两次,但是应该知道如何请求时旧的数据映射器。流引擎经过大量的麻烦提供强大的语义,至少引擎。在许多引擎,然而,保持一致的结果在外部存储留给用户。
  3. 无序的数据:在现实世界中,来自不同数据源的数据可以出来的秩序:例如,一个电话可能上传数据小时如果是覆盖。只是写减速器运营商承担数据到达时间字段的顺序将不工作,他们需要准备接收无序的数据,并相应地更新导致MySQL。

在最新的流媒体系统中,部分或所有这些担忧都是留给用户。这是不幸的,因为这些问题————应用程序与外部世界的交互————一些最难的思考和正确的。特别是,没有简单的方法来获得语义简单上面的SQL查询。

结构化流模型

在结构化流,我们直面问题的语义系统通过一个强有力的保证:在任何时候,应用程序的输出相当于执行一个批处理作业数据的前缀。例如,在我们的监视应用程序,结果表总是会在MySQL中相当于一个前缀的每个手机的更新流(无论数据进入系统到目前为止)和运行我们上面显示的SQL查询。永远不会有“开放”事件计算速度比“关闭”事件,重复更新失败,等。结构化流自动处理一致性和可靠性在引擎和与外部系统的交互(如以事务的方式更新MySQL)。

前缀的完整性保证很容易推断我们确定的三个挑战。特别是:

  1. 输出表总是一致的与所有的记录数据的前缀。例如,只要每个电话作为连续流上传数据(例如,相同的分区在Apache卡夫卡),我们将永远过程和计数的事件顺序。
  2. 容错是由结构化流整体处理,包括在交互输出下沉。这是一个支持的主要目标连续应用程序
  3. 的影响无序的数据是明确的。我们知道工作输出计数按行动和时间分组流的一个前缀。如果我们以后得到更多的数据,我们可能会看到一个时间字段一个小时过去,我们会更新在MySQL中各自的行。结构化流api还支持过滤过于旧数据,如果用户想要的。但从根本上说,无序的数据并不是一个“特殊情况”:查询说group by时间字段,并看到一个古老的时间并不比看到一个不同的重复动作。

结构化流的最后一个好处是,API是非常易于使用:它只是火花DataFrame和数据集API。用户描述查询他们想要运行,输入和输出位置和选择更多的细节。然后系统运行查询增量,保持足够的状态从失败中恢复过来,保持一致的结果在外部存储,等等。例如,如何写我们的流媒体监控应用程序:

//连续读取数据一个S3的位置val inputDF=spark.readStream.json (s3: / /日志)//做操作使用标准DataFrame APIMySQLinputDF.groupBy($“行动”,窗口($“时间”,“1小时”))。().writeStream.format (jdbc)开始(" jdbc: mysql / /…”)

下面这段代码是几乎相同的批版——只有“读”和“写”改变:

/ /读取数据从一个S3的位置val inputDF = spark.read.json (“s3: / /日志”)/ /操作使用标准DataFrame API和MySQL编写inputDF.groupBy ($“行动”,窗口(美元)“时间”,“1小时”).count ().writeStream.format (“jdbc”).save (“jdbc: mysql / /…”)

下一小节将详细说明该模型,以及API。

模型的细节

从概念上讲,结构化流将到达的所有数据视为一个无界的输入表。流中的每个新项目就像一行添加到输入表。我们不会保留所有输入,但我们的研究结果将相当于所有它并运行一个批处理作业。
image01
开发人员定义了一个查询在此输入表,就好像它是一个静态表,计算最后一个结果表将写入到输出下沉。火花自动将这批查询转换为流执行计划。这就是所谓的incrementalization:引发出状态需要维护数据更新结果每次记录的到来。最后,开发人员指定触发器控制何时更新结果。每次引发火灾,引发检查新数据输入表(新行),并逐步更新结果。

结构化模型

模型的最后一部分输出模式。每次更新结果表,开发人员想写更改外部系统,如S3, HDFS,或数据库。我们通常想写输出增量。为此,结构化流提供了三种输出模式:

  • 附加:只有新行添加到结果表自上次触发将写入到外部存储。这是只适用于查询结果表中现有的行不能改变一个输入流(如地图)。
  • 完整的:整个更新结果表将被写入外部存储器。
  • 更新:只有结果表中的行,更新自上次引发外部存储器将会改变。这种模式适用于输出下沉,可以更新到位,比如MySQL表。

让我们看看我们移动监视应用程序可以运行在这个模型。我们的批量查询是计算一个计数的行动(行动,小时)分组。逐步运行该查询,火花将保持一些州的数量每一对迄今为止,和更新新记录到达时。为每一个记录改变,它将输出数据根据其输出模式。下图显示了该执行使用更新输出模式:

在每一个触发点,我们把之前的分组数,用新数据更新到自上次引发新结果表。我们仅仅发出需要的变化输出模式,水槽,在这儿,我们更新记录(行动,小时)对改变在触发MySQL(红色所示)。

注意,系统也会自动处理数据。在上图中,phone3的“开放”的事件,发生在1:58打电话,只能系统在秒。尽管如此,即使是过去的两点,我们在MySQL一点更新记录。然而,前缀完整性保证在结构流确保我们从每个源过程记录他们的顺序到达。例如,因为phone1“关闭”事件到“开放”事件后,我们将永远更新“开放”的数量在我们更新的“关闭”。

故障恢复和存储系统的要求

结构化流使其结果有效,即使机器失败。要做到这一点,它把两个要求输入源和输出汇:

  1. 输入源必须可复制最近,所以数据可以重读如果工作崩溃。例如,像亚马逊这样的消息总线运动和Apache卡夫卡是可复制,文件系统的输入源。只有几分钟的数据需要保留;结构化流将保持自己的内部状态。
  2. 输出水槽必须支持事务更新,这样系统可以使一组记录自动出现。当前版本的结构化下沉流媒体实现了这个文件,我们还计划将其添加为常见的数据库和键值存储。

我们发现大多数火花应用程序已经在使用水槽与这些性质和来源,因为用户希望自己的工作是可靠的。

除了这些需求,结构化流将管理其内部状态在一个可靠的存储系统,如S3或者HDFS,存储数据,如运行在我们的例子中。鉴于这些属性,结构完整的端到端流将执行前缀。

结构化流API

结构化流集成到引发的数据集和DataFrame api;在大多数情况下,你只需要添加一些方法调用运行流计算。它还增加了新的运营商为窗口的聚合和设置参数执行模型(如输出模式)。在Apache 2.0火花,我们已经建立了一个alpha版本系统的核心api。更多的运营商,如sessionization,会在将来的版本中。

API基础知识

流在结构化流表示为DataFrames或数据集isStreaming属性设置为true。您可以创建使用特殊的阅读方法从各种来源。例如,假设我们想要在我们的监视应用程序读取数据从JSON文件上载到Amazon S3。下面的代码显示了如何做到这一点在Scala中:

val inputDF = spark.readStream.json (“s3: / /日志”)

我们的结果DataFrame inputDF,是我们输入表,将不断扩展与新行新文件添加到目录中。表有两列,时间和行动。现在你可以使用普通的DataFrame /数据集操作转换数据。在我们的示例中,我们希望每小时计数操作类型。要做到这一点,我们必须在行动组数据窗口和1小时的时间。

val countsDF=inputDF.groupBy($“行动”,窗口($“时间”,“1小时”))()

新DataFrame countsDF是我们的结果表,列行动,窗口,和计数,并将不断更新查询时开始。注意,这一转变将给每小时计数即使inputDF静态表。这允许开发人员测试他们的业务逻辑对静态数据集和无缝应用流数据而不改变逻辑。

最后,我们告诉引擎写这个表水槽,开始流计算。

= countsDF.writeStream val查询。格式(“jdbc”).start (“jdbc: / /……”)

返回的查询StreamingQuery,处理到活动执行流和可用于管理和监控执行。

除了这些基础知识外,还有更多的操作可以通过结构化流。

映射、过滤和聚合

结构化流程序可以使用DataFrame转换数据和数据集的现有方法,包括地图、过滤、选择、和其他人。此外,运行(或无限)聚合,如从一开始的时候,都可以通过现有的api。这就是我们在监视应用程序中使用。

事件时间窗口的聚合

流媒体应用程序经常需要计算各种类型的数据窗户,包括滑动窗口相互重叠(如一个小时窗口,进步每5分钟),和翻滚的窗户,不(如每小时)。在结构化流,窗口只是表示成一个group by。每个输入事件可以被映射到一个或多个窗口,并简单地更新一个或多个结果表行。

Windows可以指定在DataFrames使用窗口函数。例如,我们可以改变我们的监测工作数行为通过滑动窗口如下:

inputDF.groupBy($“行动”,窗口($“时间”,“1小时”、“5分钟”))()

而我们之前的表单的应用程序输出结果(小时、行动、计数),这种新形式的一个将输出结果(窗口、动作数),如(“1:10-2:10”、“开放”,17)。如果迟到记录到达,我们将更新所有相应的MySQL的窗户。不像在其他系统中,窗口不仅是一个特殊的操作流计算;我们可以运行相同的代码在一个批处理作业进行分组数据以同样的方式。

窗口的集合是一个领域,我们将继续扩大结构化流。特别是,在火花2.1中,我们计划增加水印下降的功能过于旧数据当足够的时间过去了。没有这种类型的功能,所有旧的系统可能需要跟踪状态窗口,这将不是应用程序运行的规模。此外,我们计划添加支持基于会话的窗口,即事件从一个来源为变长分组会议根据业务逻辑。

加入与静态数据流

因为结构化流仅仅使用DataFrame API,它是直接加入流对静态DataFrame,比如Apache蜂巢表:

//数据每一个客户一个静态“顾客”,//然后加入流DataFrameval customersDF=spark.table(“客户”)inputDF。加入(客户sDF, "customer_id").groupBy ($“customer_name”,小时(“时间”)()

此外,静态DataFrame本身可以计算使用火花查询,让我们混批处理和流计算。

交互式查询

结构化流可以公开结果直接通过火花的JDBC服务器交互查询。在火花2.0中,有一个基本的“记忆”输出水槽为此不为大量数据而设计的。然而,在将来的版本中,这将让你写一个内存中的火花SQL表查询结果,并直接运行查询。

//拯救我们之前统计查询一个- - - - - -内存countsDF.writeStream.format(“记忆”).queryName(“计数”).outputMode(“完整的”)开始()//然后任何线程可以查询使用SQLsql(“选择总和(数量)从计数action =登录”)

比较与其他引擎

展示独特的结构化流,下一个表比较它与其他几个系统。正如我们之前所讨论的,结构化流强劲的前缀完整性的保证使它相当于批作业,容易融入更大的应用程序。此外,基于火花使集成批处理和交互式查询。

streaming-engine-comparison

结论

结构化流将是一个更简单的模型构建端到端实时应用程序,建立在功能效果最好火花流。虽然结构化流是在αApache 2.0火花,我们希望这篇文章鼓励你尝试一下。

长期的,很像DataFrame API,我们预计结构化流补充火花流媒体通过提供更多的限制但高层次的接口。如果您正在运行火花流今天,不要担心————将继续支持。但我们相信,结构化流可以打开更多的用户的实时计算。

在砖结构流也完全支持,包括自由砖社区版

阅读更多

此外,以下资源覆盖结构化流:

免费试着砖
看到所有工程的博客的帖子