分布式任务调度系统-PowerJob

最近在调研分布式任务如何选型,最终选择了比较年轻的PowerJob,下面会简单介绍下这个框架的使用以及它的运行流程。

一、选择PowerJob的原因

1.1:同类产品对比

官方文档给出的同类产品对比图:

表1

作者の人生经验:https://zhuanlan.zhihu.com/p/157614020

1.2:特点

定制方面:代码较简单,易于理解和改造,比如我们就集成了自己的服务发现平台来管理powerjob服务端节点。

功能方面:很全面,我们能想到的功能它全部支持

体积方面:非常轻量,代码量少,而且不依赖外部乱七八糟的服务(比如zk),仅需要一个mysql即可

1.3:成熟度

产品上线仅3个月,已积累1.8k的star:

图1

并且已有较大的公司和机构接入:

图2

二、PowerJob的工作流程

2.1:基本概念:app、worker、job、server

app可以理解为我们的一个工程项目,worker可以理解成一个app的集群节点,而job则是一个任务(可以是简单的定时任务,也可以是复杂的MapReduce),它跟具体某个app对应,而一个app则可以拥有很多job,它们的关系如下:

图3

server即为PowerJob节点,主要负责任务的监听和派发,可以单点部署,也可以集群部署,它的工作流程详细参考2.22.3

2.2:app&server的绑定

首先,在worker里配置上所需的server节点信息(这些节点信息也可以从服务发现获取),在worker启动时会注册到server,此时server便拥有了所有app的worker信息:

图4

这层绑定关系在worker们启动后即可确认,这时worker端会启动两个定时任务,一个是heartbeat,用来给server端发送心跳,这样server端即可知道对应app有多少个worker在运行了,另外一个是discovery,用来同步server端状态,如果有备份server,可以用来做高可用。

图里powerjob服务端是单点部署,这不高可用,下面来看下powerjob是如何实现服务高可用的。

2.3:高可用

2.2的过程运转的不错,但是如果server端故障,那么所有的任务将直接终止无法执行,这是我们不愿意看到的,因此需要给powerjob备份一个节点,假设现在它有两个节点,那么当单点故障时,powerjob会通过discovery机制做故障转移:

图5

我们目前基于集群部署,一般有3台机器,一台master,两台slave

2.4:server端的调度

说完了整体的绑定流程,下面来详细看下server端是如何轮询和派发任务的:

图6

2.5:部署顺序

  1. 部署PowerJob的server端(这个一般情况下都预先部署好了)
  2. 编写自己的job类app项目,写好各类job,在自身配置文件里指定一个server来调度自己
  3. 前往PowerJob客户端注册该app信息
  4. 启动发布该app项目(此时app集群会跟对应的server绑定上)
  5. 在PowerJob客户端利用该app登录,将该app里的job配置上去(此时便可指定cron表达式、并发度、是否mapreduce之类的信息)

经过上面的步骤,你在app内编写的job程序便可被对应的PowerJob的server调度到了,不过上面的过程是裸用powerjob时需要做的,现在已经被我们大仓简化了,具体用法会出使用文档。

三、任务类型&验证

3.1:如何定义PowerJob任务

任务类必须要实现powerjob提供的一些接口,它们继承关系图如下:

图7

业务方只需要继承(实现)这些类(接口)即可,powerjob在执行任务时会率先从spring上下文里获取实例,如果你没使用spring,那么powerjob就会利用反射机制来触发你的业务逻辑(这在下方具体实例中有所体现,表单里填写的是类的全限定名)。

3.1:任务类型-单机任务

这种就是普通定期执行的任务,属于最常用最普通的任务,现在来做下测试,测试用例代码如下:

代码块1
1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
public class StandaloneProcessor implements BasicProcessor { //实现BasicProcessor接口

@Override
public ProcessResult process(TaskContext context) { //核心触发逻辑
log.info("简单定时任务-触发!,参数是:{}", context.getJobParams());
return new ProcessResult(true, context + ": " + true);
}
}

