在前面的文章中有提到,在multi 前可以通过watch 来观察哪些key,被观察的这些key,会被redis服务器监控,涉及该key被修改时,则在exec 命令执行过程中会被识别出来,exec 就不会再执行命令。
源码分析
// 监控对应的key
void watchCommand(client *c) {int j;if (c->flags & CLIENT_MULTI) {addReplyError(c,"WATCH inside MULTI is not allowed");return;}/* No point in watching if the client is already dirty. */if (c->flags & CLIENT_DIRTY_CAS) {addReply(c,shared.ok);return;}for (j = 1; j < c->argc; j++)// 按指定的key进行监控watchForKey(c,c->argv[j]);addReply(c,shared.ok);
}
void watchForKey(client *c, robj *key) {list *clients = NULL;listIter li;listNode *ln;watchedKey *wk;/* Check if we are already watching for this key */// 当前已经监控的keylistRewind(c->watched_keys,&li);// 检查是否在监控列表while((ln = listNext(&li))) {wk = listNodeValue(ln);if (wk->db == c->db && equalStringObjects(key,wk->key))return; /* Key already watched */}/* This key is not already watched in this DB. Let's add it */// 获取监控的客户端列表// db->watched_keys是全局缓存的列表clients = dictFetchValue(c->db->watched_keys,key);if (!clients) {// 没有则创建clients = listCreate();dictAdd(c->db->watched_keys,key,clients);incrRefCount(key);}// 记录新的watchKey内容/* Add the new key to the list of keys watched by this client */wk = zmalloc(sizeof(*wk));wk->key = key; // 当前的keywk->client = c; // 当前的客户端wk->db = c->db; // 当前的数据库wk->expired = keyIsExpired(c->db, key); // 是否过期incrRefCount(key);listAddNodeTail(c->watched_keys,wk); // 加入客户端的列表listAddNodeTail(clients,wk); // 加入全局列表
}
watch 命令,做的事情其实也很简单,就是将当前客户端与key 进行关联,并把监控的key加入全局列表中,该全局列表。
当key有变更时,会调用signalModifiedKey
进行通知。
void signalModifiedKey(client *c, redisDb *db, robj *key)
{// 监控的key进行刷新touchWatchedKey(db, key);trackingInvalidateKey(c, key, 1);
}
// 如果在multi之前有watch某些key,则该key的所以异动都会要执行该方法
// 并进行重置client的
void touchWatchedKey(redisDb *db, robj *key) {list *clients;listIter li;listNode *ln;if (dictSize(db->watched_keys) == 0) return;clients = dictFetchValue(db->watched_keys, key);if (!clients) return;/* Mark all the clients watching this key as CLIENT_DIRTY_CAS *//* Check if we are already watching for this key */listRewind(clients,&li);while((ln = listNext(&li))) {watchedKey *wk = listNodeValue(ln);client *c = wk->client;if (wk->expired) {/* The key was already expired when WATCH was called. */if (db == wk->db &&equalStringObjects(key, wk->key) &&dictFind(db->dict, key->ptr) == NULL){/* Already expired key is deleted, so logically no change. Clear* the flag. Deleted keys are not flagged as expired. */wk->expired = 0;goto skip_client;}break;}// 客户端增加标识,告知该客户端有被污染c->flags |= CLIENT_DIRTY_CAS;/* As the client is marked as dirty, there is no point in getting here* again in case that key (or others) are modified again (or keep the* memory overhead till EXEC). */// 移除所有的keyunwatchAllKeys(c);skip_client:continue;}
}
该方法会进行相应key的处理,并为持有该key的客户端,将其标识增加CLIENT_DIRTY_CAS
,该标识会在exec命令中判断,如果有,说明watch的key已经被修改,不执行相关的命令。