面试题随笔-21/3/28
Spark 机器学习和 Spark 图计算接触过没有,举例说明你用它做过什么?
Spark 提供了很多机器学习库,我们只需要填入数据,设置参数就可以用了。使用起来非常方便。另外一方面,由于它把所有的东西都写到了内部,我们无法修改其实现过程。要想修改里面的某个环节,还的修改源码,重新编译。比如 kmeans 算法,如果没有特殊需求,很方便。但是spark内部使用的两个向量间的距离是欧式距离。如果你想改为余弦或者马氏距离,就的重新编译源码了。Spark 里面的机器学习库都是一些经典的算法,这些代码网上也好找。这些代码使用起来叫麻烦,但是很灵活。Spark 有一个很大的优势,那就是 RDD。模型的训练完全是并行的。
Spark 的 ML 和 MLLib 两个包区别和联系
技术角度上,面向的数据集类型不一样:ML 的 API 是面向 Dataset 的(Dataframe 是 Dataset 的子集,也就是 Dataset[Row]), mllib 是面对 RDD 的。Dataset 和 RDD 有啥不一样呢?Dataset 的底端是 RDD。Dataset 对 RDD 进行了更深一层的优化,比如说有 sql 语言类似的黑魔法,Dataset 支持静态类型分析所以在 compile time 就能报错,各种 combinators(map,foreach 等)性能会更好,等等。
编程过程上,构建机器学习算法的过程不一样:ML 提倡使用 pipelines,把数据想成水,水从管道的一段流入,从另一端流出。ML 是1.4比 Mllib 更高抽象的库,它解决如何简洁的设计一个机器学习工作流的问题,而不是具体的某种机器学习算法。未来这两个库会并行发展。
Spark RDD是怎么容错的,基本原理是什么?
一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。
面向大规模数据分析,数据检查点操作成本很高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗更多的存储资源。因此,Spark选择记录更新的方式。但是,如果更新粒度太细太多,那么记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建RDD的一系列变换序列(每个RDD都包含了他是如何由其他RDD变换过来的以及如何重建某一块数据的信息。因此RDD的容错机制又称“血统(Lineage)”容错)记录下来,以便恢复丢失的分区。
Lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。
为什么要用Yarn来部署Spark?
因为 Yarn 支持动态资源配置。Standalone 模式只支持简单的固定资源分配策略,每个任务固定数量的 core,各 Job 按顺序依次分配资源,资源不够的时候就排队。这种模式比较适合单用户的情况,多用户的情境下,会有可能有些用户的任务得不到资源。
Yarn 作为通用的资源调度平台,除了 Spark 提供调度服务之外,还可以为其他系统提供调度,如 Hadoop MapReduce, Hive 等。
说说yarn-cluster和yarn-client的异同点。
cluster 模式会在集群的某个节点上为 Spark 程序启动一个称为 Master 的进程,然后 Driver 程序会运行正在这个 Master 进程内部,由这种进程来启动 Driver 程序,客户端完成提交的步骤后就可以退出,不需要等待 Spark 程序运行结束,这是适合生产环境的运行方式
client 模式也有一个 Master 进程,但是 Driver 程序不会运行在这个 Master 进程内部,而是运行在本地,只是通过 Master 来申请资源,直到运行结束,这种模式非常适合需要交互的计算。显然 Driver 在 client 模式下会对本地资源造成一定的压力。
解释一下 groupByKey, reduceByKey 还有 reduceByKeyLocally
Groupbykey: 当调用一个(K, V)对的数据集时,返回一个(K,可迭代
Note: 如果分组是为了对每个键执行聚合(比如求和或平均值),那么使用 reduceByKey 或 aggregateByKey 将产生更好的性能。
Note: 默认情况下,输出中的并行度取决于父 RDD 的分区数。您可以传递一个可选的 numPartitions 参数来设置不同数量的任务。
Reducebykey: 当对一个(K, V)对的数据集调用时,返回一个(K, V)对的数据集,其中每个键的值使用给定的 reduce 函数 func 进行聚合,与 groupByKey 类似,reduce 任务的数量可以通过第二个可选参数进行配置。
ReducebykeyLocally: 该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。
说说 persist() 和 cache() 的异同
RDD的cache和persist的区别
cache()是persist()的简化方式,调用persist的无参版本,也就是调用persist(StorageLevel.MEMORY_ONLY),cache只有一个默认的缓存级别MEMORY_ONLY,即将数据持久化到内存中,而persist可以通过传递一个 StorageLevel 对象来设置缓存的存储级别。
DataFrame的cache和persist的区别
cache()依然调用的persist(),但是persist调用cacheQuery,而cacheQuery的默认存储级别为MEMORY_AND_DISK,这点和rdd是不一样的。
可以解释一下这两段程序的异同吗
1 | val counter = 0 |
所有在 Driver 程序追踪的代码看上去好像在 Driver 上计算,实际上都不在本地,每个 RDD 操作都被转换成 Job 分发至集群的执行器 Executor 进程中运行,即便是单机本地运行模式,也是在单独的执行器进程上运行,与 Driver 进程属于不用的进程。所以每个 Job 的执行,都会经历序列化、网络传输、反序列化和运行的过程。
再具体一点解释是 foreach 中的匿名函数 x => counter += x 首先会被序列化然后被传入计算节点,反序列化之后再运行,因为 foreach 是 Action 操作,结果会返回到 Driver 进程中。
在序列化的时候,Spark 会将 Job 运行所依赖的变量、方法全部打包在一起序列化,相当于它们的副本,所以 counter 会一起被序列化,然后传输到计算节点,是计算节点上的 counter 会自增,而 Driver 程序追踪的 counter 则不会发生变化。执行完成之后,结果会返回到 Driver 程序中。而 Driver 中的 counter 依然是当初的那个 Driver 的值为0。
因此说,RDD 操作不能嵌套调用,即在 RDD 操作传入的函数参数的函数体中,不可以出现 RDD 调用。
说说map和mapPartitions的区别
map 中的 func 作用的是 RDD 中每一个元素,而 mapPartitioons 中的 func 作用的对象是 RDD 的一整个分区。所以 func 的类型是 Iterator
groupByKey和reduceByKey是属于Transformation还是 Action?
前者,因为 Action 输出的不再是 RDD 了,也就意味着输出不是分布式的,而是回送到 Driver 程序。以上两种操作都是返回 RDD,所以应该属于 Transformation。
说说Spark支持的3种集群管理器
Standalone 模式:资源管理器是 Master 节点,调度策略相对单一,只支持先进先出模式。
Hadoop Yarn 模式:资源管理器是 Yarn 集群,主要用来管理资源。Yarn 支持动态资源的管理,还可以调度其他实现了 Yarn 调度接口的集群计算,非常适用于多个集群同时部署的场景,是目前最流行的一种资源管理系统。
Apache Mesos:Mesos 是专门用于分布式系统资源管理的开源系统,与 Yarn 一样是 C++ 开发,可以对集群中的资源做弹性管理。
说说Worker和Excutor的异同
Worker 是指每个及节点上启动的一个进程,负责管理本节点,jps 可以看到 Worker 进程在运行。Excutor 每个Spark 程序在每个节点上启动的一个进程,专属于一个 Spark 程序,与 Spark 程序有相同的生命周期,负责 Spark 在节点上启动的 Task,管理内存和磁盘。如果一个节点上有多个 Spark 程序,那么相应就会启动多个Excutor。
说说Spark提供的两种共享变量
Spark 程序的大部分操作都是 RDD 操作,通过传入函数给 RDD 操作函数来计算,这些函数在不同的节点上并发执行,内部的变量有不同的作用域,不能相互访问,有些情况下不太方便。
广播变量,是一个只读对象,在所有节点上都有一份缓存,创建方法是 SparkContext.broadcast()。创建之后再更新它的值是没有意义的,一般用 val 来修改定义。
计数器,只能增加,可以用计数或求和,支持自定义类型。创建方法是 SparkContext.accumulator(V, name)。只有 Driver 程序可以读这个计算器的变量,RDD 操作中读取计数器变量是无意义的。
以上两种类型都是 Spark 的共享变量。
说说检查点的意义
在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。
而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。可以这样理解开销的经济与否:在窄依赖中,在子RDD的分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算。
在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的,会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此如果使用Checkpoint算子来做检查点,不仅要考虑Lineage是否足够长,也要考虑是否有宽依赖,对宽依赖加Checkpoint是最物有所值的。
说说Spark的高可用和容错
Spark 应用程序的高可用性主要包含两个部分:集群环境的高可用以及应用程序的容错特性;集群环境的高可用,主要由集群框架来控制,比如 spark on yarn 模式下的 ResourceManager 的 HA、Spark Standalone 模式下的 Master HA 等特性的设置保障集群的高可用性;至于应用程序的容错需要考虑应用的各个组成部分的容错。
spark 应用程序执行过程中,一般存在以下失败的情况:
- Driver 集成宕机:Driver 运行机器宕机、Driver 程序运行过程中异常导致宕机
- Executor 进程宕机:Executor 所在的work 宕机,Exector 和 Driver 通信超时
- Task 执行失败:task 执行过程发生异常导致失败
Driver 进程宕机解决方案:
- 监控机器机器是否存活,如果机器宕机,重启服务机器和 spark 集群
- 通过 spark job 的 history 服务监控应用是否执行成功,如果执行失败,通过开发人员重启服务即可
- SparkStreaming 中,重启spark应用后,可通过 checkpoint 进行job数据恢复
Executor 宕机解决方案:选择一个work 节点重启Executor 进程,Driver 重新分配任务
Task 执行失败解决方案:
Spark 会自动进行 task 重试机制,如果某个 task 失败重试次数超过3次(spark.task.maxFailures)后,当前job 执行失败;local 模式默认不启用 task 重试机制。
Task 数据恢复/重新运行的机制实际上是 RDD 容错机制,即 Lineage 机制,RDD的 Lineage 机制记录的是粗粒度的特定数据的 Transformation 操作行为。当这个 RDD 的部分数据丢失时,它可以通过 lineage 获取足够的信息来重新运算和恢复丢失的数据分区;该机制体现在RDD上就是RDD依赖特性。
如果 rdd 的 lineage 的生命线特别长,此时某些 task 执行失败的恢复成本就会比较高,那么可以采用检查点或缓存的方式将数据冗余下来,当检查点/缓存点之后的rdd的task出现异常的时候,可以直接从检查点重新构建lineage,可以减少执行开销。
解释一下Spark Master的选举过程
Master作为Spark standalone模式的核心,如果Master出现异常,那么集群就不能正常工作。所以Spark会从Standby中选择一个节点作为Master.
Spark支持以下几种策略,这种策略可以通过配置文件spark-env.sh配置spark.deploy.recoveryMode
ZOOKEEPER: 集群元数据持久化到zookeeper,当master出现异常的时候,zookeeper会通过选举机制选举出新的Master,新的Master接管集群时需要从zookeeper获取持久化信息,并根据这些信息恢复集群状态。
FILESYSTEM: 集群的元数据持久化到文件系统,当Master出现异常的时候,只要在该机器上重启Master,启动后的Master获取持久化信息并根据持久化信息恢复集群状态。
CUSTOM: 自定义恢复模式,实现StandaloneRecoveryModeFactory抽象类进行实现,并把该类配置到配置文件,当Master出现异常,会根据用户自定义的方式进行恢复集群状况。
NONE: 不持久化集群元数据,当Master出现异常时,新启动的Master不进行恢复集群状态。
说说Spark如何实现序列化组件的
Spark通过两种方式来创建序列化器
Java序列化
在默认情况下,Spark 采用 Java的 ObjectOutputStream 序列化一个对象。该方式适用于所有实现了 java.io.Serializable 的类。通过继承 java.io.Externalizable,你能进一步控制序列化的性能。Java序列化非常灵活,但是速度较慢,在某些情况下序列化的结果也比较大。
Kryo序列化
Spark 也能使用 Kryo(版本2)序列化对象。Kryo 不但速度极快,而且产生的结果更为紧凑(通常能提高10倍)。Kryo 的缺点是不支持所有类型,为了更好的性能,你需要提前注册程序中所使用的类(class)。
Java 的序列化比较简单,就和前面的一样,下面主要介绍Kryo序列化的使用。
Kryo序列化怎么用?
可以在创建 SparkContext 之前,通过调用 System.setProperty(“spark.serializer”, “spark.KryoSerializer”),将序列化方式切换成Kryo。
但是 Kryo 需要用户进行注册,这也是为什么 Kryo 不能成为 Spark 序列化默认方式的唯一原因,但是建议对于任何“网络密集型”(network-intensive)的应用,都采用这种方式进行序列化方式。
Kryo文档描述了很多便于注册的高级选项,例如添加用户自定义的序列化代码。
如果对象非常大,你还需要增加属性 spark.kryoserializer.buffer.mb 的值。该属性的默认值是32,但是该属性需要足够大以便能够容纳需要序列化的最大对象。
最后,如果你不注册你的类,Kryo仍然可以工作,但是需要为了每一个对象保存其对应的全类名(full class name),这是非常浪费的。
说说对Master的理解
Master 是local-cluster 部署模式和 Standalone 部署模式中,整个 Spark 集群最为重要的组件之一,分担了对整个集群资源的管理和分配的工作。
local-cluster 下,Master 作为 JVM 进程的对象启动,而在 Standalone 模式下,就是单独的进程启动。
说说什么是窗口间隔和滑动间隔
对于窗口操作,在其窗口内部会有 N 个批处理数据,批处理数据的个数由窗口间隔决定,其为窗口持续的时间,在窗口操作中只有窗口间隔满足了才会触发批数据的处理(指一开始的阶段)。
滑动间隔是指经过多长时间窗口滑动一次形成新的窗口,滑动传功库默认情况下和批次间隔的相同,而窗口间隔一般设置得要比它们都大。
说说Spark的WAL(预写日志)机制?
也叫 WriteAheadLogs,通常被用于数据库和文件系统中,保证数据操作的持久性。预写日志通常是先将操作写入到一个持久可靠的日志文件中,然后才对数据施加该操作,当加入施加该操作中出现异常,可以通过读取日志文件并重新施加该操作,从而恢复系统。
当 WAL 开启后,所有收到的数据同时保存到了容错文件系统的日志文件中,当 Spark Streaming 失败,这些接受到的数据也不会丢失。另外,接收数据的正确性只在数据被预写到日志以后接收器才会确认。已经缓存但还没有保存的数据可以在 Driver 重新启动之后由数据源再发送一次(经常问)。
这两个机制保证了数据的零丢失,即所有的数据要么从日志中恢复,要么由数据源重发。
Spark Streaming小文件问题
使用 Spark Streaming 时,如果实时计算结果要写入到 HDFS,那么不可避免的会遇到一个问题,那就是在默认情况下会产生非常多的小文件,这是由 Spark Streaming 的微批处理模式和 DStream(RDD) 的分布式(partition)特性导致的,Spark Streaming 为每个 Partition 启动一个独立的线程(一个 task/partition 一个线程)来处理数据,一旦文件输出到 HDFS,那么这个文件流就关闭了,再来一个 batch 的 parttition 任务,就再使用一个新的文件流,那么假设,一个 batch 为10s,每个输出的 DStream 有32个 partition,那么一个小时产生的文件数将会达到(3600/10)*32=11520个之多。
众多小文件带来的结果是有大量的文件元信息,比如文件的 location、文件大小、block number 等需要 NameNode 来维护,NameNode 会因此鸭梨山大。不管是什么格式的文件,parquet、text、JSON 或者 Avro,都会遇到这种小文件问题,这里讨论几种处理 Spark Streaming 小文件的典型方法。
- 增加 batch 大小:这种方法很容易理解,batch 越大,从外部接收的 event 就越多,内存积累的数据也就越多,那么输出的文件数也就会变少,比如上边的时间从10s增加为100s,那么一个小时的文件数量就会减少到1152个。但别高兴太早,实时业务能等那么久吗,本来人家10s看到结果更新一次,现在要等快两分钟,是人都会骂娘。所以这种方法适用的场景是消息实时到达,但不想挤压在一起处理,因为挤压在一起处理的话,批处理任务在干等,这时就可以采用这种方法。
- Coalesce:文章开头讲了,小文件的基数是 batch_number * partition_number,而第一种方法是减少 batch_number,那么这种方法就是减少 partition_number 了,这个 api 不细说,就是减少初始的分区个数。看过 spark 源码的童鞋都知道,对于窄依赖,一个子 RDD 的 partition 规则继承父 RDD,对于宽依赖(就是那些个xxxByKey操作),如果没有特殊指定分区个数,也继承自父 rdd。那么初始的 SourceDstream 是几个 partition,最终的输出就是几个 partition。所以 Coalesce 大法的好处就是,可以在最终要输出的时候,来减少一把 partition 个数。但是这个方法的缺点也很明显,本来是32个线程在写256M数据,现在可能变成了4个线程在写256M数据,而没有写完成这256M数据,这个 batch 是不算结束的。那么一个 batch 的处理时延必定增长,batch 挤压会逐渐增大。
- Spark Streaming 外部来处理:我们既然把数据输出到 hdfs,那么说明肯定是要用 Hive 或者 Spark Sql 这样的“sql on hadoop”系统类进一步进行数据分析,而这些表一般都是按照半小时或者一小时、一天,这样来分区的(注意不要和 Spark Streaming 的分区混淆,这里的分区,是用来做分区裁剪优化的),那么我们可以考虑在 Spark Streaming 外再启动定时的批处理任务来合并 Spark Streaming 产生的小文件。这种方法不是很直接,但是却比较有用,“性价比”较高,唯一要注意的是,批处理的合并任务在时间切割上要把握好,搞不好就可能会去合并一个还在写入的 Spark Streaming 小文件。
- 自己调用 foreach 去 append:Spark Streaming 提供的 foreach 这个api (一种 Action 操作),可以让我们自定义输出计算结果的方法。那么我们其实也可以利用这个特性,那就是每个 batch 在要写文件时,并不是去生成一个新的文件流,而是把之前的文件打开。考虑这种方法的可行性,首先,HDFS 上的文件不支持修改,但是很多都支持追加,那么每个 batch 的每个 partition 就对应一个输出文件,每次都去追加这个 partition 对应的输出文件,这样也可以实现减少文件数量的目的。这种方法要注意的就是不能无限制的追加,当判断一个文件已经达到某一个阈值时,就要产生一个新的文件进行追加了。所以大概就是一直32个文件。
Spark的UDF?
因为目前 Spark SQL 本身支持的函数有限,一些常用的函数都没有,比如 len, concat…etc 但是使用 UDF 来自己实现根据业务需要的功能是非常方便的。Spark SQL UDF 其实是一个 Scala 函数,被 catalyst 封装成一个 Expression 结点,最后通过 eval 方法计根据当前 Row 计算 UDF 的结果。UDF 对表中的单行进行转换,以便为每行生成单个对应的输出值。例如,大多数 SQL 环境提供 UPPER 函数返回作为输入提供的字符串的大写版本。
用户自定义函数可以在 Spark SQL 中定义和注册为 UDF,并且可以关联别名,这个别名可以在后面的 SQL 查询中使用。作为一个简单的示例,我们将定义一个 UDF 来将以下 JSON 数据中的温度从摄氏度(degrees Celsius)转换为华氏度(degrees Fahrenheit)。
Mesos下粗粒度和细粒度对比?
粗粒度运行模式:Spark 应用程序在注册到 Mesos 时会分配对应系统资源,在执行过程中由 SparkContext 和 Executor 直接交互,该模式优点是由于资源长期持有减少了资源调度的时间开销,缺点是该模式下 Mesos 无法感知资源使用的变化,容易造成系统资源的闲置,无法被 Mesos 其他框架使用,造成资源浪费。
细粒度的运行模式:Spark 应用程序是以单个任务的粒度发送到 Mesos 中执行,在执行过程中 SparkContext 并不能和 Executor 直接交互,而是由 Mesos Master 进行统一的调度管理,这样能够根据整个 Mesos 集群资源使用的情况动态调整。该模式的优点是系统资源能够得到充分利用,缺点是该模式中每个人物都需要从 Mesos 获取资源,调度延迟较大,对于 Mesos Master 开销较大。
Spark Local和Standalone有什么区别
Spark一共有5种运行模式:Local,Standalone,Yarn-Cluster,Yarn-Client 和 Mesos。
Local:Local 模式即单机模式,如果在命令语句中不加任何配置,则默认是 Local 模式,在本地运行。这也是部署、设置最简单的一种模式,所有的 Spark 进程都运行在一台机器或一个虚拟机上面。
Standalone:Standalone是 Spark 自身实现的资源调度框架。如果我们只使用 Spark 进行大数据计算,不使用其他的计算框架(如MapReduce或者Storm)时,就采用 Standalone 模式就够了,尤其是单用户的情况下。Standalone 模式是 Spark 实现的资源调度框架,其主要的节点有 Client 节点、Master 节点和 Worker 节点。其中 Driver 既可以运行在 Master 节点上中,也可以运行在本地 Client 端。当用 spark-shell 交互式工具提交 Spark 的 Job 时,Driver 在 Master 节点上运行;当使用 spark-submit 工具提交 Job 或者在 Eclipse、IDEA 等开发平台上使用 new SparkConf.setManager(“spark://master:7077”) 方式运行 Spark 任务时,Driver 是运行在本地 Client 端上的。
Standalone 模式的部署比较繁琐,需要把 Spark 的部署包安装到每一台节点机器上,并且部署的目录也必须相同,而且需要 Master 节点和其他节点实现 SSH 无密码登录。启动时,需要先启动 Spark 的 Master 和 Slave 节点。提交命令类似于:
1 | ./bin/spark-submit \ |
说说SparkContext和SparkSession有什么区别?
Application: 用户编写的 Spark 应用程序,Driver 即运行上述 Application 的 main() 函数并且创建 SparkContext。Application 也叫应用。
SparkContext: 整个应用的上下文,控制应用的生命周期。
RDD: 不可变的数据集合,可由 SparkContext 创建,是 Spark 的基本计算单元。
SparkSession: 可以由上节图中看出,Application、SparkSession、SparkContext、RDD之间具有包含关系,并且前三者是1对1的关系。SparkSession 是 Spark 2.0 版本引入的新入口,在这之前,创建一个 Application 对应的上下文是这样的:
1 | //set up the spark configuration and create contexts |
现在 SparkConf、SparkContext 和 SQLContext 都已经被封装在 SparkSession 当中,并且可以通过 builder 的方式创建:
1 | // Create a SparkSession. No need to create SparkContext |
如果Spark Streaming停掉了,如何保证Kafka的重新运作是合理的呢
首先要说一下 Spark 的快速故障恢复机制,在节点出现故障的情况下,传统流处理系统会在其他节点上重启失败的连续算子,并可能重新运行先前数据流处理操作获取部分丢失数据。在此过程中只有该节点重新处理失败的过程。只有在新节点完成故障前所有计算后,整个系统才能够处理其他任务。
在 Spark 中,计算将会分成许多小的任务,保证能在任何节点运行后能够正确合并,因此,就算某个节点出现故障,这个节点的任务将均匀地分散到集群中的节点进行计算,相对于传递故障恢复机制能够更快地恢复。
列举Spark中 Transformation 和 Action算子
Transformantion:Map, Filter, FlatMap, Sample, GroupByKey, ReduceByKey, Union, Join, Cogroup, MapValues, Sort, PartionBy
Action:Collect, Reduce, Lookup, Save (主要记住,结果不是 RDD 的就是 Action)
Spark经常说的Repartition是个什么玩意
简单的说:返回一个恰好有numPartitions个分区的RDD,可以增加或者减少此RDD的并行度。内部,这将使用shuffle重新分布数据,如果你减少分区数,考虑使用coalesce,这样可以避免执行shuffle。
目的:
- 避免小文件
- 减少 Task 个数
- 但是会增加每个 Task 处理的数据量
Spark Streaming Duration的概念
Spark Streaming 是微批处理。
1 | SparkConf sparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]"); |
Durations.seconds(1000)设置的是 sparkstreaming 批处理的时间间隔,每隔 Batch Duration 时间去提交一次 job,如果 job 的处理时间超过 Batch Duration,会使得 job 无法按时提交,随着时间推移,越来越多的作业被拖延,最后导致整个 Streaming作业被阻塞,无法做到实时处理数据。
简单写一个WordCount程序
1 | sc.textFile("/Users/runzhliu/workspace/spark-2.2.1-bin-hadoop2.7/README.md") |
说说Yarn-cluster的运行阶段
在 Yarn-cluset 模式下,当用户向 Yarn 提交一个应用程序后,Yarn 将两个阶段运行该应用程序:
- 第一阶段是把 Spark 的 Driver 作为一个 Application Master 在 Yarn 集群中先启动。
- 第二阶段是由 Application Master 创建应用程序,然后为它向 Resource Manager 申请资源,并启动 Executor 来运行任务集,同时监控它的整个过程,直到运行介绍结束。
说说Standalone模式下运行Spark程序的大概流程
Standalone 模式分别由客户端、Master 节点和 Worker 节点组成。在 Spark Shell 提交计算代码的时候,所在机器作为客户端启动应用程序,然后向 Master 注册应用程序,由 Master 通知 Worker 节点启动 Executor,Executor 启动之后向客户端的 Driver 注册,最后由 Driver 发送执行任务给 Executor 并监控任务执行情况。该程序代码中,在触发计算行数动作之前,需要设置缓存代码,这样在执行计算行数行为的时候进行缓存数据,缓存后再运行计算行数。
如何区分 Appliction(应用程序)还有 Driver(驱动程序)
Application 是指用户编写的 Spark 应用程序,包含驱动程序 Driver 和分布在集群中多个节点上运行的 Executor 代码,在执行过程之中由一个或多个做作业组成。
Driver 是 Spark 中的 Driver 即运行上述 Application 的 main 函数并且创建 SparkContext,其中创建 SparkContext 的目的是为了准备 Spark 应用程序的运行环境。在 Spark 中由 sc 负责与 ClusterManager 通信,进行资源的申请,任务的分配和监控等。当 Executor 部分运行完毕后,Driver 负责把 sc 关闭,通常 Driver 会拿 SparkContext 来代表。
介绍一下 Spark 通信的启动方式
Spark 启动过程主要是 Master 与 Worker 之间的通信,首先由 Worker 节点向 Master 发送注册消息,然后 Master 处理完毕后,返回注册成功消息或失败消息,如果成功注册,那么 Worker 就会定时发送心跳消息给 Master。
介绍一下 Spark 运行时候的消息通信
用户提交应用程序时,应用程序的 SparkContext 会向 Master 发送应用注册消息,并由 Master 给该应用分配 Executor,Excecutor 启动之后,Executor 会向 SparkContext 发送注册成功消息。
当 SparkContext 的 RDD 触发行动操作之后,将创建 RDD 的 DAG。通过 DAGScheduler 进行划分 Stage 并把 Stage 转化为 TaskSet,接着 TaskScheduler 向注册的 Executor 发送执行消息,Executor 接收到任务消息后启动并运行。最后当所有任务运行时候,由 Driver 处理结果并回收资源。
解释一下Stage
每个作业会因为 RDD 之间的依赖关系拆分成多组任务集合,称为调度阶段,也叫做任务集。调度阶段的划分由 DAGScheduler 划分,调度阶段有 Shuffle Map Stage 和 Result Stage 两种。
描述一下Worker异常的情况
Spark 独立运行模式 Standalone 采用的是 Master/Slave 的结构,其中 Slave 是由 Worker 来担任的,在运行的时候会发送心跳给 Master,让 Master 知道 Worker 的实时状态;
另一方面,Master 也会检测注册的 Worker 是否超时,因为在集群运行的过程中,可能由于机器宕机或者进程被杀死等原因造成 Worker 异常退出。
描述一下Master异常的情况
Master 出现异常的时候,会有几种情况,而在独立运行模式 Standalone 中,Spark 支持几种策略,来让 Standby Master 来接管集群。主要配置的地方在于 spark-env.sh 文件中。配置项是 spark.deploy.recoveryMode 进行设置,默认是 None。
- ZOOKEEPER:集群元数据持久化到 Zookeeper 中,当 Master 出现异常,ZK 通过选举机制选举新的 Master,新的 Master 接管的时候只要从 ZK 获取持久化信息并根据这些信息恢复集群状态。StandBy 的 Master 随时候命的。
- FILESYSTEM:集群元数据持久化到本地文件系统中,当 Master 出现异常的时候,只要在该机器上重新启动 Master,启动后新的 Master 获取持久化信息并根据这些信息恢复集群的状态。
- CUSTOM:自定义恢复方式,对 StandaloneRecoveryModeFactory 抽象类进行实现并把该类配置到系统中,当 Master 出现异常的时候,会根据用户自定义的方式进行恢复集群状态。
- NONE:不持久化集群的元数据,当出现异常的是,新启动 Master 不进行信息恢复集群状态,而是直接接管集群。
Spark的存储体系
简单来讲,Spark存储体系是各个Driver与Executor实例中的BlockManager所组成的;
但是从一个整体来看,把各个节点的BlockManager看成存储体系的一部分,那存储体系就有了更多衍生的内容,比如块传输服务、map任务输出跟踪器、Shuffle管理器等。
简述Spark Streaming
具有高吞吐量和容错能力强的特点,输入源有很多,如 Kafka, Flume, Twitter 等待。
关于流式计算的做法,如果按照传统工具的做法把数据存储到数据库中再进行计算,这样是无法做到实时的,而完全把数据放到内存中计算,万一宕机、断电了,数据也就丢失了。
因此 Spark 流式计算引入了检查点 CheckPoint 和日志,以便能够从中恢复计算结果。而本质上 Spark Streaming 是接收实时输入数据流并把他们按批次划分,然后交给 Spark 计算引擎处理生成按照批次划分的结果流。
知道 Hadoop MRv1 的局限吗
可扩展性查,在运行的时候,JobTracker 既负责资源管理,又负责任务调度,当集群繁忙的时候,JobTracker 很容易成为瓶颈,最终导致可扩展性的问题。
可用性差,采用单节点的 Master 没有备用 Master 以及选举操作,这导致一旦 Master 出现故障,整个集群将不可用。
资源利用率低,TaskTracker 使用 slot 等量划分本节点上的资源量,slot 代表计算资源将各个 TaskTracker 上的空闲 slot 分配给 Task 使用,一些 Task 并不能充分利用 slot,而其他 Task 无法使用这些空闲的资源。有时会因为作业刚刚启动等原因导致 MapTask 很多,而 Reduce Task 任务还没调度的情况,这时 Reduce slot 也会被闲置。
不能支持多种 MapReduce 框架,无法通过可插拔方式将自身的 MapReduce 框架替换为其他实现,例如 Spark,Storm。
说说Spark的特点,相对于MR来说
- 减少磁盘 I/O,MR 会把 map 端将中间输出和结果存储在磁盘中,reduce 端又需要从磁盘读写中间结果,势必造成磁盘 I/O 称为瓶颈。Spark 允许将 map 端的中间结果输出和结果存储在内存中,reduce 端在拉取中间结果的时候避免了大量的磁盘 I/O。
- 增加并行度,由于把中间结果写到磁盘与从磁盘读取中间结果属于不同的环节,Hadoop 将他们简单地通过串行执行衔接起来,Spark 则把不同的环节抽象成为 Stage,允许多个 Stage 既可以串行又可以并行执行。
- 避免重新计算,当 Stage 中某个分区的 Task 执行失败后,会重新对此 Stage 调度,但在重新调度的时候会过滤已经执行成功的分区任务,所以不会造成重复计算和资源浪费。
- 可选的 Shuffle 排序,MR 在 Shuffle 之前有着固定的排序操作,而 Spark 则可以根据不同场景选择在 map 端排序还是 reduce 排序。
- 灵活的内存管理策略,Spark 将内存分为堆上的存储内存、堆外的存储内存,堆上的执行内存,堆外的执行内存4个部分。
说说Spark Narrow Dependency的分类
OneToOneDependency
RangeDependency
Task的分类
Task 指具体的执行任务,一个 Job 在每个 Stage 内都会按照 RDD 的 Partition 数量,创建多个 Task,Task 分为 ShuffleMapTask 和 ResultTask 两种。ShuffleMapStage 中的 Task 为 ShuffleMapTask,而 ResultStage 中的 Task 为 ResultTask。ShuffleMapTask 和 ResultTask 类似于 Hadoop 中的 Map 任务和 Reduce 任务。
Spark的编程模型
1.创建应用程序 SparkContext
2.创建RDD,有两种方式,
方式一:输入算子,即读取外部存储创建RDD,Spark与Hadoop完全兼容,所以对Hadoop所支持的文件类型或者数据库类型,Spark同样支持。
方式二:从集合创建RDD
3.Transformation算子,这种变换并不触发提交作业,完成作业中间过程处理。也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
4.Action 算子,这类算子会触发 SparkContext 提交 Job 作业。并将数据输出 Spark系统。
5.保存结果
6.关闭应用程序
Spark的计算模型
用户程序对 RDD 通过多个函数进行操作,将 RDD 进行转换。
Block-Manager 管理 RDD 的物理分区,每个 Block 就是节点上对应的一个数据块,可以存储在内存或者磁盘。
而 RDD 中的 partition 是一个逻辑数据块,对应相应的物理块 Block。
本质上一个 RDD 在代码中相当于是数据的一个元数据结构,存储着数据分区及其逻辑结构映射关系,存储着 RDD 之前的依赖转换关系。
总述Spark的架构
从集群部署的角度来看,Spark 集群由集群管理器 Cluster Manager、工作节点 Worker、执行器 Executor、驱动器 Driver、应用程序 Application 等部分组成。
- Cluster Manager:主要负责对集群资源的分配和管理,Cluster Manager在YARN 部署模式下为 RM,在 Mesos 下为 Mesos Master,Standalone 模式下为 Master。CM 分配的资源属于一级分配,它将各个 Worker 上的内存、CPU 等资源分配给 Application,但是不负责对 Executor 的资源分类。Standalone 模式下的 Master 会直接给 Application 分配内存、CPU 及 Executor 等资源。
- Worker:Spark 的工作节点。在 YARN 部署模式下实际由 NodeManager 替代。Worker 节点主要负责,把自己的内存、CPU 等资源通过注册机制告知 CM,创建 Executor,把资源和任务进一步分配给 Executor,同步资源信息,Executor 状态信息给 CM 等等。Standalone 部署模式下,Master 将 Worker 上的内存、CPU 以及 Executor 等资源分配给 Application 后,将命令 Worker 启动 CoarseGrainedExecutorBackend 进程(此进程会创建 Executor 实例)。
- Executor:执行计算任务的一线组件,主要负责任务的执行及与 Worker Driver 信息同步。
- Driver:Application 的驱动程序,Application 通过 Driver 与 CM、Executor 进行通信。Driver 可以运行在 Application 中,也可以由 Application 提交给 CM 并由 CM 安排 Worker 运行。
- Application:用户使用 Spark 提供的 API 编写的应用程序,Application 通过 Spark API 将进行 RDD 的转换和 DAG 的创建,并通过 Driver 将 Application 注册到 CM,CM 将会根据 Application 的资源需求,通过一级资源分配将 Excutor、内存、CPU 等资源分配给 Application。Drvier 通过二级资源分配将 Executor 等资源分配给每一个任务,Application 最后通过 Driver 告诉 Executor 运行任务。
一句话说说 Spark Streaming 是如何收集和处理数据的
在 Spark Streaming 中,数据采集是逐条进行的,而数据处理是按批 mini batch进行的,因此 Spark Streaming 会先设置好批处理间隔 batch duration,当超过批处理间隔就会把采集到的数据汇总起来成为一批数据交给系统去处理。
解释一下窗口间隔window duration和滑动间隔slide duration
红色的矩形就是一个窗口,窗口 hold 的是一段时间内的数据流。
这里面每一个 time 都是时间单元,在官方的例子中,每隔 window size 是3 time unit, 而且每隔2个单位时间,窗口会 slide 一次。
所以基于窗口的操作,需要指定2个参数:
window length - The duration of the window (3 in the figure)
slide interval - The interval at which the window-based operation is performed (2 in the figure).
窗口大小,个人感觉是一段时间内数据的容器。滑动间隔,就是我们可以理解的 cron 表达式吧。
窗口间隔一般大于(批处理间隔、滑动间隔)。这都是理解窗口操作的关键。
介绍一下Spark Streaming的foreachRDD(func)方法
将函数应用于 DStream 的 RDD 上,这个操作会输出数据到外部系统,比如保存 RDD 到文件或者网络数据库等。需要注意的是 func 函数是运行该 Streaming 应用的 Driver 进程里执行的。
简单描述一下Spark Streaming的容错原理
Spark Streaming 的一个特点就是高容错。
首先 Spark RDD 就有容错机制,每一个 RDD 都是不可变的分布式可重算的数据集,其记录这确定性的操作血统,所以只要输入数据是可容错的,那么任意一个 RDD 的分区出错或不可用,都是可以利用原始输入数据通过转换操作而重新计算出来的。
预写日志通常被用于数据库和文件系统中,保证数据操作的持久性。预写日志通常是先将操作写入到一个持久可靠的日志文件中,然后才对数据施加该操作,当加入施加操作中出现了异常,可以通过读取日志文件并重新施加该操作。
另外接收数据的正确性只在数据被预写到日志以后接收器才会确认,已经缓存但还没保存的数据可以在 Driver 重新启动之后由数据源再发送一次,这两个机制确保了零数据丢失,所有数据或者从日志中恢复,或者由数据源重发。
DStream 有几种转换操作
Transform Operation、Window Operations、Join Operations
聊聊Spark Streaming的运行架构
Spark Streaming相对其他流处理系统最大的优势在于流处理引擎和数据处理在同一软件栈,其中Spark Streaming功能主要包括流处理引擎的流数据接收与存储以及批处理作业的生成与管理,而Spark Core负责处理Spark Streaming发送过来的作业。
Spark Streaming分为Driver端和Client端,运行在Driver端为StreamingContext实例,该实例包括DStreamGraph和JobScheduler(包括ReceiverTracker和JobGenerator)等,而Client包括ReceiverSupervisor和Receiver等。
Spark Streaming进行流数据处理大致可以分为:启动流数据引擎、接收及存储流数据、处理流数据和输出处理结果等4个步骤。
初始化StreamingContext对象,在该对象启动过程中实例化DStreamGraph和JobScheduler
DStreamGraph用于存放DStream以及DStream之间的依赖关系等信息。
JobScheduler中包括ReceiverTracker和JobGenerator。ReceiverTracker为Driver端流数据接收器(Receiver)的管理者;
JobGenerator为批处理作业生成器。
在ReceiverTracker启动过程中,根据流数据接收器分发策略通知对应的Executor中的流数据接收管理器
(ReciverSupervisor)启动,再由ReciverSupervisor启动流数据接收器。
当流数据接收器Receiver启动后,持续不断地接收实时流数据,根据传过来数据的大小进行判断,如果数据量很小,
则攒多条数据成一块,然后再进行块存储;如果数据量大,则直接进行块存储。
对于这些数据Receiver直接交到ReciverSupervisor,由其进行数据转储操作。块存储根据设置是否预写日志分为两种:
- 一种是使用非预写日志BlockManagerBasedBlockHandler方法直接写到Worker的内存或磁盘中
- 另一种是进行预写日志WriteAheadLogBasedBlockHandler方法,即在预写日志同时把数据写入到Worker的内存或磁盘中
数据存储完毕后,ReciverSupervisor会把数据存储的云信息上报给ReceiverTracker,ReceiverTracker再把这些信息转发给ReceivedBlockTracker,由它负责管理收到数据块的元信息。
在StreamingContext的JobGenerator中维护一个定时器,该定时器在批处理时间到来时会进行生成作业的操作。在该操作中会进行如下操作:
a.通知ReceiverTracker将接收到的数据进行提交,在提交时采用synchronized关键字进行处理,保证每条数据被划入一个且只被划入一个批次中。
b.要求DStreamGraph根据DStream依赖关系生成作业序列Seq[Job]
c.从第一步中ReceiverTracker获取本批次数据的元数据。
d.把批处理时间time、作业序列Seq[Job]和本批次数据的元数据包装为JobSet,调用JobScheduler.submitJobSet(JobSet)提交给JobScheduler,JobScheduler将把这些作业发送给Spark核心进行处理,由于该执行为异步,因此本步执行速度将非常快。
e.只要提交结束(不管作业是否被执行),SparkStreaming对整个系统做一个检查点(checkpoint)
Spark核心的作业对数据进行处理,处理完毕后输出到外部系统,如数据库或文件系统,输出的数据可以被外部系统所使用。
由于实时流数据的数据源源不断地流入,Spark会周而复始地进行数据处理,响应也会持续不断地输出结果。
说说DStreamGraph
Spark Streaming 中作业生成与 Spark 核心类似,对 DStream 进行的各种操作让它们之间的操作会被记录到名为DStream 使用输出操作时,这些依赖关系以及它们之间的操作会被记录到名为 DStreamGraph 的对象中表示一个作业。这些作业注册到 DStreamGraph 并不会立即运行,而是等到 Spark Streaming 启动之后,达到批处理时间,才根据 DG 生成作业处理该批处理时间内接收的数据。
创建RDD的方式以及如何继承创建RDD
Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark 支持文本文件、SequenceFile 和任何其他 Hadoop InputFormat。可以使用SparkContext的textFile方法创建文本文件RDDs。
1 | val distFile = sc.textFile("data.txt") |
分析一下Spark Streaming的transform()和updateStateByKey()两个操作
transform(func) 操作:允许 DStream 任意的 RDD-to-RDD 函数。
updateStateByKey 操作:可以保持任意状态,同时进行信息更新,先定义状态,后定义状态更新函数。
说说Spark Streaming的输出操作
print()、
saveAsTextFiles(prefix, [suffix])、
saveAsObjectFiles(prefix, [suffix])、
saveAsHadoopFiles(prefix, [suffix])、
foreachRDD(func)
谈谈Spark Streaming Driver端重启会发生什么
恢复计算:使用检查点信息重启 Driver 端,重构上下文并重启接收器
恢复元数据块:为了保证能够继续下去所必备的全部元数据块都被恢复
未完成作业的重新形成:由于失败而没有处理完成的批处理,将使用恢复的元数据再次产生 RDD 和对应的作业
读取保存在日志中的块数据:在这些作业执行的时候,块数据直接从预写日志中读出,这将恢复在日志中可靠地保存所有必要的数据
重发尚未确认的数据:失败时没有保存到日志中的缓存数据将由数据源再次发送
再谈Spark Streaming的容错性
实时流处理系统需要长时间接收并处理数据,这个过程中出现异常是难以避免的,需要流程系统具备高容错性。Spark Streaming 一开始就考虑了两个方面。
- 利用 Spark 自身的容错设计、存储级别和 RDD 抽象设计能够处理集群中任何 Worker 节点的故障
- Spark 运行多种运行模式,其 Driver 端可能运行在 Master 节点或者集群中的任意节点,这样让 Driver 端具备容错能力是很大的挑战,但是由于其接收的数据是按照批进行存储和处理,这些批次数据的元数据可以通过执行检查点的方式定期写入到可靠的存储中,在 Driver 端重新启动中恢复这些状态
当接收到的数据缓存在 Executor 内存中的丢失风险要怎么处理呢?
如果是独立运行模式/Yarn/Mesos 模式,当 Driver 端失败的时候,该 Driver 端所管理的 Executor 以及内存中数据将终止,即时 Driver 端重新启动这些缓存的数据也不能被恢复。为了避免这种数据损失,就需要预写日志功能了。
当 Spark Streaming 应用开始的时候,也就是 Driver 开始的时候,接收器成为长驻运行任务,这些接收器接收并保存流数据到 Spark 内存以供处理。
- 接收器将数据分成一系列小块,存储到 Executor 内存或磁盘中,如果启动预写日志,数据同时还写入到容错文件系统的预写日志文件。
- 通知 StreamingContext,接收块中的元数据被发送到 Driver 的 StreamingContext,这个元数据包括两种,一是定位其 Executor 内存或磁盘中数据位置的块编号,二是块数据在日志中的偏移信息(如果启用 WAL 的话)。
流数据如何存储
作为流数据接收器调用 Receiver.store 方式进行数据存储,该方法有多个重载方法,如果数据量很小,则攒多条数据成数据块再进行块存储,如果数据量大,则直接进行块存储。
StreamingContext启动时序图吗
- 初始化 StreamingContext 中的 DStreamGraph 和 JobScheduler,进而启动 JobScheduler 的 ReceiveTracker 和 JobGenerator。
- 初始化阶段会进行成员变量的初始化,重要的包括 DStreamGraph(包含 DStream 之间相互依赖的有向无环图),JobScheduler(定时查看 DStreamGraph,然后根据流入的数据生成运行作业),StreamingTab(在 Spark Streaming 运行的时候对流数据处理的监控)。
- 然后就是创建 InputDStream,接着就是对 InputDStream 进行 flatMap, map, reduceByKey, print 等操作,类似于RDD 的转换操作。
- 启动 JobScheduler,实例化并启动 ReceiveTracker 和 JobGenerator。
- 启动 JobGenerator
- 启动 ReceiverTracker