订阅谷歌Pub / Sub

砖连接器提供了一个内置的订阅谷歌在砖Pub / Sub运行时13.1及以上。这个连接器提供了记录的用户只有一次处理语义。

请注意

Pub / Sub可能发布重复的记录,可能到达用户的订单记录。你应该写砖代码来处理的重复和无序的记录。

语法的例子

如果你有一个谷歌服务帐户有足够特权附加到集群中,您可以使用以下配置的基本语法结构流读取Pub / Sub。看到谷歌服务帐户

瓦尔查询=火花readStream格式(“pubsub”)/ /如果没有,我们将创建一个Pubsub订阅id选项(“subscriptionId”,“mysub”)/ /需要选项(“topicId”,“fe-demo-prod-dnd”)/ /需要选项(“projectId”,“fe-prod-dbx”)/ /需要负载()

你也可以直接通过授权选项,如以下示例:

瓦尔authOptions:地图(字符串,字符串]=地图(“clientId”- >clientId,“clientEmail”- >clientEmail,“privateKey”- >privateKey,“privateKeyId”- >privateKeyId)瓦尔查询=火花readStream格式(“pubsub”)/ /如果没有,我们将创建一个Pubsub订阅id选项(“subscriptionId”,“mysub”)/ /需要选项(“topicId”,“mytopic”)/ /需要选项(“projectId”,“。”)/ /需要选项(authOptions)负载()

更多的配置选项,见为Pub / Sub流读取配置选项

配置访问Pub / Sub

砖建议使用谷歌的服务帐户(GSA)来管理连接Pub / Sub。

当使用GSA,您不需要提供额外的授权选项直接流。

请注意

gsa不支持在计算配置共享访问模式。

砖时建议使用秘密提供授权选项。以下选项需要授权连接:

  • clientEmail

  • clientId

  • privateKey

  • privateKeyId

下表描述了角色配置所需的凭证:

角色

必需的或可选的

如何使用它

角色/ pubsub.viewer角色/查看器

要求

检查是否存在订阅,订阅

角色/ pubsub.subscriber

要求

获取的数据订阅

角色/ pubsub.editor角色/编辑器

可选

允许创建订阅如果不存在,也可以使用deleteSubscriptionOnStreamStop删除订阅流终止

发布/订阅模式

流的模式匹配从Pub / Sub获取记录,如下表所述:

类型

消息id

StringType

有效载荷

ArrayType [ByteType]

属性

StringType

publishTimestampInMillis

LongType

为Pub / Sub流读取配置选项

下表描述了支持发布/订阅的选项。所有选项配置作为结构的一部分流阅读使用.option (“< optionName >”,“<用optionValue >”)语法。

请注意

一些Pub / Sub配置选项使用的概念获取而不是micro-batches。这反映了内部实现细节和选择工作类似于推论其他结构化流连接器,除了记录获取,然后处理。

选项

默认值

描述

numFetchPartitions

初始化设置的执行人流

并行火花任务获取记录的数量从一个订阅。

deleteSubscriptionOnStreamStop

如果真正的,订阅传递到流流工作结束的时候被删除。

maxBytesPerTrigger

没有一个

批量大小的软限制在每个处理micro-batch触发。

maxRecordsPerFetch

1000年

在处理之前记录的数量来获取每个任务记录。

maxFetchPeriod

10秒

每个任务的时间获取之前处理记录。砖建议使用默认值。

增量为Pub / Sub批处理语义

您可以使用Trigger.AvailableNow消费记录可用Pub / Sub一批增量来源。

砖记录时间戳的,当你开始阅读的Trigger.AvailableNow设置。记录由批处理包括所有之前获取数据和任何新出版的记录一个时间戳不到记录流开始时间戳。

看到配置增量的批处理

监测流指标

结构化流进展量度报告获取的记录数量和准备过程,记录的大小获取并准备过程中,流开始以来和副本的数量。下面是一个例子,这些指标:

“指标”:{“numDuplicatesSinceStreamStart”:“1”,“numRecordsReadyToProcess”:“1”,“sizeOfRecordsReadyToProcess”:“8”}

限制

投机执行(spark.speculation)不支持发布/订阅。