使用JDBC查询数据库
Databricks支持使用JDBC连接外部数据库。本文通过Python、SQL和Scala中的示例提供了配置和使用这些连接的基本语法。
Partner Connect为与许多外部数据源同步数据提供了优化的集成。看到什么是Databricks Partner Connect?。
重要的
本文中的示例不包括JDBC url中的用户名和密码。Databricks推荐使用秘密来存储数据库凭据。例如:
用户名=dbutils。秘密。得到(范围=“jdbc”,关键=“用户名”)密码=dbutils。秘密。得到(范围=“jdbc”,关键=“密码”)
瓦尔用户名=dbutils。秘密。得到(范围=“jdbc”,关键=“用户名”)瓦尔密码=dbutils。秘密。得到(范围=“jdbc”,关键=“密码”)
要用SQL引用Databricks的秘密,您必须在初始化集群时配置Spark配置属性。
有关秘密管理的完整示例,请参见秘密工作流示例。
用JDBC读取数据
您必须配置许多设置来使用JDBC读取数据。类型使用不同的格式< jdbc_url >
。
employees_table=(火花。读。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。负载())
创建临时视图employees_table_vw使用JDBC选项(url“< jdbc_url >”,数据表“< table_name >”,用户“<用户名>”,密码' <密码> ')
瓦尔employees_table=火花。读。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。负载()
Spark自动从数据库表中读取模式,并将其类型映射回Spark SQL类型。
employees_table。printSchema
描述employees_table_vw
employees_table。printSchema
你可以对这个JDBC表运行查询:
显示(employees_table。选择(“年龄”,“工资”)。groupBy(“年龄”)。avg(“工资”))
选择年龄,avg(工资)作为工资从employees_table_vw集团通过年龄
显示(employees_table。选择(“年龄”,“工资”).groupBy(“年龄”).avg(“工资”))
用JDBC写数据
使用JDBC将数据保存到表中使用与读取类似的配置。示例如下:
(employees_table。写。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< new_table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。保存())
创建表格new_employees_table使用JDBC选项(url“< jdbc_url >”,数据表“< table_name >”,用户“<用户名>”,密码' <密码> ')作为选择*从employees_table_vw
employees_table。写。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< new_table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。保存()
默认行为尝试创建一个新表,如果具有该名称的表已经存在,则抛出错误。
您可以使用以下语法向现有表追加数据:
(employees_table。写。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< new_table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。模式(“添加”)。保存())
创建表格如果不存在new_employees_table使用JDBC选项(url“< jdbc_url >”,数据表“< table_name >”,用户“<用户名>”,密码' <密码> ');插入成new_employees_table选择*从employees_table_vw;
employees_table。写。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< new_table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。模式(“添加”)。保存()
您可以使用以下语法覆盖已存在的表:
(employees_table。写。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< new_table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。模式(“覆盖”)。保存())
创建或取代表格new_employees_table使用JDBC选项(url“< jdbc_url >”,数据表“< table_name >”,用户“<用户名>”,密码' <密码> ')作为选择*从employees_table_vw;
employees_table。写。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< new_table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。模式(“覆盖”)。保存()
控制JDBC查询的并行性
默认情况下,JDBC驱动程序只使用一个线程查询源数据库。为了提高读取的性能,需要指定一些选项来控制Databricks同时对数据库进行多少次查询。对于小型集群,设置numPartitions
选项等于集群中执行器核的数量,可确保所有节点并行查询数据。
警告
设置numPartitions
在大型集群上设置过高的值可能会导致远程数据库的性能下降,因为太多同时进行的查询可能会使服务不堪重负。这对于应用程序数据库来说尤其麻烦。注意不要将这个值设置为50以上。
请注意
控件选择具有在源数据库中计算的索引的列,从而加快查询速度partitionColumn
。
下面的代码示例演示了为8核集群配置并行性:
employees_table=(火花。读。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)#可以使用的列,该列具有均匀分布的可用于并行化的值范围。选项(“partitionColumn”,“< partition_key >”)使用partitionColumn来获取数据。选项(“下界”,“< min_value >”)# max值为partitionColumn提取数据。选项(“upperBound”,“< max_value >”)#要将数据分布到的分区数量。不要设置很大(~数百)。选项(“numPartitions”,8)。负载())
创建临时视图employees_table_vw使用JDBC选项(url“< jdbc_url >”,数据表“< table_name >”,用户“<用户名>”,密码' <密码> ',partitionColumn“< partition_key >”,下界“< min_value >”,upperBound“< max_value >”,numPartitions8)
瓦尔employees_table=火花。读。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)//可以使用的列,该列具有均匀分布的可用于并行化的值范围。选项(“partitionColumn”,“< partition_key >”)//使用partitionColumn提取数据的最小值。选项(“下界”,“< min_value >”)//使用partitionColumn获取数据。选项(“upperBound”,“< max_value >”)//分配数据的分区数。不要设置很大(~数百)。选项(“numPartitions”,8)。负载()
请注意
Databricks支持所有Apache Spark配置JDBC的选项。
当使用JDBC写入数据库时,Apache Spark使用内存中的分区数量来控制并行性。可以在写入数据之前对数据重新分区,以控制并行性。避免在大型集群上使用大量分区,以避免使远程数据库不堪重负。下面的示例演示了在写入之前将分区重新划分到八个分区:
(employees_table。重新分区(8)。写。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< new_table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。保存())
创建表格new_employees_table使用JDBC选项(url“< jdbc_url >”,数据表“< table_name >”,用户“<用户名>”,密码' <密码> ')作为选择/*+重分区(8)*/*从employees_table_vw
employees_table。重新分区(8)。写。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< new_table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。保存()
下推一个查询到数据库引擎
您可以将整个查询下推到数据库,只返回结果。的表格
参数标识要读取的JDBC表。您可以使用SQL查询中任何有效的内容从
条款。
pushdown_query="(select * from employees where emp_no < 10008) as emp_alias"employees_table=(火花。读。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,pushdown_query)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。负载())
创建临时视图employees_table_vw使用JDBC选项(url“< jdbc_url >”,数据表"(select * from employees where emp_no < 10008) as emp_alias",用户“<用户名>”,密码' <密码> ')
瓦尔pushdown_query="(select * from employees where emp_no < 10008) as emp_alias"瓦尔employees_table=火花。读。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,pushdown_query)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。负载()
控制每个查询获取的行数
JDBC驱动程序有一个fetchSize
参数,该参数控制一次从远程数据库获取的行数。
设置 |
结果 |
---|---|
过低 |
由于多次往返导致的高延迟(每个查询返回的行很少) |
太高了 |
内存不足错误(在一个查询中返回了太多数据) |
最佳值取决于工作负载。注意事项包括:
查询返回多少列?
返回什么数据类型?
每列返回的字符串有多长?
系统可能有非常小的默认值,并受益于调优。例如:Oracle的默认值fetchSize
是10。将其增加到100可以将需要执行的查询总数减少到原来的1 / 10。JDBC结果是网络流量,因此要避免非常大的数字,但对于许多数据集,最佳值可能是数千。
使用fetchSize
选项,如下例所示:
employees_table=(火花。读。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。选项(“fetchSize”,“100”)。负载())
创建临时视图employees_table_vw使用JDBC选项(url“< jdbc_url >”,数据表“< table_name >”,用户“<用户名>”,密码' <密码> '。fetchSizeOne hundred.)
瓦尔employees_table=火花。读。格式(“jdbc”)。选项(“url”,“< jdbc_url >”)。选项(“数据表”,“< table_name >”)。选项(“用户”,“<用户名>”)。选项(“密码”,“<密码>”)。选项(“fetchSize”,“100”)。负载()