diff mbox series

[v11,13/21] jobs: protect job.aio_context with BQL and job_mutex

Message ID 20220826132104.3678958-14-eesposit@redhat.com
State New
Headers show
Series job: replace AioContext lock with job_mutex | expand

Commit Message

Emanuele Giuseppe Esposito Aug. 26, 2022, 1:20 p.m. UTC
In order to make it thread safe, implement a "fake rwlock",
where we allow reads under BQL *or* job_mutex held, but
writes only under BQL *and* job_mutex.

The only write we have is in child_job_set_aio_ctx, which always
happens under drain (so the job is paused).
For this reason, introduce job_set_aio_context and make sure that
the context is set under BQL, job_mutex and drain.
Also make sure all other places where the aiocontext is read
are protected.

The reads in commit.c and mirror.c are actually safe, because always
done under BQL.

Note: at this stage, job_{lock/unlock} and job lock guard macros
are *nop*.

Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 block/replication.c |  7 +++++--
 blockjob.c          |  3 ++-
 include/qemu/job.h  | 23 ++++++++++++++++++++---
 job.c               | 12 ++++++++++++
 4 files changed, 39 insertions(+), 6 deletions(-)

Comments

Vladimir Sementsov-Ogievskiy Sept. 14, 2022, 1:25 p.m. UTC | #1
On 8/26/22 16:20, Emanuele Giuseppe Esposito wrote:
> In order to make it thread safe, implement a "fake rwlock",
> where we allow reads under BQL *or* job_mutex held, but
> writes only under BQL *and* job_mutex.
> 
> The only write we have is in child_job_set_aio_ctx, which always
> happens under drain (so the job is paused).
> For this reason, introduce job_set_aio_context and make sure that
> the context is set under BQL, job_mutex and drain.
> Also make sure all other places where the aiocontext is read
> are protected.
> 
> The reads in commit.c and mirror.c are actually safe, because always
> done under BQL.
> 
> Note: at this stage, job_{lock/unlock} and job lock guard macros
> are *nop*.
> 
> Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
> ---
>   block/replication.c |  7 +++++--
>   blockjob.c          |  3 ++-
>   include/qemu/job.h  | 23 ++++++++++++++++++++---
>   job.c               | 12 ++++++++++++
>   4 files changed, 39 insertions(+), 6 deletions(-)
> 
> diff --git a/block/replication.c b/block/replication.c
> index 55c8f894aa..6e02d98126 100644
> --- a/block/replication.c
> +++ b/block/replication.c
> @@ -142,14 +142,17 @@ static void replication_close(BlockDriverState *bs)
>   {
>       BDRVReplicationState *s = bs->opaque;
>       Job *commit_job;
> +    GLOBAL_STATE_CODE();
>   
>       if (s->stage == BLOCK_REPLICATION_RUNNING) {
>           replication_stop(s->rs, false, NULL);
>       }
>       if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>           commit_job = &s->commit_job->job;
> -        assert(commit_job->aio_context == qemu_get_current_aio_context());
> -        job_cancel_sync(commit_job, false);
> +        WITH_JOB_LOCK_GUARD() {
> +            assert(commit_job->aio_context == qemu_get_current_aio_context());
> +            job_cancel_sync_locked(commit_job, false);
> +        }

As Kevin said, this hunk seems not needed.. Why to add locking for reading aio_context, when we have GLOBAL_STATE_CODE()?

>       }
>   
>       if (s->mode == REPLICATION_MODE_SECONDARY) {
> diff --git a/blockjob.c b/blockjob.c
> index 96fb9d9f73..c8919cef9b 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -162,12 +162,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
>           bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
>       }
>   
> -    job->job.aio_context = ctx;
> +    job_set_aio_context(&job->job, ctx);
>   }
>   
>   static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
>   {
>       BlockJob *job = c->opaque;
> +    GLOBAL_STATE_CODE();
>   
>       return job->job.aio_context;
>   }
> diff --git a/include/qemu/job.h b/include/qemu/job.h
> index 5709e8d4a8..cede227e67 100644
> --- a/include/qemu/job.h
> +++ b/include/qemu/job.h
> @@ -74,11 +74,17 @@ typedef struct Job {
>       /* ProgressMeter API is thread-safe */
>       ProgressMeter progress;
>   
> +    /**
> +     * AioContext to run the job coroutine in.
> +     * The job Aiocontext can be read when holding *either*
> +     * the BQL (so we are in the main loop) or the job_mutex.
> +     * It can only be written when we hold *both* BQL
> +     * and the job_mutex.
> +     */
> +    AioContext *aio_context;
>   
> -    /** Protected by AioContext lock */
>   
> -    /** AioContext to run the job coroutine in */
> -    AioContext *aio_context;
> +    /** Protected by AioContext lock */
>   
>       /** Reference count of the block job */
>       int refcnt;
> @@ -741,4 +747,15 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
>   int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
>                              Error **errp);
>   
> +/**
> + * Sets the @job->aio_context.
> + * Called with job_mutex *not* held.
> + *
> + * This function must run in the main thread to protect against
> + * concurrent read in job_finish_sync_locked(), takes the job_mutex
> + * lock to protect against the read in job_do_yield_locked(), and must
> + * be called when the coroutine is quiescent.

May be "job is quiscent" or "job is doing nothing", "no in-flight io operations in job".

For example, backup has several running coroutines in contest of block_copy process, and main coroutine of the job
is almost always "quescent"..

> + */
> +void job_set_aio_context(Job *job, AioContext *ctx);
> +
>   #endif
> diff --git a/job.c b/job.c
> index 85ae843f03..9f2fb2e73b 100644
> --- a/job.c
> +++ b/job.c
> @@ -396,6 +396,17 @@ Job *job_get(const char *id)
>       return job_get_locked(id);
>   }
>   
> +void job_set_aio_context(Job *job, AioContext *ctx)
> +{
> +    /* protect against read in job_finish_sync_locked and job_start */
> +    GLOBAL_STATE_CODE();
> +    /* protect against read in job_do_yield_locked */
> +    JOB_LOCK_GUARD();
> +    /* ensure the coroutine is quiescent while the AioContext is changed */

same not here.

> +    assert(job->paused || job_is_completed_locked(job));
> +    job->aio_context = ctx;
> +}
> +
>   /* Called with job_mutex *not* held. */
>   static void job_sleep_timer_cb(void *opaque)
>   {
> @@ -1379,6 +1390,7 @@ int job_finish_sync_locked(Job *job,
>   {
>       Error *local_err = NULL;
>       int ret;
> +    GLOBAL_STATE_CODE();
>   
>       job_ref_locked(job);
>
Emanuele Giuseppe Esposito Sept. 18, 2022, 4:54 p.m. UTC | #2
Am 14/09/2022 um 15:25 schrieb Vladimir Sementsov-Ogievskiy:
> On 8/26/22 16:20, Emanuele Giuseppe Esposito wrote:
>> In order to make it thread safe, implement a "fake rwlock",
>> where we allow reads under BQL *or* job_mutex held, but
>> writes only under BQL *and* job_mutex.
>>
>> The only write we have is in child_job_set_aio_ctx, which always
>> happens under drain (so the job is paused).
>> For this reason, introduce job_set_aio_context and make sure that
>> the context is set under BQL, job_mutex and drain.
>> Also make sure all other places where the aiocontext is read
>> are protected.
>>
>> The reads in commit.c and mirror.c are actually safe, because always
>> done under BQL.
>>
>> Note: at this stage, job_{lock/unlock} and job lock guard macros
>> are *nop*.
>>
>> Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
>> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
>> ---
>>   block/replication.c |  7 +++++--
>>   blockjob.c          |  3 ++-
>>   include/qemu/job.h  | 23 ++++++++++++++++++++---
>>   job.c               | 12 ++++++++++++
>>   4 files changed, 39 insertions(+), 6 deletions(-)
>>
>> diff --git a/block/replication.c b/block/replication.c
>> index 55c8f894aa..6e02d98126 100644
>> --- a/block/replication.c
>> +++ b/block/replication.c
>> @@ -142,14 +142,17 @@ static void replication_close(BlockDriverState *bs)
>>   {
>>       BDRVReplicationState *s = bs->opaque;
>>       Job *commit_job;
>> +    GLOBAL_STATE_CODE();
>>         if (s->stage == BLOCK_REPLICATION_RUNNING) {
>>           replication_stop(s->rs, false, NULL);
>>       }
>>       if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>>           commit_job = &s->commit_job->job;
>> -        assert(commit_job->aio_context ==
>> qemu_get_current_aio_context());
>> -        job_cancel_sync(commit_job, false);
>> +        WITH_JOB_LOCK_GUARD() {
>> +            assert(commit_job->aio_context ==
>> qemu_get_current_aio_context());
>> +            job_cancel_sync_locked(commit_job, false);
>> +        }
> 
> As Kevin said, this hunk seems not needed.. Why to add locking for
> reading aio_context, when we have GLOBAL_STATE_CODE()?

Ok, getting rid of it.
> 
>>       }
>>         if (s->mode == REPLICATION_MODE_SECONDARY) {
>> diff --git a/blockjob.c b/blockjob.c
>> index 96fb9d9f73..c8919cef9b 100644
>> --- a/blockjob.c
>> +++ b/blockjob.c
>> @@ -162,12 +162,13 @@ static void child_job_set_aio_ctx(BdrvChild *c,
>> AioContext *ctx,
>>           bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
>>       }
>>   -    job->job.aio_context = ctx;
>> +    job_set_aio_context(&job->job, ctx);
>>   }
>>     static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
>>   {
>>       BlockJob *job = c->opaque;
>> +    GLOBAL_STATE_CODE();
>>         return job->job.aio_context;
>>   }
>> diff --git a/include/qemu/job.h b/include/qemu/job.h
>> index 5709e8d4a8..cede227e67 100644
>> --- a/include/qemu/job.h
>> +++ b/include/qemu/job.h
>> @@ -74,11 +74,17 @@ typedef struct Job {
>>       /* ProgressMeter API is thread-safe */
>>       ProgressMeter progress;
>>   +    /**
>> +     * AioContext to run the job coroutine in.
>> +     * The job Aiocontext can be read when holding *either*
>> +     * the BQL (so we are in the main loop) or the job_mutex.
>> +     * It can only be written when we hold *both* BQL
>> +     * and the job_mutex.
>> +     */
>> +    AioContext *aio_context;
>>   -    /** Protected by AioContext lock */
>>   -    /** AioContext to run the job coroutine in */
>> -    AioContext *aio_context;
>> +    /** Protected by AioContext lock */
>>         /** Reference count of the block job */
>>       int refcnt;
>> @@ -741,4 +747,15 @@ int job_finish_sync(Job *job, void (*finish)(Job
>> *, Error **errp),
>>   int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error
>> **errp),
>>                              Error **errp);
>>   +/**
>> + * Sets the @job->aio_context.
>> + * Called with job_mutex *not* held.
>> + *
>> + * This function must run in the main thread to protect against
>> + * concurrent read in job_finish_sync_locked(), takes the job_mutex
>> + * lock to protect against the read in job_do_yield_locked(), and must
>> + * be called when the coroutine is quiescent.
> 
> May be "job is quiscent" or "job is doing nothing", "no in-flight io
> operations in job".
> 
> For example, backup has several running coroutines in contest of
> block_copy process, and main coroutine of the job
> is almost always "quescent"..

"job is quiescent" seems ok

> 
>> + */
>> +void job_set_aio_context(Job *job, AioContext *ctx);
>> +
>>   #endif
>> diff --git a/job.c b/job.c
>> index 85ae843f03..9f2fb2e73b 100644
>> --- a/job.c
>> +++ b/job.c
>> @@ -396,6 +396,17 @@ Job *job_get(const char *id)
>>       return job_get_locked(id);
>>   }
>>   +void job_set_aio_context(Job *job, AioContext *ctx)
>> +{
>> +    /* protect against read in job_finish_sync_locked and job_start */
>> +    GLOBAL_STATE_CODE();
>> +    /* protect against read in job_do_yield_locked */
>> +    JOB_LOCK_GUARD();
>> +    /* ensure the coroutine is quiescent while the AioContext is
>> changed */
> 
> same not here.

Ok

Thank you,
Emanuele
> 
>> +    assert(job->paused || job_is_completed_locked(job));
>> +    job->aio_context = ctx;
>> +}
>> +
>>   /* Called with job_mutex *not* held. */
>>   static void job_sleep_timer_cb(void *opaque)
>>   {
>> @@ -1379,6 +1390,7 @@ int job_finish_sync_locked(Job *job,
>>   {
>>       Error *local_err = NULL;
>>       int ret;
>> +    GLOBAL_STATE_CODE();
>>         job_ref_locked(job);
>>   
> 
>
diff mbox series

Patch

diff --git a/block/replication.c b/block/replication.c
index 55c8f894aa..6e02d98126 100644
--- a/block/replication.c
+++ b/block/replication.c
@@ -142,14 +142,17 @@  static void replication_close(BlockDriverState *bs)
 {
     BDRVReplicationState *s = bs->opaque;
     Job *commit_job;
+    GLOBAL_STATE_CODE();
 
     if (s->stage == BLOCK_REPLICATION_RUNNING) {
         replication_stop(s->rs, false, NULL);
     }
     if (s->stage == BLOCK_REPLICATION_FAILOVER) {
         commit_job = &s->commit_job->job;
-        assert(commit_job->aio_context == qemu_get_current_aio_context());
-        job_cancel_sync(commit_job, false);
+        WITH_JOB_LOCK_GUARD() {
+            assert(commit_job->aio_context == qemu_get_current_aio_context());
+            job_cancel_sync_locked(commit_job, false);
+        }
     }
 
     if (s->mode == REPLICATION_MODE_SECONDARY) {
diff --git a/blockjob.c b/blockjob.c
index 96fb9d9f73..c8919cef9b 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -162,12 +162,13 @@  static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
         bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
     }
 
-    job->job.aio_context = ctx;
+    job_set_aio_context(&job->job, ctx);
 }
 
 static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
 {
     BlockJob *job = c->opaque;
+    GLOBAL_STATE_CODE();
 
     return job->job.aio_context;
 }
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 5709e8d4a8..cede227e67 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -74,11 +74,17 @@  typedef struct Job {
     /* ProgressMeter API is thread-safe */
     ProgressMeter progress;
 
+    /**
+     * AioContext to run the job coroutine in.
+     * The job Aiocontext can be read when holding *either*
+     * the BQL (so we are in the main loop) or the job_mutex.
+     * It can only be written when we hold *both* BQL
+     * and the job_mutex.
+     */
+    AioContext *aio_context;
 
-    /** Protected by AioContext lock */
 
-    /** AioContext to run the job coroutine in */
-    AioContext *aio_context;
+    /** Protected by AioContext lock */
 
     /** Reference count of the block job */
     int refcnt;
@@ -741,4 +747,15 @@  int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
 int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
                            Error **errp);
 
+/**
+ * Sets the @job->aio_context.
+ * Called with job_mutex *not* held.
+ *
+ * This function must run in the main thread to protect against
+ * concurrent read in job_finish_sync_locked(), takes the job_mutex
+ * lock to protect against the read in job_do_yield_locked(), and must
+ * be called when the coroutine is quiescent.
+ */
+void job_set_aio_context(Job *job, AioContext *ctx);
+
 #endif
diff --git a/job.c b/job.c
index 85ae843f03..9f2fb2e73b 100644
--- a/job.c
+++ b/job.c
@@ -396,6 +396,17 @@  Job *job_get(const char *id)
     return job_get_locked(id);
 }
 
+void job_set_aio_context(Job *job, AioContext *ctx)
+{
+    /* protect against read in job_finish_sync_locked and job_start */
+    GLOBAL_STATE_CODE();
+    /* protect against read in job_do_yield_locked */
+    JOB_LOCK_GUARD();
+    /* ensure the coroutine is quiescent while the AioContext is changed */
+    assert(job->paused || job_is_completed_locked(job));
+    job->aio_context = ctx;
+}
+
 /* Called with job_mutex *not* held. */
 static void job_sleep_timer_cb(void *opaque)
 {
@@ -1379,6 +1390,7 @@  int job_finish_sync_locked(Job *job,
 {
     Error *local_err = NULL;
     int ret;
+    GLOBAL_STATE_CODE();
 
     job_ref_locked(job);