Create a DataFrame from a JSON string or Python dictionary

Learn how to create an Apache Spark DataFrame from a variable containing a JSON string or a Python dictionary.

Written by ram.sankarasubramanian

July 1, 2022

In this article we show you how to create an Apache Spark DataFrame from a variable containing a JSON string or a Python dictionary.
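As background, a JSON string and a Python dictionary are two representations of the same data, and the standard `json` module converts between them. A minimal sketch (the sample keys and values here are illustrative, not taken from the article):

```python
import json

# A JSON string and the equivalent Python dictionary (sample values are illustrative)
json_string = '{"json_col1": "hello", "json_col2": 32}'
python_dict = {"json_col1": "hello", "json_col2": 32}

# json.loads parses a JSON string into a dictionary;
# json.dumps serializes a dictionary back into a JSON string.
parsed = json.loads(json_string)
print(parsed == python_dict)    # the two representations carry the same data
print(json.dumps(python_dict))  # a JSON string again
```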

Create a Spark DataFrame from a JSON string

  1. Add the JSON content from the variable to a list.
    %scala
    import scala.collection.mutable.ListBuffer
    // json_content holds the JSON string from your variable
    var json_seq = new ListBuffer[String]()
    json_seq += json_content
  2. Create a Spark Dataset from the list.
    %scala
    val json_ds = json_seq.toDS()
  3. Use spark.read.json to parse the Spark Dataset.
    %scala
    val df = spark.read.json(json_ds)
    display(df)

Combined sample code

This sample code block combines the previous steps into a single example. The Python sample and the Scala sample perform the same task.

%python
# Sample JSON strings; the exact contents are illustrative
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)

%scala
import scala.collection.mutable.ListBuffer
// Sample JSON strings; the exact contents are illustrative
val json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
val json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
var json_seq = new ListBuffer[String]()
json_seq += json_content1
json_seq += json_content2
val json_ds = json_seq.toDS()
val df = spark.read.json(json_ds)
display(df)

Extract a string column with JSON data from a DataFrame and parse it

  1. Select the JSON column from the DataFrame and convert it to an RDD of type RDD[Row].
    %scala
    import org.apache.spark.sql.functions._
    // json_string_col is a placeholder for the name of the string column holding JSON data
    val row_rdd = df.select(col("json_string_col")).rdd
  2. Convert the RDD[Row] to an RDD[String].
    %scala
    val string_rdd = row_rdd.map(_.mkString(","))
  3. Use spark.read.json to parse the RDD[String].
    %scala
    val df1 = spark.read.json(string_rdd)

Combined sample code

This sample code block combines the previous steps into a single example.

%scala
import org.apache.spark.sql.functions._
// json_string_col is a placeholder for the name of the string column holding JSON data
val row_rdd = df.select(col("json_string_col")).rdd
val string_rdd = row_rdd.map(_.mkString(","))
val df1 = spark.read.json(string_rdd)
display(df1)

Create a Spark DataFrame from a Python dictionary

  1. Check the data type and confirm that it is of dictionary type.
    %python
    jsonDataDict = {
      "job_id": 33100,
      "run_id": 1048560,
      "number_in_job": 1,
      "state": {
        "life_cycle_state": "PENDING",
        "state_message": "Waiting for cluster"
      },
      "task": {
        "notebook_task": {
          "notebook_path": "/Users/user@www.neidfyre.com/path/test_notebook"
        }
      },
      "cluster_spec": {
        "new_cluster": {
          "spark_version": "4.3.x-scala2.11",
          "attributes": {
            "type": "fixed_node",
            "memory": "8g"
          },
          "enable_elastic_disk": "false",
          "num_workers": 1
        }
      },
      "cluster_instance": {
        "cluster_id": "0000-000000-wares10"
      },
      "start_time": 1584689872601,
      "setup_duration": 0,
      "execution_duration": 0,
      "cleanup_duration": 0,
      "creator_user_name": "user@www.neidfyre.com",
      "run_name": "my test job",
      "run_page_url": "https://testurl.www.neidfyre.com#job/33100/run/1",
      "run_type": "SUBMIT_RUN"
    }
    type(jsonDataDict)
  2. Use json.dumps to convert the Python dictionary into a JSON string.
    %python
    import json
    jsonData = json.dumps(jsonDataDict)
  3. Add the JSON content to a list.
    %python
    jsonDataList = []
    jsonDataList.append(jsonData)
  4. Convert the list to an RDD and parse it using spark.read.json.
    %python
    jsonRDD = sc.parallelize(jsonDataList)
    df = spark.read.json(jsonRDD)
    display(df)
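Steps 1 through 3 above can be exercised with plain Python; only the final spark.read.json call needs a running Spark cluster. A minimal sketch, using an abbreviated version of the sample dictionary:

```python
import json

# Step 1: a Python dictionary (abbreviated from the sample above)
jsonDataDict = {"job_id": 33100, "run_id": 1048560, "number_in_job": 1}
print(type(jsonDataDict))  # confirms it is a dict

# Step 2: convert the dictionary into a JSON string
jsonData = json.dumps(jsonDataDict)
print(type(jsonData))      # now a str

# Step 3: add the JSON content to a list;
# in the article this list is then passed to sc.parallelize and spark.read.json
jsonDataList = []
jsonDataList.append(jsonData)
print(jsonDataList)
```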

Combined sample code

This sample code block combines the previous steps into a single example.

%python
jsonDataDict = {
  "job_id": 33100,
  "run_id": 1048560,
  "number_in_job": 1,
  "state": {
    "life_cycle_state": "PENDING",
    "state_message": "Waiting for cluster"
  },
  "task": {
    "notebook_task": {
      "notebook_path": "/Users/user@www.neidfyre.com/path/test_notebook"
    }
  },
  "cluster_spec": {
    "new_cluster": {
      "spark_version": "4.3.x-scala2.11",
      "attributes": {
        "type": "fixed_node",
        "memory": "8g"
      },
      "enable_elastic_disk": "false",
      "num_workers": 1
    }
  },
  "cluster_instance": {
    "cluster_id": "0000-000000-wares10"
  },
  "start_time": 1584689872601,
  "setup_duration": 0,
  "execution_duration": 0,
  "cleanup_duration": 0,
  "creator_user_name": "user@www.neidfyre.com",
  "run_name": "my test job",
  "run_page_url": "https://testurl.www.neidfyre.com#job/33100/run/1",
  "run_type": "SUBMIT_RUN"
}
type(jsonDataDict)

import json
jsonData = json.dumps(jsonDataDict)

jsonDataList = []
jsonDataList.append(jsonData)

jsonRDD = sc.parallelize(jsonDataList)
df = spark.read.json(jsonRDD)
display(df)

Example notebook

Review the parse JSON string or Python dictionary example notebook.
