读和写协议缓冲区

砖之间提供本机支持序列化和反序列化的Apache火花结构和协议缓冲区(protobuf)。Protobuf支持被实现为一个Apache火花DataFrame变压器,可以使用结构化流或批处理操作。

如何反序列化和序列化协议缓冲区

在砖运行时12.1及以上的,你可以使用from_protobufto_protobuf函数来进行序列化和反序列化数据。Protobuf序列化流中常用的工作负载。

protobuf的基本语法功能是类似的读写功能。你在使用前必须进口这些函数。

from_protobuf投一个二进制列结构,to_protobuf将一个struct列转换为二进制。你必须提供一个模式指定的注册表选项确定的参数或描述符文件descFilePath论点。

from_protobuf(数据:“ColumnOrName”,messageName:可选(str]=没有一个,descFilePath:可选(str]=没有一个,选项:可选(Dict(str,str]]=没有一个)to_protobuf(数据:“ColumnOrName”,messageName:可选(str]=没有一个,descFilePath:可选(str]=没有一个,选项:可选(Dict(str,str]]=没有一个)
/ /在使用模式注册表:from_protobuf(数据:,选项:地图(字符串,字符串])/ /或者Protobuf描述符文件:from_protobuf(数据:,messageName:字符串,descFilePath:字符串,选项:地图(字符串,字符串])/ /在使用模式注册表:to_protobuf(数据:,选项:地图(字符串,字符串])/ /或者Protobuf描述符文件:to_protobuf(数据:,messageName:字符串,descFilePath:字符串,选项:地图(字符串,字符串])

下面的示例说明了如何处理二进制protobuf记录from_protobuf ()和将火花SQL结构转换为二进制protobufto_protobuf ()

注册表使用protobuf汇合的模式

砖支持使用融合性的模式注册表定义Protobuf。

pyspark.sql.protobuf.functions进口to_protobuf,from_protobufschema_registry_options={“schema.registry.subject”:“app-events-value”,“schema.registry.address”:“https://schema-registry: 8081 /”}#二进制Protobuf转换为SQL结构与from_protobuf ():proto_events_df=(input_df选择(from_protobuf(“proto_bytes”,选项=schema_registry_options)别名(“proto_event”)))# SQL结构转换为二进制与to_protobuf Protobuf ():protobuf_binary_df=(proto_events_dfselectExpr(“结构(名称、id上下文)事件”)选择(to_protobuf(“事件”,选项=schema_registry_options)别名(“proto_bytes”)))
进口orgapache火花sqlprotobuf功能_进口scala集合JavaConverters_瓦尔schemaRegistryOptions=地图(“schema.registry.subject”- >“app-events-value”,“schema.registry.address”- >“https://schema-registry: 8081 /”)/ /二进制Protobuf转换为SQL结构与from_protobuf ():瓦尔protoEventsDF=inputDF选择(from_protobuf(美元“proto_bytes”,选项=schemaRegistryOptionsasJava)作为(“proto_event”))/ / SQL结构转换为二进制Protobuf to_protobuf ():瓦尔protobufBinaryDF=protoEventsDFselectExpr(“结构(名称、id上下文)事件”)选择(to_protobuf(美元“事件”,选项=schemaRegistryOptionsasJava)作为(“proto_bytes”))

注册验证外部汇合的模式

外部汇合的模式验证注册表,更新您的注册表模式选项包括身份验证凭据和API密钥。

schema_registry_options={“schema.registry.subject”:“app-events-value”,“schema.registry.address”:“https://remote-schema-registry-endpoint”,“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:“confluentApiKey: confluentApiSecret”}
瓦尔schemaRegistryOptions=地图(“schema.registry.subject”- >“app-events-value”,“schema.registry.address”- >“https://remote-schema-registry-endpoint”,“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:“confluentApiKey: confluentApiSecret”)

使用Protobuf描述符文件

你也可以参考protobuf描述符文件可用来计算集群。确保你有适当的权限来读取文件,这取决于它的位置。

pyspark.sql.protobuf.functions进口to_protobuf,from_protobufdescriptor_file=“/道路/ / proto_descriptor.desc”proto_events_df=(input_df选择(from_protobuf(input_df价值,“BasicMessage”,descFilePath=descriptor_file)别名(“原型”)))proto_binary_df=(proto_events_df选择(to_protobuf(proto_events_df原型,“BasicMessage”,descriptor_file)别名(“字节”)))
进口orgapache火花sqlprotobuf功能_瓦尔descriptorFile=“/道路/ / proto_descriptor.desc”瓦尔protoEventsDF=inputDF选择(from_protobuf(美元“价值”,“BasicMessage”,descFilePath=descriptorFile)。作为(“原型”))瓦尔protoBytesDF=protoEventsDF选择(to_protobuf(美元“原型”,“BasicMessage”,descriptorFile)。作为(“字节”))