侧边栏壁纸
博主头像
soulballad博主等级

技术文章记录及总结

  • 累计撰写 169 篇文章
  • 累计创建 26 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Netty和WebSocket实现IM,讨论Channel和用户标识的双向绑定,离线消息和消息签收

soulballad
2022-03-06 / 0 评论 / 0 点赞 / 99 阅读 / 11,583 字
温馨提示:
本文最后更新于 2022-03-08,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

1. 问题引入

相信很多人用过Netty写过聊天室的简单案例吧,可以说是模板代码了,没有结合业务。如果我们要做项目中的即时通讯模块(IM),需要将用户A发的消息转发给用户B,将会不可避免的遇到一个问题:如何快速找到用户B所建立的Channel (用户 -> Channel 的映射)?围绕我们的聊天业务,离线消息又如何进行推送?一个用户建立Channel之后,我们要想知道他有没有未签收的离线消息,就必定要知道用户标识。原则来讲,我们又如何避免一个用户重复创建Channel?

换言之,在IM业务中,我们要解决:Channel和用户标识绑定的问题。他们的映射关系是一对一的。

2. "绑定"类型的消息需要携带token

要想绑定用户标识,客户端就必须在WebSocket建立之后(Channel建立之后),立马发送一条绑定类型的消息给后端,该消息必须要携带用户唯一标识,后端建立并维护Channel和用户的一对一映射关系。那么绑定类型的消息,携带的用户标识是什么?客户端本地存储的userid?其实这并不合理,应该携带token!(我这里用的jwt,jwt里面的载荷有userid)。

为什么携带token更加合理?因为token可以代表用户的一次有效的登录状态,我们可以在后端验证用户登录状态有效性(严格的可以做单点登录的验证),并且可以查出用户的身份信息,包括userid。绑定类型的消息如果携带userid,之所以说不合理,是因为:假设用户id就是自增长的unsigned int,那么userid就很容易猜到,就是一个纯数字嘛,那么拿一个纯数字,就可以随便跟我后端建立websocket连接,对后端来说,必然不安全。

3. 离线消息和消息签收

什么是离线消息?比如说用户A给用户B发送一条消息,后端转发的时候发现用户B不在线(换言之,就是没有建立WebSocket连接,没有建立Channel)。那么这条消息对于B来说就是离线消息。

什么是签收?用户A给用户B发送一条消息,B同时也在线,他就能收到这一条消息,那么这条消息就是“已签收”。假如,此时B不在线,那他肯定就没办法收到,就称这条消息“未签收”。

如果B不在线,我们显然没有办法立马将消息转发给B,需要将消息暂存到数据库。当用户B上线(Channel建立之后),我们去数据库查询他是否有未签收的消息,如果有,则将未签收的消息立刻推送给B。

这里,我们遇到2个问题。

  1. 我们要去数据查询用户未签收的消息,就必须知道这是哪个用户。(Channel -> 用户)。我们前面讨论的 Channel和用户标识的双向绑定 就解决了这个问题。
  2. 要想知道消息有没有成功被B收到,我们就必须给消息(数据库表)增设一个签收状态字段,同时用户在成功收到消息之后,要立马告诉后端,该消息已经签收了。所以我们还有一种类型的消息。称为 “签收”类型的消息

4. 业务层面的消息类型和消息模型定义

除了我们上面讨论到的

  1. “绑定”类型的消息:携带token

  2. “签收”类型的消息:携带消息id。分为单签和多签,为了方便,如果是多签,我们与前端约定,将多个消息id之间以逗号作为分隔符拼接成字符串

    之外,我们的聊天业务中还涉及到其他一些类型的消息,比如说:聊天消息,好友申请消息,拉取新好友类型的消息,以及 心跳类型的消息,等等。下面我们再来分析一下聊天类型的消息。

  3. “聊天”类型的消息

    这个基于业务,可以分为:单聊和群聊。根据消息内容的不同,又可以分为:文字消息、图片消息、语音消息等。这里简单起见,我们以 单聊、文字消息 为例子进行讨论。这种类型的消息,需要携带哪些数据?显然接收者的userid和文字消息的内容(String类型)是必须的。如果是群聊,还得携带上groupid。

每种类型的消息,携带的数据可能都不同。显然需要定义泛型。见下面:

