体验地址:http://47.122.54.172/chat
Netty
Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty 相当简化和流线化了网络应用的编程开发过程。
实现思路
通过Netty承载WebSocket长连接,用Spring管理业务Handler,用统一消息协议完成多类型消息路由,结合Redis+MySQL实现在线状态与消息持久化
服务端
NettyServer
首先创建NettyServer类,用于启动Netty服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| @Component @Slf4j public class NettyServer { @Value("${netty.port}") private Integer port; @Autowired private NettyServerHandlerInitializer nettyServerHandlerInitializer;
private EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workGroup = new NioEventLoopGroup();
private Channel channel;
@PostConstruct public void run() throws Exception{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .option(ChannelOption.SO_BACKLOG,1024) .childOption(ChannelOption.SO_KEEPALIVE,true) .childOption(ChannelOption.TCP_NODELAY,true) .childHandler(nettyServerHandlerInitializer);
ChannelFuture future = serverBootstrap.bind().sync();
if(future.isSuccess()){ channel = future.channel() ;
log.info("Netty服务端启动在 {} 端口",port); } }
@PreDestroy public void shutdown(){ if(channel != null){ channel.close(); }
bossGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
|
ServerBootstrap:Netty提供的服务器启动类,方便我们初始化Server
bossGroup :Boss 线程组,用于服务端接受客户端的连接;
workGroup:work线程组,用于处理客户端的读写请求
childHandler(ChannelHandler childHandler) :设置客户端连接上来的channel处理器
NettyServerHandlerInitializer
Netty的ChannelHandler组件用来处理Channel的各种事件,这里的事件包括连接,数据读写,异常等等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> { private static final Integer READ_TIMEOUT = 30000; @Autowired private MessageDispatcher messageDispatcher; @Autowired private NettyServerHandler nettyServerHandler; @Override protected void initChannel(Channel Channel) throws Exception { ChannelPipeline pipeline = Channel.pipeline(); pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT, TimeUnit.SECONDS)) .addLast(new HttpServerCodec()) .addLast(new ChunkedWriteHandler()) .addLast(new WebSocketServerProtocolHandler("/ws")) .addLast(messageDispatcher) .addLast(nettyServerHandler); } }
|
在每一个客户端与服务端建立连接时,服务端都会创建一个channel与之对应,此时会执行initChannel方法,完成自定义初始化。
在该函数中我们调用了Channel的pipeline方法,为连接的channel配置处理器链Pipeline。Pipeline 由一系列的 ChannelHandler 组成, Channel 所有上所有的事件都会经过 ChannelPipeline,被其上的 ChannelHandler 所处理。
这里我们配置的Handler主要有:
ReadTimeoutHandler: 设置连接的心跳超时时间,若指定时间未收到客户端消息,则自动断开连接。
HttpServerCodec: 支持 HTTP 编解码
WebSocketServerProtocolHandler: 负责 websocket 握手、协议升级,以及处理 WebSocket 控制帧,指定 ws 路径。
messageDispatcher: 自定义的消息分发器,用于根据消息类型将消息路由至对应的业务处理逻辑。
nettyServerHandler: 自定义的核心消息处理器,负责具体的消息处理业务。
NettyServerHandler
创建NettyServerHandler 类,继承 ChannelInboundHandlerAdapter 类,实现客户端 Channel 建立连接、断开连接、异常时的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Autowired private NettyChannelManager nettyChannelManager;
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { nettyChannelManager.add(ctx.channel()); }
@Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { nettyChannelManager.remove(ctx.channel()); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("连接发生异常:{}",ctx.channel().id(),cause); ctx.channel().close(); }
}
|
@ChannelHandler.Sharable注解表示这个Handler可以被多个channel使用
NettyChannelManager
NettyChannelManager用于管理所有在线channel,并建立用户ID到channel的映射,用于定向推送消息与广播推送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class NettyChannelManager {
private static final AttributeKey< Long> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("uid");
private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();
private ConcurrentMap< Long,Channel> userChannels = new ConcurrentHashMap<>() ;
@Resource private RedisUtils redisUtils;
public void add(Channel channel){
log.info("新连接加入:{}",channel.id()); channels.put(channel.id(),channel); }
public void remove(Channel channel){
log.info("连接断开:{}",channel.id());
channels.remove(channel.id());
if(channel.hasAttr(CHANNEL_ATTR_KEY_USER)){ Long uid = channel.attr(CHANNEL_ATTR_KEY_USER).get(); log.info("Uid:{} 断开连接",uid); userChannels.remove(uid); } } }
|
- channels:记录所有连接
- userChannels:记录登录用户连接
sendMsg
sendMsg函数给指定用户推送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public void sendMsg(Long uid, ChatRedirectToUserRequest req){
Channel channel = userChannels.get(uid);
boolean flag = true;
Map<Object, Object> msgItem = redisUtils.hmget(RedisConstant.OFFLINE_MSG+uid+"_"+req.getFromId()); if(msgItem.size() == 0){ msgItem.put("fromId",String.valueOf(req.getFromId())); msgItem.put("earlyTime", DateTime.now().toString()); msgItem.put("count",0); } msgItem.put("lastMsg",req.getContent()); msgItem.put("lastTime",DateTime.now().toString());
if(!flag){ msgItem.put("count",(Integer)msgItem.get("count")+1); }
redisUtils.hmset(RedisConstant.OFFLINE_MSG+uid+"_"+req.getFromId(),redisUtils.change(msgItem));
Invocation invocation = new Invocation(ChatRedirectToUserRequest.TYPE,req);
String text = JSON.toJSONString(invocation);
channel.writeAndFlush(new TextWebSocketFrame(text));
}
|
当用户在线时,将消息包装成invocation后直接写入目标channel,若用户不在线,则维护Redis中的离线消息lastMsg与lastTime,等对方上线后推送。
sendMsgToAll
广播消息,向每个活跃连接发送文本帧
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void sendMsgToAll(Invocation invocation){
String text = JSON.toJSONString(invocation);
for(Channel channel:channels.values()){ if(!channel.isActive()){ log.error("连接未激活"); return; }
channel.writeAndFlush(new TextWebSocketFrame(text)); } }
|
消息封装
在服务端和客户端需要构建通信协议,便于序列化和传输。发送方将消息对象通过序列化,转换成字节数组推送到TCP中,接收方收到字节数组后反序列化。常见的序列化方式有protobuf,JSON等。
这里创建Invocation类用于封装信息。作为通信协议的消息体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class Invocation implements Serializable {
private String type ;
private String message;
public String getMessage(){ return message; }
public Invocation(String type, Message message){ this.type = type; this.message = JSON.toJSONString(message); } }
|
消息分发
首先创建Message接口作为消息类型
1 2 3
| public interface Message { String TYPE = ""; }
|
创建MessageHandler作为消息处理器接口
1 2 3 4 5 6 7 8 9 10 11 12 13
| public interface MessageHandler<T extends Message> {
void execute(Channel channel, T message);
String getType(); }
|
MessageHandlerContainer
MessageHandlerContainer 作为 MessageHandler 的容器,注册所有消息处理器的Bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public class MessageHandlerContainer implements InitializingBean {
private final Map<String,MessageHandler> handlers= new HashMap<>();
@Autowired private ApplicationContext applicationContext;
@Override public void afterPropertiesSet() throws Exception { applicationContext.getBeansOfType(MessageHandler.class).values() .forEach(messageHandler -> handlers.put(messageHandler.getType(),messageHandler));
log.info("消息处理器数量:{}",handlers.size()); }
MessageHandler getMessageHandler(String type) throws Exception {
MessageHandler messageHandler=handlers.get(type); if(messageHandler== null){ throw new Exception("找不到对应的消息处理器"); } return messageHandler; }
static Class<? extends Message> getMessageClass(MessageHandler handler){ Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler); Type[] interfaces = targetClass.getGenericInterfaces();
Class<?> superClass = targetClass.getSuperclass();
while((Objects.isNull(interfaces)|| interfaces.length==0) && Objects.nonNull(superClass)){ interfaces = superClass.getGenericInterfaces(); superClass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { for (Type type : interfaces) { if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class<Message>) actualTypeArguments[0]; } else { throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler)); } } } } } throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler)); } }
|
handlers:保存消息类型与处理器映射
getMessageHandler():根据消息类型获得对应的MessageHandler
getMessageClass():获取Handler对应的消息类型
MessageDispatcher
MessageDispatcher作为消息分配器,处理从客户端接收的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class MessageDispatcher extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Autowired private MessageHandlerContainer messageHandlerContainer;
private final ExecutorService executor = Executors.newFixedThreadPool(200);
@Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { Invocation invocation = JSONUtil.toBean(msg.text(), Invocation.class); System.out.println(invocation); MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType()); Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);
Message message = JSON.parseObject(invocation.getMessage(),messageClass);
executor.submit(new Runnable(){ @Override public void run() { messageHandler.execute(ctx.channel(),message); } }); }
}
|
首先将消息反序列化为Invocation,根据消息类型获取对应的消息处理器,再根据消息处理器获取Message类型,最后放入线程池中调用execute方式执行业务逻辑。
消息分类
当前设计共三类消息,分别是认证消息、单聊消息、群聊消息,分别对应Auth、Chat、ChatAll。
Auth
auth消息用于用户身份认证
创建AuthRequest类定义用户请求,用户需要携带登录获取的token
1 2 3 4 5 6 7 8
| @Data public class AuthRequest implements Message {
public static String TYPE = "auth_req";
public String token;
}
|
创建AuthResponse用于响应客户端
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class AuthResponse implements Message {
public static final String TYPE = "auth_res";
private Integer code;
private long uid;
private String username;
private String avatar;
}
|
创建 AuthRequestHandler 类,为服务端处理客户端的认证请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class AuthRequestHandler implements MessageHandler<AuthRequest> {
@Autowired private NettyChannelManager nettyChannelManager;
@Autowired private JWTPropertites jwtPropertites;
@Resource private RedisUtils redisUtils;
@Override public void execute(Channel channel, AuthRequest message) { String token = message.getToken();
long uid = JwtUtil.getId(token);
log.info("Uid:{} 用户连接",uid);
AuthResponse authResponse = AuthResponse.builder() .code(1) .uid(Long.parseLong(mp.get("uid").toString())) .username(mp.get("username").toString()) .avatar(mp.get("avatar").toString()) .build(); Invocation invocation = new Invocation(AuthResponse.TYPE,authResponse);
nettyChannelManager.addUser(channel,uid);
nettyChannelManager.sendMsgToAll(invocation);
}
|
Chat
创建ChatSendToOneRequest类,表示发送给指定用户的私聊请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Data public class ChatSendToOneRequest implements Message {
public static final String TYPE = "chat_req"; private Long fromId; private Long toId; private Long msgId;
private String content;
}
|
创建ChatSendToOneResponse,表示消息发送的情况
1 2 3 4 5 6 7 8 9 10 11 12
| @Data @Builder public class ChatSendToOneResponse implements Message {
public static final String TYPE = "chat_res";
private Long msgId;
private Long code;
private String message; }
|
创建ChatRedirectToUserRequest类,用于转发消息给用户
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class ChatRedirectToUserRequest implements Message {
public static final String TYPE = "chat_redirect_req";
private Long fromId;
private String username;
private String avatar;
private Long msgId;
private String content;
}
|
创建ChatSendToOneHandler处理器,处理私聊消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public class ChatSendToOneHandler implements MessageHandler<ChatSendToOneRequest> {
@Autowired private NettyChannelManager nettyChannelManager;
@Autowired private ChatService chatService;
@Resource private RedisUtils redisUtils;
@Override public void execute(Channel channel, ChatSendToOneRequest message) {
try { ChatSendToOneResponse chatSendToOneResponse = ChatSendToOneResponse.builder() .msgId(message.getMsgId()) .code((long)1) .message(message.getContent()) .build(); String text = JSON.toJSONString(new Invocation(ChatSendToOneResponse.TYPE, chatSendToOneResponse)); channel.writeAndFlush(new TextWebSocketFrame(text)); }catch (Exception e){ log.error("消息发送失败:{}",e); }
log.info("Uid:{},发送给 Uid:{} 一条消息",message.getFromId(),message.getToId()); Chat chat = Chat.builder() .fromId(message.getFromId()) .toId(message.getToId()) .createTime(DateTime.now()) .content(message.getContent()) .msgId(message.getMsgId()) .timestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()) .build(); chatService.saveMsg(chat);
Map<Object, Object> fromInfo = redisUtils.hmget(RedisConstant.USER_INFO + message.getFromId());
ChatRedirectToUserRequest chatRedirectToUserRequest = ChatRedirectToUserRequest.builder() .fromId(message.getFromId()) .username(fromInfo.get("username").toString()) .avatar(fromInfo.get("avatar").toString()) .msgId(message.getMsgId()) .content(message.getContent()) .build(); nettyChannelManager .sendMsg(message.getToId(),chatRedirectToUserRequest);
}
@Override public String getType() { return ChatSendToOneRequest.TYPE; } }
|
该handler逻辑如下:
- 首先向当前发送者推送发送结果,向前端响应
- 将聊天消息持久化到数据库
- 构造
ChatRedirectToUserRequest,把消息转发给指定接受者的channel
ChatAll
创建ChatSendToAllRequest类,表示群聊消息请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class ChatSendToAllRequest implements Message {
public static final String TYPE = "group_req";
private String type;
private Long gid;
private Long msgId;
private Long fromId;
private String content;
}
|
创建ChatSendToAllResponse类,表示群消息请求的响应,gid为群ID
1 2 3 4 5 6 7 8 9 10 11
| public class ChatSendToAllResponse implements Message {
public static final String TYPE = "group_res";
private Long code ;
private Long gid;
private Long msgId;
}
|
创建 ChatSendToAllHandler 类,为服务端处理客户端的群聊请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class ChatSendToAllHandler implements MessageHandler<ChatSendToAllRequest> {
@Autowired private NettyChannelManager nettyChannelManager;
@Autowired private GroupMsgService groupMsgService;
@Resource private RedisUtils redisUtils;
@Override public void execute(Channel channel, ChatSendToAllRequest message) { log.info("群发消息:{}",message);
GroupMsg msg = GroupMsg.builder() .senderId(message.getFromId()) .gid(message.getGid()) .content(message.getContent()) .createTime(DateTime.now()) .msgId(message.getMsgId()) .timestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()) .build(); groupMsgService.saveMsg(msg); log.info("群消息保存成功");
Map<Object, Object> mp = redisUtils.hmget(RedisConstant.USER_INFO + message.getFromId()); ChatRedirectToAllRequest redirectToAllRequest = ChatRedirectToAllRequest.builder() .fromId(message.getFromId()) .username(mp.get("username").toString()) .avatar(mp.get("avatar").toString()) .content(message.getContent()) .build();
Invocation invocation = new Invocation(ChatRedirectToAllRequest.TYPE,redirectToAllRequest);
nettyChannelManager.sendMsgToAll(invocation);
log.info("群消息转发成功"); }
@Override public String getType() { return ChatSendToAllRequest.TYPE; } }
|
该handler逻辑如下:
- 构造群消息并将其持久化
- 从redis中获取发送者消息
- 封装后调用
sendMsgToAll进行广播
总结
至此实现了基于Netty的IM聊天系统,连接管理、协议分发、单聊落库与离线摘要、群聊广播与心跳功能都已实现,后续优化方向包括:
- 多群组扩展,当前只设置的gid,在业务逻辑上并没有针对多群扩展,所有用户都在同一的大群中。
- 引入消息队列,在handler中处理消息落库,以及离线消息推送,可以写入MQ中,由后台异步处理消息。
- 通信协议可以使用protobuf降低带宽。