面试题随笔-21/3/24

面试题随笔-21-3-24

spark rdd介绍一下

RDD:Spark的核心概念是RDD (resilient distributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。

  • 为什么会产生RDD?
    • 传统的mapreduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大的缺点是采用非循环式的数据流模型,使得在迭代计算时要进行大量的磁盘IO操作。RDD正是解决这一缺点的抽象方法。
    • RDD的具体描述RDD(弹性数据集)是Spark提供的最重要的抽象的概念,它是一种有容错机制的特殊集合,可以分布在集群的节点上,以函数式编操作集合的方式,进行各种并行操作。可以将RDD理解为一个具有容错机制的特殊集合,它提供了一种只读、只能有已存在的RDD变换而来的共享内存,然后将所有数据都加载到内存中,方便进行多次重用。a.他是分布式的,可以分布在多台机器上,进行计算。
      • 他是弹性的,计算过程中内存不够时它会和磁盘进行数据交换。
      • 这些限制可以极大的降低自动容错开销
      • 实质是一种更为通用的迭代并行计算框架,用户可以显示的控制计算的中间结果,然后将其自由运用于之后的计算。
    • RDD的容错机制实现分布式数据集容错方法有两种:数据检查点和记录更新。
    • RDD采用记录更新的方式:记录所有更新点的成本很高。所以,RDD只支持粗颗粒变换,即只记录单个块上执行的单个操作,然后创建某个RDD的变换序列(血统)存储下来;变换序列指,每个RDD都包含了他是如何由其他RDD变换过来的以及如何重建某一块数据的信息。因此RDD的容错机制又称“血统”容错。
    • 要实现这种“血统”容错机制,最大的难题就是如何表达父RDD和子RDD之间的依赖关系。实际上依赖关系可以分两种,窄依赖和宽依赖:窄依赖:子RDD中的每个数据块只依赖于父RDD中对应的有限个固定的数据块;宽依赖:子RDD中的一个数据块可以依赖于父RDD中的所有数据块。例如:map变换,子RDD中的数据块只依赖于父RDD中对应的一个数据块;groupByKey变换,子RDD中的数据块会依赖于多有父RDD中的数据块,因为一个key可能错在于父RDD的任何一个数据块中.
    • 将依赖关系分类的两个特性:第一,窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据;宽依赖则要等到父RDD所有数据都计算完成之后,并且父RDD的计算结果进行hash并传到对应节点上之后才能计算子RDD。第二,数据丢失时,对于窄依赖只需要重新计算丢失的那一块数据来恢复;对于宽依赖则要将祖先RDD中的所有数据块全部重新计算来恢复。所以在长“血统”链特别是有宽依赖的时候,需要在适当的时机设置数据检查点。也是这两个特性要求对于不同依赖关系要采取不同的任务调度机制和容错恢复机制。
  • RDD在Spark中的地位及作用
    • 为什么会有Spark?因为传统的并行计算模型无法有效的解决迭代计算(iterative)和交互式计算(interactive);而Spark的使命便是解决这两个问题,这也是他存在的价值和理由。
    • Spark如何解决迭代计算?其主要实现思想就是RDD,把所有计算的数据保存在分布式的内存中。迭代计算通常情况下都是对同一个数据集做反复的迭代计算,数据在内存中将大大提升IO操作。这也是Spark涉及的核心:内存计算。
    • Spark如何实现交互式计算?因为Spark是用scala语言实现的,Spark和scala能够紧密的集成,所以Spark可以完美的运用scala的解释器,使得其中的scala可以向操作本地集合对象一样轻松操作分布式数据集。
    • Spark和RDD的关系?可以理解为:RDD是一种具有容错性基于内存的集群计算抽象方法,Spark则是这个抽象方法的实现。
  • 如何操作RDD?
    • 如何获取RDD
      a.从共享的文件系统获取,(如:HDFS);
      b.通过已存在的RDD转换;
      c.将已存在scala集合(只要是Seq对象)并行化,通过调用SparkContext的parallelize方法实现;
      d.改变现有RDD的持久性;RDD是懒散,短暂的。(RDD的固化:cache缓存至内存 save保存到分布式文件系统)
    • 操作RDD的两个动作
      a.Actions:对数据集计算后返回一个数值value给驱动程序;例如:Reduce将数据集的所有元素用某个函数聚合后,将最终结果返回给程序。
      b.Transformation:根据数据集创建一个新的数据集,计算后返回一个新RDD;例如:Map将数据的每个元素经过某个函数计算后,返回一个姓的分布式数据集。

数组求topk

  • 小根堆
  • 快排 (需要内存足够)

二叉树后序遍历

先序 ==>(左右交换) 逆后序 ==>(再逆序) 后序

MapReduce介绍一下

MapReduce是一种计算模型,该模型可以将大型数据处理任务分解成很多单个的、可以在服务器集群中并行执行的任务,而这些任务的计算结果可以合并在一起来计算最终的结果。简而言之,Hadoop Mapreduce是一个易于编程并且能在大型集群(上千节点)快速地并行得处理大量数据的软件框架,以可靠,容错的方式部署在商用机器上。

MapReduce这个术语来自两个基本的数据转换操作:map过程和reduce过程。

一、map

map操作会将集合中的元素从一种形式转化成另一种形式,在这种情况下,输入的键值对会被转换成零到多个键值对输出。其中输入和输出的键必须完全不同,而输入和输出的值则可能完全不同。

二、reduce

某个键的所有键值对都会被分发到同一个reduce操作中。确切的说,这个键和这个键所对应的所有值都会被传递给同一个Reducer。reduce过程的目的是将值的集合转换成一个值(例如求和或者求平均),或者转换成另一个集合。这个Reducer最终会产生一个键值对。需要说明的是,如果job不需要reduce过程的话,那么reduce过程也是可以不用的。

mapreduce中间有个combine是干嘛的,有什么好处,有什么使用限制吗?

combiner是reduce的实现,在map端运行计算任务,减少map端的输出数据。

作用就是优化。

但是combiner的使用场景是mapreduce的map输出结果和reduce输入输出一样。

partition的默认实现是hashpartition,是map端将数据按照reduce个数取余,进行分区,不同的reduce来copy自己的数据。

partition的作用是将数据分到不同的reduce进行计算,加快计算效果。

编写MapReduce作业时,如何做到在Reduce阶段,先对key排序,再对value排序?

该问题通常称为“二次排序”,最常用的方法是将value放到key中,实现一个组合Key,然后自定义key排序规则(为key实现一个WritableComparable)。

如何使用MapReduce实现两表join,可考虑以下几种情况:(1)一个表大,一个表小(可放到内存中) (2)两个表均是大表

map join:
map side join 是针对一下场景进行的优化。两个待连接的表中,有一个表非常大,而另一个非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每一个map task内存中存在一份(比如放在hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否具有相同key的记录,入股有,则连接后输出即可。
场景:MapJoin 适用于有一份数据较小的连接情况。
做法:直接将较小的数据加载到内存中,按照连接的关键字建立索引,大份数据作为MapTask的输入数据对 map()方法的每次输入都去内存当中直接去匹配连接。然后把连接结果按 key 输出,这种方法要使用 hadoop中的 DistributedCache 把小份数据分布到各个计算节点,每个 maptask 执行任务的节点都需要加载该数据到内存,并且按连接关键字建立索引。

reduce side join是一种最简单的join方式,其主要思想如下:

在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。

在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。

SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。

实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。

当面试的时候问到,MapReduce 架构设计、Yarn架构设计、Yarn的工作流程、MapReduce job 提交到 Yarn的工作流程 (面试题为同一题),其实都是同一个问题。

1.用户向yarn提交应用程序(job),其中包括application Master程序,启动application Master命令等

2.RM为该job分配了一个容器,并于对应的NM通信,要求它在这个容器中启动job的MR application Master程序
3.启动程序之后,application Master首先向ApplicationsManager注册,用户就可以直接在web界面上查看job的整个运行状态和日志
4.applicationMaster向Resource Scheduler采用轮询的方式通过RPC协议去申请和领取资源列表

5.一旦applicationMaster申请到资源后,便与对应的NM节点通信,要求启动任务

6.NM为任务task设置好运行环境(环境变量,jar),将任务的启动命令写在一个脚本文件中,并通过脚本文件【启动任务
7.各个task通过RPC协议向applicationMaster汇报自己的状态和进度,以让applicationMaster随时掌握各个任务的运行状态,从而可以在任务运行时重新启动任务,则web界面可以实时查看job当前运行状态。
8.job运行完成后,applicationMaster向RM注销自己并关闭自己

如何使用MapReduce实现全排序(即数据全局key有序)? 你给出的算法可能要求仅启动一个Reduce Task,那么如何对算法改进,可以同时启动多个Reduce Task提高排序效率。

  • 使用一个reduce实现全局排序
    我们知道,MapReduce默认情况下只保证同一个分区中的key是有序的,不能保证全局有序。如果我们将所有的数据都用一个reduce来处理,就可以实现全局有序。
    缺点:此方法的缺点也很明显,所有数据发送到一个reduce进行排序,不但不能充分利用集群的分布式资源,在数据量很大的情况下,很容易出现OOM的情况,效率也很低

  • 自定义分区函数实现全局排序

    MapReduce默认的分区函数为HashPartitioner,其具体的实现原理是计算map的输出key的hashCode值,然后用该值对reduce个数取余,不同的余数对应不同的reduce,余数相同的key将会被发送到同一个reduce中。

    如果我们能够实现一个分区函数,使得
    所以的key < 1000的数据都发送到reduce0;
    所有1000 < key < 10000的数据都发送到reduce1;
    ……
    其余的key都发送到reduceN;

    这样就实现了reduce0的数据一定全部小于reduce1的数据,reduce1的数据全部小于reduce2的数据,……,同时每个reduce中数据是局部有序的,这样就实现了全局有序

    缺点:该方法的难点在于我们必须手动找到各个reduce的分界点,且尽量是的分散到每个reduce的数据均衡。如果想增加或减少reduce个数,必须再次寻找分界点,非常不灵活。

  • 使用TotalOrderPartitioner实现全局排序

    MapReduce内部包含一个TotalOrderPartitoner的分区实现类,主要解决全排序问题。其原理和方法2类似,根据key的分界点将不同的key发送到不同的reduce中,区别在于方法2需要人工寻找分界点,而该方法是有程序计算得到。

    数据抽样

    为了寻找到合适的key分界点,我们需要对数据的整体分布有所了解;如果数据量很大,我们就不可能对所有数据进行分析然后选出N-1的分界点(假设reduce个数为N),最适合的方式就是通过数据抽样,对抽样的数据进行分析从而选出合适的分割点。MapReduce提供了三种抽样方法:

    SplitSampler:从n个split中选择前m条记录取样(不适合有序数据)
    RandomSampler:随机取样(通用采样器)
    IntervalSampler:从n个split中按照一定间隔取样(通常适用于有序数据)

    三种抽样方法都通过getSample(InputFormat inf, Job job)函数返回抽样得到的key数组,除了IntervalSampler方法返回的抽样key是有序的,其他都是无序的

    获取到抽样的key之后就可以对其进行排序,然后选择N-1个分界点,最后将这些key的分界点存储到指定的HDFS文件中。

mapreduce优化

1)数据输入小文件处理:

(1)合并小文件:对小文件进行归档(har)、自定义 inputformat 将小文件存储成

sequenceFile 文件。

(2)采用 ConbinFileInputFormat 来作为输入,解决输入端大量小文件场景。

(3)对于大量小文件 Job,可以开启 JVM 重用。

2)map 阶段

