@@ -62,6 +62,11 @@ struct {
* Make it easy for now.
*/
uintptr_t packet_num;
+ /*
+ * Synchronization point past which no more channels will be
+ * created.
+ */
+ QemuSemaphore channels_created;
/* send channels ready */
QemuSemaphore channels_ready;
/*
@@ -622,10 +627,6 @@ static void multifd_send_terminate_threads(void)
/*
* Finally recycle all the threads.
- *
- * TODO: p->running is still buggy, e.g. we can reach here without the
- * corresponding multifd_new_send_channel_async() get invoked yet,
- * then a new thread can even be created after this function returns.
*/
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -670,6 +671,7 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
static void multifd_send_cleanup_state(void)
{
+ qemu_sem_destroy(&multifd_send_state->channels_created);
qemu_sem_destroy(&multifd_send_state->channels_ready);
g_free(multifd_send_state->params);
multifd_send_state->params = NULL;
@@ -954,18 +956,26 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
if (migrate_channel_requires_tls_upgrade(ioc)) {
ret = multifd_tls_channel_connect(p, ioc, &local_err);
+ if (ret) {
+ return;
+ }
} else {
ret = multifd_channel_connect(p, ioc, &local_err);
}
+out:
+ /*
+ * Here we're not interested whether creation succeeded, only that
+ * it happened at all.
+ */
+ qemu_sem_post(&multifd_send_state->channels_created);
+
if (ret) {
return;
}
-out:
trace_multifd_new_send_channel_async_error(p->id, local_err);
multifd_send_set_error(local_err);
- multifd_send_kick_main(p);
if (!p->c) {
/*
* If no channel has been created, drop the initial
@@ -998,6 +1008,7 @@ bool multifd_send_setup(void)
multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
multifd_send_state->pages = multifd_pages_init(page_count);
+ qemu_sem_init(&multifd_send_state->channels_created, 0);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
qatomic_set(&multifd_send_state->exiting, 0);
multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
@@ -1023,6 +1034,15 @@ bool multifd_send_setup(void)
multifd_new_send_channel_create(p);
}
+ /*
+ * Wait until channel creation has started for all channels. The
+ * creation can still fail, but no more channels will be created
+ * past this point.
+ */
+ for (i = 0; i < thread_count; i++) {
+ qemu_sem_wait(&multifd_send_state->channels_created);
+ }
+
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];