开始
加载和管理数据
处理数据
政府
引用和资源
更新2月3日,2023年
给我们反馈
这包含常见的笔记本和代码示例模式处理结构化流砖。
这两个笔记本展示如何使用DataFrame API构建结构化的流媒体应用程序在Python和Scala。
在新标签页打开笔记本
Apache Cassandra是一个分布式的、低延迟、可伸缩的高度可用的OLTP数据库。
结构化流与卡桑德拉通过火花卡桑德拉连接器。这个连接器支持抽样和DataFrame api,它有原生支持编写流数据。*重要*您必须使用相应的版本的spark-cassandra-connector-assembly。
下面的示例连接到一个或多个主机在卡桑德拉数据库集群。它还指定了连接配置如检查点位置和具体用于和表名:
火花。相依。集(“spark.cassandra.connection.host”,“host1 host2”)df。writeStream\。格式(“org.apache.spark.sql.cassandra”)\。outputMode(“添加”)\。选项(“checkpointLocation”,“/道路/ /检查站”)\。选项(“用于”,“keyspace_name”)\。选项(“表”,“table_name”)\。开始()
foreachBatch ()
streamingDF.writeStream.foreachBatch ()允许重用现有的一批作家写的输出数据流查询Azure突触分析。看到foreachBatch文档获取详细信息。
streamingDF.writeStream.foreachBatch ()
要运行这个示例,您需要Azure突触分析连接器。在Azure突触分析连接器的详细信息,请参见在Azure突触分析查询数据。
从pyspark.sql.functions进口*从pyspark.sql进口*defwriteToSQLWarehouse(df,epochId):df。写\。格式(“com.databricks.spark.sqldw”)\。模式(“覆盖”)\。选项(“url”," jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”)\。选项(“forward_spark_azure_storage_credentials”,“真正的”)\。选项(“数据表”,“my_table_in_dw_copy”)\。选项(“tempdir”,“wasbs: / / < your-container-name > @ < your-storage-account-name >.blob.core.windows.net/ < your-directory-name >”)\。保存()火花。相依。集(“spark.sql.shuffle.partitions”,“1”)查询=(火花。readStream。格式(“速度”)。负载()。selectExpr(“值% 10键”)。groupBy(“关键”)。数()。toDF(“关键”,“数”)。writeStream。foreachBatch(writeToSQLWarehouse)。outputMode(“更新”)。开始())
foreach ()
streamingDF.writeStream.foreach ()允许您编写的输出流查询到任意位置。
streamingDF.writeStream.foreach ()
这个例子展示了如何使用streamingDataFrame.writeStream.foreach ()在Python中给DynamoDB写信。第一步得到DynamoDB宝途资源。这个例子是用写的access_key和secret_key,但是砖建议你使用S3访问配置实例配置文件。
streamingDataFrame.writeStream.foreach ()
access_key
secret_key
定义一些辅助方法来创建DynamoDB表运行的例子。
table_name=“PythonForeachTest”defget_dynamodb():进口boto3access_key=“<访问密钥>”secret_key=“<秘密密钥>”地区=“<地区名称>”返回boto3。资源(“dynamodb”,aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=地区)defcreateTableIfNotExists():“‘创建一个DynamoDB表如果它不存在。这一定是火花驱动程序上运行,而不是在foreach。“‘dynamodb=get_dynamodb()existing_tables=dynamodb。元。客户端。list_tables()(的表名]如果table_name不在existing_tables:打印(“创建表% s”%table_name)表=dynamodb。create_table(的表=table_name,KeySchema=({“AttributeName”:“关键”,“KeyType”:“希”}),AttributeDefinitions=({“AttributeName”:“关键”,“AttributeType”:“年代”}),ProvisionedThroughput={“ReadCapacityUnits”:5,“WriteCapacityUnits”:5})打印(“等待表做好准备”)表。元。客户端。get_waiter(“table_exists”)。等待(的表=table_name)
定义的类和方法,写入DynamoDB并调用它们foreach。有两种方法来指定您的自定义逻辑foreach。
foreach
使用一个函数:这是最简单的方法,可以用来写一行。然而,客户端/连接初始化写一行将在每次调用完成。
defsendToDynamoDB_simple(行):“‘DynamoDB函数发送一行。当使用foreach,调用这个方法是遗嘱执行人与生成的输出行。“‘#创建客户端对象的执行者,#不使用客户端在驱动程序中创建的对象dynamodb=get_dynamodb()dynamodb。表(table_name)。put_item(项={“关键”:str(行(“关键”]),“数”:行(“数”]})
使用一个类开放,过程,关闭方法:这允许一个更高效的实现,客户端/连接初始化,可以编写多行。
开放
过程
关闭
类SendToDynamoDB_ForeachWriter:“‘类发送一组行DynamoDB。当使用foreach,这个类的副本将被用来写字多行执行人。看到“DataStreamWriter.foreach”的python文档为更多的细节。“‘def开放(自我,partition_id,epoch_id):#这叫做第一次当准备发送多个行。#把所有的初始化代码在open(),以便新鲜#复制这个类的初始化在开放的执行人()#将调用。自我。dynamodb=get_dynamodb()返回真正的def过程(自我,行):#这是要求每一行()被称为后开放。#这个实现发送一次一行。#为进一步增强,接触火花+ DynamoDB连接器#团队:https://github.com/audienceproject/spark-dynamodb自我。dynamodb。表(table_name)。put_item(项={“关键”:str(行(“关键”]),“数”:行(“数”]})def关闭(自我,犯错):#这叫做毕竟行处理。如果犯错:提高犯错
调用foreach在你流查询使用上面的函数或对象。
从pyspark.sql.functions进口*火花。相依。集(“spark.sql.shuffle.partitions”,“1”)查询=(火花。readStream。格式(“速度”)。负载()。selectExpr(“值% 10键”)。groupBy(“关键”)。数()。toDF(“关键”,“数”)。writeStream。foreach(SendToDynamoDB_ForeachWriter())# .foreach (sendToDynamoDB_simple) / /选择,使用一个或另一个。outputMode(“更新”)。开始())
这个例子展示了如何使用streamingDataFrame.writeStream.foreach ()在Scala中给DynamoDB写信。
这个你需要创建一个运行DynamoDB表都有一个单独的字符串键命名为“价值”。
定义的实现ForeachWriter接口执行写。
ForeachWriter
进口org。apache。火花。sql{。ForeachWriter,行}进口com。amazonaws。AmazonServiceException进口com。amazonaws。身份验证。_进口com。amazonaws。服务。dynamodbv2。AmazonDynamoDB进口com。amazonaws。服务。dynamodbv2。AmazonDynamoDBClientBuilder进口com。amazonaws。服务。dynamodbv2。模型。AttributeValue进口com。amazonaws。服务。dynamodbv2。模型。ResourceNotFoundException进口java。跑龙套。ArrayList进口scala。集合。JavaConverters。_类DynamoDbWriter扩展ForeachWriter(行]{私人瓦尔的表=“<表名称>”私人瓦尔accessKey=“< aws访问密钥>”私人瓦尔secretKey=“< aws密钥>”私人瓦尔regionName=“<地区>”/ /这个懒洋洋地将只有当初始化()开放懒惰的瓦尔ddb=AmazonDynamoDBClientBuilder。标准()。withCredentials(新AWSStaticCredentialsProvider(新BasicAWSCredentials(accessKey,secretKey)))。withRegion(regionName)。构建()/ // /这叫做第一次当准备发送多个行。/ /把所有的初始化代码内部开放的(),这样一个新鲜的/ /复制这个类的初始化在开放的执行人()/ /将被调用。/ /def开放(partitionId:长,epochId:长)={ddb/ /客户端的初始化真正的}/ // /这是要求每一行()被称为后开放。/ /这个实现发送一次一行。/ /一个更有效的实现可以发送一次批次的行。/ /def过程(行:行)={瓦尔rowAsMap=行。getValuesMap(行。模式。字段名)瓦尔dynamoItem=rowAsMap。mapValues{v:任何= >新AttributeValue(v。toString)}。asJavaddb。putItem(的表,dynamoItem)}/ // /这叫做毕竟行处理。/ /def关闭(errorOrNull:Throwable)={ddb。关闭()}}
使用DynamoDbWriter写速度流进DynamoDB。
DynamoDbWriter
火花。readStream。格式(“速度”)。负载()。选择(“价值”)。writeStream。foreach(新DynamoDbWriter)。开始()
以下笔记本显示您可以很容易地改变你的亚马逊CloudTrail日志从JSON拼花特别的高效查询。看到实时流ETL和结构化流获取详细信息。
这两个笔记本展示如何在Python和Scala使用stream-stream连接。