redis源码阅读之aof

11210阅读 0评论2019-05-27 stolennnxb
分类:NOSQL

本来今天想写点别的,但是心想之前一篇既然已经提到持久化这边了,而且也说了之后会讲到,索性,今天就说说这个。
所谓持久化,就是将内存中的内容同步到磁盘当中,redis提供了两种持久化机制:aof和rdb。今天的主角是aof。
aof持久化会把执行过的写命令追加到AOF文件的末尾,以此来记录数据发生的变化。何时把数据真正同步(fsync)到磁盘由redis.conf中的appendfsync选项控制,它一共有三个可选值,具体如下:

点击(此处)折叠或打开

  1. #
  2. # If unsure, use "everysec".

  3. # appendfsync always
  4. appendfsync everysec
  5. # appendfsync no
三个取值的含义比较直观:always表示每次写AOF文件后都同步执行一次fsync;everysec表示大约每秒在后台执行一次fsync(不确定时推荐使用);no表示redis自己不调用fsync,何时真正落盘交给操作系统决定。
在运行时通过CONFIG命令把appendonly从no改为yes时,redis会调用下面的函数开启aof,代码如下:

点击(此处)折叠或打开

  1. /* Called when the user switches from "appendonly no" to "appendonly yes"
  2.  * at runtime using the CONFIG command. */
  3. int startAppendOnly(void) {
  4.     char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
  5.     int newfd;

  6.     newfd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
  7.     serverAssert(server.aof_state == AOF_OFF);
  8.     if (newfd == -1) {
  9.         char *cwdp = getcwd(cwd,MAXPATHLEN);

  10.         serverLog(LL_WARNING,
  11.             "Redis needs to enable the AOF but can't open the "
  12.             "append only file %s (in server root dir %s): %s",
  13.             server.aof_filename,
  14.             cwdp ? cwdp : "unknown",
  15.             strerror(errno));
  16.         return C_ERR;
  17.     }
  18.     if (server.rdb_child_pid != -1) {
  19.         server.aof_rewrite_scheduled = 1;
  20.         serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible.");
  21.     } else {
  22.         /* If there is a pending AOF rewrite, we need to switch it off and
  23.          * start a new one: the old one cannot be reused becuase it is not
  24.          * accumulating the AOF buffer. */
  25.         if (server.aof_child_pid != -1) {
  26.             serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
  27.             killAppendOnlyChild();
  28.         }
  29.         if (rewriteAppendOnlyFileBackground() == C_ERR) {
  30.             close(newfd);
  31.             serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
  32.             return C_ERR;
  33.         }
  34.     }
  35.     /* We correctly switched on AOF, now wait for the rewrite to be complete
  36.      * in order to append data on disk. */
  37.     server.aof_state = AOF_WAIT_REWRITE;
  38.     server.aof_last_fsync = server.unixtime;
  39.     server.aof_fd = newfd;
  40.     return C_OK;
  41. }
aof底层写文件的函数如下,write被信号打断(EINTR)时会重试,直到全部写完或出错为止:

点击(此处)折叠或打开

  /* Write all `len` bytes of `buf` to `fd`.
   *
   * write(2) is retried when it is interrupted by a signal (EINTR), and
   * partial writes are continued from where they stopped.
   *
   * Returns the total number of bytes written. On any other write error,
   * returns the number of bytes written so far, or -1 (errno set by write)
   * if nothing at all was written. */
  ssize_t aofWrite(int fd, const char *buf, size_t len) {
      const char *cursor = buf;
      size_t remaining = len;
      ssize_t done = 0;

      while (remaining > 0) {
          ssize_t n = write(fd, cursor, remaining);

          if (n >= 0) {
              cursor += n;
              remaining -= (size_t)n;
              done += n;
          } else if (errno != EINTR) {
              /* Hard error: report partial progress if there was any. */
              return (done != 0) ? done : -1;
          }
          /* n < 0 with EINTR: simply try the same write again. */
      }
      return done;
  }
