From 5fdc51b2aac307c0219e1489b80bc18e9a3db0d1 Mon Sep 17 00:00:00 2001
From: Anthony Liguori <aliguori@us.ibm.com>
Date: Mon, 24 Jan 2011 18:19:08 -0600
Subject: [PATCH 8/7] posix-aio: convert to glib based thread pool
This removes the custom pthread based thread pool in favor of a GThreadPool.
I believe this patch implements all of the necessary functionality but it needs
quite a lot more testing and performance analysis.
One thing I'm sure will break--we used to deliver a signal on every I/O
completion. This just slows down the I/O path. The reason we did this was
because at the time, I believe Cris depended on that signal to break out of
QEMU because it did I/O without a periodic timer installed.
At this point in time, I think any architecture that requires signals needs to
do so with a periodic timer or some other mechanism.
Signed-off-by: Anthony Liguori <aliguori@us.ibm.com>
@@ -1663,6 +1663,7 @@ if $pkg_config --modversion gthread-2.0 > /dev/null 2>&1 ; then
glib_cflags=`$pkg_config --cflags gthread-2.0 2>/dev/null`
glib_libs=`$pkg_config --libs gthread-2.0 2>/dev/null`
libs_softmmu="$glib_libs $libs_softmmu"
+ libs_tools="$glib_libs $libs_tools"
else
echo "glib-2.0 required to compile QEMU"
exit 1
@@ -12,27 +12,24 @@
*/
#include <sys/ioctl.h>
-#include <sys/types.h>
-#include <pthread.h>
-#include <unistd.h>
-#include <errno.h>
-#include <time.h>
-#include <signal.h>
-#include <string.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "qemu-queue.h"
+
#include "osdep.h"
#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"
+#include "qemu-thread.h"
#include "block/raw-posix-aio.h"
+typedef enum AioState {
+ INACTIVE,
+ CANCELLED,
+ ACTIVE,
+ COMPLETED
+} AioState;
-struct qemu_paiocb {
+typedef struct AioAiocb {
BlockDriverAIOCB common;
int aio_fildes;
union {
@@ -40,34 +37,29 @@ struct qemu_paiocb {
void *aio_ioctl_buf;
};
int aio_niov;
- size_t aio_nbytes;
-#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */
- int ev_signo;
+ union {
+ size_t aio_nbytes;
+ long aio_ioctl_cmd;
+ };
off_t aio_offset;
-
- QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
ssize_t ret;
- int active;
- struct qemu_paiocb *next;
-
int async_context_id;
-};
-typedef struct PosixAioState {
+ /* This state can only be set/get when the aio pool lock is held */
+ AioState state;
+} AioAiocb;
+
+typedef struct AioPool {
+ GThreadPool *pool;
int rfd, wfd;
- struct qemu_paiocb *first_aio;
-} PosixAioState;
+ GList *requests;
+ /* If this turns out to be contended, push to a per-request lock */
+ GMutex *lock;
+} AioPool;
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+static AioPool aio_pool;
#ifdef CONFIG_PREADV
static int preadv_present = 1;
@@ -75,51 +67,7 @@ static int preadv_present = 1;
static int preadv_present = 0;
#endif
-static void die2(int err, const char *what)
-{
- fprintf(stderr, "%s failed: %s\n", what, strerror(err));
- abort();
-}
-
-static void die(const char *what)
-{
- die2(errno, what);
-}
-
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
-static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_ioctl(AioAiocb *aiocb)
{
int ret;
@@ -138,7 +86,7 @@ static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
return aiocb->aio_nbytes;
}
-static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_flush(AioAiocb *aiocb)
{
int ret;
@@ -178,7 +126,7 @@ qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
#endif
-static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_rw_vector(AioAiocb *aiocb)
{
size_t offset = 0;
ssize_t len;
@@ -201,7 +149,7 @@ static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
return len;
}
-static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
+static ssize_t handle_aiocb_rw_linear(AioAiocb *aiocb, char *buf)
{
ssize_t offset = 0;
ssize_t len;
@@ -232,7 +180,7 @@ static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
return offset;
}
-static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_rw(AioAiocb *aiocb)
{
ssize_t nbytes;
char *buf;
@@ -302,278 +250,94 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void aio_routine(gpointer data, gpointer user_data)
{
- pid_t pid;
-
- pid = getpid();
-
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
- }
-
- if (QTAILQ_EMPTY(&request_list))
- break;
-
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- idle_threads--;
- mutex_unlock(&lock);
-
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
- mutex_lock(&lock);
- aiocb->ret = ret;
- idle_threads++;
- mutex_unlock(&lock);
+ AioPool *s = user_data;
+ AioAiocb *aiocb = data;
+ ssize_t ret = 0;
+ char ch = 0;
+ AioState state;
+ ssize_t len;
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ g_mutex_lock(s->lock);
+ if (aiocb->state != CANCELLED) {
+ aiocb->state = ACTIVE;
}
-
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
-
- return NULL;
-}
-
-static void spawn_thread(void)
-{
- sigset_t set, oldset;
-
- cur_threads++;
- idle_threads++;
-
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
-}
-
-static void qemu_paio_submit(struct qemu_paiocb *aiocb)
-{
- aiocb->ret = -EINPROGRESS;
- aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
-}
-
-static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
-{
- ssize_t ret;
-
- mutex_lock(&lock);
- ret = aiocb->ret;
- mutex_unlock(&lock);
-
- return ret;
-}
-
-static int qemu_paio_error(struct qemu_paiocb *aiocb)
-{
- ssize_t ret = qemu_paio_return(aiocb);
-
- if (ret < 0)
- ret = -ret;
- else
- ret = 0;
-
- return ret;
-}
-
-static int posix_aio_process_queue(void *opaque)
-{
- PosixAioState *s = opaque;
- struct qemu_paiocb *acb, **pacb;
- int ret;
- int result = 0;
- int async_context_id = get_async_context_id();
-
- for(;;) {
- pacb = &s->first_aio;
- for(;;) {
- acb = *pacb;
- if (!acb)
- return result;
-
- /* we're only interested in requests in the right context */
- if (acb->async_context_id != async_context_id) {
- pacb = &acb->next;
- continue;
- }
-
- ret = qemu_paio_error(acb);
- if (ret == ECANCELED) {
- /* remove the request */
- *pacb = acb->next;
- qemu_aio_release(acb);
- result = 1;
- } else if (ret != EINPROGRESS) {
- /* end of aio */
- if (ret == 0) {
- ret = qemu_paio_return(acb);
- if (ret == acb->aio_nbytes)
- ret = 0;
- else
- ret = -EINVAL;
- } else {
- ret = -ret;
- }
- /* remove the request */
- *pacb = acb->next;
- /* call the callback */
- acb->common.cb(acb->common.opaque, ret);
- qemu_aio_release(acb);
- result = 1;
- break;
- } else {
- pacb = &acb->next;
- }
- }
+ state = aiocb->state;
+ g_mutex_unlock(s->lock);
+
+ if (state == CANCELLED) {
+ return;
}
- return result;
-}
-
-static void posix_aio_read(void *opaque)
-{
- PosixAioState *s = opaque;
- ssize_t len;
-
- /* read all bytes from signal pipe */
- for (;;) {
- char bytes[16];
-
- len = read(s->rfd, bytes, sizeof(bytes));
- if (len == -1 && errno == EINTR)
- continue; /* try again */
- if (len == sizeof(bytes))
- continue; /* more to read */
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
break;
}
- posix_aio_process_queue(s);
-}
-
-static int posix_aio_flush(void *opaque)
-{
- PosixAioState *s = opaque;
- return !!s->first_aio;
-}
-
-static PosixAioState *posix_aio_state;
+ aiocb->ret = ret;
+ g_mutex_lock(s->lock);
+ aiocb->state = COMPLETED;
+ g_mutex_unlock(s->lock);
-static void aio_signal_handler(int signum)
-{
- if (posix_aio_state) {
- char byte = 0;
- ssize_t ret;
-
- ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
- if (ret < 0 && errno != EAGAIN)
- die("write()");
- }
+ do {
+ len = write(s->wfd, &ch, sizeof(ch));
+ } while (len == -1 && errno == EINTR);
- qemu_service_io();
+ return;
}
-static void paio_remove(struct qemu_paiocb *acb)
+
+static void qemu_paio_submit(AioAiocb *aiocb)
{
- struct qemu_paiocb **pacb;
-
- /* remove the callback from the queue */
- pacb = &posix_aio_state->first_aio;
- for(;;) {
- if (*pacb == NULL) {
- fprintf(stderr, "paio_remove: aio request not found!\n");
- break;
- } else if (*pacb == acb) {
- *pacb = acb->next;
- qemu_aio_release(acb);
- break;
- }
- pacb = &(*pacb)->next;
- }
+ AioPool *s = &aio_pool;
+ aiocb->state = INACTIVE;
+ aiocb->async_context_id = get_async_context_id();
+ s->requests = g_list_append(s->requests, aiocb);
+ g_thread_pool_push(s->pool, aiocb, NULL);
}
-static void paio_cancel(BlockDriverAIOCB *blockacb)
+static void qemu_paio_cancel(BlockDriverAIOCB *acb)
{
- struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
- int active = 0;
-
- mutex_lock(&lock);
- if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
- } else if (acb->ret == -EINPROGRESS) {
- active = 1;
- }
- mutex_unlock(&lock);
+ AioAiocb *aiocb = container_of(acb, AioAiocb, common);
+ AioPool *s = &aio_pool;
- if (active) {
- /* fail safe: if the aio could not be canceled, we wait for
- it */
- while (qemu_paio_error(acb) == EINPROGRESS)
- ;
+ g_mutex_lock(s->lock);
+ if (aiocb->state == INACTIVE) {
+ aiocb->state = CANCELLED;
}
-
- paio_remove(acb);
+ g_mutex_unlock(s->lock);
+
}
static AIOPool raw_aio_pool = {
- .aiocb_size = sizeof(struct qemu_paiocb),
- .cancel = paio_cancel,
+ .aiocb_size = sizeof(AioAiocb),
+ .cancel = qemu_paio_cancel,
};
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
BlockDriverCompletionFunc *cb, void *opaque, int type)
{
- struct qemu_paiocb *acb;
+ AioAiocb *acb;
acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
if (!acb)
return NULL;
acb->aio_type = type;
acb->aio_fildes = fd;
- acb->ev_signo = SIGUSR2;
- acb->async_context_id = get_async_context_id();
if (qiov) {
acb->aio_iov = qiov->iov;
@@ -582,9 +346,6 @@ BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
acb->aio_nbytes = nb_sectors * 512;
acb->aio_offset = sector_num * 512;
- acb->next = posix_aio_state->first_aio;
- posix_aio_state->first_aio = acb;
-
trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
qemu_paio_submit(acb);
return &acb->common;
@@ -594,68 +355,114 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
unsigned long int req, void *buf,
BlockDriverCompletionFunc *cb, void *opaque)
{
- struct qemu_paiocb *acb;
+ AioAiocb *acb;
acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
if (!acb)
return NULL;
acb->aio_type = QEMU_AIO_IOCTL;
acb->aio_fildes = fd;
- acb->ev_signo = SIGUSR2;
- acb->async_context_id = get_async_context_id();
acb->aio_offset = 0;
acb->aio_ioctl_buf = buf;
acb->aio_ioctl_cmd = req;
- acb->next = posix_aio_state->first_aio;
- posix_aio_state->first_aio = acb;
-
qemu_paio_submit(acb);
return &acb->common;
}
-int paio_init(void)
+static int paio_process_queue(void *opaque)
{
- struct sigaction act;
- PosixAioState *s;
- int fds[2];
- int ret;
+ AioPool *s = opaque;
+ GList *i, *next_i;
+ GList *completed_requests = NULL;
+ int async_context_id = get_async_context_id();
+ int did_work = 0;
- if (posix_aio_state)
- return 0;
+ /* Search the list to build a list of completed requests, we do
+ * this as it's own pass so that we minimize the time we're holding
+ * the shared lock.
+ */
+ g_mutex_lock(s->lock);
+ for (i = s->requests; i != NULL; i = next_i) {
+ AioAiocb *aiocb = i->data;
+ next_i = g_list_next(i);
+
+ /* don't complete a request that isn't part of this async context */
+ if (aiocb->async_context_id != async_context_id) {
+ continue;
+ }
- s = qemu_malloc(sizeof(PosixAioState));
+ if (aiocb->state == CANCELLED || aiocb->state == COMPLETED) {
+ s->requests = g_list_remove_link(s->requests, i);
+ completed_requests = g_list_concat(completed_requests, i);
+ }
+ }
+ g_mutex_unlock(s->lock);
+
+ /* Dispatch any completed requests */
+ for (i = completed_requests; i != NULL; i = g_list_next(i)) {
+ AioAiocb *aiocb = i->data;
+ if (aiocb->state == COMPLETED) {
+ if (aiocb->ret == aiocb->aio_nbytes) {
+ aiocb->ret = 0;
+ }
+ aiocb->common.cb(aiocb->common.opaque, aiocb->ret);
+ did_work = 1;
+ }
+ qemu_aio_release(aiocb);
+ }
- sigfillset(&act.sa_mask);
- act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
- act.sa_handler = aio_signal_handler;
- sigaction(SIGUSR2, &act, NULL);
+ g_list_free(completed_requests);
+
+ return did_work;
+}
- s->first_aio = NULL;
- if (qemu_pipe(fds) == -1) {
- fprintf(stderr, "failed to create pipe\n");
- return -1;
+static int paio_io_flush(void *opaque)
+{
+ AioPool *s = opaque;
+ if (s->requests == NULL) {
+ return 0;
}
+ return 1;
+}
- s->rfd = fds[0];
- s->wfd = fds[1];
+static void paio_complete(void *opaque)
+{
+ AioPool *s = opaque;
+ char buffer[1024];
+ ssize_t len;
- fcntl(s->rfd, F_SETFL, O_NONBLOCK);
- fcntl(s->wfd, F_SETFL, O_NONBLOCK);
+ /* Drain event queue */
+ do {
+ len = read(s->rfd, buffer, sizeof(buffer));
+ } while (len == -1 && errno == EINTR);
+
+ if (len == -1 && errno == EAGAIN) {
+ return;
+ }
+
+ paio_process_queue(s);
+}
- qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
- posix_aio_process_queue, s);
+int paio_init(void)
+{
+ AioPool *s = &aio_pool;
+ int fds[2];
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
+ if (pipe(fds) == -1) {
+ return -errno;
+ }
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
+ s->pool = g_thread_pool_new(aio_routine, s, 64, FALSE, NULL);
+ s->rfd = fds[0];
+ s->wfd = fds[1];
+ s->requests = NULL;
+ s->lock = g_mutex_new();
- QTAILQ_INIT(&request_list);
+ fcntl(s->wfd, F_SETFL, O_NONBLOCK);
+ fcntl(s->rfd, F_SETFL, O_NONBLOCK);
+ qemu_aio_set_fd_handler(s->rfd, paio_complete, NULL,
+ paio_io_flush, paio_process_queue, s);
- posix_aio_state = s;
return 0;
}
@@ -111,3 +111,11 @@ int qemu_set_fd_handler2(int fd,
{
return 0;
}
+
+int qemu_set_fd_handler(int fd,
+ IOHandler *fd_read,
+ IOHandler *fd_write,
+ void *opaque)
+{
+ return 0;
+}
--
1.7.0.4