大数据私房菜-21/4/24

字节笔试

哪些排序是稳定的?(多选)

冒泡排序,归并排序

不稳定:堆排序,快速排序

http和https?(多选)

tls

zookeeper(多选)

高速缓存 命名服务 主节点选举 分布式锁

进程和线程(多选)

线程有独立的地址空间 错

线程有独立的堆和栈 错 独立栈,共享堆

999个节点的二叉树的高(单选)

10

大数据私房菜

大数据求top n map reduce程序

链接:https://blog.csdn.net/qq_43193797/article/details/86367610

连续三天登陆的用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
select
t2.user_id as user_id,
count(1) as times,
min(t2.login_date) as start_date,
max(t2.login_date) as end_date
from
(
select
t1.user_id,
t1.login_date,
date_sub(t1.login_date, rn) as date_diff
from
(
select
user_id,
login_date,
row_number() over(partition by user_id order by login_date asc) as rn
from
wedw_dw.t_login_info
) t1
) t2
group by t2.user_id,t2.date_diff
having times >= 3

广告日志,求广告的曝光数和点击数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.lx.spark

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
* @author lx
* @date 2021/4/26 3:32 下午
*/
class AddLog {

def main(args: Array[String]): Unit = {

val spark: SparkSession = SparkSession
.builder()
.master("local[2]")
.appName("AddLog")
.getOrCreate()

import spark.implicits._;
val sc: SparkContext = spark.sparkContext

val clickRDD: RDD[String] = sc.textFile("data/click.log")
val impRDD: RDD[String] = sc.textFile("data/imp.log")

val clickRes: RDD[(String, Int)] = clickRDD.map(line => {
val arr: Array[String] = line.split("\\s+")
val adid: String = arr(3).substring(arr(3).lastIndexOf("=") + 1)
(adid, 1)
}).reduceByKey(_ + _)

val impRes: RDD[(String, Int)] = impRDD.map(line => {
val arr: Array[String] = line.split("\\s+")
val adid: String = arr(3).substring(arr(3).lastIndexOf("=") + 1)
(adid, 1)
}).reduceByKey(_ + _)

// 保存至hdfs
clickRes.fullOuterJoin(impRes)
.map(x => x._1 + ","+x._2._1.getOrElse(0)+","+ x._2._2.getOrElse(0))
.repartition(1)
.saveAsTextFile("data/add_log")
sc.stop()
}
}

spark应用程序的执行命令

1
2
3
4
5
6
7
8
9
10
spark-submit \
--class com.***.Application \
--master yarn \
--deploy-mode client \
--driver-memory 1g \
--executor-memory 2g \
--queue root.wedw \
--num-executors 200 \
--jars
/home/lx/***/config-1.3.0.jar,/home/lx/***/elasticsearch-hadoop-hive.jar,/home/lx/sen.jar

创建rdd的方式

1.并行化集合创建 RDD

1
2
3
val arr = Array(1,2,3,4,5,6)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_+_)

2.使用外界的数据源创建RDD,比如说本地文件系统,分布式文件系统HDFS等等。

1
2
val rdd = sc.textFile("d://data.txt")
val wordCount = rdd.map(line => line.length).reduce(_+_)

3.通过将已有RDD使用transform算子操作产生新的RDD。

1
2
3
val wordsRDD = lineWordRDD.flatMap(line => line.split(" "))
// 打印RDD的内容
wordsRDD.foreach(word => println(word))

创建dataset的方式

创建Datasets 的三种方式

  • 由DataFrame 转化成为 Dataset
  • 通过 SparkSession.createDataset() 直接创建
  • 通过toDS 方法意识转换

如何优雅地kill掉yarn上的application

1
yarn application -kill <applicationId>

spark on yarn关键配置

1
2
3
4
5
6
7
8
9
10
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 2g \
  --executor-cores 1 \
  --queue thequeue \
  examples/jars/spark-examples*.jar \
  10

num-executors: 该参数用于设置Spark作业总共要用多少个Executor进程来执行

executor-memory: 是每个节点上占用的内存。

driver-memory: 实际分配的内存

executor-cores: 该参数用于设置每个Executor进程的CPU core数量,这个参数决定了每个Executor进程并行执行task线程的能力。

partition分区的个数

根据 RDD 的创建方式分为两种情况:

