qemu aio简记

4140阅读 0评论2014-07-11 sak0
分类:虚拟化


点击(此处)折叠或打开

  1. struct AioContext {
  2.     GSource source;

  3.     /* Protects all fields from multi-threaded access */
  4.     RFifoLock lock;

  5.     /* The list of registered AIO handlers */
  6.     QLIST_HEAD(, AioHandler) aio_handlers;

  7.     /* This is a simple lock used to protect the aio_handlers list.
  8.      * Specifically, it's used to ensure that no callbacks are removed while
  9.      * we're walking and dispatching callbacks.
  10.      */
  11.     int walking_handlers;

  12.     /* lock to protect between bh's adders and deleter */
  13.     QemuMutex bh_lock;
  14.     /* Anchor of the list of Bottom Halves belonging to the context */
  15.     struct QEMUBH *first_bh;

  16.     /* A simple lock used to protect the first_bh list, and ensure that
  17.      * no callbacks are removed while we're walking and dispatching callbacks.
  18.      */
  19.     int walking_bh;

  20.     /* Used for aio_notify. */
  21.     EventNotifier notifier;

  22.     /* GPollFDs for aio_poll() */
  23.     GArray *pollfds;

  24.     /* Thread pool for performing work and receiving completion callbacks */
  25.     struct ThreadPool *thread_pool;

  26.     /* TimerLists for calling timers - one per clock type */
  27.     QEMUTimerListGroup tlg;
  28. };

  29. struct QEMUBH {
  30.     AioContext *ctx;
  31.     QEMUBHFunc *cb;
  32.     void *opaque;
  33.     QEMUBH *next;
  34.     bool scheduled;
  35.     bool idle;
  36.     bool deleted;
  37. };

  38. struct BlockDriverAIOCB {
  39.     const AIOCBInfo *aiocb_info;
  40.     BlockDriverState *bs;
  41.     BlockDriverCompletionFunc *cb;
  42.     void *opaque;
  43. };

  44. struct ThreadPoolElement {
  45.     BlockDriverAIOCB common;
  46.     ThreadPool *pool;
  47.     ThreadPoolFunc *func;
  48.     void *arg;

  49.     /* Moving state out of THREAD_QUEUED is protected by lock. After
  50.      * that, only the worker thread can write to it. Reads and writes
  51.      * of state and ret are ordered with memory barriers.
  52.      */
  53.     enum ThreadState state;
  54.     int ret; //io返回值

  55.     /* Access to this list is protected by lock. */
  56.     QTAILQ_ENTRY(ThreadPoolElement) reqs;

  57.     /* Access to this list is protected by the global mutex. */
  58.     QLIST_ENTRY(ThreadPoolElement) all;
  59. };

  60. struct ThreadPool {
  61.     EventNotifier notifier;
  62.     AioContext *ctx;
  63.     QemuMutex lock;
  64.     QemuCond check_cancel;
  65.     QemuCond worker_stopped;
  66.     QemuSemaphore sem;
  67.     int max_threads;
  68.     QEMUBH *new_thread_bh;

  69.     /* The following variables are only accessed from one AioContext. */
  70.     QLIST_HEAD(, ThreadPoolElement) head;

  71.     /* The following variables are protected by lock. */
  72.     QTAILQ_HEAD(, ThreadPoolElement) request_list;
  73.     int cur_threads;
  74.     int idle_threads;
  75.     int new_threads; /* backlog of threads we need to create */
  76.     int pending_threads; /* threads created but not running yet */
  77.     int pending_cancellations; /* whether we need a cond_broadcast */
  78.     bool stopping;
  79. };



IO线程
worker_thread(pool)
    do_spawn_thread(pool);
    req = QTAILQ_FIRST(&pool->request_list);
    QTAILQ_REMOVE(&pool->request_list, req, reqs);//从pool的request队列中拿出req,变更req的状态
    req->state = THREAD_ACTIVE;
    ret = req->func(req->arg);aio_worker//req->arg是paio_submit中组织的RawPosixAIOData *acb
        ret = handle_aiocb_rw(aiocb);
            return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
                len = pwrite(aiocb->aio_fildes,
                         (const char *)buf + offset,
                         aiocb->aio_nbytes - offset,
                         aiocb->aio_offset + offset);
                offset += len;
                return offset
        if (ret == aiocb->aio_nbytes) ret = 0; //在这里检查IO是否完整
    req->state = THREAD_DONE;//变更thread的状态值
    req->ret = ret;//记录IO是否成功
    event_notifier_set(&pool->notifier);
        

