back to index

基于Netty实现IM聊天系统

2025/04/01
loading

体验地址:http://47.122.54.172/chat

Netty

Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty 相当简化和流线化了网络应用的编程开发过程。

实现思路

通过Netty承载WebSocket长连接,用Spring管理业务Handler,用统一消息协议完成多类型消息路由,结合Redis+MySQL实现在线状态与消息持久化

服务端

NettyServer

首先创建NettyServer类,用于启动Netty服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Component
@Slf4j
public class NettyServer {
@Value("${netty.port}")
private Integer port;
@Autowired
private NettyServerHandlerInitializer nettyServerHandlerInitializer;

private EventLoopGroup bossGroup = new NioEventLoopGroup();

private EventLoopGroup workGroup = new NioEventLoopGroup();

private Channel channel;

@PostConstruct
public void run() throws Exception{

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.option(ChannelOption.SO_BACKLOG,1024)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(ChannelOption.TCP_NODELAY,true)
.childHandler(nettyServerHandlerInitializer);

ChannelFuture future = serverBootstrap.bind().sync();

if(future.isSuccess()){
channel = future.channel() ;

log.info("Netty服务端启动在 {} 端口",port);
}
}

@PreDestroy
public void shutdown(){
if(channel != null){
channel.close();
}

bossGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
  • ServerBootstrap:Netty提供的服务器启动类,方便我们初始化Server
  • bossGroup :Boss 线程组,用于服务端接受客户端的连接;
  • workGroup:work线程组,用于处理客户端的读写请求
  • childHandler(ChannelHandler childHandler) :设置客户端连接上来的channel处理器

NettyServerHandlerInitializer

Netty的ChannelHandler组件用来处理Channel的各种事件,这里的事件包括连接,数据读写,异常等等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
private static final Integer READ_TIMEOUT = 30000;

@Autowired
private MessageDispatcher messageDispatcher;

@Autowired
private NettyServerHandler nettyServerHandler;

@Override
protected void initChannel(Channel Channel) throws Exception {
ChannelPipeline pipeline = Channel.pipeline();

pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT, TimeUnit.SECONDS))
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(new WebSocketServerProtocolHandler("/ws"))
//消息分发器
.addLast(messageDispatcher)
//消息处理器
.addLast(nettyServerHandler);
}
}

在每一个客户端与服务端建立连接时,服务端都会创建一个channel与之对应,此时会执行initChannel方法,完成自定义初始化。

在该函数中我们调用了Channel的pipeline方法,为连接的channel配置处理器链Pipeline。Pipeline 由一系列的 ChannelHandler 组成, Channel 所有上所有的事件都会经过 ChannelPipeline,被其上的 ChannelHandler 所处理。

这里我们配置的Handler主要有:

  • ReadTimeoutHandler: 设置连接的心跳超时时间,若指定时间未收到客户端消息,则自动断开连接。
  • HttpServerCodec: 支持 HTTP 编解码
  • WebSocketServerProtocolHandler: 负责 websocket 握手、协议升级,以及处理 WebSocket 控制帧,指定 ws 路径。
  • messageDispatcher: 自定义的消息分发器,用于根据消息类型将消息路由至对应的业务处理逻辑。
  • nettyServerHandler: 自定义的核心消息处理器,负责具体的消息处理业务。

NettyServerHandler

创建NettyServerHandler 类,继承 ChannelInboundHandlerAdapter 类,实现客户端 Channel 建立连接、断开连接、异常时的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

@Autowired
private NettyChannelManager nettyChannelManager;