将aof缓冲区(server.aof_buf,无论哪种appendfsync策略,待写入的命令都会先放在这里)写入磁盘的代码如下。在everysec策略下,需要先判断后台是否有fsync正在执行(如果正在执行,write调用可能会被阻塞):如果有,就推迟本次写入,但最多推迟约2秒;而如果force参数被设置,就啥都不管不顾了,直接开整~

点击(此处)折叠或打开

  1. #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
  2. void flushAppendOnlyFile(int force) {
  3.     ssize_t nwritten;
  4.     int sync_in_progress = 0;
  5.     mstime_t latency;

  6.     if (sdslen(server.aof_buf) == 0) return;

  7.     if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
  8.         sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;//bio有讲过

  9.     if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
  10.         /* With this append fsync policy we do background fsyncing.
  11.          * If the fsync is still in progress we can try to delay
  12.          * the write for a couple of seconds. */
  13.         if (sync_in_progress) {
  14.             if (server.aof_flush_postponed_start == 0) {
  15.                 /* No previous write postponing, remember that we are
  16.                  * postponing the flush and return. */
  17.                 server.aof_flush_postponed_start = server.unixtime;
  18.                 return;
  19.             } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
  20.                 /* We were already waiting for fsync to finish, but for less
  21.                  * than two seconds this is still ok. Postpone again. */
  22.                 return;
  23.             }
  24.             /* Otherwise fall trough, and go write since we can't wait
  25.              * over two seconds. */
  26.             server.aof_delayed_fsync++;
  27.             serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
  28.         }
  29.     }
  30.     /* We want to perform a single write. This should be guaranteed atomic
  31.      * at least if the filesystem we are writing is a real physical one.
  32.      * While this will save us against the server being killed I don't think
  33.      * there is much to do about the whole server stopping for power problems
  34.      * or alike */

  35.     latencyStartMonitor(latency);
  36.     nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
  37.     latencyEndMonitor(latency);
  38.     /* We want to capture different events for delayed writes:
  39.      * when the delay happens with a pending fsync, or with a saving child
  40.      * active, and when the above two conditions are missing.
  41.      * We also use an additional event name to save all samples which is
  42.      * useful for graphing / monitoring purposes. */
  43.     if (sync_in_progress) {
  44.         latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
  45.     } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
  46.         latencyAddSampleIfNeeded("aof-write-active-child",latency);
  47.     } else {
  48.         latencyAddSampleIfNeeded("aof-write-alone",latency);
  49.     }
  50.     latencyAddSampleIfNeeded("aof-write",latency);

  51.     /* We performed the write so reset the postponed flush sentinel to zero. */
  52.     server.aof_flush_postponed_start = 0;

  53.     if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
  54.         static time_t last_write_error_log = 0;
  55.         int can_log = 0;

  56.         /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
  57.         if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
  58.             can_log = 1;
  59.             last_write_error_log = server.unixtime;
  60.         }

  61.         /* Log the AOF write error and record the error code. */
  62.         if (nwritten == -1) {
  63.             if (can_log) {
  64.                 serverLog(LL_WARNING,"Error writing to the AOF file: %s",
  65.                     strerror(errno));
  66.                 server.aof_last_write_errno = errno;
  67.             }
  68.         } else {
  69.             if (can_log) {
  70.                 serverLog(LL_WARNING,"Short write while writing to "
  71.                                        "the AOF file: (nwritten=%lld, "
  72.                                        "expected=%lld)",
  73.                                        (long long)nwritten,
  74.                                        (long long)sdslen(server.aof_buf));
  75.             }

  76.             if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
  77.                 if (can_log) {
  78.                     serverLog(LL_WARNING, "Could not remove short write "
  79.                              "from the append-only file. Redis may refuse "
  80.                              "to load the AOF the next time it starts. "
  81.                              "ftruncate: %s", strerror(errno));
  82.                 }
  83.             } else {
  84.                 /* If the ftruncate() succeeded we can set nwritten to
  85.                  * -1 since there is no longer partial data into the AOF. */
  86.                 nwritten = -1;
  87.             }
  88.             server.aof_last_write_errno = ENOSPC;
  89.         }

  90.         /* Handle the AOF write error. */
  91.         if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
  92.             /* We can't recover when the fsync policy is ALWAYS since the
  93.              * reply for the client is already in the output buffers, and we
  94.              * have the contract with the user that on acknowledged write data
  95.              * is synced on disk. */
  96.             serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
  97.             exit(1);
  98.         } else {
  99.             /* Recover from failed write leaving data into the buffer. However
  100.              * set an error to stop accepting writes as long as the error
  101.              * condition is not cleared. */
  102.             server.aof_last_write_status = C_ERR;

  103.             /* Trim the sds buffer if there was a partial write, and there
  104.              * was no way to undo it with ftruncate(2). */
  105.             if (nwritten > 0) {
  106.                 server.aof_current_size += nwritten;
  107.                 sdsrange(server.aof_buf,nwritten,-1);
  108.             }
  109.             return; /* We'll try again on the next call... */
  110.         }
  111.     } else {
  112.         /* Successful write(2). If AOF was in error state, restore the
  113.          * OK state and log the event. */
  114.         if (server.aof_last_write_status == C_ERR) {
  115.             serverLog(LL_WARNING,
  116.                 "AOF write error looks solved, Redis can write again.");
  117.             server.aof_last_write_status = C_OK;
  118.         }
  119.     }
  120.     server.aof_current_size += nwritten;

  121.     /* Re-use AOF buffer when it is small enough. The maximum comes from the
  122.      * arena size of 4k minus some overhead (but is otherwise arbitrary). */
  123.     if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
  124.         sdsclear(server.aof_buf);
  125.     } else {
  126.         sdsfree(server.aof_buf);
  127.         server.aof_buf = sdsempty();
  128.     }

  129.     /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
  130.      * children doing I/O in the background. */
  131.     if (server.aof_no_fsync_on_rewrite &&
  132.         (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
  133.             return;

  134.     /* Perform the fsync if needed. */
  135.     if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
  136.         /* aof_fsync is defined as fdatasync() for Linux in order to avoid
  137.          * flushing metadata. */
  138.         latencyStartMonitor(latency);
  139.         aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
  140.         latencyEndMonitor(latency);
  141.         latencyAddSampleIfNeeded("aof-fsync-always",latency);
  142.         server.aof_last_fsync = server.unixtime;
  143.     } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
  144.                 server.unixtime > server.aof_last_fsync)) {
  145.         if (!sync_in_progress) aof_background_fsync(server.aof_fd);
  146.         server.aof_last_fsync = server.unixtime;
  147.     }
  148. }

