LibEvent代码阅读--多缓冲区和零拷贝技术

1440阅读 0评论2015-02-12 qxhgd
分类:LINUX

最近项目赶着发版本,连续好几天通宵赶工,瓶颈出现在了性能问题,各种方案验证和修订。周末闲下来还是对之前学习的libevent进行总结。
在libevent的报文收发处理过程中采用了一系列提高收发性能的技术,其中多缓冲区的接收和发送以及零拷贝技术等,本篇主要分析这些技术在libevent中的运用。

首先简要的介绍一下两种技术:
多缓存的收据发送和接收:

点击(此处)折叠或打开

  1. struct iovec {
  2.     ptr_t iov_base; /* Starting address */
  3.     size_t iov_len; /* Length in bytes */
  4. };

  5. int readv(int fd, const struct iovec *vector, int count);
  6. int writev(int fd, const struct iovec *vector, int count);
其中的iovec是指一个缓冲区,包含了数据区的地址和对应的长度,在两个函数中的count是指iovec的个数。这种多缓冲区的发送和接收操作性能相对更好。而iovec需要在使用前分配好相关的内存空间。

零拷贝技术:
零拷贝能够减少数据之间无效的数据拷贝,而直接进行数据的发送,通常在数据发送的过程中使用,特别是在文件发送的过程中被经常使用。通常情况下要将文件a.txt中的内容发送出去,需要进行如下的操作:
读取文件内容: read(),然后发送读取的内容send()。因此一个完整的过程会出现一个读取再发送的操作,往往文件的IO操作是相对费时的操作,因此零拷贝技术实际上就是较少了read()的处理过程,即在发送数据前不需要进行文件的读取操作,这样相对而言就会提高处理的性能。关于零拷贝的技术有很多方式,这里主要介绍sendfile和mmap.
其中的mmap是采用映射的方式将文件内容映射到内存中,在发送报文时直接读取内存中的内容,这样就能提高发送效率。
sendfile则是直接将读取到文件fd的内容发送到输出的fd中,也不需要文件读取的过程,性能也会提高。
以上两种处理方式实际上都是内核协助完成。

点击(此处)折叠或打开

  1. void *mmap(void *start,size_t length,int prot,int flags,int fd,off_t offsize);

  2. ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
Libevent中多缓存的分析
关于多缓冲区的数据发送和接收主要是在报文的接收和发送过程中使用,多缓冲区减少了调用send和recv的次数。

点击(此处)折叠或打开

  1. /* 如果支持多缓冲区的写操作 */
  2. int
  3. evbuffer_write_iovec(struct evbuffer *buffer, evutil_socket_t fd,
  4.     ev_ssize_t howmuch)
  5. {
  6.     IOV_TYPE iov[NUM_WRITE_IOVEC];
  7.     struct evbuffer_chain *chain = buffer->first;
  8.     int n, i = 0;

  9.     if (howmuch < 0)
  10.         return -1;

  11.     ASSERT_EVBUFFER_LOCKED(buffer);
  12.     /* XXX make this top out at some maximal data length? if the
  13.      * buffer has (say) 1MB in it, split over 128 chains, there's
  14.      * no way it all gets written in one go. */
  15.      /* 从evbuffer中将对应的数据拷贝出来 */
  16.     while (chain != NULL && i < NUM_WRITE_IOVEC && howmuch) {
  17. #ifdef USE_SENDFILE
  18.         /* we cannot write the file info via writev */
  19.         if (chain->flags & EVBUFFER_SENDFILE)
  20.             break;
  21. #endif
  22.         /*iov[i].iov_base XXX的含义*/
  23.         iov[i].IOV_PTR_FIELD = (void *) (chain->buffer + chain->misalign);
  24.         if ((size_t)howmuch >= chain->off) {
  25.             /* XXXcould be problematic when windows supports mmap*/
  26.             /*iov[i++].iov_len是指长度*/
  27.             iov[i++].IOV_LEN_FIELD = (IOV_LEN_TYPE)chain->off;
  28.             howmuch -= chain->off;
  29.         } else {
  30.             /* XXXcould be problematic when windows supports mmap*/
  31.             iov[i++].IOV_LEN_FIELD = (IOV_LEN_TYPE)howmuch;
  32.             break;
  33.         }
  34.         chain = chain->next;
  35.     }
  36.     if (! i)
  37.         return 0;
  38.     /* 采用多缓冲区发送,因此fd应该是发送的接口 */
  39.     n = writev(fd, iov, i);

  40.     return (n);
  41. }
