Redis源码分析之发布订阅(pub/sub)

Redis算是缓存界的老大哥了,最近做的事情对Redis依赖较多,使用了里面的发布订阅功能,事务功能以及SortedSet等数据结构,后面准备好好学习总结一下Redis的一些知识点。

先看下redis发布订阅的结构:

redis发布订阅结构


redis发布订阅结构

其中发布者跟订阅者之间通过channel进行交互,channel分为两种模式。

一、redis发布订阅命令简介

redis中为发布订阅(pub/sub)功能提供了六个命令,分为两种模式。

由subscribe,unsubscribe组成,它们是负责订阅有确定名称的channel,例如subscribe test表示订阅名字为test的channel。

由psubscribe,punsubscribe组成,是负责订阅模糊名字的channel,例如psubscribe test* 表示订阅所有以test开头的channel。

最后再加上发布命令publish以及查看订阅相关信息的pubsub命令组成。

二、redis发布订阅源码分析

redis所有的命令及其处理函数都放在了server.c文件的开头,从其中找出发布订阅功能相关的命令信息。

{"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0}, {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0}, {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0}, {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0}, {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},

这里可以看出创建一条命令需要很多参数,我们这里只需要关注前两个参数,第一个参数表示命令的内容,第二个表示该命令对应的处理函数。

普通模式订阅subscribe函数:
该命令支持多个参数,即subscribe channel1,channel2...

void subscribeCommand(client *c) { int j; //这里挨个处理subscribe的参数,因为命令本身被作为参数0所以从1开始处理后面的参数 for (j = 1; j < c->argc; j++) //订阅每个频道 pubsubSubscribeChannel(c,c->argv[j]); //这里设置客户端的状态,下面会解释这个状态的作用 c->flags |= CLIENT_PUBSUB; }

在server.c文件中,processCommand函数是在调用具体命令函数之前的判断逻辑,其中有一段:

/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ if (c->flags & CLIENT_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); return C_OK; }

这里注释也写的很清楚,就是当client处于pub/sub上下文时,只接收订阅相关命令以及一个ping命令,这就解释了上面subscribeCommand函数中为什么要设置客户端flag字段。

接下来看下订阅的具体逻辑:

int pubsubSubscribeChannel(client *c, robj *channel) { dictEntry *de; list *clients = NULL; int retval = 0; //把指定channel加入到client的pubsub_channels哈希表中 //不成功说明已经订阅了该频道 if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; //这里是把该channel加入到client的哈希表中,引用加1 incrRefCount(channel); //在server的发布订阅哈希表中查找指定channel de = dictFind(server.pubsub_channels,channel); //如果该channel还不存在,则创建 if (de == NULL) { //创建一个空list clients = listCreate(); //把channel加入到server的哈希表中,value就是该channel的所有订阅者 dictAdd(server.pubsub_channels,channel,clients); //该channel引用加1 incrRefCount(channel); } else { clients = dictGetVal(de); } //把client加入到该channel的订阅列表中 listAddNodeTail(clients,c); } //一系列通知客户端的操作 addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,clientSubscriptionsCount(c)); return retval; }

总结一下,订阅其实就是把指定channel分别加入到client跟server的pub/sub哈希表中,然后在server端保存订阅了该channle的所有client列表,如下图:

普通模式发布订阅数据结构


普通模式发布订阅数据结构

下面看一下publish发布命令:
例如:publish channelName msg

void publishCommand(client *c) { //发布逻辑 int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); //这里是关于集群或者AOF的操作 if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); else forceCommandPropagation(c,PROPAGATE_REPL); //返回给client通知了的订阅者数 addReplyLongLong(c,receivers); }

重点看下发布函数的源码:

int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; listNode *ln; listIter li; //根据上面的订阅源码,这里就是取出订阅该channel的所有clients de = dictFind(server.pubsub_channels,channel); if (de) { //获取client的链表 list *list = dictGetVal(de); listNode *ln; listIter li; //由client链表创建它的迭代器,c++代码真是无力吐槽 listRewind(list,&li); //遍历所有client并发送消息 while ((ln = listNext(&li)) != NULL) { client *c = ln->value; addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); addReplyBulk(c,message); receivers++; } } //开始模糊匹配的逻辑处理,模糊模式使用的是链表而不是哈希表,后面会讲 if (listLength(server.pubsub_patterns)) { //创建模糊规则的迭代器li listRewind(server.pubsub_patterns,&li); channel = getDecodedObject(channel); //遍历所有的模糊模式,如果匹配成功则发送消息 while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; //判断当前channel是否可以匹配模糊规则 if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { addReply(pat->client,shared.mbulkhdr[4]); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++; } } decrRefCount(channel); } return receivers; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/c7284be87fa7592ef8a4c895779c3a42.html