Apache卡夫卡
用于结构化流的Apache Kafka连接器打包在Databricks Runtime中。你可以使用卡夫卡
连接器连接到Kafka 0.10+和kafka08
连接器连接到Kafka 0.8+(已弃用)。
模式
记录的模式是:
列 |
类型 |
---|---|
关键 |
二进制 |
价值 |
二进制 |
主题 |
字符串 |
分区 |
int |
抵消 |
长 |
时间戳 |
长 |
timestampType |
int |
的关键
和价值
总是反序列化为字节数组与ByteArrayDeserializer
.使用数据帧操作(铸造(“字符串”)
, udfs)来显式反序列化键和值。
快速入门
让我们从一个规范的WordCount示例开始。下面的笔记本演示了如何使用Kafka的结构化流运行WordCount。
请注意
本笔记本示例使用Kafka 0.10。要使用Kafka 0.8,将格式更改为kafka08
(即,.format(“kafka08”)
).
配置
有关配置选项的完整列表,请参见Spark结构化流+ Kafka集成指南.首先,这里是最常用配置选项的一个子集。
请注意
由于结构化流仍在开发中,此列表可能不是最新的。
有多种方法可以指定要订阅的主题。您应该只提供以下参数中的一个:
选项 |
价值 |
支持的Kafka版本 |
描述 |
---|---|---|---|
订阅 |
以逗号分隔的主题列表。 |
0.8, 0.10 |
要订阅的主题列表。 |
subscribePattern |
Java正则表达式字符串。 |
0.10 |
用于订阅主题的模式。 |
分配 |
JSON字符串 |
0.8, 0.10 |
要使用的特定topicPartitions。 |
其他值得注意的配置:
选项 |
价值 |
默认值 |
支持的Kafka版本 |
描述 |
---|---|---|---|---|
kafka.bootstrap.servers |
主机:端口列表,以逗号分隔。 |
空 |
0.8, 0.10 |
[必选]卡夫卡 |
failOnDataLoss |
|
|
0.10 |
[可选]当数据可能丢失时,查询是否失败。由于许多情况,比如删除主题、处理前截断主题等,查询可能永远无法从Kafka读取数据。我们试图保守地估计数据是否可能丢失。有时这会引起错误的警报。将此选项设置为 |
minPartitions |
整数>= 0,0 = disabled。 |
0(禁用) |
0.10 |
[可选]Kafka最小分区数在Spark 2.1.0-db2及以上版本中,您可以配置Spark使用任意最小的分区从Kafka读取 |
kafka.group.id |
Kafka消费组ID。 |
没有设置 |
0.10 |
[可选]从Kafka读取时使用的组ID。Spark 2.2+支持。请谨慎使用。默认情况下,每个查询生成一个唯一的组ID用于读取数据。这确保每个查询都有自己的消费者组,不受任何其他消费者的干扰,因此可以读取其订阅主题的所有分区。在某些情况下(例如,Kafka基于组的授权),您可能希望使用特定的授权组id来读取数据。您可以选择设置组ID。但是,这样做要非常谨慎,因为它可能会导致意想不到的行为。
|
startingOffsets |
最早,最晚 |
最新的 |
0.10 |
[可选]查询开始时的起始点,可以是"最早的偏移量",也可以是指定每个TopicPartition起始偏移量的json字符串。在json中,-2作为偏移量可以用来表示最早,-1表示最晚。注意:对于批量查询,不允许使用latest(隐式或在json中使用-1)。对于流查询,这只适用于新查询开始时,并且恢复总是从查询停止的地方开始。查询期间新发现的分区最早开始。 |
看到结构化流式Kafka集成指南其他可选配置。
重要的
你不应该为Kafka 0.10连接器设置以下Kafka参数,因为它会抛出异常:
group.id
: 2.2以下版本不支持设置。auto.offset.reset
:请设置source选项startingOffsets
指定从哪里开始。为了保持一致性,结构化流(相对于Kafka消费者)在内部管理偏移量的消耗。这确保在动态订阅新主题/分区后不会遗漏任何数据。startingOffsets
仅当您开始一个新的流查询时才适用,并且从检查点恢复总是从查询停止的地方开始。key.deserializer
:键总是被反序列化为字节数组ByteArrayDeserializer
.使用DataFrame操作显式地反序列化键。value.deserializer
:值总是被反序列化为字节数组ByteArrayDeserializer
.使用DataFrame操作显式地反序列化值。enable.auto.commit
:不允许设置该参数。Spark会在内部跟踪Kafka的偏移量,不会提交任何偏移量。interceptor.classes
: Kafka源总是以字节数组的形式读取键和值。用起来不安全ConsumerInterceptor
因为它可能会中断查询。
指标
请注意
在Databricks Runtime 8.1及以上版本中可用。
方法可以获得流查询在所有订阅主题中落后于最新可用偏移量的偏移量的平均值、最小值和最大值avgOffsetsBehindLatest
,maxOffsetsBehindLatest
,minOffsetsBehindLatest
指标。看到交互式阅读指标.
请注意
在Databricks Runtime 9.1及以上版本中可用。
的值,获得查询进程未从订阅的主题消耗的估计总字节数estimatedTotalBytesBehindLatest
.这个估计是基于最近300秒内处理的批次。估算所基于的时间范围可以通过设置该选项来更改bytesEstimateWindowLength
换一个不同的值。例如,设置为10分钟:
df=火花.readStream\.格式(“卡夫卡”)\.选项(“bytesEstimateWindowLength”,“10 m”)//米为分钟,你可以也使用“600年代”为600秒
如果在笔记本中运行流,则可以在原始数据页中的流查询进度仪表板:
{“源”:[{“描述”:“KafkaV2订阅(主题)”,“指标”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},}]}
使用SSL
要启用Kafka的SSL连接,请遵循Confluent文档中的说明使用SSL进行加密和身份验证.您可以提供此处描述的配置,并以卡夫卡。
,作为选项。例如,在属性中指定信任存储位置kafka.ssl.truststore.location
.
我们建议您:
一旦路径挂载并存储秘密,您可以执行以下操作:
df=火花.readStream\.格式(“卡夫卡”)\.选项(“kafka.bootstrap.servers”,...)\.选项(“kafka.security.protocol”,“SASL_SSL”)\.选项(“kafka.ssl.truststore.location”,<dbfs-信任存储库-位置>)\.选项(“kafka.ssl.keystore.location”,<dbfs-密钥存储库-位置>)\.选项(“kafka.ssl.keystore.password”,dbutils.秘密.得到(范围= <证书-范围-的名字>,关键= <密钥存储库-密码-关键-的名字>))\.选项(“kafka.ssl.truststore.password”,dbutils.秘密.得到(范围= <证书-范围-的名字>,关键= <信任存储库-密码-关键-的名字>))