diff mbox series

[v10,14/21] jobs: protect job.aio_context with BQL and job_mutex

Message ID 20220725073855.76049-15-eesposit@redhat.com
State New
Headers show
Series job: replace AioContext lock with job_mutex | expand

Commit Message

Emanuele Giuseppe Esposito July 25, 2022, 7:38 a.m. UTC
In order to make it thread safe, implement a "fake rwlock",
where we allow reads under BQL *or* job_mutex held, but
writes only under BQL *and* job_mutex.

The only write we have is in child_job_set_aio_ctx, which always
happens under drain (so the job is paused).
For this reason, introduce job_set_aio_context and make sure that
the context is set under BQL, job_mutex and drain.
Also make sure all other places where the aiocontext is read
are protected.

Note: at this stage, job_{lock/unlock} and job lock guard macros
are *nop*.

Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 block/replication.c |  6 ++++--
 blockjob.c          |  3 ++-
 include/qemu/job.h  | 19 ++++++++++++++++++-
 job.c               | 12 ++++++++++++
 4 files changed, 36 insertions(+), 4 deletions(-)

Comments

Vladimir Sementsov-Ogievskiy July 27, 2022, 3:22 p.m. UTC | #1
On 7/25/22 10:38, Emanuele Giuseppe Esposito wrote:
> In order to make it thread safe, implement a "fake rwlock",
> where we allow reads under BQL *or* job_mutex held, but
> writes only under BQL *and* job_mutex.
> 
> The only write we have is in child_job_set_aio_ctx, which always
> happens under drain (so the job is paused).
> For this reason, introduce job_set_aio_context and make sure that
> the context is set under BQL, job_mutex and drain.

actually, we only make sure that pause was scheduled

