与Apache卡夫卡和砖流处理

本文描述了如何使用Apache卡夫卡作为源或汇结构化流负载运行时数据砖。

卡夫卡,看到卡夫卡的文档

从卡夫卡读取数据

下面是一个示例从卡夫卡流读:

df=(火花readStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,“<服务器:ip >”)选项(“订阅”,“<主题>”)选项(“startingOffsets”,“最新”)负载())

砖还支持批量读卡夫卡数据源语义,如以下示例所示:

df=(火花格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,“<服务器:ip >”)选项(“订阅”,“<主题>”)选项(“startingOffsets”,“最早”)选项(“endingOffsets”,“最新”)负载())

增量的批量加载,砖建议使用卡夫卡Trigger.AvailableNow。看到配置增量的批处理

在砖运行时的13.1及以上,砖提供了阅读卡夫卡的SQL函数的数据。与SQL支持流媒体只有在三角洲住在砖SQL表或流表。看到read_kafka表值函数

配置卡夫卡结构化流读者

砖提供了卡夫卡关键字作为数据格式配置连接卡夫卡0.10 +。

以下是对卡夫卡最常见的配置:

有多种方法的指定主题订阅。你应该只提供其中一个参数:

选项

价值

描述

订阅

一个以逗号分隔的话题。

主题订阅列表。

subscribePattern

Java正则表达式字符串。

该模式用于订阅主题(s)。

分配

JSON字符串{“局部药”:[0,1],“主题”:(2、4)}

具体topicPartitions消费。

其他值得注意的配置:

选项

价值

默认值

描述

kafka.bootstrap.servers

以逗号分隔的主持人:端口。

[要求]卡夫卡bootstrap.servers配置。如果你发现没有数据从卡夫卡,首先检查代理地址列表。如果代理地址列表不正确,可能没有任何错误。这是因为卡夫卡端假设经纪人最终会变得可用,在发生网络错误重试,直到永远。

failOnDataLoss

真正的

真正的

(可选的)是否失败的查询时数据丢失的可能。查询可以从卡夫卡永久无法读取数据,由于许多场景如删除话题,话题截断前处理,等等。我们试图估计保守是否数据可能丢失。有时这可能导致假警报。设置这个选项如果它不正常工作,或者你想要查询继续处理尽管数据丢失。

minPartitions

整数> = 0,0 =禁用。

0(禁用)

(可选的)最小数量的分区从卡夫卡读取。您可以配置火花使用任意最小的分区从卡夫卡使用读取minPartitions选择。通常火花的1:1映射卡夫卡topicPartitions从卡夫卡火花分区使用。如果你设置minPartitions选择一个值大于你的卡夫卡topicPartitions,火花将分配大型卡夫卡分区小的碎片。这个选项可以设置在高峰负荷,所以数据倾斜,当你流落后提高处理速度。时初始化卡夫卡在每个触发消费者的成本,这可能会影响性能,如果你使用SSL连接到卡夫卡。

kafka.group.id

卡夫卡消费者组ID。

没有设置

(可选的)组ID使用从卡夫卡在阅读。小心地使用这个。默认情况下,每个查询生成一个独特的组ID读取数据。这可以确保每个查询都有自己的消费群体,没有面临干扰其他消费者一样,因此可以阅读所有分区的订阅的主题。在某些情况下(例如,卡夫卡组的授权),您可能需要使用特定的授权组id读取数据。您可以选择设置组ID。然而,这个要特别小心,因为它可能导致不可预测的行为。

  • 并发运行查询(包括批处理和流媒体)使用相同的组ID可能会互相干扰,导致每个查询只读数据的一部分。

  • 这也可能出现在查询开始接二连三地/重新启动。减少这样的问题,设置卡夫卡消费者配置session.timeout.ms是非常小的。

startingOffsets

最早的,最新的

最新的

(可选的)查询时的起点开始,要么是“最早”,从最早的偏移量,或一个json字符串指定为每个TopicPartition的起始偏移量。在json, 2作为一个抵消最早可以用来参考,最新的1。注意:对于批处理查询,最新(隐式或通过使用1以json)是不允许的。对于流媒体查询,这只适用于当开始一个新的查询和恢复总是捡起从哪里查询。新发现的分区在查询将从最早开始。

