侧边栏壁纸
博主头像
soulballad博主等级

技术文章记录及总结

  • 累计撰写 169 篇文章
  • 累计创建 26 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Netty实现同步“请求-响应”的同步通信机制

soulballad
2022-01-05 / 0 评论 / 0 点赞 / 62 阅读 / 18,959 字
温馨提示:
本文最后更新于 2022-03-08,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

1. 需求

实现基于Netty的“请求-响应”同步通信机制。

2. 设计思路

Netty提供了异步IO和同步IO的统一实现,但是我们的需求其实和IO的同步异步并无关系。我们的关键是要实现请求-响应这种典型的一问一答交互方式。要实现这个需求,需要解决两个问题: 

2.1 请求和响应的正确匹配。

客户端发送数据后,服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢,(即一个请求对应一个自己的响应)?
解决思路:通过客户端唯一的RequestId,服务端返回的响应中需要包含该RequestId,这样客户端就可以通过RequestId来正确匹配请求响应。

2.2 请求线程和响应线程的通信。

请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty客户端在接受到响应之后,怎么通知请求线程结果。
解决思路:客户端线程在发送请求后,进入等待,服务器返回响应后,根据RequestId来唤醒客户端的请求线程,并把结果返回给请求线程。

3. 解决方案

利用Java中的CountDownLatch类来实现同步Future。
具体过程是:客户端发送请求后将<请求ID,Future>的键值对保存到一个缓存中,这时候用Future等待结果,挂住请求线程;当Netty客户端收到服务端的响应后,响应线程根据请求ID从缓存中取出Future,然后设置响应结果到Future中。这个时候利用CountDownLatch的通知机制,通知请求线程。请求线程从Future中拿到响应结果,然后做业务处理。
缓存使用google的guava

4. 具体代码

首先引入依赖

<!-- guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>28.0-jre</version>
</dependency>

<!-- Netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

4.1 SyncFuture

SyncFuture: 同步的Future。这个是核心,通过这个工具类来实现线程等待。

package com.topinfo.ci.netty.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SyncFuture<T> implements Future<T> {

    // 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。
    private CountDownLatch latch = new CountDownLatch(1);
    
    // 需要响应线程设置的响应结果
    private T response;
    
    // Futrue的请求时间,用于计算Future是否超时
    private long beginTime = System.currentTimeMillis();

    public SyncFuture() {
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        if (response != null) {
            return true;
        }
        return false;
    }

    // 获取响应结果,直到有结果才返回。
    @Override
    public T get() throws InterruptedException {
        latch.await();
        return this.response;
    }

    // 获取响应结果,直到有结果或者超过指定时间就返回。
    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException {
        if (latch.await(timeout, unit)) {
            return this.response;
        }
        return null;
    }

    // 用于设置响应结果,并且做countDown操作,通知请求线程
    public void setResponse(T response) {
        this.response = response;
        latch.countDown();
    }

    public long getBeginTime() {
        return beginTime;
    }
}

4.2 客户端代码

4.2.1 NettyClient

NettyClient: 有消息同步的和异步的方法,具体内容如下:

package com.topinfo.ci.netty.client;

import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;


/**
 *@Description: Netty客户端
 *@Author:杨攀
 *@Since:2019年9月26日下午8:54:59  
 */
