@@ -49,4 +49,27 @@
QIOChannel *qio_channel_new_fd(int fd,
Error **errp);
+/**
+ * qio_channel_util_set_aio_fd_handler:
+ * @read_fd: the file descriptor for the read handler
+ * @read_ctx: the AioContext for the read handler
+ * @io_read: the read handler
+ * @write_fd: the file descriptor for the write handler
+ * @write_ctx: the AioContext for the write handler
+ * @io_write: the write handler
+ * @opaque: the opaque argument to the read and write handler
+ *
+ * Set the read and write handlers when @read_ctx and @write_ctx are non-NULL,
+ * respectively. To leave a handler in its current state, pass a NULL
+ * AioContext. To clear a handler, pass a non-NULL AioContext and a NULL
+ * handler.
+ */
+void qio_channel_util_set_aio_fd_handler(int read_fd,
+ AioContext *read_ctx,
+ IOHandler *io_read,
+ int write_fd,
+ AioContext *write_ctx,
+ IOHandler *io_write,
+ void *opaque);
+
#endif /* QIO_CHANNEL_UTIL_H */
@@ -81,9 +81,11 @@ struct QIOChannel {
Object parent;
unsigned int features; /* bitmask of QIOChannelFeatures */
char *name;
- AioContext *ctx;
+ AioContext *read_ctx;
Coroutine *read_coroutine;
+ AioContext *write_ctx;
Coroutine *write_coroutine;
+ bool follow_coroutine_ctx;
#ifdef _WIN32
HANDLE event; /* For use with GSource on Win32 */
#endif
@@ -140,8 +142,9 @@ struct QIOChannelClass {
int whence,
Error **errp);
void (*io_set_aio_fd_handler)(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque);
int (*io_flush)(QIOChannel *ioc,
@@ -498,6 +501,21 @@ int qio_channel_set_blocking(QIOChannel *ioc,
bool enabled,
Error **errp);
+/**
+ * qio_channel_set_follow_coroutine_ctx:
+ * @ioc: the channel object
+ * @enabled: whether or not to follow the coroutine's AioContext
+ *
+ * If @enabled is true, calls to qio_channel_yield() use the current
+ * coroutine's AioContext. Usually this is desirable.
+ *
+ * If @enabled is false, calls to qio_channel_yield() use the global iohandler
+ * AioContext. This is may be used by coroutines that run in the main loop and
+ * do not wish to respond to I/O during nested event loops. This is the
+ * default for compatibility with code that is not aware of AioContexts.
+ */
+void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled);
+
/**
* qio_channel_close:
* @ioc: the channel object
@@ -703,41 +721,6 @@ GSource *qio_channel_add_watch_source(QIOChannel *ioc,
GDestroyNotify notify,
GMainContext *context);
-/**
- * qio_channel_attach_aio_context:
- * @ioc: the channel object
- * @ctx: the #AioContext to set the handlers on
- *
- * Request that qio_channel_yield() sets I/O handlers on
- * the given #AioContext. If @ctx is %NULL, qio_channel_yield()
- * uses QEMU's main thread event loop.
- *
- * You can move a #QIOChannel from one #AioContext to another even if
- * I/O handlers are set for a coroutine. However, #QIOChannel provides
- * no synchronization between the calls to qio_channel_yield() and
- * qio_channel_attach_aio_context().
- *
- * Therefore you should first call qio_channel_detach_aio_context()
- * to ensure that the coroutine is not entered concurrently. Then,
- * while the coroutine has yielded, call qio_channel_attach_aio_context(),
- * and then aio_co_schedule() to place the coroutine on the new
- * #AioContext. The calls to qio_channel_detach_aio_context()
- * and qio_channel_attach_aio_context() should be protected with
- * aio_context_acquire() and aio_context_release().
- */
-void qio_channel_attach_aio_context(QIOChannel *ioc,
- AioContext *ctx);
-
-/**
- * qio_channel_detach_aio_context:
- * @ioc: the channel object
- *
- * Disable any I/O handlers set by qio_channel_yield(). With the
- * help of aio_co_schedule(), this allows moving a coroutine that was
- * paused by qio_channel_yield() to another context.
- */
-void qio_channel_detach_aio_context(QIOChannel *ioc);
-
/**
* qio_channel_yield:
* @ioc: the channel object
@@ -785,8 +768,9 @@ void qio_channel_wait(QIOChannel *ioc,
/**
* qio_channel_set_aio_fd_handler:
* @ioc: the channel object
- * @ctx: the AioContext to set the handlers on
+ * @read_ctx: the AioContext to set the read handler on or NULL
* @io_read: the read handler
+ * @write_ctx: the AioContext to set the write handler on or NULL
* @io_write: the write handler
* @opaque: the opaque value passed to the handler
*
@@ -794,10 +778,17 @@ void qio_channel_wait(QIOChannel *ioc,
* be used by channel implementations to forward the handlers
* to another channel (e.g. from #QIOChannelTLS to the
* underlying socket).
+ *
+ * When @read_ctx is NULL, don't touch the read handler. When @write_ctx is
+ * NULL, don't touch the write handler. Note that setting the read handler
+ * clears the write handler, and vice versa, if they share the same AioContext.
+ * Therefore the caller must pass both handlers together when sharing the same
+ * AioContext.
*/
void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque);
@@ -43,6 +43,7 @@ typedef struct {
unsigned int in_flight; /* atomic */
/* Protected by ctx lock */
+ bool in_qio_channel_yield;
bool wait_idle;
VuDev vu_dev;
QIOChannel *ioc; /* The I/O channel with the client */
@@ -352,7 +352,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
}
qio_channel_set_blocking(s->ioc, false, NULL);
- qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
+ qio_channel_set_follow_coroutine_ctx(s->ioc, true);
/* successfully connected */
WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
@@ -397,7 +397,6 @@ static void coroutine_fn GRAPH_RDLOCK nbd_reconnect_attempt(BDRVNBDState *s)
/* Finalize previous connection if any */
if (s->ioc) {
- qio_channel_detach_aio_context(s->ioc);
yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
nbd_yank, s->bs);
object_unref(OBJECT(s->ioc));
@@ -2089,10 +2088,6 @@ static void nbd_attach_aio_context(BlockDriverState *bs,
* the reconnect_delay_timer cannot be active here.
*/
assert(!s->reconnect_delay_timer);
-
- if (s->ioc) {
- qio_channel_attach_aio_context(s->ioc, new_context);
- }
}
static void nbd_detach_aio_context(BlockDriverState *bs)
@@ -2101,10 +2096,6 @@ static void nbd_detach_aio_context(BlockDriverState *bs)
assert(!s->open_timer);
assert(!s->reconnect_delay_timer);
-
- if (s->ioc) {
- qio_channel_detach_aio_context(s->ioc);
- }
}
static BlockDriver bdrv_nbd = {
@@ -20,6 +20,7 @@
#include "qemu/osdep.h"
#include "io/channel-command.h"
+#include "io/channel-util.h"
#include "io/channel-watch.h"
#include "qapi/error.h"
#include "qemu/module.h"
@@ -331,14 +332,17 @@ static int qio_channel_command_close(QIOChannel *ioc,
static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
- aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
- aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);
+
+ qio_channel_util_set_aio_fd_handler(cioc->readfd, read_ctx, io_read,
+ cioc->writefd, write_ctx, io_write,
+ opaque);
}
@@ -20,6 +20,7 @@
#include "qemu/osdep.h"
#include "io/channel-file.h"
+#include "io/channel-util.h"
#include "io/channel-watch.h"
#include "qapi/error.h"
#include "qemu/module.h"
@@ -192,13 +193,17 @@ static int qio_channel_file_close(QIOChannel *ioc,
static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
- aio_set_fd_handler(ctx, fioc->fd, io_read, io_write, NULL, NULL, opaque);
+
+ qio_channel_util_set_aio_fd_handler(fioc->fd, read_ctx, io_read,
+ fioc->fd, write_ctx, io_write,
+ opaque);
}
static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
@@ -128,8 +128,9 @@ qio_channel_null_close(QIOChannel *ioc,
static void
qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED,
- AioContext *ctx G_GNUC_UNUSED,
+ AioContext *read_ctx G_GNUC_UNUSED,
IOHandler *io_read G_GNUC_UNUSED,
+ AioContext *write_ctx G_GNUC_UNUSED,
IOHandler *io_write G_GNUC_UNUSED,
void *opaque G_GNUC_UNUSED)
{
@@ -22,6 +22,7 @@
#include "qapi/qapi-visit-sockets.h"
#include "qemu/module.h"
#include "io/channel-socket.h"
+#include "io/channel-util.h"
#include "io/channel-watch.h"
#include "trace.h"
#include "qapi/clone-visitor.h"
@@ -893,13 +894,17 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
}
static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
- aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);
+
+ qio_channel_util_set_aio_fd_handler(sioc->fd, read_ctx, io_read,
+ sioc->fd, write_ctx, io_write,
+ opaque);
}
static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
@@ -388,14 +388,16 @@ static int qio_channel_tls_close(QIOChannel *ioc,
}
static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
- qio_channel_set_aio_fd_handler(tioc->master, ctx, io_read, io_write, opaque);
+ qio_channel_set_aio_fd_handler(tioc->master, read_ctx, io_read,
+ write_ctx, io_write, opaque);
}
typedef struct QIOChannelTLSSource QIOChannelTLSSource;
@@ -36,3 +36,27 @@ QIOChannel *qio_channel_new_fd(int fd,
}
return ioc;
}
+
+
+void qio_channel_util_set_aio_fd_handler(int read_fd,
+ AioContext *read_ctx,
+ IOHandler *io_read,
+ int write_fd,
+ AioContext *write_ctx,
+ IOHandler *io_write,
+ void *opaque)
+{
+ if (read_fd == write_fd && read_ctx == write_ctx) {
+ aio_set_fd_handler(read_ctx, read_fd, io_read, io_write,
+ NULL, NULL, opaque);
+ } else {
+ if (read_ctx) {
+ aio_set_fd_handler(read_ctx, read_fd, io_read, NULL,
+ NULL, NULL, opaque);
+ }
+ if (write_ctx) {
+ aio_set_fd_handler(write_ctx, write_fd, NULL, io_write,
+ NULL, NULL, opaque);
+ }
+ }
+}
@@ -365,6 +365,12 @@ int qio_channel_set_blocking(QIOChannel *ioc,
}
+void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
+{
+ ioc->follow_coroutine_ctx = enabled;
+}
+
+
int qio_channel_close(QIOChannel *ioc,
Error **errp)
{
@@ -388,14 +394,16 @@ GSource *qio_channel_create_watch(QIOChannel *ioc,
void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
- klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
+ klass->io_set_aio_fd_handler(ioc, read_ctx, io_read, write_ctx, io_write,
+ opaque);
}
guint qio_channel_add_watch_full(QIOChannel *ioc,
@@ -542,56 +550,101 @@ static void qio_channel_restart_write(void *opaque)
aio_co_wake(co);
}
-static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
+static void coroutine_fn
+qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
{
- IOHandler *rd_handler = NULL, *wr_handler = NULL;
+ AioContext *ctx = ioc->follow_coroutine_ctx ?
+ qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
+ iohandler_get_aio_context();
+ AioContext *read_ctx = NULL;
+ IOHandler *io_read = NULL;
+ AioContext *write_ctx = NULL;
+ IOHandler *io_write = NULL;
+
+ if (condition == G_IO_IN) {
+ ioc->read_coroutine = qemu_coroutine_self();
+ ioc->read_ctx = ctx;
+ read_ctx = ctx;
+ io_read = qio_channel_restart_read;
+
+ /*
+ * Thread safety: if the other coroutine is set and its AioContext
+ * matches ours, then there is mutual exclusion between read and write
+ * because they share a single thread and it's safe to set both read
+ * and write fd handlers here. If the AioContext does not match ours,
+ * then both threads may run in parallel but there is no shared state
+ * to worry about.
+ */
+ if (ioc->write_coroutine && ioc->write_ctx == ctx) {
+ write_ctx = ctx;
+ io_write = qio_channel_restart_write;
+ }
+ } else if (condition == G_IO_OUT) {
+ ioc->write_coroutine = qemu_coroutine_self();
+ ioc->write_ctx = ctx;
+ write_ctx = ctx;
+ io_write = qio_channel_restart_write;
+ if (ioc->read_coroutine && ioc->read_ctx == ctx) {
+ read_ctx = ctx;
+ io_read = qio_channel_restart_read;
+ }
+ } else {
+ abort();
+ }
+
+ qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
+ write_ctx, io_write, ioc);
+}
+
+static void coroutine_fn
+qio_channel_clear_fd_handlers(QIOChannel *ioc, GIOCondition condition)
+{
+ AioContext *read_ctx = NULL;
+ IOHandler *io_read = NULL;
+ AioContext *write_ctx = NULL;
+ IOHandler *io_write = NULL;
AioContext *ctx;
- if (ioc->read_coroutine) {
- rd_handler = qio_channel_restart_read;
+ if (condition == G_IO_IN) {
+ ctx = ioc->read_ctx;
+ read_ctx = ctx;
+ io_read = NULL;
+ if (ioc->write_coroutine && ioc->write_ctx == ctx) {
+ write_ctx = ctx;
+ io_write = qio_channel_restart_write;
+ }
+ } else if (condition == G_IO_OUT) {
+ ctx = ioc->write_ctx;
+ write_ctx = ctx;
+ io_write = NULL;
+ if (ioc->read_coroutine && ioc->read_ctx == ctx) {
+ read_ctx = ctx;
+ io_read = qio_channel_restart_read;
+ }
+ } else {
+ abort();
}
- if (ioc->write_coroutine) {
- wr_handler = qio_channel_restart_write;
- }
-
- ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
- qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
-}
-
-void qio_channel_attach_aio_context(QIOChannel *ioc,
- AioContext *ctx)
-{
- assert(!ioc->read_coroutine);
- assert(!ioc->write_coroutine);
- ioc->ctx = ctx;
-}
-void qio_channel_detach_aio_context(QIOChannel *ioc)
-{
- ioc->read_coroutine = NULL;
- ioc->write_coroutine = NULL;
- qio_channel_set_aio_fd_handlers(ioc);
- ioc->ctx = NULL;
+ qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
+ write_ctx, io_write, ioc);
}
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
GIOCondition condition)
{
- AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context();
+ AioContext *ioc_ctx;
assert(qemu_in_coroutine());
- assert(in_aio_context_home_thread(ioc_ctx));
+ ioc_ctx = qemu_coroutine_get_aio_context(qemu_coroutine_self());
if (condition == G_IO_IN) {
assert(!ioc->read_coroutine);
- ioc->read_coroutine = qemu_coroutine_self();
} else if (condition == G_IO_OUT) {
assert(!ioc->write_coroutine);
- ioc->write_coroutine = qemu_coroutine_self();
} else {
abort();
}
- qio_channel_set_aio_fd_handlers(ioc);
+ qio_channel_set_fd_handlers(ioc, condition);
qemu_coroutine_yield();
assert(in_aio_context_home_thread(ioc_ctx));
@@ -599,11 +652,10 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc,
* through the aio_fd_handlers. */
if (condition == G_IO_IN) {
assert(ioc->read_coroutine == NULL);
- qio_channel_set_aio_fd_handlers(ioc);
} else if (condition == G_IO_OUT) {
assert(ioc->write_coroutine == NULL);
- qio_channel_set_aio_fd_handlers(ioc);
}
+ qio_channel_clear_fd_handlers(ioc, condition);
}
void qio_channel_wake_read(QIOChannel *ioc)
@@ -158,8 +158,9 @@ qio_channel_block_close(QIOChannel *ioc,
static void
qio_channel_block_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
@@ -3103,22 +3103,23 @@ static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
}
static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
- IOHandler *io_read,
- IOHandler *io_write,
- void *opaque)
+ AioContext *read_ctx,
+ IOHandler *io_read,
+ AioContext *write_ctx,
+ IOHandler *io_write,
+ void *opaque)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
if (io_read) {
- aio_set_fd_handler(ctx, rioc->rdmain->recv_comp_channel->fd, io_read,
- io_write, NULL, NULL, opaque);
- aio_set_fd_handler(ctx, rioc->rdmain->send_comp_channel->fd, io_read,
- io_write, NULL, NULL, opaque);
+ aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd,
+ io_read, io_write, NULL, NULL, opaque);
+ aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd,
+ io_read, io_write, NULL, NULL, opaque);
} else {
- aio_set_fd_handler(ctx, rioc->rdmaout->recv_comp_channel->fd, io_read,
- io_write, NULL, NULL, opaque);
- aio_set_fd_handler(ctx, rioc->rdmaout->send_comp_channel->fd, io_read,
- io_write, NULL, NULL, opaque);
+ aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd,
+ io_read, io_write, NULL, NULL, opaque);
+ aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd,
+ io_read, io_write, NULL, NULL, opaque);
}
}
@@ -1333,6 +1333,7 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
*/
qio_channel_set_blocking(client->ioc, false, NULL);
+ qio_channel_set_follow_coroutine_ctx(client->ioc, true);
trace_nbd_negotiate_begin();
memcpy(buf, "NBDMAGIC", 8);
@@ -1352,11 +1353,6 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
return ret;
}
- /* Attach the channel to the same AioContext as the export */
- if (client->exp && client->exp->common.ctx) {
- qio_channel_attach_aio_context(client->ioc, client->exp->common.ctx);
- }
-
assert(!client->optlen);
trace_nbd_negotiate_success();
@@ -1465,7 +1461,6 @@ void nbd_client_put(NBDClient *client)
*/
assert(client->closing);
- qio_channel_detach_aio_context(client->ioc);
object_unref(OBJECT(client->sioc));
object_unref(OBJECT(client->ioc));
if (client->tlscreds) {
@@ -1544,8 +1539,6 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
exp->common.ctx = ctx;
QTAILQ_FOREACH(client, &exp->clients, next) {
- qio_channel_attach_aio_context(client->ioc, ctx);
-
assert(client->nb_requests == 0);
assert(client->recv_coroutine == NULL);
assert(client->send_coroutine == NULL);
@@ -1555,14 +1548,9 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
static void blk_aio_detach(void *opaque)
{
NBDExport *exp = opaque;
- NBDClient *client;
trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
- QTAILQ_FOREACH(client, &exp->clients, next) {
- qio_channel_detach_aio_context(client->ioc);
- }
-
exp->common.ctx = NULL;
}
@@ -735,8 +735,7 @@ static void coroutine_fn prh_co_entry(void *opaque)
qio_channel_set_blocking(QIO_CHANNEL(client->ioc),
false, NULL);
- qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc),
- qemu_get_aio_context());
+ qio_channel_set_follow_coroutine_ctx(QIO_CHANNEL(client->ioc), true);
/* A very simple negotiation for future extensibility. No features
* are defined so write 0.
@@ -796,7 +795,6 @@ static void coroutine_fn prh_co_entry(void *opaque)
}
out:
- qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
object_unref(OBJECT(client->ioc));
g_free(client);
}
@@ -127,7 +127,14 @@ vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
if (rc < 0) {
if (rc == QIO_CHANNEL_ERR_BLOCK) {
assert(local_err == NULL);
- qio_channel_yield(ioc, G_IO_IN);
+ if (server->ctx) {
+ server->in_qio_channel_yield = true;
+ qio_channel_yield(ioc, G_IO_IN);
+ server->in_qio_channel_yield = false;
+ } else {
+ /* Wait until attached to an AioContext again */
+ qemu_coroutine_yield();
+ }
continue;
} else {
error_report_err(local_err);
@@ -278,7 +285,7 @@ set_watch(VuDev *vu_dev, int fd, int vu_evt,
vu_fd_watch->fd = fd;
vu_fd_watch->cb = cb;
qemu_socket_set_nonblock(fd);
- aio_set_fd_handler(server->ioc->ctx, fd, kick_handler,
+ aio_set_fd_handler(server->ctx, fd, kick_handler,
NULL, NULL, NULL, vu_fd_watch);
vu_fd_watch->vu_dev = vu_dev;
vu_fd_watch->pvt = pvt;
@@ -299,7 +306,7 @@ static void remove_watch(VuDev *vu_dev, int fd)
if (!vu_fd_watch) {
return;
}
- aio_set_fd_handler(server->ioc->ctx, fd, NULL, NULL, NULL, NULL, NULL);
+ aio_set_fd_handler(server->ctx, fd, NULL, NULL, NULL, NULL, NULL);
QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next);
g_free(vu_fd_watch);
@@ -344,6 +351,8 @@ static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
/* TODO vu_message_write() spins if non-blocking! */
qio_channel_set_blocking(server->ioc, false, NULL);
+ qio_channel_set_follow_coroutine_ctx(server->ioc, true);
+
server->co_trip = qemu_coroutine_create(vu_client_trip, server);
aio_context_acquire(server->ctx);
@@ -399,13 +408,12 @@ void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
return;
}
- qio_channel_attach_aio_context(server->ioc, ctx);
-
QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL,
NULL, NULL, vu_fd_watch);
}
+ assert(!server->in_qio_channel_yield);
aio_co_schedule(ctx, server->co_trip);
}
@@ -419,11 +427,16 @@ void vhost_user_server_detach_aio_context(VuServer *server)
aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
NULL, NULL, NULL, NULL, vu_fd_watch);
}
-
- qio_channel_detach_aio_context(server->ioc);
}
server->ctx = NULL;
+
+ if (server->ioc) {
+ if (server->in_qio_channel_yield) {
+ /* Stop receiving the next vhost-user message */
+ qio_channel_wake_read(server->ioc);
+ }
+ }
}
bool vhost_user_server_start(VuServer *server,