砖连接
请注意
砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。
砖允许您连接到连接你最喜欢的IDE (IntelliJ, Eclipse, PyCharm Visual Studio代码),笔记本电脑服务器如飞艇,和其他自定义应用程序数据砖集群。
本文解释了砖连接是如何工作的,走你通过与砖连接的步骤开始,解释如何解决可能出现的问题在使用砖连接时,使用砖和不同运行连接和运行在一个砖笔记本。
概述
砖是砖的客户端库运行时连接。它允许您使用火花api编写工作和远程数据砖集群上运行它们,而不是在当地引发会话。
例如,当您运行DataFrame命令spark.read.format(“铺”).load (…) .groupBy (…) .agg(…),告诉()
使用砖连接,解析和规划工作的运行在本地机器上。然后,工作的逻辑表示发送到火花集群中的服务器运行在砖执行。
砖连接,您可以:
从任何Python运行大规模刺激就业,Java, Scala,或R应用程序。任何你可以
进口pyspark
,进口org.apache.spark
,或要求(SparkR)
现在,您可以运行火花工作直接从您的应用程序,而不需要安装任何IDE插件或使用火花提交脚本。单步调试和调试代码在IDE甚至在处理远程集群。
快速迭代开发库。您不需要重新启动集群在砖连接改变Python和Java库依赖关系后,因为每个客户机会话集群中的相互隔离。
关闭闲置集群没有失去工作。因为客户端应用程序是与集群脱钩,这是影响集群重启或升级,这通常会导致你失去所有的变量,抽样和DataFrame对象定义在一个笔记本上。
请注意
Python开发的SQL查询,砖建议您使用Python的砖SQL的连接器而不是砖连接。Python的砖SQL的连接器是比砖更容易建立连接。同时,砖连接解析和计划工作在本地机器上运行,运行在远程计算资源而工作。这可以让它尤其难以调试运行时错误。Python的砖SQL连接器直接提交SQL查询远程计算资源和获取结果。
需求
只有以下砖运行时版本的支持:
砖运行时10.4 LTS ML,砖LTS 10.4运行时
砖运行时9.1 LTS ML,砖LTS 9.1运行时
砖运行时7.3 LTS ML,砖LTS 7.3运行时
小版本的Python安装客户端必须一样的小砖集群的Python版本。表显示了Python版本与每个砖安装运行时。
砖的运行时版本的
Python版本
LTS 11.3 LTS ML, 11.3
3.9
LTS 10.4 LTS ML, 10.4
3.8
LTS 9.1 LTS ML, 9.1
3.8
LTS 7.3 LTS ML, 7.3
3.7
例如,如果您正在使用Conda本地开发环境和集群上运行Python 3.7中,您必须创建一个环境版本,例如:
dbconnect conda创建——名称python=37 conda激活dbconnect
砖连接主要和次要的包的版本必须匹配你的砖运行时版本。砖建议你总是使用最新的包砖连接相匹配你的砖的运行时版本。例如,当使用砖运行时7.3 LTS集群,使用
databricks-connect = = 7.3 . *
包中。请注意
看到砖连接的发布说明可用数据砖连接的列表发布和维护更新。
Java运行时环境(JRE) 8。客户测试了OpenJDK 8 JRE。客户端不支持Java 11。
请注意
在Windows上,如果你看到一个错误,砖连接找不到winutils.exe
,请参阅找不到winutils。exe在Windows上。
设置客户端
请注意
在你开始之前建立砖连接的客户端,您必须符合要求的砖的连接。
步骤1:安装客户端
卸载PySpark。这是必需的,因为
databricks-connect
包与PySpark冲突。有关详细信息,请参见冲突PySpark安装。皮普卸载pyspark
安装砖连接的客户端。
pip安装- u“databricks-connect = = 7.3 *”。#或X.Y.*来match your cluster version.
请注意
总是指定
databricks-connect = = X.Y. *
而不是databricks-connect = X.Y
,确保最新的安装包。
步骤2:配置连接属性
收集以下配置属性:
配置连接。您可以使用CLI、SQL配置或环境变量。从最高到最低配置方法的优先级是:SQL配置钥匙,CLI和环境变量。
CLI
运行
databricks-connect
。databricks-connect配置
许可证将显示:
版权(2018年)砖,公司。这图书馆(的“软件”)可能不是使用除了在连接与的被许可方的砖的使用平台服务依照达成协议bob体育客户端下载…
接受许可和供应配置值。为砖的主机和砖的令牌,输入工作区URL和个人访问令牌您在步骤1中指出。
你接受以上协议吗?[y / N] y设置新的配置值(离开输入空接受默认):砖主机(目前没有价值,必须从https://]: < databricks-url >砖令牌(没有当前值):< databricks-token >集群ID(例如,0921 - 001415 jelly628)(没有当前值):< cluster-id > Org ID (Azure-only,看到了吗? o = orgId URL) [0]: < org-id >端口[15001]:<口>
SQL配置或环境变量。下面的表显示了SQL配置键和对应的环境变量配置属性您在步骤1中指出。设置SQL配置键,使用
sql(“集配置=值”)
。例如:sql(“集spark.databricks.service.clusterId = 0304 - 201045 - abcdefgh”)
。参数
SQL配置关键
环境变量名称
砖的主机
spark.databricks.service.address
DATABRICKS_ADDRESS
砖的令牌
spark.databricks.service.token
DATABRICKS_API_TOKEN
集群ID
spark.databricks.service.clusterId
DATABRICKS_CLUSTER_ID
Org ID
spark.databricks.service.orgId
DATABRICKS_ORG_ID
港口
spark.databricks.service.port
DATABRICKS_PORT
测试连接数据砖。
databricks-connect测试
如果您配置的集群没有运行,测试配置的集群将继续运行,直到它开始autotermination时间。输出应该类似:
* PySpark是安装在/…/ 3.5.6 / lib / python3.5 /网站/ PySpark *检查java版本的java版本“1.8.0_152”java (TM)(构建1.8.0_152-b16) java SE运行时环境热点(TM) 64位服务器虚拟机(构建25.152 b16转椅,混合模式)*测试scala命令18/12/10 16:38:44 NativeCodeLoader警告:无法加载native-hadoop库为您的平台……bob体育客户端下载使用builtin-java类,适用的使用引发的违约log4j配置文件:org/apache/spark/log4j-defaults。属性默认日志级别设置为“警告”。调整日志级别使用sc.setLogLevel(中的)。对于SparkR,使用setLogLevel(中的)。18/12/10 16:38:50警告MetricsSystem:使用默认名称SparkStatusTracker因为无论是spark.metrics.namespace还是spark.app来源。我d is set. 18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state 18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.0-SNAPSHOT /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152) Type in expressions to have them evaluated. Type :help for more information. scala> spark.range(100).reduce(_ + _) Spark context Web UI available at https://10.8.5.214:4040 Spark context available as 'sc' (master = local[*], app id = local-1544488730553). Spark session available as 'spark'. View job details at
/?o=0#/setting/clusters/ /sparkUi View job details at ?o=0#/setting/clusters/ /sparkUi res0: Long = 4950 scala> :quit * Testing python command 18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. 18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state View job details at /?o=0#/setting/clusters/ /sparkUi
设置您的IDE或笔记本电脑服务器
部分描述了如何配置您的首选IDE或笔记本电脑服务器使用砖连接的客户端。
在本节中:
Jupyter笔记本
砖连接配置脚本自动添加包到您的项目配置。开始一个Python内核中运行:
从pyspark.sql进口SparkSession火花=SparkSession。构建器。getOrCreate()
要启用%的sql
简称跑步和可视化的SQL查询,使用以下代码片段:
从IPython.core.magic进口line_magic,line_cell_magic,魔法,magics_class@magics_class类DatabricksConnectMagics(魔法):@line_cell_magicdefsql(自我,行,细胞=没有一个):如果细胞和行:提高ValueError(细胞魔法”“线必须是空的,行)试一试:从autovizwidget.widget.utils进口display_dataframe除了ImportError:打印(“请运行“pip安装autovizwidget”启用可视化部件。”)display_dataframe=λx:x返回display_dataframe(自我。get_spark()。sql(细胞orgydF4y2Ba行)。toPandas())defget_spark(自我):user_ns=get_ipython()。user_ns如果“火花”在user_ns:返回user_ns(“火花”]其他的:从pyspark.sql进口SparkSessionuser_ns(“火花”]=SparkSession。构建器。getOrCreate()返回user_ns(“火花”]知识产权=get_ipython()知识产权。register_magics(DatabricksConnectMagics)
PyCharm
砖连接配置脚本自动添加包到您的项目配置。
Python 3集群
当你创建一个PyCharm项目,选择现有的翻译。从下拉菜单中,选择您创建(参见Conda环境需求)。
去Run >编辑配置。
添加
PYSPARK_PYTHON = python3
作为一个环境变量。
SparkR和RStudio桌面
下载并解压bob下载地址开源的火花到您的本地机器上。选择相同的版本在你的砖集群(Hadoop 2.7)。
运行
databricks-connectget-jar-dir
。这个命令返回一个路径/usr/local/lib/python3.5/dist-packages / pyspark / jar
。复制的文件路径上面一个目录例如,JAR目录文件路径/usr/local/lib/python3.5/dist-packages / pyspark
,这是SPARK_HOME
目录中。配置火花自由路径和火花家里通过将它们添加到你的R脚本。集
< spark-lib-path >
你打开的目录开源火花包在步骤1中。bob下载地址集< spark-home-path >
从步骤2砖连接目录。#指向OSS包路径,例如,/ / /…/ spark-2.4.0-bin-hadoop2.7道路图书馆(SparkR,lib.loc=.libPaths(c(file.path(“< spark-lib-path >”,“R”,“自由”),.libPaths())))#点砖PySpark连接安装,例如,/ / /…/ PySpark路径Sys.setenv(SPARK_HOME=“< spark-home-path >”)
发起一个火花会话并开始运行SparkR命令。
sparkR.session()df< -as.DataFrame(忠实的)头(df)df1< -有斑纹的(df,函数(x){x},模式(df))收集(df1)
sparklyr和RStudio桌面
预览
这个特性是在公共预览。
请注意
砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。
在你开始使用砖连接之前,必须符合要求的和设置客户端砖的连接。
你可以复制sparklyr-dependent代码开发本地使用砖连接并运行它在你的砖砖笔记本或RStudio托管服务器的工作区以最小的不需要修改代码。
需求
sparklyr 1.2或以上。
与匹配砖砖运行时的7.3或以上连接。
安装、配置和使用sparklyr
在RStudio桌面,从凹口安装sparklyr 1.2或以上或从GitHub安装最新的主版本。
#安装从凹口install.packages(“sparklyr”)从GitHub或安装最新的主版本号install.packages(“devtools”)devtools::install_github(“sparklyr / sparklyr”)
激活Python环境砖连接安装和运行以下命令在终端得到
< spark-home-path >
:databricks-connect get-spark-home
发起一个火花会话并开始运行sparklyr命令。
图书馆(sparklyr)sc< -spark_connect(方法=“砖”,spark_home=“< spark-home-path >”)iris_tbl< -copy_to(sc,虹膜,覆盖=真正的)图书馆(dplyr)src_tbls(sc)iris_tbl% > %数
关闭连接。
spark_disconnect(sc)
IntelliJ (Scala或Java)
运行
databricks-connectget-jar-dir
。的依赖关系指向的目录返回的命令。去文件> >项目结构>模块依赖项>“+”符号> jar或目录。
为了避免冲突,我们强烈建议删除任何其他火花从类路径中安装。如果这是不可能的,确保jar添加在前面的类路径中。特别是,他们必须提前安装其他版本的火花(否则你会使用其中一个其他火花版本和本地运行或扔一个
ClassDefNotFoundError
)。检查在IntelliJ突破的设置选项。默认值是所有并将导致网络超时如果你设置断点调试。将其设置为线程为了避免停止网络后台线程。
Eclipse
运行
databricks-connectget-jar-dir
。点的外部jar配置目录返回的命令。去图书馆项目菜单>属性> Java构建路径> >添加外部jar。
为了避免冲突,我们强烈建议删除任何其他火花从类路径中安装。如果这是不可能的,确保jar添加在前面的类路径中。特别是,他们必须提前安装其他版本的火花(否则你会使用其中一个其他火花版本和本地运行或扔一个
ClassDefNotFoundError
)。
Visual Studio代码
验证Python扩展安装。
打开命令面板(命令+ Shift + P在macOS和Ctrl + Shift + P在Windows / Linux)。
选择一个Python解释器。去代码> Preferences >设置,并选择python的设置。
运行
databricks-connectget-jar-dir
。从命令返回的目录添加到用户设置JSON
python.venvPath
。这应该被添加到Python配置。禁用短绒。单击…在右边编辑json设置。修改后的设置如下:
如果与一个虚拟环境中运行,这是推荐的方式为Python开发在VS代码中,在命令面板类型
选择python翻译
并指向您的环境匹配集群的Python版本。例如,如果您的集群是Python 3.5,你的当地环境应该是Python 3.5。
SBT
使用SBT,您必须配置您的build.sbt
文件链接对砖的连接罐而不是通常的火花库依赖关系。你这样做的unmanagedBase
下面的示例构建文件指令,假设一个Scala应用的com.example.Test
主要对象:
从IDE运行示例
进口java.util.ArrayList;进口并不知道;进口java.sql.Date;进口org.apache.spark.sql.SparkSession;进口org.apache.spark.sql.types。*;进口org.apache.spark.sql.Row;进口org.apache.spark.sql.RowFactory;进口org.apache.spark.sql.Dataset;公共类应用程序{公共静态无效主要(字符串[]arg游戏)抛出异常{SparkSession火花=SparkSession。构建器()。浏览器名称(“临时工演示”)。配置(“spark.master”,“本地”)。getOrCreate();/ /创建一个火花DataFrame组成的高和低的温度/ /通过机场代码和日期。StructType模式=新StructType(新StructField[]{新StructField(“AirportCode”,数据类型。StringType,假,元数据。空()),新StructField(“日期”,数据类型。DateType,假,元数据。空()),新StructField(“TempHighF”,数据类型。IntegerType,假,元数据。空()),新StructField(“TempLowF”,数据类型。IntegerType,假,元数据。空()),});列表<行>dataList=新ArrayList<行>();dataList。添加(RowFactory。创建(“BLI”,日期。返回对象的值(“2021-04-03”),52,43));dataList。添加(RowFactory。创建(“BLI”,日期。返回对象的值(“2021-04-02”),50,38));dataList。添加(RowFactory。创建(“BLI”,日期。返回对象的值(“2021-04-01”),52,41));dataList。添加(RowFactory。创建(“PDX”,日期。返回对象的值(“2021-04-03”),64年,45));dataList。添加(RowFactory。创建(“PDX”,日期。返回对象的值(“2021-04-02”),61年,41));dataList。添加(RowFactory。创建(“PDX”,日期。返回对象的值(“2021-04-01”),66年,39));dataList。添加(RowFactory。创建(“海”,日期。返回对象的值(“2021-04-03”),57,43));dataList。添加(RowFactory。创建(“海”,日期。返回对象的值(“2021-04-02”),54,39));dataList。添加(RowFactory。创建(“海”,日期。返回对象的值(“2021-04-01”),56,41));数据集<行>临时工=火花。createDataFrame(dataList,模式);/ /创建一个表的数据砖集群,然后填满/ /表DataFrame的内容。/ /从先前的运行,如果表已经存在/ /先删除它。火花。sql(“使用默认”);火花。sql(“如果存在demo_temps_table”删除表);临时工。写()。saveAsTable(“demo_temps_table”);/ /查询砖集群上的表,返回的行/ /在机场代码不是BLI和日期晚/ /比2021-04-01。组织和秩序的结果高/ /温度按照降序排列。数据集<行>df_temps=火花。sql(“从demo_temps_table SELECT *”+“AirportCode ! = BLI和日期>‘2021-04-01’”+“GROUP BY AirportCode,日期、TempHighF TempLowF”+“TempHighF DESC秩序”);df_temps。显示();/ /结果:/ // / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | AirportCode | |日期TempHighF | TempLowF |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | PDX | 64 | 2021-04-03 | |/ / | PDX | 61 | 2021-04-02 | 41 |/ /海洋57 43 | | | 2021-04-03 | |54海/ / | | 2021-04-02 | | |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ /清理被删除的表数据砖集群。火花。sql(“DROP TABLE demo_temps_table”);}}
从pyspark.sql进口SparkSession从pyspark.sql.types进口*从datetime进口日期火花=SparkSession。构建器。浏览器名称(“temps-demo”)。getOrCreate()#创建一个火花DataFrame组成的高和低的温度#机场代码和日期。模式=StructType([StructField(“AirportCode”,StringType(),假),StructField(“日期”,DateType(),假),StructField(“TempHighF”,IntegerType(),假),StructField(“TempLowF”,IntegerType(),假)])数据=((“BLI”,日期(2021年,4,3),52,43),(“BLI”,日期(2021年,4,2),50,38),(“BLI”,日期(2021年,4,1),52,41),(“PDX”,日期(2021年,4,3),64年,45),(“PDX”,日期(2021年,4,2),61年,41),(“PDX”,日期(2021年,4,1),66年,39),(“海”,日期(2021年,4,3),57,43),(“海”,日期(2021年,4,2),54,39),(“海”,日期(2021年,4,1),56,41]]临时工=火花。createDataFrame(数据,模式)#砖集群的创建一个表,然后填满# DataFrame的表的内容。#从先前的运行,如果表已经存在#删除它。火花。sql(使用默认的)火花。sql(“删除表如果存在demo_temps_table”)临时工。写。saveAsTable(“demo_temps_table”)#查询砖集群上的表,返回的行#在机场代码不是BLI和日期晚比2021-04-01 #。组织和秩序的结果高#温度按照降序排列。df_temps=火花。sql(“从demo_temps_table SELECT *”\“AirportCode ! = BLI和日期>‘2021-04-01’”\“GROUP BY AirportCode,日期、TempHighF TempLowF”\“TempHighF DESC秩序”)df_temps。显示()#结果:## + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +# | AirportCode | |日期TempHighF | TempLowF |# + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +45 # | PDX | 64 | 2021-04-03 | |# | PDX | 61 | 2021-04-02 | 41 |43 57 #海| | 2021-04-03 | | |54 #海| | 2021-04-02 | | |# + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +#清理被删除的表数据砖集群。火花。sql(“DROP TABLE demo_temps_table”)
进口org。apache。火花。sql。SparkSession进口org。apache。火花。sql。类型。_进口org。apache。火花。sql。行进口java。sql。日期对象演示{def主要(arg游戏:数组(字符串]){瓦尔火花=SparkSession。构建器。主(“本地”)。getOrCreate()/ /创建一个火花DataFrame组成的高和低的温度/ /通过机场代码和日期。瓦尔模式=StructType(数组(StructField(“AirportCode”,StringType,假),StructField(“日期”,DateType,假),StructField(“TempHighF”,IntegerType,假),StructField(“TempLowF”,IntegerType,假)))瓦尔数据=列表(行(“BLI”,日期。返回对象的值(“2021-04-03”),52,43),行(“BLI”,日期。返回对象的值(“2021-04-02”),50,38),行(“BLI”,日期。返回对象的值(“2021-04-01”),52,41),行(“PDX”,日期。返回对象的值(“2021-04-03”),64年,45),行(“PDX”,日期。返回对象的值(“2021-04-02”),61年,41),行(“PDX”,日期。返回对象的值(“2021-04-01”),66年,39),行(“海”,日期。返回对象的值(“2021-04-03”),57,43),行(“海”,日期。返回对象的值(“2021-04-02”),54,39),行(“海”,日期。返回对象的值(“2021-04-01”),56,41))瓦尔抽样=火花。sparkContext。makeRDD(数据)瓦尔临时工=火花。createDataFrame(抽样,模式)/ /创建一个表的数据砖集群,然后填满/ /表DataFrame的内容。/ /从先前的运行,如果表已经存在/ /先删除它。火花。sql(“使用默认”)火花。sql(“如果存在demo_temps_table”删除表)临时工。写。saveAsTable(“demo_temps_table”)/ /查询砖集群上的表,返回的行/ /在机场代码不是BLI和日期晚/ /比2021-04-01。组织和秩序的结果高/ /温度按照降序排列。瓦尔df_temps=火花。sql(“从demo_temps_table SELECT *”+“AirportCode ! = BLI和日期>‘2021-04-01’”+“GROUP BY AirportCode,日期、TempHighF TempLowF”+“TempHighF DESC秩序”)df_temps。显示()/ /结果:/ // / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | AirportCode | |日期TempHighF | TempLowF |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | PDX | 64 | 2021-04-03 | |/ / | PDX | 61 | 2021-04-02 | 41 |/ /海洋57 43 | | | 2021-04-03 | |54海/ / | | 2021-04-02 | | |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ /清理被删除的表数据砖集群。火花。sql(“DROP TABLE demo_temps_table”)}}
使用依赖关系
通常你的主类或Python文件将有其他依赖jar文件和文件。您可以添加这样的依赖jar文件和文件通过调用sparkContext.addJar (“path-to-the-jar”)
orgydF4y2BasparkContext.addPyFile(文件路径)
。你也可以加入鸡蛋和zip文件的文件addPyFile ()
接口。每次你在IDE运行代码,依赖jar文件和文件在集群上安装。
从自由进口喷火从pyspark.sql进口SparkSession火花=SparkSession。构建器。getOrCreate()sc=火花。sparkContext# sc.setLogLevel(“信息”)打印(“测试简单的计数”)打印(火花。范围(One hundred.)。数())打印(“测试addPyFile隔离”)sc。addPyFile(“lib.py”)打印(sc。并行化(范围(10))。地图(λ我:喷火(2))。收集())类喷火(对象):def__init__(自我,x):自我。x=x
Python + Java udf
从pyspark.sql进口SparkSession从pyspark.sql.column进口_to_java_column,_to_seq,列# #在这个例子中,udf。jar包含编译后的Java / Scala udf的:#包com.example##进口org.apache.spark.sql._#进口org.apache.spark.sql.expressions._#进口org.apache.spark.sql.functions.udf##{对象测试# val plusOne: UserDefinedFunction = udf((我:长)= > i + 1)#}火花=SparkSession。构建器\。配置(“spark.jars”,“/道路/ / udf.jar”)\。getOrCreate()sc=火花。sparkContextdefplus_one_udf(上校):f=sc。_jvm。com。例子。测试。plusOne()返回列(f。应用(_to_seq(sc,(上校),_to_java_column)))sc。_jsc。addJar(“/道路/ / udf.jar”)火花。范围(One hundred.)。withColumn(“plusOne”,plus_one_udf(“id”))。显示()
包com。例子进口org。apache。火花。sql。SparkSession情况下类喷火(x:字符串)对象测试{def主要(arg游戏:数组(字符串):单位={瓦尔火花=SparkSession。构建器()…。getOrCreate();火花。sparkContext。setLogLevel(“信息”)println(“运行简单的显示查询……”)火花。读。格式(“铺”)。负载(“/ tmp / x”)。显示()println(“运行简单的UDF查询……”)火花。sparkContext。addJar(”。/目标/ scala - 2.11 / hello-world_2.11-1.0.jar”)火花。udf。注册(“f”,(x:Int)= >x+1)火花。范围(10)。selectExpr(“f (id)”)。显示()println(“运行定制对象查询……”)瓦尔obj=火花。sparkContext。并行化(Seq(喷火(“再见”),喷火(“嗨”)))。收集()println(obj。toSeq)}}
访问DBUtils
您可以使用dbutils.fs
和dbutils.secrets
公用事业的砖公用事业模块。支持命令dbutils.fs.cp
,dbutils.fs.head
,dbutils.fs.ls
,dbutils.fs.mkdirs
,dbutils.fs.mv
,dbutils.fs.put
,dbutils.fs.rm
,dbutils.secrets.get
,dbutils.secrets.getBytes
,dbutils.secrets.list
,dbutils.secrets.listScopes
。看到文件系统实用程序(dbutils.fs)或运行dbutils.fs.help ()
和秘密效用(dbutils.secrets)或运行dbutils.secrets.help ()
。
从pyspark.sql进口SparkSession从pyspark.dbutils进口DBUtils火花=SparkSession。构建器。getOrCreate()dbutils=DBUtils(火花)打印(dbutils。fs。ls(“dbfs: /))打印(dbutils。秘密。listScopes())
使用砖运行时的7.3 LTS以上时,访问DBUtils模块的方式在当地或砖集群的工作,使用以下get_dbutils ()
:
defget_dbutils(火花):从pyspark.dbutils进口DBUtils返回DBUtils(火花)
否则,使用以下get_dbutils ()
:
defget_dbutils(火花):如果火花。相依。得到(“spark.databricks.service.client.enabled”)= =“真正的”:从pyspark.dbutils进口DBUtils返回DBUtils(火花)其他的:进口IPython返回IPython。get_ipython()。user_ns(“dbutils”]
瓦尔dbutils=com。砖。服务。DBUtilsprintln(dbutils。fs。ls(“dbfs: /))println(dbutils。秘密。listScopes())
访问Hadoop文件系统
您也可以直接访问DBFS使用标准的Hadoop文件系统接口:
>进口org。apache。hadoop。fs。_/ /得到新的DBFS连接>瓦尔dbfs=文件系统。得到(火花。sparkContext。hadoopConfiguration)dbfs:org。apache。hadoop。fs。文件系统=com。砖。后端。守护进程。数据。客户端。DBFS@二维036335年/ /列表文件>dbfs。listStatus(新路径(“dbfs: /))res1:数组(org。apache。hadoop。fs。FileStatus]=数组(FileStatus{路径=dbfs:/美元;isDirectory=真正的;…})/ /打开文件>瓦尔流=dbfs。开放(新路径(“dbfs: /道路/ / your_file”))流:org。apache。hadoop。fs。FSDataInputStream=org。apache。hadoop。fs。FSDataInputStream@7aa4ef24/ /获取文件内容为字符串>进口org。apache。下议院。io。_>println(新字符串(IOUtils。toByteArray(流)))
Hadoop设置配置
在客户端可以设置使用Hadoop的配置spark.conf.set
API,它适用于SQL和DataFrame操作。Hadoop配置设置sparkContext
必须在集群配置中设置或使用一个笔记本。这是因为配置设置sparkContext
不与用户会话,但适用于整个集群。
故障排除
运行databricks-connect测试
检查连接问题。本节介绍您可能遇到的一些常见问题,以及如何解决它们。
Python版本不匹配
检查您使用的Python版本当地至少有相同的小版本版本在集群上(例如,3.5.1
与3.5.2
是好的,3.5
与3.6
不是)。
如果你有多个Python版本安装在本地,确保砖使用正确的连接是通过设置PYSPARK_PYTHON
环境变量(例如,PYSPARK_PYTHON = python3
)。
服务器未启用
确保集群火花服务器启用了spark.databricks.service.server.enabled真正的
。您应该看到下面的线在司机日志如果它是:
18/10/25 21:39:18信息SparkConfUtils美元:设置火花配置:spark.databricks.service.server。启用- >真实……18/10/25 21:39:21信息SparkContext:加载火花服务RPC服务器18/10/25 21:39:21信息SparkServiceRPCServer:火花服务RPC服务器开始18/10/25 21:39:21信息服务器:jetty-9.3.20。v20170531 18/10/25 21:39:21信息AbstractConnector:开始ServerConnector@6a6c7f42 {HTTP / 1.1, (HTTP / 1.1)}{0.0.0.0:15001} 18/10/25 21:39:21信息服务器:@5879ms开始
冲突PySpark安装
的databricks-connect
包与PySpark冲突。安装两个初始化时将导致错误引发上下文在Python中。这可以体现在几个方面,包括“流破坏”或“找不到”的错误。如果你有PySpark安装到您的Python环境,确保安装databricks-connect之前卸载。卸载PySpark之后,一定要完全重新安装砖连接的包:
pip卸载pyspark pip卸载databricks-connect pip安装- u“databricks-connect = = 9.1 *”。#或X.Y.*来match your cluster version.
相互冲突的SPARK_HOME
如果您以前使用过火花机,IDE可以配置为使用一个其他版本的火花而不是砖连接的火花。这可以体现在几个方面,包括“流破坏”或“找不到”的错误。你可以看到哪个版本的火花被检查的价值SPARK_HOME
环境变量:
系统。出。println(系统。采用(“SPARK_HOME”));
进口操作系统打印(操作系统。环境(“SPARK_HOME”])
println(sys。env。得到(“SPARK_HOME”))
冲突或失踪路径
二进制文件的条目
可以配置路径,这样的命令spark-shell
将运行其他之前安装的二进制代替砖提供的一个连接。这可能会导致databricks-connect测试
失败。你应该确保优先考虑砖连接的二进制文件,或删除之前安装的。
如果你不能运行命令spark-shell
,也有可能你的路径并不是自动建立的皮普安装
,你将需要添加安装本
手动dir到您的路径。可以使用砖与ide,即使这不是设置。然而,databricks-connect测试
命令将不能正常工作。
冲突的序列化设置在集群上
如果你看到“流损坏”运行时错误databricks-connect测试
,这可能是由于不兼容的集群序列化配置。例如,设置spark.io.compression.codec
配置会导致这个问题。为了解决这个问题,考虑从集群移除这些配置设置,或设置配置在砖连接的客户端。
找不到winutils.exe
在Windows上
如果您使用的是砖连接在Windows上看:
错误壳牌:失败的来定位的winutils二进制在的hadoop二进制路径java。io。IOException:可以不定位可执行的零\本\winutils。exe在的Hadoop二进制文件。
按照指示在Windows上配置Hadoop路径。
限制
砖连接不支持下面的砖的特点和第三方平台:bob体育客户端下载
结构化的流。
运行任意代码不是一个远程集群上火花工作的一部分。
本机Scala、Python和Rδ表操作的api(例如,
DeltaTable.forPath
不支持)。然而,SQL API (spark.sql (…)
)和三角洲湖操作和火花的API(例如,spark.read.load
三角洲表上)都支持。进入副本。
使用SQL函数、Python或Scala udf的一部分服务器的目录。然而,当地引入Scala和Python udf的工作。
Apache飞艇0.7。x和是low.
连接到集群访问控制表。
连接到集群处理隔离(换句话说,启用
spark.databricks.pyspark.enableProcessIsolation
被设置为真正的
)。δ
克隆
SQL命令。全局临时视图。
考拉。
创建表表作为选择…
SQL命令并不总是工作。相反,使用spark.sql(“选择……”).write.saveAsTable(“表”)
。