利用CompletableFuture优化程序的执行效率

一、线程池的Future模式

在了解java8的CompletableFuture之前,先通过Future来解决一个问题,看个例子:

假设现在有一个网站,首页有顶部Banner位、左边栏、右边栏、用户信息几大模块需要加载,现在出一个接口,要求包装并吐出这几大模块的内容

先来抽象一个首页接口对象:

代码块1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class WebModule {

private String top; //顶部Banner位

private String left; //左边栏

private String right; //右边栏

private String user; //用户信息

//...get...set...

@Override
public String toString() {
return String.format("top: %s; left: %s; right: %s; user: %s", top, left, right, user);
}
}

现在提供下面几个业务方法来获取这些信息:

代码块2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private String getTop() { // 这里假设getTop需要执行200ms
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "顶部banner位";
}

private String getLeft() { // 这里假设getLeft需要执行50ms
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "左边栏";
}

private String getRight() { // 这里假设getRight需要执行80ms
try {
Thread.sleep(80L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "右边栏";
}

private String getUser() { // 这里假设getUser需要执行100ms
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "用户信息";
}

ok,现在来实现下这个接口:

代码块3
1
2
3
4
5
6
7
8
9
// 同步获取
public WebModule getWebModuleMsgSync() {
WebModule webModule = new WebModule();
webModule.setTop(getTop());
webModule.setLeft(getLeft());
webModule.setRight(getRight());
webModule.setUser(getUser());
return webModule;
}

上面的代码会一次调用一个方法来赋值,最终返回接口对象,这个方法的最终耗时为几个业务方法耗时的总和:

1
2
通过同步方法获取首页全部信息消耗时间:435ms
结果为:top: 顶部banner位; left: 左边栏; right: 右边栏; user: 用户信息

430ms左右的执行时间,其实这几个模块是相互独立没有影响的,因此可以使用线程池Future模式来进行多线程处理优化:

代码块4
1
2
3
4
5
6
7
8
9
10
11
12
13
// 异步获取
public WebModule getWebModuleMsgAsync() throws ExecutionException, InterruptedException {
Future top = executorService.submit(this::getTop);
Future left = executorService.submit(this::getLeft);
Future right = executorService.submit(this::getRight);
Future user = executorService.submit(this::getUser);
WebModule webModule = new WebModule();
webModule.setTop(top.get());
webModule.setLeft(left.get());
webModule.setRight(right.get());
webModule.setUser(user.get());
return webModule;
}

这几个方法会被异步执行,get方法会被阻塞,直到执行结束,运行结果如下:

1
2
通过异步方法获取首页全部信息消耗时间:276ms
结果为:top: 顶部banner位; left: 左边栏; right: 右边栏; user: 用户信息

可以看到,执行速度几乎降了近200ms,这取决于最慢的那个任务的耗时。

通过上述的例子可以发现,很多程序都是可以通过异步充分利用CPU资源的方式来进行优化处理的,单看上面的程序没什么问题,但是仔细想想会发现太过局限,因为几个模块相互独立,但在实际开发中,我们可能存在B方法需要拿到A方法的结果才可以往下进行的问题,所以上面的程序就不太适用了,java8出现了今天要说的一个内容:CompletableFuture,该类可以帮助你实现上面所说的任务顺序调度,不相干的程序依然在异步,相干的存在先后顺序的将会通过一定的设置来满足自己的顺序期望。

二、CompletableFuture

现在再来假设一个例子,现在存在以下几个方法的调用:zero方法、a方法、b方法、ab方法、c方法、d方法、e方法

定义如下:

代码块5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//各个方法,sleep当成是执行时间

private void zero() {
sleep(100L);
System.out.println("zero方法触发!\n-----------------------------");
}

private String a() {
sleep(500L);
return "a";
}

private String b(String a) {
sleep(1000L);
return a + "b";
}

private String c() {
sleep(500L);
return "c";
}

private String ab(String a, String b) {
sleep(100L);
return a + "|" + b;
}

private void d(String a) {
sleep(1000L);
System.out.println("d方法触发,拿到的a = " + a);
}

private String e(String a) {
sleep(100L);
return a + "e";
}

根据上面的方法定义,可以整理出来其执行关系:

zero、a、c都是独立调用的方法,而b、d、e方法都需要拿到a的执行结果值才能触发,ab方法则要求更加苛刻,需要同时拿到a和b的执行结果才可以触发,现在假设需要把所有的方法都触发一遍,我们又期望通过异步的方式来尽可能的优化代码,这个时候如果还用上面例子里的方式,恐怕就很难进行下去了,因为很多方法存在相互依赖的现象,不过现在有了CompletableFuture,这个问题就可以解决了,来看下代码(方法及作用都写在注释上了,下面的文章就不多做说明了):

代码块6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public static void main(String[] args) throws ExecutionException, InterruptedException {

long s = System.currentTimeMillis();
Test t = new Test();

//runAsync用于执行没有返回值的异步任务
CompletableFuture future0 = CompletableFuture.runAsync(t::zero)
.exceptionally(e -> {
System.out.println("Zero出错!");
return null;
}); //这里是异常处理,指的是该异步任务执行中出错,应该做的处理

//supplyAsync方法用于执行带有返回值的异步任务
CompletableFuture futureA = CompletableFuture.supplyAsync(t::a)
.exceptionally(e -> {
System.out.println("方法A出错!");
return null;
});

//thenCompose方法用于连接两个CompletableFuture任务,如下代表futureA结束后将执行结果交由另外一个CompletableFuture处理,然后将执行链路最终赋值给futureB
CompletableFuture futureB = futureA.thenCompose(a -> CompletableFuture.supplyAsync(() -> t.b(a)))
.exceptionally(e -> {
System.out.println("方法B出错!");
return null;
});

//thenAccept方法用于将一个任务的结果,传给需要该结果的任务,如下表示futureD的执行需要futureA的结果,与thenApply不同的是,这个方法没有有返回值
CompletableFuture futureD = futureA.thenAccept(t::d);

//thenApply方法用于将一个任务的结果,传给需要该结果的任务,如下表示futureE的执行需要futureA的结果,与thenAccept不同的是,这个方法有返回值
CompletableFuture futureE = futureA.thenApply(t::e)
.exceptionally(e -> {
System.out.println("方法E出错!");
return null;
});

/**
* thenApply方法概念容易与thenCompose混淆,毕竟最终目的很相似
*/

//thenCombine方法用于连接多个异步任务的结果,如下ab方法需要futureA和futureB的执行结果,那么就可以使用thenCombine进行连接
//注意,执行到ab这里,说明futureA和futureB一定已经执行完了
CompletableFuture futureAB = futureA.thenCombine(futureB, t::ab)
.exceptionally(e -> {
System.out.println("方法AB出错!");
return null;
});

//单纯的一个异步任务,不依赖任何其他任务
CompletableFuture futureC = CompletableFuture.supplyAsync(t::c)
.exceptionally(e -> {
System.out.println("方法C出错!");
return null;
});

//allOf如果阻塞结束则表示所有任务都执行结束了
CompletableFuture.allOf(future0, futureA, futureB, futureAB, futureC, futureD, futureE).get();

System.out.println("方法Zero输出:" + future0.get());
System.out.println("方法A输出:" + futureA.get());
System.out.println("方法B输出:" + futureB.get());
System.out.println("方法AB输出:" + futureAB.get());
System.out.println("方法C输出:" + futureC.get());
System.out.println("方法D输出:" + futureD.get());
System.out.println("方法E输出:" + futureE.get());
System.out.println("耗时:" + (System.currentTimeMillis() - s) + "ms");
}

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
zero方法触发!
-----------------------------
d方法触发,拿到的a = a
方法Zero输出:null
方法A输出:a
方法B输出:ab
方法AB输出:a|ab
方法C输出:c
方法D输出:null
方法E输出:ae
耗时:1668ms

可以看到,逻辑方面是没有任何问题的,也按照预期的顺序和方式进行了,注意看这里的运行时间,约等于1600ms,与第一个例子时长取决于执行时间最长的那个方法不同,上面的例子时长取决于执行链的耗时最长的执行时间;

分析下上面的程序,执行链最长的就是ab这条,ab需要a和b全部执行完,而b又依赖a的结果,因此ab执行完的时间就是500+1000的时间(a需要500ms,b又需要等待a,500ms后b触发,b自身又需要1000ms,等都结束了,再触发ab方法,而ab方法又需要100ms的执行时间,因此ab是最长的耗时方法,ab=500+1000+100

需要说明的是上述例子里用到的方法,几乎每个都有个重载方法,用来传递一个线程池对象,例子里用的都是不传的,用的是其内部的ForkJoinPool.commonPool()

CompletableFuture的用法还有很多很多,较常用的应该就是例子里的几种,更多的用法以后会继续记录到这里。