主线程
main_loop ()
    last_io = main_loop_wait(nonblocking);
        ret = os_host_main_loop_wait(timeout_ns);
            glib_pollfds_poll();
                g_main_context_check(context, max_priority, pfds, glib_n_poll_fds)
                g_main_context_dispatch(context);
                    aio_ctx_dispatch
                        aio_poll(ctx, false);
                            aio_bh_poll(ctx)
                                for (bh = ctx->first_bh; bh; bh = next)
                                bh->cb(bh->opaque);spawn_thread_bh_fn(ctx)
                                    do_spawn_thread(pool);
                                        qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
                                            err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
                                            err = pthread_create(&thread->thread, &attr, start_routine, arg);
                                    bdrv_co_em_bh(acb)
                                        virtio_blk_rw_complete//virtio的callback
                            aio_dispatch(ctx)
                                node = QLIST_FIRST(&ctx->aio_handlers);
                                while(node)
                                node->io_read(node->opaque);event_notifier_ready(pool)
                                    QLIST_FOREACH_SAFE(elem, &pool->head, all, next)
                                    if (elem->state == THREAD_DONE && elem->common.cb)
                                    elem->common.cb(elem->common.opaque, elem->ret);bdrv_co_io_em_complete//协程的callback
                                    
        qemu_iohandler_poll(gpollfds, ret);
            ioh->fd_read(ioh->opaque);virtio_queue_host_notifier_read
                virtio_queue_notify_vq(vq);
                    vq->handle_output(vdev, vq);virtio_blk_handle_output
                        virtio_submit_multiwrite(s->bs, &mrb);
                            ret = bdrv_aio_multiwrite(bs, mrb->blkreq, mrb->num_writes);
                                num_reqs = multiwrite_merge(bs, reqs, num_reqs, mcb);
                                for (i = 0; i < num_reqs; i++) {
                                bdrv_co_aio_rw_vector(bs, reqs[i].sector, reqs[i].qiov,
                                reqs[i].nb_sectors, reqs[i].flags,
                                multiwrite_cb, mcb,
                                true);}//callback == ,实现是多路的virtio_blk_rw_complete
                                    virtio_blk_handle_rw_error //如果req->ret != 0,在这里处理
                                        action = bdrv_get_error_action(req->dev->bs, is_read, error); //取得处理错误的动作
                                        virtio_blk_req_complete(req, VIRTIO_BLK_S_IOERR);//提交VIRTIO_BLK_S_IOERR
                                        bdrv_error_action(s->bs, action, is_read, error);
                                            bdrv_emit_qmp_error_event(bs, QEVENT_BLOCK_IO_ERROR, action, is_read);
                                    virtio_blk_req_complete(req, VIRTIO_BLK_S_OK);//ret==0则提交VIRTIO_BLK_S_OK
                                return 0

                                
