@@ -53,6 +53,7 @@ struct BlockJobDriver {
*/
void (*attached_aio_context)(BlockJob *job, AioContext *new_context);
+ /* Called with job mutex *not* held. */
void (*set_speed)(BlockJob *job, int64_t speed);
};
@@ -49,6 +49,8 @@ typedef struct Job {
/**
* The type of this job.
* Set it in job_create and just read.
+ * Driver callbacks must be invoked without job_mutex held,
+ * to avoid deadlocks.
*/
const JobDriver *driver;
@@ -315,6 +315,10 @@ static void coroutine_fn backup_pause(Job *job)
}
}
+/*
+ * Called with job mutex *not* held (we don't want to call block_copy_kick
+ * with the lock held!)
+ */
static void coroutine_fn backup_set_speed(BlockJob *job, int64_t speed)
{
BackupBlockJob *s = container_of(job, BackupBlockJob, common);
@@ -1150,9 +1150,11 @@ static void mirror_complete(Job *job, Error **errp)
s->should_complete = true;
/* If the job is paused, it will be re-entered when it is resumed */
+ job_lock();
if (!job_is_paused(job)) {
- job_enter(job);
+ job_enter_locked(job);
}
+ job_unlock();
}
static void coroutine_fn mirror_pause(Job *job)
@@ -1171,10 +1173,13 @@ static bool mirror_drained_poll(BlockJob *job)
* from one of our own drain sections, to avoid a deadlock waiting for
* ourselves.
*/
- if (!job_is_paused(&s->common.job) && !job_is_cancelled(&s->common.job) &&
- !s->in_drain) {
+ job_lock();
+ if (!job_is_paused(&s->common.job) &&
+ !job_is_cancelled_locked(&s->common.job) && !s->in_drain) {
+ job_unlock();
return true;
}
+ job_unlock();
return !!s->in_flight;
}
@@ -150,9 +150,11 @@ void blockdev_mark_auto_del(BlockBackend *blk)
AioContext *aio_context = job_get_aiocontext(&job->job);
aio_context_acquire(aio_context);
+ job_lock();
job_cancel(&job->job, false);
aio_context_release(aio_context);
+ job_unlock();
}
}
@@ -3309,48 +3311,44 @@ out:
aio_context_release(aio_context);
}
-/* Get a block job using its ID and acquire its AioContext */
-static BlockJob *find_block_job(const char *id, AioContext **aio_context,
- Error **errp)
+/* Get a block job using its ID; on success, returns with job_mutex held */
+static BlockJob *find_block_job(const char *id, Error **errp)
{
BlockJob *job;
assert(id != NULL);
- *aio_context = NULL;
-
+ job_lock();
job = block_job_get(id);
if (!job) {
error_set(errp, ERROR_CLASS_DEVICE_NOT_ACTIVE,
"Block job '%s' not found", id);
+ job_unlock();
return NULL;
}
- *aio_context = blk_get_aio_context(job->blk);
- aio_context_acquire(*aio_context);
-
return job;
}
+/* Called with job_mutex *not* held. */
void qmp_block_job_set_speed(const char *device, int64_t speed, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(device, &aio_context, errp);
+ BlockJob *job = find_block_job(device, errp);
if (!job) {
return;
}
block_job_set_speed(job, speed, errp);
- aio_context_release(aio_context);
+ job_unlock();
}
+/* Called with job_mutex *not* held. */
void qmp_block_job_cancel(const char *device,
bool has_force, bool force, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(device, &aio_context, errp);
+ BlockJob *job = find_block_job(device, errp);
if (!job) {
return;
@@ -3369,13 +3367,13 @@ void qmp_block_job_cancel(const char *device,
trace_qmp_block_job_cancel(job);
job_user_cancel(&job->job, force, errp);
out:
- aio_context_release(aio_context);
+ job_unlock();
}
+/* Called with job_mutex *not* held. */
void qmp_block_job_pause(const char *device, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(device, &aio_context, errp);
+ BlockJob *job = find_block_job(device, errp);
if (!job) {
return;
@@ -3383,13 +3381,13 @@ void qmp_block_job_pause(const char *device, Error **errp)
trace_qmp_block_job_pause(job);
job_user_pause(&job->job, errp);
- aio_context_release(aio_context);
+ job_unlock();
}
+/* Called with job_mutex *not* held. */
void qmp_block_job_resume(const char *device, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(device, &aio_context, errp);
+ BlockJob *job = find_block_job(device, errp);
if (!job) {
return;
@@ -3397,13 +3395,13 @@ void qmp_block_job_resume(const char *device, Error **errp)
trace_qmp_block_job_resume(job);
job_user_resume(&job->job, errp);
- aio_context_release(aio_context);
+ job_unlock();
}
+/* Called with job_mutex *not* held. */
void qmp_block_job_complete(const char *device, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(device, &aio_context, errp);
+ BlockJob *job = find_block_job(device, errp);
if (!job) {
return;
@@ -3411,13 +3409,13 @@ void qmp_block_job_complete(const char *device, Error **errp)
trace_qmp_block_job_complete(job);
job_complete(&job->job, errp);
- aio_context_release(aio_context);
+ job_unlock();
}
+/* Called with job_mutex *not* held. */
void qmp_block_job_finalize(const char *id, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(id, &aio_context, errp);
+ BlockJob *job = find_block_job(id, errp);
if (!job) {
return;
@@ -3427,20 +3425,14 @@ void qmp_block_job_finalize(const char *id, Error **errp)
job_ref(&job->job);
job_finalize(&job->job, errp);
- /*
- * Job's context might have changed via job_finalize (and job_txn_apply
- * automatically acquires the new one), so make sure we release the correct
- * one.
- */
- aio_context = blk_get_aio_context(job->blk);
job_unref(&job->job);
- aio_context_release(aio_context);
+ job_unlock();
}
+/* Called with job_mutex *not* held. */
void qmp_block_job_dismiss(const char *id, Error **errp)
{
- AioContext *aio_context;
- BlockJob *bjob = find_block_job(id, &aio_context, errp);
+ BlockJob *bjob = find_block_job(id, errp);
Job *job;
if (!bjob) {
@@ -3450,7 +3442,7 @@ void qmp_block_job_dismiss(const char *id, Error **errp)
trace_qmp_block_job_dismiss(bjob);
job = &bjob->job;
job_dismiss(&job, errp);
- aio_context_release(aio_context);
+ job_unlock();
}
void qmp_change_backing_file(const char *device,
@@ -42,15 +42,16 @@
* The first includes functions used by the monitor. The monitor is
* peculiar in that it accesses the block job list with block_job_get, and
* therefore needs consistency across block_job_get and the actual operation
- * (e.g. block_job_set_speed). The consistency is achieved with
- * aio_context_acquire/release. These functions are declared in blockjob.h.
+ * (e.g. block_job_set_speed). To achieve this consistency, the caller
+ * calls job_lock/job_unlock itself around the whole operation.
+ * These functions are declared in blockjob.h.
*
* The second includes functions used by the block job drivers and sometimes
- * by the core block layer. These do not care about locking, because the
- * whole coroutine runs under the AioContext lock, and are declared in
- * blockjob_int.h.
+ * by the core block layer. These delegate the locking to the callee instead,
+ * and are declared in blockjob_int.h.
*/
+/* Does not need job_mutex. Value is never modified */
static bool is_block_job(Job *job)
{
return job_type(job) == JOB_TYPE_BACKUP ||
@@ -59,6 +60,7 @@ static bool is_block_job(Job *job)
job_type(job) == JOB_TYPE_STREAM;
}
+/* Called with job_mutex *not* held. */
BlockJob *block_job_next(BlockJob *bjob)
{
Job *job = bjob ? &bjob->job : NULL;
@@ -70,6 +72,7 @@ BlockJob *block_job_next(BlockJob *bjob)
return job ? container_of(job, BlockJob, job) : NULL;
}
+/* Called with job_mutex held. */
BlockJob *block_job_get(const char *id)
{
Job *job = job_get(id);
@@ -97,24 +100,31 @@ static char *child_job_get_parent_desc(BdrvChild *c)
return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
}
+/* Called with job_mutex *not* held. */
static void child_job_drained_begin(BdrvChild *c)
{
BlockJob *job = c->opaque;
+ job_lock();
job_pause(&job->job);
+ job_unlock();
}
+/* Called with job_mutex *not* held. */
static bool child_job_drained_poll(BdrvChild *c)
{
BlockJob *bjob = c->opaque;
Job *job = &bjob->job;
const BlockJobDriver *drv = block_job_driver(bjob);
+ job_lock();
/* An inactive or completed job doesn't have any pending requests. Jobs
* with !job->busy are either already paused or have a pause point after
* being reentered, so no job driver code will run before they pause. */
- if (!job_is_busy(job) || job_is_completed(job)) {
+ if (!job_is_busy(job) || job_is_completed_locked(job)) {
+ job_unlock();
return false;
}
+ job_unlock();
/* Otherwise, assume that it isn't fully stopped yet, but allow the job to
* override this assumption. */
@@ -125,10 +135,13 @@ static bool child_job_drained_poll(BdrvChild *c)
}
}
+/* Called with job_mutex *not* held. */
static void child_job_drained_end(BdrvChild *c, int *drained_end_counter)
{
BlockJob *job = c->opaque;
+ job_lock();
job_resume(&job->job);
+ job_unlock();
}
static bool child_job_can_set_aio_ctx(BdrvChild *c, AioContext *ctx,
@@ -246,11 +259,15 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
return 0;
}
+/* Called with job_mutex held. Temporarily releases the lock. */
static void block_job_on_idle(Notifier *n, void *opaque)
{
+ job_unlock();
aio_wait_kick();
+ job_lock();
}
+/* Does not need job_mutex. Value is never modified */
bool block_job_is_internal(BlockJob *job)
{
return (job->job.id == NULL);
@@ -267,6 +284,7 @@ static bool job_timer_pending(Job *job)
return timer_pending(&job->sleep_timer);
}
+/* Called with job_mutex held. May temporarily release the lock. */
bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
const BlockJobDriver *drv = block_job_driver(job);
@@ -286,7 +304,9 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
job->speed = speed;
if (drv->set_speed) {
+ job_unlock();
drv->set_speed(job, speed);
+ job_lock();
}
if (speed && speed <= old_speed) {
@@ -304,6 +324,7 @@ int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
return ratelimit_calculate_delay(&job->limit, n);
}
+/* Called with job_mutex *not* held. */
BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
{
BlockJobInfo *info;
@@ -319,6 +340,7 @@ BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
progress_get_snapshot(&job->progress, &progress_current,
&progress_total);
+ job_lock();
info = g_new0(BlockJobInfo, 1);
info->type = g_strdup(job_type_str(job));
info->device = g_strdup(job->id);
@@ -328,11 +350,11 @@ BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
info->len = progress_total;
info->speed = blkjob->speed;
info->io_status = blkjob->iostatus;
- info->ready = job_is_ready(job);
+ info->ready = job_is_ready_locked(job);
info->status = job_get_status(job);
info->auto_finalize = job->auto_finalize;
info->auto_dismiss = job->auto_dismiss;
- job_ret = job_get_ret(job);
+ job_ret = job_get_ret_locked(job);
if (job_ret) {
Error *job_err = job_get_err(job);
info->has_error = true;
@@ -340,9 +362,11 @@ BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
g_strdup(error_get_pretty(job_err)) :
g_strdup(strerror(-job_ret));
}
+ job_unlock();
return info;
}
+/* Called with job_mutex held. */
static void block_job_iostatus_set_err(BlockJob *job, int error)
{
if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
@@ -351,6 +375,7 @@ static void block_job_iostatus_set_err(BlockJob *job, int error)
}
}
+/* Called with job_mutex held. */
static void block_job_event_cancelled(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
@@ -370,6 +395,7 @@ static void block_job_event_cancelled(Notifier *n, void *opaque)
job->speed);
}
+/* Called with job_mutex held. */
static void block_job_event_completed(Notifier *n, void *opaque)
{
BlockJob *blkjob = opaque;
@@ -381,7 +407,7 @@ static void block_job_event_completed(Notifier *n, void *opaque)
return;
}
- if (job_get_ret(job) < 0) {
+ if (job_get_ret_locked(job) < 0) {
msg = error_get_pretty(job_get_err(job));
}
@@ -397,6 +423,7 @@ static void block_job_event_completed(Notifier *n, void *opaque)
msg);
}
+/* Called with job_mutex held. */
static void block_job_event_pending(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
@@ -409,6 +436,7 @@ static void block_job_event_pending(Notifier *n, void *opaque)
job->job.id);
}
+/* Called with job_mutex held. */
static void block_job_event_ready(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
@@ -430,10 +458,11 @@ static void block_job_event_ready(Notifier *n, void *opaque)
/*
- * API for block job drivers and the block layer. These functions are
- * declared in blockjob_int.h.
+ * API for block job drivers and the block layer, who do not know about
+ * job_mutex. These functions are declared in blockjob_int.h.
*/
+/* Called with job_mutex *not* held; takes it and temporarily drops it. */
void *block_job_create(const char *job_id, const BlockJobDriver *driver,
JobTxn *txn, BlockDriverState *bs, uint64_t perm,
uint64_t shared_perm, int64_t speed, int flags,
@@ -472,6 +501,8 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
job->ready_notifier.notify = block_job_event_ready;
job->idle_notifier.notify = block_job_on_idle;
+ job_lock();
+
notifier_list_add(&job->job.on_finalize_cancelled,
&job->finalize_cancelled_notifier);
notifier_list_add(&job->job.on_finalize_completed,
@@ -482,7 +513,11 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
error_setg(&job->blocker, "block device is in use by block job: %s",
job_type_str(&job->job));
+
+ job_unlock();
+ /* calls drain and friends, which already take the lock */
block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
+ job_lock();
bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);
@@ -493,27 +528,35 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
if (!block_job_set_speed(job, speed, errp)) {
job_early_fail(&job->job);
+ job_unlock();
return NULL;
}
+ job_unlock();
return job;
}
+/* Called with job_mutex *not* held. */
void block_job_iostatus_reset(BlockJob *job)
{
+ job_lock();
if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
+ job_unlock();
return;
}
assert(job_user_paused(&job->job) && job_should_pause(&job->job));
job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
+ job_unlock();
}
+/* Called with job_mutex *not* held. */
void block_job_user_resume(Job *job)
{
BlockJob *bjob = container_of(job, BlockJob, job);
block_job_iostatus_reset(bjob);
}
+/* Called with job_mutex *not* held. */
BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
int is_read, int error)
{
@@ -544,12 +587,14 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
action);
}
if (action == BLOCK_ERROR_ACTION_STOP) {
+ job_lock();
if (!job_user_paused(&job->job)) {
job_pause(&job->job);
/* make the pause user visible, which will be resumed from QMP. */
job_set_user_paused(&job->job);
}
block_job_iostatus_set_err(job, error);
+ job_unlock();
}
return action;
}
@@ -29,29 +29,26 @@
#include "qapi/error.h"
#include "trace/trace-root.h"
-/* Get a job using its ID and acquire its AioContext */
-static Job *find_job(const char *id, AioContext **aio_context, Error **errp)
+/* Get a job using its ID; on success, returns with job_mutex held */
+static Job *find_job(const char *id, Error **errp)
{
Job *job;
- *aio_context = NULL;
+ job_lock();
job = job_get(id);
if (!job) {
error_setg(errp, "Job not found");
+ job_unlock();
return NULL;
}
- *aio_context = job_get_aiocontext(job);
- aio_context_acquire(*aio_context);
-
return job;
}
void qmp_job_cancel(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job = find_job(id, errp);
if (!job) {
return;
@@ -59,13 +56,12 @@ void qmp_job_cancel(const char *id, Error **errp)
trace_qmp_job_cancel(job);
job_user_cancel(job, true, errp);
- aio_context_release(aio_context);
+ job_unlock();
}
void qmp_job_pause(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job = find_job(id, errp);
if (!job) {
return;
@@ -73,13 +69,12 @@ void qmp_job_pause(const char *id, Error **errp)
trace_qmp_job_pause(job);
job_user_pause(job, errp);
- aio_context_release(aio_context);
+ job_unlock();
}
void qmp_job_resume(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job = find_job(id, errp);
if (!job) {
return;
@@ -87,13 +82,12 @@ void qmp_job_resume(const char *id, Error **errp)
trace_qmp_job_resume(job);
job_user_resume(job, errp);
- aio_context_release(aio_context);
+ job_unlock();
}
void qmp_job_complete(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job = find_job(id, errp);
if (!job) {
return;
@@ -101,13 +95,12 @@ void qmp_job_complete(const char *id, Error **errp)
trace_qmp_job_complete(job);
job_complete(job, errp);
- aio_context_release(aio_context);
+ job_unlock();
}
void qmp_job_finalize(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job = find_job(id, errp);
if (!job) {
return;
@@ -117,20 +110,13 @@ void qmp_job_finalize(const char *id, Error **errp)
job_ref(job);
job_finalize(job, errp);
- /*
- * Job's context might have changed via job_finalize (and job_txn_apply
- * automatically acquires the new one), so make sure we release the correct
- * one.
- */
- aio_context = job_get_aiocontext(job);
job_unref(job);
- aio_context_release(aio_context);
+ job_unlock();
}
void qmp_job_dismiss(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job = find_job(id, errp);
if (!job) {
return;
@@ -138,9 +124,10 @@ void qmp_job_dismiss(const char *id, Error **errp)
trace_qmp_job_dismiss(job);
job_dismiss(&job, errp);
- aio_context_release(aio_context);
+ job_unlock();
}
+/* Called with job_mutex held. */
static JobInfo *job_query_single(Job *job, Error **errp)
{
JobInfo *info;
@@ -175,15 +162,15 @@ JobInfoList *qmp_query_jobs(Error **errp)
for (job = job_next(NULL); job; job = job_next(job)) {
JobInfo *value;
- AioContext *aio_context;
if (job_is_internal(job)) {
continue;
}
- aio_context = job_get_aiocontext(job);
- aio_context_acquire(aio_context);
+
+ job_lock();
value = job_query_single(job, errp);
- aio_context_release(aio_context);
+ job_unlock();
+
if (!value) {
qapi_free_JobInfoList(head);
return NULL;
@@ -93,19 +93,22 @@ static void __attribute__((__constructor__)) job_init(void)
qemu_mutex_init(&job_mutex);
}
+/* Does not need job_mutex */
AioContext *job_get_aiocontext(Job *job)
{
- return job->aio_context;
+ return qatomic_read(&job->aio_context);
}
+/* Does not need job_mutex */
void job_set_aiocontext(Job *job, AioContext *aio)
{
- job->aio_context = aio;
+ qatomic_set(&job->aio_context, aio);
}
+/* Called with job_mutex held. */
bool job_is_busy(Job *job)
{
- return qatomic_read(&job->busy);
+ return job->busy;
}
/* Called with job_mutex held. */
@@ -124,59 +127,75 @@ int job_get_ret(Job *job)
return ret;
}
+/* Called with job_mutex held. */
Error *job_get_err(Job *job)
{
return job->err;
}
+/* Called with job_mutex held. */
JobStatus job_get_status(Job *job)
{
return job->status;
}
-
+/* Called with job_mutex *not* held. */
void job_set_cancelled(Job *job, bool cancel)
{
+ job_lock();
job->cancelled = cancel;
+ job_unlock();
}
+/* Called with job_mutex *not* held. */
bool job_is_force_cancel(Job *job)
{
- return job->force_cancel;
+ bool ret;
+ job_lock();
+ ret = job->force_cancel;
+ job_unlock();
+ return ret;
}
+/* Does not need job_mutex */
JobTxn *job_txn_new(void)
{
JobTxn *txn = g_new0(JobTxn, 1);
QLIST_INIT(&txn->jobs);
- txn->refcnt = 1;
+ qatomic_set(&txn->refcnt, 1);
return txn;
}
+/* Does not need job_mutex */
static void job_txn_ref(JobTxn *txn)
{
- txn->refcnt++;
+ qatomic_inc(&txn->refcnt);
}
+/* Does not need job_mutex */
void job_txn_unref(JobTxn *txn)
{
- if (txn && --txn->refcnt == 0) {
+ if (txn && qatomic_dec_fetch(&txn->refcnt) == 0) {
g_free(txn);
}
}
+/* Called with job_mutex *not* held. */
void job_txn_add_job(JobTxn *txn, Job *job)
{
if (!txn) {
return;
}
+ job_lock();
assert(!job->txn);
job->txn = txn;
QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+ job_unlock();
job_txn_ref(txn);
}
+/* Called with job_mutex held. */
static void job_txn_del_job(Job *job)
{
if (job->txn) {
@@ -186,6 +205,7 @@ static void job_txn_del_job(Job *job)
}
}
+/* Called with job_mutex held. */
static int job_txn_apply(Job *job, int fn(Job *))
{
AioContext *inner_ctx;
@@ -221,11 +241,13 @@ static int job_txn_apply(Job *job, int fn(Job *))
return rc;
}
+/* Does not need job_mutex */
bool job_is_internal(Job *job)
{
return (job->id == NULL);
}
+/* Called with job_mutex held. */
static void job_state_transition(Job *job, JobStatus s1)
{
JobStatus s0 = job->status;
@@ -241,6 +263,7 @@ static void job_state_transition(Job *job, JobStatus s1)
}
}
+/* Called with job_mutex held. */
int job_apply_verb(Job *job, JobVerb verb, Error **errp)
{
JobStatus s0 = job->status;
@@ -255,11 +278,13 @@ int job_apply_verb(Job *job, JobVerb verb, Error **errp)
return -EPERM;
}
+/* Does not need job_mutex. Value is never modified */
JobType job_type(const Job *job)
{
return job->driver->job_type;
}
+/* Does not need job_mutex. Value is never modified */
const char *job_type_str(const Job *job)
{
return JobType_str(job_type(job));
@@ -353,24 +378,34 @@ static bool job_started(Job *job)
return job->co;
}
+/* Called with job_mutex held. */
bool job_should_pause(Job *job)
{
return job->pause_count > 0;
}
+/* Called with job_mutex held. */
bool job_is_paused(Job *job)
{
return job->paused;
}
+/* Called with job_mutex *not* held. */
Job *job_next(Job *job)
{
+ Job *ret;
+ job_lock();
if (!job) {
- return QLIST_FIRST(&jobs);
+ ret = QLIST_FIRST(&jobs);
+ job_unlock();
+ return ret;
}
- return QLIST_NEXT(job, job_list);
+ ret = QLIST_NEXT(job, job_list);
+ job_unlock();
+ return ret;
}
+/* Called with job_mutex held. */
Job *job_get(const char *id)
{
Job *job;
@@ -388,13 +423,14 @@ Job *job_get(const char *id)
return NULL;
}
+/* Called with job_mutex *not* held. */
static void job_sleep_timer_cb(void *opaque)
{
Job *job = opaque;
-
job_enter(job);
}
+/* Called with job_mutex *not* held. */
void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
AioContext *ctx, int flags, BlockCompletionFunc *cb,
void *opaque, Error **errp)
@@ -449,6 +485,7 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
job_sleep_timer_cb, job);
QLIST_INSERT_HEAD(&jobs, job, job_list);
+ job_unlock();
/* Single jobs are modeled as single-job transactions for sake of
* consolidating the job management logic */
@@ -463,11 +500,13 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
return job;
}
+/* Called with job_mutex held. */
void job_ref(Job *job)
{
++job->refcnt;
}
+/* Called with job_mutex held. Temporarily releases the lock. */
void job_unref(Job *job)
{
if (--job->refcnt == 0) {
@@ -476,7 +515,9 @@ void job_unref(Job *job)
assert(!job->txn);
if (job->driver->free) {
+ job_unlock();
job->driver->free(job);
+ job_lock();
}
QLIST_REMOVE(job, job_list);
@@ -488,46 +529,55 @@ void job_unref(Job *job)
}
}
+/* API is thread safe */
void job_progress_update(Job *job, uint64_t done)
{
progress_work_done(&job->progress, done);
}
+/* API is thread safe */
void job_progress_set_remaining(Job *job, uint64_t remaining)
{
progress_set_remaining(&job->progress, remaining);
}
+/* API is thread safe */
void job_progress_increase_remaining(Job *job, uint64_t delta)
{
progress_increase_remaining(&job->progress, delta);
}
+/* Called with job_mutex held. */
void job_event_cancelled(Job *job)
{
notifier_list_notify(&job->on_finalize_cancelled, job);
}
+/* Called with job_mutex held. */
void job_event_completed(Job *job)
{
notifier_list_notify(&job->on_finalize_completed, job);
}
+/* Called with job_mutex held. */
static void job_event_pending(Job *job)
{
notifier_list_notify(&job->on_pending, job);
}
+/* Called with job_mutex held. */
static void job_event_ready(Job *job)
{
notifier_list_notify(&job->on_ready, job);
}
+/* Called with job_mutex held. */
static void job_event_idle(Job *job)
{
notifier_list_notify(&job->on_idle, job);
}
+/* Called with job_mutex held, but releases it temporarily. */
void job_enter_cond(Job *job, bool(*fn)(Job *job))
{
if (!job_started(job)) {
@@ -537,14 +587,11 @@ void job_enter_cond(Job *job, bool(*fn)(Job *job))
return;
}
- job_lock();
if (job->busy) {
- job_unlock();
return;
}
if (fn && !fn(job)) {
- job_unlock();
return;
}
@@ -552,7 +599,8 @@ void job_enter_cond(Job *job, bool(*fn)(Job *job))
timer_del(&job->sleep_timer);
job->busy = true;
job_unlock();
- aio_co_enter(job->aio_context, job->co);
+ aio_co_enter(job_get_aiocontext(job), job->co);
+ job_lock();
}
/* Called with job_mutex held. */
@@ -565,7 +613,7 @@ void job_enter_locked(Job *job)
void job_enter(Job *job)
{
job_lock();
- job_enter_locked(job, NULL);
+ job_enter_locked(job);
job_unlock();
}
@@ -574,7 +622,11 @@ void job_enter(Job *job)
* is allowed and cancels the timer.
*
* If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
- * called explicitly. */
+ * called explicitly.
+ *
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
{
job_lock();
@@ -587,86 +639,122 @@ static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
qemu_coroutine_yield();
/* Set by job_enter_cond() before re-entering the coroutine. */
+ job_lock();
assert(job->busy);
+ job_unlock();
}
+/*
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
void coroutine_fn job_pause_point(Job *job)
{
assert(job && job_started(job));
+ job_lock();
if (!job_should_pause(job)) {
+ job_unlock();
return;
}
- if (job_is_cancelled(job)) {
+ if (job_is_cancelled_locked(job)) {
+ job_unlock();
return;
}
if (job->driver->pause) {
+ job_unlock();
job->driver->pause(job);
+ job_lock();
}
- if (job_should_pause(job) && !job_is_cancelled(job)) {
+ if (job_should_pause(job) && !job_is_cancelled_locked(job)) {
JobStatus status = job->status;
job_state_transition(job, status == JOB_STATUS_READY
? JOB_STATUS_STANDBY
: JOB_STATUS_PAUSED);
job->paused = true;
+ job_unlock();
job_do_yield(job, -1);
+ job_lock();
job->paused = false;
job_state_transition(job, status);
}
+ job_unlock();
if (job->driver->resume) {
job->driver->resume(job);
}
}
+/*
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
void job_yield(Job *job)
{
+ bool res;
+ job_lock();
assert(job->busy);
/* Check cancellation *before* setting busy = false, too! */
- if (job_is_cancelled(job)) {
+ if (job_is_cancelled_locked(job)) {
+ job_unlock();
return;
}
- if (!job_should_pause(job)) {
+ res = job_should_pause(job);
+ job_unlock();
+
+ if (!res) {
job_do_yield(job, -1);
}
job_pause_point(job);
}
+/*
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
+ bool res;
+ job_lock();
assert(job->busy);
/* Check cancellation *before* setting busy = false, too! */
- if (job_is_cancelled(job)) {
+ if (job_is_cancelled_locked(job)) {
+ job_unlock();
return;
}
- if (!job_should_pause(job)) {
+ res = job_should_pause(job);
+ job_unlock();
+
+ if (!res) {
job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
}
job_pause_point(job);
}
-/* Assumes the block_job_mutex is held */
+/* Called with job_mutex held. */
static bool job_timer_not_pending(Job *job)
{
return !timer_pending(&job->sleep_timer);
}
+/* Called with job_mutex held. */
void job_pause(Job *job)
{
job->pause_count++;
if (!job->paused) {
- job_enter(job);
+ job_enter_locked(job);
}
}
+/* Called with job_mutex held. */
void job_resume(Job *job)
{
assert(job->pause_count > 0);
@@ -679,6 +767,7 @@ void job_resume(Job *job)
job_enter_cond(job, job_timer_not_pending);
}
+/* Called with job_mutex held. */
void job_user_pause(Job *job, Error **errp)
{
if (job_apply_verb(job, JOB_VERB_PAUSE, errp)) {
@@ -692,16 +781,19 @@ void job_user_pause(Job *job, Error **errp)
job_pause(job);
}
+/* Called with job_mutex held. */
bool job_user_paused(Job *job)
{
return job->user_paused;
}
+/* Called with job_mutex held. */
void job_set_user_paused(Job *job)
{
job->user_paused = true;
}
+/* Called with job_mutex held. Temporarily releases the lock. */
void job_user_resume(Job *job, Error **errp)
{
assert(job);
@@ -713,12 +805,15 @@ void job_user_resume(Job *job, Error **errp)
return;
}
if (job->driver->user_resume) {
+ job_unlock();
job->driver->user_resume(job);
+ job_lock();
}
job->user_paused = false;
job_resume(job);
}
+/* Called with job_mutex held. */
static void job_do_dismiss(Job *job)
{
assert(job);
@@ -732,6 +827,7 @@ static void job_do_dismiss(Job *job)
job_unref(job);
}
+/* Called with job_mutex held. */
void job_dismiss(Job **jobptr, Error **errp)
{
Job *job = *jobptr;
@@ -761,9 +857,10 @@ static void job_conclude(Job *job)
}
}
+/* Called with job_mutex held. */
static void job_update_rc(Job *job)
{
- if (!job->ret && job_is_cancelled(job)) {
+ if (!job->ret && job_is_cancelled_locked(job)) {
job->ret = -ECANCELED;
}
if (job->ret) {
@@ -774,22 +871,25 @@ static void job_update_rc(Job *job)
}
}
+/* Called with job_mutex *not* held. */
static void job_commit(Job *job)
{
- assert(!job->ret);
+ assert(!job_get_ret(job));
if (job->driver->commit) {
job->driver->commit(job);
}
}
+/* Called with job_mutex *not* held. */
static void job_abort(Job *job)
{
- assert(job->ret);
+ assert(job_get_ret(job));
if (job->driver->abort) {
job->driver->abort(job);
}
}
+/* Called with job_mutex *not* held. */
static void job_clean(Job *job)
{
if (job->driver->clean) {
@@ -797,14 +897,18 @@ static void job_clean(Job *job)
}
}
+/* Called with job_mutex held. Temporarily releases the lock. */
static int job_finalize_single(Job *job)
{
- assert(job_is_completed(job));
+ int ret;
+ assert(job_is_completed_locked(job));
/* Ensure abort is called for late-transactional failures */
job_update_rc(job);
- if (!job->ret) {
+ ret = job->ret;
+ job_unlock();
+ if (!ret) {
job_commit(job);
} else {
job_abort(job);
@@ -812,12 +916,13 @@ static int job_finalize_single(Job *job)
job_clean(job);
if (job->cb) {
- job->cb(job->opaque, job->ret);
+ job->cb(job->opaque, ret);
}
+ job_lock();
/* Emit events only if we actually started */
if (job_started(job)) {
- if (job_is_cancelled(job)) {
+ if (job_is_cancelled_locked(job)) {
job_event_cancelled(job);
} else {
job_event_completed(job);
@@ -829,15 +934,20 @@ static int job_finalize_single(Job *job)
return 0;
}
+/* Called with job_mutex held. Temporarily releases the lock. */
static void job_cancel_async(Job *job, bool force)
{
if (job->driver->cancel) {
+ job_unlock();
job->driver->cancel(job, force);
+ job_lock();
}
if (job->user_paused) {
/* Do not call job_enter here, the caller will handle it. */
if (job->driver->user_resume) {
+ job_unlock();
job->driver->user_resume(job);
+ job_lock();
}
job->user_paused = false;
assert(job->pause_count > 0);
@@ -848,27 +958,21 @@ static void job_cancel_async(Job *job, bool force)
job->force_cancel |= force;
}
+/* Called with job_mutex held. */
static void job_completed_txn_abort(Job *job)
{
- AioContext *outer_ctx = job->aio_context;
AioContext *ctx;
JobTxn *txn = job->txn;
Job *other_job;
- if (txn->aborting) {
+ if (qatomic_cmpxchg(&txn->aborting, false, true)) {
/*
* We are cancelled by another job, which will handle everything.
*/
return;
}
- txn->aborting = true;
job_txn_ref(txn);
- /* We can only hold the single job's AioContext lock while calling
- * job_finalize_single() because the finalization callbacks can involve
- * calls of AIO_WAIT_WHILE(), which could deadlock otherwise. */
- aio_context_release(outer_ctx);
-
/* Other jobs are effectively cancelled by us, set the status for
* them; this job, however, may or may not be cancelled, depending
* on the caller, so leave it. */
@@ -884,33 +988,39 @@ static void job_completed_txn_abort(Job *job)
other_job = QLIST_FIRST(&txn->jobs);
ctx = other_job->aio_context;
aio_context_acquire(ctx);
- if (!job_is_completed(other_job)) {
- assert(job_is_cancelled(other_job));
+ if (!job_is_completed_locked(other_job)) {
+ assert(job_is_cancelled_locked(other_job));
job_finish_sync(other_job, NULL, NULL);
}
job_finalize_single(other_job);
aio_context_release(ctx);
}
- aio_context_acquire(outer_ctx);
-
job_txn_unref(txn);
}
+/* Called with job_mutex held. Temporarily releases the lock. */
static int job_prepare(Job *job)
{
+ int ret;
+
if (job->ret == 0 && job->driver->prepare) {
- job->ret = job->driver->prepare(job);
+ job_unlock();
+ ret = job->driver->prepare(job);
+ job_lock();
+ job->ret = ret;
job_update_rc(job);
}
return job->ret;
}
+/* Does not need job_mutex */
static int job_needs_finalize(Job *job)
{
return !job->auto_finalize;
}
+/* Called with job_mutex held. */
static void job_do_finalize(Job *job)
{
int rc;
@@ -925,6 +1035,7 @@ static void job_do_finalize(Job *job)
}
}
+/* Called with job_mutex held. */
void job_finalize(Job *job, Error **errp)
{
assert(job && job->id);
@@ -934,6 +1045,7 @@ void job_finalize(Job *job, Error **errp)
job_do_finalize(job);
}
+/* Called with job_mutex held. */
static int job_transition_to_pending(Job *job)
{
job_state_transition(job, JOB_STATUS_PENDING);
@@ -943,17 +1055,22 @@ static int job_transition_to_pending(Job *job)
return 0;
}
+/* Called with job_mutex *not* held. */
void job_transition_to_ready(Job *job)
{
+ job_lock();
job_state_transition(job, JOB_STATUS_READY);
job_event_ready(job);
+ job_unlock();
}
+/* Called with job_mutex held. */
static void job_completed_txn_success(Job *job)
{
- JobTxn *txn = job->txn;
+ JobTxn *txn;
Job *other_job;
+ txn = job->txn;
job_state_transition(job, JOB_STATUS_WAITING);
/*
@@ -961,7 +1078,7 @@ static void job_completed_txn_success(Job *job)
* txn.
*/
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
- if (!job_is_completed(other_job)) {
+ if (!job_is_completed_locked(other_job)) {
return;
}
assert(other_job->ret == 0);
@@ -975,9 +1092,10 @@ static void job_completed_txn_success(Job *job)
}
}
+/* Called with job_mutex held. */
static void job_completed(Job *job)
{
- assert(job && job->txn && !job_is_completed(job));
+ assert(job && job->txn && !job_is_completed_locked(job));
job_update_rc(job);
trace_job_completed(job, job->ret);
@@ -988,14 +1106,16 @@ static void job_completed(Job *job)
}
}
-/** Useful only as a type shim for aio_bh_schedule_oneshot. */
+/**
+ * Useful only as a type shim for aio_bh_schedule_oneshot.
+ * Called with job_mutex *not* held.
+ */
static void job_exit(void *opaque)
{
Job *job = (Job *)opaque;
- AioContext *ctx;
+ job_lock();
job_ref(job);
- aio_context_acquire(job->aio_context);
/* This is a lie, we're not quiescent, but still doing the completion
* callbacks. However, completion callbacks tend to involve operations that
@@ -1012,29 +1132,40 @@ static void job_exit(void *opaque)
* acquiring the new lock, and we ref/unref to avoid job_completed freeing
* the job underneath us.
*/
- ctx = job->aio_context;
job_unref(job);
- aio_context_release(ctx);
+ job_unlock();
}
/**
* All jobs must allow a pause point before entering their job proper. This
* ensures that jobs can be paused prior to being started, then resumed later.
+ *
+ * Called with job_mutex *not* held.
*/
static void coroutine_fn job_co_entry(void *opaque)
{
Job *job = opaque;
+ Error *local_error = NULL;
+ int ret;
assert(job && job->driver && job->driver->run);
job_pause_point(job);
- job->ret = job->driver->run(job, &job->err);
+ ret = job->driver->run(job, &local_error);
+ job_lock();
+ if (local_error) {
+ error_propagate(&job->err, local_error);
+ }
+ job->ret = ret;
job->deferred_to_main_loop = true;
job->busy = true;
+ job_unlock();
aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}
+/* Called with job_mutex *not* held. */
void job_start(Job *job)
{
+ job_lock();
assert(job && !job_started(job) && job->paused &&
job->driver && job->driver->run);
job->co = qemu_coroutine_create(job_co_entry, job);
@@ -1042,9 +1173,11 @@ void job_start(Job *job)
job->busy = true;
job->paused = false;
job_state_transition(job, JOB_STATUS_RUNNING);
- aio_co_enter(job->aio_context, job->co);
+ job_unlock();
+ aio_co_enter(job_get_aiocontext(job), job->co);
}
+/* Called with job_mutex held. */
void job_cancel(Job *job, bool force)
{
if (job->status == JOB_STATUS_CONCLUDED) {
@@ -1057,10 +1190,11 @@ void job_cancel(Job *job, bool force)
} else if (job->deferred_to_main_loop) {
job_completed_txn_abort(job);
} else {
- job_enter(job);
+ job_enter_locked(job);
}
}
+/* Called with job_mutex held. */
void job_user_cancel(Job *job, bool force, Error **errp)
{
if (job_apply_verb(job, JOB_VERB_CANCEL, errp)) {
@@ -1069,19 +1203,36 @@ void job_user_cancel(Job *job, bool force, Error **errp)
job_cancel(job, force);
}
-/* A wrapper around job_cancel() taking an Error ** parameter so it may be
+/*
+ * A wrapper around job_cancel() taking an Error ** parameter so it may be
* used with job_finish_sync() without the need for (rather nasty) function
- * pointer casts there. */
+ * pointer casts there.
+ *
+ * Called with job_mutex held.
+ */
static void job_cancel_err(Job *job, Error **errp)
{
job_cancel(job, false);
}
+/*
+ * Called with job_mutex *not* held, unlike most other APIs consumed
+ * by the monitor!
+ */
int job_cancel_sync(Job *job)
{
- return job_finish_sync(job, &job_cancel_err, NULL);
+ int ret;
+
+ job_lock();
+ ret = job_finish_sync(job, &job_cancel_err, NULL);
+ job_unlock();
+ return ret;
}
+/*
+ * Called with job_mutex *not* held, unlike most other APIs consumed
+ * by the monitor!
+ */
void job_cancel_sync_all(void)
{
Job *job;
@@ -1095,11 +1246,13 @@ void job_cancel_sync_all(void)
}
}
+/* Called with job_mutex held. */
int job_complete_sync(Job *job, Error **errp)
{
return job_finish_sync(job, job_complete, errp);
}
+/* Called with job_mutex held. Temporarily releases the lock. */
void job_complete(Job *job, Error **errp)
{
/* Should not be reachable via external interface for internal jobs */
@@ -1107,15 +1260,18 @@ void job_complete(Job *job, Error **errp)
if (job_apply_verb(job, JOB_VERB_COMPLETE, errp)) {
return;
}
- if (job_is_cancelled(job) || !job->driver->complete) {
+ if (job_is_cancelled_locked(job) || !job->driver->complete) {
error_setg(errp, "The active block job '%s' cannot be completed",
job->id);
return;
}
+ job_unlock();
job->driver->complete(job, errp);
+ job_lock();
}
+/* Called with job_mutex held. */
int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
{
Error *local_err = NULL;
@@ -1132,10 +1288,12 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
return -EBUSY;
}
- AIO_WAIT_WHILE(job->aio_context,
- (job_enter(job), !job_is_completed(job)));
+ job_unlock();
+ AIO_WAIT_WHILE(NULL, (job_enter(job), !job_is_completed(job)));
+ job_lock();
- ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
+ ret = (job_is_cancelled_locked(job) && job->ret == 0) ?
+ -ECANCELED : job->ret;
job_unref(job);
return ret;
}
@@ -898,17 +898,19 @@ static void common_block_job_cb(void *opaque, int ret)
}
}
+/* Called with job_mutex held. Releases it temporarily. */
static void run_block_job(BlockJob *job, Error **errp)
{
uint64_t progress_current, progress_total;
AioContext *aio_context = blk_get_aio_context(job->blk);
int ret = 0;
- aio_context_acquire(aio_context);
job_ref(&job->job);
do {
float progress = 0.0f;
+ job_unlock();
aio_poll(aio_context, true);
+ job_lock();
progress_get_snapshot(&job->job.progress, &progress_current,
&progress_total);
@@ -916,15 +918,15 @@ static void run_block_job(BlockJob *job, Error **errp)
progress = (float)progress_current / progress_total * 100.f;
}
qemu_progress_print(progress, 0);
- } while (!job_is_ready(&job->job) && !job_is_completed(&job->job));
+ } while (!job_is_ready_locked(&job->job) &&
+ !job_is_completed_locked(&job->job));
- if (!job_is_completed(&job->job)) {
+ if (!job_is_completed_locked(&job->job)) {
ret = job_complete_sync(&job->job, errp);
} else {
- ret = job_get_ret(&job->job);
+ ret = job_get_ret_locked(&job->job);
}
job_unref(&job->job);
- aio_context_release(aio_context);
/* publish completion progress only when success */
if (!ret) {
@@ -1076,9 +1078,12 @@ static int img_commit(int argc, char **argv)
bdrv_ref(bs);
}
+ job_lock();
job = block_job_get("commit");
assert(job);
run_block_job(job, &local_err);
+ job_unlock();
+
if (local_err) {
goto unref_backing;
}
This lock is going to replace most of the AioContext locks in the job and
blockjob, so that a Job can run in an arbitrary AioContext.

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 include/block/blockjob_int.h |   1 +
 include/qemu/job.h           |   2 +
 block/backup.c               |   4 +
 block/mirror.c               |  11 +-
 blockdev.c                   |  62 ++++----
 blockjob.c                   |  67 +++++++--
 job-qmp.c                    |  55 +++----
 job.c                        | 284 +++++++++++++++++++++++++++--------
 qemu-img.c                   |  15 +-
 9 files changed, 350 insertions(+), 151 deletions(-)