从上面的publish处理函数可以看出每次进行消息发布的时候,都会向普通模式跟模糊模式发布消息,同时也能看出普通模式跟模糊模式使用的是两种不同的数据结构,下面看下模糊订阅模式。
模糊模式订阅psubscribe函数:
//psubscribe命令对应的处理函数
void psubscribeCommand(client *c) {
int j;
//挨个订阅client指定的pattern
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
//修改client状态
c->flags |= CLIENT_PUBSUB;
}
int pubsubSubscribePattern(client *c, robj *pattern) {
int retval = 0;
//判断client是否已经订阅该pattern,这里与普通模式不同,是个链表
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1;
pubsubPattern *pat;
//把指定pattern加入到client的pattern链表中
listAddNodeTail(c->pubsub_patterns,pattern);
//引用计数+1
incrRefCount(pattern);
//这里是创建一个pattern对象,并指向该client,加入到server的pattern链表中
//从这里可以看出,多个client订阅同一个pattern会创建多个patter对象,与普通模式不同
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
}
//通知客户端
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
通过分析上面的源码可以总结一下模糊订阅中的数据结构,如下图:
模糊发布订阅模式数据结构
注:正如上面提到的,模糊模式中,一个pat对象中包含一个pattern规则跟一个client指针,也就是说当多个client模糊订阅同一个pattern时同样会为每个client都创建一个节点。
普通模式取消订阅unsubscribe函数:
取消就相对简单了,说白了就是把上面锁保存在server跟client端的数据删除。
取消订阅入口
void unsubscribeCommand(client *c) {
//如果该命令没有参数,则把channel全部取消
if (c->argc == 1) {
pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
//迭代取消置顶channel
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
//如果channel被全部取消,则修改client状态,这样client就可以发送其他命令了
if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
//一次性取消订阅所有channel
int pubsubUnsubscribeAllChannels(client *c, int notify) {
//取出client端所有的channel
dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
dictEntry *de;
int count = 0;
while((de = dictNext(di)) != NULL) {
robj *channel = dictGetKey(de);
//最终也是挨个取消channel
count += pubsubUnsubscribeChannel(c,channel,notify);
}
//如果client���面都没有订阅,依然返回响应
if (notify && count == 0) {
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReply(c,shared.nullbulk);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
//释放空间
dictReleaseIterator(di);
return count;
}
//取消订阅指定channel
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
//从client中删除指定channel
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
retval = 1;
//删除服务端该channel中的指定client
de = dictFind(server.pubsub_channels,channel);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
serverAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
//如果删除完以后channel没有了订阅者,则把channel也删除
dictDelete(server.pubsub_channels,channel);
}
}
//返回client响应
if (notify) {
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
//引用计数-1
decrRefCount(channel);
return retval;
}
由于模糊模式的取消订阅与普通模式类似,这里就不再贴代码了。
三、redis发布订阅总结