Access denied when writing to an S3 bucket using RDD

Learn how to resolve an access denied error when writing to an S3 bucket using RDD.

Written by Adam Pavlacka

May 31, 2022

Problem

Writing to an S3 bucket using RDDs fails. The driver node can write to the bucket, but the worker nodes return an access denied error. Writing with the DataFrame API, however, works.

For example, assume you run the following code:

%scala

import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2))
val result = inputStream.map(x => x*x)
val count = result.foreachRDD { rdd =>
  // Attempt to make the Hadoop configuration usable inside the tasks by mixing in Serializable.
  val config = new Configuration(sc.hadoopConfiguration) with Serializable
  rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://dx.lz.company.fldr.dev/part_0000000-123"), config)
      val path = new Path("s3://dx.lz.company.fldr.dev/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
}

println(s"Count is $count")

ssc.start()

It returns the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 7.0 failed 4 times, most recent failure: Lost task 3.3 in stage 7.0 (TID ..., 10.205.244.228, executor 0): ... Access Denied; Request ID: F81ADFACBCDFE626, Extended Request ID: 1DNcBUHsmUFFI9a1lz0yGt4dnRjdY5V3C+J/DiEeg8Z4tMOLphZwW2U+sdxmr8fluQZ1R/3BCep

Cause

When you write to S3 with an RDD, the worker nodes are denied access by the IAM policy when the Hadoop configuration is passed to them as val config = new Configuration(sc.hadoopConfiguration) with Serializable.
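To see the difference at a glance, here is the failing pattern next to the broadcast-based pattern used in Option 2 below. This is a minimal sketch that assumes the notebook's sc; the badConfig/goodConfig names are only for illustration and nothing is written to S3 here.

%scala

import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.SerializableConfiguration

// Fails on worker nodes: RDD tasks that use this configuration get Access Denied from S3 (see Cause above).
val badConfig = new Configuration(sc.hadoopConfiguration) with Serializable

// Works: wrap the driver's Hadoop configuration in SerializableConfiguration and broadcast it;
// executor code unwraps it with goodConfig.value.value.
val goodConfig = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))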

Solution

There are two ways to solve this issue:

Option 1: Use the DataFrame API

%scala

dbutils.fs.put("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt", "foobar")
val df = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt")
df.write.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
val df1 = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")

Option 2: Use SerializableConfiguration

If you want to use RDDs, use:

%scala

val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

For example:

%scala

import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.util.SerializableConfiguration

val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2))
val result = inputStream.map(x => x*x)
val count = result.foreachRDD { rdd =>
  //val config = new Configuration(sc.hadoopConfiguration) with Serializable
  // Broadcast the driver's Hadoop configuration wrapped in SerializableConfiguration.
  val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
  rdd.mapPartitions {
    _.map { entry =>
      // config.value.value unwraps the broadcast and then the SerializableConfiguration.
      val fs = FileSystem.get(URI.create("s3://pathpart_0000000-123"), config.value.value)
      val path = new Path("s3:///path/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
}

println(s"Count is $count")

ssc.start()
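The same pattern also works outside of streaming, for a plain batch RDD. The following is a minimal sketch, assuming the notebook's sc and a placeholder bucket path (s3a://my-bucket/tmp) that you would replace with your own location:

%scala

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.util.SerializableConfiguration

// Broadcast the driver's Hadoop configuration so every task can rebuild a FileSystem with it.
val confBroadcast = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

sc.parallelize(Seq(1, 2, 3, 4), 2).foreachPartition { partition =>
  // confBroadcast.value.value unwraps the broadcast and then the SerializableConfiguration.
  val fs = FileSystem.get(URI.create("s3a://my-bucket/tmp/"), confBroadcast.value.value)
  partition.foreach { n =>
    val out = fs.create(new Path(s"s3a://my-bucket/tmp/part-$n.txt"))
    out.write(s"value $n".getBytes("UTF-8"))
    out.close()
  }
}

Because the configuration is shipped as a broadcast variable, it is sent to each executor once and carries the same settings the driver uses.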

