侧边栏壁纸
博主头像
soulballad博主等级

技术文章记录及总结

  • 累计撰写 169 篇文章
  • 累计创建 26 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

2.Netty初步使用

soulballad
2022-03-06 / 0 评论 / 0 点赞 / 69 阅读 / 19,514 字
温馨提示:
本文最后更新于 2022-03-06,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

1. Netty入门

作为Netty的第一个应用程序,我们依然以时间服务器为例进行开发,通过Netty版本的时间服务器的开发,让初学者尽快学到如何搭建Netty开发环境和运行Netty应用程序。

1.1 Netty服务端开发

在开始使用Netty开发TimeServer之前,先回顾一下使用NIO进行服务端开发的步骤。

  • (1)创建ServerSocketChannel,配置它为非阻塞模式;
  • (2)绑定监听,配置TCP参数,例如backlog大小;
  • (3)创建一个独立的I/O线程,用于轮询多路复用器Selector;
  • (4)创建Seleetor,将之前创建的ServerSocketChannel 注册到Selector上,监听SelectionKey.ACCEPT;
  • (5)启动I/O线程,在循环体中执行Selector.select()方法,轮询就绪的Channel;
  • (6)当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是OP_ ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端;
  • (7)设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数;
  • (8)将SocketChannel注册到Selector,监听OP_ READ操作位;
  • (9)如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包;
  • (10)如果轮询的Channel为OP_ WRITE,说明还有数据没有发送完成,需要继续发送。

一个简单的NIO服务端程序,如果我们直接使用JDK的NIO类库进行开发,竟然需要经过烦琐的十多步操作才能完成最基本的消息读取和发送,这也是我们要选择Netty等NIO框架的原因了,下面我们看看使用Netty是如何轻松搞定服务端开发的。

TimeServer

public class TimeServer {

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeServer().bind(port);
    }

    public void bind(int port) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }
}

TimeServerHandler

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String body = new String(bytes, StandardCharsets.UTF_8);
        System.out.println("The time server receive order : " + body);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

1.2 Netty客户端开发

TimeClient

public class TimeClient {

    public static void main(String[] args) throws InterruptedException {
        new TimeClient().connect("127.0.0.1", 8080);
    }

    public void connect(String host, int port) throws InterruptedException {
        // 配置客户端NIO线程组
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }
}

TimeClientHandler

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER".getBytes();
        this.firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String body = new String(bytes, StandardCharsets.UTF_8);
        System.out.println("current time is : " + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
}

1.3 运行和调试

服务端执行结果:

image-20200419205231924

客户端执行结果:

image-20200419205231924

运行结果正确。可以发现,通过Netty开发的NIO服务端和客户端非常简单,短短儿十行代码,就能完成之前NIO程序需要几百行才能完成的功能。基于Netty的应用开发不但API使用简单、开发模式固定,而且扩展性和定制性非常好,后面,我们会通过更多应用来介绍Netty的强大功能。

需要指出的是,本例程依然没有考虑读半包的处理,对于功能演示或者测试,上述程序没有问题,但是稍加改造进行性能或者压力测试,它就不能正确地工作了。在下一个章节我们会给出能够正确处理半包消息的应用实例。

2. 解决TCP粘包拆包

2.1 TCP粘包拆包

TCP是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

2.1.1 粘包拆包问题说明

我们可以通过图解对TCP粘包和拆包问题进行说明,粘包问题示例如图4-1所示。

image-20200419205231924

假设客户端分别发送了两个数据包DI和D2给服务端,由于服务端一次 读取到的字节数是不确定的,故可能存在以下4种情况。

  • (1)服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
  • (2)服务端一次接收到了两个数据包,DI和D2粘合在一起,被称为TCP粘包;
  • (3)服务端分两次读取到了两个数据包,第一次读取到了完整的DI包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;
  • (4)服务端分两次读取到了两个数据包,第一次读取到了DI包的部分内容DI_1,第二次读取到了DI包的剩余内容D1_ 2和D2包的整包。

如果此时服务端TCP接收滑窗非常小,而数据包DI和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将DI和D2包接收完全,期间发生多次拆包。

2.1.2 问题产生的原因

TCP粘包/拆包问题产生的原因有三个,分别如下:

  • (1)应用程序write写入的字节大小大于套接口发送缓冲区大小;
  • (2)进行MSS大小的TCP分段;
  • (3)以太网帧的payload大于MTU进行IP分片。

图解如图 4-2 所示:

image-20200419205231924

2.1.3 问题的解决策略

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:

  • (1)消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;
  • (2)在包尾增加回车换行符进行分割,例如FTP协议;
  • (3)将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度;
  • (4)更复杂的应用层协议。

介绍完了TCP粘包/拆包的基础知识,下面我们就通过实际例程来看看如何使用Netty提供的半包解码器来解决TCP粘包/拆包问题。

2.2 未考虑粘包拆包异常案例

2.2.1 TimeServer

public class TimeServer {

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeServer().bind(port);
    }

    public void bind(int port) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }
}

