read_kafka表值函数

适用于:检查标记是的砖的SQL检查标记是的砖运行时13.1及以后

读取数据从一个Apache卡夫卡集群并返回数据以表格形式。

可以读取数据从一个或多个卡夫卡的话题。它同时支持批量查询和流摄入。

请注意

流只能用于三角洲生活表。

语法

read_kafka([option_key= >option_value](,])

参数

  • option_key:的名称选项配置。你必须使用引号(“)选项包含点()。

  • option_value:一个常数表达式设置选项。接受文字和标量函数。

返回

记录从一个Apache卡夫卡集群使用以下模式:

  • 关键二进制:卡夫卡记录的关键。

  • 价值二进制:卡夫卡的值记录。

  • 主题字符串:卡夫卡主题的名称记录被读取。

  • 分区INT:卡夫卡分区读取记录的ID。

  • 抵消长整型数字:抵消卡夫卡的记录的数量TopicPartition

  • 时间戳时间戳:记录时间戳值。的timestampType列定义了这个时间戳对应。

  • timestampType整数:时间戳中指定的类型时间戳列。

  • 数组< STRUCT <关键:字符串,值:二元> >:头值作为记录的一部分提供(如果启用)。

例子

——一个批处理查询阅读从一个主题。>选择价值:从read_kafka字符串值(bootstrapServers = > kafka_server: 9092,订阅= > '事件')限制10;——一个更高级的查询与卡夫卡的安全凭据。> SELECT * FROM read_kafka (bootstrapServers = > kafka_server: 9092,订阅= > '事件',startingOffsets = >“最早”、“kafka.security。' = > ' SASL_SSL协议”、“kafka.sasl。' = > '平原机制”、“kafka.sasl.jaas。配置' = > ' kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule需要用户名= " {USER_NAME}”密码={密码}“;”);——流摄入从卡夫卡JSON解析。> catalog.schema创建或刷新流表。raw_events选择价值::字符串:事件——提取字段“事件”to_timestamp(价值::字符串:ts) ts -提取字段“t”,从流read_kafka时间戳(bootstrapServers = > kafka_server: 9092,订阅= > '事件');

选项

你可以找到详细的选项列表Apache火花文档

需要选择

为连接到您的卡夫卡集群提供下面的选项。

选项

bootstrapServers

类型:字符串

一个以逗号分隔的主机/端口对指向卡夫卡集群。

默认值:无

提供以下选项中只有一个配置,将数据从卡夫卡主题。

选项

分配

类型:字符串

一个JSON字符串,其中包含的具体topic-partitions消费。例如,对于“{”局部药”:[0,1],“topicB”: (2、4)}”,局部药是0号和1号分区将消耗。

默认值:无

订阅

类型:字符串

卡夫卡主题阅读的逗号分隔列表。

默认值:无

subscribePattern

类型:字符串

一个正则表达式匹配主题订阅。

默认值:无

杂项选项

read_kafka可用于批量查询以及在线查询。下面的选项指定它们适用于哪种类型的查询。

选项

endingOffsets

类型:字符串查询类型:批处理

补偿阅读,直到一批查询“最新”指定最新的记录,或一个JSON字符串指定为每个TopicPartition终结抵消。在JSON,1作为一个偏移量可以用来参考最新。2(早期)作为一个偏移量是不允许的。

默认值:“最新”

endingOffsetsByTimestamp

类型:字符串查询类型:批处理

一个JSON字符串指定一个时间戳为每个TopicPartition读,直到结束。需要提供的时间戳作为长时间戳值以毫秒为单位1970-01-01就是UTC例如,1686444353000。看到请注意下面行为的细节和时间戳。endingOffsetsByTimestamp优先于endingOffsets

默认值:无

endingTimestamp

类型:字符串查询类型:批处理

一个字符串值的时间戳以毫秒为单位1970-01-01就是UTC例如,“1686444353000”。如果卡夫卡不返回匹配的偏移,偏移量将被设置为最新。看到请注意下面行为的细节和时间戳。注意:endingTimestamp优先于endingOffsetsByTimestampendingOffsets

默认值:无

includeHeaders

类型:布尔查询类型:流媒体和批处理

是否包括卡夫卡标题行。

默认值:

卡夫卡。< consumer_option >

类型:字符串查询类型:流媒体和批处理

任何卡夫卡的消费者可以通过特定的选项卡夫卡。前缀。这些选项时需要引号包围,否则你会得到一个解析器错误。你可以找到在卡夫卡的选项文档

注意:你不应该用这个函数设置以下选项:key.deserializer,value.deserializer,bootstrap.servers,group.id

默认值:无

maxOffsetsPerTrigger

类型:查询类型:流媒体

速度限制的最大数量补偿或行处理每触发间隔。指定的偏移量的总数将在TopicPartitions比例分割。

默认值:无

startingOffsets

类型:字符串查询类型:流媒体和批处理

查询时的起点开始,“最早”从最早的偏移量,“最新”也就是从最新的补偿,或为每个TopicPartition JSON字符串指定的起始偏移量。在JSON,2作为一个抵消最早可以用来参考,1最新的。

注意:对于批处理查询,最新(隐式或通过使用1以JSON)是不允许的。对于流媒体查询,这只适用于当启动一个新的查询。重启流查询将继续从查询中定义的补偿检查站。新发现的分区在查询将从最早开始。

默认值:“最新”流,“最早”对批处理

startingOffsetsByTimestamp

类型:字符串查询类型:流媒体和批处理

一个JSON字符串指定为每个TopicPartition开始时间戳。需要提供的时间戳作为长时间戳值以毫秒为单位1970-01-01就是UTC例如,1686444353000。看到请注意下面行为的细节和时间戳。如果卡夫卡不返回匹配的抵消,这种行为将遵循的价值选择startingOffsetsByTimestampStrategystartingOffsetsByTimestamp优先于startingOffsets

注意:对于流媒体查询,这只适用于当启动一个新的查询。重启流查询将继续从查询中定义的补偿检查站。新发现的分区在查询将从最早开始。

默认值:无

startingOffsetsByTimestampStrategy

类型:字符串查询类型:流媒体和批处理

抵消这一策略时使用指定的开始时间戳(全球或每个分区)和返回的抵消卡夫卡不匹配。可用的策略是:

  • “错误”:失败的查询

    • “最新”:分配的最新抵消这些分区火花可以阅读从这些分区后micro-batches更新记录。

默认值:“错误”

startingTimestamp

类型:字符串查询类型:流媒体和批处理

一个字符串值的时间戳以毫秒为单位1970-01-01就是UTC例如,“1686444353000”。看到请注意下面行为的细节和时间戳。如果卡夫卡不返回匹配的抵消,这种行为将遵循的价值选择startingOffsetsByTimestampStrategystartingTimestamp优先于startingOffsetsByTimestampstartingOffsets

注意:对于流媒体查询,这只适用于当启动一个新的查询。重启流查询将继续从查询中定义的补偿检查站。新发现的分区在查询将从最早开始。

默认值:无

请注意

返回的偏移量为每个分区是最早的抵消的时间戳大于或等于给定的时间戳在相应的分区。在不同行为选择之间如果卡夫卡不返回匹配的抵消,检查每个选项的描述。

火花只是通过时间戳信息KafkaConsumer.offsetsForTimes,原因不解释或价值。为更多的细节KafkaConsumer.offsetsForTimes,请参考文档)。此外,时间戳的意义在这里可以根据卡夫卡不同配置(log.message.timestamp.type)。有关详细信息,请参见Apache卡夫卡文档