亚马逊运动

用于结构化流的Kinesis连接器包含在Databricks Runtime中。

使用Amazon Kinesis进行认证

对于Kinesis的身份验证,我们使用Amazon的默认凭证提供者链默认情况下。我们建议使用一个可以访问Kinesis的实例配置文件启动Databricks集群。如果希望使用密钥进行访问,可以使用选项提供密钥awsAccessKey而且awsSecretKey

你也可以承担IAM角色使用roleArn选择。您可以选择使用指定外部IDroleExternalId和会话名roleSessionName.为了承担角色,您可以使用承担角色的权限启动集群,或者通过提供访问密钥awsAccessKey而且awsSecretKey.对于跨帐户身份验证,我们建议使用roleArn以持有所承担的角色,然后可以通过Databricks AWS帐户承担该角色。有关跨帐户身份验证的详细信息,请参见使用IAM角色跨AWS帐户授权访问

请注意

运动源要求ListShardsGetRecords,GetShardIterator权限。如果你遇到亚马逊:访问否认异常时,检查您的用户或配置文件是否具有这些权限。看到通过IAM控制对Amazon Kinesis数据流资源的访问欲知详情。

模式

记录的模式是:

类型

partitionKey

字符串

数据

二进制

字符串

shardId

字符串

sequenceNumber

字符串

approximateArrivalTimestamp

时间戳

使用数据帧操作(铸造(“字符串”), udfs)来显式反序列化数据列。

快速入门

让我们从一个简单的例子开始:WordCount。下面的笔记本演示了如何使用Kinesis的结构化流运行WordCount。

Kinesis WordCount与结构化流笔记本

在新标签页打开笔记本

配置

警告

由于Kinesis执行的速率限制和Kinesis API的限制,执行一次触发器(Trigger.Once ())不被Kinesis支持。

下面是用于指定要读取的数据的最重要配置。

选项

价值

默认的

描述

streamName

以逗号分隔的流名列表。

无(必需参数)

要订阅的流名称。

地区

要指定的流的区域。

局部分辨区域

定义流的区域。

端点

Kinesis数据流的区域。

局部分辨区域

Kinesis数据流的区域端点。

initialPosition

Latest、trim_horizon、最早(trim_horizon的别名)、at_timestamp

最新的

从流中的哪里开始读取。

对于生产方面的考虑,请回顾关键技术考虑因素和最佳实践

在某个时间点开始阅读

请注意

此特性在Databricks Runtime 7.3 LTS及以上版本上可用。

