@@ -6,6 +6,7 @@
*/
#include "qemu/osdep.h"
+#include "exec/ramblock.h"
#include "qemu/cutils.h"
#include "qapi/error.h"
#include "channel.h"
@@ -81,9 +81,13 @@ struct {
struct {
MultiFDRecvParams *params;
+ MultiFDRecvData *data;
/* number of created threads */
int count;
- /* syncs main thread and channels */
+ /*
+ * This is always posted by the recv threads; the migration thread
+ * uses it to wait for the recv threads to finish their assigned tasks.
+ */
QemuSemaphore sem_sync;
/* global number of generated multifd packets */
uint64_t packet_num;
@@ -1119,6 +1123,57 @@ bool multifd_send_setup(void)
return true;
}
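+/*
+ * Hand the MultiFDRecvData filled in by the caller (see
+ * multifd_get_recv_data()) over to an idle recv channel and wake it up.
+ * Returns false if the recv threads are exiting.
+ */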
+bool multifd_recv(void)
+{
+ int i;
+ static int next_recv_channel;
+ MultiFDRecvParams *p = NULL;
+ MultiFDRecvData *data = multifd_recv_state->data;
+
+ /*
+ * next_recv_channel can remain set from a previous migration that was
+ * using more channels, so make sure it stays within bounds if the
+ * limit is lower now.
+ */
+ next_recv_channel %= migrate_multifd_channels();
+ for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
+ if (multifd_recv_should_exit()) {
+ return false;
+ }
+
+ p = &multifd_recv_state->params[i];
+
+ if (qatomic_read(&p->pending_job) == false) {
+ next_recv_channel = (i + 1) % migrate_multifd_channels();
+ break;
+ }
+ }
+
+ /*
+ * Order pending_job read before manipulating p->data below. Pairs
+ * with qatomic_store_release() at multifd_recv_thread().
+ */
+ smp_mb_acquire();
+
+ assert(!p->data->size);
+ multifd_recv_state->data = p->data;
+ p->data = data;
+
+ /*
+ * Order p->data update before setting pending_job. Pairs with
+ * qatomic_load_acquire() at multifd_recv_thread().
+ */
+ qatomic_store_release(&p->pending_job, true);
+ qemu_sem_post(&p->sem);
+
+ return true;
+}
+
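+/*
+ * Returns the currently idle MultiFDRecvData slot. Sketch of the
+ * intended usage by the loading code:
+ *
+ *     MultiFDRecvData *data = multifd_get_recv_data();
+ *     data->opaque = <what to load>;
+ *     data->size = <number of bytes>;
+ *     data->file_offset = <offset to preadv() from>;
+ *     multifd_recv();
+ */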
+MultiFDRecvData *multifd_get_recv_data(void)
+{
+ return multifd_recv_state->data;
+}
+
static void multifd_recv_terminate_threads(Error *err)
{
int i;
@@ -1143,11 +1198,26 @@ static void multifd_recv_terminate_threads(Error *err)
MultiFDRecvParams *p = &multifd_recv_state->params[i];
/*
- * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
- * however try to wakeup it without harm in cleanup phase.
+ * The migration thread and channels interact differently
+ * depending on the presence of packets.
*/
if (multifd_use_packets()) {
+ /*
+ * The channel receives as long as there are packets. When
+ * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
+ * channel waits for the migration thread to sync. If the
+ * sync never happens, do it here.
+ */
qemu_sem_post(&p->sem_sync);
+ } else {
+ /*
+ * The channel waits for the migration thread to give it
+ * work. When the migration thread runs out of work, it
+ * releases the channel and waits for any pending work to
+ * finish. If we reach here (e.g. due to error) before the
+ * work runs out, release the channel.
+ */
+ qemu_sem_post(&p->sem);
}
/*
@@ -1176,6 +1246,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
p->c = NULL;
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem_sync);
+ qemu_sem_destroy(&p->sem);
g_free(p->name);
p->name = NULL;
p->packet_len = 0;
@@ -1193,6 +1264,8 @@ static void multifd_recv_cleanup_state(void)
qemu_sem_destroy(&multifd_recv_state->sem_sync);
g_free(multifd_recv_state->params);
multifd_recv_state->params = NULL;
+ g_free(multifd_recv_state->data);
+ multifd_recv_state->data = NULL;
g_free(multifd_recv_state);
multifd_recv_state = NULL;
}
@@ -1269,11 +1342,11 @@ static void *multifd_recv_thread(void *opaque)
bool has_data = false;
p->normal_num = 0;
- if (multifd_recv_should_exit()) {
- break;
- }
-
if (use_packets) {
+ if (multifd_recv_should_exit()) {
+ break;
+ }
+
ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
p->packet_len, &local_err);
if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
@@ -1292,6 +1365,30 @@ static void *multifd_recv_thread(void *opaque)
p->flags &= ~MULTIFD_FLAG_SYNC;
has_data = !!p->normal_num;
qemu_mutex_unlock(&p->mutex);
+ } else {
+ /*
+ * No packets, so we need to wait for the vmstate code to
+ * give us work.
+ */
+ qemu_sem_wait(&p->sem);
+
+ if (multifd_recv_should_exit()) {
+ break;
+ }
+
+ /* pairs with qatomic_store_release() at multifd_recv() */
+ if (!qatomic_load_acquire(&p->pending_job)) {
+ /*
+ * The migration thread did not send work; this is
+ * equivalent to pending_sync on the sending side.
+ * Post sem_sync to notify that we reached this point.
+ */
+ qemu_sem_post(&multifd_recv_state->sem_sync);
+ continue;
+ }
+
+ has_data = !!p->data->size;
}
if (has_data) {
@@ -1306,6 +1403,15 @@ static void *multifd_recv_thread(void *opaque)
qemu_sem_post(&multifd_recv_state->sem_sync);
qemu_sem_wait(&p->sem_sync);
}
+ } else {
+ p->total_normal_pages += p->data->size / qemu_target_page_size();
+ p->data->size = 0;
+ /*
+ * Order data->size update before clearing
+ * pending_job. Pairs with smp_mb_acquire() at
+ * multifd_recv().
+ */
+ qatomic_store_release(&p->pending_job, false);
}
}
@@ -1338,6 +1444,10 @@ int multifd_recv_setup(Error **errp)
thread_count = migrate_multifd_channels();
multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
+
+ multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
+ multifd_recv_state->data->size = 0;
+
qatomic_set(&multifd_recv_state->count, 0);
qatomic_set(&multifd_recv_state->exiting, 0);
qemu_sem_init(&multifd_recv_state->sem_sync, 0);
@@ -1348,8 +1458,13 @@ int multifd_recv_setup(Error **errp)
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem_sync, 0);
+ qemu_sem_init(&p->sem, 0);
+ p->pending_job = false;
p->id = i;
+ p->data = g_new0(MultiFDRecvData, 1);
+ p->data->size = 0;
+
if (use_packets) {
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
@@ -13,6 +13,8 @@
#ifndef QEMU_MIGRATION_MULTIFD_H
#define QEMU_MIGRATION_MULTIFD_H
+typedef struct MultiFDRecvData MultiFDRecvData;
+
bool multifd_send_setup(void);
void multifd_send_shutdown(void);
int multifd_recv_setup(Error **errp);
@@ -23,6 +25,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
void multifd_recv_sync_main(void);
int multifd_send_sync_main(void);
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
+bool multifd_recv(void);
+MultiFDRecvData *multifd_get_recv_data(void);
/* Multifd Compression flags */
#define MULTIFD_FLAG_SYNC (1 << 0)
@@ -63,6 +67,13 @@ typedef struct {
RAMBlock *block;
} MultiFDPages_t;
+struct MultiFDRecvData {
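+ /* opaque pointer owned by the code that queues the load */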
+ void *opaque;
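+ /* number of bytes to load; zero means the slot is idle */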
+ size_t size;
+ /* for preadv */
+ off_t file_offset;
+};
+
typedef struct {
/* Fields are only written at creating/deletion time */
/* No lock required for them, they are read only */
@@ -152,6 +163,8 @@ typedef struct {
/* syncs main thread and channels */
QemuSemaphore sem_sync;
+ /* sem the channel waits on for more work */
+ QemuSemaphore sem;
/* this mutex protects the following parameters */
QemuMutex mutex;
@@ -161,6 +174,8 @@ typedef struct {
uint32_t flags;
/* global number of generated multifd packets */
uint64_t packet_num;
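+ /* set while 'data' holds work handed over by the migration thread */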
+ int pending_job;
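+ /* work descriptor handed over via multifd_recv() */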
+ MultiFDRecvData *data;
/* thread local variables. No locking required */