接收报文的的处理过程:

点击(此处)折叠或打开

  1. int
  2. evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
  3. {
  4.     struct evbuffer_chain **chainp;
  5.     int n;
  6.     int result;

  7.     int nvecs, i, remaining;

  8.     EVBUFFER_LOCK(buf);

  9.     if (buf->freeze_end) {
  10.         result = -1;
  11.         goto done;
  12.     }

  13.     n = get_n_bytes_readable_on_socket(fd);
  14.     if (n <= 0 || n > EVBUFFER_MAX_READ)
  15.         n = EVBUFFER_MAX_READ;
  16.     if (howmuch < 0 || howmuch > n) /* 读写长度 */
  17.         howmuch = n;

  18.     /* Since we can use iovecs, we're willing to use the last
  19.      * NUM_READ_IOVEC chains. */
  20.     if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) {
  21.         result = -1;
  22.         goto done;
  23.     } else {
  24.         IOV_TYPE vecs[NUM_READ_IOVEC];
  25.         /* 实际是完成数据空间的预分配,即vecs空间的分配,4个vecs的空间,chainp是缓冲区的开始地址 */
  26.         nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs,
  27.          NUM_READ_IOVEC, &chainp, 1);

  28.         /* 调用readv,采用多缓冲区的读写方式,linux的高级套接字,n是实际返回的长度 */
  29.         n = readv(fd, vecs, nvecs);
  30.     }

  31.     if (n == -1) {
  32.         result = -1;
  33.         goto done;
  34.     }
  35.     if (n == 0) {
  36.         result = 0;
  37.         goto done;
  38.     }

  39.     remaining = n;
  40.     /* nvecs是指多个缓冲区,但是不一定有那么多的数据 */
  41.     for (i=0; i < nvecs; ++i) {
  42.         /* 获取chain的长度 */
  43.         ev_ssize_t space = (ev_ssize_t) CHAIN_SPACE_LEN(*chainp);
  44.         if (space < remaining) { /* 若长度不够 */
  45.             (*chainp)->off += space; /* 则当前chain的内存使用完毕 */
  46.             remaining -= (int)space; /* 剩下的内存空间 */
  47.         } else {
  48.             (*chainp)->off += remaining; /* 当前空间已经足够 */
  49.             buf->last_with_datap = chainp;
  50.             break;
  51.         }
  52.         chainp = &(*chainp)->next;
  53.     }

  54.     /* 更新当前实际的有效长度 */
  55.     buf->total_len += n;
  56.     buf->n_add_for_cb += n;

  57.     /* Tell someone about changes in this buffer */
  58.     evbuffer_invoke_callbacks(buf);
  59.     result = n;
  60. done:
  61.     EVBUFFER_UNLOCK(buf);
  62.     return result;
  63. }
设置缓存队列的过程如下所示:

点击(此处)折叠或打开

  1. int
  2. _evbuffer_read_setup_vecs(struct evbuffer *buf, ev_ssize_t howmuch,
  3.     struct evbuffer_iovec *vecs, int n_vecs_avail,
  4.     struct evbuffer_chain ***chainp, int exact)
  5. {
  6.     struct evbuffer_chain *chain;
  7.     struct evbuffer_chain **firstchainp;
  8.     size_t so_far;
  9.     int i;
  10.     ASSERT_EVBUFFER_LOCKED(buf);

  11.     if (howmuch < 0)
  12.         return -1;

  13.     so_far = 0;
  14.     /* Let firstchain be the first chain with any space on it */
  15.     /* 从当前有数据的位置保存 */
  16.     firstchainp = buf->last_with_datap;
  17.     if (CHAIN_SPACE_LEN(*firstchainp) == 0) {
  18.         firstchainp = &(*firstchainp)->next;
  19.     }

  20.     chain = *firstchainp;
  21.     for (i = 0; i < n_vecs_avail && so_far < (size_t)howmuch; ++i) {
  22.         /* 获取当前chain可用的内存大小 */
  23.         size_t avail = (size_t) CHAIN_SPACE_LEN(chain);
  24.         if (avail > (howmuch - so_far) && exact) /* 当前内存足够存放 */
  25.             avail = howmuch - so_far;
  26.         /* vecs的基地址 */
  27.         vecs[i].iov_base = CHAIN_SPACE_PTR(chain);
  28.         vecs[i].iov_len = avail;
  29.         so_far += avail;
  30.         
  31.         chain = chain->next;
  32.     }

  33.     *chainp = firstchainp;
  34.     return i;
  35. }