另一个需要注意的点是,BGREWRITEAOF命令会重写AOF文件,使AOF文件尽可能小:重写时是根据当前内存中的数据重新生成尽可能精简的命令,而不是照搬历史上执行过的每一条命令,具体细节在此不再赘述。重写期间新执行的写命令需要先缓存起来,这个缓存是通过如下机制实现的。
1.  使用多个缓存block而非一整块大缓存,每个block大小为10MB,如下所示

点击(此处)折叠或打开

  #define AOF_RW_BUF_BLOCK_SIZE (10*1024*1024) /* Each block holds 10 MB. */

  /* One fixed-size block of the AOF rewrite buffer. Blocks are stored in the
   * server.aof_rewrite_buf_blocks list, so the buffer grows one block at a
   * time instead of as a single large allocation. */
  typedef struct aofrwblock {
      unsigned long used;  /* Bytes of buf already filled. */
      unsigned long free;  /* Bytes of buf still available for appending. */
      char buf[AOF_RW_BUF_BLOCK_SIZE];
  } aofrwblock;
2. 向缓存中写数据时,先取链表的最后一个block:若其剩余空间足够,直接写入;若不够,先把剩余空间填满,再新建一个block继续写。

点击(此处)折叠或打开

  1. /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
  2. void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
  3.     listNode *ln = listLast(server.aof_rewrite_buf_blocks);
  4.     aofrwblock *block = ln ? ln->value : NULL;

  5.     while(len) {
  6.         /* If we already got at least an allocated block, try appending
  7.          * at least some piece into it. */
  8.         if (block) {
  9.             unsigned long thislen = (block->free < len) ? block-
上一篇:redis源码阅读之rio
下一篇:redis源码阅读之ae_select