TimeServerHandler

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String body = new String(bytes, StandardCharsets.UTF_8).substring(0, bytes.length - System.getProperty("line.separator").length());
        System.out.println("The time server receive order : " + body + "; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

2.2.2 TimeClient

public class TimeClient {

    public static void main(String[] args) throws InterruptedException {
        new TimeClient().connect("127.0.0.1", 8080);
    }

    public void connect(String host, int port) throws InterruptedException {
        // 配置客户端NIO线程组
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }
}

TimeClientHandler

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private byte[] req;
    private int counter;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String body = new String(bytes, StandardCharsets.UTF_8);
        System.out.println("current time is : " + body + "; the counter is : " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
}

2.2.3 运行结果分析

服务端执行结果:

image-20200419205231924

客户端执行结果:

image-20200419205231924

按照设计初衷,客户端应该收到100条当前系统时间的消息,但实际上只收到了一条

这不难理解,因为服务端只收到了1条请求消息,所以实际服务端只发送了1条应答,由于请求消息不满足查询条件,所以返回了1条“BAD ORDER"应答消息。但是实际上客户端只收到了一条包含1条“BADORDER"指令的消息,说明服务端返回的应答消息也发生了粘包。

由于上面的例程没有考虑TCP的粘包/拆包,所以当发生TCP粘包时,我们的程序就不能正常工作。

下面将演示如何通过Netty的LineBasedFrameDecoder和StringDecoder来解决TCP粘包问题。

2.3 LineBasedFrameDecoder解决粘包问题

为了解决TCP粘包/拆包导致的半包读写问题,Netty默认提供了多种编解码器用于处理半包,只要能熟练掌握这些类库的使用,TCP粘包问题从此会变得非常容易,你甚至不需要关心它们,这也是其他NIO框架和JDK原生的NIO API所无法匹敌的。

2.3.1 处理TCP粘包的TimeServer

public class TimeServer {

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeServer().bind(port);
    }

    public void bind(int port) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }
}

TimeServerHandler

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("The time server receive order : " + body + "; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
                System.currentTimeMillis()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

2.3.2 处理TCP粘包的TimeClient

public class TimeClient {

    public static void main(String[] args) throws InterruptedException {
        new TimeClient().connect("127.0.0.1", 8080);
    }

    public void connect(String host, int port) throws InterruptedException {
        // 配置客户端NIO线程组
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }
}

TimeClientHandler

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private byte[] req;
    private int counter;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("current time is : " + body + "; the counter is : " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
}

3.3.3 运行结果分析

服务端执行结果:

image-20200419205231924

客户端执行结果:

image-20200419205231924

运行结果完全符合预期,说明通过使用LineBasedFrameDecoder 和StringDecoder成功解决了TCP粘包导致的读半包问题。对于使用者来说,只要将支持半包解码的handler添加到ChannelPipeline中即可,不需要写额外的代码,用户使用起来非常简单。

3.3.4 LineBasedFrameDecoder原理分析

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf 中的可读字节,判断看是否有“\n” 或者“\r\n”, 如果有,就以此位置为结束位置,从可读索引到结東位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的handler。

LineBasedFrameDecoder + StringDecoder 组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。

3. 分隔符和定长解码器

TCP以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4种方式:

  • (1)消息长度固定,累计读取到长度总和为定长LEN的报文后,就认为读取到了一个完整的消息:将计数器置位,重新开始读取下一个数据报;
  • (2)将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛;
  • (3)将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结東分隔符;
  • (4)通过在消息头中定义长度字段来标识消息的总长度。

Netty对上面四种应用做了统一的抽象,提供了4种解码器来解决对应的问题,使用起来非常方便。有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包。

3.1 DelimiterBasedFramedDecoder

通过对DelimiterBasedFrameDecoder的使用,我们可以自动完成以分隔符作为码流结束标识的消息的解码,下面通过一个演示程序来学习下如何使用DelimiterBased FrameDecoder进行开发

3.1.1 服务端开发

EchoServer

public class EchoServer {

    public static void main(String[] args) throws Exception {
        new EchoServer().bind(8080);
    }

    public void bind(int port) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

EchoServerHandler

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    int counter = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("This is : " + ++counter + " times receive client : {" + body + "}");
        body += "$_";
        ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
        ctx.writeAndFlush(echo);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.1.2 客户端开发

EchoClient

public class EchoClient {

    public static void main(String[] args) throws InterruptedException {
        new EchoClient().connect("127.0.0.1", 8080);
    }

    public void connect(String host, int port) throws InterruptedException {
        // 配置客户端NIO线程组
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }
}

EchoClientHandler

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private int counter;
    private final String ECHO_REQ = "Hi, soulballad. Welcome to Netty.$_";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("This is " + ++counter + " times receive server : {" + msg + "}");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.1.3 运行结果分析

服务端执行结果:

image-20200419205231924

客户端执行结果:

image-20200419205231924

服务端成功接收到了客户端发送的10条 “Hi, soulballad. Welcome to Netty." 请求消息,客户端成功接收到了服务端返回的10条 “Hi, soulballad. Welcome to Netty." 应答消息。测试结果表明使用DelimiterBasedFrameDecoder 可以自动对采用分隔符做码流结束标识的消息进行解码。

3.2 FixedLengthBasedFrameDecoder

FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包问题,非常实用。

3.2.1 服务端开发

EchoServer

public class EchoServer {

    public static void main(String[] args) throws Exception {
        new EchoServer().bind(8080);
    }

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

EchoServerHandler

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Receive client : {" + msg + "}");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.2.2 客户端开发

EchoClient

public class EchoClient {

    public static void main(String[] args) throws InterruptedException {
        new EchoClient().connect("127.0.0.1", 8080);
    }

    public void connect(String host, int port) throws InterruptedException {
        // 配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }
}

EchoClientHandler

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final String ECHO_REQ = "Hi, soulballad. Welcome to Netty.";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 3; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.2.3 运行结果分析

服务端执行结果:

image-20200419205231924

服务端运行结果完全符合预期,FixedLengthFrameDecoder 解码器按照20个字节长度对请求消息进行截取,输出结果为 “Hi, soulballad. Welc ”。

0

评论区