然后将我们的job发布,发布完成后在powerjob平台对应app下配置该任务的调度信息:

图8

配好之后就可以运行我们的job了,来看看日志平台的打印:

图9

3.2:任务类型-广播任务

3.2.1:广播任务改造

将上面的任务改成广播模式:

图10

图11

其实就是由原先单机触发,广播给worker集群里每个节点都触发一次。

3.2.2:广播模式接口实现

实例代码如下:

代码块2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@Component
public class BroadcastProcessorDemo extends BroadcastProcessor { //继承BroadcastProcessor类

@Override
public ProcessResult preProcess(TaskContext context) throws Exception { //在所有节点广播执行前执行,只会在一台机器执行一次
log.info("广播前,参数:{}", context.getJobParams());
return new ProcessResult(true);
}

@Override
public ProcessResult process(TaskContext taskContext) throws Exception { //核心逻辑,会广播给所有节点并行处理
log.info("广播核心逻辑触发!参数:{}", taskContext.getJobParams());
return new ProcessResult(true);
}

@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception { //在所有节点广播执行完成后执行,只会在一台机器执行一次
//通知执行结果,有点类似下面要测试的MapReduce的reduce方法
log.info("广播任务执行完毕,reduce触发!TaskContext: {},List<TaskResult>: {}",
JSONObject.toJSONString(context), JSONObject.toJSONString(taskResults));
return new ProcessResult(true, "success");
}
}

然后在powerjob设置该任务:

图12

运行结果如下:

图13

广播模式执行流程如下(可以跟下方的MapReduce模式坐下对比):

图14

3.3:任务类型-Map(大任务拆分)

map就是一次大的任务可以被拆分成细碎的小批次任务进行分布式执行,测试用例代码如下:

代码块3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Slf4j
@Component
public class MapProcessorDemo extends MapProcessor { //继承MapProcessor

private static final int batchSize = 100; //单批发送数据量
private static final int batchNum = 2; //一共2批,默认上限为200批,再多就要适当调整batchSize大小了

@Override
public ProcessResult process(TaskContext context) throws Exception {

if (isRootTask()) { //如果是根任务(说明map刚被调度到),则触发任务拆分
log.info("根任务,需要做任务拆分~");
List<SubTask> subTasks = Lists.newLinkedList();
for (int j = 0; j < batchNum; j++) {
SubTask subTask = new SubTask();
subTask.siteId = j;
subTask.itemIds = Lists.newLinkedList();
subTasks.add(subTask); //批次入集合
for (int i = 0; i < batchSize; i++) { //内部id集合,这里只是举例,实际业务场景可以是从db里获取的业务id集合
subTask.itemIds.add(i);
}
}
return map(subTasks, "MAP_TEST_TASK"); //最后调用map,触发这些批次任务的执行
} else { //子任务,说明批次已做过拆分,此时被调度到时会触发下方逻辑
SubTask subTask = (SubTask) context.getSubTask(); //直接从上下文对象里拿到批次对象
log.info("子任务,拿到的批次实体为:{}", JSON.toJSONString(subTask));
return new ProcessResult(true, "RESULT:true");
}
}

@Getter
@NoArgsConstructor
@AllArgsConstructor
private static class SubTask { //定义批次实体(业务方可自由发挥)
private Integer siteId; //批次id
private List<Integer> itemIds; //批次内部所携带的id(可以是我们自己的业务id)
}
}

代码意义注释已给出,发布完成后可在powerjob平台配置,如下:

图15

然后看下运行结果:

图16

图17

上面就是一次map任务触发的演示过程(注:被拆分的map子任务只要有一个失败,即认为整个map任务为失败状态,但不具备事务性)。

Map任务执行流程如下:

图18

3.4:任务类型-MapReduce(大任务拆分与归并)

