最近在调研分布式任务如何选型,最终选择了比较年轻的PowerJob
,下面会简单介绍下这个框架的使用以及它的运行流程。
一、选择PowerJob的原因
1.1:同类产品对比
官方文档给出的同类产品对比图:

作者の人生经验:https://zhuanlan.zhihu.com/p/157614020
1.2:特点
定制方面:代码较简单,易于理解和改造,比如我们就集成了自己的服务发现平台来管理powerjob服务端节点。
功能方面:很全面,我们能想到的功能它全部支持
体积方面:非常轻量,代码量少,而且不依赖外部乱七八糟的服务(比如zk),仅需要一个mysql即可
1.3:成熟度
产品上线仅3个月,已积累1.8k
的star:

并且已有较大的公司和机构接入:

二、PowerJob的工作流程
2.1:基本概念:app、worker、job、server
app可以理解为我们的一个工程项目,worker可以理解成一个app的集群节点,而job则是一个任务(可以是简单的定时任务,也可以是复杂的MapReduce),它跟具体某个app对应,而一个app则可以拥有很多job,它们的关系如下:

server即为PowerJob节点,主要负责任务的监听和派发,可以单点部署,也可以集群部署,它的工作流程详细参考2.2
和2.3
2.2:app&server的绑定
首先,在worker里配置上所需的server节点信息(这些节点信息也可以从服务发现获取),在worker启动时会注册到server,此时server便拥有了所有app的worker信息:

这层绑定关系在worker们启动后即可确认,这时worker端会启动两个定时任务,一个是heartbeat,用来给server端发送心跳,这样server端即可知道对应app有多少个worker在运行了,另外一个是discovery,用来同步server端状态,如果有备份server,可以用来做高可用。
图里powerjob服务端是单点部署,这不高可用,下面来看下powerjob是如何实现服务高可用的。
2.3:高可用
2.2的过程运转的不错,但是如果server端故障,那么所有的任务将直接终止无法执行,这是我们不愿意看到的,因此需要给powerjob备份一个节点,假设现在它有两个节点,那么当单点故障时,powerjob会通过discovery机制做故障转移:

我们目前基于集群部署,一般有3台机器,一台master
,两台slave
。
2.4:server端的调度
说完了整体的绑定流程,下面来详细看下server端是如何轮询和派发任务的:

2.5:部署顺序
- 部署PowerJob的server端(这个一般情况下都预先部署好了)
- 编写自己的job类app项目,写好各类job,在自身配置文件里指定一个server来调度自己
- 前往PowerJob客户端注册该app信息
- 启动发布该app项目(此时app集群会跟对应的server绑定上)
- 在PowerJob客户端利用该app登录,将该app里的job配置上去(此时便可指定cron表达式、并发度、是否mapreduce之类的信息)
经过上面的步骤,你在app内编写的job程序便可被对应的PowerJob的server调度到了,不过上面的过程是裸用powerjob时需要做的,现在已经被我们大仓简化了,具体用法会出使用文档。
三、任务类型&验证
3.1:如何定义PowerJob任务
任务类必须要实现powerjob提供的一些接口,它们继承关系图如下:

业务方只需要继承(实现)这些类(接口)即可,powerjob在执行任务时会率先从spring上下文里获取实例,如果你没使用spring,那么powerjob就会利用反射机制来触发你的业务逻辑(这在下方具体实例中有所体现,表单里填写的是类的全限定名)。
3.1:任务类型-单机任务
这种就是普通定期执行的任务,属于最常用最普通的任务,现在来做下测试,测试用例代码如下:
代码块11 2 3 4 5 6 7 8 9 10
| @Slf4j @Component public class StandaloneProcessor implements BasicProcessor { @Override public ProcessResult process(TaskContext context) { log.info("简单定时任务-触发!,参数是:{}", context.getJobParams()); return new ProcessResult(true, context + ": " + true); } }
|
然后将我们的job发布,发布完成后在powerjob平台对应app下配置该任务的调度信息:

配好之后就可以运行我们的job了,来看看日志平台的打印:

3.2:任务类型-广播任务
3.2.1:广播任务改造
将上面的任务改成广播模式:


其实就是由原先单机触发,广播给worker集群里每个节点都触发一次。
3.2.2:广播模式接口实现
实例代码如下:
代码块21 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Slf4j @Component public class BroadcastProcessorDemo extends BroadcastProcessor { @Override public ProcessResult preProcess(TaskContext context) throws Exception { log.info("广播前,参数:{}", context.getJobParams()); return new ProcessResult(true); } @Override public ProcessResult process(TaskContext taskContext) throws Exception { log.info("广播核心逻辑触发!参数:{}", taskContext.getJobParams()); return new ProcessResult(true); } @Override public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception { log.info("广播任务执行完毕,reduce触发!TaskContext: {},List<TaskResult>: {}", JSONObject.toJSONString(context), JSONObject.toJSONString(taskResults)); return new ProcessResult(true, "success"); } }
|
然后在powerjob设置该任务:

运行结果如下:

广播模式执行流程如下(可以跟下方的MapReduce模式坐下对比):

