Problem
Writing to an S3 bucket using RDDs fails. The driver node can write to the bucket, but the worker nodes return an access denied error. Writing with the DataFrame API, however, works.

For example, assume you run the following code:
%scala
import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2))
val result = inputStream.map(x => x*x)
val count = result.foreachRDD { rdd =>
  val config = new Configuration(sc.hadoopConfiguration) with Serializable
  rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://dx.lz.company.fldr.dev/part_0000000-123"), config)
      val path = new Path("s3://dx.lz.company.fldr.dev/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
}
println(s"Count is $count")
ssc.start()
The following error is returned:
org.apache.spark.SparkException: Job aborted due to stage failure: Task ... in stage 3.0 failed 4 times, most recent failure: Lost task ... in stage 3.0 (TID 7, 10.205.244.228, executor 0): ... Access Denied; Request ID: F81ADFACBCDFE626, Extended Request ID: 1DNcBUHsmUFFI9a1lz0yGt4dnRjdY5V3C+J/DiEeg8Z4tMOLphZwW2U+sdxmr8fluQZ1R/3BCep
Cause
When you write with RDDs, the worker nodes are denied access by the IAM policy because the configuration declared as val config = new Configuration(sc.hadoopConfiguration) with Serializable is not correctly serialized: mixing in Serializable does not carry the Hadoop properties over to the executors, so they attempt the write without the configuration that the driver has.
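As a rough sketch (not the actual Spark source), a wrapper along the lines of org.apache.spark.util.SerializableConfiguration has to write the Hadoop properties out explicitly and read them back, which the plain with Serializable mix-in never does:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

// Illustrative wrapper only; in practice use org.apache.spark.util.SerializableConfiguration.
class ConfigurationWrapper(@transient var value: Configuration) extends Serializable {

  // Configuration implements Hadoop's Writable, not java.io.Serializable,
  // so its properties must be written out by hand.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }

  // Rebuild the Configuration, with all of its properties, on the executor.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}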
Solution
There are two ways to solve this issue:
Option 1: Use the DataFrame API
%scala
dbutils.fs.put("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt", "foobar")
val df = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt")
df.write.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
val df1 = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
Option 2: Use a serializable configuration

If you want to use RDDs, use:
%scala
val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
For example:
%scala
import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.util.SerializableConfiguration

val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2))
val result = inputStream.map(x => x*x)
val count = result.foreachRDD { rdd =>
  //val config = new Configuration(sc.hadoopConfiguration) with Serializable
  // Broadcast a SerializableConfiguration so the Hadoop properties reach the workers.
  val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
  rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://path/part_0000000-123"), config.value.value)
      val path = new Path("s3://path/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
}
println(s"Count is $count")
ssc.start()
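If you want to confirm that the workers wrote the object successfully, one option is to read it back with the DataFrame API after a batch has run. The path below is just the placeholder used in the example above:

%scala
// Read the object the workers wrote back as text; it should contain "foobar".
val check = spark.read.text("s3://path/part_0000000-123")
check.show()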