redis源码阅读之block操作底层实现

4770阅读 0评论2019-05-03 stolennnxb
分类:NOSQL

在block操作底层的文件中,第一句话就是

点击(此处)折叠或打开

  1. /* blocked.c - generic support for blocking operations like BLPOP & WAIT.
  2.  */

这边是提供了一些api,供外界调用
1. getTimeoutFromObjectOrReply(),在object当中获取timeout参数,具体如下

点击(此处)折叠或打开

  1. int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
  2.     long long tval;

  3.     if (getLongLongFromObjectOrReply(c,object,&tval,
  4.         "timeout is not an integer or out of range") != C_OK)
  5.         return C_ERR;

  6.     if (tval < 0) {
  7.         addReplyError(c,"timeout is negative");
  8.         return C_ERR;
  9.     }

  10.     if (tval > 0) {
  11.         if (unit == UNIT_SECONDS) tval *= 1000;
  12.         tval += mstime();
  13.     }
  14.     *timeout = tval;

  15.     return C_OK;
  16. }


2. blockClient(), 将一个client的CLIENT_BLOCKED位置位,并设置具体阻塞的类型

点击(此处)折叠或打开

  1. void blockClient(client *c, int btype) {
  2.     c->flags |= CLIENT_BLOCKED;
  3.     c->btype = btype;
  4.     server.bpop_blocked_clients++;
  5. }


3. processUnblockedClients(),当一个客户端被解出阻塞之后,在event loop当中的beforeSleep()函数中会被调用,用于处理这些client的input buffer 

点击(此处)折叠或打开

  1. void processUnblockedClients(void) {
  2.     listNode *ln;
  3.     client *c;

  4.     while (listLength(server.unblocked_clients)) {
  5.         ln = listFirst(server.unblocked_clients);
  6.         serverAssert(ln != NULL);
  7.         c = ln->value;
  8.         listDelNode(server.unblocked_clients,ln);
  9.         c->flags &= ~CLIENT_UNBLOCKED;

  10.         /* Process remaining data in the input buffer, unless the client
  11.          * is blocked again. Actually processInputBuffer() checks that the
  12.          * client is not blocked before to proceed, but things may change and
  13.          * the code is conceptually more correct this way. */
  14.         if (!(c->flags & CLIENT_BLOCKED)) {
  15.             if (c->querybuf && sdslen(c->querybuf) > 0) {
  16.                 processInputBuffer(c);
  17.             }
  18.         }
  19.     }
  20. }


4. unblockClient(), 根据当前client被阻塞的类型进行unblock,

点击(此处)折叠或打开

  1. void unblockClient(client *c) {
  2.     if (c->btype == BLOCKED_LIST) {
  3.         unblockClientWaitingData(c);
  4.     } else if (c->btype == BLOCKED_WAIT) {
  5.         unblockClientWaitingReplicas(c);
  6.     } else if (c->btype == BLOCKED_MODULE) {
  7.         unblockClientFromModule(c);
  8.     } else {
  9.         serverPanic("Unknown btype in unblockClient().");
  10.     }
  11.     /* Clear the flags, and put the client in the unblocked list so that
  12.      * we'll process new commands in its query buffer ASAP. */
  13.     c->flags &= ~CLIENT_BLOCKED;
  14.     c->btype = BLOCKED_NONE;
  15.     server.bpop_blocked_clients--;
  16.     /* The client may already be into the unblocked list because of a previous
  17.      * blocking operation, don't add back it into the list multiple times. */
  18.     if (!(c->flags & CLIENT_UNBLOCKED)) {
  19.         c->flags |= CLIENT_UNBLOCKED;
  20.         listAddNodeTail(server.unblocked_clients,c);
  21.     }
  22. }

5. replyToBlockedClientTimedOut(),当被阻塞的client到达time out时间后,给其发送响应

点击(此处)折叠或打开

  1. void replyToBlockedClientTimedOut(client *c) {
  2.     if (c->btype == BLOCKED_LIST) {
  3.         addReply(c,shared.nullmultibulk);
  4.     } else if (c->btype == BLOCKED_WAIT) {
  5.         addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
  6.     } else if (c->btype == BLOCKED_MODULE) {
  7.         moduleBlockedClientTimedOut(c);
  8.     } else {
  9.         serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
  10.     }
  11. }


6. disconnectAllBlockedClients(), 这个函数只有在一个节点由主节点变为从节点,的时候,会给客户端发送一个unblocked error,并且断开与客户端的连接

点击(此处)折叠或打开

  1. void disconnectAllBlockedClients(void) {
  2.     listNode *ln;
  3.     listIter li;

  4.     listRewind(server.clients,&li);
  5.     while((ln = listNext(&li))) {
  6.         client *c = listNodeValue(ln);

  7.         if (c->flags & CLIENT_BLOCKED) {
  8.             addReplySds(c,sdsnew(
  9.                 "-UNBLOCKED force unblock from blocking operation, "
  10.                 "instance state changed (master -> slave?)\r\n"));
  11.             unblockClient(c);
  12.             c->flags |= CLIENT_CLOSE_AFTER_REPLY;
  13.         }
  14.     }
  15. }


上一篇:redis源码阅读之bitfieldop
下一篇:redis源码阅读之持久化优化——管道