/**
* 建立连接时
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
nettyChannelManager.add(ctx.channel());
}

/**
* 断开连接时
* @param ctx
* @throws Exception
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
nettyChannelManager.remove(ctx.channel());
}

/**
* 发生异常时
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

log.error("连接发生异常:{}",ctx.channel().id(),cause);
ctx.channel().close();
}

}

@ChannelHandler.Sharable注解表示这个Handler可以被多个channel使用

NettyChannelManager

NettyChannelManager用于管理所有在线channel,并建立用户ID到channel的映射,用于定向推送消息与广播推送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class NettyChannelManager {

private static final AttributeKey< Long> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("uid");

private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();

private ConcurrentMap< Long,Channel> userChannels = new ConcurrentHashMap<>() ;

@Resource
private RedisUtils redisUtils;

public void add(Channel channel){

log.info("新连接加入:{}",channel.id());
channels.put(channel.id(),channel);
}

public void remove(Channel channel){

log.info("连接断开:{}",channel.id());

channels.remove(channel.id());

if(channel.hasAttr(CHANNEL_ATTR_KEY_USER)){
Long uid = channel.attr(CHANNEL_ATTR_KEY_USER).get();
log.info("Uid:{} 断开连接",uid);
userChannels.remove(uid);
}
}
}
  • channels:记录所有连接
  • userChannels:记录登录用户连接

sendMsg

sendMsg函数给指定用户推送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void sendMsg(Long uid, ChatRedirectToUserRequest req){

Channel channel = userChannels.get(uid);

boolean flag = true;

Map<Object, Object> msgItem = redisUtils.hmget(RedisConstant.OFFLINE_MSG+uid+"_"+req.getFromId());
if(msgItem.size() == 0){
msgItem.put("fromId",String.valueOf(req.getFromId()));
msgItem.put("earlyTime", DateTime.now().toString());
msgItem.put("count",0);
}
msgItem.put("lastMsg",req.getContent());
msgItem.put("lastTime",DateTime.now().toString());

// check user is online...

if(!flag){
msgItem.put("count",(Integer)msgItem.get("count")+1);
}

redisUtils.hmset(RedisConstant.OFFLINE_MSG+uid+"_"+req.getFromId(),redisUtils.change(msgItem));

Invocation invocation = new Invocation(ChatRedirectToUserRequest.TYPE,req);

String text = JSON.toJSONString(invocation);

channel.writeAndFlush(new TextWebSocketFrame(text));

}

当用户在线时,将消息包装成invocation后直接写入目标channel,若用户不在线,则维护Redis中的离线消息lastMsg与lastTime,等对方上线后推送。

sendMsgToAll

广播消息,向每个活跃连接发送文本帧

1
2
3
4
5
6
7
8
9
10
11
12
13
public void sendMsgToAll(Invocation invocation){

String text = JSON.toJSONString(invocation);

for(Channel channel:channels.values()){
if(!channel.isActive()){
log.error("连接未激活");
return;
}

channel.writeAndFlush(new TextWebSocketFrame(text));
}
}

消息封装

在服务端和客户端需要构建通信协议,便于序列化和传输。发送方将消息对象通过序列化,转换成字节数组推送到TCP中,接收方收到字节数组后反序列化。常见的序列化方式有protobuf,JSON等。

这里创建Invocation类用于封装信息。作为通信协议的消息体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Invocation implements Serializable {

private String type ;

private String message;

public String getMessage(){
return message;
}

public Invocation(String type, Message message){
this.type = type;
this.message = JSON.toJSONString(message);
}
}
  • type:消息类型
  • message:消息内容

消息分发

首先创建Message接口作为消息类型

1
2
3
public interface Message {
String TYPE = "";
}

创建MessageHandler作为消息处理器接口

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface MessageHandler<T extends Message> {
/**
* 执行处理消息
* @param channel
* @param message
*/
void execute(Channel channel, T message);
/**
* 消息类型
* @return
*/
String getType();
}

MessageHandlerContainer

MessageHandlerContainer 作为 MessageHandler 的容器,注册所有消息处理器的Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class MessageHandlerContainer implements InitializingBean {

private final Map<String,MessageHandler> handlers= new HashMap<>();

@Autowired
private ApplicationContext applicationContext;

@Override
public void afterPropertiesSet() throws Exception {
applicationContext.getBeansOfType(MessageHandler.class).values()
.forEach(messageHandler -> handlers.put(messageHandler.getType(),messageHandler));

log.info("消息处理器数量:{}",handlers.size());
}

MessageHandler getMessageHandler(String type) throws Exception {

MessageHandler messageHandler=handlers.get(type);
if(messageHandler== null){
throw new Exception("找不到对应的消息处理器");
}
return messageHandler;
}

static Class<? extends Message> getMessageClass(MessageHandler handler){
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);
//获得接口的type数组
Type[] interfaces = targetClass.getGenericInterfaces();

Class<?> superClass = targetClass.getSuperclass();

while((Objects.isNull(interfaces)|| interfaces.length==0) && Objects.nonNull(superClass)){
interfaces = superClass.getGenericInterfaces();
superClass = targetClass.getSuperclass();
}
if (Objects.nonNull(interfaces)) {
// 遍历 interfaces 数组
for (Type type : interfaces) {
// 要求 type 是泛型参数
if (type instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) type;
// 要求是 MessageHandler 接口
if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
// 取首个元素
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return (Class<Message>) actualTypeArguments[0];
} else {
throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
}
}
}
}
}
throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
}
}
  • handlers:保存消息类型与处理器映射
  • getMessageHandler():根据消息类型获得对应的MessageHandler
  • getMessageClass():获取Handler对应的消息类型