> Also make sure all other places where the aiocontext is read
> are protected.
> 
> Note: at this stage, job_{lock/unlock} and job lock guard macros
> are *nop*.
> 
> Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>   block/replication.c |  6 ++++--
>   blockjob.c          |  3 ++-
>   include/qemu/job.h  | 19 ++++++++++++++++++-
>   job.c               | 12 ++++++++++++
>   4 files changed, 36 insertions(+), 4 deletions(-)
> 
> diff --git a/block/replication.c b/block/replication.c
> index 55c8f894aa..2189863df1 100644
> --- a/block/replication.c
> +++ b/block/replication.c
> @@ -148,8 +148,10 @@ static void replication_close(BlockDriverState *bs)
>       }
>       if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>           commit_job = &s->commit_job->job;
> -        assert(commit_job->aio_context == qemu_get_current_aio_context());
> -        job_cancel_sync(commit_job, false);
> +        WITH_JOB_LOCK_GUARD() {
> +            assert(commit_job->aio_context == qemu_get_current_aio_context());
> +            job_cancel_sync_locked(commit_job, false);
> +        }
>       }
>   
>       if (s->mode == REPLICATION_MODE_SECONDARY) {
> diff --git a/blockjob.c b/blockjob.c
> index 96fb9d9f73..9ff2727025 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -162,12 +162,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
>           bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
>       }
>   
> -    job->job.aio_context = ctx;
> +    job_set_aio_context(&job->job, ctx);
>   }
>   
>   static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
>   {
>       BlockJob *job = c->opaque;
> +    assert(qemu_in_main_thread());
>   
>       return job->job.aio_context;
>   }
> diff --git a/include/qemu/job.h b/include/qemu/job.h
> index 5709e8d4a8..c144aabefc 100644
> --- a/include/qemu/job.h
> +++ b/include/qemu/job.h
> @@ -77,7 +77,12 @@ typedef struct Job {
>   
>       /** Protected by AioContext lock */
>   
> -    /** AioContext to run the job coroutine in */
> +    /**
> +     * AioContext to run the job coroutine in.
> +     * This field can be read when holding either the BQL (so we are in
> +     * the main loop) or the job_mutex.
> +     * It can be only written when we hold *both* BQL and job_mutex.
> +     */
>       AioContext *aio_context;
>   
>       /** Reference count of the block job */
> @@ -741,4 +746,16 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
>   int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
>                              Error **errp);
>   
> +/**
> + * Sets the @job->aio_context.
> + * Called with job_mutex *not* held.
> + *
> + * This function must run in the main thread to protect against
> + * concurrent read in job_finish_sync_locked(),
> + * takes the job_mutex lock to protect against the read in
> + * job_do_yield_locked(), and must be called when the coroutine
> + * is quiescent.

What means "coroutine is quescent"? Could we just swap it by "job is paused"?

> + */
> +void job_set_aio_context(Job *job, AioContext *ctx);
> +
>   #endif
> diff --git a/job.c b/job.c
> index ecec66b44e..0a857b1468 100644
> --- a/job.c
> +++ b/job.c
> @@ -394,6 +394,17 @@ Job *job_get(const char *id)
>       return job_get_locked(id);
>   }
>   
> +void job_set_aio_context(Job *job, AioContext *ctx)
> +{
> +    /* protect against read in job_finish_sync_locked and job_start */
> +    assert(qemu_in_main_thread());
> +    /* protect against read in job_do_yield_locked */
> +    JOB_LOCK_GUARD();
> +    /* ensure the coroutine is quiescent while the AioContext is changed */

pause_count means pause was scheduled. Job may be paused already, or may be not. I'm not against the assertion, it helps. I just think that we don't have the guarantee that comment claims. (And I still don't understand what means coroutine is quiescent. And not that there are may be several job related coroutines: the main one and some worker coroutines).

> +    assert(job->pause_count > 0);
> +    job->aio_context = ctx;
> +}
> +
>   /* Called with job_mutex *not* held. */
>   static void job_sleep_timer_cb(void *opaque)
>   {
> @@ -1376,6 +1387,7 @@ int job_finish_sync_locked(Job *job,
>   {
>       Error *local_err = NULL;
>       int ret;
> +    assert(qemu_in_main_thread());
>   
>       job_ref_locked(job);
>   

without "quiescent coroutine" concept:
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru>
Kevin Wolf Aug. 5, 2022, 9:12 a.m. UTC | #2
Am 25.07.2022 um 09:38 hat Emanuele Giuseppe Esposito geschrieben:
> In order to make it thread safe, implement a "fake rwlock",
> where we allow reads under BQL *or* job_mutex held, but
> writes only under BQL *and* job_mutex.

Oh, so the "or BQL" part is only for job.aio_context? Okay.

> The only write we have is in child_job_set_aio_ctx, which always
> happens under drain (so the job is paused).
> For this reason, introduce job_set_aio_context and make sure that
> the context is set under BQL, job_mutex and drain.
> Also make sure all other places where the aiocontext is read
> are protected.
> 
> Note: at this stage, job_{lock/unlock} and job lock guard macros
> are *nop*.
> 
> Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>  block/replication.c |  6 ++++--
>  blockjob.c          |  3 ++-
>  include/qemu/job.h  | 19 ++++++++++++++++++-
>  job.c               | 12 ++++++++++++
>  4 files changed, 36 insertions(+), 4 deletions(-)
> 
> diff --git a/block/replication.c b/block/replication.c
> index 55c8f894aa..2189863df1 100644
> --- a/block/replication.c
> +++ b/block/replication.c
> @@ -148,8 +148,10 @@ static void replication_close(BlockDriverState *bs)
>      }
>      if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>          commit_job = &s->commit_job->job;
> -        assert(commit_job->aio_context == qemu_get_current_aio_context());
> -        job_cancel_sync(commit_job, false);
> +        WITH_JOB_LOCK_GUARD() {
> +            assert(commit_job->aio_context == qemu_get_current_aio_context());
> +            job_cancel_sync_locked(commit_job, false);
> +        }
>      }

.bdrv_close runs under the BQL, so why is this needed? Maybe a
GLOBAL_STATE_CODE() annotation would be helpful, though.

>      if (s->mode == REPLICATION_MODE_SECONDARY) {
> diff --git a/blockjob.c b/blockjob.c
> index 96fb9d9f73..9ff2727025 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -162,12 +162,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
>          bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
>      }
>  
> -    job->job.aio_context = ctx;
> +    job_set_aio_context(&job->job, ctx);
>  }
>  
>  static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
>  {
>      BlockJob *job = c->opaque;
> +    assert(qemu_in_main_thread());

Any reason not to use GLOBAL_STATE_CODE()?

>      return job->job.aio_context;
>  }
> diff --git a/include/qemu/job.h b/include/qemu/job.h
> index 5709e8d4a8..c144aabefc 100644
> --- a/include/qemu/job.h
> +++ b/include/qemu/job.h
> @@ -77,7 +77,12 @@ typedef struct Job {
>  
>      /** Protected by AioContext lock */

I think this section comment should move down below aio_context now.

> -    /** AioContext to run the job coroutine in */
> +    /**
> +     * AioContext to run the job coroutine in.
> +     * This field can be read when holding either the BQL (so we are in
> +     * the main loop) or the job_mutex.
> +     * It can be only written when we hold *both* BQL and job_mutex.
> +     */
>      AioContext *aio_context;
>  
>      /** Reference count of the block job */
> @@ -741,4 +746,16 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
>  int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
>                             Error **errp);
>  
> +/**
> + * Sets the @job->aio_context.
> + * Called with job_mutex *not* held.
> + *
> + * This function must run in the main thread to protect against
> + * concurrent read in job_finish_sync_locked(),

Odd line break here in the middle of a sentence.

> + * takes the job_mutex lock to protect against the read in
> + * job_do_yield_locked(), and must be called when the coroutine
> + * is quiescent.
> + */
> +void job_set_aio_context(Job *job, AioContext *ctx);
> +
>  #endif
> diff --git a/job.c b/job.c
> index ecec66b44e..0a857b1468 100644
> --- a/job.c
> +++ b/job.c
> @@ -394,6 +394,17 @@ Job *job_get(const char *id)
>      return job_get_locked(id);
>  }
>  
> +void job_set_aio_context(Job *job, AioContext *ctx)
> +{
> +    /* protect against read in job_finish_sync_locked and job_start */
> +    assert(qemu_in_main_thread());

Same question about GLOBAL_STATE_CODE().

> +    /* protect against read in job_do_yield_locked */
> +    JOB_LOCK_GUARD();
> +    /* ensure the coroutine is quiescent while the AioContext is changed */
> +    assert(job->pause_count > 0);

job->pause_count only shows that pausing was requested. The coroutine is
only really quiescent if job->busy == false, too.

Or maybe job->paused is actually the one you want here.

> +    job->aio_context = ctx;
> +}
> +
>  /* Called with job_mutex *not* held. */
>  static void job_sleep_timer_cb(void *opaque)
>  {
> @@ -1376,6 +1387,7 @@ int job_finish_sync_locked(Job *job,
>  {
>      Error *local_err = NULL;
>      int ret;
> +    assert(qemu_in_main_thread());
>  
>      job_ref_locked(job);

Another GLOBAL_STATE_CODE()?

Kevin
Emanuele Giuseppe Esposito Aug. 17, 2022, 8:04 a.m. UTC | #3
Am 05/08/2022 um 11:12 schrieb Kevin Wolf:
> Am 25.07.2022 um 09:38 hat Emanuele Giuseppe Esposito geschrieben:
>> In order to make it thread safe, implement a "fake rwlock",
>> where we allow reads under BQL *or* job_mutex held, but
>> writes only under BQL *and* job_mutex.
> 
> Oh, so the "or BQL" part is only for job.aio_context? Okay.
> 
>> The only write we have is in child_job_set_aio_ctx, which always
>> happens under drain (so the job is paused).
>> For this reason, introduce job_set_aio_context and make sure that
>> the context is set under BQL, job_mutex and drain.
>> Also make sure all other places where the aiocontext is read
>> are protected.
>>
>> Note: at this stage, job_{lock/unlock} and job lock guard macros
>> are *nop*.
>>
>> Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
>> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
>> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
>> ---
>>  block/replication.c |  6 ++++--
>>  blockjob.c          |  3 ++-
>>  include/qemu/job.h  | 19 ++++++++++++++++++-
>>  job.c               | 12 ++++++++++++
>>  4 files changed, 36 insertions(+), 4 deletions(-)
>>
>> diff --git a/block/replication.c b/block/replication.c
>> index 55c8f894aa..2189863df1 100644
>> --- a/block/replication.c
>> +++ b/block/replication.c
>> @@ -148,8 +148,10 @@ static void replication_close(BlockDriverState *bs)
>>      }
>>      if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>>          commit_job = &s->commit_job->job;
>> -        assert(commit_job->aio_context == qemu_get_current_aio_context());
>> -        job_cancel_sync(commit_job, false);
>> +        WITH_JOB_LOCK_GUARD() {
>> +            assert(commit_job->aio_context == qemu_get_current_aio_context());
>> +            job_cancel_sync_locked(commit_job, false);
>> +        }
>>      }
> 
> .bdrv_close runs under the BQL, so why is this needed? Maybe a
> GLOBAL_STATE_CODE() annotation would be helpful, though.

I think I left it because it would be confusing to leave a _locked
function without the job lock. I'll add the GLOBAL_STATE_CODE anyways.

> 
>>      if (s->mode == REPLICATION_MODE_SECONDARY) {
>> diff --git a/blockjob.c b/blockjob.c
>> index 96fb9d9f73..9ff2727025 100644
>> --- a/blockjob.c
>> +++ b/blockjob.c
>> @@ -162,12 +162,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
>>          bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
>>      }
>>  
>> -    job->job.aio_context = ctx;
>> +    job_set_aio_context(&job->job, ctx);
>>  }
>>  
>>  static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
>>  {
>>      BlockJob *job = c->opaque;
>> +    assert(qemu_in_main_thread());
> 
> Any reason not to use GLOBAL_STATE_CODE()?

4 months ago GLOBAL_STATE_CODE did not exist yet, and I didn't think
about updating it :)
> 
>>      return job->job.aio_context;
>>  }