​ 1、从内存中创建 RDD:sc.parallelize(…),那么默认的分区数量为该程序所分配的资源的 CPU 数量。
​ 2、从 HDFS 文件创建:sc.textFile(…) 或 spark.sql(…),每个 HDFS 文件以块(Block)的形式存储,Spark 读取时会根据具体数据格式对应的 InputFormat 进行解析,例如文本格式就用 TextInputFormat 进行解析,一般是将若干个 Block 合并为一个输入分片(InputSplit),而这个 InputSplit 数就是默认的分区数。

scala中的函数和方法的区别

链接:https://www.runoob.com/w3cnote/scala-different-function-method.html

1.方法不能作为单独的表达式而存在(参数为空的方法除外),而函数可以。

2.函数必须要有参数列表,而方法可以没有参数列表

3.方法名是方法调用,而函数名只是代表函数对象本身

4.在需要函数的地方,如果传递一个方法,会自动进行ETA展开(把方法转换为函数)

5.传名参数本质上是个方法

如何监控spark streaming

  1. 界面监控
  2. 程序监控
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def main(args: Array[String]): Unit = {

// uncomment these 2 lines if you want to see only your logs
// Logger.getLogger("org").setLevel(Level.OFF)
// Logger.getLogger("akka").setLevel(Level.OFF)

ApplicationProperties.parse(args.toList)

val sparkConf = new SparkConf().setMaster(ApplicationProperties.sparkMaster).setAppName(ApplicationProperties.appName)

val ssc = new StreamingContext(sparkConf, Seconds(ApplicationProperties.batchInterval))

//添加监控
ssc.addStreamingListener(new SparkMonitoringListener)

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream(ApplicationProperties.socketStreamHost, ApplicationProperties.socketStreamPort)

// Split each line into words
val words = lines.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}

Listener

1
2
3
4
5
6
7
8
9
10
11
12
13
case class SparkMonitoringListener() extends StreamingListener {
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
println(">>> Batch started...records in batch = " + batchStarted.batchInfo.numRecords)
}

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
val start = batchCompleted.batchInfo.processingStartTime.get
val end = batchCompleted.batchInfo.processingEndTime.get
val batchTime = batchCompleted.batchInfo.batchTime
val numRecords = batchCompleted.batchInfo.numRecords
println("batch finished", start, end, end-start, batchTime.toString(), numRecords)
}
}
  1. API监控

SparkStreaming restAPI监控从spark2.2.0版本开始支持,目前不支持2.1.0

scala多继承

通过实现trait

flink和spark streaming实时处理的区别

SparkStreaming是基于微批处理的,所以他采用DirectDstream的方式根据计算出的每个partition要取数据的Offset范围,拉取一批数据形成Rdd进行批量处理,而且该Rdd和kafka的分区是一一对应的;

Flink是真正的流处理,他是基于事件触发机制进行处理,在KafkaConsumer拉取一批数据以后,Flink将其经过处理之后变成,逐个Record发送的事件触发式的流处理;

  1. 数据模型

    • spark 采用rdd,spark streaming的Stream实际上也是一组rdd的集合
    • flink的基本数据模型是数据流,以及事件的序列
  2. 运行架构

    • spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
    • Flink是标准的流执行模式,一个事件在一个节点处理完成后可以直接发往下一个节点进行处理。

spark context作用

SparkContext是开发Spark应用的入口,它负责和整个集群的交互,包括创建RDD等。
从本质上来说,SparkContext是Spark的对外接口,负责向调用这提供Spark的各种功能。

spark streaming读取kafka,direct+receiver两种模式,为什么选择direct(直连模式)

  1. receiver模式

    采用该模式,需要一个task一直处于接受数据的状态。spark streaming相当于kafka的消费者,接受来的数据备份到其他节点完成之后,会向zookeeper更新消费者offset。

    当更新完消费者偏移量后,如果driver挂掉,driver下的executor也会挂掉,就会有数据丢失的问题。

    如何解决?

    开启WAL机制,预写日志机制。将数据备份一份到HDFS上,完成之后更新zookeeper的offset。

    效率慢,延迟大。

    可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。

  2. direct模式

    direct模式将kafka看成存储数据的一方,spark streaming主动取数据,无需一个task一直占用接收数据。

    生成的Stream中的RDD的并行度与读取的kafka的topic的partition个数一致。提高了计算时的并行度。

    spark来管理offset,默认在内存中,可以设置checkpoint。

spark core和spark sql

Spark SQL在Spark Core的基础上针对结构化数据处理进行很多优化和改进:

Spark SQL 支持很多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据源中读取数据。

当你使用SQL查询这些数据源中的数据并且只用到了一部分字段时,SparkSQL可以智能地只扫描这些用到的字段,而不是像SparkContext.hadoopFile中那样简单粗暴地扫描全部数据。

一千万条短信,有重复,以文本文件的形式保存,一行一条数据,请用五分钟时间,找出重复出现最多的前10条。

方法一:哈希表法 hashtable 或者 concurrentHashmap

可以用哈希表的方法对1千万条分成若干组进行边扫描边建散列表。第一次扫描,取首字节,尾字节,中间随便两字节作为Hash Code,插入到hash table中。并记录其地址和信息长度和重复次数,1千万条信息,记录这几个信息还放得下。同Hash Code且等长就疑似相同,比较一下。相同记录只加1次进hash table,但将重复次数加1。一次扫描以后,已经记录各自的重复次数,进行第二次hash table的处理。用线性时间选择可在O(n)的级别上完成前10条的寻找。分组后每份中的top10必须保证各不相同,可hash来保证,也可直接按hash值的大小来分类。

spark调优的8个方面

链接:https://zhuanlan.zhihu.com/p/54293797

yarn-client和yarn-cluster

一般yarn-client用于测试环境调试程序;yarn-cluster用于生产环境。

yarn-client和yarn-cluster的区别就在于,Driver是运行在本地客户端,它的AM只是作为一个Executor启动器,并没有Driver进程。

spark1.x和spark2.x区别

Spark 2.x新特性
1)Spark Core/SQL

在内存和CPU使用方面进一步优化Spark引擎性能,支持SQL 2003标准,支持子查询,对常用的SQL操作和DataFrame,性能有2-10倍的提升。

2)Spark Session

Spark2.0 中引入了 SparkSession 的概念,它为用户提供了一个统一的切入点来使用 Spark 的各项功能,统一了旧的SQLContext与HiveContext。用户不但可以使用 DataFrame 和Dataset 的各种 API,学习 Spark2 的难度也会大大降低。

3)统一 DataFrames 和 Datasets 的 API

它们都是提供给用户使用,包括各类操作接口的 API,1.3 版本引入 DataFrame,1.6版本引入Dataset,在 spark 2.0 中,把 dataframes 当作是一种特殊的 datasets,dataframes = datasets[row],把两者统一为datasets。

4) strutured Streaming

Spark Streaming基于Spark SQL(DataFrame / Dataset )构建了high-level API,使得Spark Streaming充分受益Spark SQL的易用性和性能提升。

spark streaming 7*24小时一直运行

  1. Driver高可用

    Spark Streaming HA将Driver元数据写到checkpoint目录下

1
2
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint(checkpointDirectory)
  1. RDD高可用

    为了防止Executor异常退出导致数据丢失,Spark Streaming提供了WAL(预写日志)机制。

    Receiver只要接收到数据,会立即将数据写入一份到高可用文件系统(一般是HDFS)上的checkpoint目录中

1
2
val conf = new SparkConf()
conf.set("spark.streaming.receiver.writeAheadLog.enable ","true");

spark streaming是exactly-once 吗?

基于direct stream的方法采用Kafka的简单消费者API,它的流程大大简化了。executor不再从Kafka中连续读取消息,也消除了receiver和WAL。还有一个改进就是Kafka分区与RDD分区是一一对应的,更可控。
driver进程只需要每次从Kafka获得批次消息的offset range,然后executor进程根据offset range去读取该批次对应的消息即可。由于offset在Kafka中能唯一确定一条消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,达到了exactly once。
不过,由于它采用了简单消费者API,我们就需要自己来管理offset。否则一旦程序崩溃,整个流只能从earliest或者latest点恢复,这肯定是不稳妥的。offset管理在之前的文章中提到过,这里不再赘述。

Kafka作为输入源可以保证exactly once,那么处理逻辑呢?

答案是显然的,Spark Streaming的处理逻辑天生具备exactly once语义。
Spark RDD之所以被称为“弹性分布式数据集”,是因为它具有不可变、可分区、可并行计算、容错的特征。一个RDD只能由稳定的数据集生成,或者从其他RDD转换(transform)得来。如果在执行RDD lineage的过程中失败,那么只要源数据不发生变化,无论重新执行多少次lineage,都一定会得到同样的、确定的结果。

最后,我们还需要保证输出过程也符合exactly once语义。Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。