3.3:任务类型-Map(大任务拆分)
map就是一次大的任务可以被拆分成细碎的小批次任务进行分布式执行,测试用例代码如下:
代码块31 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Slf4j @Component public class MapProcessorDemo extends MapProcessor { private static final int batchSize = 100; private static final int batchNum = 2; @Override public ProcessResult process(TaskContext context) throws Exception { if (isRootTask()) { log.info("根任务,需要做任务拆分~"); List<SubTask> subTasks = Lists.newLinkedList(); for (int j = 0; j < batchNum; j++) { SubTask subTask = new SubTask(); subTask.siteId = j; subTask.itemIds = Lists.newLinkedList(); subTasks.add(subTask); for (int i = 0; i < batchSize; i++) { subTask.itemIds.add(i); } } return map(subTasks, "MAP_TEST_TASK"); } else { SubTask subTask = (SubTask) context.getSubTask(); log.info("子任务,拿到的批次实体为:{}", JSON.toJSONString(subTask)); return new ProcessResult(true, "RESULT:true"); } } @Getter @NoArgsConstructor @AllArgsConstructor private static class SubTask { private Integer siteId; private List<Integer> itemIds; } }
|
代码意义注释已给出,发布完成后可在powerjob平台配置,如下:

然后看下运行结果:


上面就是一次map任务触发的演示过程(注:被拆分的map子任务只要有一个失败,即认为整个map任务为失败状态,但不具备事务性)。
Map任务执行流程如下:

3.4:任务类型-MapReduce(大任务拆分与归并)
相比普通map,MapReduce在子任务执行完毕后可以知道它们的执行结果,并做出接下来的自定义逻辑处理,测试用例代码如下:
代码块41 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| @Slf4j @Component public class MapReduceProcessorDemo extends MapReduceProcessor { private static final int batchSize = 100; private static final int batchNum = 2; @Override public ProcessResult process(TaskContext context) { if (isRootTask()) { log.info("根任务,需要做任务拆分~"); List<SubTask> subTasks = Lists.newLinkedList(); for (int j = 0; j < batchNum; j++) { SubTask subTask = new SubTask(); subTask.siteId = j; subTask.itemIds = Lists.newLinkedList(); subTasks.add(subTask); for (int i = 0; i < batchSize; i++) { subTask.itemIds.add(i); } } return map(subTasks, "MAP_TEST_TASK"); } else { SubTask subTask = (SubTask) context.getSubTask(); log.info("子任务,拿到的批次实体为:{}", JSON.toJSONString(subTask)); return new ProcessResult(true, "RESULT:true"); } } @Override public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) { log.info("子任务执行完毕,reduce触发!TaskContext: {},List<TaskResult>: {}", JSONObject.toJSONString(context), JSONObject.toJSONString(taskResults)); return new ProcessResult(true, "RESULT:true"); } @Getter @NoArgsConstructor @AllArgsConstructor private static class SubTask { private Integer siteId; private List<Integer> itemIds; } }
|
代码意义注释已给出,发布完成后可在powerjob平台配置,如下:

现在看下运行结果:


MapReduce的运行流程如下:

3.5:工作流
工作量,顾名思义,遵循任务A → 任务B → 任务C
这个流程,只需要在表单里选中下方选项即可将任务本身设置成一个工作流任务:

需要注意的是,工作流有自己的调度触发器,因此后面框框即便填了CRON表达式,也不会生效。
现在让我们将前面实验中的所有任务都设置成工作流模式:

现在去工作流编辑里编辑工作流触发顺序:


任务按照编排好的顺序,执行了下来。
现在我们把工作流改成下面这样:

触发顺序就成了下面这样:

四、定时类型&验证
4.1:CRON表达式
前面的例子均通过该方式触发,支持一般CRON表达式,但是不支持秒级任务(即便配置了每秒执行一次,实际执行却是15s
一次,秒级任务可以通过固定频率
或固定延迟
来做~),由CRON表达式触发的定时任务,在任务本身超时的情况下,仍然保持对应频率执行,比如,我们让某个简单定时任务每1min执行一次,但实际运行的业务逻辑调成2min,系统调度频率如下:

可以看到,即便是任务需要花费很长时间,任务也是按照每一分钟一次的频率调度,但接下来介绍的延时任务就不一样了。
4.2:固定频率
如果你需要让某个任务按照固定某个频率执行,可以尝试使用固定频率来做:

来看下它的调度结果:

4.3:固定延迟
如果你需要让某个任务按照固定某个频率延迟执行,沿用4.1
的例子,现在配置成延迟任务:

它的调度结果如下:

可以看到,现在是2min调度一次,相比CRON和固定频率,这个调度是串行化的,后续的任务需要前面的任务执行完才可以执行。
五、其他
5.1:任务表单
如果对创建任务时表单的每一项不是很了解,请参考官方文档:https://www.yuque.com/powerjob/guidence/nyio9g#v8uF4
5.2:如何配置工作流?
参考文档:https://www.yuque.com/powerjob/guidence/ysug77#xgylz(不太好用,用的时候需要注意)