Message ID | 1362584735-30911-6-git-send-email-stefanha@redhat.com |
---|---|
State | New |
Headers | show |
Il 06/03/2013 16:45, Stefan Hajnoczi ha scritto: > Now that each AioContext has a ThreadPool and the main loop AioContext > can be fetched with qemu_get_aio_context(), we can eliminate the concept > of a global thread pool from thread-pool.c. > > The submit functions must take a ThreadPool* argument. This is certainly ok for thread-pool.c. For raw-posix and raw-win32, what about adding already a bdrv_get_aio_context() function and using that in paio_submit? Is it putting the cart before the horse? Paolo > block/raw-posix.c and block/raw-win32.c use > aio_get_thread_pool(qemu_get_aio_context()) to fetch the main loop's > ThreadPool. > > tests/test-thread-pool.c must be updated to reflect the new > thread_pool_submit() function prototypes. > > Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> > --- > block/raw-posix.c | 8 ++++++-- > block/raw-win32.c | 4 +++- > include/block/thread-pool.h | 10 ++++++---- > tests/test-thread-pool.c | 44 +++++++++++++++++++++----------------------- > thread-pool.c | 23 +++++++---------------- > 5 files changed, 43 insertions(+), 46 deletions(-) > > diff --git a/block/raw-posix.c b/block/raw-posix.c > index 4dfdf98..01e5ae8 100644 > --- a/block/raw-posix.c > +++ b/block/raw-posix.c > @@ -750,6 +750,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, > BlockDriverCompletionFunc *cb, void *opaque, int type) > { > RawPosixAIOData *acb = g_slice_new(RawPosixAIOData); > + ThreadPool *pool; > > acb->bs = bs; > acb->aio_type = type; > @@ -763,7 +764,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, > acb->aio_offset = sector_num * 512; > > trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); > - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); > + pool = aio_get_thread_pool(qemu_get_aio_context()); > + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); > } > > static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs, > @@ -1413,6 +1415,7 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, > { > BDRVRawState *s = bs->opaque; > RawPosixAIOData *acb; > + ThreadPool *pool; > > if (fd_open(bs) < 0) > return NULL; > @@ -1424,7 +1427,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, > acb->aio_offset = 0; > acb->aio_ioctl_buf = buf; > acb->aio_ioctl_cmd = req; > - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); > + pool = aio_get_thread_pool(qemu_get_aio_context()); > + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); > } > > #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) > diff --git a/block/raw-win32.c b/block/raw-win32.c > index b89ac19..515614b 100644 > --- a/block/raw-win32.c > +++ b/block/raw-win32.c > @@ -144,6 +144,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, > BlockDriverCompletionFunc *cb, void *opaque, int type) > { > RawWin32AIOData *acb = g_slice_new(RawWin32AIOData); > + ThreadPool *pool; > > acb->bs = bs; > acb->hfile = hfile; > @@ -157,7 +158,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, > acb->aio_offset = sector_num * 512; > > trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); > - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); > + pool = aio_get_thread_pool(qemu_get_aio_context()); > + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); > } > > int qemu_ftruncate64(int fd, int64_t length) > diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h > index e1453c6..32afcdd 100644 > --- a/include/block/thread-pool.h > +++ b/include/block/thread-pool.h > @@ -31,9 +31,11 @@ typedef struct ThreadPool ThreadPool; > ThreadPool *thread_pool_new(struct AioContext *ctx); > void thread_pool_free(ThreadPool *pool); > > -BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, > - BlockDriverCompletionFunc *cb, void *opaque); > -int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg); > -void thread_pool_submit(ThreadPoolFunc *func, void *arg); > +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, > + ThreadPoolFunc *func, void *arg, > + BlockDriverCompletionFunc *cb, void *opaque); > +int coroutine_fn thread_pool_submit_co(ThreadPool *pool, > + ThreadPoolFunc *func, void *arg); > +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); > > #endif > diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c > index 9998e03..22915aa 100644 > --- a/tests/test-thread-pool.c > +++ b/tests/test-thread-pool.c > @@ -4,6 +4,8 @@ > #include "block/thread-pool.h" > #include "block/block.h" > > +static AioContext *ctx; > +static ThreadPool *pool; > static int active; > > typedef struct { > @@ -38,19 +40,10 @@ static void done_cb(void *opaque, int ret) > active--; > } > > -/* A non-blocking poll of the main AIO context (we cannot use aio_poll > - * because we do not know the AioContext). > - */ > -static void qemu_aio_wait_nonblocking(void) > -{ > - qemu_notify_event(); > - qemu_aio_wait(); > -} > - > /* Wait until all aio and bh activity has finished */ > static void qemu_aio_wait_all(void) > { > - while (qemu_aio_wait()) { > + while (aio_poll(ctx, true)) { > /* Do nothing */ > } > } > @@ -58,7 +51,7 @@ static void qemu_aio_wait_all(void) > static void test_submit(void) > { > WorkerTestData data = { .n = 0 }; > - thread_pool_submit(worker_cb, &data); > + thread_pool_submit(pool, worker_cb, &data); > qemu_aio_wait_all(); > g_assert_cmpint(data.n, ==, 1); > } > @@ -66,7 +59,8 @@ static void test_submit(void) > static void test_submit_aio(void) > { > WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; > - data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data); > + data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data, > + done_cb, &data); > > /* The callbacks are not called until after the first wait. */ > active = 1; > @@ -84,7 +78,7 @@ static void co_test_cb(void *opaque) > active = 1; > data->n = 0; > data->ret = -EINPROGRESS; > - thread_pool_submit_co(worker_cb, data); > + thread_pool_submit_co(pool, worker_cb, data); > > /* The test continues in test_submit_co, after qemu_coroutine_enter... */ > > @@ -126,12 +120,12 @@ static void test_submit_many(void) > for (i = 0; i < 100; i++) { > data[i].n = 0; > data[i].ret = -EINPROGRESS; > - thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]); > + thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]); > } > > active = 100; > while (active > 0) { > - qemu_aio_wait(); > + aio_poll(ctx, true); > } > for (i = 0; i < 100; i++) { > g_assert_cmpint(data[i].n, ==, 1); > @@ -154,7 +148,7 @@ static void test_cancel(void) > for (i = 0; i < 100; i++) { > data[i].n = 0; > data[i].ret = -EINPROGRESS; > - data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i], > + data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i], > done_cb, &data[i]); > } > > @@ -162,7 +156,8 @@ static void test_cancel(void) > * run, but do not waste too much time... > */ > active = 100; > - qemu_aio_wait_nonblocking(); > + aio_notify(ctx); > + aio_poll(ctx, false); > > /* Wait some time for the threads to start, with some sanity > * testing on the behavior of the scheduler... > @@ -208,11 +203,10 @@ static void test_cancel(void) > > int main(int argc, char **argv) > { > - /* These should be removed once each AioContext has its thread pool. > - * The test should create its own AioContext. > - */ > - qemu_init_main_loop(); > - bdrv_init(); > + int ret; > + > + ctx = aio_context_new(); > + pool = aio_get_thread_pool(ctx); > > g_test_init(&argc, &argv, NULL); > g_test_add_func("/thread-pool/submit", test_submit); > @@ -220,5 +214,9 @@ int main(int argc, char **argv) > g_test_add_func("/thread-pool/submit-co", test_submit_co); > g_test_add_func("/thread-pool/submit-many", test_submit_many); > g_test_add_func("/thread-pool/cancel", test_cancel); > - return g_test_run(); > + > + ret = g_test_run(); > + > + aio_context_unref(ctx); > + return ret; > } > diff --git a/thread-pool.c b/thread-pool.c > index 7a07408..e0e0a47 100644 > --- a/thread-pool.c > +++ b/thread-pool.c > @@ -78,9 +78,6 @@ struct ThreadPool { > bool stopping; > }; > > -/* Currently there is only one thread pool instance. */ > -static ThreadPool global_pool; > - > static void *worker_thread(void *opaque) > { > ThreadPool *pool = opaque; > @@ -239,10 +236,10 @@ static const AIOCBInfo thread_pool_aiocb_info = { > .cancel = thread_pool_cancel, > }; > > -BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, > +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, > + ThreadPoolFunc *func, void *arg, > BlockDriverCompletionFunc *cb, void *opaque) > { > - ThreadPool *pool = &global_pool; > ThreadPoolElement *req; > > req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque); > @@ -278,18 +275,19 @@ static void thread_pool_co_cb(void *opaque, int ret) > qemu_coroutine_enter(co->co, NULL); > } > > -int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) > +int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, > + void *arg) > { > ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; > assert(qemu_in_coroutine()); > - thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc); > + thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); > qemu_coroutine_yield(); > return tpc.ret; > } > > -void thread_pool_submit(ThreadPoolFunc *func, void *arg) > +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) > { > - thread_pool_submit_aio(func, arg, NULL, NULL); > + thread_pool_submit_aio(pool, func, arg, NULL, NULL); > } > > static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) > @@ -363,10 +361,3 @@ void thread_pool_free(ThreadPool *pool) > event_notifier_cleanup(&pool->notifier); > g_free(pool); > } > - > -static void thread_pool_init(void) > -{ > - thread_pool_init_one(&global_pool, NULL); > -} > - > -block_init(thread_pool_init) >
On Wed, Mar 06, 2013 at 05:35:09PM +0100, Paolo Bonzini wrote: > Il 06/03/2013 16:45, Stefan Hajnoczi ha scritto: > > Now that each AioContext has a ThreadPool and the main loop AioContext > > can be fetched with qemu_get_aio_context(), we can eliminate the concept > > of a global thread pool from thread-pool.c. > > > > The submit functions must take a ThreadPool* argument. > > This is certainly ok for thread-pool.c. For raw-posix and raw-win32, > what about adding already a bdrv_get_aio_context() function and using > that in paio_submit? Is it putting the cart before the horse? Thanks, we might as well do that. Then I won't have to go back and modify block/raw-posix.c and block/raw-win32.c in the next patch series which lets BlockDriverState bind to an AioContext. Stefan
diff --git a/block/raw-posix.c b/block/raw-posix.c index 4dfdf98..01e5ae8 100644 --- a/block/raw-posix.c +++ b/block/raw-posix.c @@ -750,6 +750,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, BlockDriverCompletionFunc *cb, void *opaque, int type) { RawPosixAIOData *acb = g_slice_new(RawPosixAIOData); + ThreadPool *pool; acb->bs = bs; acb->aio_type = type; @@ -763,7 +764,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, acb->aio_offset = sector_num * 512; trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); + pool = aio_get_thread_pool(qemu_get_aio_context()); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs, @@ -1413,6 +1415,7 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, { BDRVRawState *s = bs->opaque; RawPosixAIOData *acb; + ThreadPool *pool; if (fd_open(bs) < 0) return NULL; @@ -1424,7 +1427,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, acb->aio_offset = 0; acb->aio_ioctl_buf = buf; acb->aio_ioctl_cmd = req; - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); + pool = aio_get_thread_pool(qemu_get_aio_context()); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) diff --git a/block/raw-win32.c b/block/raw-win32.c index b89ac19..515614b 100644 --- a/block/raw-win32.c +++ b/block/raw-win32.c @@ -144,6 +144,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, BlockDriverCompletionFunc *cb, void *opaque, int type) { RawWin32AIOData *acb = g_slice_new(RawWin32AIOData); + ThreadPool *pool; acb->bs = bs; acb->hfile = hfile; @@ -157,7 +158,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, acb->aio_offset = sector_num * 512; trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); + pool = aio_get_thread_pool(qemu_get_aio_context()); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } int qemu_ftruncate64(int fd, int64_t length) diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h index e1453c6..32afcdd 100644 --- a/include/block/thread-pool.h +++ b/include/block/thread-pool.h @@ -31,9 +31,11 @@ typedef struct ThreadPool ThreadPool; ThreadPool *thread_pool_new(struct AioContext *ctx); void thread_pool_free(ThreadPool *pool); -BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, - BlockDriverCompletionFunc *cb, void *opaque); -int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg); -void thread_pool_submit(ThreadPoolFunc *func, void *arg); +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, + ThreadPoolFunc *func, void *arg, + BlockDriverCompletionFunc *cb, void *opaque); +int coroutine_fn thread_pool_submit_co(ThreadPool *pool, + ThreadPoolFunc *func, void *arg); +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); #endif diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c index 9998e03..22915aa 100644 --- a/tests/test-thread-pool.c +++ b/tests/test-thread-pool.c @@ -4,6 +4,8 @@ #include "block/thread-pool.h" #include "block/block.h" +static AioContext *ctx; +static ThreadPool *pool; static int active; typedef struct { @@ -38,19 +40,10 @@ static void done_cb(void *opaque, int ret) active--; } -/* A non-blocking poll of the main AIO context (we cannot use aio_poll - * because we do not know the AioContext). - */ -static void qemu_aio_wait_nonblocking(void) -{ - qemu_notify_event(); - qemu_aio_wait(); -} - /* Wait until all aio and bh activity has finished */ static void qemu_aio_wait_all(void) { - while (qemu_aio_wait()) { + while (aio_poll(ctx, true)) { /* Do nothing */ } } @@ -58,7 +51,7 @@ static void qemu_aio_wait_all(void) static void test_submit(void) { WorkerTestData data = { .n = 0 }; - thread_pool_submit(worker_cb, &data); + thread_pool_submit(pool, worker_cb, &data); qemu_aio_wait_all(); g_assert_cmpint(data.n, ==, 1); } @@ -66,7 +59,8 @@ static void test_submit(void) static void test_submit_aio(void) { WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; - data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data); + data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data, + done_cb, &data); /* The callbacks are not called until after the first wait. */ active = 1; @@ -84,7 +78,7 @@ static void co_test_cb(void *opaque) active = 1; data->n = 0; data->ret = -EINPROGRESS; - thread_pool_submit_co(worker_cb, data); + thread_pool_submit_co(pool, worker_cb, data); /* The test continues in test_submit_co, after qemu_coroutine_enter... */ @@ -126,12 +120,12 @@ static void test_submit_many(void) for (i = 0; i < 100; i++) { data[i].n = 0; data[i].ret = -EINPROGRESS; - thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]); + thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]); } active = 100; while (active > 0) { - qemu_aio_wait(); + aio_poll(ctx, true); } for (i = 0; i < 100; i++) { g_assert_cmpint(data[i].n, ==, 1); @@ -154,7 +148,7 @@ static void test_cancel(void) for (i = 0; i < 100; i++) { data[i].n = 0; data[i].ret = -EINPROGRESS; - data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i], + data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i], done_cb, &data[i]); } @@ -162,7 +156,8 @@ static void test_cancel(void) * run, but do not waste too much time... */ active = 100; - qemu_aio_wait_nonblocking(); + aio_notify(ctx); + aio_poll(ctx, false); /* Wait some time for the threads to start, with some sanity * testing on the behavior of the scheduler... @@ -208,11 +203,10 @@ static void test_cancel(void) int main(int argc, char **argv) { - /* These should be removed once each AioContext has its thread pool. - * The test should create its own AioContext. - */ - qemu_init_main_loop(); - bdrv_init(); + int ret; + + ctx = aio_context_new(); + pool = aio_get_thread_pool(ctx); g_test_init(&argc, &argv, NULL); g_test_add_func("/thread-pool/submit", test_submit); @@ -220,5 +214,9 @@ int main(int argc, char **argv) g_test_add_func("/thread-pool/submit-co", test_submit_co); g_test_add_func("/thread-pool/submit-many", test_submit_many); g_test_add_func("/thread-pool/cancel", test_cancel); - return g_test_run(); + + ret = g_test_run(); + + aio_context_unref(ctx); + return ret; } diff --git a/thread-pool.c b/thread-pool.c index 7a07408..e0e0a47 100644 --- a/thread-pool.c +++ b/thread-pool.c @@ -78,9 +78,6 @@ struct ThreadPool { bool stopping; }; -/* Currently there is only one thread pool instance. */ -static ThreadPool global_pool; - static void *worker_thread(void *opaque) { ThreadPool *pool = opaque; @@ -239,10 +236,10 @@ static const AIOCBInfo thread_pool_aiocb_info = { .cancel = thread_pool_cancel, }; -BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, + ThreadPoolFunc *func, void *arg, BlockDriverCompletionFunc *cb, void *opaque) { - ThreadPool *pool = &global_pool; ThreadPoolElement *req; req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque); @@ -278,18 +275,19 @@ static void thread_pool_co_cb(void *opaque, int ret) qemu_coroutine_enter(co->co, NULL); } -int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) +int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, + void *arg) { ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; assert(qemu_in_coroutine()); - thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc); + thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); qemu_coroutine_yield(); return tpc.ret; } -void thread_pool_submit(ThreadPoolFunc *func, void *arg) +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) { - thread_pool_submit_aio(func, arg, NULL, NULL); + thread_pool_submit_aio(pool, func, arg, NULL, NULL); } static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) @@ -363,10 +361,3 @@ void thread_pool_free(ThreadPool *pool) event_notifier_cleanup(&pool->notifier); g_free(pool); } - -static void thread_pool_init(void) -{ - thread_pool_init_one(&global_pool, NULL); -} - -block_init(thread_pool_init)
Now that each AioContext has a ThreadPool and the main loop AioContext can be fetched with qemu_get_aio_context(), we can eliminate the concept of a global thread pool from thread-pool.c. The submit functions must take a ThreadPool* argument. block/raw-posix.c and block/raw-win32.c use aio_get_thread_pool(qemu_get_aio_context()) to fetch the main loop's ThreadPool. tests/test-thread-pool.c must be updated to reflect the new thread_pool_submit() function prototypes. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> --- block/raw-posix.c | 8 ++++++-- block/raw-win32.c | 4 +++- include/block/thread-pool.h | 10 ++++++---- tests/test-thread-pool.c | 44 +++++++++++++++++++++----------------------- thread-pool.c | 23 +++++++---------------- 5 files changed, 43 insertions(+), 46 deletions(-)