Spark笔记

Spark笔记

1.rdd的属性

  • 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
  • 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
  • RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
  • 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
  • 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

2.算子分为哪几类(RDD支持哪几种类型的操作)

转换(Transformation) 现有的RDD通过转换生成一个新的RDD。lazy模式,延迟执行。

转换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce 等等。

动作(Action) 在RDD上运行计算,并返回结果给驱动程序(Driver)或写入文件系统。

动作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。

collect 该方法把数据收集到driver端 Array数组类型

所有的transformation只有遇到action才能被执行。

当触发执行action之后,数据类型不再是rdd了,数据就会存储到指定文件系统中,或者直接打印结果或者收集起来。

3.创建rdd的几种方式

  1. 集合并行化创建(有数据)

    val arr = Array(1,2,3,4,5)

    val rdd = sc.parallelize(arr)

    val rdd =sc.makeRDD(arr)

  2. 读取外部文件系统,如hdfs,或者读取本地文件(最常用的方式)(没数据)

    val rdd2 = sc.textFile(“hdfs://hdp-01:9000/words.txt”)

    // 读取本地文件

    val rdd2 = sc.textFile(“file:///root/words.txt”)

  3. 从父RDD转换成新的子RDD

    调用Transformation类的方法,RDD

4.spark运行流程

Worker的功能: 定时和master通信;调度并管理自身的executor

executor: 由Worker启动的,程序最终在executor中运行,(程序运行的一个容器)

spark-submit命令执行时,会根据master地址去向 Master发送请求。

Master接收到Dirver端的任务请求之后,根据任务的请求资源进行调度,(打散的策略),尽可能的把任务资源平均分配,然后向Worker发送指令

Worker收到Master的指令之后,就根据相应的资源,启动executor(cores,memory)

executor会向dirver端建立请求,通知driver,任务已经可以运行了

driver运行任务的时候,会把任务发送到executor中去运行。

5.Spark中coalesce与repartition的区别

1)关系:

两者都是用来改变 RDD 的 partition 数量的,repartition 底层调用的就是 coalesce 方法:coalesce(numPartitions, shuffle = true)

2)区别:

repartition 一定会发生 shuffle,coalesce 根据传入的参数来判断是否发生 shuffle

一般情况下增大 rdd 的 partition 数量使用 repartition,减少 partition 数量时使用coalesce

6.sortBy 和 sortByKey的区别

sortBy既可以作用于RDD[K] ,还可以作用于RDD[(k,v)]

sortByKey 只能作用于 RDD[K,V] 类型上。

7.map和mapPartitions的区别

8.数据存入Redis 优先使用map mapPartitions foreach foreachPartions哪个

使用 foreachPartition

  • 1,map mapPartition 是转换类的算子, 有返回值

  • 2, 写mysql,redis 的连接

    foreach * 100万 100万次的连接

    foreachPartions * 200 个分区 200次连接 一个分区中的数据,共用一个连接

foreachParititon 每次迭代一个分区,foreach每次迭代一个元素。

该方法没有返回值,或者Unit

主要作用于,没有返回值类型的操作(打印结果,写入到mysql数据库中)

在写入到redis,mysql的时候,优先使用foreachPartititon

9.reduceByKey和groupBykey的区别

reduceByKey会传一个聚合函数, 相当于 groupByKey + mapValues

reduceByKey 会有一个分区内聚合,而groupByKey没有 最核心的区别

结论: reduceByKey有分区内聚合,更高效,优先选择使用reduceByKey。

10.cache和checkPoint的比较

都是做 RDD 持久化的

1.缓存,是在触发action之后,把数据写入到内存或者磁盘中。不会截断血缘关系

(设置缓存级别为memory_only: 内存不足,只会部分缓存或者没有缓存,缓存会丢失,memory_and_disk :内存不足,会使用磁盘)

2.checkpoint 也是在触发action之后,执行任务。单独再启动一个job,负责写入数据到hdfs中。(把rdd中的数据,以二进制文本的方式写入到hdfs中,有几个分区,就有几个二进制文件)

3.某一个RDD被checkpoint之后,他的父依赖关系会被删除,血缘关系被截断,该RDD转换成了CheckPointRDD,以后再对该rdd的所有操作,都是从hdfs中的checkpoint的具体目录来读取数据。 缓存之后,rdd的依赖关系还是存在的。

