@@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
QIOChannel *ioc; /* The current I/O channel */
NBDExportInfo info;
- CoMutex send_mutex;
+ /*
+ * Protects free_sema, in_flight, requests[].coroutine,
+ * reconnect_delay_timer.
+ */
+ QemuMutex requests_lock;
CoQueue free_sema;
-
- CoMutex receive_mutex;
int in_flight;
- NBDClientState state;
-
+ NBDClientRequest requests[MAX_NBD_REQUESTS];
QEMUTimer *reconnect_delay_timer;
+
+ CoMutex send_mutex;
+ CoMutex receive_mutex;
+ NBDClientState state;
+
QEMUTimer *open_timer;
- NBDClientRequest requests[MAX_NBD_REQUESTS];
NBDReply reply;
BlockDriverState *bs;
@@ -350,7 +355,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
return 0;
}
-/* called under s->send_mutex */
+/* Called with s->requests_lock taken. */
static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
bool blocking = nbd_client_connecting_wait(s);
@@ -382,9 +387,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
s->ioc = NULL;
}
- qemu_co_mutex_unlock(&s->send_mutex);
+ qemu_mutex_unlock(&s->requests_lock);
nbd_co_do_establish_connection(s->bs, blocking, NULL);
- qemu_co_mutex_lock(&s->send_mutex);
+ qemu_mutex_lock(&s->requests_lock);
/*
* The reconnect attempt is done (maybe successfully, maybe not), so
@@ -466,11 +471,10 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
int rc, i = -1;
- qemu_co_mutex_lock(&s->send_mutex);
-
+ qemu_mutex_lock(&s->requests_lock);
while (s->in_flight == MAX_NBD_REQUESTS ||
(!nbd_client_connected(s) && s->in_flight > 0)) {
- qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
+ qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
}
s->in_flight++;
@@ -491,13 +495,13 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
}
}
- g_assert(qemu_in_coroutine());
assert(i < MAX_NBD_REQUESTS);
-
s->requests[i].coroutine = qemu_coroutine_self();
s->requests[i].offset = request->from;
s->requests[i].receiving = false;
+ qemu_mutex_unlock(&s->requests_lock);
+ qemu_co_mutex_lock(&s->send_mutex);
request->handle = INDEX_TO_HANDLE(s, i);
assert(s->ioc);
@@ -517,17 +521,19 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
} else {
rc = nbd_send_request(s->ioc, request);
}
+ qemu_co_mutex_unlock(&s->send_mutex);
-err:
if (rc < 0) {
+ qemu_mutex_lock(&s->requests_lock);
+err:
nbd_channel_error(s, rc);
if (i != -1) {
s->requests[i].coroutine = NULL;
}
s->in_flight--;
qemu_co_queue_next(&s->free_sema);
+ qemu_mutex_unlock(&s->requests_lock);
}
- qemu_co_mutex_unlock(&s->send_mutex);
return rc;
}
@@ -1017,12 +1023,11 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
return true;
break_loop:
+ qemu_mutex_lock(&s->requests_lock);
s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
-
- qemu_co_mutex_lock(&s->send_mutex);
s->in_flight--;
qemu_co_queue_next(&s->free_sema);
- qemu_co_mutex_unlock(&s->send_mutex);
+ qemu_mutex_unlock(&s->requests_lock);
return false;
}
@@ -1855,8 +1860,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
s->bs = bs;
- qemu_co_mutex_init(&s->send_mutex);
+ qemu_mutex_init(&s->requests_lock);
qemu_co_queue_init(&s->free_sema);
+ qemu_co_mutex_init(&s->send_mutex);
qemu_co_mutex_init(&s->receive_mutex);
if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
@@ -2043,9 +2049,11 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
reconnect_delay_timer_del(s);
+ qemu_mutex_lock(&s->requests_lock);
if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
}
+ qemu_mutex_unlock(&s->requests_lock);
nbd_co_establish_connection_cancel(s->conn);
}