pyspark.sql.DataFrame.observe

DataFrame。 观察 ( 观察:联盟(观察,str],*exprs:pyspark.sql.column.Column )→DataFrame

定义(命名)DataFrame指标观察。这个方法返回一个“观察”DataFrame返回相同的结果作为输入,用以下保证:

  • 它将计算定义的聚合(指标)的所有数据流经

    数据集。

  • 它将报告的价值定义聚合列一旦我们完成

    点。年底完成点可以是一个查询(批处理模式)或一个流媒体时代的终结。的价值总量只反映了数据处理,因为前面的完成点。

指标列必须包含一个文本(如点燃(42)),或者应该包含一个或多个聚合函数(例如sum (a)或(a + b) +和avg (c) -点燃(1))。表达式包含输入数据集引用的列必须包装在一个聚合函数。

用户可以通过添加Python的观察这些指标StreamingQueryListener,Scala / Javaorg.apache.spark.sql.streaming.StreamingQueryListener或Scala / Java的org.apache.spark.sql.util.QueryExecutionListener火花会话。

参数
观察 观察或str

str指定的名字,或一个观察实例获取指标。

添加支持str在这个参数。

exprs

列表达式()。

返回
DataFrame

观察到的DataFrame

笔记

观察观察,这个方法只支持批处理查询。当观察是一个字符串,该方法适用于批处理和流媒体查询。目前还不支持连续执行。

例子

观察观察下面,只有批量查询工作。

> > >pyspark.sql.functions进口上校,,点燃,马克斯> > >pyspark.sql进口观察> > >观察=观察(“我的指标”)> > >observed_df=df观察(观察,(点燃(1))别名(“数”),马克斯(上校(“年龄”)))> > >observed_df()2> > >观察得到{“计数”:2,“马克斯(年龄)”:5}

观察是一个字符串,也流查询工作如下。

> > >pyspark.sql.streaming进口StreamingQueryListener> > >MyErrorListener(StreamingQueryListener):defonQueryStarted(自我,事件):通过defonQueryProgress(自我,事件):=事件进步observedMetrics得到(“my_event”)#触发如果错误的数量超过5%num_rows=钢筋混凝土num_error_rows=伦理委员会=num_error_rows/num_rows如果>0.05:#触发警报通过defonQueryTerminated(自我,事件):通过> > >火花addListener(MyErrorListener())> > >#观察行数(rc)和错误行数(erc)的流数据集observed_ds=df观察(“my_event”,(点燃(1))别名(“钢筋混凝土”),(上校(“错误”))别名(“伦理委员会”))> > >observed_dswriteStream格式(“控制台”)开始()