1. Netty入门
作为Netty的第一个应用程序,我们依然以时间服务器为例进行开发,通过Netty版本的时间服务器的开发,让初学者尽快学到如何搭建Netty开发环境和运行Netty应用程序。
1.1 Netty服务端开发
在开始使用Netty开发TimeServer之前,先回顾一下使用NIO进行服务端开发的步骤。
- (1)创建ServerSocketChannel,配置它为非阻塞模式;
- (2)绑定监听,配置TCP参数,例如backlog大小;
- (3)创建一个独立的I/O线程,用于轮询多路复用器Selector;
- (4)创建Seleetor,将之前创建的ServerSocketChannel 注册到Selector上,监听SelectionKey.ACCEPT;
- (5)启动I/O线程,在循环体中执行Selector.select()方法,轮询就绪的Channel;
- (6)当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是OP_ ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端;
- (7)设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数;
- (8)将SocketChannel注册到Selector,监听OP_ READ操作位;
- (9)如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包;
- (10)如果轮询的Channel为OP_ WRITE,说明还有数据没有发送完成,需要继续发送。
一个简单的NIO服务端程序,如果我们直接使用JDK的NIO类库进行开发,竟然需要经过烦琐的十多步操作才能完成最基本的消息读取和发送,这也是我们要选择Netty等NIO框架的原因了,下面我们看看使用Netty是如何轻松搞定服务端开发的。
TimeServer
public class TimeServer {
public static void main(String[] args) throws Exception {
int port = 8080;
new TimeServer().bind(port);
}
public void bind(int port) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
}
}
TimeServerHandler
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1.2 Netty客户端开发
TimeClient
public class TimeClient {
public static void main(String[] args) throws InterruptedException {
new TimeClient().connect("127.0.0.1", 8080);
}
public void connect(String host, int port) throws InterruptedException {
// 配置客户端NIO线程组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
}
TimeClientHandler
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
public TimeClientHandler() {
byte[] req = "QUERY TIME ORDER".getBytes();
this.firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("current time is : " + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Unexpected exception from downstream : " + cause.getMessage());
ctx.close();
}
}
1.3 运行和调试
服务端执行结果:
客户端执行结果:
运行结果正确。可以发现,通过Netty开发的NIO服务端和客户端非常简单,短短儿十行代码,就能完成之前NIO程序需要几百行才能完成的功能。基于Netty的应用开发不但API使用简单、开发模式固定,而且扩展性和定制性非常好,后面,我们会通过更多应用来介绍Netty的强大功能。
需要指出的是,本例程依然没有考虑读半包的处理,对于功能演示或者测试,上述程序没有问题,但是稍加改造进行性能或者压力测试,它就不能正确地工作了。在下一个章节我们会给出能够正确处理半包消息的应用实例。
2. 解决TCP粘包拆包
2.1 TCP粘包拆包
TCP是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
2.1.1 粘包拆包问题说明
我们可以通过图解对TCP粘包和拆包问题进行说明,粘包问题示例如图4-1所示。
假设客户端分别发送了两个数据包DI和D2给服务端,由于服务端一次 读取到的字节数是不确定的,故可能存在以下4种情况。
- (1)服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
- (2)服务端一次接收到了两个数据包,DI和D2粘合在一起,被称为TCP粘包;
- (3)服务端分两次读取到了两个数据包,第一次读取到了完整的DI包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;
- (4)服务端分两次读取到了两个数据包,第一次读取到了DI包的部分内容DI_1,第二次读取到了DI包的剩余内容D1_ 2和D2包的整包。
如果此时服务端TCP接收滑窗非常小,而数据包DI和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将DI和D2包接收完全,期间发生多次拆包。
2.1.2 问题产生的原因
TCP粘包/拆包问题产生的原因有三个,分别如下:
- (1)应用程序write写入的字节大小大于套接口发送缓冲区大小;
- (2)进行MSS大小的TCP分段;
- (3)以太网帧的payload大于MTU进行IP分片。
图解如图 4-2 所示:
2.1.3 问题的解决策略
由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:
- (1)消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;
- (2)在包尾增加回车换行符进行分割,例如FTP协议;
- (3)将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度;
- (4)更复杂的应用层协议。
介绍完了TCP粘包/拆包的基础知识,下面我们就通过实际例程来看看如何使用Netty提供的半包解码器来解决TCP粘包/拆包问题。
2.2 未考虑粘包拆包异常案例
2.2.1 TimeServer
public class TimeServer {
public static void main(String[] args) throws Exception {
int port = 8080;
new TimeServer().bind(port);
}
public void bind(int port) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
}
}
TimeServerHandler
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String body = new String(bytes, StandardCharsets.UTF_8).substring(0, bytes.length - System.getProperty("line.separator").length());
System.out.println("The time server receive order : " + body + "; the counter is : " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
2.2.2 TimeClient
public class TimeClient {
public static void main(String[] args) throws InterruptedException {
new TimeClient().connect("127.0.0.1", 8080);
}
public void connect(String host, int port) throws InterruptedException {
// 配置客户端NIO线程组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
}
TimeClientHandler
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private byte[] req;
private int counter;
public TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("current time is : " + body + "; the counter is : " + ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Unexpected exception from downstream : " + cause.getMessage());
ctx.close();
}
}
2.2.3 运行结果分析
服务端执行结果:
客户端执行结果:
按照设计初衷,客户端应该收到100条当前系统时间的消息,但实际上只收到了一条。
这不难理解,因为服务端只收到了1条请求消息,所以实际服务端只发送了1条应答,由于请求消息不满足查询条件,所以返回了1条“BAD ORDER"应答消息。但是实际上客户端只收到了一条包含1条“BADORDER"指令的消息,说明服务端返回的应答消息也发生了粘包。
由于上面的例程没有考虑TCP的粘包/拆包,所以当发生TCP粘包时,我们的程序就不能正常工作。
下面将演示如何通过Netty的LineBasedFrameDecoder和StringDecoder来解决TCP粘包问题。
2.3 LineBasedFrameDecoder解决粘包问题
为了解决TCP粘包/拆包导致的半包读写问题,Netty默认提供了多种编解码器用于处理半包,只要能熟练掌握这些类库的使用,TCP粘包问题从此会变得非常容易,你甚至不需要关心它们,这也是其他NIO框架和JDK原生的NIO API所无法匹敌的。
2.3.1 处理TCP粘包的TimeServer
public class TimeServer {
public static void main(String[] args) throws Exception {
int port = 8080;
new TimeServer().bind(port);
}
public void bind(int port) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
}
}
TimeServerHandler
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("The time server receive order : " + body + "; the counter is : " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
2.3.2 处理TCP粘包的TimeClient
public class TimeClient {
public static void main(String[] args) throws InterruptedException {
new TimeClient().connect("127.0.0.1", 8080);
}
public void connect(String host, int port) throws InterruptedException {
// 配置客户端NIO线程组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
}
TimeClientHandler
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private byte[] req;
private int counter;
public TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("current time is : " + body + "; the counter is : " + ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Unexpected exception from downstream : " + cause.getMessage());
ctx.close();
}
}
3.3.3 运行结果分析
服务端执行结果:
客户端执行结果:
运行结果完全符合预期,说明通过使用LineBasedFrameDecoder 和StringDecoder成功解决了TCP粘包导致的读半包问题。对于使用者来说,只要将支持半包解码的handler添加到ChannelPipeline中即可,不需要写额外的代码,用户使用起来非常简单。
3.3.4 LineBasedFrameDecoder原理分析
LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf 中的可读字节,判断看是否有“\n” 或者“\r\n”, 如果有,就以此位置为结束位置,从可读索引到结東位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。
StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的handler。
LineBasedFrameDecoder + StringDecoder 组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。
3. 分隔符和定长解码器
TCP以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4种方式:
- (1)消息长度固定,累计读取到长度总和为定长LEN的报文后,就认为读取到了一个完整的消息:将计数器置位,重新开始读取下一个数据报;
- (2)将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛;
- (3)将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结東分隔符;
- (4)通过在消息头中定义长度字段来标识消息的总长度。
Netty对上面四种应用做了统一的抽象,提供了4种解码器来解决对应的问题,使用起来非常方便。有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包。
3.1 DelimiterBasedFramedDecoder
通过对DelimiterBasedFrameDecoder的使用,我们可以自动完成以分隔符作为码流结束标识的消息的解码,下面通过一个演示程序来学习下如何使用DelimiterBased FrameDecoder进行开发
3.1.1 服务端开发
EchoServer
public class EchoServer {
public static void main(String[] args) throws Exception {
new EchoServer().bind(8080);
}
public void bind(int port) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
EchoServerHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
int counter = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("This is : " + ++counter + " times receive client : {" + body + "}");
body += "$_";
ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(echo);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3.1.2 客户端开发
EchoClient
public class EchoClient {
public static void main(String[] args) throws InterruptedException {
new EchoClient().connect("127.0.0.1", 8080);
}
public void connect(String host, int port) throws InterruptedException {
// 配置客户端NIO线程组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
}
EchoClientHandler
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private int counter;
private final String ECHO_REQ = "Hi, soulballad. Welcome to Netty.$_";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("This is " + ++counter + " times receive server : {" + msg + "}");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3.1.3 运行结果分析
服务端执行结果:
客户端执行结果:
服务端成功接收到了客户端发送的10条 “Hi, soulballad. Welcome to Netty." 请求消息,客户端成功接收到了服务端返回的10条 “Hi, soulballad. Welcome to Netty." 应答消息。测试结果表明使用DelimiterBasedFrameDecoder 可以自动对采用分隔符做码流结束标识的消息进行解码。
3.2 FixedLengthBasedFrameDecoder
FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包问题,非常实用。
3.2.1 服务端开发
EchoServer
public class EchoServer {
public static void main(String[] args) throws Exception {
new EchoServer().bind(8080);
}
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
EchoServerHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Receive client : {" + msg + "}");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3.2.2 客户端开发
EchoClient
public class EchoClient {
public static void main(String[] args) throws InterruptedException {
new EchoClient().connect("127.0.0.1", 8080);
}
public void connect(String host, int port) throws InterruptedException {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
}
EchoClientHandler
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private final String ECHO_REQ = "Hi, soulballad. Welcome to Netty.";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 3; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3.2.3 运行结果分析
服务端执行结果:
服务端运行结果完全符合预期,FixedLengthFrameDecoder 解码器按照20个字节长度对请求消息进行截取,输出结果为 “Hi, soulballad. Welc ”。
评论区