从dpdk1811看virtio1.1 的实现—packed ring
——lvyilong316
virtio1.1已经在新的kernel和dpdk pmd中陆续支持,但是网上关于这一块的介绍却比较少,唯一描述多一点的就是这个ppt: 。但是看ppt这东西总觉得还是不过瘾的。只是模糊的大概理解,但要想看清其本质还是要看代码。这篇文章主要是基于dpdk18.11中的vhost_user来分析virtio1.1具有哪些新特性,已经具体是如何工作的。
virtio1.1 关键的最大改动点就是引入了packed queue,也就是将virtio1.0中的desc ring,avail ring,used ring三个ring打包成一个desc ring了。向对应的,我们将virtio 1.0这种实现方式称之为split ring。我们以vm的接收处理逻辑(vhost_user的发送逻辑)为例分析一下split 和packed方式的区别。
在virtio_dev_rx中有如下实现:
点击(此处)折叠或打开
-
if (vq_is_packed(dev))
-
nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
-
else
- nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);
根据后端设备是否支持VIRTIO_F_RING_PACKED这个feature,分别调用packed和split处理函数。我们先回顾了解下我们看下其分别实现的流程。
split方式处理
l virtio_dev_rx_split
点击(此处)折叠或打开
-
static __rte_always_inline uint32_t
-
virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
-
struct rte_mbuf **pkts, uint32_t count)
-
{
-
uint32_t pkt_idx = 0;
-
uint16_t num_buffers;
-
struct buf_vector buf_vec[BUF_VECTOR_MAX];
-
uint16_t avail_head;
-
-
rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
-
avail_head = *((volatile uint16_t *)&vq->avail->idx);
-
-
for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
-
uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
-
uint16_t nr_vec = 0;
-
/* 为拷贝当前mbuf后续预留avail desc */
-
if (unlikely(reserve_avail_buf_split(dev, vq,
-
pkt_len, buf_vec, &num_buffers,
-
avail_head, &nr_vec) < 0)) {
-
vq->shadow_used_idx -= num_buffers;
-
break;
-
}
-
/* 拷贝mbuf到avail desc */
-
if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
-
buf_vec, nr_vec,
-
num_buffers) < 0) {
-
vq->shadow_used_idx -= num_buffers;
-
break;
-
}
-
/* 更新last_avail_idx */
-
vq->last_avail_idx += num_buffers;
-
}
-
/* 小包的批处理拷贝 */
-
do_data_copy_enqueue(dev, vq);
-
-
if (likely(vq->shadow_used_idx)) {
-
flush_shadow_used_ring_split(dev, vq); /* 更新used ring */
-
vhost_vring_call_split(dev, vq); /* 通知前端 */
-
}
-
-
return pkt_idx;
- }
其中涉及三处会和packed方式处理不同的地方,从函数名字我们也能看出,就是带有split的函数,对应一定有packed函数。split收包处理流程这里不再具体展开,下图描述了split相关数据结构。下面重点以这个图为背景大致描述一下guset接收流程。
图中黄色部分表示guest内存,绿色部分表示host内存(非共享内存)。可以看到共享内存主要由三部分也就是三个ring构成:desc ring,avail ring,used ring。这三个ring也是split模式的核心构成。
首先是desc ring,有多个desc chain构成,用来指向存放数据的地址。desc中有个flag,主要有以下几个取值:
点击(此处)折叠或打开
-
/* This marks a buffer as continuing via the next field. */
-
#define VRING_DESC_F_NEXT 1
-
/* This marks a buffer as write-only (otherwise read-only). */
-
#define VRING_DESC_F_WRITE 2
-
/* This means the buffer contains a list of buffer descriptors. */
- #define VRING_DESC_F_INDIRECT 4
其次是avail ring,注意avail->idx 不是desc ring的idx,而是avail->ring的idx,对应的avail->ring[idx]最后一个后端可用的(对前端来说是下一个可用)desc chain的header idx。last_avail_idx也不是desc ring的idx,记录的也是avail->ring的idx,对应的avail->ring[idx]表示上一轮拷贝用到的最后一个desc chain的header idx(avail->ring[idx+1]为本轮拷贝可用的第一个desc chain的header idx)。avail ring中也有一个flag,目前只会取值VRING_AVAIL_F_NO_INTERRUPT,其作用是让guest通过设置这个来告诉后端(如果更新了uesd ring)暂时不用kick前端,作为前端的一个优化。
最后是uesd ring,再讲used ring前要提一下shadow_used ring,shadow_used ring是vhost user为了提高性能分配的一个ring,它和其他三个ring不同,它是host内存,guest是不感知的。仔细观察上图就可以看出shadow_used ring和uesd->ring指向的结构是完全一样的。因为shadow_used ring正是uesd ring的一个暂存的buff。当后端将数据从对应desc chain记录的内存拷贝出之后,这些desc chain的idx和len就需要先暂时记录在shadow_used ring中,等这一批mbuf拷贝完后,再将shadow_used ring一次性拷贝到uesd->ring的对应位置。同样last_used_idx记录的也不是desc ring的idx,而是used->ring的idx,对应used->ring[idx]记录的是上一次后端已经处理好可以给前端释放(对于guest rx来说)的desc chain的header idx。used ring中也有一个flag,目前只会取值VRING_USED_F_NO_NOTIFY,其作用是让host使用这个值告诉前端,当用可用的avail ring时不要kick host,dpdk vhost_user默认会设置这个flag,因为后端采用的是polling模式。
另外值得一提的是,整个guest rx涉及到两次位置的向guest内存拷贝的动作,一个是将mbuf中的数据拷贝到desc中(对应函数copy_mbuf_to_desc),另一处是将shadow_uesd ring拷贝到uesd ring的过程中(对应函数flush_shadow_used_ring_split),所以如果在guest热迁移过程中这两处都会涉及到log_page的相关操作。
关于split ring的其他一些注释:
1. 每个virtqueue由三部分组成:
(1) Descriptor Table
(2) Available Ring
(3) Used Ring
2. Legacy Interfaces
(1)vq需要严格的按照以下顺序和pad 布局
点击(此处)折叠或打开
-
struct virtq {
-
// The actual descriptors (16 bytes each)
-
struct virtq_desc desc[ Queue Size ];
-
// A ring of available descriptor heads with free-running index.
-
struct virtq_avail avail;
-
// Padding to the next Queue Align boundary.
-
u8 pad[ Padding ];
-
// A ring of used descriptor heads with free-running index.
-
struct virtq_used used;
- };
3. avail desc中的ring存放的是desc chain的header desc id。desc的id表示的是下一个可用(对于前端)的desc id;
packed方式处理
前面回顾分析完split的处理方式后下面重点要分析packed的处理方式,这是virtio1.1改变的重点。packed的关键变化是desc ring的变化,为了更好的利用cache和硬件的亲和性(方便硬件实现virtio),将split方式中的三个ring(desc,avail,used)打包成一个packed desc ring。
我们首先看些相对split desc来说packed desc有什么不同。
点击(此处)折叠或打开
-
/*split desc*/
-
struct vring_desc {
-
uint64_t addr; /* Address (guest-physical). */
-
uint32_t len; /* Length. */
-
uint16_t flags; /* The flags as indicated above. */
-
uint16_t next; /* We chain unused descriptors via this. */
-
};
-
/*packed desc*/
-
struct vring_packed_desc {
-
uint64_t addr;
-
uint32_t len;
-
uint16_t id;
-
uint16_t flags;
- };
我们看到addr和len名字和含义保持不变,flags看起来也没有变化,实际上其取值多了几种。下面我们具体分析其变化的原因,以及每个变化字段的含义。
(1) 相对split desc去掉了next字段:我们知道在split desc中next字段是记录一个desc chain中的下一个desc idx使用的,通常配合flags这样使用:
if ((descs[idx].flags & VRING_DESC_F_NEXT) == 1)
nextdesc = descs[ descs[idx].next];
但是在packed desc ring中一个desc chain一定是相邻的(可以理解为链表变为了数组),所以next字段就用不上了,上面获取nextdesc的方式可以转化为如下方式:
if ((descs[idx].flags & VRING_DESC_F_NEXT) == 1)
nextdesc = descs[++idx];
(2) flags字段的变化:相对split desc,flags字段仍然保留,但是其取值增加了,因为要把三个ring合一,每个desc就需要更多的信息表明身份(是used还是avail)。在原有flags的基础上增加了两个flag:
#define VRING_DESC_F_AVAIL (1ULL << 7)
#define VRING_DESC_F_USED (1ULL << 15)
关于这两个flag如何使用后面再分析。
(3) 相对split desc增加了id字段:这个id比较特殊,他是buffer id,注意不是desc的下标idx。那么这个buffer又是个什么含义呢?其实可用理解为前端guest维护的一个mbuf数组,这个buffer就是这个数组的idx,用来发送或接受数据。为了更准确描述buffer id的来历,我们看下前端是如果将一个avail buffer关联到一个desc的:
对每个(foreach)将要发送的buffer, b:
1.从desc ring中获取到下一个可用的desc,d;
2.获取下一个可用的buffer id;
3.设置d.addr的值为b的数据起始物理地址;
4.设置d.len的值为b的数据长度;
5.设置d.id为buffer id;
6.采用如下方式生成desc的flag:
(a)如果b是后端可写的,则设置VIRTQ_DESC_F_WRITE,否则不设置;
(b)按照avail ring的Wrap Counter值设置VIRTQ_DESC_F_AVAIL;
(c)按照avail ring 的Wrap Counter值取反设置VIRTQ_DESC_F_USED;
7. 调用一下memory barrier确保desc已经被初始化;
8.设置d.flags为刚刚生成的flag;
9.如果d是avail ring的最后一个desc,则对Wrap Counter进行翻转;
10.否则增加d指向下一个desc;
附上伪代码实现:
点击(此处)折叠或打开
-
/* Note: vq->avail_wrap_count is initialized to 1 */
-
/* Note: vq->sgs is an array same size as the ring */
-
id = alloc_id(vq);
-
first = vq->next_avail;
-
sgs = 0;
-
for (each buffer element b) {
-
sgs++;
-
vq->ids[vq->next_avail] = -1;
-
vq->desc[vq->next_avail].address = get_addr(b);
-
vq->desc[vq->next_avail].len = get_len(b);
-
avail = vq->avail_wrap_count ? VIRTQ_DESC_F_AVAIL : 0;
-
used = !vq->avail_wrap_count ? VIRTQ_DESC_F_USED : 0;
-
f = get_flags(b) | avail | used;
-
if (b is not the last buffer element) {
-
f |= VIRTQ_DESC_F_NEXT;
-
}
-
/* Don't mark the 1st descriptor available until all of them are ready. */
-
if (vq->next_avail == first) {
-
flags = f;
-
} else {
-
vq->desc[vq->next_avail].flags = f;
-
}
-
last = vq->next_avail;
-
vq->next_avail++;
-
if (vq->next_avail >= vq->size) {
-
vq->next_avail = 0;
-
vq->avail_wrap_count \^= 1;
-
}
-
}
-
vq->sgs[id] = sgs;
-
/* ID included in the last descriptor in the list */
-
vq->desc[last].id = id;
-
write_memory_barrier();
-
vq->desc[first].flags = flags;
-
memory_barrier();
-
if (vq->device_event.flags != RING_EVENT_FLAGS_DISABLE) {
-
notify_device(vq);
- }
注意上面实现的一个细节:当需要传递多个buffer的时候,第一个desc的flag是延时到最后更新的,这样可以减少memory_barrier调用次数,一次调用确保之后的desc都已经正常初始化了(为什么最后更新flag,以及要调用memory_barrier呢?因为后端是以flag判断desc是否可以使用,所以需要确保flag设置时其他字段以及被正确设置写入内存)。最后再附一张desc ring的图。
另外注意,由于avail 和uesd都统一到了desc中,但是并不是每个字段都是必须的。avail ring和used ring是如何体现的?
packed把三个ring进行了整合,但virtio的本质思想并没有变化,整个数据传输还是avail和used共同作用完成的,所以三ring合一仅仅是形式的变化,avail ring和used ring并没有消失。那么自然就有一个问题:avail ring和used ring是如何体现的?
回答这个问题前,我们先看一下virtio的vq为了支持packed发生的一些变化。
点击(此处)折叠或打开
-
struct vhost_virtqueue {
-
…
-
bool used_wrap_counter;
-
bool avail_wrap_counter;
-
…
- }
这两个wrap_counter分别对应avail ring和used ring,packed方式正式通过这两个bool型变量以及前面提到的packed desc新增的两个flag完成avail和uesd的区分的。
首先这两个wrap_counter在初始化队列的时候都被初始化为1;
对于avail ring,当使用了最后一个desc时则将avail_wrap_counter进行翻转(0变为1,1变为0),然后再从第一个开始;对于uesd ring,当使用了最后一个desc时将used_wrap_counter进行翻转,然后再从第一个开始。
有了上面的前提就可以说明avail desc和used desc是如果表示的了:
avail desc:当desc flags关于VRING_DESC_F_AVAIL的设置和avail_wrap_counter同步,且VRING_DESC_F_USED的设置和avail_wrap_counter相反时,表示desc为avail desc。例如avail_wrap_counter为1时,flags应该设置VRING_DESC_F_AVAIL|~VRING_DESC_F_USED,当avail_wrap_counter为0时,flags应该设置~VRING_DESC_F_AVAIL|VRING_DESC_F_USED。
used desc:当desc flags关于VRING_DESC_F_USED的设置和used_wrap_counter同步,且VRING_DESC_F_AVAIL的设置也和used_wrap_counter同步时,表示desc为used desc。例如used_wrap_counter为1时,flags应该设置VRING_DESC_F_AVAIL|VRING_DESC_F_USED,当used_wrap_counter为0时,flags应该设置~VRING_DESC_F_AVAIL|~VRING_DESC_F_USED。
综上可以看出,avail desc的两个flag总是相反的(只能设置一个),而used desc的两个flag总是相同的,要么都设置,要么都不设置。
看到这里可能有人会奇怪,为什么要搞得这么麻烦呢?仅仅通过两个flags应该也可以区分出是uesd还是avail吧。那我们看下面这个图,以avail desc为例,假如仅仅靠VRING_DESC_F_AVAIL|~VRING_DESC_F_USED就表示avail desc:
图中情况表示当前avail ring满了,没有uesddesc,这个时候如果后端处理完最后一个avail desc,回绕到第一个avail desc时,就无法区分这个avail desc是新的avail desc还是已经处理过的desc。而如果结合avail_wrap_counter就很好处理了,假如本轮其值为1,则遍历到最后一个avail desc时avail_wrap_counter要被置零了,再继续遍历到第一个desc时判断是avail desc的标准就变为了~USED|AVAIL,所以第一个desc就不满足条件了。
所以我们看出引入wrap_counter的作用主要是为了解决desc ring回绕问题。在split方式中,由于对于avail ring有avail->idx存放当前最后一个可用avail desc的位置,对于uesd ring有used->idx存放最后一个可用的uesd desc位置,而packed方式中三ring合一,不再有这样一个变量表示ring的结束位置,所以才引入了这么个机制。
关于packed ring的其他一些注释:
1. Packed virtqueues支持2^15 entries;
2. 每个packed virtqueue 有三部分构成:
(1)Descriptor Ring
(2)Driver Event Suppression:后端(device)只读,用来控制后端向前端(driver)的通知(used notifications)
(3)Device Event Suppression:前端(driver)只读,用来控制前端向后端(device)的通知(avail notifications)
3. Write Flag,VIRTQ_DESC_F_WRITE
(1)对于avail desc这个flag用来标记其关联的buffer是只读的还是只写的;
(2)对于used desc这个flag用来表示去关联的buffer是否有被后端(device)写入数据;
4. desc中的len
(1)对于avail desc,len表示desc关联的buffer中被写入的数据长度;
(2)对于uesd desc,当VIRTQ_DESC_F_WRITE被设置时,len表示后端(device)写入数据的长度,当VIRTQ_DESC_F_WRITE没有被设置时,len没有意义;
5. Descriptor Chain
buffer id包含在desc chain的最后一个desc中,另外,VIRTQ_DESC_F_NEXT在used desc中是没有意义的。
好了,说了这么多我们大概对packed的实现原理清楚了,那么接下来就看下具体实现,还是以vm收包方向的后端处理逻辑为例。
l virtio_dev_rx_packed
点击(此处)折叠或打开
-
static __rte_always_inline uint32_t
-
virtio_dev_rx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
-
struct rte_mbuf **pkts, uint32_t count)
-
{
-
uint32_t pkt_idx = 0;
-
uint16_t num_buffers;
-
struct buf_vector buf_vec[BUF_VECTOR_MAX];
-
-
for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
-
uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
-
uint16_t nr_vec = 0;
-
uint16_t nr_descs = 0;
-
/* 为拷贝当前mbuf后续预留avail desc */
-
if (unlikely(reserve_avail_buf_packed(dev, vq,
-
pkt_len, buf_vec, &nr_vec,
-
&num_buffers, &nr_descs) < 0)) {
-
vq->shadow_used_idx -= num_buffers;
-
break;
-
}
-
rte_prefetch0((void *)(uintptr_t)buf_vec[0].buf_addr);
-
-
/* 拷贝mbuf到avail desc */
-
if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
-
buf_vec, nr_vec,
-
num_buffers) < 0) {
-
vq->shadow_used_idx -= num_buffers;
-
break;
-
}
-
-
vq->last_avail_idx += nr_descs;
-
if (vq->last_avail_idx >= vq->size) {
-
vq->last_avail_idx -= vq->size;
-
vq->avail_wrap_counter ^= 1;
-
}
-
}
-
/* 小包的批处理拷贝 */
-
do_data_copy_enqueue(dev, vq);
-
-
if (likely(vq->shadow_used_idx)) {
-
/* 更新used ring */
-
flush_shadow_used_ring_packed(dev, vq);
-
/* kick 前端 */
-
vhost_vring_call_packed(dev, vq);
-
}
-
-
return pkt_idx;
- }
函数中带有packed后缀的都是packed方式的特有处理实现。我们先看reserve_avail_buf_packed,这个函数为拷贝当前mbuf后续预留avail desc。
l reserve_avail_buf_packed
点击(此处)折叠或打开
-
static inline int
-
reserve_avail_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
-
uint32_t size, struct buf_vector *buf_vec,
-
uint16_t *nr_vec, uint16_t *num_buffers,
-
uint16_t *nr_descs)
-
{
-
uint16_t avail_idx;
-
uint16_t vec_idx = 0;
-
uint16_t max_tries, tries = 0;
-
-
uint16_t buf_id = 0;
-
uint32_t len = 0;
-
uint16_t desc_count;
-
-
*num_buffers = 0;
-
avail_idx = vq->last_avail_idx;
-
/* 如果支持mergeable特性,则一个mbuf可用使用多个desc chain */
-
if (rxvq_is_mergeable(dev))
-
max_tries = vq->size - 1;
-
else
-
max_tries = 1;
-
-
while (size > 0) {
-
if (unlikely(++tries > max_tries))
-
return -1;
-
/* 尝试填充一个desc chain */
-
if (unlikely(fill_vec_buf_packed(dev, vq,
-
avail_idx, &desc_count,
-
buf_vec, &vec_idx,
-
&buf_id, &len,
-
VHOST_ACCESS_RW) < 0))
-
return -1;
-
-
len = RTE_MIN(len, size);
-
/* 将当前使用的desc chian信息同步到shadow_used_packed ring 中 */
-
update_shadow_used_ring_packed(vq, buf_id, len, desc_count);
-
size -= len;
-
-
avail_idx += desc_count;
-
if (avail_idx >= vq->size)
-
avail_idx -= vq->size;
-
-
*nr_descs += desc_count;
-
*num_buffers += 1;
-
}
-
-
*nr_vec = vec_idx;
-
-
return 0;
- }
下面看fill_vec_buf_packed,这个函数是将mbuf填充到当前desc chain中(如果mbuf过大,不保证填完,只负责填充当前desc chian)。
l fill_vec_buf_packed
主要倒数第三个参数是返回的buffer id。
点击(此处)折叠或打开
-
static __rte_always_inline int
-
fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
-
uint16_t avail_idx, uint16_t *desc_count,
-
struct buf_vector *buf_vec, uint16_t *vec_idx,
-
uint16_t *buf_id, uint32_t *len, uint8_t perm)
-
{
-
bool wrap_counter = vq->avail_wrap_counter;
-
struct vring_packed_desc *descs = vq->desc_packed;
-
uint16_t vec_id = *vec_idx;
-
/* 如果avail idx发送了回绕,则wrap_counter要进行翻转 */
-
if (avail_idx < vq->last_avail_idx)
-
wrap_counter ^= 1;
-
/* 判断是否是avail desc */
-
if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
-
return -1;
-
-
*desc_count = 0;
-
*len = 0;
-
while (1) {
-
if (unlikely(vec_id >= BUF_VECTOR_MAX))
-
return -1;
-
-
*desc_count += 1;
-
*buf_id = descs[avail_idx].id; /* buf_id记录的是使用的最后一个avail desc的id */
-
if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
-
if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
-
&descs[avail_idx],
-
&vec_id, buf_vec,
-
len, perm) < 0))
-
return -1;
-
} else {
-
*len += descs[avail_idx].len;
-
-
if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
-
descs[avail_idx].addr,
-
descs[avail_idx].len,
-
perm)))
-
return -1;
-
}
-
if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
-
break;
-
if (++avail_idx >= vq->size) {
-
avail_idx -= vq->size;
-
wrap_counter ^= 1;
-
}
-
}
-
-
*vec_idx = vec_id;
-
return 0;
- }
其中需要注意的有三点,首先,buf_id记录的是使用的最后一个avail desc的buffer id,这个id会在shadow_uesd中使用,然后是当avail ring出现翻转的时候,同步翻转对应的wrap_counter。再一点就是desc_is_avail函数,用来判断当前desc是否是avail desc。
l desc_is_avail
点击(此处)折叠或打开
-
static inline bool
-
desc_is_avail(struct vring_packed_desc *desc, bool wrap_counter)
-
{
-
/* VRING_DESC_F_AVAIL的设置和wrap_counter一致,且VRING_DESC_F_USED的设置和wrap_counter相反时表示设avail desc */
-
return wrap_counter == !!(desc->flags & VRING_DESC_F_AVAIL) &&
-
wrap_counter != !!(desc->flags & VRING_DESC_F_USED);
- }
我们看到这个判断逻辑原理和之前我们讲的packed方式中avail和uesd desc是如果区分的相同。即avail desc需要VRING_DESC_F_AVAIL这个flag的设置和avail wrap_counter一致,且VRING_DESC_F_USED的设置和avail wrap_counter相反。
下面回头看update_shadow_used_ring_packed函数,这个函数将当前使用的desc chian信息同步到shadow_used_packed ring 中。
l update_shadow_used_ring_packed
点击(此处)折叠或打开
-
static __rte_always_inline void
-
update_shadow_used_ring_packed(struct vhost_virtqueue *vq,
-
uint16_t desc_idx, uint32_t len, uint16_t count)
-
{
-
uint16_t i = vq->shadow_used_idx++;
-
-
vq->shadow_used_packed[i].id = desc_idx; /*desc chain最后一个avail desc的buffer id*/
-
vq->shadow_used_packed[i].len = len;
-
vq->shadow_used_packed[i].count = count;
- }
注意这里的count是当前desc chain中使用的desc个数,desc_idx是当前desc chain使用的最后一个desc的buffer idx,而split方式shadow_used的id记录的是当前desc chain头部desc的id。
另外一个关键的地方,shadow_used_packed相对shadow_used_split 多了一个count字段,用来记录当前desc chain中使用的desc个数。这个作用我们后面马上分析。
flush_shadow_used_ring_packed函数用来根据shadow_used ring的信息更新uesd ring。我们看其具体实现。
l update_shadow_used_ring_split
点击(此处)折叠或打开
-
static __rte_always_inline void
-
update_shadow_used_ring_split(struct vhost_virtqueue *vq,
-
uint16_t desc_idx, uint32_t len)
-
{
-
uint16_t i = vq->shadow_used_idx++;
-
-
vq->shadow_used_split[i].id = desc_idx;
-
vq->shadow_used_split[i].len = len;
-
}
-
-
static __rte_always_inline void
-
flush_shadow_used_ring_packed(struct virtio_net *dev,
-
struct vhost_virtqueue *vq)
-
{
-
int i;
-
uint16_t used_idx = vq->last_used_idx;
-
-
/* Split loop in two to save memory barriers */
-
for (i = 0; i < vq->shadow_used_idx; i++) {
-
vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
-
vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
-
/* count的作用就一个是用来判断desc ring发送回绕 */
-
used_idx += vq->shadow_used_packed[i].count;
-
if (used_idx >= vq->size)
-
used_idx -= vq->size;
-
}
-
-
rte_smp_wmb();
-
/* 将desc 标记为uesd desc */
-
for (i = 0; i < vq->shadow_used_idx; i++) {
-
uint16_t flags;
-
-
if (vq->shadow_used_packed[i].len)
-
flags = VRING_DESC_F_WRITE;
-
else
-
flags = 0;
-
-
if (vq->used_wrap_counter) {
-
flags |= VRING_DESC_F_USED;
-
flags |= VRING_DESC_F_AVAIL;
-
} else {
-
flags &= ~VRING_DESC_F_USED;
-
flags &= ~VRING_DESC_F_AVAIL;
-
}
-
-
vq->desc_packed[vq->last_used_idx].flags = flags;
-
/* log page 更新热迁移bitmap*/
-
vhost_log_cache_used_vring(dev, vq,
-
vq->last_used_idx *
-
sizeof(struct vring_packed_desc),
-
sizeof(struct vring_packed_desc));
-
/* count的作用另一个是用来更新last_used_idx */
-
vq->last_used_idx += vq->shadow_used_packed[i].count;
-
if (vq->last_used_idx >= vq->size) {
-
vq->used_wrap_counter ^= 1;
-
vq->last_used_idx -= vq->size;
-
}
-
}
-
-
rte_smp_wmb();
-
vq->shadow_used_idx = 0;
-
vhost_log_cache_sync(dev, vq);
- }
注意为什么先更新uesd desc的其他字段,最后才一起更新flag,而不是第一次循环就一起吧flag更新了,这个原因其实我们前面讲“buffer id”的时候已经说明了。对端(前端)是更加desc 的flag判断desc是否可用的,所以在更新flag需要有memory barrier,确保其他字段以及正确初始化到内存,为了减少memory barrier的调用,所以单独进行flag更新。
这个函数需要关注的地方还是比较多的。首先我们看到shadow_used_packed 中count字段的作用,其一是用来判断desc ring发送回绕,可以看到packed方式uesd desc在desc中不是连续的,而是会跳隔:used_idx += vq->shadow_used_packed[i].count,这个在split中是不存在的,因为split中uesd有单独的ring,所以uesd是连续的,直接就可以使用起始位置和要拷贝的uesd desc长度就可以判断used ring回绕了(注意一个是判断desc ring回绕,一个是判断uesd ring回绕,packed没有单独的uesd ring)。另外一个作用是用来更新last_used_idx,packed中last_used_idx表示的是使用的desc chain的最后的desc idx,而split方式中表示的是使用的desc chain的header idx。
然后就是标记desc为uesd desc,我们之前已经讲过,uesd desc需要VRING_DESC_F_USED和VRING_DESC_F_AVAIL一致且和used_wrap_counter一致。
关于通知前端的逻辑vhost_vring_call_packed我们下一次再分析。