砖FeatureStoreClient

databricks.feature_store.client。 FeatureStoreClient ( feature_store_uri:可选(str) =没有,model_registry_uri:可选(str) =没有 )

基地:对象

客户端与砖交互功能。

create_table ( 名称:str primary_keys:联盟[str、列表(str)], [pyspark.sql.dataframe df:可选。DataFrame]=没有一个,*,timestamp_keys: Union[str, List[str], None] = None, partition_columns: Union[str, List[str], None] = None, schema: Optional[pyspark.sql.types.StructType] = None, description: Optional[str] = None, tags: Optional[Dict[str, str]] = None, **kwargs )→databricks.feature_store.entities.feature_table.FeatureTable

创建并返回一个功能表与给定名称和主键。

返回的功能表有名字和主键。使用所提供的模式或推断模式所提供的df。如果df提供,此数据将被保存在一个三角洲表。支持的数据类型特点是:IntegerType,LongType,FloatType,倍增式,StringType,BooleanType,DateType,TimestampType,ShortType,ArrayType,MapType,BinaryType,DecimalType

参数
  • 的名字——一个表单的功能表名< database_name >。< table_name >例如,dev.user_features

  • primary_keys——功能表的主键。如果需要多个列,例如,指定一个列名列表[' customer_id ',“地区”)

  • df-数据插入到此功能表。的模式df将被用作功能表模式。

  • timestamp_keys- - - - - -

    包含事件时间相关特性值的列。时间戳键和功能表的主键唯一地标识一个实体的特征值在一个时间点。

    请注意

    实验:这个论点在将来发布的版本中可能会改变或删除没有警告。

  • partition_columns- - - - - -

    用于分区功能表列。如果提供了列表,列表中的列排序将用于分区。

    请注意

    在选择分区功能表列,使用列没有高基数。理想的战略是,你希望数据在每个分区至少1 GB。是一种最常用的分区列日期

    额外的信息:选择正确的分区列三角洲表

  • 模式——功能表模式。要么模式df必须提供。

  • 描述——功能表的描述。

  • 标签- - - - - -

    标签与功能表。

    请注意

    从砖开始运行时可用10.5毫升。

其他参数
  • 路径(可选(str))——在一个受支持的文件系统路径。默认数据库的位置。

register_table ( *,delta_table: str, primary_keys:联盟[str、列表(str)], timestamp_keys:联盟(str,列表(str),没有一个)=没有描述:可选(str) =没有标签:可选[Dict [str, str]] =没有 )→databricks.feature_store.entities.feature_table.FeatureTable

注册一个现有三角洲与给定的表作为一个功能表主键。

返回的功能表具有相同的名称作为三角洲表。

请注意

从砖开始运行时可用10.4毫升。

参数
  • 的名字-δ表单的表名< database_name >。< table_name >例如,dev.user_features。表必须存在于metastore。

  • primary_keys——三角洲表的主键。如果需要多个列,例如,指定一个列名列表[' customer_id ',“地区”)

  • timestamp_keys——列包含事件时间与特征值有关。在一起,功能键和主键唯一地标识的时间戳值在一个时间点。

  • 描述——功能表的描述。

  • 标签- - - - - -

    标签与功能表。

    请注意

    从砖开始运行时可用10.5毫升。

返回

一个FeatureTable对象。

get_table ( 名称:str )→databricks.feature_store.entities.feature_table.FeatureTable

得到一个功能表的元数据。

参数

的名字——一个表单的功能表名< database_name >。< table_name >例如,dev.user_features

drop_table ( 名称:str )→没有

删除指定的功能表。这个API还滴底层δ表。

请注意

从砖开始运行时可用10.5毫升。

参数

的名字——表单的功能表名< database_name >。< table_name >例如,dev.user_features

请注意

删除功能表可以导致上游生产商和下游消费者意想不到的失败(模型、端点和安排工作)。你必须删除任何现有的在线商店单独发表。

read_table ( 名称:str,* * kwargs )→pyspark.sql.dataframe.DataFrame

阅读的内容功能表。

参数

的名字——一个表单的功能表名< database_name >。< table_name >例如,dev.user_features

返回

功能表内容,或将抛出一个异常,如果这个特性表不存在。

write_table ( 名称:str,df: pyspark.sql.dataframe.DataFrame,模式:str =“合并”,checkpoint_location:可选(str) =没有,触发:Dict (str,任何]= {“processingTime”:“5秒”} )→可选(pyspark.sql.streaming.StreamingQuery)

写一个功能表。

如果输入DataFrame流,将创建一个写流。