(1)增大环形缓冲区大小。由 100m 扩大到 200m

(2)增大环形缓冲区溢写的比例。由 80%扩大到 90%

(3)减少对溢写文件的 merge 次数。

(4)不影响实际业务的前提下,采用 combiner 提前合并,减少 I/O。

3)reduce 阶段

(1)合理设置 map 和 reduce 数:两个都不能设置太少,也不能设置太多。太少,会

导致 task 等待,延长处理时间;太多,会导致 map、reduce 任务间竞争资源,造成处理

超时等错误。

(2)设置 map、reduce 共存:调整 slowstart.completedmaps 参数,使 map 运行到一

定程度后,reduce 也开始运行,减少 reduce 的等待时间。

(3)规避使用 reduce,因为 Reduce 在用于连接数据集的时候将会产生大量的网络消耗。

(4)增加每个 reduce 去 map 中拿数据的并行数

(5)集群性能可以的前提下,增大 reduce 端存储数据内存的大小。

4)IO 传输

(1)采用数据压缩的方式,减少网络 IO 的的时间。安装 Snappy 和 LZOP 压缩编码器。

(2)使用 SequenceFile 二进制文件

5)整体

(1)MapTask 默认内存大小为 1G,可以增加 MapTask 内存大小为 4-5g

(2)ReduceTask 默认内存大小为 1G,可以增加 ReduceTask 内存大小为 4-5g