11.spark streaming流式统计单词数量代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
object WordCountAll {
// newValues当前批次的出现的单词次数, runningCount表示之前运行的单词出现的结果
/* def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = newValues.sum + runningCount.getOrElse(0)// 将历史前几个批次的值和当前批次的值进行累加返回当前批次最终的结果
Some(newCount)
}*/
/**
* String : 单词 hello
* Seq[Int] :单词在当前批次出现的次数
* Option[Int] : 历史结果
*/
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
//iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
}
// 屏蔽日志
Logger.getLogger("org.apache").setLevel(Level.ERROR)
def main(args: Array[String]) {
// 必须要开启2个以上的线程,一个线程用来接收数据,另外一个线程用来计算
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
// 设置sparkjob计算时所采用的序列化方式
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.rdd.compress", "true") // 节约大量的内存内容
// 如果你的程序出现垃圾回收时间过程,可以设置一下java的垃圾回收参数
// 同时也会创建sparkContext对象
// 批次时间 >= 批次处理的总时间 (批次数据量,集群的计算节点数量和配置)
val ssc = new StreamingContext(conf, Seconds(5))

//做checkpoint 写入共享存储中
ssc.checkpoint("c://aaa")

// 创建一个将要连接到 hostname:port 的 DStream,如 localhost:9999
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.175.101", 44444)
//updateStateByKey结果可以累加但是需要传入一个自定义的累加函数:updateFunc
val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
//打印结果到控制台
results.print()
//开始计算
ssc.start()
//等待停止
ssc.awaitTermination()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
object SparkWordCountApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("SparkWordCountApp")
val sc = new SparkContext(sparkConf)

val rdd = sc.textFile("file:////Users/lx/IdeaProjects/sparksql-train/data/input.txt")
// rdd.collect().foreach(println)
rdd.flatMap(_.split(","))
.map(word => (word, 1))
.reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false)
.map(x => (x._1,x._2))
.collect().foreach(println)

sc.stop()
}
}

12.简述map和flatMap的区别和应用场景

map是对每一个元素进行操作,flatmap是对每一个元素操作后并压平

flatmap对每个元素进行操作后,放入一个对象返回。

13.计算曝光数和点击数

14.分别列出几个常用的transformation和action算子

  • 转换算子:map,flatmap,filter,reduceByKey,groupByKey,groupBy
  • 行动算子:foreach,foreachpartition,collect,collectAsMap,take,top,first,count,countByKey

15.按照需求使用spark编写以下程序,要求使用scala语言

当前文件a.txt的格式,请统计每个单词出现的次数

A,b,c

B,b,f,e

1
2
3
4
5
6
7
8
9
10
11
12
13
14
object WordCount {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[*]")
val sc = new SparkContext(conf)

var sData: RDD[String] = sc.textFile("a.txt")
val sortData: RDD[(String, Int)] = sData.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
sortData.foreach(print)
}
}

16.spark应用程序的执行命令是什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/usr/local/spark-current2.3/bin/spark-submit \

--class com.wedoctor.Application \

--master yarn \

--deploy-mode client \

--driver-memory 1g \

--executor-memory 2g \

--queue root.wedw \

--num-executors 200 \

--jars /home/pgxl/liuzc/config-1.3.0.jar,/home/pgxl/liuzc/hadoop-lzo-0.4.20.jar,/home/pgxl/liuzc/elasticsearch-hadoop-hive-2.3.4.jar \

/home/pgxl/liuzc/sen.jar

17.Spark应用执行有哪些模式,其中哪几种是集群模式

  • 本地local模式
  • standalone模式
  • spark on yarn模式
  • spark on mesos模式

其中,standalone模式,spark on yarn模式,spark on mesos模式是集群模式

18.请说明spark中广播变量的用途

使用广播变量,每个 Executor 的内存中,只驻留一份变量副本,而不是对 每个 task 都传输一次大变量,省了很多的网络传输, 对性能提升具有很大帮助, 而且会通过高效的广播算法来减少传输代价。

19.以下代码会报错吗?如果会怎么解决 val arr = new ArrayList[String]; arr.foreach(println)

val arr = new ArrayList[String]; 这里会报错,需要改成 val arr: Array[String] = new Array[String](10)

arr.foreach(println)打印不会报空指针

20.写出你用过的spark中的算子,其中哪些会产生shuffle过程

reduceBykey:

groupByKey:

…ByKey:

21.Spark中rdd与partition的区别