/**
 * @author passerbyYSQ
 * @create 2021-02-05 22:31
 */
@Data
public class MsgModel<T> implements Serializable {
    // 消息类型
    private Integer action;
    // 消息实体
    private T data;
}

为了规范定义消息类型,我们另外定义枚举类:

package net.ysq.webchat.netty.entity;
 
/**
 *
 * @Description: 发送消息的动作 枚举
 */
public enum MsgActionEnum {
 
	BIND(1, "第一次(或重连)初始化连接"),
	CHAT(2, "聊天消息"),
	SIGNED(3, "消息签收"),
	KEEP_ALIVE(4, "心跳消息"),
	PULL_FRIEND(5, "拉取好友"),
	FRIEND_REQUEST(6, "请求添加为好友"),
	FORCE_OFFLINE(7, "账号在异地登录,您已被挤下线");
 
	public final Integer type;
	public final String content;
 
	MsgActionEnum(Integer type, String content){
		this.type = type;
		this.content = content;
	}
 
	public Integer getType() {
		return type;
	}
}

5. 核心代码

维护用户标识和Channel映射关系的UserChannelRepository

package net.ysq.webchat.netty;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import net.ysq.webchat.netty.entity.MsgActionEnum;
import net.ysq.webchat.netty.entity.MsgModel;
import net.ysq.webchat.utils.SpringUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
 
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 用户id和channel关联的仓库
 *
 * @author passerbyYSQ
 * @create 2021-02-05 23:20
 */
@Slf4j
public class UserChannelRepository {
 
    //private final static Logger logger = LoggerFactory.getLogger(UserChannelRepository.class);
 
    private static ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static Map<String, Channel> USER_CHANNEL = new ConcurrentHashMap<>();
    private static final Object bindLocker = new Object();
    private static final Object removeLocker = new Object();
 
    public static void bind(String userId, Channel channel) {
        synchronized (bindLocker) {
            // 此时channel一定已经在ChannelGroup中了
 
            // 之前已经绑定过了,移除并释放掉之前绑定的channel
            if (USER_CHANNEL.containsKey(userId)) { // map  userId --> channel
                Channel oldChannel = USER_CHANNEL.get(userId);
                CHANNEL_GROUP.remove(oldChannel);
                oldChannel.close();
            }
 
            // 双向绑定
            // channel -> userId
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            channel.attr(key).set(userId);
 
            // userId -> channel
            USER_CHANNEL.put(userId, channel);
        }
    }
 
