Selected Source Code Analysis


MiniSpark

  1. Thrift installation
./configure --prefix=/usr/local/ --with-boost=/usr/local --with-libevent=/usr/local --without-csharp --without-erlang --without-go --without-haskell --without-ruby --without-cpp --without-perl --without-php --without-php_extension --without-python
  1. Source code walkthrough
Overall flow
Transformation operators vs. action operators
Transformations build up the DAG of RDD operators; when an action operator is reached, job dispatch is triggered.
SparkContext[textFile -> flatMap, etc.] -> Master[assignJob] -> WorkerService.Client[doJob] -> WorkerServiceHandler[doJob] -> each operator then does its own work.

SparkContext[textFile (the number of HDFS file splits becomes the number of partitions)] -> Scheduler[computeRdd] -> Master[assignJob] -> WorkerServiceHandler[WorkerOpType.ReadHdfsSplit -> FlatMap operator -> MapJob -> MapPairJob -> HashSplit (the shuffle step) -> reduceByKey (computed within each partition) -> Collect (GetSplit) gathers the data of all partitions]

Spark Source Code

  1. Execution flow overview
Reading the source through WordCount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author lx
 * @date 2020/5/28 3:49 PM
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]") // assumed local configuration
    val sc = new SparkContext(conf)
    val dataRDD: RDD[String] = sc.textFile("/Users/lx/IdeaProjects/practice/interview/data/hello.txt")
    dataRDD.flatMap(_.split(","))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("/Users/lx/IdeaProjects/practice/interview/data/output")
    sc.stop()
  }
}


// RDD construction is skipped here
RDD[saveAsTextFile] -> PairRDDFunctions[saveAsHadoopDataset] -> SparkHadoopWriter[write] ->
SparkContext[runJob] -> DAGScheduler[runJob -> submitJob] -> DAGSchedulerEventProcessLoop[post] posts a JobSubmitted message

A separate thread processes the messages in the eventQueue:
DAGSchedulerEventProcessLoop[doOnReceive] -> DAGScheduler[handleJobSubmitted -> createResultStage -> submitStage -> submitMissingTasks] -> TaskSchedulerImpl[submitTasks] ->

LocalSchedulerBackend[reviveOffers] -> TaskSchedulerImpl[resourceOffers -> resourceOfferSingleTaskSet] -> TaskSetManager[resourceOffer] -> DAGScheduler[taskStarted -> handleBeginEvent] sends a BeginEvent; reviveOffers returns the TaskDescription (task description)

LocalSchedulerBackend[reviveOffers] -> Executor[launchTask] -> the thread pool runs TaskRunner[run] -> Task[run] -> ResultTask[runTask] / ShuffleMapTask[runTask]

** Detailed flow
* Stage creation
DAGScheduler[createResultStage -> getOrCreateParentStages -> getOrCreateShuffleMapStage -> createShuffleMapStage]
When the ResultStage is created, the shuffle dependency reveals a parent, so a ShuffleMapStage is created first.
In the end one ResultStage and one ShuffleMapStage are created.
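
A quick way to see this two-stage split is to print the RDD lineage with toDebugString (a standard RDD method; the sc and the input path below are borrowed from the WordCount example above):

val counts = sc.textFile("hello.txt")      // illustrative input path
  .flatMap(_.split(","))
  .map((_, 1))
  .reduceByKey(_ + _)

// The ShuffledRDD introduced by reduceByKey marks the boundary between
// the ShuffleMapStage and the ResultStage described above.
println(counts.toDebugString)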

DAGScheduler[submitMissingTasks] -> TaskSchedulerImpl[submitTasks] submits a TaskSet

* TaskSchedulerImpl[resourceOffers] in detail

* TaskSetManager
  1. Module-by-module walkthrough
1. Storage module
cache, persist, checkpoint
checkpoint cuts the lineage: after the job finishes, an extra job is launched to recompute the RDD from scratch; persist does not cut the lineage.
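
A minimal usage sketch of the difference (standard RDD API; the existing sc and the paths are assumptions):

import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("/tmp/ckpt")               // assumed checkpoint directory
val cleaned = sc.textFile("input.txt").filter(_.nonEmpty)

cleaned.persist(StorageLevel.MEMORY_AND_DISK)  // keeps the lineage; blocks may be recomputed if evicted
cleaned.checkpoint()                           // lineage is cut once the checkpoint job has run
cleaned.count()                                // the first action also triggers the extra checkpoint job
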
persist source walkthrough
RDD[getOrCompute] -> BlockManager[getOrElseUpdate -> doPutIterator] -> MemoryStore[putIteratorAsValues -> putIterator] => the data ultimately lives in MemoryStore's entries map; if memory is exceeded it spills to disk => on a successful insert a new InterruptibleIterator is returned

checkpoint source walkthrough
RDD[checkpoint] sets the RDD's checkpoint data to ReliableRDDCheckpointData
SparkContext[runJob] -> RDD[doCheckpoint] -> RDDCheckpointData[checkpoint] -> ReliableRDDCheckpointData[doCheckpoint] -> ReliableCheckpointRDD[writeRDDToCheckpointDirectory] -> SparkContext[runJob] (the recomputation happens here) -> ReliableCheckpointRDD[writePartitionToCheckpointFile] -> a ReliableCheckpointRDD is returned and assigned to the RDDCheckpointData.cpRDD field == RDDCheckpointData.checkpointRDD

The RDD now has a checkpointRDD
When the RDD looks up its partitions it first checks the partitions field; if it is absent it calls getPartitions => which maps to ReliableCheckpointRDD[getPartitions]
ReliableCheckpointRDD[compute -> readCheckpointFile] reads the checkpointed data

2. Scheduling module -> see the execution flow overview above
Getting familiar with the more complex operators
reduceByKey (built on combineByKey; the average-score example below shows the three functions)
type MVType = (Int, Double)                                      // (count, sum)
val data: RDD[(String, Double)] =
  sc.parallelize(Seq(("a", 90.0), ("a", 80.0), ("b", 70.0)))     // example data (assumed)
data.combineByKey(
  score => (1, score),                                           // createCombiner: initial value the first time a key is seen
  (c: MVType, newScore) => (c._1 + 1, c._2 + newScore),          // mergeValue: called for further values of the same key in a partition
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)     // mergeCombiners: merges the per-partition combiners
).map { case (key, value) => (key, value._2 / value._1) }.foreach(println)
aggregateByKey
val listRDD: RDD[(String, Int)] =
  sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)))              // example data (assumed)
listRDD.aggregateByKey(0)((a, b) => {                            // 0 is the zero value per key within a partition
  println("first " + a + "," + b)
  math.max(a, b)                                                 // combines values of the same key within a partition
}, (x, y) => {
  println("second " + x + "," + y)
  x + y                                                          // merges the per-partition results once the partitions are done
}).collect().foreach(println)
aggregate
val rdd1: RDD[(String, Int)] =
  sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))              // example data (assumed)
val r1 = rdd1.aggregate((0, 0))(                                 // (0, 0) is the zero value at the start of each partition
  (u, c) => (u._1 + 1, u._2 + c._2),                             // seqOp: runs within each partition
  (r1, r2) => (r1._1 + r2._1, r1._2 + r2._2)                     // combOp: merges the per-partition results
)

Closure cleaner
It finds the fields the closure actually needs and nulls out the unused references to the enclosing objects.

Partitioners
HashPartitioner
routes keys by hashCode modulo the number of partitions (see the sketch below)
RangePartitioner samples the data to compute range bounds, so keys end up roughly evenly distributed
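
A minimal sketch of HashPartitioner-style routing (my own illustration of the modulo logic, not Spark's exact code):

def hashPartition(key: Any, numPartitions: Int): Int = {
  // hashCode modulo the partition count, forced to be non-negative
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}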

* Detailed scheduling flow
See section 1

* ShuffleMapTask / ResultTask
See section 3
  1. Shuffle internals
shuffle_shuffleId_mapId_reduceId (shuffle block naming)

SortShuffleWriter[write] -> ExternalSorter[insertAll] -> SizeTrackingAppendOnlyMap[changeValue]
The buffered data looks like this:
(0,hello),1
(0,hello),1
(0,scala),1
ExternalSorter[insertAll -> maybeSpillCollection] may spill to disk
ExternalSorter[insertAll -> writePartitionedFile] writes the shuffle data into the shuffle data file
-> IndexShuffleBlockResolver[writeIndexFileAndCommit] writes the index file
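
A rough sketch of how that index file is used on the read side, assuming (as in Spark's sort shuffle) that it stores numPartitions + 1 cumulative Long offsets into the data file; the helper below is my own illustration, not Spark's code:

import java.io.{DataInputStream, FileInputStream}

// Reducer r's data lives in the byte range [offsets(r), offsets(r + 1)) of the .data file.
def reduceByteRange(indexFile: String, numPartitions: Int, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    val offsets = Array.fill(numPartitions + 1)(in.readLong())
    (offsets(reduceId), offsets(reduceId + 1))
  } finally in.close()
}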


BlockStoreShuffleReader[read] -> ShuffleBlockFetcherIterator[initialize -> fetchLocalBlocks] -> the initialize method kicks off the block fetches (fetchBlocks) -> BlockData[getBlockData] -> results are placed into the results field

recordIter -> metricIter -> aggregatedIter -> resultIter -> ExternalSorter[insertAll] -> CompletionIterator

Remote shuffle service

Traditional shuffle has the following problems.

- Dependence on local disks blocks storage-compute separation. Storage-compute separation is a new architecture that has emerged in recent years: it decouples compute from storage, allowing more flexible machine design: compute nodes with strong CPUs, weak disks; storage nodes with strong disks and network, weak CPUs. Compute nodes become stateless and can scale elastically with load. On the storage side, with object storage (OSS, S3) + data lake formats (Delta, Iceberg, Hudi) + local/near caching maturing, storage can be treated as a service of effectively unlimited capacity. Users save cost through elastic compute plus pay-as-you-go storage. However, shuffle's dependence on local disks blocks this separation.

- Write amplification. When the mapper output exceeds memory, external sorting is triggered, introducing extra disk I/O.
- Massive random reads. The amount of mapper output belonging to a single reducer is tiny: with 128 MB of output and 2000 reducers, each reducer reads only 64 KB, leading to a huge number of small random reads. On HDDs, random-read performance is terrible; on SSDs, it rapidly burns through the drive's lifetime.
- High network connection counts; the thread pools consume too much CPU, causing performance and stability problems.
- Shuffle data has only a single replica. In large clusters bad disks/nodes are common, and the stage recomputation triggered by lost shuffle data hurts performance and stability.

Shuffle process

shuffle_0_0_0 (one file) => 000.data
shuffle_0_0_1

shuffle_0_1_0 (one file) => 010.data
shuffle_0_1_1

Partition 0 -> shuffle_0_0_0 -> reduce 0 -> part0000
shuffle_0_1_0

Partition 1 -> shuffle_0_0_1 -> reduce 1 -> part0001
shuffle_0_1_1

Shuffle file:/private/var/folders/23/db9grlvx0p5fxpzh3d_s24140000gn/T/blockmgr-533bfa70-358d-429f-833e-d2ccc92cfef5/0c/shuffle_0_0_0.data
Index file:/private/var/folders/23/db9grlvx0p5fxpzh3d_s24140000gn/T/blockmgr-533bfa70-358d-429f-833e-d2ccc92cfef5/0c/shuffle_0_0_0.index

Shuffle file:/private/var/folders/23/db9grlvx0p5fxpzh3d_s24140000gn/T/blockmgr-533bfa70-358d-429f-833e-d2ccc92cfef5/0c/shuffle_0_1_0.data
Index file:/private/var/folders/23/db9grlvx0p5fxpzh3d_s24140000gn/T/blockmgr-533bfa70-358d-429f-833e-d2ccc92cfef5/0c/shuffle_0_1_0.index

The main idea of a remote shuffle service: push-based shuffle + aggregation of partition data.

Spark SQL Source Code

Querying Hive
spark.sql("select deptno,dname from default.dept_part where dt='2021-08-02'").show()

Dataset[showString -> getRows -> take -> head -> withAction -> collectFromPlan] ->
CollectLimitExec[executeCollect] -> SparkPlan[executeTake -> getByteArrayRdd -> execute -> doExecute -> executeQuery -> ]

ProjectExec[inputRDDs] -> InputAdapter[inputRDDs] -> HiveTableScanExec[doExecute] -> TableReader[makeRDDForPartitionedTable -> makeRDDForPartitionedTable -> createHadoopRdd] -> RDD[mapPartitionsWithIndexInternal]



Physical plan
CollectLimit 21
+- *(1) Project [cast(deptno#0 as string) AS deptno#8, dname#1]
+- Scan hive default.dept_part [deptno#0, dname#1], HiveTableRelation `default`.`dept_part`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [deptno#0, dname#1, loc#2], [dt#3], [isnotnull(dt#3), (dt#3 = 2021-08-02)]
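
The plan above and the whole-stage generated code in the next section can be dumped from a Spark shell roughly like this (the spark session is assumed; debugCodegen comes from the execution.debug package):

import org.apache.spark.sql.execution.debug._   // adds debugCodegen()

val df = spark.sql("select deptno,dname from default.dept_part where dt='2021-08-02'")
df.explain()        // prints the physical plan shown above
df.debugCodegen()   // dumps the whole-stage generated Java code
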
Whole-stage code generation
public Object generate(Object[] references) {
return new GeneratedIteratorForCodegenStage1(references);
}

/*wsc_codegenStageId*/
final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
private Object[] references;
private scala.collection.Iterator[] inputs;
private scala.collection.Iterator inputadapter_input_0;
private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];

public GeneratedIteratorForCodegenStage1(Object[] references) {
this.references = references;
}

public void init(int index, scala.collection.Iterator[] inputs) {
partitionIndex = index;
this.inputs = inputs;
inputadapter_input_0 = inputs[0];
project_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 64);

}

protected void processNext() throws java.io.IOException {
// ScanExec
while (inputadapter_input_0.hasNext() && !stopEarly()) {
InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
// ProjectExec
// two expressions
// cast(input[0, int, true] as string) AS deptno#8
boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
int inputadapter_value_0 = inputadapter_isNull_0 ?
-1 : (inputadapter_row_0.getInt(0));
boolean project_isNull_0 = inputadapter_isNull_0;
UTF8String project_value_0 = null;
if (!inputadapter_isNull_0) {
project_value_0 = UTF8String.fromString(String.valueOf(inputadapter_value_0));
}
// input[1, string, true]
boolean inputadapter_isNull_1 = inputadapter_row_0.isNullAt(1);
UTF8String inputadapter_value_1 = inputadapter_isNull_1 ?
null : (inputadapter_row_0.getUTF8String(1));
project_mutableStateArray_0[0].reset();

project_mutableStateArray_0[0].zeroOutNullBytes();

if (project_isNull_0) {
project_mutableStateArray_0[0].setNullAt(0);
} else {
project_mutableStateArray_0[0].write(0, project_value_0);
}

if (inputadapter_isNull_1) {
project_mutableStateArray_0[0].setNullAt(1);
} else {
project_mutableStateArray_0[0].write(1, inputadapter_value_1);
}
append((project_mutableStateArray_0[0].getRow()));
if (shouldStop()) return;
}
}
}
Expression code generation
public java.lang.Object generate(Object[] references) {
return new SpecificUnsafeProjection(references);
}

class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {

private Object[] references;
private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];

public SpecificUnsafeProjection(Object[] references) {
this.references = references;
mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 32);

}

public void initialize(int partitionIndex) {

}

// Scala.Function1 need this
public java.lang.Object apply(java.lang.Object row) {
return apply((InternalRow) row);
}

public UnsafeRow apply(InternalRow i) {
mutableStateArray_0[0].reset();


mutableStateArray_0[0].zeroOutNullBytes();

boolean isNull_0 = i.isNullAt(0);
int value_0 = isNull_0 ?
-1 : (i.getInt(0));
if (isNull_0) {
mutableStateArray_0[0].setNullAt(0);
} else {
mutableStateArray_0[0].write(0, value_0);
}

boolean isNull_1 = i.isNullAt(1);
UTF8String value_1 = isNull_1 ?
null : (i.getUTF8String(1));
if (isNull_1) {
mutableStateArray_0[0].setNullAt(1);
} else {
mutableStateArray_0[0].write(1, value_1);
}
return (mutableStateArray_0[0].getRow());
}


}

Celeborn

Source code analysis

1. Main read/write paths
Shuffle Write
SortBasedShuffleWriter
SortBasedShuffleWriter[write -> write0] -> SortBasedPusher[insertRecord -> pushData] -> DataPusher[addTask -> enqueue into workingQueue]

Consuming workingQueue (DataPusher thread)
DataPusher[pushData] -> ShuffleClientImpl[pushData -> pushOrMergeData] -> TransportClient[pushData -> sends the Netty message]

TransportChannelHandler[channelRead -> handle] -> PushDataHandler[receive -> handleCore -> handlePushData] -> FileWriter[write -> flush -> addTask] -> Flusher.scala[addTask -> enqueue into workingQueues]

Consuming workingQueue (Flusher thread)
Flusher[init] -> FlushTask[LocalFlushTask[flush]]
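
A generic sketch of this working-queue hand-off (my own illustration with a hypothetical PushTask type, not Celeborn's code): the writer enqueues and returns immediately, while a dedicated thread drains the queue and performs the actual I/O.

import java.util.concurrent.LinkedBlockingQueue

final case class PushTask(bytes: Array[Byte])          // hypothetical task type

val workingQueue = new LinkedBlockingQueue[PushTask]()

// Producer side: addTask only enqueues, so the write path is not blocked by I/O.
def addTask(task: PushTask): Unit = workingQueue.put(task)

// Consumer side: a dedicated daemon thread takes tasks and pushes/flushes them.
val pusherThread = new Thread(() => {
  while (!Thread.currentThread().isInterrupted) {
    val task = workingQueue.take()
    // push task.bytes to the remote worker / flush it to disk here
  }
}, "data-pusher")
pusherThread.setDaemon(true)
pusherThread.start()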

Shuffle Read
RssShuffleReader.scala[read] -> ShuffleClientImpl[readPartition] -> RpcEndpointRef[askSync -> ask] -> NettyRpcEnv[ask -> postToOutbox] -> RpcOutboxMessage[sendWith] -> TransportClient[sendRpc] // returns ReduceFileGroups(partition)
ShuffleClientImpl[readPartition] -> RssInputStream[create] -> return new RssInputStreamImpl
2. Communication between roles
Master
Master[main -> initialize] -> HttpService.scala[startHttpServer] -> HttpServer[start]

The core HTTP handler
HttpRequestHandler[channelRead0 -> handleRequest]

The rpcEnv field
RpcEnv[create] -> NettyRpcEnv.scala[create -> startServer] -> TransportContext.java[createServer]

NettyRpcEnv.scala[startServer] -> Dispatcher.scala[registerRpcEndpoint] -> endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) -> Inbox[messages.add(OnStart): the OnStart command is enqueued for processing by default]

The core Netty handler
NettyRpcHandler[receive -> processRpc] -> Dispatcher.scala[postRemoteMessage -> postMessage] -> Inbox.scala[post -> the message goes into Inbox.messages -> and the endpoint is also added to the Dispatcher.receivers collection]
(note: the Inbox is a field of EndpointData)

The MessageLoop thread consumes the receivers collection
MessageLoop[run] -> Inbox.scala[process] -> Master[receiveAndReply] -> from here you can see which requests the Master handles

Master[receive] -> PbCheckForWorkerTimeout, CheckForApplicationTimeOut, PbWorkerLost
(these are sent by polling threads started when the Master boots)

####################
Worker
Worker[main -> initialize -> registerWithMaster registers with the Master] ->

pushDataHandler:PushDataHandler
replicateHandler:PushDataHandler
fetchHandler:FetchHandler

####################
LifecycleManager => lives on the application side
3. Key concepts
TransportClient corresponds to the client side
TransportChannelHandler corresponds to the server side

BaseMessageHandler message handlers:
PushDataHandler (serves the worker; handles shuffle writes)
NettyRpcHandler (general RPC; the basic communication "inbox")
FetchHandler (serves the worker; handles shuffle reads)

Master is an Endpoint

Anything that holds an RpcEndpointRef can send a request to the Endpoint with calls like the following:
endpointRef.ask(message, rpcTimeout, ClassTag$.MODULE$.apply(clz));
endpoint.askSync
Only this calling path goes through NettyRpcHandler.
4. The complete shuffle process
Shuffle Write
RssShuffleManager[registerShuffle] ->
return new RssShuffleHandle<>(
newAppId, // appId generated from the context
lifecycleManager.getRssMetaServiceHost(), // the LifecycleManager's host
lifecycleManager.getRssMetaServicePort(), // the LifecycleManager's port
lifecycleManager.getUserIdentifier(), // a user identifier made up of the tenant id and tenant name
shuffleId, // the id of this shuffle
dependency.rdd().getNumPartitions(), // the RDD's partition count
dependency);

RssShuffleManager[getWriter] ->
return new SortBasedShuffleWriter<>(
h.dependency(),
h.newAppId(),
h.numMappers(), // the number of mappers matches the partition count
context,
celebornConf,
client, // ShuffleClient
metrics); // ShuffleWriteMetricsReporter


SortBasedShuffleWriter[write -> write0] -> SortBasedPusher[insertRecord ->
serializes the record into bytes and writes it into the byte-array output stream serBuffer
serBuffer is then copied into the in-memory pages

pageCursor + required > currentPage.getBaseOffset() + currentPage.size()
Note: when the page cursor plus the required size would exceed the current page, a pushData is forced.
-> pushData] -> DataPusher[addTask -> enqueue into workingQueue]

Consuming workingQueue (DataPusher thread)
DataPusher[pushData] -> ShuffleClientImpl[pushData -> pushOrMergeData -> registerShuffle] ->

// 1. register the shuffle first
LifecycleManager[receiveAndReply -> handleRegisterShuffle]
// 2. get slots from the Master
LifecycleManager[requestSlotsWithRetry -> requestRequestSlots] -> RssHARetryClient[askSync -> sendMessageInner] -> NettyRpcEnv.scala[ask -> postToOutbox] -> 1-1
// 3. reserve the slots
LifecycleManager[reserveSlotsWithRetry -> reserveSlots -> requestReserveSlots] -> NettyRpcEnv.scala[ask] -> sends ReserveSlots -> 2-1
Registration finishes with RegisterShuffleResponse(StatusCode.Success, locations.asJava)

result.put(partitionLoc.getId(), partitionLoc); records the locations in reducePartitionMap

1-1:
Master[receiveAndReply -> executeWithLeaderChecker -> handleRequestSlots] -> MasterUtil[offerSlots -> offerSlots] -> HAMasterMetaManager[handleRequestSlots] -> HARaftServer[submitRequest] synchronously updates the state across the Master cluster -> AbstractMetaManager[updateRequestSlotsMeta] -> WorkInfo[allocateSlots] -> updates the worker, slot and heartbeat information in the Master cluster.
Master[handleRequestSlots] -> NettyRpcCallContext[reply] returns the slots

2-1:
Worker[receiveAndReply -> handleReserveSlots] -> Worker.PartitionLocationInfo records the reserved partitions
Returns ReserveSlotsResponse(StatusCode.Success)

TransportClient[pushData -> sends PushData]
TransportChannelHandler[channelRead -> handle -> processPushData] -> PushDataHandler[receivePushData -> handlePushData] ->
// 1. for master, send data to slave
FileWriter[write -> flush -> addTask] -> Flusher.scala[addTask -> enqueue into workingQueues]

Consuming workingQueue (Flusher thread)
Flusher[init] -> FlushTask[LocalFlushTask[flush]]

SortBasedShuffleWriter[write -> close] -> ShuffleClientImpl[mapperEnd]
LifecycleManager[receiveAndReply -> handleMapperEnd -> StageEnd]
LifecycleManager[receive -> handleStageEnd -> requestCommitFiles]
Worker[receiveAndReply -> handleCommitFiles -> commitFiles] -> FileWriter[close -> flush (one last time) -> returnBuffer frees the space]


Shuffle Read
RssShuffleReader.scala[read] -> ShuffleClientImpl[readPartition] -> RpcEndpointRef[askSync -> ask] -> NettyRpcEnv[ask -> postToOutbox] -> RpcOutboxMessage[sendWith] -> TransportClient[sendRpc]

LifecycleManager[receiveAndReply -> handleGetReducerFileGroup] // returns the partitionLocation

ShuffleClientImpl[readPartition] returns ReduceFileGroups(partitionLocation) -> RssInputStream[create] -> return new RssInputStreamImpl creates the stream -> returns resultIter

RssInputStream in detail
RssInputStreamImpl[moveToNextReader -> createReaderWithRetry -> createReader] -> WorkerPartitionReader[WorkerPartitionReader] -> TransportClient[sendRpcSync -> sendRpc] ->
sends an RpcRequest -> 3-1

3-1
FetchHandler[receive -> handleOpenStream] -> RpcResponseCallback[onSuccess] -> // flip "copy" to make it readable; what comes back is a StreamHandle (recording the stream id and the number of readable chunks)
RssInputStreamImpl[moveToNextReader -> getNextChunk (keeps fetching chunks)] -> WorkerPartitionReader[next -> fetchChunks] -> TransportClient[fetchChunk -> ChunkFetchRequest] -> 4-1

4-1
Worker[receive -> handleChunkFetchRequest] -> ChunkStreamManager[getChunk] -> FileManagedBuffers[chunk] -> TransportResponseHandler[handle] -> ChunkReceivedCallback[onSuccess] => the ByteBuf is added to WorkerPartitionReader[results]

The main work is overriding RssInputStream's read methods
RssInputStream[read -> fillBuffer] (read() returns a single byte; read(b) reads data into the array b)
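
A generic sketch of that read()/fillBuffer() pattern (illustrative only, not Celeborn's RssInputStreamImpl): read() hands out bytes from an in-memory chunk and asks fillBuffer() for the next chunk when the current one is exhausted.

import java.io.InputStream

abstract class ChunkedInputStream extends InputStream {
  private var buf: Array[Byte] = Array.emptyByteArray
  private var pos: Int = 0

  /** Fetch the next chunk via setBuffer(); return false when there is no more data. */
  protected def fillBuffer(): Boolean

  protected def setBuffer(bytes: Array[Byte]): Unit = { buf = bytes; pos = 0 }

  override def read(): Int = {
    while (pos >= buf.length) {
      if (!fillBuffer()) return -1   // end of stream
    }
    val b = buf(pos) & 0xff
    pos += 1
    b
  }
}
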
5. Roles in detail (including their key fields)
Master (side)
Master.statusSystem -> MetaHandler[setUpMasterRatisServer] -> HARaftServer.java[newMasterRatisServer] -> eventually registered into RaftServer.raftGroup -> HARaftServer[start]

It also starts the scheduledRoleChecker thread -> updateServerRole -> setServerRole -> the role information is cached in cachedPeerRole (which records the leader)

The StateMachine, explained separately
SimpleStateMachineStorage is the simple storage backing the state machine
Important methods:
applyTransaction applies a transaction log entry

Taking the WorkerLost command as an example, here is what happens in the Raft cluster after a request is submitted.
HARaftServer[submitRequest(WorkerLost)] -> RaftServerProxy[submitClientRequestAsync] -> StateMachine[applyTransaction -> runCommand] -> MetaHandler[handleWriteRequest] -> AbstractMetaManager[updateWorkerLostMeta]

takeSnapshot saves a snapshot of the state machine

Master.rpcEnv -> see the "Communication between roles" section
onStart runs -> checkForWorkerTimeOutTask (periodically sends the pbCheckForWorkerTimeout command) -> Master[receive -> timeoutDeadWorkers (checks for timeouts; on timeout a WorkerLost command is sent)]
-> Master[receive -> handleWorkerLost] -> HAMasterMetaManager[handleWorkerLost] -> HARaftServer[submitRequest(WorkerLost)] -> AbstractMetaManager[updateWorkerLostMeta] -> the worker is removed from the Master cluster's workers

onStart runs -> checkForApplicationTimeOutTask (periodically sends the CheckForApplicationTimeOut command) ->
Master[receive -> timeoutDeadApplications (checks for timeouts; on timeout an ApplicationLost command is sent)] -> Master[receive -> handleApplicationLost] -> HAMasterMetaManager[handleAppLost] -> HARaftServer[submitRequest(handleAppLost)] -> AbstractMetaManager[updateAppLostMeta] ->
1. frees the slots in WorkerInfo 2. removes the shuffle from registeredShuffle

Master.partitionSizeUpdateService is a dedicated thread that periodically updates partition sizes -> HAMasterMetaManager[handleUpdatePartitionSize] -> HARaftServer[submitRequest(UpdatePartitionSize)] -> AbstractMetaManager[updatePartitionSize] -> updates the Master cluster's estimatedPartitionSize (estimated partition size)

Master's service metrics
Master
Master[main -> initialize] -> MetricsSystem.scala[start]

new RPCSource -> AbstractSource[addCounter -> added to namedCounters] -> 1-1
new MasterSource(conf) -> AbstractSource[addGauge] -> MetricRegistry[gauge] -> added to namedGauges
Two kinds of Source are registered:
metricsSystem.registerSource(rpcSource)
metricsSystem.registerSource(masterSource)

1-1
Updating a metric -> NettyRpcHandler[internalReceive] -> RPCSource[updateMessageMetrics -> incCounter(RPCPushDataNum)] -> Counter[inc], taking PushData as the example


RssShuffleManager (application side)
RssShuffleManager[registerShuffle -> initializeLifecycleManager] -> initializes the LifecycleManager and the ShuffleClient -> LifecycleManager[initialize]

ApplicationHeartbeater, the heartbeat thread
LifecycleManager[sends the heartbeats] -> ApplicationHeartbeater[run] -> RssHARetryClient[send] ->
Master[receiveAndReply -> handleHeartbeatFromApplication] -> HAMasterMetaManager[handleAppHeartbeat] -> HARaftServer[submitRequest] -> RaftServerProxy[submitClientRequestAsync(AppHeartbeat)] -> AbstractMetaManager[updateAppHeartbeatMeta] -> updates the Master cluster's estimatedPartitionSize (estimated partition size)


Worker (side)
It first registers with the Master cluster, then heartbeats.
Worker[initialize -> registerWithMaster] -> RssHARetryClient[askSync(RegisterWorker.apply -> PbRegisterWorker)] -> Master[receiveAndReply -> executeWithLeaderChecker -> handleRegisterWorker] -> HAMasterMetaManager[handleRegisterWorker] -> HARaftServer[submitRequest(RegisterWorker)] -> AbstractMetaManager[updateRegisterWorkerMeta] -> updates the workers in the Master cluster

Worker[initialize -> heartBeatToMaster] -> RssHARetryClient[askSync(HeartbeatFromWorker)] -> Master[receiveAndReply -> executeWithLeaderChecker -> handleHeartbeatFromWorker] -> HAMasterMetaManager[handleWorkerHeartbeat] -> HARaftServer[submitRequest(WorkerHeartbeat)] -> AbstractMetaManager[updateWorkerHeartbeatMeta] -> updates the workers in the Master cluster and the available slots in each WorkerInfo

Kyuubi

1. Main flow

Starting from KyuubiBeeLine
1. Establishing a connection
KyuubiBeeLine[main -> mainWithInputRedirection] -> Beeline[begin] -> KyuubiCommands[connect] -> KyuubiDatabaseConnection[connect -> getConnectionFromDefaultDriver] -> KyuubiHiveDriver[connect] -> KyuubiConnection[openSession] -> TCLIService.Iface[OpenSession] (remote call)

Server side
KyuubiTBinaryFrontendService[OpenSession] -> TFrontendService[getSessionHandle] -> AbstractBackendService[openSession] -> KyuubiSessionManager[openSession] -> SessionManager[openSession] -> (initializes) KyuubiSessionImpl[open -> runOperation] -> AbstractSession[runOperation] -> AbstractOperation[run -> runInternal] -> LaunchEngine[runInternal] => submits an asynchronous task

KyuubiSessionImpl[openEngineSession] -> EngineRef[getOrCreate -> create] -> ProcBuilder[start] -> ProcessBuilder[start] (launches spark-submit)

KyuubiSyncThriftClient[openSession (the real openSession)]

2. Executing SQL
KyuubiCommands[sql -> execute -> executeInternal] -> BeeLine[createStatement] -> KyuubiConnection[createStatement] (initializes KyuubiStatement) -> KyuubiStatement[execute -> executeWithConfOverlay -> runAsyncOnServer] -> TCLIService.Iface[ExecuteStatement] (remote call)

Server side
TFrontendService[ExecuteStatement] -> AbstractBackendService[executeStatement] -> AbstractSession[executeStatement] -> SparkSQLOperationManager[newExecuteStatementOperation] ->
AbstractSession[runOperation] -> SparkSessionImpl[runOperation] -> AbstractOperation[run -> runInternal] -> spark.ExecuteStatement[executeStatement] -> spark.sql(statement); the results are put into an iterator

=> Back on the client side
KyuubiStatement[executeWithConfOverlay -> waitForOperationToComplete (polls until the operation completes)] -> KyuubiQueryResultSet[build] (initializes KyuubiQueryResultSet) -> KyuubiQueryResultSet[next] -> TCLIService.Iface[FetchResults] (remote call)

KyuubiCommands[executeInternal] -> KyuubiStatement[getResultSet] -> BeeLine[print] prints the result
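
This same round trip can also be driven with plain JDBC, since Kyuubi speaks the HiveServer2 Thrift protocol; a minimal sketch, where the host, port 10009 and credentials are assumptions for a default deployment:

import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default", "hadoop", "")
val stmt = conn.createStatement()
val rs   = stmt.executeQuery("select 1")   // goes through ExecuteStatement / FetchResults as above
while (rs.next()) println(rs.getInt(1))
rs.close(); stmt.close(); conn.close()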

2. Roles

KyuubiHiveDriver
-- TODO

KyuubiServer.scala
KyuubiServer[main -> startServer -> initialize -> addService(KinitAuxiliaryService, PeriodicGCService, etc.)] -> Serverable[initialize -> addService(AbstractBackendService, AbstractFrontendService)] -> CompositeService[initialize -> initializes all the registered Services]

KyuubiServer[main -> startServer -> start]

Calcite

1. Main flow

1.SqlToRelConverter
CalciteSQLConverter[main] -> SqlToRelConverter[convertQuery -> convertQueryRecursive] ->
// taking a SELECT as the example
SqlToRelConverter[convertSelect -> convertSelectImpl] ->
// from -> join
SqlToRelConverter[convertFrom -> convertJoin (recursively calls convertFrom on the left and right inputs)] ->
// Identifier: the actual FROM table?
SqlToRelConverter[convertIdentifier] ->
// on condition
SqlToRelConverter[convertOnCondition] -> convertJoin finishes ->
// where
SqlToRelConverter[convertWhere]
......
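A rough sketch of driving this path through Calcite's public Planner API (assuming a schema containing the referenced tables has already been registered on rootSchema); parse/validate/rel ends up in SqlToRelConverter under the hood:

import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.tools.{Frameworks, Planner}

val rootSchema = Frameworks.createRootSchema(true)   // register tables here (assumed)
val config     = Frameworks.newConfigBuilder().defaultSchema(rootSchema).build()
val planner: Planner = Frameworks.getPlanner(config)

val sqlNode   = planner.parse("select e.deptno from emp e join dept d on e.deptno = d.deptno")
val validated = planner.validate(sqlNode)
val relRoot   = planner.rel(validated)               // SqlNode -> RelNode tree
println(RelOptUtil.toString(relRoot.rel))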

2. Optimizers
1. HepPlanner: the RBO (rule-based optimization) model

2. VolcanoPlanner: the CBO (cost-based optimization) model

CMU 15-445 Source Code Reading

1. buffer pool manager
extendible hash table (the buffer pool's core storage)
LRU-K replacer: decides which frame to evict
2. b+ tree
how to insert/remove; entries are stored in an array
3. query execution
implement the operators under the volcano model (see the sketch after this list)
optimization; post-order traversal of the plan
table_page: understand how table data is actually stored.
it is individual tuples laid out in 4 KB pages
4. concurrency control
two-phase locking, lock compatibility, three isolation levels
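
A tiny sketch of the volcano (iterator) model mentioned in item 3, in Scala for consistency with the rest of these notes (the Tuple type and FilterExecutor are hypothetical):

object VolcanoSketch {
  type Tuple = Seq[Any]

  trait Executor {
    def next(): Option[Tuple]   // each operator pulls rows from its child one at a time
  }

  // Hypothetical filter operator: keeps pulling from the child until a row passes the predicate.
  final class FilterExecutor(child: Executor, predicate: Tuple => Boolean) extends Executor {
    override def next(): Option[Tuple] = {
      var row = child.next()
      while (row.exists(r => !predicate(r))) row = child.next()
      row
    }
  }
}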

Gluten Source Code Analysis

Entry points
GlutenPlugin
- Driver Plugin
- Executor Plugin

I. Driver Plugin
a. Registers GlutenSQLAppStatusListener, the Spark SQL event listener
- onExecutionStart
- onExecutionEnd

b. setPredefinedConfigs
initializes Gluten's driver-side configuration

c. GlutenDriverEndpoint, communication with the executors
- after an executor starts, it notifies the driver side

d. registerMetrics spark.gluten.ui.enabled
- metrics

e. Initialization of BackendsApiManager
mainly onDriverStart: loads the native lib (not covered further here)

II. Executor Plugin
a. GlutenExecutorEndpoint, communication with the driver
b. taskListeners, event listeners for tasks
mainly onTaskStart, handling the task-start event
BackendsApiManager initialization
mainly onExecutorStart

III. GlutenSessionExtensions (the core)
ColumnarQueryStagePrepOverrides,
ColumnarOverrides,
StrategyOverrides,
OthersExtensionOverrides
Every query goes through these rule sets (see the injection sketch below).
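
A minimal sketch of the injection mechanism these rules rely on (a hypothetical no-op rule, only to show the SparkSessionExtensions/ColumnarRule hook points; Gluten's real rules are the ones listed above):

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// Hypothetical rule: just logs the plan before columnar transitions are inserted.
class LoggingColumnarRule extends ColumnarRule {
  override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = {
      println(s"plan before columnar transitions:\n$plan")
      plan
    }
  }
}

// Registered via spark.sql.extensions, the same hook the Gluten plugin uses for its rule sets.
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectColumnar(_ => new LoggingColumnarRule)
}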

ColumnarQueryStagePrepOverrides
FallbackOnANSIMode, FallbackMultiCodegens, FallbackBroadcastExchange

1. ColumnarOverrideRules
ColumnarRule
preColumnarTransitions
preOverrides(): the rules live here
FallbackOnANSIMode: under ANSI mode no operator is supported => used for fallback
FallbackMultiCodegens: if codegen operators are nested more than twelve levels deep, the optimization is skipped?
PlanOneRowRelation
FallbackEmptySchemaRelation
AddTransformHintRule: decides which operators need to be transformed; take HiveScan as an example => most likely unsupported
TransformHints.tagNotTransformable tags it as not transformable
TransformPreOverrides
replaceWithTransformerPlan
applyScanNotTransformable
applyScanTransformer
BackendsApiManager.getSparkPlanExecApiInstance.genHiveTableScanExecTransformer
return hiveTableScanExecTransformer
RewriteTransformer

ColumnarRule
postColumnarTransitions
fallbackPolicy
ExpandFallbackPolicy
fallback
countStageFallbackCostInternal: for InMemoryTableScanExec and QueryStageExec, the two operators that support columnar execution, falling back increases the cost; for everything else it decreases it
countFallback: every columnar-to-row transition increases the cost
fallbackToRowBasedPlan
# this is where row => columnar transitions are inserted for operators that do not support columnar output; HiveTableScanExec, for example, does not
InsertTransitions.insertTransitions (stitches the whole plan together: around the columnar-capable operators it inserts row/columnar conversions)
transformColumnarToRowExec: both the columnar-to-row and row-to-columnar nodes are generated here

postOverrides
TransformPostOverrides
# DataWritingCommandExec writes Hive data; it used to need a columnar-to-row conversion, but now the data is written directly through Velox, so the conversion is no longer needed.
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules()
NativeWritePostRule (when Spark executes the SQL, a FakeRowAdaptor is built to turn the columns back into rows)
FakeRowAdaptor
RowToVeloxColumnarExec
doExecuteColumnarInternal
RowToVeloxColumnarExec.toColumnarBatchIterator
nativeConvert

IV. The entry point of operator execution
TakeOrderedAndProjectExecTransformer is an important transformer; it triggers WholeStageTransformer
doExecuteColumnar
obtains columnarInputRDDs
child.executeColumnar => gets the RDD
WholeStageTransformer
doExecuteColumnar
# here it runs:
doWholeStageTransform
generateWholeStageTransformContext
child.doTransform()
# SparkPlan => Substrait plan conversion is completed here
PlanBuilder.makePlan
return WholeStageTransformContext(planNode, substraitContext)
return GlutenWholeStageColumnarRDD, which is finally built as an RDD



V. Points that are still unclear
1. Handling of AQE dynamic partition pruning
ExpressionConverter#transformDynamicPruningExpr
Haven't fully understood this part yet.

VI. GlutenWholeStageColumnarRDD in detail
the compute method
BackendsApiManager.getIteratorApiInstance.genFirstStageIterator
NativePlanEvaluator#createKernelWithBatchIterator
PlanEvaluatorJniWrapper#nativeCreateKernelWithIterator
1. obtains the Runtime (execution context)
2. parsePlan parses the Substrait plan into a substrait::Plan protobuf object and saves a copy to a file
3. finally returns a JniColumnarBatchIterator whose next function returns a ColumnarBatch
VeloxRuntime::createResultIterator
1. converts the substrait::Plan into a veloxPlan
2. builds the WholeStageResultIteratorFirstStage
3. WholeStageResultIterator.next() produces a VeloxColumnarBatch; it is not actually returned but kept in the objectStore
4. when ColumnarBatchOutIterator#nativeNext runs, VeloxColumnarBatch.next() is executed against the objectStore and the result is returned
NativePlanEvaluator#createOutIterator
builds the ColumnarBatchOutIterator
1. GeneralOutIterator#next => nextInternal() => nativeNext() => ColumnarBatches.create
2. the concrete implementation of nativeNext
a. first locates the ResultIterator
b. calls the ResultIterator's next method

VII. Each Transformer operator in detail
HiveTableScanExecTransformer
how it is converted into a Substrait plan

VIII. ColumnarOverrideRules
PlanOneRowRelation
handles SELECT ... without a FROM clause
AddTransformHintRule
marks which operators can be transformed; those are converted to the corresponding Substrait plan
TransformPreOverrides
replaceWithTransformerPlan
handling of dynamic partition pruning
genFilterExec

6.824 Source Code Reading

Lab 1: MapReduce
Lab 2 study notes (from the source-code perspective)
Mainly about studying the Raft implementation

Mini-LSM Source Code Reading

Week 1:
1. Implement the MemTable
2. Implement MemTableIterator (a range iterator over the MemTable)
3. Implement MergeIterator over multiple MemTableIterators (see the sketch after this list)
4.
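
A rough sketch of the MergeIterator idea from item 3 (in Scala rather than the project's Rust, with an assumed String-keyed entry type): pull from several already-sorted iterators and always yield the smallest remaining key; deduplication of shadowed keys is omitted.

object MergeIteratorSketch {
  type Entry = (String, Array[Byte])

  // Each input iterator is assumed to be sorted by key (e.g. one per memtable).
  def merge(iters: Seq[Iterator[Entry]]): Iterator[Entry] = new Iterator[Entry] {
    private val heads = iters.map(_.buffered)

    override def hasNext: Boolean = heads.exists(_.hasNext)

    override def next(): Entry = {
      // Pick the source whose current head has the smallest key; earlier iterators win ties.
      val src = heads.filter(_.hasNext).minBy(_.head._1)
      src.next()
    }
  }
}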