Redis运行流程源码解析

4550阅读 0评论2013-06-19 zhm_sunboy
分类:NOSQL

概述

  Redis通过定义一个 struct redisServer 类型的全局变量server 来保存服务器的相关信息(比如:配置信息,统计信息,服务器状态等等)。启动时通过读取配置文件里边的信息对server进行初始化(如果没有指定配置文件,将使用默认值对sever进行初始化),初始化的内容有:起监听端口,绑定有新连接时的回调函数,绑定服务器的定时函数,虚拟内存初始化,log初始化等等。

  启动

  初始化服务器配置

  先来看看redis 的main函数的入口

  Redis.c:1694


int main(int argc, char **argv) { 
    time_t start; 

    initServerConfig(); 
    if (argc == 2) { 
        if (strcmp(argv[1], "-v") == 0 || 
            strcmp(argv[1], "--version") == 0) version(); 
        if (strcmp(argv[1], "--help") == 0) usage(); 
        resetServerSaveParams(); 
        loadServerConfig(argv[1]); 
    } else if ((argc > 2)) { 
        usage(); 
    } else { 
        ... 
    } 
    if (server.daemonize) daemonize(); 
    initServer(); 
    ...


  Redis.c:923


    if (server.port != 0) { 
        server.ipfd= anetTcpServer(server.neterr,server.port,server.bindaddr); 
        if (server.ipfd == ANET_ERR) { 
            redisLog(REDIS_WARNING, "Opening port %d: %s", 
                server.port, server.neterr); 
            exit(1); 
        } 
    }


  事件轮询结构体定义

  先看看事件轮询的结构体定义

  Ae.h:88


/* State of an event based program */ 
typedef struct aeEventLoop { 
    int maxfd; 
    long long timeEventNextId; 
    aeFileEvent events[AE_SETSIZE]; /* Registered events */ 
    aeFiredEvent fired[AE_SETSIZE]; /* Fired events */ 
    aeTimeEvent *timeEventHead; 
    int stop; 
    void *apidata; /* This is used for polling API specific data */ 
    aeBeforeSleepProc *beforesleep; 
} aeEventLoop;



typedef struct aeApiState { 
    int epfd; 
    struct epoll_event events[AE_SETSIZE]; 
} aeApiState;


  创建事件轮询

  Redis.c:920


  server.el = aeCreateEventLoop(); 
Ae.c:55 
aeEventLoop *aeCreateEventLoop(void) { 
    aeEventLoop *eventLoop; 
    int i; 

    eventLoop = zmalloc(sizeof(*eventLoop)); 
    if (!eventLoop) return NULL; 
    eventLoop->timeEventHead = NULL; 
    eventLoop->timeEventNextId = 0; 
    eventLoop->stop = 0; 
    eventLoop->maxfd = -1; 
    eventLoop->beforesleep = NULL; 
    if (aeApiCreate(eventLoop) == -1) { 
        zfree(eventLoop); 
        return NULL; 
    } 
/* Events with mask == AE_NONE are not set. So let's initialize 
 * the vector with it. */ 
    for (i = 0; i < AE_SETSIZE; i++) 
        eventLoop->events[i].mask = AE_NONE; 
    return eventLoop; 
}


  绑定定时函数和有新连接时的回调函数

  redis.c:973


aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); 
if (server.ipfd > 0 && 
    aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, 
acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");


  进入事件轮询

  初始化后将进入事件轮询

  Redis.c:1733


    aeSetBeforeSleepProc(server.el,beforeSleep); 
    aeMain(server.el); 
    aeDeleteEventLoop(server.el);


  事件轮询函数aeMain

  看看aeMain的内容

  Ae.c:382


void aeMain(aeEventLoop *eventLoop) { 
    eventLoop->stop = 0; 
    while (!eventLoop->stop) { 
        if (eventLoop->beforesleep != NULL) 
            eventLoop->beforesleep(eventLoop); 
        aeProcessEvents(eventLoop, AE_ALL_EVENTS); 
    } 
}


  启动完毕,等待客户端请求

  到进入事件轮询函数后,redis的启动工作就做完了,接下来就是等待客户端的请求了。

  接收请求

  新连接到来时的回调函数

  在绑定定时函数和有新连接时的回调函数中说到了绑定有新连接来时的回调函数acceptTcpHandler,现在来看看这个函数的具体内容

  Networking.c:427


void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { 
    int cport, cfd; 
    char cip[128]; 
    REDIS_NOTUSED(el); 
    REDIS_NOTUSED(mask); 
    REDIS_NOTUSED(privdata); 

    cfd = anetTcpAccept(server.neterr, fd, cip, &cport); 
    if (cfd == AE_ERR) { 
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr); 
        return; 
    } 
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); 
    acceptCommonHandler(cfd); 
}


  接收客户端的新连接

  接下来看看anetTcpAccept函数的具体内容