MessageDispatcher

MessageDispatcher作为消息分配器,处理从客户端接收的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class MessageDispatcher extends SimpleChannelInboundHandler<TextWebSocketFrame> {

@Autowired
private MessageHandlerContainer messageHandlerContainer;

private final ExecutorService executor = Executors.newFixedThreadPool(200);


@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//将数据反序列化为Invocation
Invocation invocation = JSONUtil.toBean(msg.text(), Invocation.class);
System.out.println(invocation);
//根据消息类型获取对应的消息处理器
MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());
//根据消息处理器获取Message类型
Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);

Message message = JSON.parseObject(invocation.getMessage(),messageClass);

//创建线程执行消息流程
//将IO线程池与业务线程池分开,可以避免耗时的业务处理逻辑阻塞IO线程。
executor.submit(new Runnable(){
@Override
public void run() {
messageHandler.execute(ctx.channel(),message);
}
});
}

}
  • SimpleChannelInboundHandler 是 Netty 定义的消息处理 ChannelHandler 抽象类,处理消息的类型是 泛型时。

  • channelRead0() 方法处理消息,进行分发。

首先将消息反序列化为Invocation,根据消息类型获取对应的消息处理器,再根据消息处理器获取Message类型,最后放入线程池中调用execute方式执行业务逻辑。

消息分类

当前设计共三类消息,分别是认证消息、单聊消息、群聊消息,分别对应Auth、Chat、ChatAll。

Auth

auth消息用于用户身份认证

创建AuthRequest类定义用户请求,用户需要携带登录获取的token

1
2
3
4
5
6
7
8
@Data
public class AuthRequest implements Message {

public static String TYPE = "auth_req";

public String token;

}

创建AuthResponse用于响应客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
public class AuthResponse implements Message {

public static final String TYPE = "auth_res";

private Integer code;

private long uid;

private String username;

private String avatar;

}

创建 AuthRequestHandler 类,为服务端处理客户端的认证请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class AuthRequestHandler implements MessageHandler<AuthRequest> {

@Autowired
private NettyChannelManager nettyChannelManager;

@Autowired
private JWTPropertites jwtPropertites;

@Resource
private RedisUtils redisUtils;

@Override
public void execute(Channel channel, AuthRequest message) {
String token = message.getToken();

long uid = JwtUtil.getId(token);

log.info("Uid:{} 用户连接",uid);

AuthResponse authResponse = AuthResponse.builder()
.code(1)
.uid(Long.parseLong(mp.get("uid").toString()))
.username(mp.get("username").toString())
.avatar(mp.get("avatar").toString())
.build();
Invocation invocation = new Invocation(AuthResponse.TYPE,authResponse);

//uid与channel绑定
nettyChannelManager.addUser(channel,uid);

///更新在线信息
nettyChannelManager.sendMsgToAll(invocation);

}

Chat

创建ChatSendToOneRequest类,表示发送给指定用户的私聊请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Data
public class ChatSendToOneRequest implements Message {

public static final String TYPE = "chat_req";
// 发送者
private Long fromId;
// 接受者
private Long toId;
// 消息ID
private Long msgId;

private String content;

}

创建ChatSendToOneResponse,表示消息发送的情况

1
2
3
4
5
6
7
8
9
10
11
12
@Data
@Builder
public class ChatSendToOneResponse implements Message {

public static final String TYPE = "chat_res";

private Long msgId;

private Long code;

private String message;
}

创建ChatRedirectToUserRequest类,用于转发消息给用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ChatRedirectToUserRequest implements Message {

public static final String TYPE = "chat_redirect_req";

private Long fromId;

private String username;

private String avatar;

private Long msgId;

private String content;

}

创建ChatSendToOneHandler处理器,处理私聊消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class ChatSendToOneHandler implements MessageHandler<ChatSendToOneRequest> {

@Autowired
private NettyChannelManager nettyChannelManager;

@Autowired
private ChatService chatService;

@Resource
private RedisUtils redisUtils;

@Override
public void execute(Channel channel, ChatSendToOneRequest message) {

try {
ChatSendToOneResponse chatSendToOneResponse = ChatSendToOneResponse.builder()
.msgId(message.getMsgId())
.code((long)1)
.message(message.getContent())
.build();
String text = JSON.toJSONString(new Invocation(ChatSendToOneResponse.TYPE, chatSendToOneResponse));

channel.writeAndFlush(new TextWebSocketFrame(text));
}catch (Exception e){
log.error("消息发送失败:{}",e);
}

log.info("Uid:{},发送给 Uid:{} 一条消息",message.getFromId(),message.getToId());
Chat chat = Chat.builder()
.fromId(message.getFromId())
.toId(message.getToId())
.createTime(DateTime.now())
.content(message.getContent())
.msgId(message.getMsgId())
.timestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli())
.build();
chatService.saveMsg(chat);

Map<Object, Object> fromInfo = redisUtils.hmget(RedisConstant.USER_INFO + message.getFromId());

ChatRedirectToUserRequest chatRedirectToUserRequest = ChatRedirectToUserRequest.builder()
.fromId(message.getFromId())
.username(fromInfo.get("username").toString())
.avatar(fromInfo.get("avatar").toString())
.msgId(message.getMsgId())
.content(message.getContent())
.build();
nettyChannelManager
.sendMsg(message.getToId(),chatRedirectToUserRequest);

}

@Override
public String getType() {
return ChatSendToOneRequest.TYPE;
}
}

