读和写流Avro数据

Apache Avro是一种常用的数据序列化系统流的世界。一个典型的解决方案是将数据在Apache卡夫卡Avro格式,元数据融合性的模式注册表,然后运行查询的流媒体框架连接到卡夫卡和模式注册表。

砖支持from_avroto_avro功能在卡夫卡建立与Avro数据流管道在注册表模式和元数据。这个函数to_avro在Avro二进制格式和编码专栏from_avro解码Avro二进制数据列。函数变换到另一列一列,和输入/输出的SQL数据类型可以是一个复杂类型或一个原始类型。

请注意

from_avroto_avro功能:

  • 可在PythonScala和Java。

  • 可以传递给SQL函数在两个批处理和流媒体查询。

也看到Avro文件数据源

基本的例子

类似于from_jsonto_json,你可以使用from_avroto_avro与任何二进制列,但你必须指定Avro手动模式。

进口orgapache火花sqlavro功能_进口orgapacheavroSchemaBuilder/ /当阅读卡夫卡的键和值的话题,解码/ /二进制(Avro)数据结构化数据。/ /结果DataFrame的模式是:<关键:字符串,价值:int >瓦尔df=火花readStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“订阅”,“t”)负载()选择(from_avro(美元“关键”,SchemaBuilder构建器()。stringType())。作为(“关键”),from_avro(美元“价值”,SchemaBuilder构建器()。intType())。作为(“价值”))/ /将结构化数据转换为二进制字符串(键列)和/ / int(值列)并保存到卡夫卡的主题。dataDF选择(to_avro(美元“关键”)。作为(“关键”),to_avro(美元“价值”)。作为(“价值”))writeStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“主题”,“t”)开始()

jsonFormatSchema例子

您还可以指定一个模式作为一个JSON字符串。例如,如果/ tmp / user.avsc是:

{“名称”:“example.avro”,“类型”:“记录”,“名称”:“用户”,“字段”:({“名称”:“名称”,“类型”:“字符串”},{“名称”:“favorite_color”,“类型”:(“字符串”,“零”]}]}

您可以创建一个JSON字符串:

pyspark.sql.avro.functions进口from_avro,to_avrojsonFormatSchema=开放(“/ tmp / user.avsc”,“r”)()

然后使用模式from_avro:

# 1。解码Avro数据结构。# 2。过滤器由列“favorite_color”。# 3。以Avro格式编码的“名称”列。输出=df\选择(from_avro(“价值”,jsonFormatSchema)别名(“用户”))\在哪里(的用户。favorite_color == "red"')\选择(to_avro(“user.name”)别名(“价值”))

例子与模式注册表

如果您的集群模式注册表服务,from_avro可以使用它,这样你不需要指定Avro手动模式。

下面的例子演示了阅读卡夫卡的主题“t”,假设键和值已经注册在注册表模式主题“t键”和“值”的类型字符串INT:

进口orgapache火花sqlavro功能_瓦尔schemaRegistryAddr=“https://myhost: 8081”瓦尔df=火花readStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“订阅”,“t”)负载()选择(from_avro(美元“关键”,“t键”,schemaRegistryAddr)。作为(“关键”),from_avro(美元“价值”,“值”,schemaRegistryAddr)。作为(“价值”))

to_avro,默认输出Avro模式可能不匹配模式的目标主题注册表服务有以下原因:

  • 从火花SQL类型映射到Avro模式不是一对一的。看到支持类型的火花SQL - > Avro转换

  • 如果转换输出Avro模式记录类型,记录的名字是topLevelRecord没有默认名称空间。

如果默认输出模式to_avro匹配模式的目标主题,您可以执行以下操作:

/ /转换后的数据保存到卡夫卡作为卡夫卡主题“t”。dataDF选择(to_avro(美元“关键”,点燃(“t键”),schemaRegistryAddr)。作为(“关键”),to_avro(美元“价值”,点燃(“值”),schemaRegistryAddr)。作为(“价值”))writeStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“主题”,“t”)开始()

