Redis源码分析之发布订阅(pub/sub)(2)

从上面的publish处理函数可以看出每次进行消息发布的时候,都会向普通模式跟模糊模式发布消息,同时也能看出普通模式跟模糊模式使用的是两种不同的数据结构,下面看下模糊订阅模式。

模糊模式订阅psubscribe函数:

//psubscribe命令对应的处理函数 void psubscribeCommand(client *c) { int j; //挨个订阅client指定的pattern for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]); //修改client状态 c->flags |= CLIENT_PUBSUB; } int pubsubSubscribePattern(client *c, robj *pattern) { int retval = 0; //判断client是否已经订阅该pattern,这里与普通模式不同,是个链表 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { retval = 1; pubsubPattern *pat; //把指定pattern加入到client的pattern链表中 listAddNodeTail(c->pubsub_patterns,pattern); //引用计数+1 incrRefCount(pattern); //这里是创建一个pattern对象,并指向该client,加入到server的pattern链表中 //从这里可以看出,多个client订阅同一个pattern会创建多个patter对象,与普通模式不同 pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat); } //通知客户端 addReply(c,shared.mbulkhdr[3]); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); addReplyLongLong(c,clientSubscriptionsCount(c)); return retval; }

通过分析上面的源码可以总结一下模糊订阅中的数据结构,如下图:

模糊发布订阅模式数据结构


模糊发布订阅模式数据结构

注:正如上面提到的,模糊模式中,一个pat对象中包含一个pattern规则跟一个client指针,也就是说当多个client模糊订阅同一个pattern时同样会为每个client都创建一个节点。

普通模式取消订阅unsubscribe函数:
取消就相对简单了,说白了就是把上面锁保存在server跟client端的数据删除。

取消订阅入口 void unsubscribeCommand(client *c) { //如果该命令没有参数,则把channel全部取消 if (c->argc == 1) { pubsubUnsubscribeAllChannels(c,1); } else { int j; //迭代取消置顶channel for (j = 1; j < c->argc; j++) pubsubUnsubscribeChannel(c,c->argv[j],1); } //如果channel被全部取消,则修改client状态,这样client就可以发送其他命令了 if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; } //一次性取消订阅所有channel int pubsubUnsubscribeAllChannels(client *c, int notify) { //取出client端所有的channel dictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictEntry *de; int count = 0; while((de = dictNext(di)) != NULL) { robj *channel = dictGetKey(de); //最终也是挨个取消channel count += pubsubUnsubscribeChannel(c,channel,notify); } //如果client���面都没有订阅,依然返回响应 if (notify && count == 0) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.unsubscribebulk); addReply(c,shared.nullbulk); addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } //释放空间 dictReleaseIterator(di); return count; } //取消订阅指定channel int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { dictEntry *de; list *clients; listNode *ln; int retval = 0; //从client中删除指定channel if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { retval = 1; //删除服务端该channel中的指定client de = dictFind(server.pubsub_channels,channel); serverAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); serverAssertWithInfo(c,NULL,ln != NULL); listDelNode(clients,ln); if (listLength(clients) == 0) { //如果删除完以后channel没有了订阅者,则把channel也删除 dictDelete(server.pubsub_channels,channel); } } //返回client响应 if (notify) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.unsubscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } //引用计数-1 decrRefCount(channel); return retval; }

由于模糊模式的取消订阅与普通模式类似,这里就不再贴代码了。

三、redis发布订阅总结

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/c7284be87fa7592ef8a4c895779c3a42.html