@Component
public class NettyClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);
    
    private EventLoopGroup group = new NioEventLoopGroup();

    /** 
     *@Fields DELIMITER : 自定义分隔符,服务端和客户端要保持一致
     */ 
    public static final String DELIMITER = "@@";
    
    /**
     * @Fields hostIp : 服务端ip
     */
    private String hostIp = "192.168.90.96";

    /**
     * @Fields port : 服务端端口
     */
    private int port= 8888;

    /**
     * @Fields socketChannel : 通道
     */
    private SocketChannel socketChannel;

    
    /** 
     *@Fields clientHandlerInitilizer : 初始化
     */ 
    @Autowired
    private NettyClientHandlerInitilizer clientHandlerInitilizer;
    
    /**
     * @Description: 启动客户端
     * @Author:杨攀
     * @Since: 2019年9月12日下午4:43:21
     */
    @SuppressWarnings("unchecked")
    @PostConstruct
    public void start() {

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
            // 指定Channel
            .channel(NioSocketChannel.class)
            // 服务端地址
            .remoteAddress(hostIp, port)
            
            // 将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
            .option(ChannelOption.SO_KEEPALIVE, true)
            // 将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(clientHandlerInitilizer);
        
        // 连接
        ChannelFuture channelFuture = bootstrap.connect();
         //客户端断线重连逻辑
        channelFuture.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if(future.isSuccess()) {
                    LOGGER.info("连接Netty服务端成功...");
                }else {
                    
                    LOGGER.info("连接Netty服务端失败,进行断线重连...");
                    final EventLoop loop =future.channel().eventLoop();
                    loop.schedule(new Runnable() {
                        @Override
                        public void run() {
                            LOGGER.info("连接正在重试...");
                            start();
                        }
                    }, 20, TimeUnit.SECONDS);
                }
            }

             
        });
        
        socketChannel = (SocketChannel) channelFuture.channel();
    }

    
    /**
     *@Description: 消息发送
     *@Author:杨攀
     *@Since: 2019年9月12日下午5:08:47
     *@param message
     */
    public void sendMsg(String  message) {
        
        String msg = message.concat(NettyClient.DELIMITER);

        ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
        ChannelFuture future = socketChannel.writeAndFlush(byteBuf);

        future.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                
                if(future.isSuccess()) {
                    System.out.println("===========发送成功");
                }else {
                    System.out.println("------------------发送失败");
                }
            }
        });
    }

    /**
     *@Description: 发送同步消息
     *@Author:杨攀
     *@Since: 2019年9月12日下午5:08:47
     *@param message
     */
    public String sendSyncMsg(String  message, SyncFuture<String> syncFuture) {
        
        String result = "";
        
        String msg = message.concat(NettyClient.DELIMITER);

        ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
        
        try {
            
            ChannelFuture future = socketChannel.writeAndFlush(byteBuf);
            future.addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    
                    if(future.isSuccess()) {
                        System.out.println("===========发送成功");
                    }else {
                        System.out.println("------------------发送失败");
                    }
                }
            });
            
            // 等待 8 秒
            result = syncFuture.get(8, TimeUnit.SECONDS);
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        return result;
    }

    public String getHostIp() {
        return hostIp;
    }

    public void setHostIp(String hostIp) {
        this.hostIp = hostIp;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}

4.2.2 NettyClientHandlerInitilizer

NettyClientHandlerInitilizer: 初始化

package com.topinfo.ci.netty.client;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

@Component
public class NettyClientHandlerInitilizer extends ChannelInitializer<Channel> {

    /** 
     *@Fields clientHandler : 客户端处理 
     */ 
    @Autowired
    private NettyClientHandler clientHandler;
    
    @Override
    protected void initChannel(Channel ch) throws Exception {
        
        // 通过socketChannel去获得对应的管道
                ChannelPipeline channelPipeline = ch.pipeline();
                
                /*
                 * channelPipeline中会有很多handler类(也称之拦截器类)
                 * 获得pipeline之后,可以直接.addLast添加handler
                 */
                ByteBuf buf = Unpooled.copiedBuffer(NettyClient.DELIMITER.getBytes());
                channelPipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024*1024*2, buf));
                //channelPipeline.addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
                //channelPipeline.addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
                channelPipeline.addLast(clientHandler);
        
    }

}

4.2.3 NettyClientHandler

NettyClientHandler: 客户端处理类,实现了接收

package com.topinfo.ci.netty.client;

import java.util.concurrent.TimeUnit;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.topinfo.ci.netty.service.NettyClientService;
import com.topinfo.ci.netty.utils.ExceptionUtil;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;