参数
  • 的名字——一个表单的功能表名< database_name >。< table_name >例如,dev.user_features。提出了一个异常如果这个特性表不存在。

  • df——火花DataFrame特性数据。提出了一个异常,如果模式不匹配的功能表。

  • 模式- - - - - -

    两个支持写模式:

    • “覆盖”更新整个表。

    • “合并”插入的行吗df进入功能表。如果df包含功能表中的列不存在,这些列将被添加新特性。

  • checkpoint_location——设置结构化流checkpointLocation选择。通过设置一个checkpoint_location,火花结构化流将存储的进展信息和中间状态,使故障后恢复。这个参数时仅支持论点df是一个流DataFrame

  • 触发——如果df.isStreaming,触发字典定义了流数据处理的时机,将打开并通过DataStreamWriter.trigger作为参数。例如,触发={“一旦”:真正的}将导致调用吗DataStreamWriter.trigger(一旦= True)

返回

如果df.isStreaming,返回一个PySparkStreamingQuery没有一个否则。

add_data_sources ( *,feature_table_name: str, source_names:联盟[str、列表(str)], source_type: str =“定制” )→没有

将数据源添加到功能表。

参数
  • feature_table_name——功能表名。

  • source_names——数据源名称。对于多个源,指定一个列表。如果一个数据源名称已经存在,它将被忽略。

  • source_type- - - - - -

    下列之一:

    • “表”:表中格式< database_name >。< table_name >和存储在metastore(如蜂巢)。

    • “路径”:砖的路径,如文件系统(DBFS)。

    • “自定义”:手动添加数据源,一张桌子和一个路径。

delete_data_sources ( *,feature_table_name: str, source_names:联盟(str、列表(str)) )→没有

从功能表删除数据源。

请注意

所有类型的数据源(表、路径、自定义)匹配源名称将被删除。

参数
  • feature_table_name——功能表名。

  • source_names——数据源名称。对于多个源,指定一个列表。如果一个数据源名称不存在,它将被忽略。

publish_table ( 名称:str, online_store: databricks.feature_store.online_store_spec.online_store_spec。*,OnlineStoreSpec filter_condition:可选(str) = None,模式:str =“合并”,流:bool = False, checkpoint_location:可选(str) = None,触发:Dict [str,任何]= {“processingTime”:“5分钟”},特点:联盟(str,列表(str),没有一个)=没有 )→可选(pyspark.sql.streaming.StreamingQuery)

发布功能表在线商店。

参数
  • 的名字——功能表的名称。

  • online_store——规范的在线商店。

  • filter_condition——一个SQL表达式使用功能表列,过滤器功能行之前发布的在线商店。例如,“dt>“2020-09-10”。这类似于跑步df.filter或者一个在哪里条件在SQL特性表之前出版。

  • 模式- - - - - -

    指定的行为当数据已经存在于这个特性表在网上商店。如果“覆盖”使用模式,现有的数据被替换的新数据。如果“合并”使用模式,新的数据将被合并,在这些条件下:

    • 如果存在一个关键在线表而不是离线表,行在线表修改的。

    • 离线表中是否存在一个重要但不是在线表,在线离线表行插入到表。

    • 如果存在一个关键的离线和在线表,在线表行将被更新。

  • 流媒体——如果真正的流数据的在线商店。

  • checkpoint_location——设置结构化流checkpointLocation选择。通过设置一个checkpoint_location,火花结构化流将存储的进展信息和中间状态,使故障后恢复。这个参数时仅支持流= True

  • 触发——如果流= True,触发定义了流数据处理的时机。字典将打开并通过DataStreamWriter.trigger作为参数。例如,触发={“一旦”:真正的}将导致调用吗DataStreamWriter.trigger(一旦= True)

  • 特性- - - - - -

    指定特性列(s)发布到网上商店。所选特征必须现有在线商店功能的超集。主键列和时间戳键列总是会出版。

    请注意

    这个参数时仅支持模式=“合并”。当特性没有设置,整个功能表将发表。

返回

如果流= True,返回一个PySparkStreamingQuery,没有一个否则。

create_training_set ( df: pyspark.sql.dataframe.DataFrame,feature_lookups: List[databricks.feature_store.entities.feature_lookup.FeatureLookup], label: Union[str, List[str], None], exclude_columns: List[str] = [] )→databricks.feature_store.training_set.TrainingSet

创建一个TrainingSet

参数
  • df- - -DataFrame用于连接特性。

  • feature_lookups——功能加入到列表DataFrame

  • 标签- - - - -列(s)的名字DataFrame包含训练集的标签。没有标号字段创建一个训练集,即为无监督训练集,指定标签=没有。

  • exclude_columns——名称的列下降TrainingSetDataFrame

返回

