开始
加载和管理数据
处理数据
政府
参考和资源
2023年1月20日更新
给我们反馈
它包含笔记本和代码示例,用于在数据库里使用结构化流的常见模式。
这两本笔记本展示了如何使用DataFrame API在Python和Scala中构建结构化流应用程序。
在新标签页打开笔记本
Apache Cassandra是一个分布式、低延迟、可伸缩、高可用的OLTP数据库。
结构化流与Cassandra通过Spark Cassandra连接器.该连接器同时支持RDD和DataFrame api,并且具有写入流数据的本机支持。*重要的对应版本spark-cassandra-connector-assembly.
下面的示例连接到Cassandra数据库集群中的一个或多个主机。它还指定连接配置,如检查点位置和特定的键空间和表名:
火花.相依.集(“spark.cassandra.connection.host”,“host1 host2”)df.writeStream\.格式(“org.apache.spark.sql.cassandra”)\.outputMode(“添加”)\.选项(“checkpointLocation”,“/道路/ /检查站”)\.选项(“用于”,“keyspace_name”)\.选项(“表”,“table_name”)\.开始()
foreachBatch ()
streamingDF.writeStream.foreachBatch ()允许您重用现有的批处理数据写入器,将流查询的输出写入Azure Synapse Analytics。看到foreachBatch文档获取详细信息。
streamingDF.writeStream.foreachBatch ()
要运行此示例,您需要Azure Synapse Analytics连接器。有关Azure Synapse Analytics连接器的详细信息,请参见在Azure Synapse Analytics中查询数据.
从pyspark.sql.functions进口*从pyspark.sql进口*defwriteToSQLWarehouse(df,epochId):df.写\.格式(“com.databricks.spark.sqldw”)\.模式(“覆盖”)\.选项(“url”," jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”)\.选项(“forward_spark_azure_storage_credentials”,“真正的”)\.选项(“数据表”,“my_table_in_dw_copy”)\.选项(“tempdir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)\.保存()火花.相依.集(“spark.sql.shuffle.partitions”,“1”)查询=(火花.readStream.格式(“速度”).负载().selectExpr("值% 10为键").groupBy(“关键”).数().toDF(“关键”,“数”).writeStream.foreachBatch(writeToSQLWarehouse).outputMode(“更新”).开始())
foreach ()
streamingDF.writeStream.foreach ()允许您将流查询的输出写入任意位置。
streamingDF.writeStream.foreach ()
这个例子展示了如何使用streamingDataFrame.writeStream.foreach ()在Python中写入DynamoDB。第一步获取DynamoDB boto资源。这个例子是为使用而编写的access_key而且secret_key,但Databricks建议您使用使用实例概要配置S3访问.
streamingDataFrame.writeStream.foreach ()
access_key
secret_key
定义几个辅助方法来创建运行示例的DynamoDB表。
table_name=“PythonForeachTest”defget_dynamodb():进口boto3access_key=“< >访问关键”secret_key=“<密钥>”地区=“<地区名称>”返回boto3.资源(“dynamodb”,aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=地区)defcreateTableIfNotExists():“‘如果DynamoDB表不存在,则创建它。这必须在Spark驱动程序上运行,而不是在foreach内部。“‘dynamodb=get_dynamodb()existing_tables=dynamodb.元.客户端.list_tables() (的表名]如果table_name不在existing_tables:打印(“创建表% s"%table_name)表格=dynamodb.create_table(的表=table_name,KeySchema=[{“AttributeName”:“关键”,“KeyType”:“希”}),AttributeDefinitions=[{“AttributeName”:“关键”,“AttributeType”:“年代”}),ProvisionedThroughput={“ReadCapacityUnits”:5,“WriteCapacityUnits”:5})打印(“等待桌子准备好”)表格.元.客户端.get_waiter(“table_exists”).等待(的表=table_name)
定义写入DynamoDB并调用它们的类和方法foreach.有两种方法来指定自定义逻辑foreach.
foreach
使用函数:这是一种简单的方法,可以用来一次写一行。然而,客户端/连接初始化写入一行将在每次调用中完成。
defsendToDynamoDB_simple(行):“‘函数将一行发送到DynamoDB。当与' foreach '一起使用时,这个方法将在执行器中被调用使用生成的输出行。“‘#在执行器中创建客户端对象#不要使用在驱动程序中创建的客户端对象dynamodb=get_dynamodb()dynamodb.表格(table_name).put_item(项={“关键”:str(行[“关键”]),“数”:行[“数”]})
使用类开放,过程,关闭方法:这允许一个更有效的实现,其中一个客户端/连接被初始化,多行可以被写入。
开放
过程
关闭
类SendToDynamoDB_ForeachWriter:“‘类将一组行发送到DynamoDB。当与' foreach '一起使用时,这个类的副本将用于写入执行程序中的多行。参见python文档中的“DataStreamWriter.foreach”欲知详情。“‘def开放(自我,partition_id,epoch_id):#在准备发送多行时首先调用。#把所有的初始化代码放在open()里面,这样就可以这个类的拷贝在执行器中初始化#将被调用。自我.dynamodb=get_dynamodb()返回真正的def过程(自我,行):在open()被调用后的每一行都被调用。#这个实现一次发送一行。#要进一步增强,请联系Spark+DynamoDB连接器#团队:https://github.com/audienceproject/spark-dynamodb自我.dynamodb.表格(table_name).put_item(项={“关键”:str(行[“关键”]),“数”:行[“数”]})def关闭(自我,犯错):在处理完所有行之后调用。如果犯错:提高犯错
调用foreach在您的流查询与上述函数或对象。
从pyspark.sql.functions进口*火花.相依.集(“spark.sql.shuffle.partitions”,“1”)查询=(火花.readStream.格式(“速度”).负载().selectExpr("值% 10为键").groupBy(“关键”).数().toDF(“关键”,“数”).writeStream.foreach(SendToDynamoDB_ForeachWriter())#.foreach(sendToDynamoDB_simple) //选项,使用其中一个.outputMode(“更新”).开始())
这个例子展示了如何使用streamingDataFrame.writeStream.foreach ()在Scala中写入DynamoDB。
要运行这个程序,你必须创建一个DynamoDB表,它有一个名为“value”的字符串键。
方法的实现ForeachWriter执行写操作的接口。
ForeachWriter
进口org.apache.火花.sql{。ForeachWriter,行}进口com.amazonaws.AmazonServiceException进口com.amazonaws.身份验证._进口com.amazonaws.服务.dynamodbv2.AmazonDynamoDB进口com.amazonaws.服务.dynamodbv2.AmazonDynamoDBClientBuilder进口com.amazonaws.服务.dynamodbv2.模型.AttributeValue进口com.amazonaws.服务.dynamodbv2.模型.ResourceNotFoundException进口java.跑龙套.ArrayList进口scala.集合.JavaConverters._类DynamoDbWriter扩展ForeachWriter[行]{私人瓦尔的表=“<表名>”私人瓦尔accessKey=""私人瓦尔secretKey=""私人瓦尔regionName=地区“< >”//只有在调用open()时才会被惰性地初始化懒惰的瓦尔ddb=AmazonDynamoDBClientBuilder.标准().withCredentials(新AWSStaticCredentialsProvider(新BasicAWSCredentials(accessKey,secretKey))).withRegion(regionName).构建()////在准备发送多行时首先调用。//把所有的初始化代码放在open()里面,这样就可以//在执行程序中初始化这个类的副本,其中open()//将被调用。//def开放(partitionId:长,epochId:长)={ddb//强制客户端初始化真正的}////在open()被调用后的每一行都被调用。//每次发送一行。//一个更有效的实现是一次发送多个行。//def过程(行:行)={瓦尔rowAsMap=行.getValuesMap(行.模式.字段名)瓦尔dynamoItem=rowAsMap.mapValues{v:任何= >新AttributeValue(v.toString)}。asJavaddb.putItem(的表,dynamoItem)}////在所有行处理完毕后调用。//def关闭(errorOrNull:Throwable)={ddb.关闭()}}
使用DynamoDbWriter将一个速率流写入DynamoDB。
DynamoDbWriter
火花.readStream.格式(“速度”).负载().选择(“价值”).writeStream.foreach(新DynamoDbWriter).开始()
下面的笔记本演示了如何轻松地将Amazon CloudTrail日志从JSON转换为Parquet,以便进行高效的特别查询。看到实时流ETL与结构化流获取详细信息。
这两本笔记本展示了如何在Python和Scala中使用流-流连接。