>> +    /* protect against read in job_do_yield_locked */
>> +    JOB_LOCK_GUARD();
>> +    /* ensure the coroutine is quiescent while the AioContext is changed */
>> +    assert(job->pause_count > 0);
> 
> job->pause_count only shows that pausing was requested. The coroutine is
> only really quiescent if job->busy == false, too.
> 
> Or maybe job->paused is actually the one you want here.
I think job->paused works too.

Emanuele
Emanuele Giuseppe Esposito Aug. 17, 2022, 1:10 p.m. UTC | #4
Am 17/08/2022 um 10:04 schrieb Emanuele Giuseppe Esposito:
>>> +    /* protect against read in job_do_yield_locked */
>>> +    JOB_LOCK_GUARD();
>>> +    /* ensure the coroutine is quiescent while the AioContext is changed */
>>> +    assert(job->pause_count > 0);
>> job->pause_count only shows that pausing was requested. The coroutine is
>> only really quiescent if job->busy == false, too.
>>
>> Or maybe job->paused is actually the one you want here.
> I think job->paused works too.
> 

Actually it doesn't. At least test-block-iothread
test_propagate_mirror() fails, for both job->paused and !job->busy. I
think the reason is that if we wait for the flag to be set, we need to
actually wait that the job gets to the next pause point, because
job_pause() doesn't really pause the job, it just kind of "schedules"
the pause on next pause point.

