A Study of DolphinScheduler
Main scheduling flow
V1.0 original design
Master (SpringApplication)
MasterSchedulerThread[run -> scanCommand] -> MasterExecThread[run -> executeProcess -> prepareProcess -> runProcess -> submitStandByTask -> submitTaskExec] -> MasterTaskExecThread[submitWaitComplete -> submit -> submitTask -> submitTaskToQueue] -> taskQueue
Worker (a single thread)
WorkerServer[run] -> FetchTaskThread[run] -> consume taskQueue -> TaskScheduleThread[call] -> ShellTask[init -> handle] -> AbstractCommandExecutor[run -> buildProcess] -> ProcessBuilder[start]
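The two chains above meet at taskQueue: the master puts tasks in and the worker takes them out. A minimal sketch of that hand-off, assuming an in-memory `BlockingQueue` stands in for the real queue; the class `TaskQueueSketch` and the method `fetchTasks` are hypothetical names for illustration, not DolphinScheduler code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the master -> taskQueue -> worker hand-off.
class TaskQueueSketch {
    // Assumption: an in-memory queue replaces whatever backs the real taskQueue.
    private final BlockingQueue<String> taskQueue = new LinkedBlockingQueue<>();

    // Master side: plays the role of submitTaskToQueue in the chain above.
    void submitTaskToQueue(String taskId) {
        taskQueue.offer(taskId);
    }

    // Worker side: plays the role of FetchTaskThread, taking at most `max` tasks.
    List<String> fetchTasks(int max, long timeoutMillis) {
        List<String> fetched = new ArrayList<>();
        try {
            while (fetched.size() < max) {
                String taskId = taskQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (taskId == null) {
                    break; // queue stayed empty for the whole timeout
                }
                fetched.add(taskId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        }
        return fetched;
    }
}
```

The queue decouples the two sides: the master only needs to know that a task was enqueued, and the worker decides how many tasks to pull per round.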
Service registration with ZooKeeper
ZKWorkerClient[init] -> AbstractZKClient[initSystemZNode] (initializes some root nodes) -> ZKWorkerClient[listenerWorker (essentially adds a PathChildrenCacheListener) -> registWorker (creates the node and saves the relationship to the database)]
Distributed lock with ZooKeeper
String znodeLock = zkMasterClient.getWorkerFailoverLockPath();
V3.0.0-release
MasterServer (SpringApplication)
MasterSchedulerBootstrap scans the database for commands (created by API operations such as creating or starting a job).
[1] MasterSchedulerBootstrap[run -> findCommands -> command2ProcessInstance] -> ProcessServiceImpl -> MasterSchedulerBootstrap[put into workflowEventQueue]
WorkerServer (SpringApplication)
[3] Task dispatch command:
MasterRpcServer
MasterServer[run(MasterServer.class) -> run] -> MasterRpcServer[init] -> NettyRemotingServer[registerProcessor] -> NettyServerHandler[registerProcessor] -> stored in processors (a map)
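The chain ends with each processor being stored in a map, which suggests lookup by command type when a message arrives. A minimal sketch of that pattern, assuming handlers can be modelled as plain functions; `ProcessorRegistrySketch`, its `CommandType` values, and `dispatch` are hypothetical and not the real Netty handler API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry illustrating "registerProcessor -> processors (a map)".
class ProcessorRegistrySketch {
    // Made-up command types, for illustration only.
    enum CommandType { PING, TASK_RESULT }

    // One processor per command type; a concurrent map because I/O threads read it.
    private final Map<CommandType, Function<String, String>> processors = new ConcurrentHashMap<>();

    void registerProcessor(CommandType type, Function<String, String> processor) {
        processors.put(type, processor);
    }

    // Look up the processor for an incoming command and run it on the message body.
    String dispatch(CommandType type, String body) {
        Function<String, String> processor = processors.get(type);
        if (processor == null) {
            return "unsupported: " + type;
        }
        return processor.apply(body);
    }
}
```

Registering by type keeps the server handler generic: supporting a new command means adding one more map entry rather than changing the dispatch code.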
EventExecuteService
State-handling service
API (user entry point): bringing a Project online
SchedulerController[online] -> SchedulerServiceImpl[setScheduleState -> setSchedule] -> QuartzExecutorImpl[addJob] -> RemoteScheduler[scheduleJob], for use by ProcessScheduleJob
Master analysis
Service registration
Keywords: ZooKeeper, Curator
MasterServer(Application)
MasterRegistryClient[init (heartbeat mechanism) -> start -> registry] (creates the node)
Heartbeat mechanism
MasterRegistryClient[init (initializes a heartbeat thread pool) -> registry] -> ScheduledExecutorService[scheduleAtFixedRate] -> HeartBeatTask[run]
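The heartbeat chain comes down to a scheduled thread pool that runs a task at a fixed rate. A minimal sketch using only the JDK; `HeartbeatSketch` and the `report` callback are hypothetical, and what the real HeartBeatTask writes to the registry is not modelled here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical heartbeat loop built on ScheduledExecutorService.scheduleAtFixedRate.
class HeartbeatSketch {
    private final ScheduledExecutorService heartbeatPool =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger beatCount = new AtomicInteger();

    // Run `report` immediately and then every intervalMillis.
    // Note: if `report` throws, scheduleAtFixedRate cancels all later runs.
    void start(long intervalMillis, Runnable report) {
        heartbeatPool.scheduleAtFixedRate(() -> {
            beatCount.incrementAndGet();
            report.run();
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    int beatCount() {
        return beatCount.get();
    }

    void stop() {
        heartbeatPool.shutdownNow();
    }
}
```

In a real server the `report` callback would refresh the node's registry entry, so a node whose heartbeats stop can be detected and failed over.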
Timed scheduling
Keyword: Quartz
See the "API (user entry point): bringing a Project online" flow under V3.0.0-release in the main scheduling flow.
Failover
FailoverExecuteThread
Master failover mainly transfers process instances, and obtains the task list from each instance.
MasterRegistryClient[removeMasterNodePath] -> MasterFailoverService[failoverMaster -> doFailoverMaster -> failoverTaskInstance -> sendKillCommandToWorker (marks the task with the fault-tolerance status ExecutionStatus.NEED_FAULT_TOLERANCE to distinguish it from ordinary tasks)] -> NettyExecutorManager[doExecute] -> nettyRemotingClient.send(host, command)
Worker analysis
Task retry/recovery mechanism
WorkerServer#run -> MessageRetryRunner#run -> TaskExecuteRunningMessageSender#sendMessage
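One way to read this chain: the worker remembers messages the master has not yet acknowledged and resends them in a loop. A minimal sketch under that assumption; `MessageRetrySketch`, `track`, `ack`, and `retryOnce` are hypothetical names, and the real MessageRetryRunner may track and time its retries differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical resend loop: unacknowledged messages are sent again on each round.
class MessageRetrySketch {
    // Pending messages keyed by task instance id (an assumed key).
    private final Map<Integer, String> pending = new ConcurrentHashMap<>();
    private final Consumer<String> sender;

    MessageRetrySketch(Consumer<String> sender) {
        this.sender = sender;
    }

    // Remember a message until the master acknowledges it.
    void track(int taskInstanceId, String message) {
        pending.put(taskInstanceId, message);
    }

    // The master acknowledged the message, so stop retrying it.
    void ack(int taskInstanceId) {
        pending.remove(taskInstanceId);
    }

    // One retry round: resend everything still pending and report how many were sent.
    int retryOnce() {
        int resent = 0;
        for (String message : pending.values()) {
            sender.accept(message);
            resent++;
        }
        return resent;
    }
}
```

A background thread would call `retryOnce` periodically; because acknowledged entries are removed, each message is resent only until the master confirms it.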
Failover
Worker failover mainly transfers tasks.
MasterRegistryClient[removeMasterNodePath] -> WorkerFailoverService[failoverWorker -> failoverTaskInstance -> ] -> workflowExecuteThreadPool.submitStateEvent(stateEvent);
Key takeaways
Project module analysis
dolphinScheduler-alert
AlertServer[checkTable] -> checks whether a plugin table exists; if it does, the alert service is considered started
dolphinScheduler-data-quality
Split into three components: BatchReader, BatchTransformer, and BatchWriter.
The data-quality module is built on Spark and packaged as a single jar.
dolphinScheduler-log-server
LoggerController[queryLog] -> LoggerServiceImpl[queryLog] -> LogClientService[rollViewLog] ->
dolphinScheduler-standalone-server
dolphinScheduler-registry
Overview
Master registration
Master registration: a deeper look
Entry point: MasterRegistryClient[start]
Worker registration: a deeper look
Entry point: WorkerRegistryClient[registry]
dolphinScheduler-remote
RPC communication built on Netty; it is not wrapped into a standalone RPC framework.
1. Future-style async
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
2. Promise-style async
EventLoop next = new NioEventLoopGroup().next();
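The difference between the two styles can be shown without Netty. The sketch below is a JDK analogue, not Netty's API: `ExecutorService.submit` stands in for the Future style, and `CompletableFuture` stands in for a Promise that another thread completes explicitly. `FutureVsPromiseSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// JDK analogue of the two async styles; Netty's own Future/Promise types are not used.
class FutureVsPromiseSketch {
    // Future style: the executor produces the result, the caller can only wait for it.
    static int futureStyle() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> 1 + 1);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Promise style: the caller creates the result holder up front,
    // and some other thread fills it in explicitly.
    static int promiseStyle() {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        Thread producer = new Thread(() -> promise.complete(1 + 1));
        producer.start();
        return promise.join();
    }
}
```

The Promise style fits RPC well: the sender can create the holder when a request goes out, and the response handler completes it when the matching reply arrives.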
dolphinScheduler-master
Starting from MasterServer, i.e. the main method
public void run() throws SchedulerException {
From the package perspective
builder
Workflow creation
WorkflowExecuteRunnable[buildFlowDag -> generateFlowDag] -> DagHelper[generateFlowDag -> /**
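Building the workflow DAG ultimately has to yield an order in which tasks may run. A minimal sketch of that idea using Kahn's topological sort, assuming tasks are identified by plain strings; `DagSketch` is hypothetical and does not reproduce what DagHelper actually does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical DAG with Kahn's algorithm; not the DagHelper implementation.
class DagSketch {
    private final Map<String, List<String>> edges = new LinkedHashMap<>();
    private final Map<String, Integer> inDegree = new LinkedHashMap<>();

    void addNode(String node) {
        edges.putIfAbsent(node, new ArrayList<>());
        inDegree.putIfAbsent(node, 0);
    }

    // `from` must finish before `to` can start.
    void addEdge(String from, String to) {
        addNode(from);
        addNode(to);
        edges.get(from).add(to);
        inDegree.merge(to, 1, Integer::sum);
    }

    // Returns the nodes in an order where every task follows its upstream tasks.
    List<String> topologicalOrder() {
        Map<String, Integer> remaining = new LinkedHashMap<>(inDegree);
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : remaining.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey()); // no upstream tasks, can start at once
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            order.add(node);
            for (String next : edges.get(node)) {
                // One upstream task finished; the node is ready when none remain.
                if (remaining.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != edges.size()) {
            throw new IllegalStateException("cycle detected, not a DAG");
        }
        return order;
    }
}
```

Nodes that become ready in the same round have no dependency on each other, which is the hook a scheduler can use to run tasks in parallel.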
Parallelism of jobs and workflows
How is the business-date rollover implemented?
Mainly handled by Quartz.
dolphinScheduler-worker
config
dolphinScheduler-meter
Integrates Prometheus + Grafana
dolphinScheduler-microbench
Benchmarking?
dolphinScheduler-python
An implementation built on Python
dolphinScheduler-spi
alert plugins, task plugins, datasource plugins, registry-plugins
dolphinScheduler-tools
Initialization tools, such as running database scripts
dolphinScheduler-server
Server code shared by master and worker, e.g. HeartBeatTask
dolphinScheduler-service
Service code shared by master and worker
dolphinScheduler-api
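Environment management
Database table design
CREATE TABLE `t_ds_task_definition` (
Project deployment
Cluster deployment (pseudo-cluster mode)
It is still worth getting dolphinScheduler running, in order to learn from its product design.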
Feature inspiration
How does dolphinScheduler implement the DAG?