要在某个时间点开始阅读,可以使用at_timestamp的值。initialPosition选择。您可以将值指定为JSON字符串,例如{“at_timestamp”:“06/25/202010:23:45PDT "}.流查询将读取给定时间戳(包括)时或之后的所有更改。它使用Java默认格式解析时间戳。你可以通过在JSON字符串中提供一个额外的字段显式地指定格式,例如:

火花readStream格式“运动”选项“streamName”kinesisStreamName选项“地区”kinesisRegion选项“initialPosition”'{"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH: MM:ss ZZZ"}'选项“awsAccessKey”awsAccessKeyId选项“awsSecretKey”awsSecretKey负载()

此外,还有用于控制从Kinesis读取的吞吐量和延迟的配置。Kinesis源在后台线程中运行Spark作业,定期预取Kinesis数据并将其缓存到Spark执行器的内存中。只有在每个预取步骤完成后,流查询才会处理缓存的数据,并使数据可用于处理。因此,这个预取步骤决定了很多观察到的端到端延迟和吞吐量。您可以通过以下选项控制性能。

选项

价值

默认的

描述

maxRecordsPerFetch

一个正整数。

10000年

每个API请求要读取多少记录到Kinesis。返回的记录数量实际上可能更高,这取决于是否使用Kinesis Producer Library将子记录聚合为单个记录。

maxFetchRate

正十进制,表示数据速率,单位为MB/s。

1.0(最大= 2.0)

每个分片预取数据的速度。这是为了限制取回的速率,避免运动节流。2.0 MB/s是Kinesis允许的最大速率。

minFetchPeriod

例如,一个持续时间字符串,1一秒钟。

400ms (min = 200ms)

连续预取间隔时间。这是为了限制取回的频率并避免Kinesis节流。200ms是最小值,因为kineesis最多允许5次/秒的抓取。

maxFetchDuration

例如,一个持续时间字符串,1米1分钟。

十年代

在使预取的新数据可用于处理之前缓冲多长时间。

fetchBufferSize

例如,一个字节字符串,2 gb10 mb

20 gb

为下一次触发器缓冲多少数据。该值用作停止条件,而不是严格的上限,因此可能会缓存比指定的值更多的数据。

shardsPerTask

一个正整数。

5

每个Spark任务并行预取多少个Kinesis分片。在理想的情况下集群> =运动碎片/shardsPerTask对于最小的查询延迟和最大的资源使用。

shardFetchInterval

例如,一个持续时间字符串,2米2分钟。

1

多久轮询一次Kinesis进行重分片。

awsAccessKey

字符串

没有违约。

AWS访问密钥。

awsSecretKey

字符串

没有违约。

与接入密钥对应的AWS秘密访问密钥。

roleArn

字符串

没有违约。

访问Kinesis时要承担的角色的Amazon资源名(ARN)。

roleExternalId

字符串

没有违约。

可选值,可在将访问权限委托给AWS帐户时使用。看到如何使用外部ID

roleSessionName

字符串

没有违约。

假设角色会话的标识符,当不同主体或出于不同原因假设相同角色时,该标识符唯一标识会话。

coalesceThresholdBlockSize

一个正整数。

10000000年

自动合并发生的阈值。如果平均块大小小于此值,则预取的块将合并到coalesceBinSize

coalesceBinSize

一个正整数。

128000000年

合并后的大致块大小。

请注意

选项的默认值已经被选择,这样两个读取器(Spark或其他)可以同时使用Kinesis流而不会达到Kinesis速率限制。如果你有更多的消费者,你必须相应地调整选项。例如,你可能不得不减少maxFetchRate,并增加minFetchPeriod

以下是针对特定用例的一些建议配置。

从Kinesis到S3的ETL

当您对长期存储执行ETL时,您希望使用少量的大文件。在这种情况下,您可能需要设置一个较大的流触发间隔,例如5-10分钟。此外,你可能想要增加你的薪水maxFetchDuration这样你就可以缓冲在处理过程中会被写出来的大块,并增加fetchBufferSize这样你就不会在触发器之间过早停止抓取,并开始在你的流中落后。

低延迟监控和警报

当您有一个警报用例时,您希望更低的延迟。要做到这一点:

  • 确保Kinesis流只有一个消费者(也就是说,只有你的流查询,没有其他人),这样我们就可以优化你唯一的流查询,以便在不遇到Kinesis速率限制的情况下尽可能快地获取。

  • 设置选项maxFetchDuration到一个很小的值(比如,200毫秒),以尽快开始处理撷取的资料。

  • 设置选项minFetchPeriod210毫秒尽可能频繁地获取。

  • 设置选项shardsPerTask或者将集群配置为集群> =2(#运动碎片)/shardsPerTask.这样可以保证后台预取任务和流查询任务可以并发执行。

如果您看到您的查询每5秒接收一次数据,那么很可能就是这样达到运动速率限制.检查您的配置。

警告

如果删除并重新创建Kinesis流,则不能重用任何现有检查点目录来重新启动流查询。您必须删除检查点目录并从头开始这些查询。

指标

请注意

在Databricks Runtime 8.1及以上版本中可用。

Kinesis报告了消费者在每个工作空间的流开始后落后的毫秒数。您可以获得流查询过程中所有工作空间的毫秒数的平均值、最小值和最大值(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively)作为avgMsBehindLatestmaxMsBehindLatest,minMsBehindLatest指标。如果在笔记本中运行流,则可以在原始数据页中的流查询进度仪表板

“源”“描述”“KinesisV2(流)”“指标”“avgMsBehindLatest”“32000.0”“maxMsBehindLatest”“32000”“minMsBehindLatest”“32000”},

写入到Kinesis

下面的代码片段可以用作ForeachSink向Kinesis写入数据。它需要数据集[(字符串,数组(字节)))

请注意

下面的代码片段提供至少一次语义上的,不止一次。

Kinesis Foreach水槽笔记本

在新标签页打开笔记本