This article explains how to create an Apache Spark DataFrame from a variable containing a JSON string or a Python dictionary.
Create a Spark DataFrame from a JSON string
- Add the JSON content from the variable to a list.

```scala
%scala
import scala.collection.mutable.ListBuffer

// Sample JSON strings held in variables.
val json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
val json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"

var json_seq = new ListBuffer[String]()
json_seq += json_content1
json_seq += json_content2
```
- Create a Spark Dataset from the list.

```scala
%scala
val json_ds = json_seq.toDS()
```
- Use spark.read.json to parse the Spark Dataset.

```scala
%scala
val df = spark.read.json(json_ds)
display(df)
```
Combined sample code

These sample code blocks combine the previous steps into individual examples. The Python and Scala samples perform the same tasks.

```python
%python
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"

json_list = []
json_list.append(json_content1)
json_list.append(json_content2)

df = spark.read.json(sc.parallelize(json_list))
display(df)
```

```scala
%scala
import scala.collection.mutable.ListBuffer

val json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
val json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"

var json_seq = new ListBuffer[String]()
json_seq += json_content1
json_seq += json_content2

val json_ds = json_seq.toDS()
val df = spark.read.json(json_ds)
display(df)
```
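As a quick sanity check, you can print the schema Spark inferred. This brief sketch is not part of the original steps; it assumes the `df` produced by the combined Python sample above. Because `json_col2` holds the number 32 in one record and the string 'world' in the other, the JSON reader resolves the conflict by widening the column to a string:

```python
%python
# Assumes the df created in the combined Python sample above.
# spark.read.json scans the records and infers a single schema;
# json_col2 mixes 32 and 'world', so it is inferred as a string.
df.printSchema()
```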
Extract a string column with JSON data from a DataFrame and parse it
- Select the JSON column from a DataFrame and convert it to an RDD of type RDD[Row].

```scala
%scala
import org.apache.spark.sql.functions._

// Sample data: a DataFrame with one column of JSON strings.
val test_df = Seq(
  ("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),
  ("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0")
).toDF("row_number", "json", "token")

val row_rdd = test_df.select(col("json")).rdd
```
- Convert the RDD[Row] to RDD[String].

```scala
%scala
val string_rdd = row_rdd.map(_.mkString(","))
```
- Use spark.read.json to parse the RDD[String].

```scala
%scala
val df1 = spark.read.json(string_rdd)
display(df1)
```
Combined sample code

This sample code block combines the previous steps into a single example.

```scala
%scala
import org.apache.spark.sql.functions._

val test_df = Seq(
  ("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),
  ("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0")
).toDF("row_number", "json", "token")

val row_rdd = test_df.select(col("json")).rdd
val string_rdd = row_rdd.map(_.mkString(","))
val df1 = spark.read.json(string_rdd)
display(df1)
```
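If the schema of the embedded JSON is known up front, a common alternative to the RDD round-trip is `from_json` with an explicit schema (available in Spark 2.1 and later). The sketch below is not part of the original walkthrough; it rebuilds the same sample data in Python and relies on the JSON parser's default options, which accept the single-quoted strings used above:

```python
%python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructField, StructType, StringType

# Same sample data as the Scala example above, rebuilt in Python.
test_df = spark.createDataFrame(
    [("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),
     ("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0")],
    ["row_number", "json", "token"],
)

# Explicit schema for the JSON column; json_col2 is declared as a
# string because the sample records mix a number and a string.
json_schema = StructType([
    StructField("json_col1", StringType()),
    StructField("json_col2", StringType()),
])

parsed_df = test_df.withColumn("parsed", from_json(col("json"), json_schema))
display(parsed_df.select("parsed.json_col1", "parsed.json_col2"))
```

This keeps everything in the DataFrame API and avoids the extra serialization step of converting rows to strings and back.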
Create a Spark DataFrame from a Python dictionary
- Check the data type and confirm that it is of dictionary type.

```python
%python
jsonDataDict = {
  "job_id": 33100,
  "run_id": 1048560,
  "number_in_job": 1,
  "state": {
    "life_cycle_state": "PENDING",
    "state_message": "Waiting for cluster"
  },
  "task": {
    "notebook_task": {
      "notebook_path": "/Users/user@www.neidfyre.com/path/test_notebook"
    }
  },
  "cluster_spec": {
    "new_cluster": {
      "spark_version": "4.3.x-scala2.11",
      "attributes": {"type": "fixed_node", "memory": "8g"},
      "enable_elastic_disk": "false",
      "num_workers": 1
    }
  },
  "cluster_instance": {"cluster_id": "0000-000000-wares10"},
  "start_time": 1584689872601,
  "setup_duration": 0,
  "execution_duration": 0,
  "cleanup_duration": 0,
  "creator_user_name": "user@www.neidfyre.com",
  "run_name": "my test job",
  "run_page_url": "https://testurl.www.neidfyre.com#job/33100/run/1",
  "run_type": "SUBMIT_RUN"
}
type(jsonDataDict)
```
- Use json.dumps to convert the Python dictionary into a JSON string.

```python
%python
import json
jsonData = json.dumps(jsonDataDict)
```
- Add the JSON content to a list.

```python
%python
jsonDataList = []
jsonDataList.append(jsonData)
```
- Convert the list to an RDD and parse it using spark.read.json.

```python
%python
jsonRDD = sc.parallelize(jsonDataList)
df = spark.read.json(jsonRDD)
display(df)
```
Combined sample code

This sample code block combines the previous steps into a single example.

```python
%python
import json

jsonDataDict = {
  "job_id": 33100,
  "run_id": 1048560,
  "number_in_job": 1,
  "state": {
    "life_cycle_state": "PENDING",
    "state_message": "Waiting for cluster"
  },
  "task": {
    "notebook_task": {
      "notebook_path": "/Users/user@www.neidfyre.com/path/test_notebook"
    }
  },
  "cluster_spec": {
    "new_cluster": {
      "spark_version": "4.3.x-scala2.11",
      "attributes": {"type": "fixed_node", "memory": "8g"},
      "enable_elastic_disk": "false",
      "num_workers": 1
    }
  },
  "cluster_instance": {"cluster_id": "0000-000000-wares10"},
  "start_time": 1584689872601,
  "setup_duration": 0,
  "execution_duration": 0,
  "cleanup_duration": 0,
  "creator_user_name": "user@www.neidfyre.com",
  "run_name": "my test job",
  "run_page_url": "https://testurl.www.neidfyre.com#job/33100/run/1",
  "run_type": "SUBMIT_RUN"
}
type(jsonDataDict)

jsonData = json.dumps(jsonDataDict)

jsonDataList = []
jsonDataList.append(jsonData)

jsonRDD = sc.parallelize(jsonDataList)
df = spark.read.json(jsonRDD)
display(df)
```
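As a closing usage note (not part of the original steps), nested objects in the dictionary surface as struct columns in the resulting DataFrame, so individual fields can be selected with dot notation. A minimal sketch reusing the `df` built above:

```python
%python
# Assumes the df created in the combined sample above.
# Nested JSON objects become struct columns, so dot notation
# reaches fields such as the run state and the notebook path.
display(df.select(
    "job_id",
    "state.life_cycle_state",
    "task.notebook_task.notebook_path",
))
```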