疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 17【 博客园 总入口 】
源码IDEA工程获取链接:Java 聊天室 实战 源码
写在前面 大家好,我是作者尼恩。
前面,已经完成一个高性能的 Java 聊天程序的四件大事:
完成了协议选型,选择了性能更佳的 Protobuf协议。具体的文章为: Netty+Protobuf 整合一:实战案例,带源码
介绍了 通讯消息数据包的几条设计准则。具体的文章为: Netty +Protobuf 整合二:protobuf 消息通讯协议设计的几个准则
解决了一个非常基础的问题,这就是通讯的 粘包和半包问题。具体的文章为:Netty 粘包/半包 全解 | 史上最全解读
前一篇文件,已经完成了 系统三大组成模块的组成介绍。 具体的文章为:Netty聊天程序(实战一):从0开始实战100w级流量应用
今天介绍非常重要的一个内容:
客户端的通讯、登录请求和登录响应设计。
下面,开启今天的 惊险和刺激实战之旅。
客户端的会话管理 什么是会话?
为了方便客户端的开发,管理与服务器的连接,这里引入一个非常重要的中间角色——Session (会话)。有点儿像Web开发中的Tomcat的服务器 Session,但是又有很大的不同。
客户端的会话概念图,如下图所示:
客户端会话有两个很重的成员,一个是user,代表了拥有会话的用户。一个是channel,代表了连接的通道。两个成员的作用是:
通过user,可以获得当前的用户信息
通过channel,可以向服务器发送消息
所以,会话左拥右抱,左手用户资料,右手服务器的连接。在本例的开发中,会经常用到。
客户端的逻辑构成从逻辑上来说,客户端有三个子的功能模块。
模块一:Handler
入站处理器。
在Netty 中非常重要,负责处理入站消息。比方,服务器发送过来登录响应,服务器发送过来的聊天消息。
模块二:MsgBuilder
消息组装器。
将 Java 内部的 消息 Bean 对象,转成发送出去的 Protobuf 消息。
模块三:Sender
消息发送器。
Handler 负责收的工作。Sender 则是负责将消息发送出去。
三大子模块的类关系图:
介绍完成了主要的组成部分后,开始服务器的连接和Session 的创建。
连接服务器与Session 的创建 通过bootstrap 帮助类,设置完成线程组、通道类型,向管道流水线加入处理器Handler后,就可以开始连接服务器的工作。
本小节需要重点介绍的,是连接成功之后,创建 Session,并且将 Session和 channel 相互绑定。
代码如下:
package com.crazymakercircle.chat.client; //... @Data @Service("EchoClient") public class ChatClient { static final Logger LOGGER = LoggerFactory.getLogger(ChatClient.class); //.. private Channel channel; private ClientSender sender; public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) { ChannelFuture f = null; try { if (bootstrap != null) { bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.remoteAddress(host, port); // 设置通道初始化 bootstrap.handler( new ChannelInitializer<SocketChannel>() { public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufDecoder()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(chatClientHandler); } } ); LOGGER.info(new Date() + "客户端开始登录[疯狂创客圈IM]"); f = bootstrap.connect().addListener((ChannelFuture futureListener) -> { final EventLoop eventLoop = futureListener.channel().eventLoop(); if (!futureListener.isSuccess()) { LOGGER.info("与服务端断开连接!在10s之后准备尝试重连!"); eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS); initFalg = false; } else { initFalg = true; } if (initFalg) { LOGGER.info("EchoClient客户端连接成功!"); LOGGER.info(new Date() + ": 连接成功,启动控制台线程……"); channel = futureListener.channel(); // 创建会话 ClientSession session = new ClientSession(channel); channel.attr(ClientSession.SESSION).set(session); session.setUser(ChatClient.this.getUser()); startConsoleThread(); } }); // 阻塞 f.channel().closeFuture().sync(); } } catch (Exception e) { LOGGER.info("客户端连接失败!" + e.getMessage()); } } //... } Session和 channel 相互绑定