相比普通map,MapReduce在子任务执行完毕后可以知道它们的执行结果,并做出接下来的自定义逻辑处理,测试用例代码如下:

代码块4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
@Component
public class MapReduceProcessorDemo extends MapReduceProcessor { //需要继承MapReduceProcessor

private static final int batchSize = 100;
private static final int batchNum = 2;

@Override
public ProcessResult process(TaskContext context) { //该方法跟普通map方法实现一致,主要用来拆分子任务和执行子任务

if (isRootTask()) {
log.info("根任务,需要做任务拆分~");
List<SubTask> subTasks = Lists.newLinkedList();
for (int j = 0; j < batchNum; j++) {
SubTask subTask = new SubTask();
subTask.siteId = j;
subTask.itemIds = Lists.newLinkedList();
subTasks.add(subTask); //批次入集合
for (int i = 0; i < batchSize; i++) {
subTask.itemIds.add(i);
}
}
return map(subTasks, "MAP_TEST_TASK");
} else {
SubTask subTask = (SubTask) context.getSubTask();
log.info("子任务,拿到的批次实体为:{}", JSON.toJSONString(subTask));
return new ProcessResult(true, "RESULT:true");
}
}

@Override
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) { //相比普通map任务,多出reduce方法,这里将两个参数全部打印出来
log.info("子任务执行完毕,reduce触发!TaskContext: {},List<TaskResult>: {}",
JSONObject.toJSONString(context), JSONObject.toJSONString(taskResults));
return new ProcessResult(true, "RESULT:true");
}

@Getter
@NoArgsConstructor
@AllArgsConstructor
private static class SubTask {
private Integer siteId;
private List<Integer> itemIds;
}
}

代码意义注释已给出,发布完成后可在powerjob平台配置,如下:

图19

现在看下运行结果:

图20

图21

MapReduce的运行流程如下:

图22

3.5:工作流

工作量,顾名思义,遵循任务A → 任务B → 任务C这个流程,只需要在表单里选中下方选项即可将任务本身设置成一个工作流任务:

图23

需要注意的是,工作流有自己的调度触发器,因此后面框框即便填了CRON表达式,也不会生效。

现在让我们将前面实验中的所有任务都设置成工作流模式:

图24

现在去工作流编辑里编辑工作流触发顺序:

图25

图26

任务按照编排好的顺序,执行了下来。

现在我们把工作流改成下面这样:

图27

触发顺序就成了下面这样:

图28

四、定时类型&验证

4.1:CRON表达式

前面的例子均通过该方式触发,支持一般CRON表达式,但是不支持秒级任务(即便配置了每秒执行一次,实际执行却是15s一次,秒级任务可以通过固定频率固定延迟来做~),由CRON表达式触发的定时任务,在任务本身超时的情况下,仍然保持对应频率执行,比如,我们让某个简单定时任务每1min执行一次,但实际运行的业务逻辑调成2min,系统调度频率如下:

图29

可以看到,即便是任务需要花费很长时间,任务也是按照每一分钟一次的频率调度,但接下来介绍的延时任务就不一样了。

4.2:固定频率

如果你需要让某个任务按照固定某个频率执行,可以尝试使用固定频率来做:

图30

来看下它的调度结果:

图31

4.3:固定延迟

如果你需要让某个任务按照固定某个频率延迟执行,沿用4.1的例子,现在配置成延迟任务:

图32

它的调度结果如下:

图33

可以看到,现在是2min调度一次,相比CRON和固定频率,这个调度是串行化的,后续的任务需要前面的任务执行完才可以执行。

五、其他

5.1:任务表单

如果对创建任务时表单的每一项不是很了解,请参考官方文档:https://www.yuque.com/powerjob/guidence/nyio9g#v8uF4

5.2:如何配置工作流?

参考文档:https://www.yuque.com/powerjob/guidence/ysug77#xgylz(不太好用,用的时候需要注意)