RDD是什么?弹性分布式数据集。
弹性:并不是指他可以动态扩展,而是血统容错机制。
分布式:顾名思义,RDD会在多个节点上存储,就和hdfs的分布式道理是一样的。hdfs文件被切分为多个block存储在各个节点上,而RDD是被切分为多个partition。不同的partition可能在不同的节点上。在spark读取hdfs的场景下,spark把hdfs的block读到内存就会抽象为spark的partition。至于后续遇到shuffle的操作,RDD的partition可以根据Hash再次进行划分(一般pairRDD是使用key做Hash再取余来划分partition)。

22.请写出创建Dateset的几种方式

  • 由DataFrame 转化成为 Dataset
  • 通过 SparkSession.createDataset() 直接创建
  • 通过toDS 方法意识转换

23.描述一下RDD,DataFrame,DataSet的区别?

1)RDD

优点:

编译时类型安全

编译时就能检查出类型错误

面向对象的编程风格

直接通过类名点的方式来操作数据

缺点:

序列化和反序列化的性能开销

无论是集群间的通信, 还是 IO 操作都需要对对象的结构和数据进行序列化和反序列化。

GC 的性能开销,频繁的创建和销毁对象, 势必会增加 GC

2)DataFrame

DataFrame 引入了 schema 和 off-heap

schema : RDD 每一行的数据, 结构都是一样的,这个结构就存储在 schema 中。 Spark 通过 schema 就能够读懂数据, 因此在通信和 IO 时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。

3)DataSet

DataSet 结合了 RDD 和 DataFrame 的优点,并带来的一个新的概念 Encoder。

当序列化数据时,Encoder 产生字节码与 off-heap 进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。Spark 还没有提供自定义 Encoder 的 API,但是未来会加入。
三者之间的转换:

24.描述一下Spark中stage是如何划分的?描述一下shuffle的概念

  • Stage概念

Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行stage是由一组并行的task组成。

  • stage切割规则

    切割规则:从后往前遇到宽依赖就切割stage。

img

  • stage计算模式

    pipeline管道计算模式,pipeline只是一种计算思想,模式。

备注:图中几个理解点:

1、Spark的pipeLine的计算模式,相当于执行了一个高阶函数f3(f2(f1(textFile))) !+!+!=3 也就是来一条数据然后计算一条数据,把所有的逻辑走完,然后落地,准确的说一个task处理遗传分区的数据 因为跨过了不同的逻辑的分区。而MapReduce是 1+1=2,2+1=3的模式,也就是计算完落地,然后在计算,然后再落地到磁盘或内存,最后数据是落在计算节点上,按reduce的hash分区落地。所以这也是比Mapreduce快的原因,完全基于内存计算。

2、管道中的数据何时落地:shuffle write的时候,对RDD进行持久化的时候。

  1. Stage的task并行度是由stage的最后一个RDD的分区数来决定的 。一般来说,一个partiotion对应一个task,但最后reduce的时候可以手动改变reduce的个数,也就是分区数,即改变了并行度。例如reduceByKey(XXX,3),GroupByKey(4),union由的分区数由前面的相加。

  2. 如何提高stage的并行度:reduceBykey(xxx,numpartiotion),join(xxx,numpartiotion)

shuffle 和 stage

Shuffle:当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。

spark中的shuffle:宽依赖指子 RDD 的各个分片会依赖于父RDD 的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片

25.Spark 在yarn上运行需要做哪些关键的配置工作?如何kill -个Spark在yarn运行中Application

修改 spark-env.sh文件,配置hadoop的配置文件,或者yarn的配置文件即可(两者选择其中一种即可)

1
yarn application -kill <applicationId>

