你好,我是袁武林。
在期中实战中,我们一起尝试实现了一个简易版的聊天系统,并且为这个聊天系统增加了一些基本功能。比如,用户登录、简单的文本消息收发、消息存储设计、未读数提示、消息自动更新等。
但是期中实战的目的,主要是让你对IM系统的基本功能构成有一个直观的了解,所以在功能的实现层面上比较简单。比如针对消息的实时性,期中采用的是基于HTTP短轮询的方式来实现。
因此,在期末实战中,我们主要的工作就是针对期中实战里的消息收发来进行功能优化。
比如,我们会采用WebSocket的长连接,来替代之前的HTTP短轮询方式,并且会加上一些课程中有讲到的相对高级的功能,如应用层心跳、ACK机制等。
希望通过期末整体技术实现上的升级,你能更深刻地体会到IM系统升级前后,对使用方和服务端压力的差异性。相应的示例代码我放在了GitHub里,你可以作为参考来学习和实现。
关于这次期末实战,希望你能够完成的功能主要包括以下几个部分:
接下来,我们就针对以上这些需要升级的功能和新增的主要功能,来进行实现上的拆解。
首先,期末实战一个比较大的改变就是,将之前HTTP短轮询的实现,改造成真正的长连接。为了方便Web端的演示,这里我建议你可以使用WebSocket来实现。
对于WebSocket,我们在客户端JS(JavaScript)里主要是使用HTML5的原生API来实现,其核心的实现代码部分如下:
if (window.WebSocket) {websocket = new WebSocket("ws://127.0.0.1:8080");websocket.onmessage = function (event) {onmsg(event);};//连接建立后的事件监听websocket.onopen = function () {bind();heartBeat.start();}//连接关闭后的事件监听websocket.onclose = function () {reconnect();};//连接出现异常后的事件监听websocket.onerror = function () {reconnect();};} else {alert("您的浏览器不支持WebSocket协议!"}
页面打开时,JS先通过服务端的WebSocket地址建立长连接。要注意这里服务端连接的地址是ws://开头的,不是http://的了;如果是使用加密的WebSocket协议,那么相应的地址应该是以wss://开头的。
建立长连之后,要针对创建的WebSocket对象进行事件的监听,我们只需要在各种事件触发的时候,进行对应的逻辑处理就可以了。
比如,API主要支持的几种事件有:长连接通道建立完成后,通过onopen事件来进行用户信息的上报绑定;通过onmessage事件,对接收到的所有该连接上的数据进行处理,这个也是我们最核心的消息推送的处理逻辑;另外,在长连接通道发生异常错误,或者连接被关闭时,可以分别通过onerror和onclose两个事件来进行监听处理。
除了通过事件监听,来对长连接的状态变化进行逻辑处理外,我们还可以通过这个WebSocket长连接,向服务器发送数据(消息)。这个功能在实现上也非常简单,你只需要调用WebSocket对象的send方法就OK了。
通过长连接发送消息的代码设计如下:
var sendMsgJson = '{ "type": 3, "data": {"senderUid":' + sender_id + ',"recipientUid":' + recipient_id + ', "content":"' + msg_content + '","msgType":1 }}';websocket.send(sendMsgJson);
此外,针对WebSocket在服务端的实现,如果你是使用JVM(Java Virtual Machine,Java虚拟机)系列语言的话,我推荐你使用比较成熟的Java NIO框架Netty来做实现。
因为Netty本身对WebSocket的支持就很完善了,各种编解码器和WebSocket的处理器都有,这样我们在代码实现上就比较简单。
采用Netty实现WebSocket Server的核心代码,你可以参考下面的示例代码:
EventLoopGroup bossGroup =new EpollEventLoopGroup(serverConfig.bossThreads, new DefaultThreadFactory("WebSocketBossGroup", true));EventLoopGroup workerGroup =new EpollEventLoopGroup(serverConfig.workerThreads, new DefaultThreadFactory("WebSocketWorkerGroup", true));ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(EpollServerSocketChannel.class);ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//先添加WebSocket相关的编解码器和协议处理器pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(65536));pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));pipeline.addLast(new WebSocketServerProtocolHandler("/", null, true));//再添加服务端业务消息的总处理器pipeline.addLast(websocketRouterHandler);//服务端添加一个idle处理器,如果一段时间Socket中没有消息传输,服务端会强制断开pipeline.addLast(new IdleStateHandler(0, 0, serverConfig.getAllIdleSecond()));pipeline.addLast(closeIdleChannelHandler);}}serverBootstrap.childHandler(initializer);serverBootstrap.bind(serverConfig.port).sync(
首先创建服务器的ServerBootstrap对象。Netty作为服务端,从ServerBootstrap启动,ServerBootstrap对象主要用于在服务端的某一个端口进行监听,并接受客户端的连接。
接着,通过ChannelInitializer对象,初始化连接管道中用于处理数据的各种编解码器和业务逻辑处理器。比如这里,我们就需要添加为了处理WebSocket协议相关的编解码器,还要添加服务端接收到客户端发送的消息的业务逻辑处理器,并且还加上了用于通道idle超时管理的处理器。
最后,把这个管道处理器链挂到ServerBootstrap,再通过bind和sync方法,启动ServerBootstrap的端口进行监听就可以了。
建立好WebSocket长连接后,我们再来看一下最核心的消息收发是怎么处理的。
刚才讲到,客户端发送消息的功能,在实现上其实比较简单。我们只需要通过WebSocket对象的send方法,就可以把消息通过长连接发送到服务端。
那么,下面我们就来看一下服务端接收到消息后的逻辑处理。
核心的代码逻辑在WebSocketRouterHandler这个处理器中,消息接收处理的相关代码如下:
@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {//如果是文本类型的WebSocket数据if (frame instanceof TextWebSocketFrame) {//先解析出具体的文本数据内容String msg = ((TextWebSocketFrame) frame).text();//再用JSON来对这些数据内容进行解析JSONObject msgJson = JSONObject.parseObject(msg);int type = msgJson.getIntValue("type");JSONObject data = msgJson.getJSONObject("data");long senderUid = data.getLong("senderUid");long recipientUid = data.getLong("recipientUid");String content = data.getString("content");int msgType = data.getIntValue("msgType");//调用业务层的Service来进行真正的发消息逻辑处理MessageVO messageContent = messageService.sendNewMsg(senderUid, recipientUid, content, msgType);if (messageContent != null) {JSONObject jsonObject = new JSONObject();jsonObject.put("type", 3);jsonObject.put("data", JSONObject.toJSON(messageContent));ctx.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(jsonObject)));}}}
这里的WebSocketRouterHandler,我们也是采用事件监听机制来实现。由于这里需要处理“接收到”的消息,所以我们只需要实现channelRead0方法就可以。
在前面的管道处理器链中,因为添加了WebSocket相关的编解码器,所以这里的WebSocketRouterHandler接收到的都是WebSocketFrame格式的数据。
接下来,我们从WebSocketFrame格式的数据中,解析出文本类型的收发双方UID和发送内容,就可以调用后端业务模块的发消息功能,来进行最终的发消息逻辑处理了。
最后,把需要返回给消息发送方的客户端的信息,再通过writeAndFlush方法写回去,就完成消息的发送。
不过,以上的代码只是处理消息的发送,那么针对消息下推的逻辑处理又是如何实现的呢?
刚刚讲到,客户端发送的消息,会通过后端业务模块来进行最终的发消息逻辑处理,这个处理过程也包括消息的推送触发。
因此,我们可以在messageService.sendNewMsg方法中,等待消息存储、未读变更都完成后,再处理待推送给接收方的消息。
你可以参考下面的核心代码:
private static final ConcurrentHashMap<Long, Channel> userChannel = new ConcurrentHashMap<>(15000);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {//处理上线请求long loginUid = data.getLong("uid");userChannel.put(loginUid, ctx.channel());}public void pushMsg(long recipientUid, JSONObject message) {Channel channel = userChannel.get(recipientUid);if (channel != null && channel.isActive() && channel.isWritable()) {channel.writeAndFlush(new TextWebSocketFrame(message.toJSONString()));}}
首先,我们在处理用户建连上线的请求时,会先在网关机内存记录一个“当前连接用户和对应的连接”的映射。
当系统有消息需要推送时,我们通过查询这个映射关系,就能找到对应的连接,然后就可以通过这个连接,将消息下推下去。
public class NewMessageListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {String topic = stringRedisSerializer.deserialize(message.getChannel());//从订阅到的Redis的消息里解析出真正需要的业务数据String jsonMsg = valueSerializer.deserialize(message.getBody());logger.info("Message Received --> pattern: {},topic:{},message: {}", new String(pattern), topic, jsonMsg);JSONObject msgJson = JSONObject.parseObject(jsonMsg);//解析出消息接收人的UIDlong otherUid = msgJson.getLong("otherUid");JSONObject pushJson = new JSONObject();pushJson.put("type", 4);pushJson.put("data", msgJson);//最终调用网关层处理器将消息真正下推下去websocketRouterHandler.pushMsg(otherUid, pushJson);}}@Overridepublic MessageVO sendNewMsg(long senderUid, long recipientUid, String content, int msgType) {//先对发送消息进行存储、加未读等操作//...// 然后将待推送消息发布到RedisredisTemplate.convertAndSend(Constants.WEBSOCKET_MSG_TOPIC, JSONObject.toJSONString(messageVO));}
然后,我们可以基于Redis的发布/订阅,实现一个消息推送的发布订阅器。
在业务层进行发送消息逻辑处理的最后,会将这条消息发布到Redis的一个Topic中,这个Topic被NewMessageListener一直监听着,如果有消息发布,那么监听器会马上感知到,然后再将消息提交给WebSocketRouterHandler,来进行最终消息的下推。
我在“04 | ACK机制:如何保证消息的可靠投递?”中有讲到,当系统有消息下推后,我们会依赖客户端响应的ACK包,来保证消息推送的可靠性。如果消息下推后一段时间,服务端没有收到客户端的ACK包,那么服务端会认为这条消息没有正常投递下去,就会触发重新下推。
关于ACK机制相应的服务端代码,你可以参考下面的示例:
public void pushMsg(long recipientUid, JSONObject message) {channel.writeAndFlush(new TextWebSocketFrame(message.toJSONString()));//消息推送下去后,将这条消息加入到待ACK列表中addMsgToAckBuffer(channel, message);}public void addMsgToAckBuffer(Channel channel, JSONObject msgJson) {nonAcked.put(msgJson.getLong("tid"), msgJson);//定时器针对下推的这条消息在5s后进行"是否ACK"的检查executorService.schedule(() -> {if (channel.isActive()) {//检查是否被ACK,如果没有收到ACK回包,会触发重推checkAndResend(channel, msgJson);}}, 5000, TimeUnit.MILLISECONDS);}long tid = data.getLong("tid");nonAcked.remove(tid);private void checkAndResend(Channel channel, JSONObject msgJson) {long tid = msgJson.getLong("tid");//重推2次int tryTimes = 2;while (tryTimes > 0) {if (nonAcked.containsKey(tid) && tryTimes > 0) {channel.writeAndFlush(new TextWebSocketFrame(msgJson.toJSONString()));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}tryTimes--;}}
用户在上线完成后,服务端会在这个连接维度的存储里,初始化一个起始值为0的序号(tid),每当有消息推送给客户端时,服务端会针对这个序号进行加1操作,下推消息时就会携带这个序号连同消息一起推下去。
消息推送后,服务端会将当前消息加入到一个“待ACK Buffer”中,这个ACK Buffer的实现,我们可以简单地用一个ConcurrentHashMap来实现,Key就是这条消息携带的序号,Value是消息本身。
当消息加入到这个“待ACK Buffer”时,服务端会同时创建一个定时器,在一定的时间后,会触发“检查当前消息是否被ACK”的逻辑;如果客户端有回ACK,那么服务端就会从这个“待ACK Buffer”中移除这条消息,否则如果这条消息没有被ACK,那么就会触发消息的重新下推。
在了解了如何通过WebSocket长连接,来完成最核心的消息收发功能之后,我们再来看下,针对这个长连接,我们如何实现新增加的应用层心跳功能。
应用层心跳的作用,我在第8课“智能心跳机制:解决网络的不确定性”中也有讲到过,主要是为了解决由于网络的不确定性,而导致的连接不可用的问题。
客户端发送心跳包的主要代码设计如下,不过我这里的示例代码只是一个简单的实现,你可以自行参考,然后自己去尝试动手实现:
//每2分钟发送一次心跳包,接收到消息或者服务端的响应又会重置来重新计时。var heartBeat = {timeout: 120000,timeoutObj: null,serverTimeoutObj: null,reset: function () {clearTimeout(this.timeoutObj);clearTimeout(this.serverTimeoutObj);this.start();},start: function () {var self = this;this.timeoutObj = setTimeout(function () {var sender_id = $("#sender_id").val();var sendMsgJson = '{ "type": 0, "data": {"uid":' + sender_id + ',"timeout": 120000}}';websocket.send(sendMsgJson);self.serverTimeoutObj = setTimeout(function () {websocket.close();$("#ws_status").text("失去连接!");}, self.timeout)}, this.timeout)},}
客户端通过一个定时器,每2分钟通过长连接给服务端发送一次心跳包,如果在2分钟内接收到服务端的消息或者响应,那么客户端的下次2分钟定时器的计时,会进行清零重置,重新计算;如果发送的心跳包在2分钟后没有收到服务端的响应,客户端会断开当前连接,然后尝试重连。
我在下面的代码示例中,提供的“服务端接收到心跳包的处理逻辑”的实现过程,其实非常简单,只是封装了一个普通回包消息进行响应,代码设计如下:
@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {long uid = data.getLong("uid");long timeout = data.getLong("timeout");logger.info("[heartbeat]: uid = {} , current timeout is {} ms, channel = {}", uid, timeout, ctx.channel());ctx.writeAndFlush(new TextWebSocketFrame("{\"type\":0,\"timeout\":" + timeout + "}"));}
我们实际在线上实现的时候,可以采用前面介绍的“智能心跳”机制,通过服务端对心跳包的响应,来计算新的心跳间隔,然后返回给客户端来进行调整。
好,到这里,期末实战的主要核心功能基本上也讲解得差不多了,细节方面你可以再翻一翻我在GitHub上提供的示例代码。
对于即时消息场景的代码实现来说,如果要真正达到线上使用的程度,相应的代码量是非常庞大的;而且对于同一个功能的实现,根据不同的使用场景和业务特征,很多业务在设计上也会有较大的差异性。
所以,实战课程的设计和示例代码只能做到挂一漏万,我尽量通过最简化的代码,来让你真正了解某一个功能在实现上最核心的思想。并且,通过期中和期末两个阶段的功能升级与差异对比,使你能感受到这些差异对于使用方体验和服务端压力的改善,从而可以更深刻地理解和掌握前面课程中相应的理论点。
今天的期末实战,我们主要是针对期中实战中IM系统设计的功能,来进行优化改造。
比如,使用基于WebSocket的长连接,代替基于HTTP的短轮询,来提升消息的实时性,并增加了应用层心跳、ACK机制等新功能。
通过这次核心代码的讲解,是想让你能理论结合实际地去理解前面课程讲到的,IM系统设计中最重要的部分功能,也希望你能自己尝试去动手写一写。当然,你也可以基于已有代码,去增加一些之前课程中有讲到,但是示例代码中没有实现的功能,比如离线消息、群聊等。
最后再给你留一个思考题:ACK机制的实现中,如果尝试多次下推之后仍然没有成功,服务端后续应该进行哪些处理呢?
以上就是今天课程的内容,欢迎你给我留言,我们可以在留言区一起讨论,感谢你的收听,我们下期再见。