@@ -31,12 +31,15 @@ struct qemu_laiocb {
struct iocb iocb;
ssize_t ret;
size_t nbytes;
+ int async_context_id;
+ QLIST_ENTRY(qemu_laiocb) node;
};
struct qemu_laio_state {
io_context_t ctx;
int efd;
int count;
+ QLIST_HEAD(, qemu_laiocb) completed_reqs;
};
static inline ssize_t io_event_ret(struct io_event *ev)
@@ -44,6 +47,69 @@ static inline ssize_t io_event_ret(struct io_event *ev)
return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}
+/*
+ * Completes an AIO request (calls the callback and frees the ACB).
+ * Be sure to be in the right AsyncContext before calling this function.
+ */
+static void qemu_laio_process_completion(struct qemu_laio_state *s,
+ struct qemu_laiocb *laiocb)
+{
+ int ret;
+
+ s->count--;
+
+ ret = laiocb->ret;
+ if (ret != -ECANCELED) {
+ if (ret == laiocb->nbytes)
+ ret = 0;
+ else if (ret >= 0)
+ ret = -EINVAL;
+
+ laiocb->common.cb(laiocb->common.opaque, ret);
+ }
+
+ qemu_aio_release(laiocb);
+}
+
+/*
+ * Processes all queued AIO requests, i.e. requests that have return from OS
+ * but their callback was not called yet. Requests that cannot have their
+ * callback called in the current AsyncContext, remain in the queue.
+ *
+ * Returns 1 if at least one request could be completed, 0 otherwise.
+ */
+static int qemu_laio_process_requests(void *opaque)
+{
+ struct qemu_laio_state *s = opaque;
+ struct qemu_laiocb *laiocb, *next;
+ int res = 0;
+
+ QLIST_FOREACH_SAFE (laiocb, &s->completed_reqs, node, next) {
+ if (laiocb->async_context_id == get_async_context_id()) {
+ qemu_laio_process_completion(s, laiocb);
+ QLIST_REMOVE(laiocb, node);
+ res = 1;
+ }
+ }
+
+ return res;
+}
+
+/*
+ * Puts a request in the completion queue so that its callback is called the
+ * next time when it's possible. If we already are in the right AsyncContext,
+ * the request is completed immediately instead.
+ */
+static void qemu_laio_enqueue_completed(struct qemu_laio_state *s,
+ struct qemu_laiocb* laiocb)
+{
+ if (laiocb->async_context_id == get_async_context_id()) {
+ qemu_laio_process_completion(s, laiocb);
+ } else {
+ QLIST_INSERT_HEAD(&s->completed_reqs, laiocb, node);
+ }
+}
+
static void qemu_laio_completion_cb(void *opaque)
{
struct qemu_laio_state *s = opaque;
@@ -74,19 +140,8 @@ static void qemu_laio_completion_cb(void *opaque)
struct qemu_laiocb *laiocb =
container_of(iocb, struct qemu_laiocb, iocb);
- s->count--;
-
- ret = laiocb->ret = io_event_ret(&events[i]);
- if (ret != -ECANCELED) {
- if (ret == laiocb->nbytes)
- ret = 0;
- else if (ret >= 0)
- ret = -EINVAL;
-
- laiocb->common.cb(laiocb->common.opaque, ret);
- }
-
- qemu_aio_release(laiocb);
+ laiocb->ret = io_event_ret(&events[i]);
+ qemu_laio_enqueue_completed(s, laiocb);
}
}
}
@@ -149,6 +204,7 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
laiocb->nbytes = nb_sectors * 512;
laiocb->ctx = s;
laiocb->ret = -EINPROGRESS;
+ laiocb->async_context_id = get_async_context_id();
iocbs = &laiocb->iocb;
@@ -183,6 +239,7 @@ void *laio_init(void)
struct qemu_laio_state *s;
s = qemu_mallocz(sizeof(*s));
+ QLIST_INIT(&s->completed_reqs);
s->efd = eventfd(0, 0);
if (s->efd == -1)
goto out_free_state;
@@ -191,8 +248,8 @@ void *laio_init(void)
if (io_setup(MAX_EVENTS, &s->ctx) != 0)
goto out_close_efd;
- qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb,
- NULL, qemu_laio_flush_cb, NULL, s);
+ qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb, NULL,
+ qemu_laio_flush_cb, qemu_laio_process_requests, s);
return s;
Also for Linux AIO, don't call callbacks that don't belong to the active AsyncContext. Signed-off-by: Kevin Wolf <kwolf@redhat.com> --- linux-aio.c | 87 ++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 72 insertions(+), 15 deletions(-)