So, either we leave pause_count > 0, or somehow wait (I was thinking
AIO_WAIT_WHILE(job->paused) but that's probably a very bad idea).

Do you have any suggestion for that?
Maybe Paolo has a better idea on how to do it?

Emanuele
Emanuele Giuseppe Esposito Aug. 18, 2022, 8:48 a.m. UTC | #5
Am 17/08/2022 um 15:10 schrieb Emanuele Giuseppe Esposito:
> 
> 
> Am 17/08/2022 um 10:04 schrieb Emanuele Giuseppe Esposito:
>>>> +    /* protect against read in job_do_yield_locked */
>>>> +    JOB_LOCK_GUARD();
>>>> +    /* ensure the coroutine is quiescent while the AioContext is changed */
>>>> +    assert(job->pause_count > 0);
>>> job->pause_count only shows that pausing was requested. The coroutine is
>>> only really quiescent if job->busy == false, too.
>>>
>>> Or maybe job->paused is actually the one you want here.
>> I think job->paused works too.
>>
> 
> Actually it doesn't. At least test-block-iothread
> test_propagate_mirror() fails, for both job->paused and !job->busy. I
> think the reason is that if we wait for the flag to be set, we need to
> actually wait that the job gets to the next pause point, because
> job_pause() doesn't really pause the job, it just kind of "schedules"
> the pause on next pause point.
> 
> So, either we leave pause_count > 0, or somehow wait (I was thinking
> AIO_WAIT_WHILE(job->paused) but that's probably a very bad idea).
> 
> Do you have any suggestion for that?
> Maybe Paolo has a better idea on how to do it?
> 

Additional update: after debugging it a little bit, I figured that there
are places where busy=false and paused=false too. This is clear in
job_do_dismiss and not-so-clear in job_exit (at least for me).

I don't know if this is expected or a bug, because it just means that if
we are draining and the job is completing, there is no way to stop it.

Is it a bug? Or am I missing something?

Emanuele
diff mbox series

Patch

diff --git a/block/replication.c b/block/replication.c
index 55c8f894aa..2189863df1 100644
--- a/block/replication.c
+++ b/block/replication.c
@@ -148,8 +148,10 @@  static void replication_close(BlockDriverState *bs)
     }
     if (s->stage == BLOCK_REPLICATION_FAILOVER) {
         commit_job = &s->commit_job->job;
-        assert(commit_job->aio_context == qemu_get_current_aio_context());
-        job_cancel_sync(commit_job, false);
+        WITH_JOB_LOCK_GUARD() {
+            assert(commit_job->aio_context == qemu_get_current_aio_context());
+            job_cancel_sync_locked(commit_job, false);
+        }
     }
 
     if (s->mode == REPLICATION_MODE_SECONDARY) {
diff --git a/blockjob.c b/blockjob.c
index 96fb9d9f73..9ff2727025 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -162,12 +162,13 @@  static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
         bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
     }
 
-    job->job.aio_context = ctx;
+    job_set_aio_context(&job->job, ctx);
 }
 
 static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
 {
     BlockJob *job = c->opaque;
+    assert(qemu_in_main_thread());
 
     return job->job.aio_context;
 }
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 5709e8d4a8..c144aabefc 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -77,7 +77,12 @@  typedef struct Job {
 
     /** Protected by AioContext lock */
 
-    /** AioContext to run the job coroutine in */
+    /**
+     * AioContext to run the job coroutine in.
+     * This field can be read when holding either the BQL (so we are in
+     * the main loop) or the job_mutex.
+     * It can be only written when we hold *both* BQL and job_mutex.
+     */
     AioContext *aio_context;
 
     /** Reference count of the block job */
@@ -741,4 +746,16 @@  int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
 int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
                            Error **errp);
 
