如何监视PySpark流查询
2022年5月27日 在bob下载地址
流媒体是其中一个最重要的数据处理技术对摄入和分析。它为用户和开发人员提供低延迟和实时数据处理能力分析和触发动作。然而,监控数据流数据的工作负载是富有挑战性的,因为连续加工,因为它的到来。由于这种不间断流处理的性质,很难排除故障实时度量标准的情况下,开发和生产期间,提醒和仪表盘。
结构化流在Apache火花™地址的问题监控通过提供:
- 一个专用的用户界面实时指标和统计数据。有关更多信息,请参见看看新的结构化流媒体用户界面在Apache 3.0火花。
- 一个可观测的API允许报警等先进的监控能力和/或仪表盘与外部系统。
直到现在,可见API已经在PySpark失踪,这迫使用户使用Scala API为他们流查询结果报警的功能,与其他外部系统仪表盘。在Python中缺乏该功能与Python的重要性变得越来越重要的增长,考虑到几乎70%的笔记本电脑上运行命令砖是在Python中。
在砖运行时的11中,我们很高兴宣布在PySpark可见API现在可用。在这篇文章中,我们介绍了Python的API结构化流,连同一个循序渐进的场景添加到流查询报警逻辑。
可观测的API
开发人员现在可以流指标发送给外部系统,例如,报警和仪表盘与自定义指标,使用流媒体的组合查询PySpark侦听器接口和可观察到的API。流查询侦听器接口是一个抽象类必须继承和应该实现所有方法如下所示:
从pyspark.sql.streaming进口StreamingQueryListener
类MyListener(StreamingQueryListener):defonQueryStarted(自我、事件):”“”启动时调用查询。参数- - - - - - - - - - -事件::类:“pyspark.sql.streaming.listener.QueryStartedEvent”可用的属性是一样的Scala API。笔记- - - - - -这就是所谓的同步甲:“pyspark.sql.streaming.DataStreamWriter.start”,即“onQueryStart“将呼吁所有听众“DataStreamWriter.start() ' '返回对应的类:“pyspark.sql.streaming.StreamingQuery”。不阻塞在这个方法,因为它会阻止您的查询。”“”通过defonQueryProgress(自我、事件):”“”当有一些状态更新(摄入率更新等)。参数- - - - - - - - - - -事件::类:“pyspark.sql.streaming.listener.QueryProgressEvent”可用的属性是一样的Scala API。笔记- - - - - -这个方法是异步的。的状态:类:“pyspark.sql.streaming。StreamingQuery”将永远是最新的无论何时调用此方法。因此,状态:类的:“pyspark.sql.streaming.StreamingQuery”。可能会改变之前/当你处理事件。例如,你可能会发现:类:“StreamingQuery”终止当你处理“QueryProgressEvent”。”“”通过defonQueryTerminated(自我、事件):”“”当停止查询,有或没有错误。参数- - - - - - - - - - -事件::类:“pyspark.sql.streaming.listener.QueryTerminatedEvent”可用的属性是一样的Scala API。”“”通过
my_listener = MyListener ()
注意,他们所有的异步工作。
StreamingQueryListener.onQueryStarted
流媒体查询时会触发开始,例如,DataStreamWriter.start
。StreamingQueryListener.onQueryProgress
每个micro-batch执行完成时被调用。StreamingQueryListener.onQueryTerminated
被称为在查询停止时,例如,StreamingQuery.stop
。
必须添加侦听器被激活通过StreamingQueryManager
,也可以删除后如下所示:
spark.streams.addListener (my_listener)spark.streams.removeListener (my_listener)
为了捕捉自定义指标,它们必须通过补充道DataFrame.observe
。自定义指标被定义为任意聚合等功能计数(“价值”)
如下所示。
df.observe(“名字”,数(列),…)
错误警报场景
在本节中,我们将描述一个真实世界的例子用例与可观测的API。假设您有一个目录新的CSV文件不断从另一个系统,你必须摄取它们以流媒体的方式。在本例中,我们将使用一个本地文件系统API为简单起见,这样可以很容易地理解。下面的代码片段可以复制粘贴pyspark
shell运行和尝试。
首先,让我们导入必要的Python类和包,然后创建一个目录my_csv_dir
在这个场景中使用的。
进口操作系统进口shutil进口时间从pathlib进口路径从pyspark.sql.functions进口计数,坳,点燃从pyspark.sql.streaming进口StreamingQueryListener#注意:“basedir”替换为融合路径,例如,在砖“/ dbfs / tmp”#笔记本。basedir = os.getcwd ()#“/ dbfs / tmp”#我的CSV文件将被创建在这个目录后清洗“my_csv_dir”#目录的情况下,你已经跑了下面的这个例子。my_csv_dir = os.path.join (basedir,“my_csv_dir”)shutil。rmtree (my_csv_dir ignore_errors =真正的)os.makedirs (my_csv_dir)
接下来,我们定义自己的自定义流查询侦听器。侦听器将警报当有太多的畸形CSV摄入为每个过程中记录。如果畸形的记录总数的50%以上的处理记录,我们会打印出一个日志消息。然而,在生产环境中,您可以连接到外部系统,而不是简单地打印出来。
#定义我的听众。类MyListener(StreamingQueryListener):defonQueryStarted(自我、事件):打印(f”{event.name}”({事件。id})开始!”)defonQueryProgress(自我、事件):行= event.progress.observedMetrics.get (“指标”)如果行是不没有一个:如果行。畸形/ row.cnt >0.5:打印(“警告!哎哟!有太多的畸形”f”记录{row.malformed}的{row.cnt}!”)其他的:打印(f”{row.cnt}行处理!”)defonQueryTerminated(自我、事件):打印(f”{事件。id}终止了!”)
#添加我的侦听器。my_listener = MyListener ()spark.streams.addListener (my_listener)
激活听者,我们之前将它添加查询在这个例子。然而,重要的是要注意,您可以添加侦听器的异步查询开始和终止,因为他们工作。这允许你连接或分离的运行流查询没有阻止他们。
现在我们将开始一个流媒体查询中摄食的文件my_csv_dir
目录中。在处理过程中,我们也观察畸形的记录和处理记录的数量。CSV数据源存储在畸形的记录_corrupt_record
,默认情况下,我们将计算列畸形的数量记录。
#现在,开始流查询监控“my_csv_dir”目录中。#每一个时间当在那里是新CSV文件到达这里,我们会处理它们。my_csv=spark.readStream.schema (“my_key INT, my_val双_corrupt_record字符串“). csv(路径(my_csv_dir) .as_uri ())#“DataFrame。观察的计算的加工过的和畸形的记录,#和发送一个事件来侦听器。my_observed_csv=my_csv.observe (“指标”,数(点燃(1).alias(“问”),#号的加工过的行数(坳(“_corrupt_record”)) .alias(“畸形”))#号的畸形的行my_query=my_observed_csv.writeStream.format (“控制台”)。queryName(“我的观察”)。开始()
现在,我们已经定义了流媒体查询和报警功能,让我们创建CSV文件,这样他们可摄入以流媒体的方式:
#现在,我们将编写要处理CSV数据以流的方式。#这个CSV文件都是格式良好的。与开放(os.path.join (my_csv_dir“my_csv_1.csv”),“w”)作为f:_ = f.write (1.1“\ n”)_ = f.write (“123123 .123 \ n”)time . sleep (5)#假设另一个CSV文件抵达5秒。#哎哟!它有两个畸形的记录的3。我的观察者查询应该警惕!与开放(os.path.join (my_csv_dir“my_csv_error.csv”),“w”)作为f:_ = f.write (1.123“\ n”)_ = f.write (“哎哟!畸形的记录! \ n”)_ = f.write (“Arrgggh ! \ n”)time . sleep (5)#好吧,全部完成。让我们停止查询5秒。my_query.stop ()spark.streams.removeListener (my_listener)
在这里我们将看到查询开始,终止和过程是正确记录。因为有两个畸形的记录在CSV文件中,提出的警告是正确使用以下错误信息:
…
警报!哎哟!有太多的畸形记录2 3 !
…
结论
PySpark用户现在可以通过流媒体设置自定义指标和观察他们查询侦听器接口和可观察到的API。他们可以连接或分离的这种逻辑运行时动态地查询需要。这个功能地址需要仪表盘,提醒其他外部系统和报告。
流查询侦听器接口和可观察到的API可用DBR 11β,预计可在未来Apache火花。试试这两个新功能今天砖通过DBR 11β。