pyspark.sql.DataFrame.observe¶
-
DataFrame。
观察
( 观察:联盟(观察,str],*exprs:pyspark.sql.column.Column )→DataFrame¶ -
定义(命名)DataFrame指标观察。这个方法返回一个“观察”DataFrame返回相同的结果作为输入,用以下保证:
-
- 它将计算定义的聚合(指标)的所有数据流经
-
数据集。
-
- 它将报告的价值定义聚合列一旦我们完成
-
点。年底完成点可以是一个查询(批处理模式)或一个流媒体时代的终结。的价值总量只反映了数据处理,因为前面的完成点。
指标列必须包含一个文本(如点燃(42)),或者应该包含一个或多个聚合函数(例如sum (a)或(a + b) +和avg (c) -点燃(1))。表达式包含输入数据集引用的列必须包装在一个聚合函数。
用户可以通过添加Python的观察这些指标
StreamingQueryListener
,Scala / Javaorg.apache.spark.sql.streaming.StreamingQueryListener
或Scala / Java的org.apache.spark.sql.util.QueryExecutionListener
火花会话。笔记
当
观察
是观察
,这个方法只支持批处理查询。当观察
是一个字符串,该方法适用于批处理和流媒体查询。目前还不支持连续执行。例子
当
观察
是观察
下面,只有批量查询工作。> > >从pyspark.sql.functions进口上校,数,点燃,马克斯> > >从pyspark.sql进口观察> > >观察=观察(“我的指标”)> > >observed_df=df。观察(观察,数(点燃(1))。别名(“数”),马克斯(上校(“年龄”)))> > >observed_df。数()2> > >观察。得到{“计数”:2,“马克斯(年龄)”:5}
当
观察
是一个字符串,也流查询工作如下。> > >从pyspark.sql.streaming进口StreamingQueryListener> > >类MyErrorListener(StreamingQueryListener):…defonQueryStarted(自我,事件):…通过……defonQueryProgress(自我,事件):…行=事件。进步。observedMetrics。得到(“my_event”)…#触发如果错误的数量超过5%…num_rows=行。钢筋混凝土…num_error_rows=行。伦理委员会…比=num_error_rows/num_rows…如果比>0.05:…#触发警报…通过……defonQueryTerminated(自我,事件):…通过…> > >火花。流。addListener(MyErrorListener())> > >#观察行数(rc)和错误行数(erc)的流数据集…observed_ds=df。观察(…“my_event”,…数(点燃(1))。别名(“钢筋混凝土”),…数(上校(“错误”))。别名(“伦理委员会”))> > >observed_ds。writeStream。格式(“控制台”)。开始()
-