26.通常来说,Spark与MapReduce相比,Spark运行效率更高。请说明效率更高来源于Spark内置的哪些机制?并请列举常见spark的运行模式?

  1. spark中具有DAG有向无环图,DAG有向无环图在此过程中减少了shuffle以及落地磁盘的次数

    Spark 计算比 MapReduce 快的根本原因在于 DAG 计算模型。一般而言,DAG 相比MapReduce 在大多数情况下可以减少 shuffle 次数。Spark 的 DAGScheduler 相当于一个改进版的 MapReduce,如果计算不涉及与其他节点进行数据交换,Spark 可以在内存中一次性完成这些操作,也就是中间结果无须落盘,减少了磁盘 IO 的操作。
    spark把运算的中间数据(shuffle阶段产生的数据)存放在内存,迭代计算效率更高,mapreduce的中间结果需要落地,保存到磁盘

  2. Spark容错性高,它通过弹性分布式数据集RDD来实现高效容错,RDD是一组分布式的存储在 节点内存中的只读性的数据集,这些集合石弹性的,某一部分丢失或者出错,可以通过整个数据集的计算流程的血缘关系来实现重建,mapreduce的容错只能重新计算

  3. spark是粗粒度资源申请,也就是当提交spark application的时候,application会将所有的资源申请完毕,如果申请不到资源就等待,如果申请到资源才执行application,task在执行的时候就不需要自己去申请资源,task执行快,当最后一个task执行完之后task才会被释放。

    优点是执行速度快,缺点是不能使集群得到充分的利用

    MapReduce是细粒度资源申请,当提交application的时候,task执行时,自己申请资源,自己释放资源,task执行完毕之后,资源立即会被释放,task执行的慢,application执行的相对比较慢。

    优点是集群资源得到充分利用,缺点是application执行的相对比较慢。

27.RDD中的数据在哪?

RDD中的数据在数据源,RDD只是一个抽象的数据集,我们通过对RDD的操作就相当于对数据进行操作。

28.如果对RDD进行cache操作后,数据在哪里?

数据在第一执行cache算子时会被加载到各个Executor进程的内存中,第二次就会直接从内存中读取而不会区磁盘。

29.Spark中Partition的数量由什么决定

和Mr一样,但是Spark默认最少有两个分区。

如果是读取hdfs的文件,一般来说,partition的数量等于文件的数量。

如果单个文件的大小大于hdfs的分块大小,partition的数量就等于 “文件大小/分块大小”。

同时,也可以使用rdd的repartition方法重新划分partition。

另外,在使用聚合函数比如 reducebykey, groupbykey,可以通过指定partitioner来指定partition的数量。

30.Scala里面的函数和方法有什么区别

Scala 有函数和方法,二者在语义上的区别很小。Scala 方法是类的一部分,而函数是一个对象可以赋值给一个变量。换句话来说在类中定义的函数即是方法。

31.SparkStreaming怎么进行监控?

  1. Spark Streaming也提供了Jobs、Stages、Storage、Enviorment、Executors以及Streaming的监控

  2. Spark Streaming能够提供如此优雅的数据监控,是因在对监听器设计模式的使用。如若Spark UI无法满足你所需的监控需要,用户可以定制个性化监控信息。Spark Streaming提供了StreamingListener特质,通过继承此方法,就可以定制所需的监控

32.Spark判断Shuffle的依据?

父RDD的一个分区中的数据有可能被分配到子RDD的多个分区中

33.Scala有没有多继承?可以实现多继承么?

trait实现多继承

由上可见,super.log通常调用trait从最后一个开始,从右往左调用。但是如果右边的trait是左边trait的超类,那么次序会调换,先调用子再调用父。

34.Sparkstreaming和flink做实时处理的区别

  • Flink的计算模型抽象是有状态的流,即源源不断没有边界的数据,并且数据的状态可以改变,对于批处理则认为是有边界的流进行处理
  • Spark的计算模型抽象是批,所有数据的表示本质上都是RDD抽象,对于流处理的支持,则是基于时间将流划分为多个批次,依次进行处理

35.Sparkcontext的作用

官方解释:SparkContext是spark功能的主要入口。其代表与spark集群的连接,能够用来在集群上创建RDD、累加器、广播变量。每个JVM里只能存在一个处于激活状态的SparkContext,在创建新的SparkContext之前必须调用stop()来关闭之前的SparkContext。
下面我们看下SparkContext究竟有什么作用:
首先,每一个Spark应用都是一个SparkContext实例,可以理解为一个SparkContext就是一个spark application的生命周期,一旦SparkContext创建之后,就可以用这个SparkContext来创建RDD、累加器、广播变量,并且可以通过SparkContext访问Spark的服务,运行任务。spark context设置内部服务,并建立与spark执行环境的连接。

SparkContext在spark应用中起到了master的作用,掌控了所有Spark的生命活动,统筹全局,除了具体的任务在executor中执行,其他的任务调度、提交、监控、RDD管理等关键活动均由SparkContext主体来完成。

36.Sparkstreaming读取kafka数据为什么选择直连方式

