DolphinScheduler Research

Notes from an investigation of DolphinScheduler

Main scheduling flow
V1.0 original design

Master (SpringApplication)

MasterSchedulerThread[run -> scanCommand] -> MasterExecThread[run -> executeProcess -> prepareProcess -> runProcess -> submitStandByTask -> submitTaskExec] -> MasterTaskExecThread[submitWaitComplete -> submit -> submitTask -> submitTaskToQueue] -> taskQueue

Worker (a single thread)

WorkerServer[run] -> FetchTaskThread[run] -> consume taskQueue -> TaskScheduleThread[call] -> ShellTask[init -> handle] -> AbstractCommandExecutor[run -> buildProcess] -> ProcessBuilder[start]

Service registration with ZooKeeper

ZKWorkerClient[init] -> AbstractZKClient[initSystemZNode] (initializes the root znodes) -> ZKWorkerClient[listenerWorker (the core is adding a PathChildrenCacheListener) -> registWorker (creates the znode and persists the mapping to the database)]

ZKMasterClient[init -> listenerMaster -> listenerWorker -> registMaster (creates the znode and persists the mapping to the database)]
Listened events:
CHILD_ADDED: a node was added
CHILD_REMOVED: a node was removed (i.e. it died), which triggers failover
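A minimal sketch of the watcher pattern described above, using Curator's PathChildrenCache; the ZooKeeper address, the /nodes/worker path and the printed messages are illustrative assumptions, not the project's actual values:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class WorkerWatchDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Watch the children of the workers root; CHILD_ADDED / CHILD_REMOVED fire
        // when a worker registers or its ephemeral node disappears.
        PathChildrenCache cache = new PathChildrenCache(client, "/nodes/worker", true);
        cache.getListenable().addListener((cli, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                System.out.println("worker joined: " + event.getData().getPath());
            } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                // a dead worker would trigger failover handling here
                System.out.println("worker gone: " + event.getData().getPath());
            }
        });
        cache.start();

        Thread.sleep(60_000); // keep the demo alive so events can arrive
    }
}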

Distributed lock with ZooKeeper

String znodeLock = zkMasterClient.getWorkerFailoverLockPath();
mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
mutex.acquire();
mutex.release();
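The same Curator InterProcessMutex pattern as a self-contained sketch, with release() in a finally block; the ZooKeeper address and lock path are assumed for illustration:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class FailoverLockDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Only one master at a time enters the critical section; the lock node
        // plays the role of the failover lock path above.
        InterProcessMutex mutex = new InterProcessMutex(client, "/dolphinscheduler/lock/failover/worker");
        mutex.acquire();
        try {
            System.out.println("doing failover work under the lock");
        } finally {
            mutex.release(); // always release so the lock is never leaked
        }
    }
}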
V3.0.0-release

MasterServer(SpringApplication)

MasterSchedulerBootstrap is responsible for scanning commands in the database (created by API operations such as creating or starting a workflow)

[1] MasterSchedulerBootstrap[run -> findCommands -> command2ProcessInstance] -> ProcessServiceImpl -> MasterSchedulerBootstrap[put into workflowEventQueue]

WorkflowEventLooper[run -> put into processInstanceExecCacheManager -> consume workflowEventQueue] -> WorkflowStartEventHandler[handleWorkflowEvent] -> WorkflowExecuteRunnable[call -> initTaskQueue] -> workflow creation, queue initialization, etc. (**key point)

Task submission
WorkflowExecuteRunnable[submitPostNode (put into readyToSubmitTaskQueue) -> (consume readyToSubmitTaskQueue) submitStandByTask -> submitTaskExec (key point)] -> BaseTaskProcessor[action(SUBMIT) -> submit] -> CommonTaskProcessor[submitTask] -> ProcessServiceImpl[submitTaskWithRetry -> submitTask]

WorkflowExecuteRunnable[updateProcessInstanceState -> updateWorkflowInstanceStatesToDB (persist the state to the DB)] -> put into stateEvents (state event: PROCESS_STATE_CHANGE) (WorkflowExecuteRunnable::handleEvents)

WorkflowStateEventHandler[handleStateEvent] -> endProcess, etc.

Task dispatch
WorkflowExecuteRunnable[submitTaskExec (key point)] -> BaseTaskProcessor[action(DISPATCH) -> dispatch] -> CommonTaskProcessor[dispatchTask] -> the TaskPriority is put into the taskUpdateQueue -> TaskPriorityQueueImpl[put]

Task run
WorkflowExecuteRunnable[submitTaskExec (key point)] -> BaseTaskProcessor[action(RUN) -> run] -> CommonTaskProcessor[runTask]

Consuming the taskUpdateQueue
[2] TaskPriorityQueueConsumer[run -> batchDispatch -> dispatchTask -> toCommand] -> ExecutorDispatcher[dispatch] -> NettyExecutorManager[execute -> doExecute] -> nettyRemotingClient[send(host, command)] with CommandType.TASK_DISPATCH_REQUEST
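The taskUpdateQueue above is a priority queue keyed by task priority. A rough sketch of that producer/consumer shape; the class and field names here are simplified stand-ins, not DolphinScheduler's own types:

import java.util.concurrent.PriorityBlockingQueue;

public class TaskPriorityQueueDemo {

    // Smaller number = higher priority, mirroring "0 Highest ... 4 Lowest".
    static class TaskPriority implements Comparable<TaskPriority> {
        final int priority;
        final long taskId;
        TaskPriority(int priority, long taskId) { this.priority = priority; this.taskId = taskId; }
        @Override
        public int compareTo(TaskPriority other) { return Integer.compare(priority, other.priority); }
    }

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<TaskPriority> taskUpdateQueue = new PriorityBlockingQueue<>();

        // Producer side (what CommonTaskProcessor#dispatchTask does in the flow above).
        taskUpdateQueue.put(new TaskPriority(3, 101));
        taskUpdateQueue.put(new TaskPriority(0, 102));

        // Consumer side (TaskPriorityQueueConsumer): take() blocks until an element
        // is available and always returns the highest-priority one first.
        while (!taskUpdateQueue.isEmpty()) {
            TaskPriority next = taskUpdateQueue.take();
            System.out.println("dispatch task " + next.taskId + " (priority " + next.priority + ")");
        }
    }
}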

WorkerServer(SpringApplication)

[3] Task dispatch command:
TaskDispatchProcessor[process] -> WorkerManagerThread[offer] (put into waitSubmitQueue)

[4] WorkerManagerThread[start -> run] -> consume waitSubmitQueue -> WorkerExecService[submit] -> TaskExecuteThread[run] -> in between, a Netty request with commandType TASK_EXECUTE_RUNNING is sent -> ShellTask[handle] -> ShellCommandExecutor[run] -> WorkerMessageSender[sendMessageWithRetry] -> TaskExecuteRunningMessageSender[sendMessage] -> WorkerRpcClient[send] -> nettyRemotingClient[send(host, command)] -> a Netty request with commandType TASK_EXECUTE_RESULT is sent
When the task finishes -> go to [6]

MasterRpcServer

MasterServer[run(MasterServer.class) -> run] -> MasterRpcServer[init] -> NettyRemotingServer[registerProcessor] -> NettyServerHandler[registerProcessor] -> put into processors (a map)

After a client message is received:
NettyServerHandler[channelRead -> processReceived -> process -> submit] -> the Processor is chosen by Command type; the core idea is that each Processor has its own thread pool, and that pool executes the handling
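A rough sketch of the "one processor per command type, each backed by its own executor" idea; the enum values, interface and pool sizes are simplified assumptions:

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProcessorDispatchDemo {
    enum CommandType { TASK_DISPATCH_REQUEST, TASK_EXECUTE_RUNNING, STATE_EVENT_REQUEST }

    interface Processor { void process(String payload); }

    // Mirrors NettyServerHandler's processors map: command type -> (processor, executor).
    static final Map<CommandType, Processor> PROCESSORS = new EnumMap<>(CommandType.class);
    static final Map<CommandType, ExecutorService> EXECUTORS = new EnumMap<>(CommandType.class);

    static void register(CommandType type, Processor processor) {
        PROCESSORS.put(type, processor);
        EXECUTORS.put(type, Executors.newFixedThreadPool(2));
    }

    // Equivalent of channelRead -> processReceived: look up by type and submit to that type's pool.
    static void onMessage(CommandType type, String payload) {
        EXECUTORS.get(type).execute(() -> PROCESSORS.get(type).process(payload));
    }

    public static void main(String[] args) {
        register(CommandType.STATE_EVENT_REQUEST, p -> System.out.println("state event: " + p));
        onMessage(CommandType.STATE_EVENT_REQUEST, "PROCESS_STATE_CHANGE");
        EXECUTORS.values().forEach(ExecutorService::shutdown);
    }
}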

[8] State event command:
StateEventProcessor[process] -> StateEventResponseService[addStateChangeEvent] -> put into eventQueue

Consuming the eventQueue (note: this eventQueue is not the same as the one in TaskEventService)
The thread StateEventResponseWorker[run -> persist] -> WorkflowExecuteThreadPool[submitStateEvent]
-> WorkflowExecuteRunnable[addStateEvent] -> put into stateEvents

[5] Task execution command: Event
TaskExecuteRunningProcessor[process] -> the processor owns a defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS) thread pool ->
TaskEventService[addEvent] (creates a taskEvent and puts it into the eventQueue = new LinkedBlockingQueue<>() queue)

[6] Task event dispatch thread
TaskEventDispatchThread[run] -> consume eventQueue -> TaskExecuteThreadPool[submitTaskEvent -> eventHandler] (also a thread pool) -> put into taskExecuteThreadMap -> initialize a TaskExecuteRunnable[addEvent] -> put into its events queue

Consuming events -> TaskExecuteRunnable[run] -> event.getEvent() runs the handler matching the event type -> TaskRunningEventHandler[handleTaskEvent] -> ProcessInstanceExecCacheManager[getByProcessInstanceId] -> fetch the workflow execution runnable for that process instance from processInstanceExecCacheManager -> WorkflowExecuteThreadPool[submitStateEvent] -> WorkflowExecuteRunnable[addStateEvent] -> put into stateEvents (consumed by EventExecuteService)

EventExecuteService

State handling service
[7] EventExecuteService[run -> eventHandler] -> WorkflowExecuteThreadPool[executeEvent] -> WorkflowExecuteRunnable[handleEvents] -> consume stateEvents -> WorkflowStateEventHandler[handleStateEvent] | TaskStateEventHandler[handleStateEvent] -> WorkflowExecuteRunnable[taskFinished -> submitPostNode -> go to [1] -> addTaskToStandByList -> submitStandByTask (put into readyToSubmitTaskQueue) -> submitStandByTask -> submitTaskExec (put into stateEvents) -> notifyProcessHostUpdate] -> NettyExecutorManager[doExecute] -> nettyRemotingClient[send(host, command)] -> BaseTaskProcessor[submit] -> CommonTaskProcessor[submitTask]

Key point: WorkflowExecuteRunnable[submitTaskExec]

Callback:
WorkflowExecuteThread[handleEvents -> addCallback(onSuccess)] -> WorkflowExecuteThreadPool[notifyProcessChanged] -> notifyProcess (notify the process's master)
CommandType.STATE_EVENT_REQUEST
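A rough sketch of the handleEvents loop: drain stateEvents and route each event to a handler chosen by type; the class names here are simplified stand-ins for WorkflowStateEventHandler / TaskStateEventHandler:

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

public class StateEventLoopDemo {

    enum StateEventType { PROCESS_STATE_CHANGE, TASK_STATE_CHANGE }

    static class StateEvent {
        final StateEventType type;
        final int instanceId;
        StateEvent(StateEventType type, int instanceId) { this.type = type; this.instanceId = instanceId; }
    }

    interface StateEventHandler { void handleStateEvent(StateEvent event); }

    public static void main(String[] args) {
        // One handler per event type, like the WorkflowStateEventHandler / TaskStateEventHandler pairing.
        Map<StateEventType, StateEventHandler> handlers = new EnumMap<>(StateEventType.class);
        handlers.put(StateEventType.PROCESS_STATE_CHANGE,
                event -> System.out.println("workflow " + event.instanceId + " state changed"));
        handlers.put(StateEventType.TASK_STATE_CHANGE,
                event -> System.out.println("task " + event.instanceId + " state changed"));

        // stateEvents inside WorkflowExecuteRunnable plays this role; EventExecuteService drives the loop.
        ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
        stateEvents.add(new StateEvent(StateEventType.TASK_STATE_CHANGE, 7));

        StateEvent event;
        while ((event = stateEvents.poll()) != null) {
            handlers.get(event.type).handleStateEvent(event);
        }
    }
}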

API (user entry point): bringing a project online

SchedulerController[online] -> SchedulerServiceImpl[setScheduleState -> setSchedule] -> QuartzExecutorImpl[addJob] -> RemoteScheduler[scheduleJob], so that Quartz later invokes ProcessScheduleJob

ProcessScheduleJob[executeInternal] -> ProcessServiceImpl[createCommand] -> commandMapper[insert], for MasterSchedulerBootstrap to scan
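A minimal Quartz sketch of the same shape: a cron trigger fires a job whose execute() would create the command row that MasterSchedulerBootstrap later scans; the job body only prints here, and the cron expression and identities are assumptions:

import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class ScheduleOnlineDemo {
    // Plays the role of ProcessScheduleJob: on each fire, create a command row.
    public static class CreateCommandJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            // Real code: processService.createCommand(...) -> insert into t_ds_command
            System.out.println("fire at " + context.getFireTime() + ", insert SCHEDULER command");
        }
    }

    public static void main(String[] args) throws Exception {
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        JobDetail job = JobBuilder.newJob(CreateCommandJob.class)
                .withIdentity("job_1", "process_group").build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("trigger_1", "process_group")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 0/5 * * * ?")) // every 5 minutes
                .build();
        scheduler.scheduleJob(job, trigger);   // roughly what QuartzExecutorImpl#addJob boils down to
        scheduler.start();
    }
}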
Master analysis
Service registration

Keywords: Zookeeper, Curator

MasterServer(Application)

MasterRegistryClient[init (heartbeat mechanism) -> start -> registry] (creates the znodes)

MasterRegistryClient#start -> RegistryClient[addConnectionStateListener] -> ZookeeperRegistry#addConnectionStateListener -> CuratorFramework#getConnectionStateListenable#addListener(new ZookeeperConnectionStateListener(listener))
Listens for CONNECTED, SUSPENDED, RECONNECTED, DISCONNECTED, etc.

MasterRegistryClient#start -> RegistryClient#subscribe -> ZookeeperRegistry#subscribe -> TreeCache#start

Heartbeat mechanism

MasterRegistryClient[init (initializes a heartbeat thread pool) -> registry] -> ScheduledExecutorService[scheduleAtFixedRate] -> HeartBeatTask[run]
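A minimal sketch of that heartbeat shape: a single-threaded scheduler runs a report task on a fixed interval; the 10-second interval and the reported fields are assumptions for illustration:

import java.lang.management.ManagementFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartBeatDemo {
    public static void main(String[] args) {
        ScheduledExecutorService heartBeatExecutor =
                Executors.newSingleThreadScheduledExecutor();

        Runnable heartBeatTask = () -> {
            // The real HeartBeatTask collects cpuUsage, memoryUsage, loadAverage and more.
            double loadAverage = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
            String heartBeatInfo = System.currentTimeMillis() + "," + loadAverage;
            // The real code writes this payload onto the master's registry node.
            System.out.println("report heartbeat: " + heartBeatInfo);
        };

        // Same shape as MasterRegistryClient: report on a fixed interval.
        heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0, 10, TimeUnit.SECONDS);
    }
}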
Scheduled triggering

Keywords: Quartz

See the "API (user entry point): bringing a project online" flow in the V3.0.0-release part of the main scheduling flow above

Failover
FailoverExecuteThread

The master mainly fails over process instances, and derives the task list from each instance

MasterRegistryClient[removeMasterNodePath] -> MasterFailoverService[failoverMaster -> doFailoverMaster -> failoverTaskInstance -> sendKillCommandToWorker (marks the instance with ExecutionStatus.NEED_FAULT_TOLERANCE to distinguish it from ordinary tasks)] -> NettyExecutorManager[doExecute] -> nettyRemotingClient.send(host, command)
Worker analysis
Task retry / recovery mechanism
WorkerServer#run -> MessageRetryRunner#run -> TaskExecuteRunningMessageSender#sendMessage
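A rough sketch of the retry-runner idea: keep unacknowledged messages in a map and periodically resend them until an ack removes the entry; all names and the 5-second interval are illustrative:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class MessageRetryDemo {
    // taskInstanceId -> pending message; an ack from the master removes the entry.
    static final Map<Integer, String> NEED_TO_RETRY = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        NEED_TO_RETRY.put(1001, "TASK_EXECUTE_RUNNING");

        Thread retryRunner = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                // Resend everything still unacknowledged, then sleep for the retry interval.
                NEED_TO_RETRY.forEach((taskId, message) ->
                        System.out.println("resend " + message + " for task " + taskId));
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "MessageRetryRunner");
        retryRunner.setDaemon(true);
        retryRunner.start();

        TimeUnit.SECONDS.sleep(6);
        NEED_TO_RETRY.remove(1001); // pretend the master acknowledged the message
    }
}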
Failover

The worker mainly fails over tasks

MasterRegistryClient[removeMasterNodePath] -> WorkerFailoverService[failoverWorker -> failoverTaskInstance] -> workflowExecuteThreadPool.submitStateEvent(stateEvent);
Key takeaways
Module-by-module analysis
dolphinScheduler-alert
AlertServer[checkTable] -> checks whether the plugin table exists; if it does, the alert service can start

AlertServer[startServer] -> nettyRemotingServer.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);

AlertSenderService[run -> send -> alertResultHandler] -> EmailAlertChannel[process] -> MailSender[sendMails] -> email.send();
dolphinScheduler-data-quality

Subdivided into three components: BatchReader, BatchTransformer, BatchWriter

The data-quality module is implemented on top of Spark and packaged as a single jar.
It is submitted via spark-submit.
During execution the Spark job produces a number of temporary tables, which are then evaluated against the defined data-quality rules.
dolphinScheduler-log-server
LoggerController[queryLog] -> LoggerServiceImpl[queryLog] -> LogClientService[rollViewLog] ->
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); -> [1]

A scheduled task scans the pending ResponseFuture entries and runs their callbacks:
NettyRemotingClient[start] ->
this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);

Netty routes the request to LoggerRequestProcessor:
LoggerRequestProcessor[process] -> channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
The response is returned to [1]
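A rough sketch of the ResponseFuture mechanism behind sendSync: each request gets an opaque id and a future stored in a table, the response completes the future, and stale entries are evicted; CompletableFuture stands in for the project's ResponseFuture class here:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ResponseFutureDemo {
    static final AtomicLong OPAQUE = new AtomicLong();
    static final Map<Long, CompletableFuture<String>> FUTURE_TABLE = new ConcurrentHashMap<>();

    // "sendSync": register a future keyed by opaque id, send, then wait with a timeout.
    static String sendSync(String request, long timeoutMillis) throws Exception {
        long opaque = OPAQUE.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        FUTURE_TABLE.put(opaque, future);
        // ... the request would be written to the channel here; the peer echoes back the opaque id
        onResponse(opaque, "log content for: " + request);       // simulated response
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            FUTURE_TABLE.remove(opaque);                          // scanFutureTable would also evict stale entries
        }
    }

    // Handler side: match the response to the waiting future by its opaque id.
    static void onResponse(long opaque, String body) {
        CompletableFuture<String> future = FUTURE_TABLE.get(opaque);
        if (future != null) {
            future.complete(body);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendSync("rollViewLog /tmp/task.log", 1000));
    }
}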
dolphinScheduler-standalone-server


dolphinScheduler-registry

Overview

Master registration
MasterRegistryClient[registry] -> RegistryClient[persistEphemeral (performs the registration; the registered node is ephemeral, while the masters/workers root nodes are persistent) -> handleDeadServer] -> ZookeeperRegistry[put]
MasterRegistryClient[registry] -> RegistryClient[addConnectionStateListener -> subscribe]

ServerNodeManager loading
ServerNodeManager[afterPropertiesSet -> load -> updateMasterNodes -> syncMasterNodes] -> ServerNodeManager[load -> syncWorkerGroupNodes]
HeartBeatTask

Worker
WorkerServer[run] -> WorkerRegistryClient[registry] -> RegistryClient[handleDeadServer] -> ZookeeperRegistry
WorkerServer[run] -> WorkerRegistryClient[handleDeadServer]

Master registration in depth

Entry point: MasterRegistryClient[start]

Classes involved
ZookeeperRegistry: all actual Zookeeper operations go through this class
ZookeeperConnectionStateListener: observes Zookeeper connection state changes
MasterConnectionStateListener: once a connection event is observed, the concrete handling (connected, disconnected, reconnected, suspended) is done here

A TreeCacheListener watches path changes under /nodes
MasterRegistryDataListener: once a path event is observed, the concrete handling (node added, node removed) is done here

Entry point: ServerNodeManager[afterPropertiesSet], which manages all masters and workers

Classes involved
masterNodes = new HashSet<>(); all masters
masterPriorityQueue = new MasterPriorityQueue(); a priority ordering by creation time, kept in sync with masterNodes

Heartbeat
HeartBeat
cpuUsage 0.05, CPU usage
memoryUsage 0.73, memory usage
loadAverage 2.88, load average
availablePhysicalMemorySize 4.26, available physical memory
maxCpuloadAvg 24.0, max CPU load average
reservedMemory 0.3, reserved memory
startupTime 1667649412403, startup time
reportTime 1667649884474, report time
serverStatus 0, server status
processId 49606, OS process id
workerHostWeight 0,
workerExecThreadCount 0,
workerWaitingTaskCount 0

Worker registration in depth

Entry point: WorkerRegistryClient[registry]

Classes involved
dolphinScheduler-remote

RPC communication based on Netty; it is not wrapped into a full RPC framework

1. Future-based asynchrony

NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
EventLoop next = eventExecutors.next();
Future<?> submit = next.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        int a = 1 + 2;
        Thread.sleep(2000);
        return a;
    }
});
submit.addListener(new GenericFutureListener<Future<? super Object>>() {
    @Override
    public void operationComplete(Future<? super Object> future) throws Exception {
        System.out.println(future.getNow());
    }
});

Analysis:
EventLoop implements ScheduledExecutorService (so it can be thought of as a thread pool)
NioEventLoopGroup is a group of such event loops

next.submit => creates a PromiseTask wrapping the Callable
When the task thread finishes, setSuccessInternal() stores the result into the DefaultPromise's result field
DefaultPromise.addListener
=>
// result != null
if (isDone()) {
    notifyListeners();
}
which notifies the listeners and runs the callbacks.

2. Promise-based asynchrony

EventLoop next = new NioEventLoopGroup().next();
DefaultPromise<Object> promise = new DefaultPromise<>(next);
new Thread(() -> {
    try {
        int a = 9 / 3;
        Thread.sleep(2000);
        promise.setSuccess(a);
    } catch (InterruptedException e) {
        promise.setFailure(e);
        e.printStackTrace();
    }
}).start();
System.out.println(promise.get());

promise.setSuccess(a); => when the worker thread finishes, it sets the result directly
promise.get() =>
the core is the await method:
public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        return this;
    }
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    checkDeadLock();
    // if the result is not ready yet, wait; setSuccess will notifyAll
    synchronized (this) {
        while (!isDone()) {
            incWaiters();
            try {
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
}
dolphinScheduler-master

Starting from MasterServer, i.e. the main method

public void run() throws SchedulerException {
    // init rpc server
    this.masterRPCServer.start();

    // install task plugin
    this.taskPluginManager.installPlugin();

    // self tolerant
    this.masterRegistryClient.init();
    this.masterRegistryClient.start();
    this.masterRegistryClient.setRegistryStoppable(this);

    this.masterSchedulerBootstrap.init();
    this.masterSchedulerBootstrap.start();

    this.eventExecuteService.start();
    this.failoverExecuteThread.start();

    this.scheduler.start();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("MasterServer shutdownHook");
        }
    }));
}

From the package perspective

builder
    TaskExecutionContextBuilder: builds the task execution context
cache
    ProcessInstanceExecCacheManager: a HashMap-based cache
config
    MasterConfig
consumer
    TaskPriorityQueueConsumer: consumer of the task priority queue
controller
    WorkflowExecuteController: queries workflow execution status
dispatch
    ExecutorDispatcher
        picks a worker to dispatch to; the strategy is configurable (weights are computed from the CPU, memory and other load info reported via heartbeats)
        then sends the command; on failure the dispatch is retried, up to 3 attempts
    executor.NettyExecutorManager: uses nettyRemotingClient to talk to the worker during dispatch
event
    state event handling
    PROCESS_STATE_CHANGE(0, "process state change") -> WorkflowStateEventHandler
        for scheduled runs, once execution finishes a command for the next fire time is inserted into the command table
    TASK_STATE_CHANGE(1, "task state change") -> TaskStateEventHandler
        checks completion; submitted task instances live in activeTaskProcessMap
        iTaskProcessor.action(TaskAction.RUN); runs it
    PROCESS_TIMEOUT(2, "process timeout") -> WorkflowTimeoutStateEventHandler
        sends an alert: this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser);
    TASK_TIMEOUT(3, "task timeout") -> TaskTimeoutStateEventHandler
        three strategies: alert, fail, fail-and-alert
    WAIT_TASK_GROUP(4, "wait task group") -> TaskWaitTaskGroupStateHandler
        there is a task-group queue concept; this waits on events from that queue
    TASK_RETRY(5, "task retry") -> TaskRetryStateEventHandler
        takes task instances from the retry map waitToRetryTaskInstanceMap
        and resubmits them to readyToSubmitTaskQueue to run again
    PROCESS_BLOCKED(6, "process blocked") -> WorkflowBlockStateEventHandler
        fetches the task instance from the in-memory taskInstanceMap, parses its parameters, detects the block and alerts
metrics
    metric counters
processor
    queue: the queues used while processing
    the processors for workflow/task communication, registered on Netty
    StateEventProcessor -> processor.queue.StateEventResponseService
        mainly two kinds: PROCESS_STATE_CHANGE and TASK_STATE_CHANGE
    StateEventResponseWorker -> refreshes the state of the process or task instance
    runner.WorkflowExecuteRunnable consumes the state events in stateEvents
        and dispatches them to the matching EventHandler by state type
registry
    registers on Zookeeper (or another registry) and manages the registration info (ServerNodeManager, etc.)
rpc
    communication, together with the various Processors
    rpc.MasterRPCServer
runner
    task: task execution
    the real workflow execution; worth focusing on WorkflowEventLooper and ITaskProcessor (different node types map to different processors)
service
    an additional service layer
    querying execution data, failover services

Workflow creation

WorkflowExecuteRunnable[buildFlowDag -> generateFlowDag] -> DagHelper[generateFlowDag]
    /**
     * generate task nodes needed by dag
     *
     * @param taskNodeList taskNodeList, all the task nodes of the process
     * @param startNodeNameList startNodeNameList, the starting task nodes
     * @param recoveryNodeCodeList recoveryNodeCodeList, run only part of the DAG; used when recovering a workflow
     * @param taskDependType taskDependType, task dependency type
     * @return task node list
     */
    public static List<TaskNode> generateFlowNodeListByStartNode(List<TaskNode> taskNodeList, List<String> startNodeNameList, List<String> recoveryNodeCodeList, TaskDependType taskDependType)
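A rough sketch of the DAG idea: successors per task node plus a pending-predecessor count, which is roughly what submitPostNode walks when a task finishes; the node codes and queue handling are simplified assumptions:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DagDemo {
    public static void main(String[] args) {
        // task code -> successor codes (A -> B, A -> C, B -> D, C -> D)
        Map<String, List<String>> successors = new HashMap<>();
        successors.put("A", Arrays.asList("B", "C"));
        successors.put("B", Arrays.asList("D"));
        successors.put("C", Arrays.asList("D"));
        successors.put("D", Collections.emptyList());

        // number of unfinished predecessors per node; a node is submittable once it hits zero
        Map<String, Integer> pendingPredecessors = new HashMap<>();
        successors.keySet().forEach(code -> pendingPredecessors.put(code, 0));
        successors.forEach((code, next) ->
                next.forEach(n -> pendingPredecessors.merge(n, 1, Integer::sum)));

        // roughly what submitPostNode does: when a task finishes, decrement its
        // successors' pending counts and submit those that become ready
        Deque<String> ready = new ArrayDeque<>(Arrays.asList("A")); // start nodes
        while (!ready.isEmpty()) {
            String current = ready.poll();
            System.out.println("run task " + current);
            for (String next : successors.get(current)) {
                if (pendingPredecessors.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
    }
}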

Task and workflow parallelism

How is "翻牌" (the scheduled triggering) implemented?

Mainly handled by Quartz

dolphinScheduler-worker
config
    BeanConfig: alert client service
    WorkerConfig
message
    messages the worker sends to the master
    MessageRetryRunner: retries messages
    TaskExecuteResultMessageSender: task execution result messages
    TaskExecuteRunningMessageSender: task-is-running messages
metrics
    metric counters
    per-task-type execution counter TASK_TYPE_EXECUTE_COUNTER
    unknown task counter UNKNOWN_TASK_EXECUTE_COUNTER
processor
    task execution, task dispatch, host update
registry
    registers the worker on Zookeeper etc.
rpc
    communication, together with the various Processors
    rpc.WorkerRpcServer: receiving side
    rpc.WorkerRpcClient: sending side
runner
    TaskDispatchProcessor -> waitSubmitQueue (queue of tasks waiting to be submitted) -> WorkerManagerThread -> WorkerExecService -> TaskExecuteThread, as sketched below
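A rough sketch of that runner chain: dispatch requests land in a waiting queue, a manager thread drains it and hands each task to a bounded execution pool; the pool size and names are assumptions:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class WorkerManagerDemo {
    public static void main(String[] args) throws InterruptedException {
        // TaskDispatchProcessor#process offers into this queue.
        BlockingQueue<Runnable> waitSubmitQueue = new LinkedBlockingQueue<>();
        // WorkerExecService: bounded pool sized by the worker exec-threads config (assumed 4 here).
        ExecutorService workerExecService = Executors.newFixedThreadPool(4);

        // WorkerManagerThread: forever take from the waiting queue and submit for execution.
        Thread workerManagerThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Runnable taskExecuteThread = waitSubmitQueue.take();
                    workerExecService.submit(taskExecuteThread);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "WorkerManagerThread");
        workerManagerThread.setDaemon(true);
        workerManagerThread.start();

        waitSubmitQueue.offer(() -> System.out.println("run shell task"));
        TimeUnit.MILLISECONDS.sleep(500); // give the demo time to run the task before main exits
    }
}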
dolphinScheduler-meter

Integrates Prometheus + Grafana

dolphinScheduler-microbench

Benchmarking?

dolphinScheduler-python

A Python-based implementation

dolphinScheduler-spi
alert plugins, task plugins, datasource plugins, registry plugins
Shared classes are extracted into the SPI module
dolphinScheduler-tools
Initialization tools, such as running the database scripts
dolphinScheduler-server
Server code shared by master and worker, e.g. HeartBeatTask
dolphinScheduler-service
Service code shared by master and worker
dolphinScheduler-api
Environment management
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH

./sbin/start-thriftserver.sh \
--master yarn \
--driver-memory 2G \
--executor-memory 2G \
--num-executors 2 \
--executor-cores 2 \
--hiveconf hive.server2.thrift.port=10000
Database table design
CREATE TABLE `t_ds_task_definition` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',            -- auto-increment id
  `code` bigint(20) NOT NULL COMMENT 'encoding',                                -- code
  `name` varchar(200) DEFAULT NULL COMMENT 'task definition name',              -- task name
  `version` int(11) DEFAULT '0' COMMENT 'task definition version',              -- version
  `description` text COMMENT 'description',                                     -- description
  `project_code` bigint(20) NOT NULL COMMENT 'project code',                    -- project code
  `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',          -- creator id
  `task_type` varchar(50) NOT NULL COMMENT 'task type',                         -- task type
  `task_params` longtext COMMENT 'job custom parameters',                       -- task parameters
  `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',        -- online/offline
  `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',               -- task priority
  `worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',           -- worker group
  `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',        -- environment code
  `fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries',   -- retry count on failure
  `fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval',   -- retry interval on failure
  `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open', -- timeout enabled or not
  `timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail', -- timeout strategy
  `timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute',          -- timeout duration
  `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute', -- delayed execution time
  `resource_ids` text COMMENT 'resource id, separated by comma',                -- resource ids
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime NOT NULL COMMENT 'update time',
  `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',                 -- task group
  `task_group_priority` tinyint(4) DEFAULT '0' COMMENT 'task group priority',   -- task group priority
  PRIMARY KEY (`id`,`code`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;


CREATE TABLE `t_ds_process_definition` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`code` bigint(20) NOT NULL COMMENT 'encoding',
`name` varchar(255) DEFAULT NULL COMMENT 'process definition name',
`version` int(11) DEFAULT '0' COMMENT 'process definition version',
`description` text COMMENT 'description',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`release_state` tinyint(4) DEFAULT NULL COMMENT 'process definition release state:0:offline,1:online',
`user_id` int(11) DEFAULT NULL COMMENT 'process definition creator id',
`global_params` text COMMENT 'global parameters',
`flag` tinyint(4) DEFAULT NULL COMMENT '0 not available, 1 available',
`locations` text COMMENT 'Node location information',
`warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
`timeout` int(11) DEFAULT '0' COMMENT 'time out, unit: minute',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 0:parallel,1:serial wait,2:serial discard,3:serial priority',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`),
UNIQUE KEY `process_unique` (`name`,`project_code`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

CREATE TABLE `t_ds_command` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`command_type` tinyint(4) DEFAULT NULL COMMENT 'Command type: 0 start workflow, 1 start execution from current node, 2 resume fault-tolerant workflow, 3 resume pause process, 4 start execution from failed node, 5 complement, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 resume waiting thread',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
`process_definition_version` int(11) DEFAULT '0' COMMENT 'process definition version',
`process_instance_id` int(11) DEFAULT '0' COMMENT 'process instance id',
`command_param` text COMMENT 'json command parameters',
`task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'Node dependency type: 0 current node, 1 forward, 2 backward',
`failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'Failed policy: 0 end, 1 continue',
`warning_type` tinyint(4) DEFAULT '0' COMMENT 'Alarm type: 0 is not sent, 1 process is sent successfully, 2 process is sent failed, 3 process is sent successfully and all failures are sent',
`warning_group_id` int(11) DEFAULT NULL COMMENT 'warning group',
`schedule_time` datetime DEFAULT NULL COMMENT 'schedule time',
`start_time` datetime DEFAULT NULL COMMENT 'start time',
`executor_id` int(11) DEFAULT NULL COMMENT 'executor id',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run', -- dry run (succeeds immediately without executing)
PRIMARY KEY (`id`),
KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;
Project deployment
Cluster deployment (pseudo-cluster mode)

DolphinScheduler still needs to be brought up and run in practice, to learn from its product design

Feature inspirations

How does DolphinScheduler implement its DAG?