在砖结构化流模式
这包含常见的笔记本和代码示例模式处理结构化流砖。
开始使用结构化的流
如果你是全新的结构化流,明白了第一个结构化流负载运行。
写卡珊德拉在Python作为结构化流水槽
Apache Cassandra是一个分布式的、低延迟、可伸缩的高度可用的OLTP数据库。
结构化流与卡桑德拉通过火花卡桑德拉连接器。这个连接器支持抽样和DataFrame api,它有原生支持编写流数据。*重要*您必须使用相应的版本的spark-cassandra-connector-assembly。
下面的示例连接到一个或多个主机在卡桑德拉数据库集群。它还指定了连接配置如检查点位置和具体用于和表名:
火花。相依。集(“spark.cassandra.connection.host”,“host1 host2”)df。writeStream\。格式(“org.apache.spark.sql.cassandra”)\。outputMode(“添加”)\。选项(“checkpointLocation”,“/道路/ /检查站”)\。选项(“用于”,“keyspace_name”)\。选项(“表”,“table_name”)\。开始()
写Azure突触分析使用foreachBatch ()
在Python中
streamingDF.writeStream.foreachBatch ()
允许重用现有的一批作家写的输出数据流查询Azure突触分析。看到foreachBatch文档获取详细信息。
要运行这个示例,您需要Azure突触分析连接器。在Azure突触分析连接器的详细信息,请参见在Azure突触分析查询数据。
从pyspark.sql.functions进口*从pyspark.sql进口*defwriteToSQLWarehouse(df,epochId):df。写\。格式(“com.databricks.spark.sqldw”)\。模式(“覆盖”)\。选项(“url”," jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”)\。选项(“forward_spark_azure_storage_credentials”,“真正的”)\。选项(“数据表”,“my_table_in_dw_copy”)\。选项(“tempdir”,“wasbs: / / < your-container-name > @ < your-storage-account-name >.blob.core.windows.net/ < your-directory-name >”)\。保存()火花。相依。集(“spark.sql.shuffle.partitions”,“1”)查询=(火花。readStream。格式(“速度”)。负载()。selectExpr(“值% 10键”)。groupBy(“关键”)。数()。toDF(“关键”,“数”)。writeStream。foreachBatch(writeToSQLWarehouse)。outputMode(“更新”)。开始())