在秀场直播系统中,如果说音视频功能的实现,是给直播装扮上了华丽的新装外表的话,那么直播系统中消息系统的实现,则是整个直播华丽新装下的灵魂,如何搭建高可用的直播间消息系统,也是每一个直播系统必须要解决的问题。
在设计秀场直播的消息系统之前,我们需要简单地梳理一下直播间的消息类型。
通知类消息例如送礼、弹幕、进场、榜单变化、等级变化等等消息。他们的特征是通知用户直播间的事件,营造直播间氛围,提升用户观看直播的体验。
功能类消息例如踢人、反垃圾审核、红包、PK消息等等。这类消息的特征是辅助直播业务开展,在流程上串联开播端、观看端、服务端三个角色。
我们可以从业务角度中,分析出直播间的各类消息虽然因为业务形态各式各样,最终呈现的形式也是多彩绚丽,但是我们可以从各类的消息展现形式可以分析出,消息从开发的角度,有如下几个特性,我们按照消息是否可丢弃,和实时性划分,我们可以把所有的业务消息归为如下几类:
在直播系统中,秀场直播,带货直播的直播间消息信令通信是比较偏多的,主要是因为业务性质所决定的,秀场直播和带货直播这两类直播的互动性相对比较强,玩法也比较多样,按照我们上图的分类,每一个业务的消息的可丢弃性和实时性要求都不一样,所以在开发消息系统的时候,也需要对消息进行优先级排序,对消息分发的实时性也要有业务性能考量。刚刚针对直播间消息实时性和不可丢弃性这两个属性做了业务上相关的阐述,不过对于直播消息而言,第一要素是稳定性,消息如何准确稳定地分发到指定的直播间,也是我们需要考虑的问题之一,直播消息的分发实现,从总体上说可以分为两种实现方式,第一是依靠直播间的实时通讯(Instant Messaging),也就是我们常说的IM消息系统,第二个是依靠http短轮询,例如客户端每隔1秒来请求一次服务器,服务器返回这一秒内发生的增量消息信息,客户端获取到这些增量信息,再根据具体的消息业务类型,再进行相对业务的页面UI渲染,这样就可以了,从技术上说,一个是“推”模型,一个是“拉”模型,今天我们因为搭建一个简单的直播间消息系统,我们先用一个简单的"拉"模型进行简单的实现。
基本实现思路:客户端每隔一个极短的时间,例如1秒亦或者更短的时间,根据直播间的id来调用服务端的接口,轮询该直播间发生的消息,服务端这边我们使用redis的SortedSet的数据结构来存储消息,其中key是直播间的房间id,score是服务器接收到该消息事件生成的时间戳,value可以简单地直接存储该消息序列化后的字符串,这样可以按照时间顺序地去存储消息,并且配置过期消息的删除逻辑,整个消息的存储就可以简单地搭建起来。
消息存储用java的伪代码所示:
long time = new Date().getTime(); try { // redis中插入消息数据 jedisTemplate.zadd(V_UNIQUE_ROOM_ID, time, JSON.toJSONString(roomMessage)); // 按照概率性的去删除redis中过期的消息数据 if (probability()) { deleteOverTimeCache(V_UNIQUE_ROOM_ID); } } catch (Exception e) { log.error("message save error", e); } 复制代码可以看到消息存储,如果使用redis的sortedSet进行存储还是比较方便的,接下来我们需要处理就是redis中过期消息的删除,因为无效的过期消息是没有价值的(所有的消息可以做持久化存储),redis中如果单一的key存储的消息过多,也会导致消息的慢查,和内存的使用量不断增大,这是我们不想看到的,这边因为是示例代码,所以简单地处理一下删除逻辑。
private void deleteOverTimeCache(String roomId) { Long totalCount = jedisTemplate.zcard(roomId); log.info("deleteOldTimeCache size is {}", totalCount); if (totalCount < 600) { return; } // 倒序删除过期数据 Set<Tuple> tuples = jedisTemplate.zrangeWithScores(roomId, -601, -1); if (CollectionUtils.isNotEmpty(tuples)) { for (Tuple tuple : tuples) { // 这是第一个-600条的那个score double score = tuple.getScore(); jedisTemplate.zremrangeByScore(roomId, 0d, score); break; } } } 复制代码