读和写流Avro数据
Apache Avro是一种常用的数据序列化系统流的世界。一个典型的解决方案是将数据在Apache卡夫卡Avro格式,元数据融合性的模式注册表,然后运行查询的流媒体框架连接到卡夫卡和模式注册表。
砖支持from_avro
和to_avro
功能在卡夫卡建立与Avro数据流管道在注册表模式和元数据。这个函数to_avro
在Avro二进制格式和编码专栏from_avro
解码Avro二进制数据列。函数变换到另一列一列,和输入/输出的SQL数据类型可以是一个复杂类型或一个原始类型。
也看到Avro文件数据源。
基本的例子
类似于from_json和to_json,你可以使用from_avro
和to_avro
与任何二进制列,但你必须指定Avro手动模式。
进口org。apache。火花。sql。avro。功能。_进口org。apache。avro。SchemaBuilder/ /当阅读卡夫卡的键和值的话题,解码/ /二进制(Avro)数据结构化数据。/ /结果DataFrame的模式是:<关键:字符串,价值:int >瓦尔df=火花。readStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“订阅”,“t”)。负载()。选择(from_avro(美元“关键”,SchemaBuilder。构建器()。stringType())。作为(“关键”),from_avro(美元“价值”,SchemaBuilder。构建器()。intType())。作为(“价值”))/ /将结构化数据转换为二进制字符串(键列)和/ / int(值列)并保存到卡夫卡的主题。dataDF。选择(to_avro(美元“关键”)。作为(“关键”),to_avro(美元“价值”)。作为(“价值”))。writeStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“主题”,“t”)。开始()
jsonFormatSchema例子
您还可以指定一个模式作为一个JSON字符串。例如,如果/ tmp / user.avsc
是:
{“名称”:“example.avro”,“类型”:“记录”,“名称”:“用户”,“字段”:({“名称”:“名称”,“类型”:“字符串”},{“名称”:“favorite_color”,“类型”:(“字符串”,“零”]}]}
您可以创建一个JSON字符串:
从pyspark.sql.avro.functions进口from_avro,to_avrojsonFormatSchema=开放(“/ tmp / user.avsc”,“r”)。读()
然后使用模式from_avro
:
# 1。解码Avro数据结构。# 2。过滤器由列“favorite_color”。# 3。以Avro格式编码的“名称”列。输出=df\。选择(from_avro(“价值”,jsonFormatSchema)。别名(“用户”))\。在哪里(的用户。favorite_color == "red"')\。选择(to_avro(“user.name”)。别名(“价值”))
例子与模式注册表
如果您的集群模式注册表服务,from_avro
可以使用它,这样你不需要指定Avro手动模式。
下面的例子演示了阅读卡夫卡的主题“t”,假设键和值已经注册在注册表模式主题“t键”和“值”的类型字符串
和INT
:
进口org。apache。火花。sql。avro。功能。_瓦尔schemaRegistryAddr=“https://myhost: 8081”瓦尔df=火花。readStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“订阅”,“t”)。负载()。选择(from_avro(美元“关键”,“t键”,schemaRegistryAddr)。作为(“关键”),from_avro(美元“价值”,“值”,schemaRegistryAddr)。作为(“价值”))
为to_avro
,默认输出Avro模式可能不匹配模式的目标主题注册表服务有以下原因:
从火花SQL类型映射到Avro模式不是一对一的。看到支持类型的火花SQL - > Avro转换。
如果转换输出Avro模式记录类型,记录的名字是
topLevelRecord
没有默认名称空间。
如果默认输出模式to_avro
匹配模式的目标主题,您可以执行以下操作:
/ /转换后的数据保存到卡夫卡作为卡夫卡主题“t”。dataDF。选择(to_avro(美元“关键”,点燃(“t键”),schemaRegistryAddr)。作为(“关键”),to_avro(美元“价值”,点燃(“值”),schemaRegistryAddr)。作为(“价值”))。writeStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“主题”,“t”)。开始()
否则,你必须提供模式的目标对象to_avro
功能:
/ / Avro模式的JSON字符串格式的“值”。瓦尔avroSchema=…/ /转换后的数据保存到卡夫卡作为卡夫卡主题“t”。dataDF。选择(to_avro(美元“关键”,点燃(“t键”),schemaRegistryAddr)。作为(“关键”),to_avro(美元“价值”,点燃(“值”),schemaRegistryAddr,avroSchema)。作为(“价值”))。writeStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“主题”,“t”)。开始()
注册验证外部汇合的模式
在砖运行时的12.1及以上,可以验证外部汇合的模式注册表。下面的例子演示如何配置您的模式注册表选项包括身份验证凭据和API密钥。
进口org。apache。火花。sql。avro。功能。_进口scala。集合。JavaConverters。_瓦尔schemaRegistryAddr=“https://confluent-schema-registry-endpoint”瓦尔schemaRegistryOptions=地图(“confluent.schema.registry.basic.auth.credentials.source”- >“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”- >“confluentApiKey: confluentApiSecret”)瓦尔df=火花。readStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“订阅”,“t”)。负载()。选择(from_avro(美元“关键”,“t键”,schemaRegistryAddr,schemaRegistryOptions。asJava)。作为(“关键”),from_avro(美元“价值”,“值”,schemaRegistryAddr,schemaRegistryOptions。asJava)。作为(“价值”))/ /转换后的数据保存到卡夫卡作为卡夫卡主题“t”。dataDF。选择(to_avro(美元“关键”,点燃(“t键”),schemaRegistryAddr,schemaRegistryOptions。asJava)。作为(“关键”),to_avro(美元“价值”,点燃(“值”),schemaRegistryAddr,schemaRegistryOptions。asJava)。作为(“价值”))。writeStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“主题”,“t”)。保存()/ / Avro模式的JSON字符串格式的“值”。瓦尔avroSchema=…/ /转换后的数据保存到卡夫卡作为卡夫卡主题“t”。dataDF。选择(to_avro(美元“关键”,点燃(“t键”),schemaRegistryAddr,schemaRegistryOptions。asJava)。作为(“关键”),to_avro(美元“价值”,点燃(“值”),schemaRegistryAddr,schemaRegistryOptions。asJava,avroSchema)。作为(“价值”))。writeStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“主题”,“t”)。保存()
从pyspark.sql.functions进口上校,点燃从pyspark.sql.avro.functions进口from_avro,to_avroschema_registry_address=“https://confluent-schema-registry-endpoint”schema_registry_options={“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:f”{关键}:{秘密}”}df=(火花。readStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“订阅”,“t”)。负载()。选择(from_avro(数据=上校(“关键”),选项=schema_registry_options,主题=“t键”,schemaRegistryAddress=schema_registry_address)。别名(“关键”),from_avro(数据=上校(“价值”),选项=schema_registry_options,主题=“值”,schemaRegistryAddress=schema_registry_address)。别名(“价值”)))#转换后的数据保存到卡夫卡作为卡夫卡主题“t”。data_df。选择(to_avro(数据=上校(“关键”),主题=点燃(“t键”),schemaRegistryAddress=schema_registry_address,选项=schema_registry_options)。别名(“关键”),to_avro(数据=上校(“价值”),主题=点燃(“值”),schemaRegistryAddress=schema_registry_address,选项=schema_registry_options)。别名(“价值”))。writeStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“主题”,“t”)。保存()# Avro模式的JSON字符串格式的“值”。avro_schema=…#转换后的数据保存到卡夫卡作为卡夫卡主题“t”。data_df。选择(to_avro(上校(“关键”),点燃(“t键”),schema_registry_address,schema_registry_options)。别名(“关键”),to_avro(上校(“价值”),点燃(“值”),schema_registry_address,schema_registry_options,avro_schema)。别名(“价值”))。writeStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,服务器)。选项(“主题”,“t”)。保存()