DolphinScheduler 3.4.1 source code

Source code analysis

Main flow

Master service
MasterRegistryClient: service registration
- Starts the master heartbeat thread, MasterHeartBeatTask
- MasterRegistryClient#registry => RegistryClient#persistEphemeral completes the registration
- Adds a connection state listener, MasterConnectionStateListener, which handles connect, disconnect and similar events
MasterCoordinator: master/slave scheduling service node
- TaskGroupCoordinator
ClusterManager: cluster manager that maintains master and worker metadata
ClusterStateMonitors: handles master failover and worker failover events
- ZookeeperTreeCacheListenerAdapter listens for ZooKeeper node additions, removals and updates => AbstractClusterSubscribeListener#notify => MasterClusters#onServerAdded => MasterSlotChangeListenerAdaptor#onServerAdded => onMasterSlotChanged

- If the change is a node removal, a MasterFailoverEvent is sent => MasterFailoverEventHandler#handle => FailoverCoordinator#failoverMaster => FailoverCoordinator#doMasterFailover => WorkflowFailover#failoverWorkflow => the workflow instance is marked as needing failover and its commands are all changed to RECOVER_TOLERANCE_FAULT_PROCESS => WorkflowFailoverCommandHandler#assembleWorkflowInstance sets the workflow instance back to the state it had before the failure
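The removal path above can be pictured with a small in-memory model. This is only a sketch under stated assumptions, not DolphinScheduler's code: `MasterFailoverSketch`, its `WorkflowInstance` and `Command` types, the `stateBeforeFailover` field and the `NEED_FAILOVER` state name are all hypothetical. Only the flow itself (node removed, instances marked, a RECOVER_TOLERANCE_FAULT_PROCESS command issued, state restored when the command is handled) comes from the notes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model of the master failover path; every type here is hypothetical.
class MasterFailoverSketch {

    static final String RECOVER_COMMAND = "RECOVER_TOLERANCE_FAULT_PROCESS";

    // Trimmed-down workflow instance: which master owns it and what state it is in.
    static class WorkflowInstance {
        final int id;
        final String masterHost;
        String state;
        String stateBeforeFailover; // assumption: remembered so it can be restored later

        WorkflowInstance(int id, String masterHost, String state) {
            this.id = id;
            this.masterHost = masterHost;
            this.state = state;
        }
    }

    // Trimmed-down command row: which instance to recover and with which command type.
    static class Command {
        final int workflowInstanceId;
        final String type;

        Command(int workflowInstanceId, String type) {
            this.workflowInstanceId = workflowInstanceId;
            this.type = type;
        }
    }

    final List<WorkflowInstance> instances = new ArrayList<>();
    final List<Command> commands = new ArrayList<>();

    // Step 1: the registry listener reports a removed master; its workflows are marked for failover.
    void onMasterRemoved(String masterHost) {
        for (WorkflowInstance instance : instances) {
            if (instance.masterHost.equals(masterHost)) {
                instance.stateBeforeFailover = instance.state;
                instance.state = "NEED_FAILOVER"; // illustrative state name
                commands.add(new Command(instance.id, RECOVER_COMMAND));
            }
        }
    }

    // Step 2: a surviving master handles the command and restores the pre-failure state.
    void handleRecoverCommand(Command command) {
        for (WorkflowInstance instance : instances) {
            if (instance.id == command.workflowInstanceId) {
                instance.state = instance.stateBeforeFailover;
            }
        }
    }

    public static void main(String[] args) {
        MasterFailoverSketch sketch = new MasterFailoverSketch();
        sketch.instances.add(new WorkflowInstance(1, "master-a", "RUNNING"));
        sketch.instances.add(new WorkflowInstance(2, "master-b", "RUNNING"));
        sketch.onMasterRemoved("master-a");
        System.out.println("commands issued: " + sketch.commands.size());
        sketch.handleRecoverCommand(sketch.commands.get(0));
        System.out.println("instance 1 state: " + sketch.instances.get(0).state);
    }
}
```

Instances owned by other masters are left untouched, which is the property the real failover path also needs.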
WorkflowEngine: workflow engine
- 1. WorkflowEventBusCoordinator: workflow event processor (WorkflowEventBus)
- WorkflowRunningStateAction[onStartEvent] => triggerTasks => publishes TaskStartLifecycleEvent => TaskStartLifecycleEventHandler[handle] => TaskSubmittedStateAction[onStartEvent] => tryToDispatchTask => TaskSubmittedStateAction[onDispatchEvent] => WorkerGroupDispatcherCoordinator#dispatchTask
=> WorkerGroupDispatcher#dispatchTask (note: TaskDispatchableEventBus) => doDispatchTask => TaskExecutorClient#dispatch
=> (Worker service) PhysicalTaskExecutorOperatorImpl#dispatchTask => PhysicalTaskEngineDelegator#dispatchLogicTask
=> TaskEngine#submitTask (TaskExecutorEventBus) => executorContainer.dispatch(taskExecutor) => publishes TaskExecutorDispatchedLifecycleEvent
=> TaskExecutorEventBusCoordinator#doFireTaskExecutorEventBus => (reports the state change to the master)

Task execution
=> AbstractTaskExecutorContainer#startAllThreadTaskExecutorWorker => TaskExecutorWorker#start => taskExecutor.start() => publishTaskRunningEvent publishes the task running event => (reports the state change to the master) => doTriggerTaskPlugin => ShellTask#handle
=> on completion, sends TaskExecutorSuccessLifecycleEvent to the master => TaskExecutorLifecycleEventListener#onTaskExecutorSuccessLifecycleEvent => TaskExecutorLifecycleEventRemoteReporter#reportTaskExecutorLifecycleEvent => enqueued in taskExecutionEventsQueue
=> TaskExecutorLifecycleEventRemoteReporter#handleTaskExecutionEventChannel => TaskExecutorEventRemoteReporterClient#reportTaskExecutionEventToMaster =>
TaskExecutorEventListenerImpl#onTaskExecutorSuccess (back on the Master service)
=> sends TaskSuccessLifecycleEvent (WorkflowEventBus) => TaskSuccessLifecycleEventHandler#handle => TaskSuccessStateAction#onSucceedEvent => TaskExecutorClient#ackTaskExecutorLifecycleEvent => PhysicalTaskExecutorOperatorImpl#ackPhysicalTaskExecutorLifecycleEvent (back on the Worker service)

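- 2. CommandEngine: the core command engine of the DolphinScheduler master node. It fetches commands from the database and triggers workflow execution, which makes it the "brain" of scheduling and execution on the master.
- When a workflow is started from the UI, the workflow instance is in SUBMITTED_SUCCESS
- bootstrapCommand commandHandler[handleCommand] -> RunWorkflowCommandHandler[assembleWorkflowInstance] sets the workflow instance to running
- bootstrapWorkflowExecutionRunnable produces a WorkflowStartLifecycleEvent => handled by item 1
- 3. WorkerGroupDispatcherCoordinator: the worker-group dispatch coordinator on the master node. It manages and coordinates task dispatching across worker groups and is the core component through which the master assigns tasks to worker nodes.
- 4. LogicTaskEngineDelegator: the logic task engine delegate on the master node. It runs logic tasks (Logic Task) locally on the master instead of dispatching them to a worker node.
- 5. QuartzScheduler: DolphinScheduler's scheduling plugin implementation, built on the Quartz framework. It manages the timed triggering of workflows and is the core component of the scheduled-execution feature.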
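How items 1 and 2 fit together (a command becomes a running instance, which produces a start event that the event bus turns into task dispatches) can be sketched with a plain queue. This is a minimal illustration, not the real engine: `WorkflowEngineSketch`, its nested types, the `RUNNING_EXECUTION` state name and the event type names are assumptions, and the real handlers do far more than forward events.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical model: command polling feeds an event bus that drives task dispatch.
class WorkflowEngineSketch {

    // Trimmed-down workflow instance with the names of its start tasks.
    static class WorkflowInstance {
        final int id;
        final List<String> startTasks;
        String state = "SUBMITTED_SUCCESS";

        WorkflowInstance(int id, String... startTasks) {
            this.id = id;
            this.startTasks = Arrays.asList(startTasks);
        }
    }

    // Event types loosely named after the lifecycle events in the notes.
    enum EventType { WORKFLOW_START, TASK_START, TASK_DISPATCH }

    static class Event {
        final EventType type;
        final WorkflowInstance workflow;
        final String taskName; // null for workflow-level events

        Event(EventType type, WorkflowInstance workflow, String taskName) {
            this.type = type;
            this.workflow = workflow;
            this.taskName = taskName;
        }
    }

    final Queue<WorkflowInstance> pendingCommands = new ArrayDeque<>(); // stands in for the command table
    final Queue<Event> eventBus = new ArrayDeque<>();
    final List<String> dispatchedTasks = new ArrayList<>();             // stands in for the worker dispatcher

    // CommandEngine side: each pending command becomes a running instance plus a start event.
    void bootstrapCommands() {
        WorkflowInstance instance;
        while ((instance = pendingCommands.poll()) != null) {
            instance.state = "RUNNING_EXECUTION"; // illustrative name for "running"
            eventBus.add(new Event(EventType.WORKFLOW_START, instance, null));
        }
    }

    // Event bus side: drain the queue; handling one event may publish follow-up events.
    void fireEvents() {
        Event event;
        while ((event = eventBus.poll()) != null) {
            switch (event.type) {
                case WORKFLOW_START:
                    for (String task : event.workflow.startTasks) {
                        eventBus.add(new Event(EventType.TASK_START, event.workflow, task));
                    }
                    break;
                case TASK_START:
                    eventBus.add(new Event(EventType.TASK_DISPATCH, event.workflow, event.taskName));
                    break;
                case TASK_DISPATCH:
                    dispatchedTasks.add(event.workflow.id + ":" + event.taskName);
                    break;
            }
        }
    }

    public static void main(String[] args) {
        WorkflowEngineSketch engine = new WorkflowEngineSketch();
        engine.pendingCommands.add(new WorkflowInstance(7, "extract", "load"));
        engine.bootstrapCommands();
        engine.fireEvents();
        System.out.println(engine.dispatchedTasks);
    }
}
```

Keeping command handling and event handling on separate queues is what lets a handler publish a follow-up event without calling the next handler directly.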
Worker service
PhysicalTaskEngineDelegator
- 1. TaskEngine: executes tasks
- 2. PhysicalTaskExecutorLifecycleEventReporter: reports state changes to the master
Task failover
A task can be failed over while it is in the running or dispatched state => a TaskFailoverLifecycleEvent is raised => TaskFailoverLifecycleEventHandler#handle => TaskDispatchStateAction#onFailoverEvent => AbstractTaskStateAction#failoverTask => TaskExecutionRunnable#failover => the old task instances are all set to NEED_FAULT_TOLERANCE and a new task instance is created => a TaskStartLifecycleEvent is published again
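The task failover rule can be written down as a small sketch. It assumes a simplified `TaskInstance` with string states; the class names, the `RUNNING`, `DISPATCHED` and `SUBMITTED` state strings and the event list are hypothetical stand-ins. Only NEED_FAULT_TOLERANCE and TaskStartLifecycleEvent are taken from the notes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "mark the old instance, create a new one, start it again".
class TaskFailoverSketch {

    static class TaskInstance {
        final int id;
        final String name;
        String state;

        TaskInstance(int id, String name, String state) {
            this.id = id;
            this.name = name;
            this.state = state;
        }
    }

    final List<TaskInstance> taskInstances = new ArrayList<>();
    final List<String> publishedEvents = new ArrayList<>(); // stands in for the workflow event bus
    private int nextId = 1;

    TaskInstance create(String name, String state) {
        TaskInstance instance = new TaskInstance(nextId++, name, state);
        taskInstances.add(instance);
        return instance;
    }

    // Fails over one task. Returns the replacement, or null when the state does not allow failover.
    TaskInstance failover(TaskInstance old) {
        boolean eligible = old.state.equals("RUNNING") || old.state.equals("DISPATCHED");
        if (!eligible) {
            return null;
        }
        old.state = "NEED_FAULT_TOLERANCE";
        TaskInstance replacement = create(old.name, "SUBMITTED");
        publishedEvents.add("TaskStartLifecycleEvent:" + replacement.id);
        return replacement;
    }

    public static void main(String[] args) {
        TaskFailoverSketch sketch = new TaskFailoverSketch();
        TaskInstance running = sketch.create("shell-task", "RUNNING");
        TaskInstance replacement = sketch.failover(running);
        System.out.println(running.state + " -> new instance " + replacement.id);
    }
}
```

The old instance is kept rather than reused, so the history of the failed attempt stays visible next to the retry.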
Timeout feature
Delay queue
If the taskDefinition's timeout is not empty, a TaskTimeoutLifecycleEvent is sent. When the event is handled, the task is killed directly if it has not finished yet.
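A delay queue of this kind can be built with the JDK's `java.util.concurrent.DelayQueue`. The sketch below assumes that class; the notes do not confirm which queue DolphinScheduler actually uses. `TaskTimeoutSketch`, `TimeoutEvent` and the string states are hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical timeout handling on top of the JDK DelayQueue.
class TaskTimeoutSketch {

    // Becomes available from the queue only after the task's timeout has elapsed.
    static class TimeoutEvent implements Delayed {
        final String taskName;
        private final long deadlineNanos;

        TimeoutEvent(String taskName, long timeoutMillis) {
            this.taskName = taskName;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    final DelayQueue<TimeoutEvent> timeoutQueue = new DelayQueue<>();
    final Map<String, String> taskStates = new ConcurrentHashMap<>(); // task name -> illustrative state

    // Mirrors "if the timeout is not empty, send a timeout event".
    void startTask(String taskName, Long timeoutMillis) {
        taskStates.put(taskName, "RUNNING");
        if (timeoutMillis != null) {
            timeoutQueue.add(new TimeoutEvent(taskName, timeoutMillis));
        }
    }

    // Blocks until the next timeout expires, then kills the task only if it is still running.
    void handleNextTimeout() {
        try {
            TimeoutEvent event = timeoutQueue.take();
            taskStates.replace(event.taskName, "RUNNING", "KILL");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        TaskTimeoutSketch sketch = new TaskTimeoutSketch();
        sketch.startTask("slow-task", 50L);
        sketch.handleNextTimeout();
        System.out.println(sketch.taskStates);
    }
}
```

The conditional `replace` is the important part: a task that finished before its deadline keeps its final state when the timeout event eventually fires.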
RPC
Client
Clients#withService => Clients.JdkDynamicRpcClientProxyBuilder#withHost => ClientInvocationHandler#invoke => SyncClientMethodInvoker#invoke => nettyRemotingClient.sendSync(syncRequestDto);
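The client chain above is a JDK dynamic proxy in front of a synchronous send. The following is a minimal sketch of that idea using `java.lang.reflect.Proxy`. `EchoService` and `Transport` are hypothetical; `Transport` stands in for the Netty client, and using `Method#toGenericString()` as the method identifier is an assumption made for the example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical client side: every interface call becomes one synchronous request.
class RpcClientProxySketch {

    // Illustrative RPC interface shared by client and server.
    interface EchoService {
        String echo(String message);
    }

    // Stand-in for the network layer: send a request and wait for the response.
    interface Transport {
        Object sendSync(String methodIdentifier, Object[] args) throws Exception;
    }

    static <T> T withService(Class<T> serviceInterface, Transport transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered locally by the transport object, not sent remotely.
                return method.invoke(transport, args);
            }
            // Assumption: the generic signature string identifies the remote method.
            return transport.sendSync(method.toGenericString(), args);
        };
        Object instance = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface}, handler);
        return serviceInterface.cast(instance);
    }

    public static void main(String[] args) {
        // A fake server that answers in-process instead of over the network.
        Transport fakeServer = (methodIdentifier, callArgs) -> "echo:" + callArgs[0];
        EchoService client = withService(EchoService.class, fakeServer);
        System.out.println(client.echo("hello"));
    }
}
```

The caller only ever sees the interface, so swapping the in-process transport for a real network client would not change calling code.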

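Server
JdkDynamicServerHandler#processReceived => looks up the ServerMethodInvoker in the methodInvokerMap cache by the method's unique signature, methodIdentifier => invokes the local method

At startup, the master, worker and other services wrap each interface annotated with RpcService into a ServerMethodInvoker and store it in methodInvokerMap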
SpringServerMethodInvokerDiscovery#postProcessAfterInitialization => RpcServer#registerServerMethodInvokerProvider => NettyRemotingServer#registerMethodInvoker => JdkDynamicServerHandler#registerMethodInvoker
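The server side boils down to a map from method identifier to an invoker that holds the bean and the reflected method. Below is a minimal sketch under stated assumptions: the nested `RpcService` annotation is a stand-in for the real one, `register` replaces the Spring post-processor with a direct call, and the identifier is again assumed to be `Method#toGenericString()`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical server side: register annotated interfaces, then dispatch by identifier.
class RpcServerRegistrySketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface RpcService { } // stand-in for the real annotation

    @RpcService
    interface EchoService {
        String echo(String message);
    }

    static class EchoServiceImpl implements EchoService {
        @Override
        public String echo(String message) {
            return "echo:" + message;
        }
    }

    // Pairs a bean with one of its RPC methods.
    static class ServerMethodInvoker {
        final Object bean;
        final Method method;

        ServerMethodInvoker(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }

    final Map<String, ServerMethodInvoker> methodInvokerMap = new ConcurrentHashMap<>();

    // Registers every method of every @RpcService interface the bean implements.
    void register(Object bean) {
        for (Class<?> iface : bean.getClass().getInterfaces()) {
            if (iface.isAnnotationPresent(RpcService.class)) {
                for (Method method : iface.getMethods()) {
                    methodInvokerMap.put(method.toGenericString(), new ServerMethodInvoker(bean, method));
                }
            }
        }
    }

    // Looks up the invoker by identifier and calls the local method.
    Object processReceived(String methodIdentifier, Object... args) {
        ServerMethodInvoker invoker = methodInvokerMap.get(methodIdentifier);
        if (invoker == null) {
            throw new IllegalArgumentException("Unknown method: " + methodIdentifier);
        }
        try {
            return invoker.method.invoke(invoker.bean, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Invocation failed: " + methodIdentifier, e);
        }
    }

    public static void main(String[] args) {
        RpcServerRegistrySketch server = new RpcServerRegistrySketch();
        server.register(new EchoServiceImpl());
        String identifier = server.methodInvokerMap.keySet().iterator().next();
        System.out.println(server.processReceived(identifier, "hello"));
    }
}
```

Because the map is filled once at startup, handling a request costs one lookup plus one reflective call.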
Other
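ILifecycleEventHandler: handlers for the different event types
- AbstractTaskLifecycleEventHandler: task event handler
- AbstractWorkflowLifecycleEventHandler: workflow event handler
AbstractWorkflowStateAction: the current workflow state, with the concrete action for each event.
AbstractTaskStateAction: the current task state, with the concrete action for each event. Example: while a task is running, a failover triggers onFailoverEvent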
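The split between event handlers and state actions amounts to a two-step dispatch: pick the action object for the current state, then call the method that matches the event. Here is a minimal sketch of that pattern. The state set, the transitions and the `StateActionSketch` names are illustrative assumptions, not the real class hierarchy.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical model of "one action object per state, one method per event".
class StateActionSketch {

    enum TaskState { SUBMITTED, DISPATCHED, RUNNING, SUCCESS, NEED_FAULT_TOLERANCE }

    static class Task {
        TaskState state = TaskState.SUBMITTED;
    }

    // Events that a state does not care about fall through to the empty defaults.
    interface TaskStateAction {
        default void onDispatchEvent(Task task) { }
        default void onRunningEvent(Task task) { }
        default void onSucceedEvent(Task task) { }
        default void onFailoverEvent(Task task) { }
    }

    private static final TaskStateAction NO_OP = new TaskStateAction() { };
    private static final Map<TaskState, TaskStateAction> ACTIONS = new EnumMap<>(TaskState.class);

    static {
        ACTIONS.put(TaskState.SUBMITTED, new TaskStateAction() {
            @Override
            public void onDispatchEvent(Task task) {
                task.state = TaskState.DISPATCHED;
            }
        });
        ACTIONS.put(TaskState.DISPATCHED, new TaskStateAction() {
            @Override
            public void onRunningEvent(Task task) {
                task.state = TaskState.RUNNING;
            }

            @Override
            public void onFailoverEvent(Task task) {
                task.state = TaskState.NEED_FAULT_TOLERANCE;
            }
        });
        ACTIONS.put(TaskState.RUNNING, new TaskStateAction() {
            @Override
            public void onSucceedEvent(Task task) {
                task.state = TaskState.SUCCESS;
            }

            @Override
            public void onFailoverEvent(Task task) {
                task.state = TaskState.NEED_FAULT_TOLERANCE;
            }
        });
    }

    // Handler side: look up the action for the task's current state.
    static TaskStateAction actionFor(Task task) {
        return ACTIONS.getOrDefault(task.state, NO_OP);
    }

    public static void main(String[] args) {
        Task task = new Task();
        actionFor(task).onDispatchEvent(task);
        actionFor(task).onRunningEvent(task);
        actionFor(task).onFailoverEvent(task);
        System.out.println(task.state);
    }
}
```

An event that arrives in a state with no matching override is simply ignored, which keeps late or duplicated events from corrupting the state.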