(3)可以增加 MapTask 的 cpu 核数,增加 ReduceTask 的 cpu 核数

(4)增加每个 container 的 cpu 核数和内存大小

(5)调整每个 Map Task 和 Reduce Task 最大重试次数

Hadoop MapReduce中资源管理模型是怎样的,有什么缺点,如何改进?

MapReduce将资源划分成map slot和reduce slot,其中map slot供map task使用,reduce slot供reduce slot使用,这种模型存在以下几个问题:

(1)slot之间不能共享,导致资源利用率低(比如map slot空闲而reduce slot紧缺时,Reduce Task不能使用空闲的map slot)。

(2)slot数目静态配置,不能动态修改

(3)slot这种划分资源方式粒度过大,导致集群资源利用情况不好精细化控制,比如一个map task可能无法充分利用一个map slot对应的资源。

hadoop2.0已经采用基于真实资源需求量的资源管理方案

解释什么是Capacity Scheduler?

答:Capacity Scheduler是Hadoop自带的一个多租户多队列作业调度器,它按照队列组织作业和用户,以达到多用户共享集群的目的。

Hadoop 2.0中YARN是什么?(最近部分明年毕业的同学找工作过程遇到了关于YARN的笔试题,比如今天的EMC笔试题)

答:YARN是Yet Another Resource Negotiator的简称,是Hadoop 2.0新引入的资源管理系统,负责集群中资源的统一管理和调度,它允许用户在其上运行各类计算框架,而不仅仅限于MapReduce一种。

