亚马逊运动
用于结构化流的Kinesis连接器包含在Databricks Runtime中。
使用Amazon Kinesis进行认证
对于Kinesis的身份验证,我们使用Amazon的默认凭证提供者链默认情况下。我们建议使用一个可以访问Kinesis的实例配置文件启动Databricks集群。如果希望使用密钥进行访问,可以使用选项提供密钥awsAccessKey
而且awsSecretKey
.
你也可以承担IAM角色使用roleArn
选择。您可以选择使用指定外部IDroleExternalId
和会话名roleSessionName
.为了承担角色,您可以使用承担角色的权限启动集群,或者通过提供访问密钥awsAccessKey
而且awsSecretKey
.对于跨帐户身份验证,我们建议使用roleArn
以持有所承担的角色,然后可以通过Databricks AWS帐户承担该角色。有关跨帐户身份验证的详细信息,请参见使用IAM角色跨AWS帐户授权访问.
请注意
运动源要求ListShards
,GetRecords
,GetShardIterator
权限。如果你遇到亚马逊:访问否认
异常时,检查您的用户或配置文件是否具有这些权限。看到通过IAM控制对Amazon Kinesis数据流资源的访问欲知详情。
模式
记录的模式是:
列 |
类型 |
---|---|
partitionKey |
字符串 |
数据 |
二进制 |
流 |
字符串 |
shardId |
字符串 |
sequenceNumber |
字符串 |
approximateArrivalTimestamp |
时间戳 |
使用数据帧操作(铸造(“字符串”)
, udfs)来显式反序列化数据
列。
配置
警告
由于Kinesis执行的速率限制和Kinesis API的限制,执行一次触发器(Trigger.Once ()
)不被Kinesis支持。
下面是用于指定要读取的数据的最重要配置。
选项 |
价值 |
默认的 |
描述 |
---|---|---|---|
streamName |
以逗号分隔的流名列表。 |
无(必需参数) |
要订阅的流名称。 |
地区 |
要指定的流的区域。 |
局部分辨区域 |
定义流的区域。 |
端点 |
Kinesis数据流的区域。 |
局部分辨区域 |
Kinesis数据流的区域端点。 |
initialPosition |
Latest、trim_horizon、最早(trim_horizon的别名)、 |
最新的 |
从流中的哪里开始读取。 |
对于生产方面的考虑,请回顾关键技术考虑因素和最佳实践.
在某个时间点开始阅读
请注意
此特性在Databricks Runtime 7.3 LTS及以上版本上可用。
要在某个时间点开始阅读,可以使用at_timestamp
的值。initialPosition
选择。您可以将值指定为JSON字符串,例如{“at_timestamp”:“06/25/202010:23:45PDT "}
.流查询将读取给定时间戳(包括)时或之后的所有更改。它使用Java默认格式解析时间戳。你可以通过在JSON字符串中提供一个额外的字段显式地指定格式,例如:
(火花.readStream.格式(“运动”).选项(“streamName”,kinesisStreamName).选项(“地区”,kinesisRegion).选项(“initialPosition”,'{"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH: MM:ss ZZZ"}').选项(“awsAccessKey”,awsAccessKeyId).选项(“awsSecretKey”,awsSecretKey).负载())
此外,还有用于控制从Kinesis读取的吞吐量和延迟的配置。Kinesis源在后台线程中运行Spark作业,定期预取Kinesis数据并将其缓存到Spark执行器的内存中。只有在每个预取步骤完成后,流查询才会处理缓存的数据,并使数据可用于处理。因此,这个预取步骤决定了很多观察到的端到端延迟和吞吐量。您可以通过以下选项控制性能。
选项 |
价值 |
默认的 |
描述 |
---|---|---|---|
maxRecordsPerFetch |
一个正整数。 |
10000年 |
每个API请求要读取多少记录到Kinesis。返回的记录数量实际上可能更高,这取决于是否使用Kinesis Producer Library将子记录聚合为单个记录。 |
maxFetchRate |
正十进制,表示数据速率,单位为MB/s。 |
1.0(最大= 2.0) |
每个分片预取数据的速度。这是为了限制取回的速率,避免运动节流。2.0 MB/s是Kinesis允许的最大速率。 |
minFetchPeriod |
例如,一个持续时间字符串, |
400ms (min = 200ms) |
连续预取间隔时间。这是为了限制取回的频率并避免Kinesis节流。200ms是最小值,因为kineesis最多允许5次/秒的抓取。 |
maxFetchDuration |
例如,一个持续时间字符串, |
十年代 |
在使预取的新数据可用于处理之前缓冲多长时间。 |
fetchBufferSize |
例如,一个字节字符串, |
20 gb |
为下一次触发器缓冲多少数据。该值用作停止条件,而不是严格的上限,因此可能会缓存比指定的值更多的数据。 |
shardsPerTask |
一个正整数。 |
5 |
每个Spark任务并行预取多少个Kinesis分片。在理想的情况下 |
shardFetchInterval |
例如,一个持续时间字符串, |
1 |
多久轮询一次Kinesis进行重分片。 |
awsAccessKey |
字符串 |
没有违约。 |
AWS访问密钥。 |
awsSecretKey |
字符串 |
没有违约。 |
与接入密钥对应的AWS秘密访问密钥。 |
roleArn |
字符串 |
没有违约。 |
访问Kinesis时要承担的角色的Amazon资源名(ARN)。 |
roleExternalId |
字符串 |
没有违约。 |
可选值,可在将访问权限委托给AWS帐户时使用。看到如何使用外部ID. |
roleSessionName |
字符串 |
没有违约。 |
假设角色会话的标识符,当不同主体或出于不同原因假设相同角色时,该标识符唯一标识会话。 |
coalesceThresholdBlockSize |
一个正整数。 |
10000000年 |
自动合并发生的阈值。如果平均块大小小于此值,则预取的块将合并到 |
coalesceBinSize |
一个正整数。 |
128000000年 |
合并后的大致块大小。 |
请注意
选项的默认值已经被选择,这样两个读取器(Spark或其他)可以同时使用Kinesis流而不会达到Kinesis速率限制。如果你有更多的消费者,你必须相应地调整选项。例如,你可能不得不减少maxFetchRate
,并增加minFetchPeriod
.
以下是针对特定用例的一些建议配置。
从Kinesis到S3的ETL
当您对长期存储执行ETL时,您希望使用少量的大文件。在这种情况下,您可能需要设置一个较大的流触发间隔,例如5-10分钟。此外,你可能想要增加你的薪水maxFetchDuration
这样你就可以缓冲在处理过程中会被写出来的大块,并增加fetchBufferSize
这样你就不会在触发器之间过早停止抓取,并开始在你的流中落后。
低延迟监控和警报
当您有一个警报用例时,您希望更低的延迟。要做到这一点:
确保Kinesis流只有一个消费者(也就是说,只有你的流查询,没有其他人),这样我们就可以优化你唯一的流查询,以便在不遇到Kinesis速率限制的情况下尽可能快地获取。
设置选项
maxFetchDuration
到一个很小的值(比如,200毫秒
),以尽快开始处理撷取的资料。设置选项
minFetchPeriod
来210毫秒
尽可能频繁地获取。设置选项
shardsPerTask
或者将集群配置为#核在集群> =2*(#运动碎片)/shardsPerTask
.这样可以保证后台预取任务和流查询任务可以并发执行。
如果您看到您的查询每5秒接收一次数据,那么很可能就是这样达到运动速率限制.检查您的配置。
警告
如果删除并重新创建Kinesis流,则不能重用任何现有检查点目录来重新启动流查询。您必须删除检查点目录并从头开始这些查询。
指标
请注意
在Databricks Runtime 8.1及以上版本中可用。
Kinesis报告了消费者在每个工作空间的流开始后落后的毫秒数。您可以获得流查询过程中所有工作空间的毫秒数的平均值、最小值和最大值(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively)作为avgMsBehindLatest
,maxMsBehindLatest
,minMsBehindLatest
指标。如果在笔记本中运行流,则可以在原始数据页中的流查询进度仪表板:
{“源”:[{“描述”:“KinesisV2(流)”,“指标”:{“avgMsBehindLatest”:“32000.0”,“maxMsBehindLatest”:“32000”,“minMsBehindLatest”:“32000”},}]}