亚马逊运动

结构化流的运动连接器是包含在砖运行时。

验证与亚马逊运动

与运动进行身份验证,我们使用亚马逊的默认凭据提供程序链默认情况下。我们建议启动你的砖集群实例配置文件,可以访问动作。如果你想使用键访问,您可以使用选项提供awsAccessKeyawsSecretKey

你也可以假设一个我的角色使用roleArn选择。您可以选择指定外部IDroleExternalId和一个会话名称roleSessionName。为了承担的角色,您可以启动集群与权限承担角色或通过提供访问密钥awsAccessKeyawsSecretKey。cross-account认证,我们推荐使用roleArn假定的角色,然后可以通过你的砖AWS帐户。cross-account认证的更多信息,请参阅委托银行登录在AWS帐户使用我的角色

请注意

运动源需要ListShards,GetRecords,GetShardIterator权限。如果你遇到亚马逊:访问否认异常,检查你的用户或概要文件有这些权限。看到控制访问亚马逊运动数据流资源使用我为更多的细节。

模式

记录的模式是:

类型

partitionKey

字符串

数据

二进制

字符串

shardId

字符串

sequenceNumber

字符串

approximateArrivalTimestamp

时间戳

使用DataFrame操作(铸造(“字符串”)udf)显式地反序列化数据列。

快速入门

让我们先从一个简单的例子:WordCount。以下笔记本演示了如何使用结构化运行WordCount流运动。

与结构化流运动WordCount笔记本

在新标签页打开笔记本

配置

重要的

在砖运行时13.1及以上的,你可以使用Trigger.AvailableNow与运动。看到摄取动作记录作为增量的批处理

这里有最重要的配置指定读取的数据。

选项

价值

默认的

描述

streamName

一个以逗号分隔的名称。

(没有一个所需的参数)

订阅流的名字。

地区

指定流区域。

在本地解决地区

流中定义的区域。

端点

地区的运动数据流。

在本地解决地区

运动区域端点数据流。

initialPosition

最新、trim_horizon最早trim_horizon(化名),at_timestamp

最新的

从哪里开始阅读的流。

生产注意事项,审查关键技术因素和最佳实践

在时间点上开始阅读

请注意

这个特性可以在砖运行时7.3 LTS及以上。