该handler逻辑如下:

  1. 首先向当前发送者推送发送结果,向前端响应
  2. 将聊天消息持久化到数据库
  3. 构造ChatRedirectToUserRequest,把消息转发给指定接受者的channel

ChatAll

创建ChatSendToAllRequest类,表示群聊消息请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ChatSendToAllRequest implements Message {

public static final String TYPE = "group_req";

private String type;

private Long gid;

private Long msgId;

private Long fromId;

private String content;

}

创建ChatSendToAllResponse类,表示群消息请求的响应,gid为群ID

1
2
3
4
5
6
7
8
9
10
11
public class ChatSendToAllResponse implements Message {

public static final String TYPE = "group_res";

private Long code ;

private Long gid;

private Long msgId;

}

创建 ChatSendToAllHandler 类,为服务端处理客户端的群聊请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ChatSendToAllHandler implements MessageHandler<ChatSendToAllRequest> {

@Autowired
private NettyChannelManager nettyChannelManager;

@Autowired
private GroupMsgService groupMsgService;

@Resource
private RedisUtils redisUtils;

//TODO 多个群聊的时候
@Override
public void execute(Channel channel, ChatSendToAllRequest message) {
log.info("群发消息:{}",message);

GroupMsg msg = GroupMsg.builder()
.senderId(message.getFromId())
.gid(message.getGid())
.content(message.getContent())
.createTime(DateTime.now())
.msgId(message.getMsgId())
.timestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli())
.build();
groupMsgService.saveMsg(msg);
log.info("群消息保存成功");

Map<Object, Object> mp = redisUtils.hmget(RedisConstant.USER_INFO + message.getFromId());
ChatRedirectToAllRequest redirectToAllRequest = ChatRedirectToAllRequest.builder()
.fromId(message.getFromId())
.username(mp.get("username").toString())
.avatar(mp.get("avatar").toString())
.content(message.getContent())
.build();

Invocation invocation = new Invocation(ChatRedirectToAllRequest.TYPE,redirectToAllRequest);

nettyChannelManager.sendMsgToAll(invocation);

log.info("群消息转发成功");
}

@Override
public String getType() {
return ChatSendToAllRequest.TYPE;
}
}

该handler逻辑如下:

  1. 构造群消息并将其持久化
  2. 从redis中获取发送者消息
  3. 封装后调用sendMsgToAll进行广播

总结

至此实现了基于Netty的IM聊天系统,连接管理、协议分发、单聊落库与离线摘要、群聊广播与心跳功能都已实现,后续优化方向包括:

  • 多群组扩展,当前只设置的gid,在业务逻辑上并没有针对多群扩展,所有用户都在同一的大群中。
  • 引入消息队列,在handler中处理消息落库,以及离线消息推送,可以写入MQ中,由后台异步处理消息。
  • 通信协议可以使用protobuf降低带宽。
CATALOG
  1. 1. Netty
  2. 2. 实现思路
  3. 3. 服务端
    1. 3.1. NettyServer
    2. 3.2. NettyServerHandlerInitializer
    3. 3.3. NettyServerHandler
    4. 3.4. NettyChannelManager
      1. 3.4.1. sendMsg
      2. 3.4.2. sendMsgToAll
    5. 3.5. 消息封装
  4. 4. 消息分发
    1. 4.1. MessageHandlerContainer
    2. 4.2. MessageDispatcher
    3. 4.3. 消息分类
    4. 4.4. Auth
    5. 4.5. Chat
    6. 4.6. ChatAll
  5. 5. 总结