一、Selector&Channel
1.1:各种channel
写这个模型需要提前了解Selector
以及Channel
,之前记录过FileChannel,除此之外还有以下几种Channel:
ServerSocketChannel:用于监听新的TCP连接的通道,负责读取&响应,通常用于服务端的实现。
SocketChannel:用于发起TCP连接,读写网络中的数据,通常用于客户端的实现。
DatagramChannel:上述两个通道基于TCP传输协议,而这个通道则基于UDP,用于读写网络中的数据。
FileChannel:从文件读取数据。
本篇重点放在ServerSocketChannel
和SocketChannel
上,大部分客户端/服务端为了保证数据准确性,都是基于TCP传输协议实现的,由于使用Selector
注册必须要求被注册的Channel
是非阻塞模式的,因此FileChannel
由于没有非阻塞模式(无法设置configureBlocking(false)
),没办法和注册到selector
。
1.2:selector
Selector
是个通道注册器(用法会在程序里标注),是实现Reactor模型
的关键,多个通道均可以注册到Selector
,Selector
负责监听每个Channel
的几个事件:连接就绪
、写就绪
、读就绪
,当某个channel
注册感兴趣就绪事件到selector
时,若发生兴趣事件就绪,则Selector.select()
方法不再阻塞,返回兴趣事件集合(可能包含多个channel
的),然后按照事件不同进行分发处理。
Selector
返回对应的就绪事件,封装为SelectionKey
,每个Channel
对应一个SelectionKey
,这个对象还可以通过attach方法
附着处理类(Handler
、Acceptor
等)。
1.3:一个简单的例子
先来看个简单使用Selector
做处理的服务端实现,可以简单对Selector
和SelectionKey
的用法做个了解:
代码块11 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(2333)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) { Set keys = selector.selectedKeys(); Iterator iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel skc = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = skc.accept(); socketChannel.configureBlocking(false); System.out.println(String.format("收到来自 %s 的连接", socketChannel.getRemoteAddress())); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int count = socketChannel.read(buffer); if (count < 0) { socketChannel.close(); key.cancel(); System.out.println("连接关闭"); continue; } System.out.println(String.format("收到来自 %s 的消息: %s", socketChannel.getRemoteAddress(), new String(buffer.array()))); } keys.remove(key); } } }
|
上面是一个简单的例子,接下来,就利用选择器、通道来实现Reactor单线程模型
。
二、单Reactor单线程模型的服务端实现
实现服务端,服务端负责接收客户端的连接,接收客户端的请求数据以及响应客户端。
把上一篇的结构图再拿过来展示下,看看需要做的有哪些模块:
通过上图,我们需要实现的模块有Reactor
、Acceptor
、Handler
,下面来逐个编写:
2.1:Reactor核心模块
该模块内部包含两个核心方法,select
和dispatch
,该模块负责监听就绪事件和对事件的分发处理:
代码块21 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public class Reactor implements Runnable {
private final Selector selector; private final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor(serverSocketChannel, selector)); }
@Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { dispatch((SelectionKey) (it.next())); } selected.clear(); } } catch (IOException e) { e.printStackTrace(); } }
void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) { r.run(); } } }
|
细节已标注。
2.2:实现Acceptor模块
这个模块只负责处理连接就绪事件,有了这个事件就可以拿到客户单的SocketChannel
,就可以继续完成接下来的读写任务了:
代码块31 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class Acceptor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) { this.serverSocketChannel = serverSocketChannel; this.selector = selector; }
@Override public void run() { SocketChannel socketChannel; try { socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { System.out.println(String.format("收到来自 %s 的连接", socketChannel.getRemoteAddress())); new Handler(socketChannel, selector); } } catch (IOException e) { e.printStackTrace(); } } }
|
细节已标注。
2.3:Handler模块的实现
这个模块负责接下来的读写操作:
代码块41 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| public class Handler implements Runnable {
private final SelectionKey selectionKey; private final SocketChannel socketChannel;
private ByteBuffer readBuffer = ByteBuffer.allocate(1024); private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);
private final static int READ = 0; private final static int SEND = 1;
private int status = READ;
Handler(SocketChannel socketChannel, Selector selector) throws IOException { this.socketChannel = socketChannel; this.socketChannel.configureBlocking(false); selectionKey = socketChannel.register(selector, 0); selectionKey.attach(this); selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); }
@Override public void run() { try { switch (status) { case READ: read(); break; case SEND: send(); break; default: } } catch (IOException e) { System.err.println("read或send时发生异常!异常信息:" + e.getMessage()); selectionKey.cancel(); try { socketChannel.close(); } catch (IOException e2) { System.err.println("关闭通道时发生异常!异常信息:" + e2.getMessage()); e2.printStackTrace(); } } }
private void read() throws IOException { if (selectionKey.isValid()) { readBuffer.clear(); int count = socketChannel.read(readBuffer); if (count > 0) { System.out.println(String.format("收到来自 %s 的消息: %s", socketChannel.getRemoteAddress(), new String(readBuffer.array()))); status = SEND; selectionKey.interestOps(SelectionKey.OP_WRITE); } else { selectionKey.cancel(); socketChannel.close(); System.out.println("read时-------连接关闭"); } } }
void send() throws IOException { if (selectionKey.isValid()) { sendBuffer.clear(); sendBuffer.put(String.format("我收到来自%s的信息辣:%s, 200ok;", socketChannel.getRemoteAddress(), new String(readBuffer.array())).getBytes()); sendBuffer.flip(); int count = socketChannel.write(sendBuffer);
if (count < 0) { selectionKey.cancel(); socketChannel.close(); System.out.println("send时-------连接关闭"); }
status = READ; selectionKey.interestOps(SelectionKey.OP_READ); } } }
|
细节已标注。
关键模块已实现,下面来启动服务端:
代码块51
| new Thread(new Reactor(2333)).start();
|
三、客户端的编写
接下来同样利用selector
编写客户端,客户端需要做的事情就是发送消息到服务端
,等待服务端响应
,然后再次发送消息
,发够10条消息断开连接:
3.1:Client入口模块
代码块61 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class NIOClient implements Runnable {
private Selector selector;
private SocketChannel socketChannel;
NIOClient(String ip, int port) { try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress(ip, port)); SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_CONNECT); sk.attach(new Connector(socketChannel, selector)); } catch (IOException e) { e.printStackTrace(); } }
@Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { dispatch((SelectionKey) (it.next())); } selected.clear(); } } catch (IOException e) { e.printStackTrace(); } }
void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) { r.run(); } } }
|
细节已标注。
3.2:Connector模块(建连)
代码块71 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class Connector implements Runnable {
private final Selector selector;
private final SocketChannel socketChannel;
Connector(SocketChannel socketChannel, Selector selector) { this.socketChannel = socketChannel; this.selector = selector; }
@Override public void run() { try { if (socketChannel.finishConnect()) { System.out.println(String.format("已完成 %s 的连接", socketChannel.getRemoteAddress())); new Handler(socketChannel, selector); } } catch (IOException e) { e.printStackTrace(); } } }
|
细节已标注。
3.3:客户端Handler模块实现
代码块81 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| public class Handler implements Runnable {
private final SelectionKey selectionKey; private final SocketChannel socketChannel;
private ByteBuffer readBuffer = ByteBuffer.allocate(2048); private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
private final static int READ = 0; private final static int SEND = 1;
private int status = SEND;
private AtomicInteger counter = new AtomicInteger();
Handler(SocketChannel socketChannel, Selector selector) throws IOException { this.socketChannel = socketChannel; this.socketChannel.configureBlocking(false); selectionKey = socketChannel.register(selector, 0); selectionKey.attach(this); selectionKey.interestOps(SelectionKey.OP_WRITE); selector.wakeup(); }
@Override public void run() { try { switch (status) { case SEND: send(); break; case READ: read(); break; default: } } catch (IOException e) { System.err.println("send或read时发生异常!异常信息:" + e.getMessage()); selectionKey.cancel(); try { socketChannel.close(); } catch (IOException e2) { System.err.println("关闭通道时发生异常!异常信息:" + e2.getMessage()); e2.printStackTrace(); } } }
void send() throws IOException { if (selectionKey.isValid()) { sendBuffer.clear(); int count = counter.incrementAndGet(); if (count <= 10) { sendBuffer.put(String.format("客户端发送的第%s条消息", count).getBytes()); sendBuffer.flip(); socketChannel.write(sendBuffer);
status = READ; selectionKey.interestOps(SelectionKey.OP_READ); } else { selectionKey.cancel(); socketChannel.close(); } } }
private void read() throws IOException { if (selectionKey.isValid()) { readBuffer.clear(); socketChannel.read(readBuffer); System.out.println(String.format("收到来自服务端的消息: %s", new String(readBuffer.array())));
status = SEND; selectionKey.interestOps(SelectionKey.OP_WRITE); } } }
|
细节已标注。
下面启动客户端去连接之前的服务端:
代码块91 2
| new Thread(new NIOClient("127.0.0.1", 2333)).start(); new Thread(new NIOClient("127.0.0.1", 2333)).start();
|
上面模拟了两个客户端同时连到服务端,运行结果如下:
单线程Reactor模型
有个致命的缺点,通过上述例子可以看出,整个执行流程都是线性的,客户端请求→服务端读取→服务端响应→客户端收到响应→客户端再次发送请求
,那么在这个链路中,如果handler
中某个位置存在性能瓶颈,比如我们可以改造下服务端的send方法
:
代码块101 2 3 4 5 6 7
| try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); }
int count = socketChannel.write(sendBuffer);
|
在响应客户端之前睡眠2s,当做是性能瓶颈点,同样的再次开两个客户端同时访问服务端,每个客户端发送10条消息,会发现,程序直接运行了40s,这是大多数情况下不愿意看到的,因此,就有了多线程Reactor模式
,跟BIO
为了提高性能将读操作放到一个独立线程处理一样,Reactor
这样做,也是为了解决上面提到的性能问题,只不过NIO
比BIO
做异步有个最大的优势就是NIO
不会阻塞一个线程,类似read
这种操作状态都是由selector
负责监听的,不像BIO
里都是阻塞的,只要被异步出去,那么一定是非阻塞的业务代码(除非是人为将代码搞成阻塞),而BIO
由于read
本身阻塞,因此会阻塞掉整个业务线程,这也是同样是异步为什么NIO
可以更加高效的原因之一。
那么单线程Reactor
适用于什么情况呢?适用于那种程序复杂度很低的系统,例如redis
,其大部分操作都是非常高效的,很多命令的时间复杂度直接为O(1)
,这种情况下适合这种简单的Reactor
模型实现服务端。