砖FeatureStoreClient
-
类
databricks.feature_store.client。
FeatureStoreClient
( feature_store_uri:可选(str) =没有,model_registry_uri:可选(str) =没有 ) -
基地:
对象
客户端与砖交互功能。
-
create_table
( 名称:str primary_keys:联盟[str、列表(str)], [pyspark.sql.dataframe df:可选。DataFrame]=没有一个,*,timestamp_keys: Union[str, List[str], None] = None, partition_columns: Union[str, List[str], None] = None, schema: Optional[pyspark.sql.types.StructType] = None, description: Optional[str] = None, tags: Optional[Dict[str, str]] = None, **kwargs )→databricks.feature_store.entities.feature_table.FeatureTable -
创建并返回一个功能表与给定名称和主键。
返回的功能表有名字和主键。使用所提供的
模式
或推断模式所提供的df
。如果df
提供,此数据将被保存在一个三角洲表。支持的数据类型特点是:IntegerType
,LongType
,FloatType
,倍增式
,StringType
,BooleanType
,DateType
,TimestampType
,ShortType
,ArrayType
,MapType
,BinaryType
,DecimalType
。- 参数
-
的名字——一个表单的功能表名
< database_name >。< table_name >
例如,dev.user_features
。primary_keys——功能表的主键。如果需要多个列,例如,指定一个列名列表
[' customer_id ',“地区”)
。df-数据插入到此功能表。的模式
df
将被用作功能表模式。timestamp_keys- - - - - -
包含事件时间相关特性值的列。时间戳键和功能表的主键唯一地标识一个实体的特征值在一个时间点。
请注意
实验:这个论点在将来发布的版本中可能会改变或删除没有警告。
partition_columns- - - - - -
用于分区功能表列。如果提供了列表,列表中的列排序将用于分区。
模式——功能表模式。要么
模式
或df
必须提供。描述——功能表的描述。
标签- - - - - -
标签与功能表。
请注意
从砖开始运行时可用10.5毫升。
- 其他参数
-
路径(
可选(str)
)——在一个受支持的文件系统路径。默认数据库的位置。
-
register_table
( *,delta_table: str, primary_keys:联盟[str、列表(str)], timestamp_keys:联盟(str,列表(str),没有一个)=没有描述:可选(str) =没有标签:可选[Dict [str, str]] =没有 )→databricks.feature_store.entities.feature_table.FeatureTable -
注册一个现有三角洲与给定的表作为一个功能表主键。
返回的功能表具有相同的名称作为三角洲表。
请注意
从砖开始运行时可用10.4毫升。
- 参数
-
的名字-δ表单的表名
< database_name >。< table_name >
例如,dev.user_features
。表必须存在于metastore。primary_keys——三角洲表的主键。如果需要多个列,例如,指定一个列名列表
[' customer_id ',“地区”)
。timestamp_keys——列包含事件时间与特征值有关。在一起,功能键和主键唯一地标识的时间戳值在一个时间点。
描述——功能表的描述。
标签- - - - - -
标签与功能表。
请注意
从砖开始运行时可用10.5毫升。
- 返回
-
一个
FeatureTable
对象。
-
get_table
( 名称:str )→databricks.feature_store.entities.feature_table.FeatureTable -
得到一个功能表的元数据。
- 参数
-
的名字——一个表单的功能表名
< database_name >。< table_name >
例如,dev.user_features
。
-
drop_table
( 名称:str )→没有 -
删除指定的功能表。这个API还滴底层δ表。
请注意
从砖开始运行时可用10.5毫升。
- 参数
-
的名字——表单的功能表名
< database_name >。< table_name >
例如,dev.user_features
。
请注意
删除功能表可以导致上游生产商和下游消费者意想不到的失败(模型、端点和安排工作)。你必须删除任何现有的在线商店单独发表。
-
read_table
( 名称:str,* * kwargs )→pyspark.sql.dataframe.DataFrame -
阅读的内容功能表。
- 参数
-
的名字——一个表单的功能表名
< database_name >。< table_name >
例如,dev.user_features
。 - 返回
-
功能表内容,或将抛出一个异常,如果这个特性表不存在。
-
write_table
( 名称:str,df: pyspark.sql.dataframe.DataFrame,模式:str =“合并”,checkpoint_location:可选(str) =没有,触发:Dict (str,任何]= {“processingTime”:“5秒”} )→可选(pyspark.sql.streaming.StreamingQuery) -
写一个功能表。
如果输入
DataFrame
流,将创建一个写流。- 参数
-
的名字——一个表单的功能表名
< database_name >。< table_name >
例如,dev.user_features
。提出了一个异常如果这个特性表不存在。df——火花
DataFrame
特性数据。提出了一个异常,如果模式不匹配的功能表。模式- - - - - -
两个支持写模式:
“覆盖”
更新整个表。“合并”
插入的行吗df
进入功能表。如果df
包含功能表中的列不存在,这些列将被添加新特性。
checkpoint_location——设置结构化流
checkpointLocation
选择。通过设置一个checkpoint_location
,火花结构化流将存储的进展信息和中间状态,使故障后恢复。这个参数时仅支持论点df
是一个流DataFrame
。触发——如果
df.isStreaming
,触发
字典定义了流数据处理的时机,将打开并通过DataStreamWriter.trigger
作为参数。例如,触发={“一旦”:真正的}
将导致调用吗DataStreamWriter.trigger(一旦= True)
。
- 返回
-
如果
df.isStreaming
,返回一个PySparkStreamingQuery
。没有一个
否则。
-
add_data_sources
( *,feature_table_name: str, source_names:联盟[str、列表(str)], source_type: str =“定制” )→没有 -
将数据源添加到功能表。
- 参数
-
feature_table_name——功能表名。
source_names——数据源名称。对于多个源,指定一个列表。如果一个数据源名称已经存在,它将被忽略。
source_type- - - - - -
下列之一:
“表”
:表中格式< database_name >。< table_name >和存储在metastore(如蜂巢)。“路径”
:砖的路径,如文件系统(DBFS)。“自定义”
:手动添加数据源,一张桌子和一个路径。
-
delete_data_sources
( *,feature_table_name: str, source_names:联盟(str、列表(str)) )→没有 -
从功能表删除数据源。
请注意
所有类型的数据源(表、路径、自定义)匹配源名称将被删除。
- 参数
-
feature_table_name——功能表名。
source_names——数据源名称。对于多个源,指定一个列表。如果一个数据源名称不存在,它将被忽略。
-
publish_table
( 名称:str, online_store: databricks.feature_store.online_store_spec.online_store_spec。*,OnlineStoreSpec filter_condition:可选(str) = None,模式:str =“合并”,流:bool = False, checkpoint_location:可选(str) = None,触发:Dict [str,任何]= {“processingTime”:“5分钟”},特点:联盟(str,列表(str),没有一个)=没有 )→可选(pyspark.sql.streaming.StreamingQuery) -
发布功能表在线商店。
- 参数
-
的名字——功能表的名称。
online_store——规范的在线商店。
filter_condition——一个SQL表达式使用功能表列,过滤器功能行之前发布的在线商店。例如,
“dt>“2020-09-10”
。这类似于跑步df.filter
或者一个在哪里
条件在SQL特性表之前出版。模式- - - - - -
指定的行为当数据已经存在于这个特性表在网上商店。如果
“覆盖”
使用模式,现有的数据被替换的新数据。如果“合并”
使用模式,新的数据将被合并,在这些条件下:如果存在一个关键在线表而不是离线表,行在线表修改的。
离线表中是否存在一个重要但不是在线表,在线离线表行插入到表。
如果存在一个关键的离线和在线表,在线表行将被更新。
流媒体——如果
真正的
流数据的在线商店。checkpoint_location——设置结构化流
checkpointLocation
选择。通过设置一个checkpoint_location
,火花结构化流将存储的进展信息和中间状态,使故障后恢复。这个参数时仅支持流= True
。触发——如果
流= True
,触发
定义了流数据处理的时机。字典将打开并通过DataStreamWriter.trigger
作为参数。例如,触发={“一旦”:真正的}
将导致调用吗DataStreamWriter.trigger(一旦= True)
。特性- - - - - -
指定特性列(s)发布到网上商店。所选特征必须现有在线商店功能的超集。主键列和时间戳键列总是会出版。
请注意
这个参数时仅支持
模式=“合并”
。当特性
没有设置,整个功能表将发表。
- 返回
-
如果
流= True
,返回一个PySparkStreamingQuery
,没有一个
否则。
-
create_training_set
( df: pyspark.sql.dataframe.DataFrame,feature_lookups: List[databricks.feature_store.entities.feature_lookup.FeatureLookup], label: Union[str, List[str], None], exclude_columns: List[str] = [] )→databricks.feature_store.training_set.TrainingSet -
创建一个
TrainingSet
。- 参数
-
df- - -
DataFrame
用于连接特性。feature_lookups——功能加入到列表
DataFrame
。标签- - - - -列(s)的名字
DataFrame
包含训练集的标签。没有标号字段创建一个训练集,即为无监督训练集,指定标签=没有。exclude_columns——名称的列下降
TrainingSet
DataFrame
。
- 返回
-
一个
TrainingSet
对象。
-
log_model
( 模型:任何,artifact_path: str,*,味道:模块,training_set: databricks.feature_store.training_set.TrainingSet,registered_model_name:可选(str) =没有,await_registration_for: int = mlflow.tracking._model_registry.DEFAULT_AWAIT_MAX_SLEEP_SECONDS,* * kwargs ) -
日志MLflow模型打包功能查找信息。
请注意
的
DataFrame
返回的TrainingSet.load_df ()
必须被用来训练模型。如果它已经被修改(例如数据归一化,添加一个列,和类似的),这些修改将不会应用在推理时,导致training-serving倾斜。- 参数
-
模型——模型得救。这个模型必须能够被拯救了
flavor.save_model
。看到MLflow模型API。artifact_path——Run-relative工件路径。
味道MLflow模块用于记录模型。
味道
应该有类型ModuleType
。该模块必须有一个方法save_model
,必须支持python_function
味道。例如,mlflow.sklearn
,mlflow.xgboost
,以及类似的。training_set- - -
TrainingSet
用来训练这个模型。registered_model_name- - - - - -
请注意
实验:这个论点在将来发布的版本中可能会改变或删除没有警告。
如果有,创建一个模型版本
registered_model_name
,也创建一个模型如果一个注册名字是不存在的。await_registration_for——等待的秒数模型版本完成创建,
准备好了
的地位。默认情况下,函数等待五分钟。指定0
或没有一个
跳过等待。
- 返回
-
score_batch
( model_uri: str,df: pyspark.sql.dataframe.DataFrame,result_type: str =“双” )→pyspark.sql.dataframe.DataFrame -
评估模型
DataFrame
。模型评价所需的附加功能会自动检索
功能商店
。模型必须被记录
FeatureStoreClient.log_model ()
,包元数据模型与特征。除非出现在df
,这些特性将会抬头功能商店
和与df
前评分模型。如果一个特性是包含在
df
将使用提供的特性值,而不是那些存储在功能商店
。例如,如果一个模型训练的两个特点
account_creation_date
和num_lifetime_purchases
,如:feature_lookups=(FeatureLookup(table_name=“trust_and_safety.customer_features”,feature_name=“account_creation_date”,lookup_key=“customer_id”,),FeatureLookup(table_name=“trust_and_safety.customer_features”,feature_name=“num_lifetime_purchases”,lookup_key=“customer_id”),]与mlflow。start_run():training_set=fs。create_training_set(df,feature_lookups=feature_lookups,标签=“is_banned”,exclude_columns=(“customer_id”])…fs。log_model(模型,“模型”,味道=mlflow。sklearn,training_set=training_set,registered_model_name=“example_model”)
然后在推理时,调用者的
FeatureStoreClient.score_batch ()
必须通过一个DataFrame
包括customer_id
,lookup_key
中指定的FeatureLookups
的training_set
。如果DataFrame
包含一个列account_creation_date
,这一列的值将用于代替的功能商店
。如:# batch_df已列(“customer_id”、“account_creation_date”)预测=fs。score_batch(“模型:/ example_model / 1”,batch_df)
- 参数
-
model_uri- - - - - -
的位置,URI的格式,MLflow模型的使用记录
FeatureStoreClient.log_model ()
。之一::/ < mlflow_run_id > / run-relative /道路/ /模型
模型:/ < model_name > / < model_version >
模型:/ < model_name > / <阶段>
URI模式的更多信息,请参阅引用工件。
df- - - - - -
的
DataFrame
评分模型。功能商店
将与特性df
前评分模型。df
必须:1。包含查找键列必须加入特征的数据特征
商店中指定
feature_spec.yaml
工件。2。包含所有源键列必须得分模型,作为中指定
的
feature_spec.yaml
工件。3所示。不包含一个列
预测
,这是预留给模型的预测。df
可能包含附加列。result_type——模型的返回类型。看到
mlflow.pyfunc.spark_udf ()
result_type。
- 返回
-
一个
DataFrame
包含:所有列的
df
。所有特征值从特征检索存储。
一个列
预测
包含的输出模型。
-