Anet.c:330 
int anetTcpAccept(char *err, int s, char *ip, int *port) { 
    int fd; 
    struct sockaddr_in sa; 
    socklen_t salen = sizeof(sa); 
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR) 
        return ANET_ERR; 

    if (ip) strcpy(ip,inet_ntoa(sa.sin_addr)); 
    if (port) *port = ntohs(sa.sin_port); 
    return fd; 
}


  再进去anetGenericAccept 看看

  Anet.c:313


static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) { 
    int fd; 
    while(1) { 
        fd = accept(s,sa,len); 
        if (fd == -1) { 
            if (errno == EINTR) 
                continue; 
            else { 
                anetSetError(err, "accept: %s", strerror(errno)); 
                return ANET_ERR; 
            } 
        } 
        break; 
    } 
    return fd; 
}


  创建redisClient进行接收处理

  anetTcpAccept 运行完后,返回新连接的socket fd, 然后返回到调用函数acceptTcpHandler中,继续执行acceptCommonHandler 函数

  Networking.c:403


static void acceptCommonHandler(int fd) { 
    redisClient *c; 
    if ((c = createClient(fd)) == NULL) { 
        redisLog(REDIS_WARNING,"Error allocating resoures for the client"); 
        close(fd); /* May be already closed, just ingore errors */ 
        return; 
    } 
    /* If maxclient directive is set and this is one client more... close the 
     * connection. Note that we create the client instead to check before 
     * for this condition, since now the socket is already set in nonblocking 
     * mode and we can send an error for free using the Kernel I/O */ 
    if (server.maxclients && listLength(server.clients) > server.maxclients) { 
        char *err = "-ERR max number of clients reached\r\n"; 

        /* That's a best effort error message, don't check write errors */ 
        if (write(c->fd,err,strlen(err)) == -1) { 
            /* Nothing to do, Just to avoid the warning... */ 
        } 
        freeClient(c); 
        return; 
    } 
    server.stat_numconnections++; 
}


  绑定有数据可读时的回调函数

  Networking.c:15


redisClient *createClient(int fd) { 
    redisClient *c = zmalloc(sizeof(redisClient)); 
    c->bufpos = 0; 

    anetNonBlock(NULL,fd); 
    anetTcpNoDelay(NULL,fd); 
    if (aeCreateFileEvent(server.el,fd,AE_READABLE, 
        readQueryFromClient, c) == AE_ERR) 
    { 
        close(fd); 
        zfree(c); 
        return NULL; 
    } 

    selectDb(c,0); 
    c->fd = fd; 
    c->querybuf = sdsempty(); 
c->reqtype = 0; 
... 
}


  createClient 函数执行完后返回到调用处acceptCommonHandler函数,然后从acceptCommonHandler函数再返回到acceptTcpHandler函数。

  接收请求完毕,准备接收客户端得数据

  到此为止,新连接到来时的回调函数acceptTcpHandler执行完毕,在这个回调函数中创建了一个redisClient来处理这个客户端接下来的请求,并绑定了接收的新连接的读文件事件。当有数据可读时,网络i/o轮询(比如epoll)会有事件触发,此时绑定的回调函数readQueryFromClient将会调用来处理客户端发送过来的数据。

  读取客户端请求的数据

  在绑定有数据可读时的回调函数中的createClient函数中绑定了一个有数据可读时的回调函数readQueryFromClient函数,现在看看这个函数的具体内容

  Networking.c:874


