字节笔试
哪些排序是稳定的?(多选)
冒泡排序,归并排序
不稳定:堆排序,快速排序
http和https?(多选)
tls
zookeeper(多选)
高速缓存 命名服务 主节点选举 分布式锁
进程和线程(多选)
线程有独立的地址空间 错
线程有独立的堆和栈 错 独立栈,共享堆
999个节点的二叉树的高(单选)
10
大数据私房菜
大数据求top n map reduce程序
链接:https://blog.csdn.net/qq_43193797/article/details/86367610
连续三天登陆的用户
1 | select |
广告日志,求广告的曝光数和点击数
1 | package com.lx.spark |
spark应用程序的执行命令
1 | spark-submit \ |
创建rdd的方式
1.并行化集合创建 RDD
1 | val arr = Array(1,2,3,4,5,6) |
2.使用外界的数据源创建RDD,比如说本地文件系统,分布式文件系统HDFS等等。
1 | val rdd = sc.textFile("d://data.txt") |
3.通过将已有RDD使用transform算子操作产生新的RDD。
1 | val wordsRDD = lineWordRDD.flatMap(line => line.split(" ")) |
创建dataset的方式
创建Datasets 的三种方式
- 由DataFrame 转化成为 Dataset
- 通过 SparkSession.createDataset() 直接创建
- 通过toDS 方法意识转换
如何优雅地kill掉yarn上的application
1 | yarn application -kill <applicationId> |
spark on yarn关键配置
1 | ./bin/spark-submit \ |
num-executors: 该参数用于设置Spark作业总共要用多少个Executor进程来执行
executor-memory: 是每个节点上占用的内存。
driver-memory: 实际分配的内存
executor-cores: 该参数用于设置每个Executor进程的CPU core数量,这个参数决定了每个Executor进程并行执行task线程的能力。
partition分区的个数
根据 RDD 的创建方式分为两种情况:
1、从内存中创建 RDD:sc.parallelize(…),那么默认的分区数量为该程序所分配的资源的 CPU 数量。
2、从 HDFS 文件创建:sc.textFile(…) 或 spark.sql(…),每个 HDFS 文件以块(Block)的形式存储,Spark 读取时会根据具体数据格式对应的 InputFormat 进行解析,例如文本格式就用 TextInputFormat 进行解析,一般是将若干个 Block 合并为一个输入分片(InputSplit),而这个 InputSplit 数就是默认的分区数。
scala中的函数和方法的区别
链接:https://www.runoob.com/w3cnote/scala-different-function-method.html
1.方法不能作为单独的表达式而存在(参数为空的方法除外),而函数可以。
2.函数必须要有参数列表,而方法可以没有参数列表
3.方法名是方法调用,而函数名只是代表函数对象本身
4.在需要函数的地方,如果传递一个方法,会自动进行ETA展开(把方法转换为函数)
5.传名参数本质上是个方法
如何监控spark streaming
- 界面监控
- 程序监控
1 | def main(args: Array[String]): Unit = { |
Listener
1 | case class SparkMonitoringListener() extends StreamingListener { |
- API监控
SparkStreaming restAPI监控从spark2.2.0版本开始支持,目前不支持2.1.0
scala多继承
通过实现trait
flink和spark streaming实时处理的区别
SparkStreaming是基于微批处理的,所以他采用DirectDstream的方式根据计算出的每个partition要取数据的Offset范围,拉取一批数据形成Rdd进行批量处理,而且该Rdd和kafka的分区是一一对应的;
Flink是真正的流处理,他是基于事件触发机制进行处理,在KafkaConsumer拉取一批数据以后,Flink将其经过处理之后变成,逐个Record发送的事件触发式的流处理;
数据模型
- spark 采用rdd,spark streaming的Stream实际上也是一组rdd的集合
- flink的基本数据模型是数据流,以及事件的序列
运行架构
- spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
- Flink是标准的流执行模式,一个事件在一个节点处理完成后可以直接发往下一个节点进行处理。
spark context作用
SparkContext是开发Spark应用的入口,它负责和整个集群的交互,包括创建RDD等。
从本质上来说,SparkContext是Spark的对外接口,负责向调用这提供Spark的各种功能。
spark streaming读取kafka,direct+receiver两种模式,为什么选择direct(直连模式)
receiver模式
采用该模式,需要一个task一直处于接受数据的状态。spark streaming相当于kafka的消费者,接受来的数据备份到其他节点完成之后,会向zookeeper更新消费者offset。
当更新完消费者偏移量后,如果driver挂掉,driver下的executor也会挂掉,就会有数据丢失的问题。
如何解决?
开启WAL机制,预写日志机制。将数据备份一份到HDFS上,完成之后更新zookeeper的offset。
效率慢,延迟大。
可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
direct模式
direct模式将kafka看成存储数据的一方,spark streaming主动取数据,无需一个task一直占用接收数据。
生成的Stream中的RDD的并行度与读取的kafka的topic的partition个数一致。提高了计算时的并行度。
spark来管理offset,默认在内存中,可以设置checkpoint。
spark core和spark sql
Spark SQL在Spark Core的基础上针对结构化数据处理进行很多优化和改进:
Spark SQL 支持很多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据源中读取数据。
当你使用SQL查询这些数据源中的数据并且只用到了一部分字段时,SparkSQL可以智能地只扫描这些用到的字段,而不是像SparkContext.hadoopFile中那样简单粗暴地扫描全部数据。
一千万条短信,有重复,以文本文件的形式保存,一行一条数据,请用五分钟时间,找出重复出现最多的前10条。
方法一:哈希表法 hashtable 或者 concurrentHashmap
可以用哈希表的方法对1千万条分成若干组进行边扫描边建散列表。第一次扫描,取首字节,尾字节,中间随便两字节作为Hash Code,插入到hash table中。并记录其地址和信息长度和重复次数,1千万条信息,记录这几个信息还放得下。同Hash Code且等长就疑似相同,比较一下。相同记录只加1次进hash table,但将重复次数加1。一次扫描以后,已经记录各自的重复次数,进行第二次hash table的处理。用线性时间选择可在O(n)的级别上完成前10条的寻找。分组后每份中的top10必须保证各不相同,可hash来保证,也可直接按hash值的大小来分类。
spark调优的8个方面
链接:https://zhuanlan.zhihu.com/p/54293797
yarn-client和yarn-cluster
一般yarn-client用于测试环境调试程序;yarn-cluster用于生产环境。
yarn-client和yarn-cluster的区别就在于,Driver是运行在本地客户端,它的AM只是作为一个Executor启动器,并没有Driver进程。
spark1.x和spark2.x区别
Spark 2.x新特性
1)Spark Core/SQL
在内存和CPU使用方面进一步优化Spark引擎性能,支持SQL 2003标准,支持子查询,对常用的SQL操作和DataFrame,性能有2-10倍的提升。
2)Spark Session
Spark2.0 中引入了 SparkSession 的概念,它为用户提供了一个统一的切入点来使用 Spark 的各项功能,统一了旧的SQLContext与HiveContext。用户不但可以使用 DataFrame 和Dataset 的各种 API,学习 Spark2 的难度也会大大降低。
3)统一 DataFrames 和 Datasets 的 API
它们都是提供给用户使用,包括各类操作接口的 API,1.3 版本引入 DataFrame,1.6版本引入Dataset,在 spark 2.0 中,把 dataframes 当作是一种特殊的 datasets,dataframes = datasets[row],把两者统一为datasets。
4) strutured Streaming
Spark Streaming基于Spark SQL(DataFrame / Dataset )构建了high-level API,使得Spark Streaming充分受益Spark SQL的易用性和性能提升。
spark streaming 7*24小时一直运行
Driver高可用
Spark Streaming HA将Driver元数据写到checkpoint目录下
1 | val ssc = new StreamingContext(conf, Seconds(2)) |
RDD高可用
为了防止Executor异常退出导致数据丢失,Spark Streaming提供了WAL(预写日志)机制。
Receiver只要接收到数据,会立即将数据写入一份到高可用文件系统(一般是HDFS)上的checkpoint目录中
1 | val conf = new SparkConf() |
spark streaming是exactly-once 吗?
基于direct stream的方法采用Kafka的简单消费者API,它的流程大大简化了。executor不再从Kafka中连续读取消息,也消除了receiver和WAL。还有一个改进就是Kafka分区与RDD分区是一一对应的,更可控。
driver进程只需要每次从Kafka获得批次消息的offset range,然后executor进程根据offset range去读取该批次对应的消息即可。由于offset在Kafka中能唯一确定一条消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,达到了exactly once。
不过,由于它采用了简单消费者API,我们就需要自己来管理offset。否则一旦程序崩溃,整个流只能从earliest或者latest点恢复,这肯定是不稳妥的。offset管理在之前的文章中提到过,这里不再赘述。
Kafka作为输入源可以保证exactly once,那么处理逻辑呢?
答案是显然的,Spark Streaming的处理逻辑天生具备exactly once语义。
Spark RDD之所以被称为“弹性分布式数据集”,是因为它具有不可变、可分区、可并行计算、容错的特征。一个RDD只能由稳定的数据集生成,或者从其他RDD转换(transform)得来。如果在执行RDD lineage的过程中失败,那么只要源数据不发生变化,无论重新执行多少次lineage,都一定会得到同样的、确定的结果。
最后,我们还需要保证输出过程也符合exactly once语义。Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。