+/**
+ * Sets the @job->aio_context.
+ * Called with job_mutex *not* held.
+ *
+ * This function must run in the main thread to protect against
+ * concurrent read in job_finish_sync_locked(),
+ * takes the job_mutex lock to protect against the read in
+ * job_do_yield_locked(), and must be called when the coroutine
+ * is quiescent.
+ */
+void job_set_aio_context(Job *job, AioContext *ctx);
+
 #endif
diff --git a/job.c b/job.c
index ecec66b44e..0a857b1468 100644
--- a/job.c
+++ b/job.c
@@ -394,6 +394,17 @@  Job *job_get(const char *id)
     return job_get_locked(id);
 }
 
+void job_set_aio_context(Job *job, AioContext *ctx)
+{
+    /* protect against read in job_finish_sync_locked and job_start */
+    assert(qemu_in_main_thread());
+    /* protect against read in job_do_yield_locked */
+    JOB_LOCK_GUARD();
+    /* ensure the coroutine is quiescent while the AioContext is changed */
+    assert(job->pause_count > 0);
+    job->aio_context = ctx;
+}
+
 /* Called with job_mutex *not* held. */
 static void job_sleep_timer_cb(void *opaque)
 {
@@ -1376,6 +1387,7 @@  int job_finish_sync_locked(Job *job,
 {
     Error *local_err = NULL;
     int ret;
+    assert(qemu_in_main_thread());
 
     job_ref_locked(job);