看到结构化流卡夫卡集成指南其他可选的配置。

模式卡夫卡记录

卡夫卡记录的模式是:

类型

关键

二进制

价值

二进制

主题

字符串

分区

int

抵消

时间戳

timestampType

int

关键价值总是反序列化为字节数组ByteArrayDeserializer。使用DataFrame操作(例如铸造(“字符串”))显式地反序列化键和值。

写数据到卡夫卡

下面是一个示例流写入卡夫卡:

(dfwriteStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,“<服务器:ip >”)选项(“主题”,“<主题>”)开始())

砖还支持批处理写语义下沉卡夫卡数据,如以下示例所示:

(df格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,“<服务器:ip >”)选项(“主题”,“<主题>”)保存())

配置卡夫卡结构化流的作家

重要的

在砖运行时的13.1及以上,一个更新版本的kafka-clients图书馆使用,使幂等默认写道。如果卡夫卡水槽使用版本2.8.0或低于acl配置但没有IDEMPOTENT_WRITE启用时,写失败和错误消息org.apache.kafka.common.KafkaException:不能执行事务方法因为我们一个错误状态

可以通过升级解决这个错误卡夫卡版本2.8.0以上或通过设置.option (“kafka.enable.idempotence”,“假”)虽然配置结构化流的作家。

模式提供给DataStreamWriter与卡夫卡的水槽。您可以使用以下字段:

列名

必需的或可选的

类型

关键

可选

字符串二进制

价值

要求

字符串二进制

可选

数组

主题

可选(忽略了如果主题是设置为作家选项)

字符串

分区

可选

INT

以下是常见的选项设置卡夫卡在写:

选项

价值

默认值

描述

kafka.boostrap.servers

一个以逗号分隔的<主机:端口>

没有一个

[要求]卡夫卡bootstrap.servers配置。

主题

字符串

没有设置

(可选的)集所有行可以写的主题。这个选择将会重写任何话题存在于数据的列。

includeHeaders

布尔

(可选的)是否包括卡夫卡标题行。

看到结构化流卡夫卡集成指南其他可选的配置。

检索卡夫卡指标

请注意

在砖运行时8.1及以上。

你可以得到平均最小值和最大值的补偿的数量背后的流媒体查询最新抵消在所有的订阅的主题avgOffsetsBehindLatest,maxOffsetsBehindLatest,minOffsetsBehindLatest指标。看到阅读指标交互

请注意

在砖运行时9.1及以上。

获得的字节总数估计查询过程没有消耗从订阅的主题通过检查的价值estimatedTotalBytesBehindLatest。这估计是基于批量加工的最后300秒。估计的时间框架是基于可以改变通过设置选项bytesEstimateWindowLength到一个不同的值。例如,将其设置为10分钟:

df=(火花readStream格式(“卡夫卡”)选项(“bytesEstimateWindowLength”,“10 m”)#米几分钟,您还可以使用“600年代”600秒)

如果您正在运行流在一个笔记本,你可以看到这些度量标准下原始数据流查询进展仪表板选项卡:

{“源”:({“描述”:“KafkaV2订阅(主题)”,“指标”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},}]}

使用SSL连接砖卡夫卡

启用SSL连接卡夫卡,听从指示的融合性的文档通过SSL加密和身份验证。您可以提供描述的配置,前缀卡夫卡。,如选项。例如,您指定的信任存储位置属性kafka.ssl.truststore.location

砖建议你:

下面的例子使用对象存储位置和砖秘密启用SSL连接:

df=(火花readStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,)选项(“kafka.security.protocol”,“SASL_SSL”)选项(“kafka.ssl.truststore.location”,<信任存储库- - - - - -位置>)选项(“kafka.ssl.keystore.location”,<密钥存储库- - - - - -位置>)选项(“kafka.ssl.keystore.password”,dbutils秘密得到(范围= <证书- - - - - -范围- - - - - -的名字>,关键= <密钥存储库- - - - - -密码- - - - - -关键- - - - - -的名字>))选项(“kafka.ssl.truststore.password”,dbutils秘密得到(范围= <证书- - - - - -范围- - - - - -的名字>,关键= <信任存储库- - - - - -密码- - - - - -关键- - - - - -的名字>)))