在接收的过程中只需要设置好对应的缓存区大小以及对应的缓存地址,然后调用readv进行报文的接收。

Libevent中零拷贝的实现
关于零拷贝技术在libevent中主要体现在用于文件发送的过程中,提供了发送文件内容的接口:

点击(此处)折叠或打开

  1. int
  2. evbuffer_add_file(struct evbuffer *outbuf, int fd,
  3.     ev_off_t offset, ev_off_t length)
其中outbuf主要用于输出的缓存,fd是指需要被发送的文件描述符,offset是文件的偏移量,length是指需要发送的长度,其中outbuf是在bufferevent中的output。

点击(此处)折叠或打开

  1. int
  2. evbuffer_add_file(struct evbuffer *outbuf, int fd,
  3.     ev_off_t offset, ev_off_t length)
  4. {
  5. #if defined(USE_SENDFILE) || defined(_EVENT_HAVE_MMAP)
  6.     struct evbuffer_chain *chain;
  7.     struct evbuffer_chain_fd *info;
  8. #endif
  9. #if defined(USE_SENDFILE)
  10.     int sendfile_okay = 1;
  11. #endif
  12.     int ok = 1;

  13. #if defined(USE_SENDFILE)
  14.     if (use_sendfile) {
  15.         EVBUFFER_LOCK(outbuf);
  16.         sendfile_okay = outbuf->flags & EVBUFFER_FLAG_DRAINS_TO_FD;
  17.         EVBUFFER_UNLOCK(outbuf);
  18.     }

  19.     if (use_sendfile && sendfile_okay) {
  20.         /* 1K大小,分配一个chain */
  21.         chain = evbuffer_chain_new(sizeof(struct evbuffer_chain_fd));
  22.         if (chain == NULL) {
  23.             event_warn("%s: out of memory", __func__);
  24.             return (-1);
  25.         }
  26.         
  27.      /* 设置chain的属性 */
  28.         chain->flags |= EVBUFFER_SENDFILE | EVBUFFER_IMMUTABLE;
  29.         chain->buffer = NULL;    /* no reading possible,在sendfile中不需要buffer */
  30.         chain->buffer_len = length + offset;
  31.         chain->off = length;
  32.         chain->misalign = offset;

  33.         info = EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain);
  34.         /* 避免拷贝数据,保存的是文件的fd,后面获取该fd进行发送操作 */
  35.         info->fd = fd;

  36.         EVBUFFER_LOCK(outbuf);
  37.         if (outbuf->freeze_end) {
  38.             mm_free(chain);
  39.             ok = 0;
  40.         } else {
  41.             /* 添加到output吧 */
  42.             outbuf->n_add_for_cb += length;
  43.             evbuffer_chain_insert(outbuf, chain);
  44.         }
  45.     } else
  46. #endif
  47. #if defined(_EVENT_HAVE_MMAP)
  48.     if (use_mmap) {
  49.         /* 内存映射,将文件映射到内存中,采用mmap的方式减少内存拷贝 */
  50.         void *mapped = mmap(NULL, length + offset, PROT_READ,
  51. #ifdef MAP_NOCACHE
  52.          MAP_NOCACHE |
  53. #endif
  54. #ifdef MAP_FILE
  55.          MAP_FILE |
  56. #endif
  57.          MAP_PRIVATE,
  58.          fd, 0);
  59.         /* some mmap implementations require offset to be a multiple of
  60.          * the page size. most users of this api, are likely to use 0
  61.          * so mapping everything is not likely to be a problem.
  62.          * TODO(niels): determine page size and round offset to that
  63.          * page size to avoid mapping too much memory.
  64.          */
  65.         if (mapped == MAP_FAILED) {
  66.             event_warn("%s: mmap(%d, %d, %zu) failed",
  67.              __func__, fd, 0, (size_t)(offset + length));
  68.             return (-1);
  69.         }

  70.         /* 将需要处理的报文压缩为一个chain */
  71.         chain = evbuffer_chain_new(sizeof(struct evbuffer_chain_fd));
  72.         if (chain == NULL) {
  73.             event_warn("%s: out of memory", __func__);
  74.             munmap(mapped, length);
  75.             return (-1);
  76.         }

  77.         chain->flags |= EVBUFFER_MMAP | EVBUFFER_IMMUTABLE;
  78.         chain->buffer = mapped;  //映射后的内存地址,发送报文时使用该地址
  79.         chain->buffer_len = length + offset;
  80.         chain->off = length + offset;

  81.         info = EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain);
  82.         /* 需要处理的文件,实际上将需要拷贝的文件 */
  83.         info->fd = fd;

  84.         EVBUFFER_LOCK(outbuf);
  85.         if (outbuf->freeze_end) {
  86.             info->fd = -1;
  87.             evbuffer_chain_free(chain);
  88.             ok = 0;
  89.         } else {
  90.             outbuf->n_add_for_cb += length;

  91.             evbuffer_chain_insert(outbuf, chain);

  92.             /* we need to subtract whatever we don't need */
  93.             evbuffer_drain(outbuf, offset);
  94.         }
  95.     } else /* 在以上几种技术都不支持的情况下,采用传统的先拷贝,然后再发送的方式 */
  96. #endif
  97.     { /* 普通的处理方式,先拷贝再发送 */
  98.         /* the default implementation */
  99.         struct evbuffer *tmp = evbuffer_new();
  100.         ev_ssize_t read;

  101.         if (tmp == NULL)
  102.             return (-1);

  103.         if (lseek(fd, offset, SEEK_SET) == -1) {
  104.             evbuffer_free(tmp);
  105.             return (-1);
  106.         }

  107.         /* we add everything to a temporary buffer, so that we
  108.          * can abort without side effects if the read fails.
  109.          */
  110.         while (length) { /* 实际是将数据拷贝到evbuffer中 */
  111.             read = evbuffer_readfile(tmp, fd, (ev_ssize_t)length);
  112.             if (read == -1) {
  113.                 evbuffer_free(tmp);
  114.                 return (-1);
  115.             }

  116.             length -= read;
  117.         }

  118.         EVBUFFER_LOCK(outbuf);
  119.         if (outbuf->freeze_end) {
  120.             evbuffer_free(tmp);
  121.             ok = 0;
  122.         } else {
  123.          /* 将读出来的buffer添加到输出的buffer中 */
  124.             evbuffer_add_buffer(outbuf, tmp);
  125.             evbuffer_free(tmp);

  126.             /* 关闭文件操作 */
  127.             close(fd);
  128.         }
  129.     }

  130.     if (ok)
  131.         evbuffer_invoke_callbacks(outbuf);
  132.     EVBUFFER_UNLOCK(outbuf);

  133.     return ok ? 0 : -1;
  134. }