void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { 
    redisClient *c = (redisClient*) privdata; 
    char buf[REDIS_IOBUF_LEN]; 
    int nread; 
    REDIS_NOTUSED(el); 
    REDIS_NOTUSED(mask); 

    server.current_client = c; 
    nread = read(fd, buf, REDIS_IOBUF_LEN); 
    if (nread == -1) { 
        if (errno == EAGAIN) { 
            nread = 0; 
        } else { 
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno)); 
            freeClient(c); 
            return; 
        } 
    } else if (nread == 0) { 
        redisLog(REDIS_VERBOSE, "Client closed connection"); 
        freeClient(c); 
        return; 
    } 
    if (nread) { 
        c->querybuf = sdscatlen(c->querybuf,buf,nread); 
        c->lastinteraction = time(NULL); 
    } else { 
        server.current_client = NULL; 
        return; 
    } 
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) { 
        sds ci = getClientInfoString(c), bytes = sdsempty(); 

        bytes = sdscatrepr(bytes,c->querybuf,64); 
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); 
        sdsfree(ci); 
        sdsfree(bytes); 
        freeClient(c); 
        return; 
    } 
    processInputBuffer(c); 
    server.current_client = NULL; 
}


  请求协议

  从readQueryFromClient函数读取客户端传过来的数据,进入processInputBuffer函数进行协议解析,可以把processInputBuffer函数看作是输入数据的协议解析器

  Networking.c:835


void processInputBuffer(redisClient *c) { 
    /* Keep processing while there is something in the input buffer */ 
    while(sdslen(c->querybuf)) { 
        /* Immediately abort if the client is in the middle of something. */ 
        if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; 

        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is 
         * written to the client. Make sure to not let the reply grow after 
         * this flag has been set (i.e. don't process more commands). */ 
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; 

        /* Determine request type when unknown. */ 
        if (!c->reqtype) { 
            if (c->querybuf[0] == '*') { 
                c->reqtype = REDIS_REQ_MULTIBULK; 
            } else { 
                c->reqtype = REDIS_REQ_INLINE; 
            } 
        } 

        if (c->reqtype == REDIS_REQ_INLINE) { 
            if (processInlineBuffer(c) != REDIS_OK) break; 
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) { 
            if (processMultibulkBuffer(c) != REDIS_OK) break; 
        } else { 
            redisPanic("Unknown request type"); 
        } 

        /* Multibulk processing could see a <= 0 length. */ 
        if (c->argc == 0) { 
            resetClient(c); 
        } else { 
            /* Only reset the client when the command was executed. */ 
            if (processCommand(c) == REDIS_OK) 
                resetClient(c); 
        } 
    } 
}


  Inline请求协议

  Networking.c:679


int processInlineBuffer(redisClient *c) { 
    ... 
}


  Multibulk请求协议

  Multibulk协议比inline协议复杂,它是二进制安全的,即传送数据可以包含不安全字符。Inline协议不是二进制安全的,比如,如果set key value命令中的key或value包含空白字符,那么inline协议解析时将会失败,因为解析出来的参数个数与命令需要的的参数个数会不一致。

  协议格式


* CR LF 
$ CR LF 
 CR LF 
... 
$ CR LF 
 CR LF


  协议举例


*3 
$3 
SET 
$5 
mykey 
$7 
myvalue


  具体解析代码位于

  Networking.c:731


int processMultibulkBuffer(redisClient *c) { 
... 
}


  详细解析见协议详解

  处理命令

  当协议解析完毕,则表示客户端的命令输入已经全部读取并已经解析成功,接下来就是执行客户端命令前的准备和执行客户端传送过来的命令

  Redis.c:1062


/* If this function gets called we already read a whole 
 * command, argments are in the client argv/argc fields. 
 * processCommand() execute the command or prepare the 
 * server for a bulk read from the client. 
 * 
 * If 1 is returned the client is still alive and valid and 
 * and other operations can be performed by the caller. Otherwise 
 * if 0 is returned the client was destroied (i.e. after QUIT). */ 
int processCommand(redisClient *c) { 
... 
 /* Now lookup the command and check ASAP about trivial error conditions 
  * such as wrong arity, bad command name and so forth. */ 
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); 
... 
call(c); 
... 
}


  Redi.c:72