1
2
3
4
Direct方式则采用的是低层次的API,直接连接kafka服务器上读取数据。需要我们自己去手动维护偏移量,代码量稍微大些。不过这种方式的优点有:
1.当我们读取Topic下的数据时,它会自动对应Topic下的Partition生成相对应数量的RDD Partition,提高了计算时的并行度,提高了效率。
2.它不需要通过WAL来维持数据的完整性。采取Direct直连方式时,当数据发生丢失,只要kafka上的数据进行了复制,就可以根据副本来进行数据重新拉取。
3.它保证了数据只消费一次。因为我们将偏移量保存在一个地方,当我们读取数据时,从这里拿到数据的起始偏移量和读取偏移量确定读取范围,通过这些我们可以读取数据,当读取完成后会更新偏移量,这就保证了数据只消费一次。
1
2
3
4
5
Receive是使用的高级API,需要消费者连接Zookeeper来读取数据。是由Zookeeper来维护偏移量,不用我们来手动维护,这样的话就比较简单一些,减少了代码量。但是天下没有免费的午餐,它也有很多缺点:
1.导致丢失数据。它是由Executor内的Receive来拉取数据并存放在内存中,再由Driver端提交的job来处理数据。这样的话,如果底层节点出现错误,就会发生数据丢失。
2.浪费资源。可以采取WALs方式将数据同步到高可用数据存储平台上(HDFS,S3),那么如果再发生错误,就可以从中再次读取数据。但是这样会导致同样的数据存储了两份,浪费了资源。
3.可能会导致重复读取数据。对于公司来说,一些数据宁可丢失了一小小部分也不能重复读取,但是这种由Zookeeper来记录偏移量的方式,可能会因为Spark和Zookeeper不同步,导致一份数据读取了两次。
4.效率低。因为是分批次执行的,它是接收数据,直到达到了设定的时间间隔,才可是进行计算。而且我们在KafkaUtils.createStream()中设定的partition数量,只会增加receive的数量,不能提高并行计算的效率,但我们可以设定不同的Group和Topic创建DStream,然后再用Union合并DStream,提高并行效率。

37.离线分析什么时候用sparkcore和sparksql

经过分析,得出结论,Core适合读取结构复杂,多重map嵌套的数据。比如Avro这种数据复杂的文件类型。

SparkSQL适合读取结构简单的数据,比如parquet。

38.Sparkstreaming实时的数据不丢失的问题

在这种模式下,我们可以使用checkpoint + WAL + ReliableReceiver的方式保证不丢失数据,就是说在driver端打开chechpoint,用于定期的保存driver端的状态信息到HDFS上,保证driver端的状态信息不会丢失;在接收数据Receiver所在的Executor上打开WAL,使得接收到的数据保存在HDFS中,保证接收到的数据不会丢失;因为我们使用的是ReliableReceiver,所以在Receiver挂掉的期间,是不会接收数据,当这个Receiver重启的时候,会从上次消费的地方开始消费。

39.简述宽依赖和窄依赖概念,groupByKey,reduceByKey,map,filter,union五种操作哪些会导致宽依赖,哪些会导致窄依赖?

宽依赖:父RDD的分区被子RDD的多个分区使用 例如 groupByKey、reduceByKey、sortByKey等操作会产生宽依赖,会产生shuffle

窄依赖:父RDD的每个分区都只被子RDD的一个分区使用 例如map、filter、union等操作会产生窄依赖

40.数据倾斜可能会导致哪些问题,如何监控和排查,在设计之初,要考虑哪些来避免?

危害一:任务长时间挂起,资源利用率下降

计算作业通常是分阶段进行的,阶段与阶段之间通常存在数据上的依赖关系,也就是说后一阶段需要等前一阶段执行完才能开始。

举个例子,Stage1在Stage0之后执行,假如Stage1依赖Stage0产生的数据结果,那么Stage1必须等待Stage0执行完成后才能开始,如果这时Stage0因为数据倾斜问题,导致任务执行时长过长,或者直接挂起,那么Stage1将一直处于等待状态,整个作业也就一直挂起。这个时候,资源被这个作业占据,但是却只有极少数task在执行,造成计算资源的严重浪费,利用率下降。

危害二:由引发内存溢出,导致任务失败

数据发生倾斜时,可能导致大量数据集中在少数几个节点上,在计算执行中由于要处理的数据超出了单个节点的能力范围,最终导致内存被撑爆,报OOM异常,直接导致任务失败。

危害三:作业执行时间超出预期,导致后续依赖数据结果的作业出错