开始阅读的时候,你可以使用一个at_timestampinitialPosition选择。您指定的值作为一个JSON字符串,如{“at_timestamp”:“06/25/202010:23:45PDT "}。流查询将读取所有更改或在给定的时间戳(包容)。它使用Java的默认格式解析时间戳。您可以显式地指定格式通过提供JSON字符串中的一个额外字段,例如:

(火花readStream格式(“运动”)选项(“streamName”,kinesisStreamName)选项(“地区”,kinesisRegion)选项(“initialPosition”,”{at_timestamp”:“06/25/2020 10:23:45 PDT”、“格式”:“MM / dd / yyyy HH: MM: ss ZZZ”}”)选项(“awsAccessKey”,awsAccessKeyId)选项(“awsSecretKey”,awsSecretKey)负载())

此外,还有配置从运动控制阅读的吞吐量和延迟。运动源在一个后台线程运行引发工作定期预抓取动作的内存和缓存的数据引发执行人。缓存数据流查询处理每个预取步骤完成后,使数据进行处理。因此,这个预取步骤决定了许多观察到的端到端延时和吞吐量。你可以使用以下选项控制性能。

选项

价值

默认的

描述

maxRecordsPerFetch

一个正整数。

10000年

有多少记录读取/ API请求动作。返回的记录数可能是高取决于sub-records聚合到单个记录使用运动生产商库。

maxFetchRate

积极的十进制表示数据率MB / s。

1.0 (max = 2.0)

每切分速度预取数据。这是限制获取速度,避免运动调节。2.0 MB / s是动作允许的最大速率。

minFetchPeriod

例如,一个时间字符串11秒。

400 ms (min = 200毫秒)

多长时间等之间的连续预取的尝试。这是限制获取的频率,避免运动节流。200 ms是最低动作最多允许5获取/秒。

maxFetchDuration

例如,一个时间字符串1米1分钟。

十年代

多久之前缓冲预取的新的数据使其可用于处理。

fetchBufferSize

一个字节的字符串,例如,2 gb10 mb

20 gb

多少数据缓冲区为下一个触发器。这是作为一个停止条件,而不是一个严格的上限,因此更多的数据可能比指定的缓冲这个值。

shardsPerTask

一个正整数。

5

有多少运动碎片从每火花任务并行预取。在理想的情况下#集群> =#运动碎片/shardsPerTask最小查询延迟和最大资源使用情况。

shardFetchInterval

例如,一个时间字符串2米为2分钟。

1

为重新切分调查运动的频率。

awsAccessKey

字符串

没有违约。

AWS访问密钥。

awsSecretKey

字符串

没有违约。

AWS秘密访问密钥对应的访问密钥。

roleArn

字符串

没有违约。

亚马逊资源名(攻击)的角色承担当访问动作。

roleExternalId

字符串

没有违约。

可以使用一个可选值,当授权访问AWS帐户。看到如何使用外部ID吗

roleSessionName

字符串

没有违约。

一个标识符假设角色的会话,惟一地标识一个会话时相同的角色是由不同的主体或不同的原因。

coalesceThresholdBlockSize

一个正整数。

10000000年

的阈值自动合并发生。如果平均块大小小于此值,预取块合并向coalesceBinSize

coalesceBinSize

一个正整数。

128000000年

合并后的近似的块大小。

请注意

默认值已被选定的选项,这样两个读者(火花或其他)可以同时使用一个运动流没有触及运动速度限制。如果你有更多的消费者,你必须相应地调整选项。例如,您可能会减少maxFetchRate,增加minFetchPeriod

这里有一些建议配置特定的用例。

ETL从运动到S3

当你执行ETL长期存储,你宁愿有少量的大文件。在这种情况下,您可能希望设置一个大型流触发间隔,例如,5 - 10分钟。此外,您可能想要增加你的maxFetchDuration这样你缓冲大块将要写入处理期间,和增加fetchBufferSize所以你不要停止抓取过早在触发器之间,并开始落后在你流。

低延迟监控和报警

当你有一个报警用例,你会想要更低的延迟。实现:

  • 确保只有一个消费者(也就是说,只有你流查询和没有人)的运动流,这样我们就可以优化你的只有流查询获取尽可能快没有跑到运动速度限制。

  • 设置选项maxFetchDuration小值(说,200毫秒)开始尽可能快地处理获取数据。

  • 设置选项minFetchPeriod210毫秒获取尽可能经常。

  • 设置选项shardsPerTask或配置集群等#集群> =2*(#运动碎片)/shardsPerTask。这将确保背景预取和流媒体查询任务可以并行执行任务。

如果你看到,你查询接收数据每5秒,那么很可能你击球动作速度限制。回顾你的配置。

警告

如果你删除并重新创建一个动作流,你不能重用任何现有的检查点目录重新启动流查询。你必须删除目录和检查站这些查询从头开始。

指标

请注意

在砖运行时8.1及以上。

运动报道的毫秒数消费者已经落后于流的开始为每个工作区。你可以得到平均、最小和最大的毫秒数在所有的工作区流查询过程中(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html reading-metrics-interactively)作为avgMsBehindLatest,maxMsBehindLatest,minMsBehindLatest指标。如果您正在运行流在一个笔记本,你可以看到这些度量标准下原始数据流查询进展仪表板选项卡:

{“源”:({“描述”:“KinesisV2(流)”,“指标”:{“avgMsBehindLatest”:“32000.0”,“maxMsBehindLatest”:“32000”,“minMsBehindLatest”:“32000”},}]}

摄取动作记录作为增量的批处理

在砖运行时的13.1及以上,砖支持使用Trigger.AvailableNow与运动数据源增量批语义。下面描述了基本配置:

  1. 现在micro-batch读触发时可用模式,当前时间记录的砖客户机。

  2. 砖民调的源系统之间的所有记录时间戳记录时间和前一个检查点。

  3. 砖加载这些记录使用Trigger.AvailableNow语义。

请注意

砖是最好的尝试使用所有记录在消息队列资源存在阅读时阅读。因为小的潜在差异在时间戳和缺乏保证数据源的排序,一些记录可能不包括在批触发。省略了记录下触发micro-batch加工的一部分。

看到配置增量的批处理

写信给运动

下面的代码片段可以用作ForeachSink写数据来运动。它需要一个数据集[(字符串,数组(字节)))

请注意

下面的代码片段至少一次语义,不是一次。

运动Foreach水槽笔记本

在新标签页打开笔记本