struct redisCommand *commandTable; 
struct redisCommand readonlyCommandTable[] = { 
{"get",getCommand,2,0,NULL,1,1,1}, 
... 
}


  回复请求

  回复请求位于对应的命令中,以get命令为例

  T_string.c:67


void getCommand(redisClient *c) { 
    getGenericCommand(c); 
}


  T_string.c:52


int getGenericCommand(redisClient *c) { 
    robj *o; 

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL) 
        return REDIS_OK; 

    if (o->type != REDIS_STRING) { 
        addReply(c,shared.wrongtypeerr); 
        return REDIS_ERR; 
    } else { 
        addReplyBulk(c,o); 
        return REDIS_OK; 
    } 
}


  绑定写数据的回调函数

  接下来看看addReply函数里的内容

  Networking.c:190


void addReply(redisClient *c, robj *obj) { 
    if (_installWriteEvent(c) != REDIS_OK) return; 
    ... 
}


  Networking.c:64


int _installWriteEvent(redisClient *c) { 
    if (c->fd <= 0) return REDIS_ERR; 
    if (c->bufpos == 0 && listLength(c->reply) == 0 && 
        (c->replstate == REDIS_REPL_NONE || 
         c->replstate == REDIS_REPL_ONLINE) && 
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, 
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR; 
    return REDIS_OK; 
}


  准备写的数据内容

    addReply函数一进来后就绑定写数据的回调函数,接下来就是准备写的数据内容

  Networking.c:190


void addReply(redisClient *c, robj *obj) { 
    if (_installWriteEvent(c) != REDIS_OK) return; 
    redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY); 

    /* This is an important place where we can avoid copy-on-write 
     * when there is a saving child running, avoiding touching the 
     * refcount field of the object if it's not needed. 
     * 
     * If the encoding is RAW and there is room in the static buffer 
     * we'll be able to send the object to the client without 
     * messing with its page. */ 
    if (obj->encoding == REDIS_ENCODING_RAW) { 
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) 
            _addReplyObjectToList(c,obj); 
    } else { 
        /* FIXME: convert the long into string and use _addReplyToBuffer() 
         * instead of calling getDecodedObject. As this place in the 
         * code is too performance critical. */ 
        obj = getDecodedObject(obj); 
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) 
            _addReplyObjectToList(c,obj); 
        decrRefCount(obj); 
    } 
}


  给客户端答复数据

  在绑定写数据的回调函数中看到绑定了回调函数sendReplyToClient,现在来看看这个函数的主要内容

  Networking.c:566


void sendReplyToClient(aeEventLoop *el, int fd, ...) { 
    ... 
while(c->bufpos > 0 || listLength(c->reply)) { 
    ... 
    if(c->bufpos > 0){ 
        ... 
            nwritten=write(fd,...,c->bufpos-c->sentlen); 
            ... 
        } else { 
            o = listNodeValue(listFirst(c->reply)); 
            ... 
            nwritten=write(fd,...,objlen-c->sentlen); 
            ... 
        } 
    } 
}


  退出

  Redis 服务器的退出是通过shutdown命令来退出的,退出前会做一系列的清理工作

  Db.c:347


void shutdownCommand(redisClient *c) { 
    if (prepareForShutdown() == REDIS_OK) 
        exit(0); 
    addReplyError(c,"Errors trying to SHUTDOWN. Check logs."); 
}


  总结

  框架从启动,接收请求,读取客户端数据,请求协议解析,处理命令,回复请求,退出对redis运行的整个流程做了一个梳理。对整个redis的运作和框架有了一个初步的了解。

上一篇:Redis源码简要分析
下一篇:typedef 函数指针的用法