有时候作业与作业之间,并没有构建强依赖关系,而是通过执行时间的前后时间差来调度,当前置作业未在预期时间范围内完成执行,那么当后续作业启动时便无法读取到其所需要的最新数据,从而导致连续出错。

可以看出,数据倾斜问题,就像是一个隐藏的杀手,潜伏在数据处理与分析的过程中,只要一出手,非死即伤。那么它又是如何产生的呢?想要解决它,我们就要先了解它。

为什么会产生数据倾斜?

3.1:读入数据的时候就是倾斜的

读入数据是计算任务的开始,但是往往这个阶段就可能已经开始出现问题了。

对于一些本身就可能倾斜的数据源,在读入阶段就可能出现个别partition执行时长过长或直接失败,如读取id分布跨度较大的mysql数据、partition分配不均的kafka数据或不可分割的压缩文件。

这些场景下,数据在读取阶段或者读取后的第一个计算阶段,就会容易执行过慢或报错。

3.2:shuffle产生倾斜

在shuffle阶段造成倾斜,在实际的工作中更加常见,比如特定key值数量过多,导致join发生时,大量数据涌向一个节点,导致数据严重倾斜,个别节点的读写压力是其他节点的好几倍,容易引发OOM错误。

3.3:过滤导致倾斜

有些场景下,数据原本是均衡的,但是由于进行了一系列的数据剔除操作,可能在过滤掉大量数据后,造成数据的倾斜。

例如,大部分节点都被过滤掉了很多数据,只剩下少量数据,但是个别节点的数据被过滤掉的很少,保留着大部分的数据。这种情况下,一般不会OOM,但是倾斜的数据可能会随着计算逐渐累积,最终引发问题。

怎么预防或解决数据倾斜问题?

4.1.尽量保证数据源是均衡的

程序读入的数据源通常是上个阶段其他作业产生的,那么我们在上个阶段作业生成数据时,就要注意这个问题,尽量不要给下游作业埋坑。

如果所有作业都注意到并谨慎处理了这个问题,那么出现读入时倾斜的可能性会大大降低。

这个有个小建议,在程序输出写文件时,尽量不要用coalesce,而是用repartition,这样写出的数据,各文件大小往往是均衡的。

4.2.对大数据集做过滤,结束后做repartition

对比较大的数据集做完过滤后,如果过滤掉了绝大部分数据,在进行下一步操作前,最好可以做一次repartition,让数据重回均匀分布的状态,否则失衡的数据集,在进行后续计算时,可能会逐渐累积倾斜的状态,容易产生错误。

4.3.对小表进行广播

如果两个数据量差异较大的表做join时,发生数据倾斜的常见解决方法,是将小表广播到每个节点去,这样就可以实现map端join,从而省掉shuffle,避免了大量数据在个别节点上的汇聚,执行效率也大大提升。

4.4.编码时要注意,不要人为造成倾斜

在写代码时,也要多加注意不要使用容易出问题的算子,如上文提到的coalesce。

另外,也要注意不要人为造成倾斜,如作者一次在帮别人排查倾斜问题时发现,他在代码中使用开窗函数,其中写到over (partition by 1),这样就把所有数据分配到一个分区内,人为造成了倾斜。

4.5.join前优化

个别场景下,两个表join,某些特殊key值可能很多,很容易产生数据倾斜,这时可以根据实际计算进行join前优化。

如计算是先join后根据key聚合,那可以改为先根据key聚合然后再join。又如,需求是join后做distinct操作,在不影响结果的前提下,可以改为先distinct,然后再join。这些措施都是可以有效避免重复key过多导致join时倾斜。

41.有一千万条短信,有重复,以文本文件的形式保存,一行一条数据,请用五分钟时间,找出重复出现最多的前10条

采用内存映射办法。

首先,1千万条短信按现在的短信长度将不会超过1GB空间,使用内存映射文件比较合适,可以一次映射 (如果有更大的数据量,可以采用分段映射),由于不需要频繁使用文件I/O和频繁分配小内存,这将大大提高了数据的加载速度。

其次,对每条短信的第i(i从0到70)个字母按ASCII码进行分组,也就是创建树。i是树的深度,也是短信第i个字母。

这个问题主要是解决两方面的问题:

  1. 内容的加载。

  2. 短信内容的比较。