YARN和MapReduce有什么关系?

答:MapReduce是一种两阶段计算框架,它由资源管理和任务调度两部分组成,其中资源管理部分是通用的模块,不仅MapReduce需要,其他计算框架,比如Spark等也需要这种模块,于是Hadoop 2.0将资源管理部分独立出来,创建了YARN这一分支。YARN出现后,多个计算框架运行在一个集群中共享资源成为可能。YARN可看做一个“云操作系统”,而MapReduce可看做运行在该操作系统上的“应用程序”。

hdfs小文件过多会怎么样

  • 先说对小文件的定义,一般来说小于等于30M的文件,都叫小文件。在HDFS中,通常NN维护一个文件的名称,目录结构等大约是250字节。现实中,HDFS的小文件如果不做任何操作增长会很快,现在假设NN节点的内存为4G,差不多42亿字节,现在在HDFS上有一亿个小文件,那么需要250乘一亿大约是250亿字节,这样会将NN撑爆。小文件到达一定数目,就会将NN节点撑爆。就算NN能够存储,对于hive,spark计算时,小文件意味着需要更多的task和资源,同样也会将节点弄挂掉。

  • 解决方案

    生产上首先需要设置小文件的阈值,到达这个值对小文件进行合并。对于这个合并,一种是在HDFS存储之前就进行合并,还有一种就是计算完之后根据业务周期来进行合并。后一种需要在计算时额外对小文件进行调整。

spark中job,stage,task之间的关系

  • 什么是job

    Job简单讲就是提交给spark的任务。

  • 什么是stage

    Stage是每一个job处理过程要分为的几个阶段。

  • 什么是task

    Task是每一个job处理过程要分几为几次任务。Task是任务运行的最小单位。最终是要以task为单位运行在executor中。

  • Job和stage和task之间有什么关系

    Job <—> 一个或多个stage <—> 一个或多个task

  • 一个stage的task的数量是有谁来决定的?

    是由输入文件的切片个数来决定的。在HDFS中不大于128m的文件算一个切片(默认128m)。通过算子修改了某一个rdd的分区数量,task数量也会同步修改。

  • 一个job任务的task数量是由谁来决定的?

    一个job任务可以有一个或多个stage,一个stage又可以有一个或多个task。所以一个job的task数量是 (多个stage的task数量)的总和。

  • 每一个stage中的task最大的并行度?

    并行度:是指指令并行执行的最大条数。在指令流水中,同时执行多条指令称为指令并行。

    理论上:每一个stage下有多少的分区,就有多少的task,task的数量就是我们任务的最大的并行度。(一般情况下,我们一个task运行的时候,使用一个cores)

    实际上:最大的并行度,取决于我们的application任务运行时使用的executor拥有的cores的数量。

  • 如果我们的task数量超过这个cores的总数怎么办?

    –num-executors 2 –executor-memory 512m –executor-cores 2

    当前stage有200个task,先执行cores个数量的task,然后等待cpu资源空闲后,继续执行剩下的task。

  • spark执行时读条中的内容讲解

    Stage60:当前的stage编号

    (105+2)/200:200:当前stage的task的数量;105:已完成的task数量;4:等待执行的task数量。