在砖结构化流模式

这包含常见的笔记本和代码示例模式处理结构化流砖。

开始使用结构化的流

如果你是全新的结构化流,明白了第一个结构化流负载运行

写卡珊德拉在Python作为结构化流水槽

Apache Cassandra是一个分布式的、低延迟、可伸缩的高度可用的OLTP数据库。

结构化流与卡桑德拉通过火花卡桑德拉连接器。这个连接器支持抽样和DataFrame api,它有原生支持编写流数据。*重要*您必须使用相应的版本的spark-cassandra-connector-assembly

下面的示例连接到一个或多个主机在卡桑德拉数据库集群。它还指定了连接配置如检查点位置和具体用于和表名:

火花相依(“spark.cassandra.connection.host”,“host1 host2”)dfwriteStream\格式(“org.apache.spark.sql.cassandra”)\outputMode(“添加”)\选项(“checkpointLocation”,“/道路/ /检查站”)\选项(“用于”,“keyspace_name”)\选项(“表”,“table_name”)\开始()

写Azure突触分析使用foreachBatch ()在Python中

streamingDF.writeStream.foreachBatch ()允许重用现有的一批作家写的输出数据流查询Azure突触分析。看到foreachBatch文档获取详细信息。

要运行这个示例,您需要Azure突触分析连接器。在Azure突触分析连接器的详细信息,请参见在Azure突触分析查询数据

pyspark.sql.functions进口*pyspark.sql进口*defwriteToSQLWarehouse(df,epochId):df\格式(“com.databricks.spark.sqldw”)\模式(“覆盖”)\选项(“url”," jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”)\选项(“forward_spark_azure_storage_credentials”,“真正的”)\选项(“数据表”,“my_table_in_dw_copy”)\选项(“tempdir”,“wasbs: / / < your-container-name > @ < your-storage-account-name >.blob.core.windows.net/ < your-directory-name >”)\保存()火花相依(“spark.sql.shuffle.partitions”,“1”)查询=(火花readStream格式(“速度”)负载()selectExpr(“值% 10键”)groupBy(“关键”)()toDF(“关键”,“数”)writeStreamforeachBatch(writeToSQLWarehouse)outputMode(“更新”)开始())

Stream-Stream连接

这两个笔记本展示如何在Python和Scala使用stream-stream连接。

Stream-Stream加入Python笔记本

在新标签页打开笔记本

Scala Stream-Stream连接笔记本电脑

在新标签页打开笔记本