@Component
@ChannelHandler.Sharable // 标注一个channel handler可以被多个channel安全地共享
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientHandler.class);

    @Autowired
    private NettyClientService service;

    @Autowired
    private NettyClient nettyClient;

    /**
     * @Description: 服务端发生消息给客户端,会触发该方法进行接收消息
     * @Author:杨攀
     * @Since: 2019年9月12日下午5:03:31
     * @param ctx
     * @param byteBuf
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {

        String msg = byteBuf.toString(CharsetUtil.UTF_8);

        LOGGER.info("客户端收到消息:{}", msg);

        //service.ackMsg(msg);
        service.ackSyncMsg(msg); // 同步消息返回
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("请求连接成功...");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        LOGGER.info("连接被断开...");

        // 使用过程中断线重连
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(new Runnable() {

            @Override
            public void run() {
                // 重连
                nettyClient.start();
            }
        }, 20, TimeUnit.SECONDS);
        super.channelInactive(ctx);
    }

    /**
     * 处理异常, 一般将实现异常处理逻辑的Handler放在ChannelPipeline的最后
     * 这样确保所有入站消息都总是被处理,无论它们发生在什么位置,下面只是简单的关闭Channel并打印异常信息
     * 
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();

        // 输出到日志中
        ExceptionUtil.getStackTrace(cause);

        Channel channel = ctx.channel();
        if (channel.isActive()) {
            ctx.close();
        }
    }

}

4.2.4 NettyClientServiceImpl

NettyClientServiceImpl: 客户端封装实现类, 它接口就不贴出来了。

package com.topinfo.ci.netty.service.impl;

import com.topinfo.ci.netty.bean.RealDataInfo;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.topinfo.ci.netty.bean.Message;
import com.topinfo.ci.netty.client.NettyClient;
import com.topinfo.ci.netty.client.SyncFuture;
import com.topinfo.ci.netty.service.NettyClientService;
import com.topinfo.ci.netty.utils.AESUtil;

@Service
public class NettyClientServiceImpl implements NettyClientService {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientServiceImpl.class);
    

    //缓存接口这里是LoadingCache,LoadingCache在缓存项不存在时可以自动加载缓存
    private static LoadingCache<String, SyncFuture> futureCache = CacheBuilder.newBuilder()
            //设置缓存容器的初始容量为10
            .initialCapacity(100)
            // maximumSize 设置缓存大小
            .maximumSize(10000)
            //设置并发级别为20,并发级别是指可以同时写缓存的线程数
            .concurrencyLevel(20)
            // expireAfterWrite设置写缓存后8秒钟过期
            .expireAfterWrite(8, TimeUnit.SECONDS)
            //设置缓存的移除通知
            .removalListener(new RemovalListener<Object, Object>() {
                @Override
                public void onRemoval(RemovalNotification<Object, Object> notification) {
                    LOGGER.debug("LoadingCache: {} was removed, cause is {}",notification.getKey(), notification.getCause());
                }
            })
            //build方法中可以指定CacheLoader,在缓存不存在时通过CacheLoader的实现自动加载缓存
            .build(new CacheLoader<String, SyncFuture>() {
                @Override
                public SyncFuture load(String key) throws Exception {
                    // 当获取key的缓存不存在时,不需要自动添加
                    return null;
                }
            });
    
    
    @Autowired
    private NettyClient nettyClient;
    
    @Autowired
    private CacheManager cacheManager;
 
    @Override
    public boolean sendMsg(String text, String dataId, String serviceId) {
        
        LOGGER.info("发送的内容:{}", text);

        //TODO        
        //nettyClient.sendMsg(json);
        return true;
    }
 

    @Override
    public String sendSyncMsg(String text, String dataId, String serviceId) {
        
        SyncFuture<String> syncFuture = new SyncFuture<String>();
        // 放入缓存中
        futureCache.put(dataId, syncFuture);
        
        // 封装数据
        JSONObject object = new JSONObject();
        object.put("dataId", dataId);
        object.put("text", text);
        
        // 发送同步消息
        String result = nettyClient.sendSyncMsg(object.toJSONString(), syncFuture);
        
        return result;
    }

    @Override
    public void ackSyncMsg(String msg) {
        
        LOGGER.info("ACK确认信息: {}",msg);
        
        JSONObject object =JSON.parseObject(msg);
        String dataId = object.getString("dataId");
        
        // 从缓存中获取数据
        SyncFuture<String> syncFuture = futureCache.getIfPresent(dataId);
        
        // 如果不为null, 则通知返回
        if(syncFuture != null) {
            syncFuture.setResponse(msg);
            //主动释放
            futureCache.invalidate(dataId);
        }
    }

}

4.2.5 TestController

TestController: 测试TestController。

package com.topinfo.ci.netty.controller;

import com.alibaba.fastjson.JSON;
import com.topinfo.ci.netty.bean.CmwSensoralert;
import com.topinfo.ci.netty.bean.Equip;
import com.topinfo.ci.netty.bean.JsonResult;
import com.topinfo.ci.netty.bean.RealDataInfo;
import com.topinfo.ci.netty.mapper.SensorAlertMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.topinfo.ci.netty.service.NettyClientService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private NettyClientService clientService;
    @Autowired
    private SensorAlertMapper sensorAlertMapper;

    @RequestMapping("/sendSyncMsg")
    public String sendSyncMsg(String dataId, String text) {
        
        String serviceId = "mmmm";
        
        String result = clientService.sendSyncMsg(text, dataId, serviceId);
        
        return "result:"+result ;
    }
     
}

测试,完美实现了“请求-响应”的效果。

4.3 服务端代码

4.3.1 NettyServer

package com.topinfo.ju.ccon.netty.server;

import java.net.InetSocketAddress;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

@Component
public class NettyServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
    
    /** 
     *@Fields DELIMITER : 自定义分隔符,服务端和客户端要保持一致
     */ 
    public static final String DELIMITER = "@@";
    
    /**
     * @Fields boss : boss 线程组用于处理连接工作, 默认是系统CPU个数的两倍,也可以根据实际情况指定
     */
    private EventLoopGroup boss = new NioEventLoopGroup();

    /**
     * @Fields work : work 线程组用于数据处理, 默认是系统CPU个数的两倍,也可以根据实际情况指定
     */
    private EventLoopGroup work = new NioEventLoopGroup();

    /**
     * @Fields port : 监听端口
     */
    private Integer port = 8888;
    
    @Autowired
    private NettyServerHandlerInitializer handlerInitializer;

    /**
     * @throws InterruptedException 
     * @Description: 启动Netty Server
     * @Author:杨攀
     * @Since: 2019年9月12日下午4:21:35
     */
    @PostConstruct
    public void start() throws InterruptedException {

        ServerBootstrap bootstrap = new ServerBootstrap();

        bootstrap.group(boss, work)
                // 指定Channel
                .channel(NioServerSocketChannel.class)
                // 使用指定的端口设置套接字地址
                .localAddress(new InetSocketAddress(port))

                // 服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
                .option(ChannelOption.SO_BACKLOG, 1024)

                // 设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                // 将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(handlerInitializer);

        ChannelFuture future = bootstrap.bind().sync();
        
        if (future.isSuccess()) {
            LOGGER.info("启动 Netty Server...");
        }
    }
    
    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        LOGGER.info("关闭Netty...");
    }
}

