redis源码阅读之zset模型

3230阅读 0评论2019-04-27 stolennnxb
分类:NOSQL

zset模型和之前讲过的模型都不太一样,别的模型底层有多个单独的实现,来回转换,zset这边是由两个实现来存储的,只不过一种是ziplist,另外一种是hashtable+zskiplist的common zset,其中第二种在节省空间上面花了心思——共享底层的redis object,其存储结构如下所示

点击(此处)折叠或打开

  1. typedef struct zset {
  2.     dict *dict;
  3.     zskiplist *zsl;
  4. } zset;
在数据插入到hashtable的时候,mapping关系是redis object-->score,与此同时,元素插入到skiplist的时候,mapping关系是score--->redis object, 在后一种情况下,元素自然而然的就按照分数排好序了~~~
由于dict当中只有在注册了keyDestructor的时候才会释放空间,zset为了释放元素的时候更加容易管理,这边只是在zslFreeNode()当中释放空间。删除的时候是从dict里rem,后在skiplist中删除。

这里的转换提供了双向转换,从ziplist到common zset和common zset到ziplist的转换都有,前者的转换基本就是没法满足在redis.conf当中设置的两个条件,具体如下:

点击(此处)折叠或打开

  1. # Similarly to hashes and lists, sorted sets are also specially encoded in
  2. # order to save a lot of space. This encoding is only used when the length and
  3. # elements of a sorted set are below the following limits:
  4. zset-max-ziplist-entries 128
  5. zset-max-ziplist-value 64

点击(此处)折叠或打开

  1. int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
  2.     /* Turn options into simple to check vars. */
  3.     int incr = (*flags & ZADD_INCR) != 0;
  4.     int nx = (*flags & ZADD_NX) != 0;
  5.     int xx = (*flags & ZADD_XX) != 0;
  6.     *flags = 0; /* We'll return our response flags. */
  7.     double curscore;

  8. ...
  9.             zobj->ptr = zzlInsert(zobj->ptr,ele,score);
  10.             if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
  11.                 zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
  12.             if (sdslen(ele) > server.zset_max_ziplist_value)
  13.                 zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
  14.             if (newscore) *newscore = score;
  15.             *flags |= ZADD_ADDED;
  16.             return 1;
  17. ...
  18.     return 0; /* Never reached. */
  19. }
转换回到ziplist的代码就只有在union操作的时候才用到了~~~

点击(此处)折叠或打开

  1. void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen) {
  2.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) return;
  3.     zset *zset = zobj->ptr;

  4.     if (zset->zsl->length <= server.zset_max_ziplist_entries &&
  5.         maxelelen <= server.zset_max_ziplist_value)
  6.             zsetConvert(zobj,OBJ_ENCODING_ZIPLIST);
  7. }

点击(此处)折叠或打开

  1. void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
  2. ...

  3.     /* parse optional extra arguments */
  4. ...
  5.     /* sort sets from the smallest to largest, this will improve our
  6.      * algorithm's performance */
  7. ...
  8.         /* Step 1: Create a dictionary of elements -> aggregated-scores
  9.          * by iterating one sorted set after the other. */
  10. ...
  11.         /* Step 2: convert the dictionary into the final sorted set. */
  12. ...
  13.     if (dbDelete(c->db,dstkey))
  14.         touched = 1;
  15.     if (dstzset->zsl->length) {
  16.         zsetConvertToZiplistIfNeeded(dstobj,maxelelen);
  17.         dbAdd(c->db,dstkey,dstobj);
  18.         addReplyLongLong(c,zsetLength(dstobj));
  19.         signalModifiedKey(c->db,dstkey);
  20.         notifyKeyspaceEvent(NOTIFY_ZSET,
  21.             (op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
  22.             dstkey,c->db->id);
  23.         server.dirty++;
  24.     } else {
  25.         decrRefCount(dstobj);
  26.         addReply(c,shared.czero);
  27.         if (touched) {
  28.             signalModifiedKey(c->db,dstkey);
  29.             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
  30.             server.dirty++;
  31.         }
  32.     }
  33.     zfree(src);
  34. }

