工程的博客

看看Apache Spark 3.0中新的结构化流UI

分享这篇文章
这是阿里巴巴软件工程师于根茂的客座社区帖子。

结构化流最初是在Apache Spark 2.0中引入的。它已被证明是构建分布式流处理应用程序的最佳平台。bob体育客户端下载SQL/Dataset/DataFrame api和Spark内置函数的统一使得开发人员可以轻松实现他们复杂的需求,例如流聚合、流-流连接和窗口支持。自从结构化流发布以来,开发人员经常要求一种更好的方式来管理他们的流,就像我们在Spark流(即DStream)中所做的那样。在Apache Spark 3.0中,我们为结构化流发布了一个新的可视化UI。

新的结构化流UI提供了一种简单的方法,可以用有用的信息和统计数据监控所有流作业,使开发调试期间的故障排除更容易,并通过实时指标提高生产的可观察性。用户界面显示了两组统计信息:1)流查询作业的汇总信息,2)流查询的详细统计信息,包括输入速率、处理速率、输入行数、批处理持续时间、操作持续时间等。

流查询作业的汇总信息

当开发人员提交一个流SQL查询时,它将被列在结构化流选项卡中,其中包括活动的流查询和已完成的流查询。结果表中会列出流查询的一些基本信息,包括查询名称、状态、ID、运行ID、提交时间、查询持续时间、最后一批ID,以及平均输入速率、平均处理速率等汇总信息。流查询状态有三种类型,即运行完成了而且失败的.所有完成了而且失败的查询列在已完成的流查询表中。Error列显示失败查询的异常详细信息。

Spark 3.0结构化流选项卡,显示查询统计信息、状态和结果。

我们可以通过单击运行ID链接查看流查询的详细统计信息。

详细统计信息

Statistics页面显示了包括输入/处理速率、延迟和详细操作持续时间在内的指标,这些指标对于洞察流查询的状态非常有用,使您能够轻松调试查询处理中的异常。

Spark 3.0结构化流统计视图提供了查询的详细统计信息。
Spark 3.0结构化流统计视图提供了查询的详细统计信息。

它包含以下指标:

  • 输入率:数据到达的总(所有来源的)速率。
  • 过程速率:Spark处理数据的聚合(所有源)速率。
  • 批处理时间:每批处理的持续时间。
  • 操作时间:以毫秒为单位执行各种操作所花费的时间。

跟踪的操作如下:

  • addBatch:从源读取微批处理的输入数据、处理它并将批处理的输出写入接收器所花费的时间。这将占用微批处理的大部分时间。
  • getBatch:准备逻辑查询以从源读取当前微批的输入所花费的时间。
  • getOffset:查询数据源是否有新的输入数据所花费的时间。
  • walCommit:将偏移量写入元数据日志。
  • queryPlanning:生成执行计划。

需要注意的是,并不是所有列出的操作都将显示在UI中。不同类型的数据源上有不同的操作,因此列出的部分操作可能在一个流查询中执行。

使用UI解决流性能问题

在本节中,我们将介绍一些情况,在这些情况下,新的结构化流UI表明发生了一些不寻常的事情。在高层次上,演示查询看起来像这样,在每种情况下,我们将假设一些先决条件:

进口java.util.UUID
              val bootstrapServers =…Val主题=…val checkpointLocation =“/ tmp /临时——“+ UUID.randomUUID.toString
              Val线=火花.readStream格式“卡夫卡”.option (“kafka.bootstrap.servers”bootstrapServers).option (“订阅”、主题).load ().selectExpr ("CAST(值为字符串)"作为(字符串)val wordCounts = lines.flatMap(_.split(”“) .groupBy (“价值”) .count ()
              val query = wordCounts.writeStream.outputMode (“完整的”格式“控制台”.option (“checkpointLocation”checkpointLocation).start ()

处理能力不足导致延迟增加

在第一种情况下,我们运行查询以尽快处理Apache Kafka数据。在每个批处理中,流作业将处理Kafka中所有可用的数据。如果处理能力不足以处理批量数据,则延迟将迅速提高。最直观的判断是输入行而且批处理时间将呈线性上升。的过程速率提示流式作业最多只能处理大约8000条记录/秒。但是现在输入速度大约是2万条记录/秒。我们可以为流作业提供更多的执行资源,或者添加足够的分区来处理所有消费者,以跟上生产者的步伐。

Spark 3.0结构化流统计视图中的示例分析,演示了如何通过检查输入速率和批处理持续时间来检测延迟。

稳定但时延高

对于本节中的情况,与前面的情况有什么不同?延迟并没有持续增加,而是保持稳定,如下图所示:

Spark 3.0 Structured Streaming Statistics视图中的示例分析,演示如何分析不同的延迟问题。

我们发现过程速率能同时保持稳定吗输入速度.这意味着作业的处理能力足以处理输入数据。但是,每批处理的持续时间,即延迟,仍然高达20秒。高延迟的主要原因是每个批处理中有太多的数据。通常我们可以通过增加这个作业的并行度来减少延迟。在为Spark任务增加10个Kafka分区和10个内核后,我们发现延迟大约是5秒——比20秒要好得多。

Spark 3.0 Structured Streaming Statistics视图中的示例分析,演示了如何在延迟较低时确定消费滞后的原因。

使用“操作持续时间”图表进行故障排除

操作持续时间图表以毫秒为单位显示执行各种操作所花费的时间。它有助于了解每个批次的时间分布,并使故障排除更容易。让我们利用性能改进"火星- 30915:以Apache Spark社区为例,在查找最新批处理ID时,避免读取元数据日志文件。

在此之前,当压缩后的元数据日志变得很大时,压缩后的下一批处理要比其他批处理花费更多的时间。

Spark 3.0 Structured Streaming中的Operation Duration图表显示了执行任何操作所花费的毫秒数,这在排除流作业问题时是非常宝贵的,例如对压缩日志文件的不必要读取。

经过代码调查,发现并修复了对压缩日志文件的不必要读取。下面的操作时长图证实了我们预期的效果:

在Spark 3.0结构化流中,操作持续时间图的前后比较对于确认代码修复和其他优化是非常宝贵的。

未来的发展

如上所示,新的结构化流UI将通过更有用的流查询信息帮助开发人员更好地监控他们的流作业。作为早期版本,新的UI仍在开发中,并将在未来的版本中进行改进。在未来有几个功能可以做,包括但不限于以下:

  • 更多流查询执行细节:延迟数据、水印、状态数据度量等。
  • Spark历史服务器支持结构化流UI。
  • 针对不寻常情况的更明显的提示:发生延迟等。

尝试新的UI

在新的Databricks Runtime 7.1中尝试Apache Spark 3.0中的这个新的Spark Streaming UI。如果您正在使用Databricks笔记本,它还为您提供了一种简单的方法来查看笔记本中任何流查询的状态管理查询.你可以报名参加免费帐户Databricks并在几分钟内免费开始,不需要信用卡。

奥莱利学习火花书

免费第二版包括Spark 3.0的更新,包括新的Python类型提示Pandas udf,新的日期/时间实现等。

免费试用Databricks

相关的帖子

看到所有工程的博客的帖子