否则,你必须提供模式的目标对象to_avro功能:

/ / Avro模式的JSON字符串格式的“值”。瓦尔avroSchema=/ /转换后的数据保存到卡夫卡作为卡夫卡主题“t”。dataDF选择(to_avro(美元“关键”,点燃(“t键”),schemaRegistryAddr)。作为(“关键”),to_avro(美元“价值”,点燃(“值”),schemaRegistryAddr,avroSchema)。作为(“价值”))writeStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“主题”,“t”)开始()

注册验证外部汇合的模式

在砖运行时的12.1及以上,可以验证外部汇合的模式注册表。下面的例子演示如何配置您的模式注册表选项包括身份验证凭据和API密钥。

进口orgapache火花sqlavro功能_进口scala集合JavaConverters_瓦尔schemaRegistryAddr=“https://confluent-schema-registry-endpoint”瓦尔schemaRegistryOptions=地图(“confluent.schema.registry.basic.auth.credentials.source”- >“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”- >“confluentApiKey: confluentApiSecret”)瓦尔df=火花readStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“订阅”,“t”)负载()选择(from_avro(美元“关键”,“t键”,schemaRegistryAddr,schemaRegistryOptionsasJava)。作为(“关键”),from_avro(美元“价值”,“值”,schemaRegistryAddr,schemaRegistryOptionsasJava)。作为(“价值”))/ /转换后的数据保存到卡夫卡作为卡夫卡主题“t”。dataDF选择(to_avro(美元“关键”,点燃(“t键”),schemaRegistryAddr,schemaRegistryOptionsasJava)。作为(“关键”),to_avro(美元“价值”,点燃(“值”),schemaRegistryAddr,schemaRegistryOptionsasJava)。作为(“价值”))writeStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“主题”,“t”)保存()/ / Avro模式的JSON字符串格式的“值”。瓦尔avroSchema=/ /转换后的数据保存到卡夫卡作为卡夫卡主题“t”。dataDF选择(to_avro(美元“关键”,点燃(“t键”),schemaRegistryAddr,schemaRegistryOptionsasJava)。作为(“关键”),to_avro(美元“价值”,点燃(“值”),schemaRegistryAddr,schemaRegistryOptionsasJava,avroSchema)。作为(“价值”))writeStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“主题”,“t”)保存()
pyspark.sql.functions进口上校,点燃pyspark.sql.avro.functions进口from_avro,to_avroschema_registry_address=“https://confluent-schema-registry-endpoint”schema_registry_options={“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:f{关键}:{秘密}}df=(火花readStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“订阅”,“t”)负载()选择(from_avro(数据=上校(“关键”),选项=schema_registry_options,主题=“t键”,schemaRegistryAddress=schema_registry_address)别名(“关键”),from_avro(数据=上校(“价值”),选项=schema_registry_options,主题=“值”,schemaRegistryAddress=schema_registry_address)别名(“价值”)))#转换后的数据保存到卡夫卡作为卡夫卡主题“t”。data_df选择(to_avro(数据=上校(“关键”),主题=点燃(“t键”),schemaRegistryAddress=schema_registry_address,选项=schema_registry_options)别名(“关键”),to_avro(数据=上校(“价值”),主题=点燃(“值”),schemaRegistryAddress=schema_registry_address,选项=schema_registry_options)别名(“价值”))writeStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“主题”,“t”)保存()# Avro模式的JSON字符串格式的“值”。avro_schema=#转换后的数据保存到卡夫卡作为卡夫卡主题“t”。data_df选择(to_avro(上校(“关键”),点燃(“t键”),schema_registry_address,schema_registry_options)别名(“关键”),to_avro(上校(“价值”),点燃(“值”),schema_registry_address,schema_registry_options,avro_schema)别名(“价值”))writeStream格式(“卡夫卡”)选项(“kafka.bootstrap.servers”,服务器)选项(“主题”,“t”)保存()