Message ID | 20210510085941.22769-7-eesposit@redhat.com |
---|---|
State | New |
Headers | show |
Series | block-copy: make helper APIs thread safe | expand |
10.05.2021 11:59, Emanuele Giuseppe Esposito wrote: > Divide the fields in AioTaskPool in IN and Status, and > introduce a CoQueue instead of .wait to take care of suspending > and resuming the calling coroutine, and a lock to protect the > busy_tasks counter accesses and the AioTask .ret field. "and" in commit message is almost always a sign, that there should be several commits :) Please, do at least refactoring to drop "main_co" in separate of adding a mutex. Hmm, actually, that was done in Denis's series "[PATCH v8 0/6] block: seriously improve savevm/loadvm performance". (https://patchew.org/QEMU/20200709132644.28470-1-den@openvz.org/) Probably, you could reuse patches 01,02 of it. (add Den to cc) > > Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com> > --- > block/aio_task.c | 63 ++++++++++++++++++++++++---------------- > include/block/aio_task.h | 2 +- > 2 files changed, 39 insertions(+), 26 deletions(-) > > diff --git a/block/aio_task.c b/block/aio_task.c > index 88989fa248..7ac6b5dd72 100644 > --- a/block/aio_task.c > +++ b/block/aio_task.c > @@ -27,62 +27,70 @@ > #include "block/aio_task.h" > > struct AioTaskPool { > - Coroutine *main_co; > - int status; > + /* IN: just set in aio_task_pool_new and never modified */ > int max_busy_tasks; > + > + /* Status: either atomic or protected by the lock */ > + int status; > int busy_tasks; > - bool waiting; > + CoQueue queue; > + CoMutex lock; > }; > > static void coroutine_fn aio_task_co(void *opaque) > { > + int ret; > AioTask *task = opaque; > AioTaskPool *pool = task->pool; > > - assert(pool->busy_tasks < pool->max_busy_tasks); > - pool->busy_tasks++; > + WITH_QEMU_LOCK_GUARD(&pool->lock) { > + assert(pool->busy_tasks < pool->max_busy_tasks); > + pool->busy_tasks++; > > - task->ret = task->func(task); > + ret = task->func(task); > + task->ret = ret; > > - pool->busy_tasks--; > + pool->busy_tasks--; > + } > > - if (task->ret < 0 && pool->status == 0) { > - pool->status = task->ret; > + if (ret < 0) 
{ > + qatomic_cmpxchg(&pool->status, 0, ret); > } Can we just do it inside critical section above and avoid extra cmpxchg? We'll need just qatomic_set as a pair to qatomic_read() > > g_free(task); > > - if (pool->waiting) { > - pool->waiting = false; > - aio_co_wake(pool->main_co); > - } > + qemu_co_queue_next(&pool->queue); this call doesn't need mutex protection? Then we should modify comment inside AioTaskPool structure. Anyway, I think it's simpler to just have one QEMU_MUTEX_GUARD() for the whole function. > } > > -void coroutine_fn aio_task_pool_wait_one(AioTaskPool *pool) > +/* Called with lock held */ again, such things are usually called _locked(). > +static void coroutine_fn aio_task_pool_wait_one_unlocked(AioTaskPool *pool) > { > assert(pool->busy_tasks > 0); > - assert(qemu_coroutine_self() == pool->main_co); > - > - pool->waiting = true; > - qemu_coroutine_yield(); > - > - assert(!pool->waiting); > + qemu_co_queue_wait(&pool->queue, &pool->lock); > assert(pool->busy_tasks < pool->max_busy_tasks); > } > > +void coroutine_fn aio_task_pool_wait_one(AioTaskPool *pool) > +{ > + QEMU_LOCK_GUARD(&pool->lock); > + aio_task_pool_wait_one_unlocked(pool); > +} > + > void coroutine_fn aio_task_pool_wait_slot(AioTaskPool *pool) > { > + QEMU_LOCK_GUARD(&pool->lock); > if (pool->busy_tasks < pool->max_busy_tasks) { > return; > } > > - aio_task_pool_wait_one(pool); > + aio_task_pool_wait_one_unlocked(pool); > } > > void coroutine_fn aio_task_pool_wait_all(AioTaskPool *pool) > { > + QEMU_LOCK_GUARD(&pool->lock); > while (pool->busy_tasks > 0) { > - aio_task_pool_wait_one(pool); > + aio_task_pool_wait_one_unlocked(pool); > } > } > > @@ -98,8 +106,8 @@ AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks) > { > AioTaskPool *pool = g_new0(AioTaskPool, 1); > > - pool->main_co = qemu_coroutine_self(); > pool->max_busy_tasks = max_busy_tasks; > + qemu_co_queue_init(&pool->queue); > > return pool; > } > @@ -115,10 +123,15 @@ int aio_task_pool_status(AioTaskPool *pool) > 
return 0; /* Sugar for lazy allocation of aio pool */ > } > > - return pool->status; > + return qatomic_read(&pool->status); > } > > bool aio_task_pool_empty(AioTaskPool *pool) > { > - return pool->busy_tasks == 0; > + int tasks; > + > + qemu_co_mutex_lock(&pool->lock); > + tasks = pool->busy_tasks; > + qemu_co_mutex_unlock(&pool->lock); > + return tasks == 0; > } > diff --git a/include/block/aio_task.h b/include/block/aio_task.h > index 50bc1e1817..b22a4310aa 100644 > --- a/include/block/aio_task.h > +++ b/include/block/aio_task.h > @@ -33,7 +33,7 @@ typedef int coroutine_fn (*AioTaskFunc)(AioTask *task); > struct AioTask { > AioTaskPool *pool; > AioTaskFunc func; > - int ret; > + int ret; /* atomic */ > }; > > AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks); >
On 10/05/21 13:56, Vladimir Sementsov-Ogievskiy wrote: >> >> + } >> - if (task->ret < 0 && pool->status == 0) { >> - pool->status = task->ret; >> + if (ret < 0) { >> + qatomic_cmpxchg(&pool->status, 0, ret); >> } > > Can we just do it inside critical section above and avoid extra cmpxchg? > We'll need just qatomic_set as a pair to qatomic_read() Good idea. >> g_free(task); >> - if (pool->waiting) { >> - pool->waiting = false; >> - aio_co_wake(pool->main_co); >> - } >> + qemu_co_queue_next(&pool->queue); > > this call doesn't need mutex protection? It does indeed. I second the idea of "stealing" Denis's two patches to block/aio_task and only adding the mutex (plus qatomic_read/set) here. Paolo > Then we should modify comment > inside AioTaskPool structure. > > Anyway, I think it's simpler to just have one QEMU_MUTEX_GUARD() for the > whole function.
On Mon, May 10, 2021 at 10:59:41AM +0200, Emanuele Giuseppe Esposito wrote: > Divide the fields in AioTaskPool in IN and Status, and > introduce a CoQueue instead of .wait to take care of suspending > and resuming the calling coroutine, and a lock to protect the > busy_tasks counter accesses and the AioTask .ret field. The thread-safety concerns with the aio_task.h API are unclear to me. The API is designed to have a "main" coroutine that adds task functions to the pool and waits for them to complete. Task functions execute in coroutines (up to the pool's max_busy_tasks limit). It seems like the API was designed to be called only from its main coroutine, so why make everything thread-safe? Is there a caller that shares an AioTaskPool between threads? Or will the task functions switch threads somehow? What exactly is the new thread-safety model? Please document it. Unfortunately aio_task.h doesn't have doc comments already but it will be necessary if there are thread-safety concerns.
diff --git a/block/aio_task.c b/block/aio_task.c index 88989fa248..7ac6b5dd72 100644 --- a/block/aio_task.c +++ b/block/aio_task.c @@ -27,62 +27,70 @@ #include "block/aio_task.h" struct AioTaskPool { - Coroutine *main_co; - int status; + /* IN: just set in aio_task_pool_new and never modified */ int max_busy_tasks; + + /* Status: either atomic or protected by the lock */ + int status; int busy_tasks; - bool waiting; + CoQueue queue; + CoMutex lock; }; static void coroutine_fn aio_task_co(void *opaque) { + int ret; AioTask *task = opaque; AioTaskPool *pool = task->pool; - assert(pool->busy_tasks < pool->max_busy_tasks); - pool->busy_tasks++; + WITH_QEMU_LOCK_GUARD(&pool->lock) { + assert(pool->busy_tasks < pool->max_busy_tasks); + pool->busy_tasks++; - task->ret = task->func(task); + ret = task->func(task); + task->ret = ret; - pool->busy_tasks--; + pool->busy_tasks--; + } - if (task->ret < 0 && pool->status == 0) { - pool->status = task->ret; + if (ret < 0) { + qatomic_cmpxchg(&pool->status, 0, ret); } g_free(task); - if (pool->waiting) { - pool->waiting = false; - aio_co_wake(pool->main_co); - } + qemu_co_queue_next(&pool->queue); } -void coroutine_fn aio_task_pool_wait_one(AioTaskPool *pool) +/* Called with lock held */ +static void coroutine_fn aio_task_pool_wait_one_unlocked(AioTaskPool *pool) { assert(pool->busy_tasks > 0); - assert(qemu_coroutine_self() == pool->main_co); - - pool->waiting = true; - qemu_coroutine_yield(); - - assert(!pool->waiting); + qemu_co_queue_wait(&pool->queue, &pool->lock); assert(pool->busy_tasks < pool->max_busy_tasks); } +void coroutine_fn aio_task_pool_wait_one(AioTaskPool *pool) +{ + QEMU_LOCK_GUARD(&pool->lock); + aio_task_pool_wait_one_unlocked(pool); +} + void coroutine_fn aio_task_pool_wait_slot(AioTaskPool *pool) { + QEMU_LOCK_GUARD(&pool->lock); if (pool->busy_tasks < pool->max_busy_tasks) { return; } - aio_task_pool_wait_one(pool); + aio_task_pool_wait_one_unlocked(pool); } void coroutine_fn 
aio_task_pool_wait_all(AioTaskPool *pool) { + QEMU_LOCK_GUARD(&pool->lock); while (pool->busy_tasks > 0) { - aio_task_pool_wait_one(pool); + aio_task_pool_wait_one_unlocked(pool); } } @@ -98,8 +106,8 @@ AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks) { AioTaskPool *pool = g_new0(AioTaskPool, 1); - pool->main_co = qemu_coroutine_self(); pool->max_busy_tasks = max_busy_tasks; + qemu_co_queue_init(&pool->queue); return pool; } @@ -115,10 +123,15 @@ int aio_task_pool_status(AioTaskPool *pool) return 0; /* Sugar for lazy allocation of aio pool */ } - return pool->status; + return qatomic_read(&pool->status); } bool aio_task_pool_empty(AioTaskPool *pool) { - return pool->busy_tasks == 0; + int tasks; + + qemu_co_mutex_lock(&pool->lock); + tasks = pool->busy_tasks; + qemu_co_mutex_unlock(&pool->lock); + return tasks == 0; } diff --git a/include/block/aio_task.h b/include/block/aio_task.h index 50bc1e1817..b22a4310aa 100644 --- a/include/block/aio_task.h +++ b/include/block/aio_task.h @@ -33,7 +33,7 @@ typedef int coroutine_fn (*AioTaskFunc)(AioTask *task); struct AioTask { AioTaskPool *pool; AioTaskFunc func; - int ret; + int ret; /* atomic */ }; AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks);
Divide the fields in AioTaskPool into IN and Status, and introduce a CoQueue instead of .wait to take care of suspending and resuming the calling coroutine, and a lock to protect the busy_tasks counter accesses and the AioTask .ret field. Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com> --- block/aio_task.c | 63 ++++++++++++++++++++++++---------------- include/block/aio_task.h | 2 +- 2 files changed, 39 insertions(+), 26 deletions(-)