一个TrainingSet对象。

log_model ( 模型:任何,artifact_path: str,*,味道:模块,training_set: databricks.feature_store.training_set.TrainingSet,registered_model_name:可选(str) =没有,await_registration_for: int = mlflow.tracking._model_registry.DEFAULT_AWAIT_MAX_SLEEP_SECONDS,* * kwargs )

日志MLflow模型打包功能查找信息。

请注意

DataFrame返回的TrainingSet.load_df ()必须被用来训练模型。如果它已经被修改(例如数据归一化,添加一个列,和类似的),这些修改将不会应用在推理时,导致training-serving倾斜。

参数
  • 模型——模型得救。这个模型必须能够被拯救了flavor.save_model。看到MLflow模型API

  • artifact_path——Run-relative工件路径。

  • 味道MLflow模块用于记录模型。味道应该有类型ModuleType。该模块必须有一个方法save_model,必须支持python_function味道。例如,mlflow.sklearn,mlflow.xgboost,以及类似的。

  • training_set- - -TrainingSet用来训练这个模型。

  • registered_model_name- - - - - -

    请注意

    实验:这个论点在将来发布的版本中可能会改变或删除没有警告。

    如果有,创建一个模型版本registered_model_name,也创建一个模型如果一个注册名字是不存在的。

  • await_registration_for——等待的秒数模型版本完成创建,准备好了的地位。默认情况下,函数等待五分钟。指定0没有一个跳过等待。

返回

没有一个

score_batch ( model_uri: str,df: pyspark.sql.dataframe.DataFrame,result_type: str =“双” )→pyspark.sql.dataframe.DataFrame

评估模型DataFrame

模型评价所需的附加功能会自动检索功能商店

模型必须被记录FeatureStoreClient.log_model (),包元数据模型与特征。除非出现在df,这些特性将会抬头功能商店和与df前评分模型。

如果一个特性是包含在df将使用提供的特性值,而不是那些存储在功能商店

例如,如果一个模型训练的两个特点account_creation_datenum_lifetime_purchases,如:

feature_lookups=(FeatureLookup(table_name=“trust_and_safety.customer_features”,feature_name=“account_creation_date”,lookup_key=“customer_id”,),FeatureLookup(table_name=“trust_and_safety.customer_features”,feature_name=“num_lifetime_purchases”,lookup_key=“customer_id”),]mlflowstart_run():training_set=fscreate_training_set(df,feature_lookups=feature_lookups,标签=“is_banned”,exclude_columns=(“customer_id”])fslog_model(模型,“模型”,味道=mlflowsklearn,training_set=training_set,registered_model_name=“example_model”)

然后在推理时,调用者的FeatureStoreClient.score_batch ()必须通过一个DataFrame包括customer_id,lookup_key中指定的FeatureLookupstraining_set。如果DataFrame包含一个列account_creation_date,这一列的值将用于代替的功能商店。如:

# batch_df已列(“customer_id”、“account_creation_date”)预测=fsscore_batch(“模型:/ example_model / 1”,batch_df)
参数
  • model_uri- - - - - -

    的位置,URI的格式,MLflow模型的使用记录FeatureStoreClient.log_model ()。之一:

    • :/ < mlflow_run_id > / run-relative /道路/ /模型

      • 模型:/ < model_name > / < model_version >

      • 模型:/ < model_name > / <阶段>

    URI模式的更多信息,请参阅引用工件

  • df- - - - - -

    DataFrame评分模型。功能商店将与特性df前评分模型。df必须:

    1。包含查找键列必须加入特征的数据特征

    商店中指定feature_spec.yaml工件。

    2。包含所有源键列必须得分模型,作为中指定

    feature_spec.yaml工件。

    3所示。不包含一个列预测,这是预留给模型的预测。

    df可能包含附加列。

  • result_type——模型的返回类型。看到mlflow.pyfunc.spark_udf ()result_type。

返回

一个DataFrame包含:

  1. 所有列的df

    1. 所有特征值从特征检索存储。

    2. 一个列预测包含的输出模型。

set_feature_table_tag ( *,table_name: str,关键:str,价值:str )→没有

创建或更新的功能表相关联的一个标签。如果标签对应的键已经存在,它的值和新值将被重写。

请注意

从砖开始运行时可用10.5毫升。

参数
  • table_name——功能表名

  • 关键——标签关键

  • 价值——标签值

delete_feature_table_tag ( *,table_name: str,关键:str )→没有

删除与功能表相关联的标记。删除一个不存在的标签将发出一个警告。

请注意

从砖开始运行时可用10.5毫升。

参数
  • table_name——功能表名。

  • 关键——标签删除的关键。