4.3.2 NettyServerHandlerInitializer

package com.topinfo.ju.ccon.netty.server;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

    /** 
     *@Fields serverHandler : 服务处理
     */ 
    @Autowired
    private NettyServerHandler serverHandler;
    
    
    @Override
    protected void initChannel(Channel ch) throws Exception {

        // 通过socketChannel去获得对应的管道
        ChannelPipeline channelPipeline = ch.pipeline();
        
        /*
         * channelPipeline中会有很多handler类(也称之拦截器类)
         * 获得pipeline之后,可以直接.addLast添加handler
         */
        ByteBuf buf = Unpooled.copiedBuffer(NettyServer.DELIMITER.getBytes());
        channelPipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024*1024*2, buf));
        //channelPipeline.addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
        //channelPipeline.addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
        
        // 自定义解码器,粘包/拆包/断包
        
        channelPipeline.addLast(serverHandler);
    }

}

4.3.3 NettyServerHandler

package com.topinfo.ju.ccon.netty.server;


import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.topinfo.ju.ccon.netty.bean.Message;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

@Component
@ChannelHandler.Sharable //标注一个channel handler可以被多个channel安全地共享
public class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);
    
    public static AtomicInteger nConnection = new AtomicInteger(0);
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        
        String txt = msg.toString(CharsetUtil.UTF_8);
        
        LOGGER.info("收到客户端的消息:{}", txt);
    
        
        
        ackMessage(ctx, txt);
    }

    /**
     *@Description: 确认消息 
     *@Author:杨攀
     *@Since: 2019年9月17日上午11:22:27
     *@param ctx
     *@param message
     */
    public void ackMessage(ChannelHandlerContext ctx, String message) {
        
        //自定义分隔符
        String msg = message+NettyServer.DELIMITER;
        
        ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
        
        //回应客户端
        ctx.writeAndFlush(byteBuf);
    }
    
    

    /**
     *@Description: 每次来一个新连接就对连接数加一
     *@Author:杨攀
     *@Since: 2019年9月16日下午3:04:42
     *@param ctx
     *@throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        nConnection.incrementAndGet();
        
        LOGGER.info("请求连接...{},当前连接数: :{}",  ctx.channel().id(),nConnection.get());
    }
    
    /**
     *@Description: 每次与服务器断开的时候,连接数减一
     *@Author:杨攀
     *@Since: 2019年9月16日下午3:06:10
     *@param ctx
     *@throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        nConnection.decrementAndGet();
        LOGGER.info("断开连接...当前连接数: :{}",  nConnection.get());
    }
    
    
    /**
     *@Description: 连接异常的时候回调 
     *@Author:杨攀
     *@Since: 2019年9月16日下午3:06:55
     *@param ctx
     *@param cause
     *@throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        
        // 打印错误日志
        cause.printStackTrace();
        
        Channel channel = ctx.channel();
        
        if(channel.isActive()){
            ctx.close();
        }
        
    }
    
}

核心代码基本就这些,希望对大家有帮助。

来源:Netty实现同步“请求-响应”的同步通信机制

0

评论区