    /**
     * 从通道中获取userId。只要userId和channel绑定周,这个方法就一定能获取的到
     * @param channel
     * @return
     */
    public static String getUserId(Channel channel) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        return channel.attr(key).get();
    }
 
    public static void add(Channel channel) {
        CHANNEL_GROUP.add(channel);
    }
 
    public static void remove(Channel channel) {
        synchronized(removeLocker) { // 确保原子性
 
            String userId = getUserId(channel);
 
            // userId有可能为空。可能chanelActive之后,由于前端原因(或者网络原因)没有及时绑定userId。
            // 此时netty认为channelInactive了,就移除通道,这时userId就是null
            if (!StringUtils.isEmpty(userId)) {
                USER_CHANNEL.remove(userId); // map
            }
 
            CHANNEL_GROUP.remove(channel);
 
            // 关闭channel
            channel.close();
        }
    }
 
    public static void remove(String userId) {
        synchronized(removeLocker) { // 确保原子性
 
            Channel channel = USER_CHANNEL.get(userId);
            USER_CHANNEL.remove(userId); // map
            CHANNEL_GROUP.remove(channel);
 
            // 关闭channel
            if (!ObjectUtils.isEmpty(channel)) {
                channel.close();
            }
        }
    }
 
    /**
     * 判断用户是否在线
     * map和channelGroup中均能找得到对应的channel说明用户在线
     * @return      在线就返回对应的channel,不在线返回null
     */
    public static Channel isBind(String userId) {
        Channel channel = USER_CHANNEL.get(userId); // map
        if (ObjectUtils.isEmpty(channel)) {
            return null;
        }
        return CHANNEL_GROUP.find(channel.id());
    }
 
    public static boolean isBind(Channel channel) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = channel.attr(key).get();
        return !ObjectUtils.isEmpty(userId) &&
                !ObjectUtils.isEmpty(USER_CHANNEL.get(userId));
    }
 
    public static void forceOffLine(String userId) {
        Channel channel = isBind(userId);
        if (!ObjectUtils.isEmpty(channel)) {
            // 推送下线通知
            MsgModel<Object> msgModel = new MsgModel<>();
            msgModel.setAction(MsgActionEnum.FORCE_OFFLINE.type);
            msgModel.setData(MsgActionEnum.FORCE_OFFLINE.content);
            pushMsg(userId, msgModel);
 
            // 移除通道。服务端单方面关闭连接。前端心跳会发送失败
            remove(userId);
        }
    }
 
    /**
     * 消息推送
     * @param receiverId
     * @param msgModel
     */
    public static void pushMsg(String receiverId, MsgModel msgModel) {
        Channel receiverChannel = isBind(receiverId);
        if (!ObjectUtils.isEmpty(receiverChannel)) {
            TextWebSocketFrame frame = new TextWebSocketFrame(toJson(msgModel));
            receiverChannel.writeAndFlush(frame);
        } else {
            // 离线状态
            log.info("{} 用户离线", receiverId);
        }
    }
 
    private static String toJson(MsgModel msgModel) {
        // 在线,就推送;离线,不做处理
        ObjectMapper mapper = SpringUtils.getBean(ObjectMapper.class);
        try {
            return mapper.writeValueAsString(msgModel);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }
 
    public synchronized static void print() {
        log.info("所有通道的长id:");
        for (Channel channel : CHANNEL_GROUP) {
            log.info(channel.id().asLongText());
        }
        log.info("userId -> channel 的映射:");
        for (Map.Entry<String, Channel> entry : USER_CHANNEL.entrySet()) {
            log.info("userId: {} ---> channelId: {}", entry.getKey(), entry.getValue().id().asLongText());
        }
    }
 
}

业务Handler:TextMsgHandler

package net.ysq.webchat.netty;
 
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import net.ysq.webchat.netty.entity.MsgActionEnum;
import net.ysq.webchat.netty.entity.MsgModel;
import net.ysq.webchat.netty.entity.SingleChatMsgRequest;
import net.ysq.webchat.netty.entity.SingleChatMsgResponse;
import net.ysq.webchat.po.ChatMsg;
import net.ysq.webchat.service.ChatMsgService;
import net.ysq.webchat.service.FriendService;
import net.ysq.webchat.utils.JwtUtils;
import net.ysq.webchat.utils.RedisUtils;
import net.ysq.webchat.utils.SpringUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.util.HtmlUtils;
 
import java.util.ArrayList;
import java.util.List;
 
/**
 * 用于处理文本消息的handler
 *
 * @author passerbyYSQ
 * @create 2021-02-05 21:23
 */