bdrv_co_aio_rw_vector
    //在这里组织BlockDriverAIOCBCoroutine *acb
    co = qemu_coroutine_create(bdrv_co_do_rw);
    qemu_coroutine_enter(co, acb);
        bdrv_co_do_rw
            acb->req.error = bdrv_co_do_writev(bs, acb->req.sector,
            acb->req.nb_sectors, acb->req.qiov, acb->req.flags);
                return bdrv_co_do_pwritev(bs, sector_num << BDRV_SECTOR_BITS,
                    nb_sectors << BDRV_SECTOR_BITS, qiov, flags);
                    bdrv_io_limits_intercept(bs, bytes, true);
                    ret = bdrv_aligned_pwritev(bs, &req, offset, bytes,
                               use_local_qiov ? &local_qiov : qiov,
                               flags);
                        ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov);
                        bdrv_set_dirty(bs, sector_num, nb_sectors);
                            bdrv_co_writev_em
                                return bdrv_co_io_em(bs, sector_num, nb_sectors, iov, true);
                                    acb = bs->drv->bdrv_aio_writev(bs, sector_num, iov, nb_sectors,
                                       bdrv_co_io_em_complete, &co);raw_aio_writev
                                        return raw_aio_submit(bs, sector_num, qiov, nb_sectors,
                                            cb, opaque, QEMU_AIO_WRITE);cb==
                                            return paio_submit(bs, s->fd, sector_num, qiov, nb_sectors,
                                                cb, opaque, type);
                                                pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
                                                //pool就是qemu_aio_context这个全局AioContext的thread_pool
                                                return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
                                                    //在这里组织ThreadPoolElement *req
                                                    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads)
                                                    QLIST_INSERT_HEAD(&pool->head, req, all); //把req加入tpool的执行队列
                                                    spawn_thread(pool);
                                                        if (!pool->pending_threads)
                                                        qemu_bh_schedule(pool->new_thread_bh);
                                                        /*把产生IO线程的BH加入到ctxt的执行队列里
                                                        其中,“pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);”
                                                        new_thread_bh是一个专用来生成thread的bh,它的cb是spawn_thread_bh_fn*/
                                                    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
                                                    return &req->common;                
                                    qemu_coroutine_yield();
                                    //bdrv_co_io_em_complete callback here
                                    return co.ret;
        acb->bh = qemu_bh_new(bdrv_co_em_bh, acb);
            return aio_bh_new(qemu_aio_context, cb, opaque);
                //在这里组织BH
                bh->ctx = ctx; //qemu_aio_context
                bh->next = ctx->first_bh; //把这个bh加入到ctx的bh list
                ctx->first_bh = bh;
                reuturn bh;
        qemu_bh_schedule(acb->bh);
            aio_notify(bh->ctx);
                event_notifier_set(&ctx->notifier);
                    ret = write(e->wfd, &value, sizeof(value))
    return &acb->common;

    
    
guest virtio-driver->host kvm-> host qemu
GUEST:  2.6.32

点击(此处)折叠或打开

  1. virtio_blk.c
  2. static void do_virtblk_request(struct request_queue *q)
  3. {
  4.     struct virtio_blk *vblk = q->queuedata;
  5.     struct request *req;
  6.     unsigned int issued = 0;

  7.     while ((req = blk_peek_request(q)) != NULL) {
  8.         BUG_ON(req->nr_phys_segments + 2 > vblk->sg_elems);

  9.         /* If this request fails, stop queue and wait for something to
  10.            finish to restart it. */
  11.         if (!do_req(q, vblk, req)) {
  12.             blk_stop_queue(q);
  13.             break;
  14.         }
  15.         blk_start_request(req);
  16.         issued++;
  17.     }

  18.     if (issued)
  19.         vblk->vq->vq_ops->kick(vblk->vq);
  20. }

  21. virtio_pci.c
  22. static struct virtqueue *setup_vq(struct virtio_device *vdev, unsigned index,
  23.                   void (*callback)(struct virtqueue *vq),
  24.                   const char *name,
  25.                   u16 msix_vec)

  26. static void vp_notify(struct virtqueue *vq)
  27. {
  28.     struct virtio_pci_device *vp_dev = to_vp_device(vq->vdev);
  29.     struct virtio_pci_vq_info *info = vq->priv;

  30.     /* we write the queue's selector into the notification register to
  31.      * signal the other end */
  32.     iowrite16(info->queue_index, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_NOTIFY);
  33. }


HOST:
设备初始化流程中有:
virtio_pci_start_ioeventfd
    virtio_pci_set_host_notifier_internal
        virtio_queue_set_host_notifier_fd_handler(vq, true, set_handler); //将fd加入iohandlers的fd处理队列,对应的触发handler就是virtio_queue_host_notifier_read
            event_notifier_set_handler(&vq->host_notifier,
                                   virtio_queue_host_notifier_read);
                qemu_set_fd_handler
                    qemu_set_fd_handler2(fd, NULL, fd_read, fd_write, opaque);
        memory_region_add_eventfd(&proxy->bar, VIRTIO_PCI_QUEUE_NOTIFY, 2,
                                  true, n, notifier);//注册使KVM处理对于VIRTIO_PCI_QUEUE_NOTIFY地址的io
上一篇:openstack.live_snapshot的实现方法存在竞态
下一篇:libvirt对监控虚机内存的改进