读和写协议缓冲区
砖之间提供本机支持序列化和反序列化的Apache火花结构和协议缓冲区(protobuf)。Protobuf支持被实现为一个Apache火花DataFrame变压器,可以使用结构化流或批处理操作。
如何反序列化和序列化协议缓冲区
在砖运行时12.1及以上的,你可以使用from_protobuf
和to_protobuf
函数来进行序列化和反序列化数据。Protobuf序列化流中常用的工作负载。
protobuf的基本语法功能是类似的读写功能。你在使用前必须进口这些函数。
from_protobuf
投一个二进制列结构,to_protobuf
将一个struct列转换为二进制。你必须提供一个模式指定的注册表选项
确定的参数或描述符文件descFilePath
论点。
from_protobuf(数据:“ColumnOrName”,messageName:可选(str]=没有一个,descFilePath:可选(str]=没有一个,选项:可选(Dict(str,str]]=没有一个)to_protobuf(数据:“ColumnOrName”,messageName:可选(str]=没有一个,descFilePath:可选(str]=没有一个,选项:可选(Dict(str,str]]=没有一个)
/ /在使用模式注册表:from_protobuf(数据:列,选项:地图(字符串,字符串])/ /或者Protobuf描述符文件:from_protobuf(数据:列,messageName:字符串,descFilePath:字符串,选项:地图(字符串,字符串])/ /在使用模式注册表:to_protobuf(数据:列,选项:地图(字符串,字符串])/ /或者Protobuf描述符文件:to_protobuf(数据:列,messageName:字符串,descFilePath:字符串,选项:地图(字符串,字符串])
下面的示例说明了如何处理二进制protobuf记录from_protobuf ()
和将火花SQL结构转换为二进制protobufto_protobuf ()
。
注册表使用protobuf汇合的模式
砖支持使用融合性的模式注册表定义Protobuf。
从pyspark.sql.protobuf.functions进口to_protobuf,from_protobufschema_registry_options={“schema.registry.subject”:“app-events-value”,“schema.registry.address”:“https://schema-registry: 8081 /”}#二进制Protobuf转换为SQL结构与from_protobuf ():proto_events_df=(input_df。选择(from_protobuf(“proto_bytes”,选项=schema_registry_options)。别名(“proto_event”)))# SQL结构转换为二进制与to_protobuf Protobuf ():protobuf_binary_df=(proto_events_df。selectExpr(“结构(名称、id上下文)事件”)。选择(to_protobuf(“事件”,选项=schema_registry_options)。别名(“proto_bytes”)))
进口org。apache。火花。sql。protobuf。功能。_进口scala。集合。JavaConverters。_瓦尔schemaRegistryOptions=地图(“schema.registry.subject”- >“app-events-value”,“schema.registry.address”- >“https://schema-registry: 8081 /”)/ /二进制Protobuf转换为SQL结构与from_protobuf ():瓦尔protoEventsDF=inputDF。选择(from_protobuf(美元“proto_bytes”,选项=schemaRegistryOptions。asJava)。作为(“proto_event”))/ / SQL结构转换为二进制Protobuf to_protobuf ():瓦尔protobufBinaryDF=protoEventsDF。selectExpr(“结构(名称、id上下文)事件”)。选择(to_protobuf(美元“事件”,选项=schemaRegistryOptions。asJava)。作为(“proto_bytes”))
注册验证外部汇合的模式
外部汇合的模式验证注册表,更新您的注册表模式选项包括身份验证凭据和API密钥。
schema_registry_options={“schema.registry.subject”:“app-events-value”,“schema.registry.address”:“https://remote-schema-registry-endpoint”,“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:“confluentApiKey: confluentApiSecret”}
瓦尔schemaRegistryOptions=地图(“schema.registry.subject”- >“app-events-value”,“schema.registry.address”- >“https://remote-schema-registry-endpoint”,“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:“confluentApiKey: confluentApiSecret”)
使用Protobuf描述符文件
你也可以参考protobuf描述符文件可用来计算集群。确保你有适当的权限来读取文件,这取决于它的位置。
从pyspark.sql.protobuf.functions进口to_protobuf,from_protobufdescriptor_file=“/道路/ / proto_descriptor.desc”proto_events_df=(input_df。选择(from_protobuf(input_df。价值,“BasicMessage”,descFilePath=descriptor_file)。别名(“原型”)))proto_binary_df=(proto_events_df。选择(to_protobuf(proto_events_df。原型,“BasicMessage”,descriptor_file)。别名(“字节”)))
进口org。apache。火花。sql。protobuf。功能。_瓦尔descriptorFile=“/道路/ / proto_descriptor.desc”瓦尔protoEventsDF=inputDF。选择(from_protobuf(美元“价值”,“BasicMessage”,descFilePath=descriptorFile)。作为(“原型”))瓦尔protoBytesDF=protoEventsDF。选择(to_protobuf(美元“原型”,“BasicMessage”,descriptorFile)。作为(“字节”))