采用内存映射技术可以解决内容加载的性能问题(不仅是不需要调用文件I/O函数,而且也不需要每读出一条短信都要分配一小块内存),而使用树技术可以有效地减少比较次数。

42.现有一文件,格式如下,请用spark统计每个单词出现的次数

见15

43.共享变量和累加器

累加器(accumulator)是 Spark 中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。

共享变量出现的原因:

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。

Spark 的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。

44.当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数?

使用 foreachPartition 代替 foreach,在 foreachPartition 内获取数据库的连接。

45.特别大的数据,怎么发送到excutor中?

当在excutor端使用了Driver变量,不使用广播变量,在每个excutor中有多少的task就有多少个Driver端变量副本
导致的问题:占用了网络IO,速度慢
如果使用广播变量在每一个excutor端只有一份Driver端的变量副本

46.spark调优都做过哪些方面?

​ 了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。

num-executors

  参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。

  参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

  参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

  参数调优建议:每个Executor进程的内存设置4G8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作业申请到的总内存量(也就是所有Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的总内存量最好不要超过资源队列最大总内存的1/31/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

executor-cores

  参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

  参数调优建议:Executor的CPU core数量设置为24个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/31/2左右比较合适,也是避免影响其他同学的作业运行。

driver-memory

  参数说明:该参数用于设置Driver进程的内存。

  参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

spark.default.parallelism

  参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。

  参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

spark.storage.memoryFraction

  参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

  参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction

  参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

  参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

47.spark任务为什么会被yarn kill掉?

因为spark任务是向yarn申请资源的。

48.Spark on Yarn作业执行流程?yarn-client和yarn-cluster有什么区别?

Spark on Yarn作业执行流程?

1.Spark Yarn Client 向 Yarn 中提交应用程序。
2.ResourceManager 收到请求后,在集群中选择一个 NodeManager,并为该应用程序分配一个 Container,在这个 Container 中启动应用程序的 ApplicationMaster, ApplicationMaster 进行 SparkContext 等的初始化。
3.ApplicationMaster 向 ResourceManager 注册,这样用户可以直接通过 ResourceManager 查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束。
4.ApplicationMaster 申请到资源(也就是Container)后,便与对应的 NodeManager 通信,并在获得的 Container 中启动 CoarseGrainedExecutorBackend,启动后会向 ApplicationMaster 中的 SparkContext 注册并申请 Task。
5.ApplicationMaster 中的 SparkContext 分配 Task 给 CoarseGrainedExecutorBackend 执行,CoarseGrainedExecutorBackend 运行 Task 并向ApplicationMaster 汇报运行的状态和进度,以让 ApplicationMaster 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。
6.应用程序运行完成后,ApplicationMaster 向 ResourceManager申请注销并关闭自己。

yarn-client和yarn-cluster有什么区别?

  1. 理解YARN-Client和YARN-Cluster深层次的区别之前先清楚一个概念:Application Master。在YARN中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。它负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。从深层次的含义讲YARN-Cluster和YARN-Client模式的区别其实就是ApplicationMaster进程的区别

    1. YARN-Cluster模式下,Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,因而YARN-Cluster模式不适合运行交互类型的作业
    2. YARN-Client模式下,Application Master仅仅向YARN请求Executor,Client会和请求的Container通信来调度他们工作,也就是说Client不能离开。

49.Flatmap底层编码实现?

Spark flatMap 源码:

1
2
3
4
5
6
7
8
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

Scala flatMap 源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/** Creates a new iterator by applying a function to all values produced by this iterator
* and concatenating the results.
*
* @param f the function to apply on each element.
* @return the iterator resulting from applying the given iterator-valued function
* `f` to each value produced by this iterator and concatenating the results.
* @note Reuse: $consumesAndProducesIterator
*/
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
private var cur: Iterator[B] = empty
private def nextCur() { cur = f(self.next()).toIterator }
def hasNext: Boolean = {
// Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
// but slightly shorter bytecode (better JVM inlining!)
while (!cur.hasNext) {
if (!self.hasNext) return false
nextCur()
}
true
}
def next(): B =<span style="color:#ffffff"> <span style="background-color:rgb(255,0,0)">(if (hasNext) cur else empty).next()</span></span>
}

flatMap其实就是将RDD里的每一个元素执行自定义函数f,这时这个元素的结果转换成iterator,最后将这些再拼接成一个新的RDD,也可以理解成原本的每个元素由横向执行函数f后再变为纵向。画红部分一直在回调,当RDD内没有元素为止。