从上面的代码中可知,在支持sendfile的系统中采用了sendfile的方式,在支持mmap的系统中采用mmap的方式,而都不支持则直接进行普通的处理,先拷贝再发送。

在上述不同的三种处理中,都是创建新的buffer_chain,新的内存块,然后插入到outbuf中,但是新的内存块中的零拷贝技术的内容不一样。普通情况下都是将对应的报文内容填充到一系列的buffer_chain。而在零拷贝中chain中保存的是evbuffer_chain_fd,该结构体中实际只是包含了文件的fd。但mmap中还保存了映射内存的起始地址和长度。而sendfile中则无数据相关的处理。
关于sendfile的具体发送操作如下所示:

点击(此处)折叠或打开

  1. /* 如果支持sendfile,一种零拷贝的技术 */
  2. #ifdef USE_SENDFILE
  3. static inline int
  4. evbuffer_write_sendfile(struct evbuffer *buffer, evutil_socket_t fd,
  5.     ev_ssize_t howmuch)
  6. {
  7.     struct evbuffer_chain *chain = buffer->first;
  8.     /* 采用fd的方式保存在chain的头部之后 */
  9.     struct evbuffer_chain_fd *info =
  10.      EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain);

  11.     ev_ssize_t res;
  12.     off_t offset = chain->misalign;

  13.     ASSERT_EVBUFFER_LOCKED(buffer);
  14.     /* TODO(niels): implement splice */
  15.     /* 发送数据到fd,也就是将文件中的内容发送出去 */
  16.     res = sendfile(fd, info->fd, &offset, chain->off);
  17.     if (res == -1 && EVUTIL_ERR_RW_RETRIABLE(errno)) {
  18.         /* if this is EAGAIN or EINTR return 0; otherwise, -1 */
  19.         return (0);
  20.     }
  21.     return (res);
  22. }
上述的操作避免的文件到内存的拷贝,能够提高数据发送的效率。

而关于mmap的处理,加载内存之后还是需要按照常规的发送进行处理,只是减少了从文件到内存的拷贝过程。



上一篇:linux笔记 - 内核配置与编译
下一篇:_IO, _IOR, _IOW, _IOWR 宏的用法与解析