跳到主要内容
工程的博客

在Apache Kafka中使用结构化流处理数据

可伸缩数据@数据库的第3部分
分享这篇文章

这是第三篇文章由多部分组成的系列如何处理复杂的问题流分析使用Apache Spark。


在本博客中,我们将展示如何利用Spark SQL的api来消费和转换复杂的数据流Apache卡夫卡.使用这些简单的api,您可以表达复杂的转换,比如只发生一次事件的聚合,并将结果输出到各种系统。一起,你可以使用Apache火花和Apache Kafka:

  • 使用与处理批处理数据相同的api转换和增强从Apache Kafka读取的实时数据。
  • 将从Kafka读取的数据与存储在其他系统(包括S3, HDFS或MySQL)中的信息集成。
  • 方法提供的增量执行可自动受益催化剂优化器以及后续使用Tungsten生成的高效代码。

我们首先回顾Kafka术语,然后给出结构化流查询的示例,这些查询从Apache Kafka读取数据并向其写入数据。最后,我们将探讨一个端到端的现实用例。

Apache卡夫卡

Kafka是一种分布式发布-订阅消息系统,它很流行于获取实时数据流,并以并行和容错的方式向下游消费者提供这些数据流。这使得Kafka适合于构建实时流数据管道,在异构处理系统之间可靠地移动数据。在我们深入了解结构化流媒体支持Kafka的细节之前,让我们回顾一下一些基本的概念和术语。

数据在Kafka中被组织成主题它们被分成分区并行性。每个分区都是一个有序的、不可变的序列记录,可以认为是一个结构化的提交日志。生产者将记录附加到这些日志的尾部消费者按照他们自己的节奏阅读日志。多个消费者可以订阅到一个主题,并在传入记录到达时接收它们。当新的记录到达Kafka主题的分区时,它们被分配一个名为抵消.Kafka集群保留所有已发布的记录——无论它们是否已被使用——在一个可配置的保留期内,在此之后它们将被标记为删除。

指定从Kafka读取什么数据

卡夫卡主题的分解

Kafka主题可以被看作是一个无限流,其中的数据在可配置的时间内被保留。这个流的无限性质意味着,当开始一个新的查询时,我们必须首先决定读取什么数据以及我们将在哪里开始。在高层次上,有三种选择:

  • 最早的-从流的开头开始读取。这排除了已经从Kafka中删除的数据,因为它比保留期更老(“老化”数据)。
  • 最新的- start now,只处理在查询开始后到达的新数据。
  • 每个分区分配-为每个分区指定精确的开始偏移量,允许细粒度控制处理应该从哪里开始。例如,如果我们想确切地从其他系统或查询中断的地方开始,那么可以利用这个选项。

正如您将在下面看到的startingOffsetsOption接受上面三个选项之一,并且仅在从新的检查点开始查询时使用。如果你从现有检查点重新启动查询,那么它将总是在它离开的地方恢复,除非该偏移量的数据已经老化。类所设置的内容将决定查询行为failOnDataLoss选项中描述的Kafka集成指南

KafkaConsumer的现有用户会注意到结构化流提供了一个更细粒度的配置选项版本,auto.offset.reset.我们将这些关注点分成两个不同的参数,而不是一个选项,一个参数表示流第一次启动时该做什么(startingOffsets),以及另一个处理如果查询不能从它停止的地方开始,因为所需的数据已经老化(failOnDataLoss).

结构化流中的Apache Kafka支持

结构化流提供了一个统一的批处理和流API,使我们能够查看发布到Kafka的数据DataFrame.当以流方式处理无界数据时,我们使用与批处理相同的API并获得相同的数据一致性保证。该系统确保了端到端的精确一次容错保证,因此用户不必考虑流的低级方面。

让我们研究和探索从Kafka读取和写入的例子,然后是端到端应用程序。

从卡夫卡主题阅读记录

第一步是指定Kafka集群的位置,以及我们想从哪个主题中读取数据。Spark允许您读取单个主题、特定的主题集、主题的正则表达式模式,甚至属于一组主题的特定分区集。我们将只看一个阅读个别题目的例子,其他的可能性在Kafka集成指南

上面的数据帧是订阅“topic1”的流数据帧。属性的选项可以设置该配置DataStreamReader的位置,所需的最小参数是kafka.bootstrap.servers(即。主持人:港口)和我们想谈论的话题订阅出现。在这里,我们也指定了startingOffsets为“最早的”,它将在查询开始时读取主题中所有可用的数据。如果startingOffsets选项,则使用默认值“latest”,并且只处理在查询开始后到达的数据。

