注:本篇文章例子基于上一篇进行:[Java NIO学习与记录(七): Reactor单线程模型的实现](/2019/03/27/Java NIO学习与记录(七): Reactor单线程模型的实现/)
前言:单线程Reactor模型的缺点
紧接着上篇Reactor单线程模型
的例子来,假设Handler
的read
那里的处理方式延迟5s,当做是业务性能瓶颈,改变下原来的Handler,让其read方法
在处理时延迟5s:
代码块11 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| private void read() throws IOException { if (selectionKey.isValid()) { System.out.println("服务端读取数据前"); readBuffer.clear(); int count = socketChannel.read(readBuffer); if (count > 0) { try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("收到来自 %s 的消息: %s", socketChannel.getRemoteAddress(), new String(readBuffer.array()))); status = SEND; selectionKey.interestOps(SelectionKey.OP_WRITE); } else { selectionKey.cancel(); socketChannel.close(); System.out.println("read时-------连接关闭"); } } }
|
现在同样开启两个客户端同时连接到该服务端,然后请求-->收到响应-->再次请求
的流程走10次,会发现,客户端每收到一次响应需要10s,同样的如果开启3个客户端,则需要15s,因为单线程的Reactor模型是串行
的,业务处理的瓶颈可以影响到全局的事件分发,这种模型下,如果存在类似例子中的瓶颈点是致命的(例子的5s是夸张处理),因为新进来的连接也会排队,整个select
都会被Handler
的处理给阻塞掉,举个实际点的例子,redis
在使用时大部分时候会避免使用类似keys这种重操作,为什么呢?就是因为redis
是单线程
,这里说的单线程其实并不是说redis服务端就一个线程,而是说redis采用的NIO Reactor模型就是单线程的Reactor模型
,跟上面代码里做的改动一样,5s可以理解成重操作,影响整个模型的正常运作,redis之所以采用单线程模式,是因为redis大部分操作实在是太快了,快到使用这种模式也可以提供近十万/秒的并发能力,单线程模型实现起来简单且可控性强,所以redis很自然的选择了这种模式。回到问题本身,我们自己的业务可能并没有redis那样高的处理能力,搞不好几个网络请求就可以造成性能瓶颈,拖慢甚至拖垮整个处理模型,所以大部分RPC框架和web容器并不会采用单线程的Reactor模型实现,那么有没有什么方法可以优化这种模型呢?比如,把这个瓶颈点利用独立线程异步出去处理,这样可以保证不影响select
的执行,也就很好的避免了上面的问题了,下面介绍两种多线程异步的Reactor模型
。
一、单Reactor多线程模型
模型图:
上图与单线程Reactor模型
对比可以看出,读入数据后,对数据的业务处理部分被线程池做了异步处理,也就是说,上述5s的那段瓶颈被放到了子线程去处理,select
的执行不会受到任何影响,因此对新的连接处理、多个客户端的响应速度都应该可以得到保障。
现在来改写下前篇文章里的单线程处理模式的Handler,更名为AsyncHandler
:
代码块21 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
| public class AsyncHandler implements Runnable {
private final Selector selector;
private final SelectionKey selectionKey; private final SocketChannel socketChannel;
private ByteBuffer readBuffer = ByteBuffer.allocate(1024); private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);
private final static int READ = 0; private final static int SEND = 1; private final static int PROCESSING = 2;
private int status = READ;
private static final ExecutorService workers = Executors.newFixedThreadPool(5);
AsyncHandler(SocketChannel socketChannel, Selector selector) throws IOException { this.socketChannel = socketChannel; this.socketChannel.configureBlocking(false); selectionKey = socketChannel.register(selector, 0); selectionKey.attach(this); selectionKey.interestOps(SelectionKey.OP_READ); this.selector = selector; this.selector.wakeup(); }
@Override public void run() { switch (status) { case READ: read(); break; case SEND: send(); break; default: } }
private void read() { if (selectionKey.isValid()) { try { readBuffer.clear(); int count = socketChannel.read(readBuffer); if (count > 0) { status = PROCESSING; workers.execute(this::readWorker); } else { selectionKey.cancel(); socketChannel.close(); System.out.println("read时-------连接关闭"); } } catch (IOException e) { System.err.println("处理read业务时发生异常!异常信息:" + e.getMessage()); selectionKey.cancel(); try { socketChannel.close(); } catch (IOException e1) { System.err.println("处理read业务关闭通道时发生异常!异常信息:" + e.getMessage()); } } } }
void send() { if (selectionKey.isValid()) { status = PROCESSING; workers.execute(this::sendWorker); selectionKey.interestOps(SelectionKey.OP_READ); } }
private void readWorker() { try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("收到来自客户端的消息: %s", new String(readBuffer.array()))); status = SEND; selectionKey.interestOps(SelectionKey.OP_WRITE); this.selector.wakeup(); }
private void sendWorker() { try { sendBuffer.clear(); sendBuffer.put(String.format("我收到来自%s的信息辣:%s, 200ok;", socketChannel.getRemoteAddress(), new String(readBuffer.array())).getBytes()); sendBuffer.flip();
int count = socketChannel.write(sendBuffer);
if (count < 0) { selectionKey.cancel(); socketChannel.close(); System.out.println("send时-------连接关闭"); } else { status = READ; } } catch (IOException e) { System.err.println("异步处理send业务时发生异常!异常信息:" + e.getMessage()); selectionKey.cancel(); try { socketChannel.close(); } catch (IOException e1) { System.err.println("异步处理send业务关闭通道时发生异常!异常信息:" + e.getMessage()); } } } }
|
可以看到,read
里、send
里的逻辑处理被异步出去执行,新增了中间状态“执行中
”,主要用来防止事件重复触发,重复执行异步逻辑,当异步逻辑处理完毕才会更改状态值,这时候可以继续处理接下来的事件(读或写)。
把Accptor类
里的实现换成AsyncHandler
,运行服务端和客户端会发现,两个客户端的响应均为5s,也不会阻塞新增的连接,新增至三个或者更多的客户端基本可以保持客户端响应均为5s(说明:这里5s是夸张比喻,正常瓶颈没这么夸张,若开了n多客户端,每个都阻塞5s,那么线程池也会发生排队,因为子线程个数有限,处理不过来,最后还是阻塞,一定会远超过5s)。
通过多线程Reactor模型,降低了业务代码瓶颈导致影响整个Reactor执行链路的风险,但是即便如此,read
、send
操作仍然和接收请求(accept
)处于同一个线程
,这就意味着read
、send
的处理可能会影响到对客户端连接的接收能力,那么有没有一种办法,可以把读写流程彻底异步出去,负责连接的线程就只负责接收连接?于是多Reactor多线程模型
就产生了,这种模型也叫主从Reactor模型
,该模型下可以分为一个主Reactor
专门处理连接事件,而多个从Reactor
负责读写
、业务处理
等,这样服务端可以接收并处理更多的请求,提升服务端的吞吐能力(该模型或者说所有基于NIO的Reactor模型,都是以提升服务端处理能力为基础的,NIO在某些情况下不一定会比BIO处理速度快,但一定比BIO稳,就像NIO可以利用很少的线程处理大量的客户端请求,而BIO在大量客户端请求过来的情况下,由于各种操作均会阻塞线程,会处理不过来)。
二、主从Reactor模型
还是把之前文章的图拿来展示下这种模型的流程,可以与上面图1
进行对比,看看发生了哪些变化:
上图就是主从Reactor模型的一个流程,看下与图1的不同之处,多了SubReactor这样一个角色,这个角色就是用来处理读写操作的Reactor,现在仍然基于之前的例子,进行改写,明确需要改写的点:
- 新增
SubReactor
Acceptor
那里进行初始化一批SubReactor
,进行分发处理
- 为了区分客户端分别是被哪个
SubReactor
处理的读写操作,还需要改写下AsyncHandler
,在里面加上SubReactor
的序号,打印信息时进行区分。
ok,总结完改动点,现在基于上面的代码(代码初代目版本:[Reactor单线程模型的实现](/2019/03/27/Java NIO学习与记录(七): Reactor单线程模型的实现/))改写一下这几个类:
step1.首先新增SubReactor类
代码块31 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class SubReactor implements Runnable { private final Selector selector; private boolean register = false; private int num;
SubReactor(Selector selector, int num) { this.selector = selector; this.num = num; }
@Override public void run() { while (!Thread.interrupted()) { System.out.println(String.format("%d号SubReactor等待注册中...", num)); while (!Thread.interrupted() && !register) { try { if (selector.select() == 0) { continue; } } catch (IOException e) { e.printStackTrace(); } Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while (it.hasNext()) { dispatch(it.next()); it.remove(); } } } }
private void dispatch(SelectionKey key) { Runnable r = (Runnable) (key.attachment()); if (r != null) { r.run(); } }
void registering(boolean register) { this.register = register; }
}
|
这个类负责Acceptor
交给自己的事件select
(例子中实际上就是read
、send
)。
step2.Acceptor类的更改
代码块41 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class Acceptor implements Runnable {
private final ServerSocketChannel serverSocketChannel;
private final int coreNum = Runtime.getRuntime().availableProcessors();
private final Selector[] selectors = new Selector[coreNum];
private int next = 0;
private SubReactor[] reactors = new SubReactor[coreNum];
private Thread[] threads = new Thread[coreNum];
Acceptor(ServerSocketChannel serverSocketChannel) throws IOException { this.serverSocketChannel = serverSocketChannel; for (int i = 0; i < coreNum; i++) { selectors[i] = Selector.open(); reactors[i] = new SubReactor(selectors[i], i); threads[i] = new Thread(reactors[i]); threads[i].start(); } }
@Override public void run() { SocketChannel socketChannel; try { socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { System.out.println(String.format("收到来自 %s 的连接", socketChannel.getRemoteAddress())); socketChannel.configureBlocking(false); reactors[next].registering(true); selectors[next].wakeup(); SelectionKey selectionKey = socketChannel.register(selectors[next], SelectionKey.OP_READ); selectors[next].wakeup(); reactors[next].registering(false); selectionKey.attach(new AsyncHandler(socketChannel, selectors[next], next)); if (++next == selectors.length) { next = 0; } } } catch (IOException e) { e.printStackTrace(); } } }
|
可以跟以前的Acceptor
做个对比,做了如下改动:
- 接受到连接后不再直接触发handler了
- 初始化一堆
SubReactor
(从反应堆),每个交给一个线程处理,注册读事件后顺序分配给不同的SubReactor
去处理自己的selector监听
。
以上,就可以把读写处理+业务处理与接受连接的Reactor彻底分开了,接受连接的事件不再受任何读写、业务相关的影响,只负责接收,目前即便是业务线程池用光线程发生排队,也不会影响到连接的接收,很大程度上降低了服务端的接收能力遭遇瓶颈的风险。
step3.改写AsyncHandler的打印
这里就不po代码了,具体就是把SubReactor
的序号传给handler,标记触发Handler的Reactor是哪个。
同样的,启动下服务端,再开启两个客户端(跟之前一样,每个客户端发10条消息终止连接),运行结果如下:
服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| 1号SubReactor等待注册中... 3号SubReactor等待注册中... 0号SubReactor等待注册中... 2号SubReactor等待注册中... 收到来自 /127.0.0.1:60407 的连接 0号SubReactor等待注册中... 收到来自 /127.0.0.1:60410 的连接 1号SubReactor等待注册中... 1号SubReactor触发:收到来自客户端/127.0.0.1:60410的消息: 客户端发送的第1条消息 0号SubReactor触发:收到来自客户端/127.0.0.1:60407的消息: 客户端发送的第1条消息 0号SubReactor触发:收到来自客户端/127.0.0.1:60407的消息: 客户端发送的第2条消息 1号SubReactor触发:收到来自客户端/127.0.0.1:60410的消息: 客户端发送的第2条消息 0号SubReactor触发:收到来自客户端/127.0.0.1:60407的消息: 客户端发送的第3条消息 1号SubReactor触发:收到来自客户端/127.0.0.1:60410的消息: 客户端发送的第3条消息 1号SubReactor触发:收到来自客户端/127.0.0.1:60410的消息: 客户端发送的第4条消息 0号SubReactor触发:收到来自客户端/127.0.0.1:60407的消息: 客户端发送的第4条消息 1号SubReactor触发:收到来自客户端/127.0.0.1:60410的消息: 客户端发送的第5条消息 0号SubReactor触发:收到来自客户端/127.0.0.1:60407的消息: 客户端发送的第5条消息 1号SubReactor触发:收到来自客户端/127.0.0.1:60410的消息: 客户端发送的第6条消息 0号SubReactor触发:收到来自客户端/127.0.0.1:60407的消息: 客户端发送的第6条消息 1号SubReactor触发:收到来自客户端/127.0.0.1:60410的消息: 客户端发送的第7条消息 0号SubReactor触发:收到来自客户端/127.0.0.1:60407的消息: 客户端发送的第7条消息 0号SubReactor触发:收到来自客户端/127.0.0.1:60407的消息: 客户端发送的第8条消息 1号SubReactor触发:收到来自客户端/127.0.0.1:60410的消息: 客户端发送的第8条消息 0号SubReactor触发:收到来自客户端/127.0.0.1:60407的消息: 客户端发送的第9条消息 1号SubReactor触发:收到来自客户端/127.0.0.1:60410的消息: 客户端发送的第9条消息 0号SubReactor触发:收到来自客户端/127.0.0.1:60407的消息: 客户端发送的第10条消息 1号SubReactor触发:收到来自客户端/127.0.0.1:60410的消息: 客户端发送的第10条消息 0号SubReactor触发:read时-------连接关闭 1号SubReactor触发:read时-------连接关闭
|
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| 已完成 /127.0.0.1:2333 的连接 已完成 /127.0.0.1:2333 的连接 收到来自服务端的消息: 0号SubReactor触发:我收到来自/127.0.0.1:60407的信息辣:客户端发送的第1条消息, 200ok; 收到来自服务端的消息: 1号SubReactor触发:我收到来自/127.0.0.1:60410的信息辣:客户端发送的第1条消息, 200ok; 收到来自服务端的消息: 0号SubReactor触发:我收到来自/127.0.0.1:60407的信息辣:客户端发送的第2条消息, 200ok; 收到来自服务端的消息: 1号SubReactor触发:我收到来自/127.0.0.1:60410的信息辣:客户端发送的第2条消息, 200ok; 收到来自服务端的消息: 1号SubReactor触发:我收到来自/127.0.0.1:60410的信息辣:客户端发送的第3条消息, 200ok; 收到来自服务端的消息: 0号SubReactor触发:我收到来自/127.0.0.1:60407的信息辣:客户端发送的第3条消息, 200ok; 收到来自服务端的消息: 1号SubReactor触发:我收到来自/127.0.0.1:60410的信息辣:客户端发送的第4条消息, 200ok; 收到来自服务端的消息: 0号SubReactor触发:我收到来自/127.0.0.1:60407的信息辣:客户端发送的第4条消息, 200ok; 收到来自服务端的消息: 1号SubReactor触发:我收到来自/127.0.0.1:60410的信息辣:客户端发送的第5条消息, 200ok; 收到来自服务端的消息: 0号SubReactor触发:我收到来自/127.0.0.1:60407的信息辣:客户端发送的第5条消息, 200ok; 收到来自服务端的消息: 1号SubReactor触发:我收到来自/127.0.0.1:60410的信息辣:客户端发送的第6条消息, 200ok; 收到来自服务端的消息: 0号SubReactor触发:我收到来自/127.0.0.1:60407的信息辣:客户端发送的第6条消息, 200ok; 收到来自服务端的消息: 1号SubReactor触发:我收到来自/127.0.0.1:60410的信息辣:客户端发送的第7条消息, 200ok; 收到来自服务端的消息: 0号SubReactor触发:我收到来自/127.0.0.1:60407的信息辣:客户端发送的第7条消息, 200ok; 收到来自服务端的消息: 1号SubReactor触发:我收到来自/127.0.0.1:60410的信息辣:客户端发送的第8条消息, 200ok; 收到来自服务端的消息: 0号SubReactor触发:我收到来自/127.0.0.1:60407的信息辣:客户端发送的第8条消息, 200ok; 收到来自服务端的消息: 0号SubReactor触发:我收到来自/127.0.0.1:60407的信息辣:客户端发送的第9条消息, 200ok; 收到来自服务端的消息: 1号SubReactor触发:我收到来自/127.0.0.1:60410的信息辣:客户端发送的第9条消息, 200ok; 收到来自服务端的消息: 0号SubReactor触发:我收到来自/127.0.0.1:60407的信息辣:客户端发送的第10条消息, 200ok; 收到来自服务端的消息: 1号SubReactor触发:我收到来自/127.0.0.1:60410的信息辣:客户端发送的第10条消息, 200ok;
|
到这里,主从Reactor模型
就被改写完成了,上面的例子只是简单演示了下这个模型,所有的例子都是从单线程Reactor模型一点点改写来的,客户端没变过,为的是更好的测试服务端在不同模型下的表现。主从Reactor模型
应用的比较多,比如著名NIO框架Netty
底层模型也是基于主从Reactor模型
来实现的。
到这里java nio的东西已经差不多记录完了,后续会开始netty
的学习记录,当然上述例子弱化了buffer的使用,而且例子中不存在粘包拆包
的问题(因为都是请求+应答的方式进行),如果把上面的例子改成客户端在未收到响应时就连续发送几条信息,服务端这时再次由写模式切换到读模式,就会从Channel
里连续拿到这几条消息,这就导致了粘包问题,那么如何解决类似的问题呢?通常是定义一种协议,来区分消息头和尾,中间的消息体是我们真正需要的数据,这种协议也就是我们常说的应用层协议,比如HTTP
、FTP
等,这里不做赘述,之后会通过一个例子来完成这部分的补充说明。
代码地址
单线程Reactor模型:https://github.com/exceting/DemoAll/tree/master/jdk/src/main/java/demo/jdk/reactor/simple
多线程Reactor模型:同上,Acceptor里的Handler改成AsyncHandler
即可
主从多线程Reactor模型:https://github.com/exceting/DemoAll/tree/master/jdk/src/main/java/demo/jdk/reactor/mainsub