深入浅出vhostuser传输模型
1. virtio的ring结构
Virtio设备是支持多队列,每个队列由结构vring_virtqueue定义(可以是收包队列也可以是发包队列),而每个vring_virtqueue中都定义了一个vring结构,负责具体的数据传输。
// include/uapi/linux/virtio_ring.h
struct vring {
unsigned int num;
struct vring_desc *desc;
struct vring_avail *avail;
struct vring_used *used;
};
可见,ving不是一个ring环,而是包含了三个ring环,利用着三个ring环实现报文的收发。我们通过一张图来描述三个ring环的作用及关系:
1. vring_desc
Struct vring_desc并没有定义一个ring环,而是定义了ring环中每个元素的结构。上图中已经对vring_desc各成员做了注解。Desc ring没有消费者和生产者,我们可以把它看作一块用来交互数据的共享内存。
说明:vring_desc结构中的addr成员,在Guest向外发包的场景中,指向的是一块承载了发包数据的内存,而在Guest从外面收包的场景中,指向的是一块预分配好的空内存,Host会将收到的包存放到这块空内存中。
2. vring_avail
Struct vring_avail是定义了一个ring环的(即成员ring[]),这个ring环的生产者是Guest中的virtio-net,消费者是Host中vhostuser/vhostnet。Avail ring环中每个元素即指向desc ring的下标。
说明:Avail ring和desc ring的长度都是一样的,但是avail ring并不会指向desc ring的每一个desc。例如有些skb是由多个分片组成的(scattergather),那么这个skb实际会被转换成多个desc,并且通过vring_desc中的next将多个desc链接在一起,最后一个desc通过flag标记结束。那么这种情况下,Avail ring只会存储第一个desc的下标,同时vring_avail的idx也只累加1。
3. vring_used
Struct vring_used跟vring_avail类似,不过used ring的生产者是vhostnet/vhostuser,消费者是virtio-net。
说明:used ring中的每个元素包含两个成员id和len,id指向desc ring中的下标,而len则指向desc中所存储数据的长度(通常len成员只在Guest从外面收包的场景中才有效,这个时候desc中len指的是内存中可以最大存储的数据的长度,而user ring中的len指的则是内存中实际存储的数据的长度)。
那么这三个ring在内存中是怎么分布的呢?我们通过一张图描述下:
如图,三个ring是分布在一块连续的内存中的(物理/虚拟地址都是连续的)。最前面是desc ring,接下来是avail ring,最后是used ring。
2. 将vring映射到vhostuser
Virtio队列中的vring是由Guest中的virtio-net驱动申请的,那么vhostuser如何操作这些vring呢?答案是virtio-net在申请好vring后需要将vring的地址告诉vhostuser。我们通过一张图,看一下虚拟机启动时所涉及到的内存注册过程:
如上图所示,整个内存注册过程分为三个步骤:
第一步:
QEMU未虚拟机申请内存,并将虚拟机的整个内存注册到vhostuser。你没看错,确实是需要将虚拟机的整个内存都注册到vhostuser驱动中。
说明:Vhostuser和QEMU通过unix socket建立了通信连接,两者通过该连接进行协商。
第二步:
Guest中的virtio-net驱动申请队列(即virtqueue),并将队列中的vring地址同步给QEMU。
// 追踪从virtio-net开始初始化到创建virtqueue,函数位置:linux-kernel-src/drivers/virtio/
|virtio_pci_probe
| |virtio_pci_legacy_probe / virtio_pci_modern_probe
| | |setup_vq
| | | |vring_create_virtqueue
| | | | |vring_create_virtqueue_split
| | | | | |void *queue = vring_alloc_queue // 申请vring的地址
| | | | | |vring_init(struct vring *, queue)
| | | | | |__vring_create_virtqueue
| | | |iowrite32(VIRTIO_PCI_QUEUE_PFN) // 将vring_addr注册到QEMU
说明:Virtio-net和QEMU之间的通信不是通过什么scoket,而是由virtio-net向一段特定的io空间写数据实现的。不单单QEMU是这样做的,包括VMWARE也是这么做的(XEN不熟悉)。同理,QEMU向GUEST发起的数据请求也都是都通过IO实现的。
第三步:
QEMU在enable每个virtqueu的时候,会将virtqueue中三个vring的长度及地址注册到vhostuser。并且初始化三个vring中消费者/生产者的位置。
// vhostuser中相关协商处理函数
static vhost_message_handler_t vhost_message_handlers[VHOST_USER_MAX] = {
......
[VHOST_USER_SET_VRING_NUM] = vhost_user_set_vring_num,
[VHOST_USER_SET_VRING_ADDR] = vhost_user_set_vring_addr,
[VHOST_USER_SET_VRING_BASE] = vhost_user_set_vring_base,
......
};
3. Guest向外发包
// 函数位置:linux-kernel-src/drivers/net/virtio-net.c
|start_xmit
| |free_old_xmit_skbs // 每次发包前,先清理上一次已成功发送的包
| |xmit_skb
| | |virtqueue_add_outbuf
| | | |virtqueue_add
| | | | |virtqueue_add_split
这里面virtqueue_add()是一个通用的函数,不管收包还是发包,都是通过调用virtqueue_add()函数实现:
static inline int virtqueue_add(struct virtqueue *_vq,
struct scatterlist *sgs[],
unsigned int total_sg,
unsigned int out_sgs,
unsigned int in_sgs,
void *data,
void *ctx,
gfp_t gfp);
参数解析:
- _vq,没什么好解释的,virtqueue被包含在vring_virtqueue中,几乎跟vring传输相关的所有内容都定义在vring_virtqueue中;
- sgs,元素为scatterlist的列表;这里需要额外注意,每个scatterlist本身也是一个列表;举个例子,一个skb可以由多个分片构成,多个分片内存上是不连续的,在没有scatter-gather之前或者禁用scatter-gather的情况下,驱动需要将所有分片拷贝到一块连续的内存上,而开启scatter-gather后,我们不必再重新拷贝报文分片,直接通过scattherlist将报文的多个分片串联起来,供网卡驱动使用。可以说scatterlist是skb在网卡驱动中的表示;
- total_sg,所有scatterlist中分片加起来的总数,每个分片都占用一个独立的desc,所以total_sg表明接下来要消耗的desc总数;
- out_sgs,sgs中有多少是out_sg;
【说明】:scatterlist是分为out_sg(只读)和in_sg(可读可写)两种类型的。当Guest发送报文的时候,使用out_sg,当Guest打算收包,需要先将可承载报文数据的内存通过desc ring传递到vhost的时候,就使用in_sg。此外需要注意,我们发包的时候,只会传递out_sg给virtqueue_add(),收包的时候只传递in_sg给virtqueue_add(),还有一种通过virtqueue进行前后端协商和管理的virtqueue,会同时传递out_sg和in_sg给virtqueue_add()。 - int_sgs,sgs中有多少是in_sg;
- data,要传输的内存起始地址;
【说明】:在发包场景中,就是要发送的skb的地址,注意是虚拟地址,而我们赋值给desc->addr是物理地址,那么这个data有啥用呢?用处就是这个报文被vhost成功处理发送后,virtio-net会通过used ring再次获取到已经被成功发送的报文,这个时候virtio-net需要释放报文,那么直接引用这个data指向的虚拟地址释放就可以了。
【说明】:在收包场景中类似,virtio-net填充预申请的空白内存给vhostuser收包,收到的报文会通过used ring再送回到virtio-net中,这个时候直接引用data即可对内存中的报文数据进行操作了。
【说明】:那么data存储再哪呢?下面代码解析里有介绍。 - ctx,跟indirect相关,暂时不管;
- gfp,跟indirect相关,暂时不管;
virtqueue_add_split函数源码分析:
说明:packed queus是virtio 1.1引入的新特性,我们暂时不管,先分析老的split模式。
static inline int virtqueue_add_split(struct virtqueue *_vq,
struct scatterlist *sgs[],
unsigned int total_sg,
unsigned int out_sgs,
unsigned int in_sgs,
void *data,
void *ctx,
gfp_t gfp)
{
......
} else {
// 非indirect模式
indirect = false;
desc = vq->split.vring.desc;
i = head;
descs_used = total_sg;
}
......
// 如果desc ring没有空间了,赶紧通知vhost处理报文好腾地方
if (vq->vq.num_free < descs_used) {
pr_debug("Can't add buf len %i - avail = %i\n",
descs_used, vq->vq.num_free);
/* FIXME: for historical reasons, we force a notify here if
* there are outgoing parts to the buffer. Presumably the
* host should service the ring ASAP. */
if (out_sgs)
vq->notify(&vq->vq);
if (indirect)
kfree(desc);
END_USE(vq);
return -ENOSPC;
}
......
// *************************************************************************
// 第一步,填充desc ring
// 本函数最核心的代码了,out_sg和in_sg的存放位置也是有讲究的,当同时又两种scatterlist时,
// out_sg总是被放在前面,in_sg被存储在out_sg后面;
for (n = 0; n < out_sgs; n++) {
for (sg = sgs[n]; sg; sg = sg_next(sg)) {
// 这里需要注意的是,通过desc->addr传递给vhost的是Guest的物理地址
dma_addr_t addr = vring_map_one_sg(vq, sg, DMA_TO_DEVICE);
if (vring_mapping_error(vq, addr))
goto unmap_release;
desc[i].flags = cpu_to_virtio16(_vq->vdev, VRING_DESC_F_NEXT);
desc[i].addr = cpu_to_virtio64(_vq->vdev, addr);
desc[i].len = cpu_to_virtio32(_vq->vdev, sg->length);
prev = i;
i = virtio16_to_cpu(_vq->vdev, desc[i].next);
}
}
for (; n < (out_sgs + in_sgs); n++) {
for (sg = sgs[n]; sg; sg = sg_next(sg)) {
dma_addr_t addr = vring_map_one_sg(vq, sg, DMA_FROM_DEVICE);
if (vring_mapping_error(vq, addr))
goto unmap_release;
desc[i].flags = cpu_to_virtio16(_vq->vdev, VRING_DESC_F_NEXT | VRING_DESC_F_WRITE);
desc[i].addr = cpu_to_virtio64(_vq->vdev, addr);
desc[i].len = cpu_to_virtio32(_vq->vdev, sg->length);
prev = i;
i = virtio16_to_cpu(_vq->vdev, desc[i].next);
}
}
/* Last one doesn't continue. */
// OK,对于发包场景,上面所有desc都是一个SKB的,现在这个SKB填充完毕,需要通过flag标记
// desc的结束,前面介绍desc ring的时候介绍过,所有desc通过next成员链在一起,并且通过flag
// 标记一个报文存储的结束。
desc[prev].flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT);
/* We're using some buffers from the free list. */
// 用了多少,得从num_free中减掉
vq->vq.num_free -= descs_used;
/* Update free pointer */
if (indirect)
......
else
// 更新下一次开始填充的desc下标
vq->free_head = i;
......
// vring_virtqueue又自己维护了一个跟desc ring长度相同的数组,专门用来存储对应desc中内存
// 对应的虚拟地址
vq->split.desc_state[head].data = data;
......
/* Put entry in available array (but don't update avail->idx until they
* do sync). */
// *************************************************************************
// 第二步,填充avail ring
// 上面是desc ring的填充,下main开始填充avail ring了,可以看到只需要将第一个desc
// 填充到avail ring即可
avail = vq->split.avail_idx_shadow & (vq->split.vring.num - 1);
vq->split.vring.avail->ring[avail] = cpu_to_virtio16(_vq->vdev, head);
/* Descriptors and available array need to be set before we expose the
* new available array entries. */
// 累加avail ring的生产者计数
virtio_wmb(vq->weak_barriers);
vq->split.avail_idx_shadow++;
vq->split.vring.avail->idx = cpu_to_virtio16(_vq->vdev,
vq->split.avail_idx_shadow);
// *************************************************************************
// num_added主要跟通知机制有关,下面章节详细介绍
vq->num_added++;
pr_debug("Added buffer head %i to %p\n", head, vq);
END_USE(vq);
/* This is very unlikely, but theoretically possible. Kick
* just in case. */
if (unlikely(vq->num_added == (1 << 16) - 1))
virtqueue_kick(_vq);
......
4. Guest从外面收包
|virtnet_poll()
| |virtnet_receive()
| | |virtqueue_get_buf()
| | | |detach_buf()
| | |receive_buf()
| | |try_fill_recv()
| | | |add_recebuf_xxx()
| | | | |virtqueue_add_xxx()
| | | | | |virtqueue_add()
| | | |virqueue_kick()
我们从virtqueue_get_buf()函数开始看。该函数执行的是收包函数的第一步,还是以split模式为例,该函数会根据模式选择最终调用到virtqueue_get_buf_ctx_split()函数:
static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
unsigned int *len,
void **ctx)
{
// 注意:该函数每次只收一个包
......
// 这一步先判断下used ring里有没有未处理的成员。贴一下more_used_split()的代码:
// return vq->last_used_idx !=
// virtio16_to_cpu(vq->vq.vdev, vq->split.vring.used->idx);
// ***************************************************************************
// 这里需要说明的是,vring_virtqueue中定义了一个成员叫last_used_idx,last_used_idx是
// virtio-net消费used ring的下标+1,也就是这一次将从last_used_idx这个位置开始消费used
// ring。而vring_used中的idx则是由生产者(也就是vhost)填充的,表示下一次将要填充的used
// ring的下标。
// ***************************************************************************
// 说明:Vring_avail和vring_used中的idx都是生产者填充的,而消费者都会在各自的virtqueue的
// 结构中定义一个last_xxx_idx,表示上次消费的截至位置,以及下一次开始消费的位置。
if (!more_used_split(vq)) {
pr_debug("No more buffers in queue\n");
END_USE(vq);
return NULL;
}
/* Only get used array entries after they have been exposed by host. */
virtio_rmb(vq->weak_barriers);
// 获取要消费的used ring的下标
last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
// 从used成员中获取指向的desc ring中的下标
i = virtio32_to_cpu(_vq->vdev,
vq->split.vring.used->ring[last_used].id);
// 获取这个报文的实际长度
// 注意:这个报文可能是由多个desc构成的,下面的len是指所有desc中报文的总长度,并且报文的存
// 储总是前面desc满了之后,再向下一个desc中存储数据。
*len = virtio32_to_cpu(_vq->vdev,
vq->split.vring.used->ring[last_used].len);
// 如果这个desc ring的下标超过数组长度,则发生错误。
// ***************************************************************************
// 特别注意:
// 细心的同学可能已经发现,avail ring和used ring的生产者/消费者下标是不断累加的,然后使用
// 的时候做一下“idx&(vring_num-1)”的操作来保证访问不越界。但是我们使用desc ring的下标并不
// 是不断累加的,而是每次通过desc的next成员获取到的(观察上面virtqueue_add函数得分析)。所
// 以我们从avail ring和used ring中获取得desc下标是直接得下标,不存在越界。
if (unlikely(i >= vq->split.vring.num)) {
BAD_RING(vq, "id %u out of range\n", i);
return NULL;
}
// ***************************************************************************
// 这个data前面介绍过了
if (unlikely(!vq->split.desc_state[i].data)) {
BAD_RING(vq, "id %u is not a head!\n", i);
return NULL;
}
/* detach_buf_split clears data, so grab it now. */
ret = vq->split.desc_state[i].data;
// OK,报文已成功提取,释放掉这个desc,如果占用了多个desc,会在detach_buf_split中一起
// 释放(通过flag标记结束)。
detach_buf_split(vq, i, ctx);
// 累加消费者下标
vq->last_used_idx++;
/* If we expect an interrupt for the next entry, tell host
* by writing event index and flush out the write before
* the read in the next get_buf call. */
if (!(vq->split.avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT))
virtio_store_mb(vq->weak_barriers,
&vring_used_event(&vq->split.vring),
cpu_to_virtio16(_vq->vdev, vq->last_used_idx));
LAST_ADD_TIME_INVALID(vq);
END_USE(vq);
// 返回指向报文的虚拟机地址
return ret;
5. Vhost从Guest收包
我们选择DPDK-20.11的代码进行分析,因为这个版本vhostuser的收包代码非场简洁。在DPDK-20.08之前,vhostuser驱动支持zerocopy功能,但是在DPDK-20.08中zerocopy被移除了。因为zerocopy虽然带来了性能的提升,却让代码变得复杂且难以维护,同时zerocopy在VPC场景存在使用限制,复杂的代码也给virtio一些新功能添加也带来的阻碍,种种因素导致zerocopy最终被社区抛弃。今后virtio性能优化的方向主要时通过硬件的方式进行,例如通过CPU的CBDMA引擎加速拷贝,或者通过支持virtio offload的网卡进行卸载加速。
|rte_vhost_dequeue_burst()
| |virtio_dev_tx_split()
| | |for() // 处理所有报文(最多不超过32个,可配)
| | | |fill_vec_buf_split()
| | | | |while() // 处理该报文下所有的desc(通过desc.next串起的list)
| | | | | |map_one_desc()
| | | | | | |vhost_iova_to_vva()
| | | | | | | |rte_vhost_va_from_guest_pa()
| | | |copy_desc_to_mbuf()
继续贯彻深入浅出原则,咱们先看rte_vhost_va_from_guest_pa()函数,该函数主要实现将desc->addr这个Guest的物理地址(后面简称GPA)转换成DPDK进程中可以直接访问虚拟地址(后面简称VVA,虽然通常大家喜欢称之为HVA,但是我们跟着DPDK里面定义的VVA叫吧,大家知道怎么回事就行了)。
特别介绍:
在分析rte_vhost_va_from_guest_pa()函数之前,有必要先介绍一下rte_vhost_memory和rte_vhost_mem_region 这2个结构,前面第2节曾提到,VM虚拟机启动的时候的QEMU会将虚拟机整个内存都注册到vhostuser驱动中,那么虚拟机的内存信息存储在哪呢?答案就是由rte_vhost_memory结构负责存储:
// 每个rte_vhost_mem_region对应一个page
struct rte_vhost_memory {
// region个数
uint32_t nregions;
// region数组
struct rte_vhost_mem_region regions[];
};
struct rte_vhost_mem_region {
// 就是这个region在Guest中的物理地址
uint64_t guest_phys_addr;
// 主要在QEMU把vring注册过来的时候用到,Guest中的虚拟地址?TODO
uint64_t guest_user_addr;
// region映射到DPDK进程后的虚拟地址
uint64_t host_user_addr;
// region的长度
uint64_t size;
void *mmap_addr;
uint64_t mmap_size;
int fd;
};
我们再来分析rte_vhost_va_from_guest_pa()函数:
__rte_experimental
static __rte_always_inline uint64_t
rte_vhost_va_from_guest_pa(struct rte_vhost_memory *mem,
uint64_t gpa, uint64_t *len)
{
struct rte_vhost_mem_region *r;
uint32_t i;
// 其实就是拿报文的gpa在vhostuser维护的mem_regions中逐个对比,看属于
// 哪个page,然后报文在vhostuser中的vva = page->vva + (gpa - page->gpa)
for (i = 0; i < mem->nregions; i++) {
r = &mem->regions[i];
if (gpa >= r->guest_phys_addr &&
gpa < r->guest_phys_addr + r->size) {
if (unlikely(*len > r->guest_phys_addr + r->size - gpa))
*len = r->guest_phys_addr + r->size - gpa;
return gpa - r->guest_phys_addr +
r->host_user_addr;
}
}
*len = 0;
return 0;
}
vhost_iova_to_vva()是个封装函数,我们不用管。直接看map_one_desc()函数:
static __rte_always_inline int
map_one_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
struct buf_vector *buf_vec, uint16_t *vec_idx,
uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
{
uint16_t vec_id = *vec_idx;
// 这里为什么有个循环处理?要知道map_one_desc()这个函数只处理一个desc,
// 也就是只处理当前的desc,不用管desc.next。答案是:因为desc->addr有可能
// 是跨page的,所以需要多次地址转换,特别是开启tso的情况下。
while (desc_len) {
uint64_t desc_addr;
uint64_t desc_chunck_len = desc_len;
if (unlikely(vec_id >= BUF_VECTOR_MAX))
return -1;
// 地址转换:GPA => VVA
desc_addr = vhost_iova_to_vva(dev, vq,
desc_iova,
&desc_chunck_len,
perm);
if (unlikely(!desc_addr))
return -1;
rte_prefetch0((void *)(uintptr_t)desc_addr);
// 这个函数将desc转换后,存储在buf_vec中,然后再上层函数统一处理
buf_vec[vec_id].buf_iova = desc_iova;
buf_vec[vec_id].buf_addr = desc_addr;
buf_vec[vec_id].buf_len = desc_chunck_len;
desc_len -= desc_chunck_len;
desc_iova += desc_chunck_len;
vec_id++;
}
*vec_idx = vec_id;
return 0;
}
接着看fill_vec_buf_split()函数:
static __rte_always_inline int
fill_vec_buf_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
uint32_t avail_idx, uint16_t *vec_idx,
struct buf_vector *buf_vec, uint16_t *desc_chain_head,
uint32_t *desc_chain_len, uint8_t perm)
{
// 获取desc ring中的下标
uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
uint16_t vec_id = *vec_idx;
uint32_t len = 0;
uint64_t dlen;
uint32_t nr_descs = vq->size;
uint32_t cnt = 0;
struct vring_desc *descs = vq->desc;
struct vring_desc *idesc = NULL;
// 上文提到过,desc ring中下标是不会超过数组长度的,因为其值来自desc.next
if (unlikely(idx >= vq->size))
return -1;
*desc_chain_head = idx;
if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
......
}
while (1) {
......
len += descs[idx].len;
// 为一个desc转换地址
if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
descs[idx].addr, descs[idx].len,
perm))) {
free_ind_table(idesc);
return -1;
}
// 判断desc list是否截止
if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
break;
// 处理该报文的下一个desc
idx = descs[idx].next;
}
// 报文总长度
*desc_chain_len = len;
// vsec总个数
// 注意:desc是可以跨page的,但是用于接收的desc_vec是不跨page的
// 所以desc_vec中的元素的个数有可能回避desc的个数多。
*vec_idx = vec_id;
if (unlikely(!!idesc))
free_ind_table(idesc);
return 0;
}
copy_desc_to_mbuf()这个函数不想太详细的看了,改函数主要就是将buf_vec中的数据拷贝到mbuf中。并且根据virtio_hdr初始化mbuf相关参数(例如offload相关参数等)。