df.printSchema ()揭示了我们的DataFrame的模式。

| - - -关键:二进制(可空=真正的| - - -价值:二进制(可空=真正的| - - -主题:字符串(可空=真正的|——分区:整数(可空=真正的|——offset: long (nullable =真正的|——timestamp: timestamp (nullable =真正的|——timestampType:整数(可空=真正的

返回的DataFrame包含Kafka记录的所有熟悉字段及其相关元数据。现在我们可以使用所有熟悉的DataFrame或Dataset操作来转换结果。但是,通常情况下,我们将从解析键和值列中的二进制值开始。如何解释这些blob是特定于应用程序的。幸运的是,Spark SQL包含许多用于常见序列化类型的内置转换,我们将在下面展示。

数据存储为UTF8字符串

如果Kafka记录的字节表示UTF8字符串,我们可以简单地使用强制转换将二进制数据转换为正确的类型。

df.selectExpr ("CAST(key AS STRING)","CAST(值为字符串)"

数据存储为JSON

JSON是另一种常见的写入Kafka的数据格式。在这种情况下,我们可以使用内置的from_json函数连同预期的模式一起将二进制值转换为Spark SQL结构。

价值Schema: {"a":1, "b": "string"}模式StructType()。添加(“a”,IntegerType())。添加(“b”,StringType ())df。选择(\坳(“关键”)。(“字符串”),from_json(坳(“价值”)。(模式),“字符串”))

用户定义的序列化器和反序列化器

在某些情况下,您可能已经拥有实现Kafka反序列化接口.您可以利用下面所示的Scala代码将其包装为用户定义函数(UDF)。

对象MyDeserializerWrapper {val deser =新的MyDeserializer}spark.udf.register (“序列化”,(主题:字符串,字节: Array[Byte]) =>MyDeserializerWrapper.deser.deserialize(主题,字节df.selectExpr (反序列化("topic1", value) AS消息""" "

注意,上面的DataFrame代码类似于指定value.deserializer当使用标准的Kafka消费者时。

使用Spark作为Kafka生成器

将数据从任何Spark支持的数据源写入Kafka就像调用一样简单writeStream在任何包含名为“value”的列和可选的名为“key”列的DataFrame上。如果未指定键列,则将自动添加空值键列。在某些情况下,空值键列可能导致Kafka中的数据分区不均匀,应谨慎使用。

对象的选项中静态地指定数据帧记录的目标主题DataStreamWriter或者在每条记录的基础上,作为数据框架中名为“topic”的列。

#将键值数据从DataFrame写入Kafka主题查询= df \.selectExpr ("CAST(userId AS STRING) AS key","to_json(struct(*)) AS value") \.writeStream \格式“卡夫卡”) \.option (“kafka.bootstrap.servers”,“host1:端口1,host2:端口2”) \.option (“主题”,“人类”) \.option (“checkpointLocation”,“/ HDFS dir /道路/”) \.start ()

上面的查询获取一个包含用户信息的DataFrame,并将其写入Kafka。userId被序列化为字符串并用作键。我们获取DataFrame的所有列,并将它们序列化为JSON字符串,将结果放入记录的值中。

写入Kafka的两个必需选项是kafka.bootstrap.serverscheckpointLocation.在上面的例子中,可以使用一个附加的主题选项来设置要写入的单个主题,如果“topic”列存在于DataFrame中,这个选项将覆盖“topic”列。

Nest设备的端到端示例

在本节中,我们将探索一个涉及Kafka以及其他数据源和接收器的端到端管道。我们将处理的数据集涉及的集合设备日志,JSON格式这里描述.我们将特别检查来自Nest摄像头的数据,它看起来像以下JSON:

“设备”: {“相机”: {“device_id”“awJo6rH……”,“last_event”: {“has_sound”真正的,“has_motion”真正的,“has_person”真正的,“start_time”“2016 - 12 - 29 t00:00:00.000z”,“end_time”“2016 - 12 - 29 t18:42:00.000z”}}}

我们还将加入一个包含映射的静态数据集(称为“device_locations”)device_idzip_code设备的注册地址。

如何构建我们的工作流程的说明

在高层次上,所需的工作流类似于上图。给定Nest摄像头的一系列更新,我们想使用Spark来执行几个不同的任务:

  • 使用Parquet等柱状格式创建所有事件的高效、可查询的历史存档。
  • 执行低延迟事件时间聚合,并将结果推回Kafka供其他消费者使用。
  • 在Kafka中对存储在压缩主题中的数据执行批量报告。

虽然这些听起来像是完全不同的用例,但你可以在一个端到端的Spark应用程序中使用DataFrames和Structured Streaming来执行它们!在接下来的小节中,我们将逐一介绍各个步骤,从摄取到处理再到存储聚合结果。

从Kafka读取巢设备日志

我们的第一步是从Kafka读取原始的Nest数据流,并投影出我们感兴趣的摄像头数据。我们首先从Kafka记录中解析Nest JSON,通过调用from_json函数,并提供预期的JSON模式和时间戳格式。然后,我们对数据应用各种转换,并投影与相机数据相关的列,以便简化接下来部分中的数据处理。

JSON数据的预期模式

模式StructType () \添加("metadata", StructType() \添加("access_token", StringType()) \添加("client_version", IntegerType())) \添加("devices", StructType() \添加("温控器",MapType(StringType(), StructType()。添加(…)))\添加("smoke_co_alarms", MapType(StringType(), StructType()。添加(…)))\添加("相机",MapType(StringType(), StructType()。添加(…)))\添加(“公司名称”,StructType()。添加(…)))\添加("structures", MapType(StringType(), StructType()。添加(…)))nestTimestampFormat“yyyy-MM-dd 'HH: mm: ss.sss 'Z’”

解析原始JSON

jsonOptions{"timestampFormat": nestTimestampFormat}解析火花\.readStream \.format \(“卡夫卡”).option(“kafka.bootstrap。服务器","localhost:9092") \.option("subscribe", "nest-logs") \.load () \选择(from_json(坳(“价值”)。("string"), schema, jsonOptions).alias("parsed_value"))

项目相关栏目

相机解析\选择(爆炸(“parsed_value.devices.cameras”))\选择(“价值。*”)目击相机\选择(“device_id”、“last_event。Has_person ", "last_event.start_time") \在哪里(坳(“has_person”)真正的

要创建相机DataFrame,我们首先取消嵌套“相机”json字段,使其成为顶级。因为“相机”是一个MapType,所以每一行都包含一个键值对的映射。我们用爆炸函数为每个键-值对创建一个新行,使数据平坦化。最后,我们使用star()来取消“value”列的嵌套。以下是通话结果camera.printSchema ()

|——device_id:字符串(可空=真正的|——software_version:字符串(可空=真正的|——structure_id:字符串(可空=真正的|——where_id:字符串(可空=真正的|——where_name:字符串(可空=真正的| - - -名称:字符串(可空=真正的|——name_long:字符串(可空=真正的|——is_online:布尔(可空=真正的|——is_streaming:布尔(可空=真正的|——is_audio_input_enable:布尔(可空=真正的|——last_is_online_change: timestamp (nullable =真正的|——is_video_history_enabled:布尔(可空=真正的|——web_url:字符串(可空=真正的|——app_url:字符串(可空=真正的|——is_public_share_enabled:布尔(可空=真正的|——activity_zones:数组(可空=真正的| |——element: struct (containsNull =真正的| | |——名称:字符串(可空=真正的| | |——id:字符串(可空=真正的|——public_share_url:字符串(可空=真正的|——snapshot_url:字符串(可空=真正的|——last_event: struct (nullable =真正的| |—has_sound:布尔(可空=真正的| |—has_motion:布尔(可空=真正的| |——has_person:布尔(可空=真正的| |——start_time: timestamp (nullable =真正的| |——end_time: timestamp (nullable =真正的| |——urls_expire_time: timestamp (nullable =真正的| |——web_url:字符串(可空=真正的| |—app_url:字符串(可空=真正的| |——image_url:字符串(可空=真正的| |——animated_image_url:字符串(可空=真正的| |——activity_zone_ids:数组(可空=真正的| | |——元素:字符串(containsNull =真正的

聚合和写回Kafka

我们现在将处理目击数据框架通过增加每个目击与它的位置。回想一下,我们有一些位置数据,可以通过设备id查找设备的邮政编码。我们首先创建一个表示此位置数据的DataFrame,然后将其与目击DataFrame,与设备id匹配。我们在这里做的是加入流媒体DataFrame目击与一个静态位置的数据帧!

添加位置数据

locationDFspark.table(“device_locations”)。选择(“device_id”、“zip_code”)sightingLoc目击事件。加入(locationDF“device_id”)

汇总统计数据并写入Kafka

现在,让我们生成一个流聚合,计算每个邮政编码中每个小时的摄像人员目击次数,并将其写入压缩的Kafka主题1称为“nest-camera-stats”。

sightingLoc \.groupBy(“zip_code”,窗口("start_time", "1小时"))\() \选择(\to_json(结构体(“zip_code”、“窗口”)).alias(“关键”),坳(“计数”)。(“字符串”).alias \(“价值”)).writeStream \.format \(“卡夫卡”).option(“kafka.bootstrap。服务器","localhost:9092") \.option("topic", "nest-camera-stats") \.option("checkpointLocation", "/path/to/HDFS/dir") \.outputMode \(“完整的”)开始()

上面的查询将处理发生的任何目击事件,并将目击事件的更新计数写入Kafka,并输入该目击事件的邮政编码和小时窗口。随着时间的推移,对同一个键的多次更新将导致使用该键的许多记录,Kafka主题压缩将在该键的新值到达时删除旧的更新。通过这种方式,压缩试图确保最终只保留任何给定键的最新值。

在持久存储中归档结果

除了将聚合结果写入Kafka之外,我们可能还希望将原始相机记录保存在持久存储中以供以后使用。下面的示例写出相机数据帧到S3的Parquet格式。我们选择了Parquet进行压缩和柱状存储,尽管支持许多不同的格式,如ORC、Avro、CSV等,以适应不同的用例。

相机。写Stream \格式“铺”) \.option (“startingOffsets”,“最早”) \.option (“路径”,“s3: / / nest-logs”) \.option (“checkpointLocation”,“/ HDFS dir /道路/”) \.start ()

注意,我们可以简单地重用它们相机DataFrame启动多个流查询。例如,我们可以查询DataFrame以获得离线摄像机的列表,并向网络操作中心发送通知以进行进一步调查。

批量查询报表

我们的下一个例子将在Kafka“nest-camera-stats”压缩主题上运行一个批量查询,并生成一个报告,显示有大量目击事件的邮政编码。

编写批处理查询类似于流式查询,不同之处在于我们使用方法,而不是readStream方法和而不是writeStream

批量读取和格式化数据

报告火花\.read \.format \(“卡夫卡”).option(“kafka.bootstrap。服务器","localhost:9092") \.option("subscribe", "nest-camera-stats") \.load () \选择(\json_tuple(坳(“关键”)。("string"), "zip_code", "window")。别名(“zip_code”、“窗口”),坳(“价值”)。(“字符串”)。(“整数”).alias \(“计数”))在哪里("count > 1000") \选择("zip_code", "window") \截然不同的()

该报告DataFrame可用于报告或创建显示极端目击事件的实时仪表板。

结论

在这篇博文中,我们展示了从Kafka中消费和转换实时数据流的示例。我们实现了一个端到端的示例连续应用程序,展示了使用结构化流api编程的简便性和易用性,同时利用了这些api提供的强大的精确一次语义。

在本系列的后续博客文章中,我们将介绍更多内容:

  • 监控流媒体应用程序
  • 使用结构化流计算事件时间聚合

如果你想了解更多关于结构化流的BOB低频彩知识,这里有一些有用的链接:

在Apache Spark 2.1中尝试结构化流今天就试试Databricks吧


额外的配置

Kafka集成指南
包含在Kafka中处理数据的进一步示例和Spark特定配置选项。
卡夫卡消费者而且生产商配置文档
Kafka自己的配置可以通过DataStreamReader.option而且DataStreamWriter.option卡夫卡。前缀,如:

stream.option (“kafka.bootstrap.servers”,“主持人:港口”

可能卡夫卡参数,请参见Kafka消费者配置文档用于与读取数据相关的参数,以及Kafka生产者配置文档中与写入数据相关的参数。

看到Kafka集成指南Spark管理的选项列表,因此是不可配置的。


  1. 一个压缩的Kafka主题是一个通过压缩来强制保留的主题,以确保日志中至少有每个键的最后一个状态。看到Kafka日志压缩获取更多信息。
免费试用Databricks

相关的帖子

看到所有工程的博客的帖子