与Apache卡夫卡和砖流处理
本文描述了如何使用Apache卡夫卡作为源或汇结构化流负载运行时数据砖。
卡夫卡,看到卡夫卡的文档。
从卡夫卡读取数据
下面是一个示例从卡夫卡流读:
df=(火花。readStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,“<服务器:ip >”)。选项(“订阅”,“<主题>”)。选项(“startingOffsets”,“最新”)。负载())
砖还支持批量读卡夫卡数据源语义,如以下示例所示:
df=(火花。读。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,“<服务器:ip >”)。选项(“订阅”,“<主题>”)。选项(“startingOffsets”,“最早”)。选项(“endingOffsets”,“最新”)。负载())
增量的批量加载,砖建议使用卡夫卡Trigger.AvailableNow
。看到配置增量的批处理。
在砖运行时的13.1及以上,砖提供了阅读卡夫卡的SQL函数的数据。与SQL支持流媒体只有在三角洲住在砖SQL表或流表。看到read_kafka表值函数。
配置卡夫卡结构化流读者
砖提供了卡夫卡
关键字作为数据格式配置连接卡夫卡0.10 +。
以下是对卡夫卡最常见的配置:
有多种方法的指定主题订阅。你应该只提供其中一个参数:
选项 |
价值 |
描述 |
---|---|---|
订阅 |
一个以逗号分隔的话题。 |
主题订阅列表。 |
subscribePattern |
Java正则表达式字符串。 |
该模式用于订阅主题(s)。 |
分配 |
JSON字符串 |
具体topicPartitions消费。 |
其他值得注意的配置:
选项 |
价值 |
默认值 |
描述 |
---|---|---|---|
kafka.bootstrap.servers |
以逗号分隔的主持人:端口。 |
空 |
[要求]卡夫卡 |
failOnDataLoss |
|
|
(可选的)是否失败的查询时数据丢失的可能。查询可以从卡夫卡永久无法读取数据,由于许多场景如删除话题,话题截断前处理,等等。我们试图估计保守是否数据可能丢失。有时这可能导致假警报。设置这个选项 |
minPartitions |
整数> = 0,0 =禁用。 |
0(禁用) |
(可选的)最小数量的分区从卡夫卡读取。您可以配置火花使用任意最小的分区从卡夫卡使用读取 |
kafka.group.id |
卡夫卡消费者组ID。 |
没有设置 |
(可选的)组ID使用从卡夫卡在阅读。小心地使用这个。默认情况下,每个查询生成一个独特的组ID读取数据。这可以确保每个查询都有自己的消费群体,没有面临干扰其他消费者一样,因此可以阅读所有分区的订阅的主题。在某些情况下(例如,卡夫卡组的授权),您可能需要使用特定的授权组id读取数据。您可以选择设置组ID。然而,这个要特别小心,因为它可能导致不可预测的行为。
|
startingOffsets |
最早的,最新的 |
最新的 |
(可选的)查询时的起点开始,要么是“最早”,从最早的偏移量,或一个json字符串指定为每个TopicPartition的起始偏移量。在json, 2作为一个抵消最早可以用来参考,最新的1。注意:对于批处理查询,最新(隐式或通过使用1以json)是不允许的。对于流媒体查询,这只适用于当开始一个新的查询和恢复总是捡起从哪里查询。新发现的分区在查询将从最早开始。 |
看到结构化流卡夫卡集成指南其他可选的配置。
模式卡夫卡记录
卡夫卡记录的模式是:
列 |
类型 |
---|---|
关键 |
二进制 |
价值 |
二进制 |
主题 |
字符串 |
分区 |
int |
抵消 |
长 |
时间戳 |
长 |
timestampType |
int |
的关键
和价值
总是反序列化为字节数组ByteArrayDeserializer
。使用DataFrame操作(例如铸造(“字符串”)
)显式地反序列化键和值。
写数据到卡夫卡
下面是一个示例流写入卡夫卡:
(df。writeStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,“<服务器:ip >”)。选项(“主题”,“<主题>”)。开始())
砖还支持批处理写语义下沉卡夫卡数据,如以下示例所示:
(df。写。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,“<服务器:ip >”)。选项(“主题”,“<主题>”)。保存())
配置卡夫卡结构化流的作家
重要的
在砖运行时的13.1及以上,一个更新版本的kafka-clients
图书馆使用,使幂等默认写道。如果卡夫卡水槽使用版本2.8.0或低于acl配置但没有IDEMPOTENT_WRITE
启用时,写失败和错误消息org.apache.kafka.common.KafkaException:不能执行事务方法因为我们是在一个错误状态
。
可以通过升级解决这个错误卡夫卡版本2.8.0以上或通过设置.option (“kafka.enable.idempotence”,“假”)
虽然配置结构化流的作家。
模式提供给DataStreamWriter与卡夫卡的水槽。您可以使用以下字段:
列名 |
必需的或可选的 |
类型 |
---|---|---|
|
可选 |
|
|
要求 |
|
|
可选 |
|
|
可选(忽略了如果 |
|
|
可选 |
|
以下是常见的选项设置卡夫卡在写:
选项 |
价值 |
默认值 |
描述 |
---|---|---|---|
|
一个以逗号分隔的 |
没有一个 |
[要求]卡夫卡 |
|
|
没有设置 |
(可选的)集所有行可以写的主题。这个选择将会重写任何话题存在于数据的列。 |
|
|
|
(可选的)是否包括卡夫卡标题行。 |
看到结构化流卡夫卡集成指南其他可选的配置。
检索卡夫卡指标
请注意
在砖运行时8.1及以上。
你可以得到平均最小值和最大值的补偿的数量背后的流媒体查询最新抵消在所有的订阅的主题avgOffsetsBehindLatest
,maxOffsetsBehindLatest
,minOffsetsBehindLatest
指标。看到阅读指标交互。
请注意
在砖运行时9.1及以上。
获得的字节总数估计查询过程没有消耗从订阅的主题通过检查的价值estimatedTotalBytesBehindLatest
。这估计是基于批量加工的最后300秒。估计的时间框架是基于可以改变通过设置选项bytesEstimateWindowLength
到一个不同的值。例如,将其设置为10分钟:
df=(火花。readStream。格式(“卡夫卡”)。选项(“bytesEstimateWindowLength”,“10 m”)#米几分钟,您还可以使用“600年代”600秒)
如果您正在运行流在一个笔记本,你可以看到这些度量标准下原始数据流查询进展仪表板选项卡:
{“源”:({“描述”:“KafkaV2订阅(主题)”,“指标”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},}]}
使用SSL连接砖卡夫卡
启用SSL连接卡夫卡,听从指示的融合性的文档通过SSL加密和身份验证。您可以提供描述的配置,前缀卡夫卡。
,如选项。例如,您指定的信任存储位置属性kafka.ssl.truststore.location
。
砖建议你:
下面的例子使用对象存储位置和砖秘密启用SSL连接:
df=(火花。readStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,…)。选项(“kafka.security.protocol”,“SASL_SSL”)。选项(“kafka.ssl.truststore.location”,<信任存储库- - - - - -位置>)。选项(“kafka.ssl.keystore.location”,<密钥存储库- - - - - -位置>)。选项(“kafka.ssl.keystore.password”,dbutils。秘密。得到(范围= <证书- - - - - -范围- - - - - -的名字>,关键= <密钥存储库- - - - - -密码- - - - - -关键- - - - - -的名字>))。选项(“kafka.ssl.truststore.password”,dbutils。秘密。得到(范围= <证书- - - - - -范围- - - - - -的名字>,关键= <信任存储库- - - - - -密码- - - - - -关键- - - - - -的名字>)))