@Slf4j
public class TextMsgHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // json串
        log.info("接收到的文本消息:{}", msg.text());
 
        ChatMsgService chatMsgService = (ChatMsgService) SpringUtils.getBean("chatMsgServiceImpl");
        FriendService friendService = (FriendService) SpringUtils.getBean("friendServiceImpl");
        RedisUtils redisUtils = (RedisUtils) SpringUtils.getBean("redisUtils");
        ObjectMapper objectMapper = SpringUtils.getBean(ObjectMapper.class);
 
        // 消息类型
        JsonNode rootNode = objectMapper.readTree(msg.text());
        Integer action = rootNode.get("action").asInt();
        // 取出数据部分,不同的消息类型,数据部分对应的泛型不一样
        JsonNode dataNode = rootNode.get("data");
        Channel channel = ctx.channel();
 
        // 判断消息类型
        // 根据不同的消息类型,处理不同的业务
        if (action.equals(MsgActionEnum.BIND.type)) {
            // 1、当websocket第一次open的时候,初始化channel,把channel和userId关联起来
            // 如果是CONNECT类型,与前端约定,data部分是token
            String token = objectMapper.treeToValue(dataNode, String.class);
 
            // 先验证是否过期。如果过期会抛出异常,全局捕获。之后的代码不会执行
            JwtUtils.verifyJwt(token, JwtUtils.DEFAULT_SECRET);
            // 如果没有抛出异常,表示token有效。则在Redis中寻找对应的登录信息
            String userId = JwtUtils.getClaimByKey(token, "userId");
            String redisToken = (String) redisUtils.get("user:" + userId);
 
            if (!StringUtils.isEmpty(redisToken) && token.equals(redisToken)) {
                UserChannelRepository.bind(userId, channel);
                // 查询是否有未签收的消息,如果有,就一次性全部推送(并不是逐条推送)
                List<SingleChatMsgResponse> unsignedMsgList = chatMsgService.getUnsignedMsg(userId);
                if (unsignedMsgList.size() > 0) { // 不为空才推送
                    MsgModel<List<SingleChatMsgResponse>> model = new MsgModel<>();
                    model.setAction(MsgActionEnum.CHAT.type);
                    model.setData(unsignedMsgList);
                    UserChannelRepository.pushMsg(userId, model);
                }
            }
 
        } else if (action.equals(MsgActionEnum.KEEP_ALIVE.type)) {
            // 4、心跳类型的消息
            // 假如客户端进程被正常退出,websocket主动断开连接,那么服务端对应的channel是会释放的
            // 但是如果客户端关闭网络后,重启网络,会导致服务端会再新建一个channel
            // 而旧的channel已经没用了,但是并没有被移除
            log.info("收到来自于channel {} 的心跳包", channel.id().asLongText());
 
        } else if (UserChannelRepository.isBind(channel)) {
            // 其他类型的消息需要绑定后才会处理
 
            if (action.equals(MsgActionEnum.CHAT.type)) {
                // 2、聊天类型的消息,把消息保存到数据库,同时标记消息状态为[未签收]
                SingleChatMsgRequest data = objectMapper.treeToValue(dataNode, SingleChatMsgRequest.class);
                // 由于是通过websocket,而并非http协议,所以并没有经过SpringMVC的参数绑定流程。此处需要我们自己转义
                data.setContent(HtmlUtils.htmlEscape(data.getContent(), "UTF-8"));
 
                // 对于聊天消息,channel所绑定的user是发送者
                String senderId = UserChannelRepository.getUserId(channel);
                // 如果是空的,说明绑定失败了(可能是token过期了)。不做处理
                if (!StringUtils.isEmpty(senderId) &&
                        // 且接收者是我的好友
                        !ObjectUtils.isEmpty(friendService.getMyOneFriend(senderId, data.getReceiverId()))) {
 
                    // 往消息表插入数据
                    ChatMsg chatMsg = chatMsgService.saveMsg(senderId, data);
 
                    // 构建消息实体
                    MsgModel<List<SingleChatMsgResponse>> model = new MsgModel<>();
                    model.setAction(MsgActionEnum.CHAT.type);
                    List<SingleChatMsgResponse> unsignedMsgList = new ArrayList<>();
                    unsignedMsgList.add(new SingleChatMsgResponse(chatMsg));
                    model.setData(unsignedMsgList);
 
                    // 推送消息
                    UserChannelRepository.pushMsg(data.getReceiverId(), model);
                }
 
            } else if (action.equals(MsgActionEnum.SIGNED.type)) {
                // 3、签收消息的类型。针对具体的消息进行签收,修改数据库对应的消息状态为[已签收]
                // 签收状态并非是指用户有没有读了消息。而是消息是否已经被推送到达用户的手机设备
                // 在签收类型的消息中,代表需要签收的消息的id。多个id之间用,分隔
                String msgIdsStr = objectMapper.treeToValue(dataNode, String.class);
 
                // 对于要签收类型消息,只有是我收到的消息,我才能签收。所以我是接收者
                String receiverId = UserChannelRepository.getUserId(channel);
                if (!StringUtils.isEmpty(msgIdsStr)) {
                    String[] msgIds = msgIdsStr.split(",");
                    if (!ObjectUtils.isEmpty(msgIds)) {
                        chatMsgService.signMsg(receiverId, msgIds);
                    }
                }
            }
        }
    }
 
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UserChannelRepository.add(ctx.channel());
    }
 
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        UserChannelRepository.remove(ctx.channel());
//        logger.info("剩余通道个数:{}", UserChannelRepository.CHANNEL_GROUP.size());
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        UserChannelRepository.remove(ctx.channel());
    }
}
0

评论区