真正执行转换的代码如下:

点击(此处)折叠或打开

  1. void zsetConvert(robj *zobj, int encoding) {
  2.     zset *zs;
  3.     zskiplistNode *node, *next;
  4.     sds ele;
  5.     double score;

  6.     if (zobj->encoding == encoding) return;
  7.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  8.         unsigned char *zl = zobj->ptr;
  9.         unsigned char *eptr, *sptr;
  10.         unsigned char *vstr;
  11.         unsigned int vlen;
  12.         long long vlong;

  13.         if (encoding != OBJ_ENCODING_SKIPLIST)
  14.             serverPanic("Unknown target encoding");

  15.         zs = zmalloc(sizeof(*zs));
  16.         zs->dict = dictCreate(&zsetDictType,NULL);
  17.         zs->zsl = zslCreate();

  18.         eptr = ziplistIndex(zl,0);
  19.         serverAssertWithInfo(NULL,zobj,eptr != NULL);
  20.         sptr = ziplistNext(zl,eptr);
  21.         serverAssertWithInfo(NULL,zobj,sptr != NULL);

  22.         while (eptr != NULL) {
  23.             score = zzlGetScore(sptr);
  24.             serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
  25.             if (vstr == NULL)
  26.                 ele = sdsfromlonglong(vlong);
  27.             else
  28.                 ele = sdsnewlen((char*)vstr,vlen);

  29.             node = zslInsert(zs->zsl,score,ele);
  30.             serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
  31.             zzlNext(zl,&eptr,&sptr);
  32.         }

  33.         zfree(zobj->ptr);
  34.         zobj->ptr = zs;
  35.         zobj->encoding = OBJ_ENCODING_SKIPLIST;
  36.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  37.         unsigned char *zl = ziplistNew();

  38.         if (encoding != OBJ_ENCODING_ZIPLIST)
  39.             serverPanic("Unknown target encoding");

  40.         /* Approach similar to zslFree(), since we want to free the skiplist at
  41.          * the same time as creating the ziplist. */
  42.         zs = zobj->ptr;
  43.         dictRelease(zs->dict);
  44.         node = zs->zsl->header->level[0].forward;
  45.         zfree(zs->zsl->header);
  46.         zfree(zs->zsl);

  47.         while (node) {
  48.             zl = zzlInsertAt(zl,NULL,node->ele,node->score);
  49.             next = node->level[0].forward;
  50.             zslFreeNode(node);
  51.             node = next;
  52.         }

  53.         zfree(zs);
  54.         zobj->ptr = zl;
  55.         zobj->encoding = OBJ_ENCODING_ZIPLIST;
  56.     } else {
  57.         serverPanic("Unknown sorted set encoding");
  58.     }
  59. }
给外界的接口套路都差不多:先找到数据库,在数据库里对指定的存储操作,操作结束后发送事件通知,这里只列出了rem的代码,因为其他的实在是太长了- -|||,见谅见谅:

点击(此处)折叠或打开

  1. void zremCommand(client *c) {
  2.     robj *key = c->argv[1];
  3.     robj *zobj;
  4.     int deleted = 0, keyremoved = 0, j;

  5.     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
  6.         checkType(c,zobj,OBJ_ZSET)) return;

  7.     for (j = 2; j < c->argc; j++) {
  8.         if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
  9.         if (zsetLength(zobj) == 0) {
  10.             dbDelete(c->db,key);
  11.             keyremoved = 1;
  12.             break;
  13.         }
  14.     }

  15.     if (deleted) {
  16.         notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
  17.         if (keyremoved)
  18.             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
  19.         signalModifiedKey(c->db,key);
  20.         server.dirty += deleted;
  21.     }
  22.     addReplyLongLong(c,deleted);
  23. }

还请各位多提宝贵意见~~~

上一篇:redis源码阅读之集合对象
下一篇:redis源码阅读之bio