1. Linux的网络
1.1 五种网络模型
Linux的内核将所有外部设备都看做一个文件来操作,对一个文件的读写操作会调用内核提供的系统命令,返回一个file descriptor(fd,文件描述符)。而对一个socket的读写也会有相应的描述符,称为socketfd(socket描述符),描述符就是一个数字,它指向内核中的一个结构体(文件路径,数据区等一些属性)。
根据UNIX网络编程对I/O模型的分类,UNIX提供了5种I/O模型,分别如下。
1.1.1 阻塞I/O模型
最常用的I/O模型就是阻塞I/O模型,缺省情形下,所有文件操作都是阻塞的。我们以套接字接口为例来讲解此模型:在进程空间中调用recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲区中或者发生错误时才返回,在此期间一直会等待,进程在从调用recvfrom开始到它返回的整段时间内都是被阻塞的,因此被称为阻塞l/O模型。
1.1.2 非阻塞I/O模型
recvfrom从应用层到内核的时候,如果该缓冲区没有数据的话,就直接返回一个EWOULDBLOCK错误,一般都对非阻塞/O模型进行轮询检查这个状态,看内核是不是有数据到来。
1.1.3 I/O复用模型
Linux提供 select/pol,进程通过将一个或多个fd传递给 select或poll系统调用,阻塞在 select操作上,这样 selectpoll可以帮我们侦测多个fd是否处于就 绪状态。 select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限,因此它的使用受到 了一些制约。 Linux还提供了一个epoll系统调用, epoll使用基于事件驱动方式代替顺序 扫描,因此性能更高。当有fd就绪时,立即回调函数 rollback
1.1.4 信号驱动I/O模型
首先开启套接口信号驱动IO功能,并通过系统调用 sigaction 执行一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据 准备就绪时,就为该进程生成一个SGIO信号,通过信号回调通知应用程序调用 recvfrom 来读取数据,并通知主循环函数处理数据。
1.1.5 异步I/O
告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核复制到用户自己的缓冲区)通知我们。这种模型与信号驱动模型的主要区别是:信号驱动I/O由内核通知我们何时可以开始一个I/O操作;异步I/O模型由内核通知我们I/O操作何时已经完成。
1.2 I/O 多路复用技术
在I/O编程过程中,当需要同时处理多个客户端接入请求时,可以利用多线程或者I/O多路复用技术进行处理。I/O多路复用技术通过把多个I/O的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。与传统的多线程/多进程模型比,I/O多路复用的最大优势是系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降低了系统的维护工作量,节省了系统资源,I/O多路复用的主要应用场景如下。
- 服务器需要同时处理多个处于监听状态或者多个连接状态的套接字;
- 服务器需要同时处理多种网络协议的套接字。
目前支持I/O多路复用的系统调用有 select、pselect、poll、epoll,在Linux网络编程过程中,很长一段时间都使用select做轮询和网络事件通知,然而select的一些固有缺陷导致了它的应用受到了很大的限制,最终Linux不得不在新的内核版本中寻找select的替代方案,最终选择了epoll。epoll与select的原理比较类似,为了克服select的缺点,epoll作了很多重大改进,现总结如下。
-
支持一个进程打开的socket 描述符(FD)不受限制(仅受限于操作系统的最大文件句柄数)。
select最大的缺陷就是单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设置,默认值是1024。对于那些需要支持上万个TCP连接的大型服务器来说显然太少了。可以选择修改这个宏然后重新编译内核,不过这会带来网络效率的下降。我们也可以通过选择多进程的方案(传统的Apache方案)解决这个问题,不过虽然在Linux上创建进程的代价比较小,但仍旧是不可忽视的,另外,进程间的数据交换非常麻烦,对于Java由于没有共享内存,需要通过Socket通信或者其他方式进行数据同步,这带来了额外的性能损耗,增加了程序复杂度,所以也不是一种完美的解决方案。值得庆幸的是,epoll并没有这个限制,它所支持的FD上限是操作系统的最大文件句柄数,这个数字远远大于1024。例如,在1GB内存的机器上大约是10万个句柄左右,具体的值可以通过cat/proc/sys/fs/file-max察看,通常情况下这个值跟系统的内存关系比较大。
-
I/O效率不会随着FD数目的增加而线性下降。
传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,由于网络延时或者链路空闲,任一时刻只有少部分的socket是“活跃”的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。epoll不存在这个问题,它只会对“活跃”的socket进行操作-这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的,那么,只有“活跃”的socket才会主动的去调用callback函数,其他idle 状态socket则不会。在这点上,epoll实现了一个伪AlO。针对epoll和select性能对比的benchmark测试表明:如果所有的socket都处于活跃态-例如一个高速LAN环境,epoll并不比select/poll效率高太多:相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。
-
使用mmap加速内核与用户空间的消息传递。
无论是SELECT,还是EPOLL都需要内核把FD消息通知给用户空间,如何避免不必要的内存复制就显得非常重要, epoll是通过内核和用户空间mmap同一块内存实现
-
epoll的APl更加简单
包括创建一个 epoll描述符、添加监听事件、阻塞等待所监听的事件发生,关闭 epoll描述符等。
值得说明的是,用来克服select/pol缺点的方法不只有poll, epoll只是一种 Linux的 实现方案。在freeBSD下有 kqueue,而 devpoll是最古老的 Solaris的方案,使用难度依次 递增。 kqueue是 freebsd的宠儿,它实际上是一个功能相当丰富的 kernel事件队列,它不 仅仅是 selectpoll的升级,而且可以处理 signal、目录结构变化、进程等多种事件,kqueue 是边缘触发的。devpoll是 Solaris的产物,是这一系列高性能API中最早出现的。 Kernel 提供一个特殊的设备文件devpoll,应用程序打开这个文件得到操作 fd set的句柄,通过 写入 pollfd来修改它,一个特殊的 ioctl用用来替换 select,不过由于出现的年代比较早, 所以/devpoll的接口实现比较原始。
到这里,I/O的基础知识已经介绍完毕,从1.2章节开始介绍Java的I/O演进历史,从BO到NO是Jva通信类库迈出的一小步,但却对Java在高性能通信领域的发展起到 了关键性的推动作用随着基于NO的各类NIO框架的发展,以及基于NO的Web服务 器的发展,Java在很多领域取代了C和C++,成为企业服务端应用开发的首选语言。
2. Java的I/O演进
2.1 BIO
在JDK 1.4推出Java NIO之前,基于Java的所有Socket通信都采用了同步阻塞模式(BIO),这种一请求一应答的通信模型简化了上层的应用开发,但是在性能和可靠性方面却存在着巨大的瓶颈。因此,在很长一段时间里,大型的应用服务器都采用C或者C++语言开发,因为它们可以直接使用操作系统提供的异步I/O或者AIO能力。当并发访问量
增大、响应时间延迟增大之后,采用JavaBIO开发的服务端软件只有通过硬件的不断扩容来满足高并发和低时延,它极大地增加了企业的成本,并且随着集群规模的不断膨胀,系统的可维护性也面临巨大的挑战,只能通过采购性能更高的硬件服务器来解决问题,这会导致恶性循环。
2.2 NIO
2002年发布 JDK1.4时, NIO以 JSR--51的身份正式随DK发布。它新增了个 java nio包,提供了很多 进行异步I/O开发的API和类库,主要的类和接口如下:
- 进行异步 I/O操作的缓冲区ByteBuffer等;
- 心进行异步I/O操作的管道Pipe;
- 进行各种I/O 操作(异步或者同步)的Channel,包括ServerSocketChannel和SocketChannel;
- 多种字符集的编码能力和解码能力;
- 实现非阻塞I/O操作的多路复用器selector;
- 基于流行的Perl实现的正则表达式类库;
- 文件通道FileChannel。
新的NIO类库的提供,极大地促进了基于Java的异步非阻塞编程的发展和应用,但是,它依然有不完善的地方,特别是对文件系统的处理能力仍显不足,主要问题如下。
- 没有统一的文件属性(例如读写权限);
- API能力比较弱,例如目录的级联创建和递归遍历,往往需要自己实现:
- 底层存储系统的一些高级API无法使用:
- 所有的文件操作 都是同步阻塞调用,不支持异步文件读写操作。
2.3 AIO
2011年7月28日,JDK1.7 正式发布。它的一个比较大的亮点就是将原来的NIO类库进行了升级,被称为NIO2.0。 NIO2.0 由JSR-203 演进而来,它主要提供了如下三个方面的改进。
- 提供能够批量获取文件属性的API,这些API具有平台无关性,不与特性的文件系统相耦合,另外它还提供了标准文件系统的SPI,供各个服务提供商扩展实现:
- 提供AIO功能,支持基于文件的异步I/O操作和针对网络套接字的异步操作;
- 完成JSR-5I定义的通道功能,包括对配置和多播数据报的支持等。
3. I/O在开发中的体现
3.1 传统BIO编程
网络编程的基本模型是Client/Server模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定的IP地址和监听端口),客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接,如果连接建立成功,双方就可以通过网络套接字(Socket) 进行通信。
在基于传统同步阻塞模型开发中,ServerSocket 负责绑定IP 地址,启动监听端口:Socket负责发起连接操作。连接成功之后,双方通过输入和输出流进行同步阻塞式通信。
下面,我们就以经典的时间服务器( TimeServer)为例,通过代码分析来回顾和熟悉下BIO编程。
3.1.1 BIO通信模型
3.1.2 BIO创建TimeServer
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
ServerSocket server = null;
try {
Socket socket;
server = new ServerSocket(port);
System.out.println("The time server is start at port: " + port);
while (true) {
socket = server.accept();
new Thread(new TimeServerHandler(socket)).start();
}
} finally {
if (server != null) {
server.close();
}
}
}
}
TimeServerHandler
public class TimeServerHandler implements Runnable {
private Socket socket;
public TimeServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body = null;
while (true) {
body = in.readLine();
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
out.println(currentTime);
}
} catch (IOException e) {
close(in);
close(out);
close(socket);
}
}
private void close(Closeable closeable) {
if (null != closeable) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3.1.3 BIO创建TimeClient
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("127.0.0.1", port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("QUERY TIME ORDER");
String resp = in.readLine();
System.err.println("current time is :" + resp);
} catch (IOException e) {
e.printStackTrace();
} finally {
close(out);
close(in);
close(socket);
}
}
private static void close(Closeable closeable) {
if (null != closeable) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3.1.4 BIO结果说明
服务端执行结果:
客户端执行结果:
BIO 主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程处理新接入的客户端链路,一个线程只能处理-一个客户端连接。在高性能服务器应用领域,往往需要面向成千上万个客户端的并发连接,这种模型显然无法满足高性能、高并发接入的场景。
为了改进-线程一连接模型,后来又演进出了--种通过线程池或者消息队列实现1个或者多个线程处理N个客户端的模型,由于它的底层通信机制依然使用同步阻塞I/O,所以被称为“伪异步”,下面我们就对伪异步代码进行分析,看看伪异步是否能够满足我们对高性能、高并发接入的诉求。
3.2 伪异步I/O编程
为了解决同步阻塞IO面临的一-个链路需要一一个线程处理的问题,后来有人对它的线程模型进行了优化,后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N的比例关系,其中M可以远远大于N,通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。
下面,我们结合连接模型图和源码,对伪异步I/O进行分析,看它是否能够解决同步阻塞I/O面临的问题。
3.2.1 伪异步I/O模型
当有新的客户端接入的时候,将客户端的Socket 封装成一-个Task (该任务实现java.lang.Runnable接口)投递到后端的线程池中进行处理,JDK的线程池维护一个消息队列和N个活跃线程对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。
3.2.2 伪异步I/O TimeServer
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
ServerSocket server = null;
try {
Socket socket;
server = new ServerSocket(port);
System.out.println("The time server is start at port: " + port);
TimeServerHandlerExecutorPool singleExecutor = new TimeServerHandlerExecutorPool(50, 10000);
while (true) {
socket = server.accept();
singleExecutor.execute(new TimeServerHandler(socket));
}
} finally {
if (server != null) {
server.close();
}
}
}
}
TimeServerHandlerExecutorPool
public class TimeServerHandlerExecutorPool {
private ExecutorService executor;
public TimeServerHandlerExecutorPool(int maxPoolSize, int queueSize) {
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize));
}
public void execute(Runnable task) {
executor.execute(task);
}
}
3.2.3 伪异步I/O 结果说明
服务端执行结果:
客户端执行结果:
由于线程池和消息队列都是有界的,因此,无论客户端并发连接数多大,它都不会导致线程个数过于膨胀或者内存溢出,相比于传统的一连接- -线程模型,是一种改良。
伪异步I/O通信框架采用了线程池实现,因此避免了为每个请求都创建一个 独立线程造成的线程资源耗尽问题。但是由于它底层的通信依然采用同步阻塞模型,因此无法从根本上解决问题。下个小节我们对伪异步I/O 进行深入分析,找到它的弊端,然后看看NIO是如何从根本上解决这个问题的。
伪异步I/O实际上仅仅只是对之前I/O线程模型的一个简单优化,它无法从根本上解决同步I0导致的通信线程阻塞问题。下面我们就简单分析下如果通信对方返回应答时间过长,会引起的级联故障。
- (1)服务端处理缓慢,返回应答消息耗费60s, 平时只需要10ms.
- (2)采用伪异步I/O的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,因此,它将会被同步阻塞60s。
- (3)假如所有的可用线程都被故障服务器阻塞,那后续所有的I/O消息都将在队列中排队。
- (4)由于线程池采用阻塞队列实现,当队列积满之后,后续入队列的操作将被阻塞。
- (5)由于前端只有一个Aceptor线程接收客户端接入,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时。
- (6)由于几乎所有的连接都超时,调用者会认为系统已经崩溃,无法接收新的请求消息。
3.3 NIO编程
在介绍NIO编程之前,我们首先需要澄清一个概念: NIO到底是什么的简称?有人称之为New IO,因为它相对于之前的I/O类库是新增的,所以被称为New I/O, 这是它的官方叫法。但是,由于之前老的I/O类库是阻塞I/O, New I/O 类库的目标就是要让Java支持非阻塞I/O,所以,更多的人喜欢称之为非阻塞IO (Non-block I/O),由于非阻塞I/O更能够体现NIO的特点,所以本书使用的NIO都指的是非阻塞I/O.
与Socket类和ServerSocket类相对应,NIO也提供了SocketChannel和ServerSocketChannel两种不同的套接字通道实现。这两种新增的通道都支持阻塞和非阻塞两种模式。阻塞模式使用非常简单,但是性能和可靠性都不好,非阻塞模式则正好相反。开发人员一般可以根据自己的需要来选择合适的模式,-般来说,低负载、低并发的应用程序可以选择同步阻塞I/O以降低编程复杂度,但是对于高负载、高并发的网络应用,需要使用NIO的非阻塞模式进行开发。
3.3.1 NIO服务端序列图
说明:
- 步骤一:打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道,代码示例如下。
ServerSocketChannel acceptorSvr = ServerSocketChannel.open();
- 步骤二:绑定监听端口,设置连接为非阻塞模式,示例代码如下。
acceptorSvr.socket().bind (new InetSocketAddress (InetAddress.getByName("IP"), port)); acceptorSvr.configureBlocking(false) ;
- 步骤三:创建Reactor线程,创建多路复用器并启动线程,代码如下。
Selector selector = selector.open(); New Thread(new ReactorTask()).start();
- 步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件,代码如下。
SelectionKey key = acceptorsvr.register( selector, selectionKey.OP_ ACCEPT, ioHandler) ;
- 步骤五:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key,代码如下。
int num = selector.select(); Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator() ; while (it.hasNext()){ selectionKey key = (SelectionKey)it.next(); // ... deal with I/O event ... }
- 步骤六:多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路,代码示例如下。
SocketChannel channel = svrChannel.accept();
- 步骤七:设置客户端链路为非阻塞模式,示例代码如下。
channel.configureBlocking(false) ; channel.socket().setReuseAddress(true) ;
- 步骤八:将新接入的客户端连接注册到Reactor 线程的多路复用器上,监听读操作,用来读取客户端发送的网络消息,代码如下。
SelectionKey key = socketChannel.register( selector, SelectionKey.OP_ READ, ioHandler);
- 步骤九:异步读取客户端请求消息到缓冲区,示例代码如下。
int readNumber = channel.read (receivedBuffer) ;
- 步骤十:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排,示例代码如下。
Object message = nul1; while (buffer.hasRemain()){ byteBuffer.mark(); Object message = decode(byteBuffer) ; if (message == nu1l){ byteBuffer.reset(); break; } messageList.add (message); } if (!byteBuffer.hasRemain()){ byteBuffer.clear(); }else{ byteBuffer.compact (); } if (messageList != null && !messageList.isEmpty()){ for (object messageE : messageList){ handlerTask (messageE) ; } }
- 步骤十一:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write 接口,将消息异步发送给客户端,示例代码如下。
socketChannel.write(buffer) ;
注意:如果发送区TCP缓冲区满,会导致写半包,此时,需要注册监听写操作位,循环写,直到整包消息写入TCP缓冲区,此处不赘述,后续Netty源码分析章节会详细分析Netty的处理策略。
3.3.2 NIO创建TimeServer
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
MultiplexerTimeServer
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean stop;
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// 绑定监听的端口
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start at port :" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 处理接入的请求消息
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(sc, currentTime);
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else {
// 读到0字节,忽略
}
}
}
}
private void doWrite(SocketChannel channel, String resp) throws IOException {
if (resp != null && resp.trim().length() > 0) {
byte[] bytes = resp.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
3.3.3 NIO客户端序列图
说明:
- 步骤一:打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机分配一个可用的本地地址),示例代码如下。
java SocketChannel clientChannel = SocketChannel.open();
- 步骤二:设置SocketChannel为非阻塞模式,同时设置客户端连接的TCP参数,示例代码如下。
java clientChannel.configureBlocking(false); socket.setReuseAddress(true); socket.setReceiveBufferSize(BUFFER_ SIZE); socket.setSendBuffersize(BUFFER SIZE);
- 步骤三:异步连接服务端,示例代码如下。
java boolean connected=clientChannel.connect(new InetSocketAddress("ip", port));
- 步骤四:如果当前没有连接成功(异步连接,返回false, 说明客户端已经发送syne包,服务端没有返回ack包,物理链路还没有建立),示例代码如下。
java if(connected){ clientChannel.register(selector, SelectionKey.OP_ READ, ioHandler); }else{ clientChannel.register(selector, Select ionKey.OP_ CONNECT, ioHandler); }
- 步骤五:向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务端的TCPACK应答,示例代码如下。
java clientChannel.register(selector, SelectionKey.OP_ CONNECT, ioHandler);
- 步骤六:创建Reactor线程,创建多路复用器并启动线程,代码如下。
java Selector selector = Selector.open(); New Thread(new ReactorTask()).start();
- 步骤七:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key,代码如下。
java int num = selector.select(); Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while(it.hasNext()) { SelectionKey key =(SelectionKey) it.next(); //...deal with I/0 event... }
- 步骤八:接收connect事件进行处理,示例代码如下。
java if(key.isConnectable()){ //handlerConnect(); }
- 步骤九:判断连接结果,如果连接成功,注册读事件到多路复用器,示例代码如下。
java if(channel.finishConnect() registerRead();
- 步骤十:注册读事件到多路复用器,示例代码如下。
java clientChannel.register(selector, selectionKey.0P_ READ, ioHandler);
- 步骤十一:异步读客户端请求消息到缓冲区,示例代码如下。
java int readNumber = channel.read(receivedBuffer);
- 步骤十二:对ByteBuffer进行编解码,如果有半包消息接收缓冲区Reset, 继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排,示例代码如下。
java Object message = nu11; while(buffer.hasRemain()){ byteBuffer.mark(); Object message = decode(byteBuffer); if(message == nu1l){ byteBuffer.reset(); break;. } messageList.add(message ); } if(!byteBuffer.hasRemain()){ byteBuffer.clear(); }else{ byteBuffer.compact(); } if(messageList != null & ! messageList.isEmpty() ){ for(Object messageE : messageList) handlerTask(messageE); }
- 步骤十三:将POJO对象encode成ByteBuffer, 调用SocketChannel的异步write 接口,将消息异步发送给客户端,示例代码如下。
java socketChannel.write(buffer);
通过序列图和关键代码的解说,相信大家对创建NIO客户端程序已经有了一个初步的 了解,下面就跟随着我们的脚步,继续看看如何使用NIO改造之前的时间服务器客户端 TimeClient吧。
3.3.4 NIO创建TimeClient
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
new Thread(new TimeClientHandler("127.0.0.1", port), "TimeClient-001").start();
}
}
TimeClientHandler
public class TimeClientHandler implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandler(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try {
doConnect();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
e.printStackTrace();
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
} else {
System.exit(1);
}
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("current time is : " + body);
this.stop = true;
} else if (readBytes < 0) {
key.cancel();
sc.close();
} else {
// 读到0字节,忽略
}
}
}
}
private void doConnect() throws IOException {
//如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel channel) throws IOException {
byte[] bytes = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
if (!writeBuffer.hasRemaining()) {
System.out.println("Send order 2 server succeed.");
}
}
}
3.3.5 NIO结果说明
服务端执行结果:
客户端执行结果:
通过源码对比分析,我们发现NIO编程难度确实比同步阻塞BIO大很多,我们的NIO例程并没有考虑“半包读”和“半包写”,如果加上这些,代码将会更加复杂。NIO代码既然这么复杂,为什么它的应用却越来越广泛呢,使用NIO编程的优点总结如下。
- (1)客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。
- (2) SocketChannel 的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样1/O通信线程就可以处理其他的链路,不需要同步等待这个链路可用。
- (3)线程模型的优化:由于JDK的Selector 在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector 线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降,因此,它非常适合做高性能、高负载的网络服务器。
JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0, 引人注目的是,Java正式提供了异步文件I/O操作,同时提供了与UNIX网络编程事件驱动IO对应的AIO,
3.4 AIO编程
NIO2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供两种方式获取获取操作结果。
- 通过 java.util.concurrent.Future类来表示异步操作的结果:
- 在执行异步操作的时候传入--个java.nio.channels.
CompletionHandler接口的实现类作为操作完成的回调。
NIO2.0的异步套接字通道是真正的异步非阻塞I/O,它对应UNIX网络编程中的事件驱动I/O(AIO),它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。
3.4.1 AIO创建TimeServer
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
}
}
AsyncTimeServerHandler
public class AsyncTimeServerHandler implements Runnable {
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncTimeServerHandler(int port) {
this.port = port;
try {
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("The time server is start at port : " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
doAccept();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doAccept() {
asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
}
}
AcceptCompletionHandler
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {
@Override
public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
}
ReadCompletionHandler
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer>{
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null) {
this.channel = channel;
}
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(currentTime);
}
private void doWrite(String currentTime) {
if (currentTime!=null&¤tTime.trim().length()>0) {
byte[] bytes = currentTime.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
// 如果没有发送完成,继续发送
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.4.2 AIO创建TimeClient
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClient-001").start();
}
}
AsyncTimeClientHandler
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;
public AsyncTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer, buffer, this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
readBuffer.get(bytes);
try {
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("current time is : " + body);
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.4.3 AIO结果说明
服务端执行结果:
客户端执行结果:
JDK底层通过线程池ThreadPoolExecutor来执行回调通知,异步回调通知类由sun.nio.ch.AsynchronousChannelGrouplmpl 实现,它经过层层调用,最终回调com.pheinetty.aio.Async TimeClientHandlerS 1.completed方法,完成回调通知。由此我们也可以得出结论:异步Socket Channel是被动执行对象,我们不需要像NIO编程那样创建一一个独立的 I/0 线程来处理读写操作。对于AsynchronousServerSocketChannel和AsynchronousSocketChannel,它们都由JDK底层的线程池负责回调并驱动读写操作。正因为如此,基于NIO2.0新的异步非阻塞Channel进行编程比NIO编程更为简单。
3.5 各种I/O对比
3.6 选择Netty的理由
3.6.1 原生NIO存在的问题
- (1) NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Sclector、ServerSocketChannel、 SocketChannel、ByteBuffer 等。
- (2)需要具备其他的额外技能做铺垫,例如熟悉Java多线程编程。这是因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序。
- (3)可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等问题,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐的工作 量和难度都非常大。
- (4)JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该BUG发生概率降低了一些而已,它并没有被根本解决。
3.6.2 为什么选择Netty
Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,它已经得到成百上千的商用项目验证,例如Hadoop的RPC框架avro使用Netty作为底层通信框架;很多其他业界主流的RPC框架,也使用Netty来构建高性能的异步通信能力。
通过对Netty 的分析,我们将它的优点总结如下。
- API使用简单,开发门槛低;
- 功能强大,预置了多种编解码功能,支持多种主流协议;
- 定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展;
- 性能高,通过与其他业界主流的NIO框架对比,Netty的综合性能最优;
- 成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG, 业务开发人员不需要再为NIO的BUG而烦恼;
- 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会加入;
- 经历了大规模的商业应用考验,质量得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它已经完全能够满足不同行业的商业应用了。
正是因为这些优点,Netty逐渐成为JavaNIO编程的首选框架。
评论区