@@ -640,13 +640,89 @@ int multifd_send_sync_main(QEMUFile *f)
return 0;
}
+/*
+ * Returns true on success, false otherwise (with errp set). Caller must
+ * hold p->mutex.
+ */
+static bool multifd_send_prepare(MultiFDSendParams *p, Error **errp)
+{
+ bool use_zero_copy_send = migrate_zero_copy_send();
+ uint64_t packet_num = p->packet_num;
+ uint32_t flags;
+ int ret;
+
+ p->normal_num = 0;
+
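+ /* Reserve iov[0] for the packet header unless zero-copy sends it separately */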
+ if (use_zero_copy_send) {
+ p->iovs_num = 0;
+ } else {
+ p->iovs_num = 1;
+ }
+
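+ /* Collect the offsets of every queued page into the normal[] array */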
+ for (int i = 0; i < p->pages->num; i++) {
+ p->normal[p->normal_num] = p->pages->offset[i];
+ p->normal_num++;
+ }
+
+ if (p->normal_num) {
+ ret = multifd_send_state->ops->send_prepare(p, errp);
+ if (ret != 0) {
+ return false;
+ }
+ }
+ multifd_send_fill_packet(p);
+ flags = p->flags;
+ p->flags = 0;
+ p->num_packets++;
+ p->total_normal_pages += p->normal_num;
+ p->pages->num = 0;
+ p->pages->block = NULL;
+
+ trace_multifd_send(p->id, packet_num, p->normal_num, flags,
+ p->next_packet_size);
+
+ return true;
+}
+
+/* Returns true on success, false otherwise (with errp set) */
+static bool multifd_do_send(MultiFDSendParams *p, Error **errp)
+{
+ bool use_zero_copy_send = migrate_zero_copy_send();
+ int ret;
+
+ if (use_zero_copy_send) {
+ /* Send header first, without zerocopy */
+ ret = qio_channel_write_all(p->c, (void *)p->packet,
+ p->packet_len, errp);
+ if (ret != 0) {
+ return false;
+ }
+ } else {
+ /* Send header using the same writev call */
+ p->iov[0].iov_len = p->packet_len;
+ p->iov[0].iov_base = p->packet;
+ }
+
+ ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
+ 0, p->write_flags, errp);
+ if (ret != 0) {
+ return false;
+ }
+
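+ /* Account both the payload and the packet header in the counters */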
+ stat64_add(&mig_stats.multifd_bytes,
+ p->next_packet_size + p->packet_len);
+ stat64_add(&mig_stats.transferred,
+ p->next_packet_size + p->packet_len);
+ p->next_packet_size = 0;
+
+ return true;
+}
+
static void *multifd_send_thread(void *opaque)
{
MultiFDSendParams *p = opaque;
MigrationThread *thread = NULL;
Error *local_err = NULL;
- int ret = 0;
- bool use_zero_copy_send = migrate_zero_copy_send();
thread = migration_threads_add(p->name, qemu_get_thread_id());
@@ -654,9 +730,10 @@ static void *multifd_send_thread(void *opaque)
rcu_register_thread();
if (multifd_send_initial_packet(p, &local_err) < 0) {
- ret = -1;
+ assert(local_err);
goto out;
}
+
/* initial packet */
p->num_packets = 1;
@@ -667,83 +744,38 @@ static void *multifd_send_thread(void *opaque)
if (qatomic_read(&multifd_send_state->exiting)) {
break;
}
- qemu_mutex_lock(&p->mutex);
+ qemu_mutex_lock(&p->mutex);
if (p->pending_job) {
- uint64_t packet_num = p->packet_num;
- uint32_t flags;
- p->normal_num = 0;
-
- if (use_zero_copy_send) {
- p->iovs_num = 0;
- } else {
- p->iovs_num = 1;
- }
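+ /* Latch SYNC before multifd_send_prepare(), which clears p->flags */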
+ bool need_sync = p->flags & MULTIFD_FLAG_SYNC;
- for (int i = 0; i < p->pages->num; i++) {
- p->normal[p->normal_num] = p->pages->offset[i];
- p->normal_num++;
+ if (!multifd_send_prepare(p, &local_err)) {
+ assert(local_err);
+ qemu_mutex_unlock(&p->mutex);
+ goto out;
}
- if (p->normal_num) {
- ret = multifd_send_state->ops->send_prepare(p, &local_err);
- if (ret != 0) {
- qemu_mutex_unlock(&p->mutex);
- break;
- }
- }
- multifd_send_fill_packet(p);
- flags = p->flags;
- p->flags = 0;
- p->num_packets++;
- p->total_normal_pages += p->normal_num;
- p->pages->num = 0;
- p->pages->block = NULL;
+ /* Send the packet with the mutex released */
qemu_mutex_unlock(&p->mutex);
-
- trace_multifd_send(p->id, packet_num, p->normal_num, flags,
- p->next_packet_size);
-
- if (use_zero_copy_send) {
- /* Send header first, without zerocopy */
- ret = qio_channel_write_all(p->c, (void *)p->packet,
- p->packet_len, &local_err);
- if (ret != 0) {
- break;
- }
- } else {
- /* Send header using the same writev call */
- p->iov[0].iov_len = p->packet_len;
- p->iov[0].iov_base = p->packet;
+ if (!multifd_do_send(p, &local_err)) {
+ assert(local_err);
+ goto out;
}
-
- ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
- 0, p->write_flags, &local_err);
- if (ret != 0) {
- break;
- }
-
- stat64_add(&mig_stats.multifd_bytes,
- p->next_packet_size + p->packet_len);
- stat64_add(&mig_stats.transferred,
- p->next_packet_size + p->packet_len);
- p->next_packet_size = 0;
qemu_mutex_lock(&p->mutex);
+
+ /* Send succeeded; mark the task completed */
p->pending_job--;
- qemu_mutex_unlock(&p->mutex);
- if (flags & MULTIFD_FLAG_SYNC) {
+ /* If this is a sync task, we need one more kick */
+ if (need_sync) {
qemu_sem_post(&p->sem_sync);
}
- } else {
- qemu_mutex_unlock(&p->mutex);
- /* sometimes there are spurious wakeups */
}
+ qemu_mutex_unlock(&p->mutex);
}
out:
- if (ret) {
- assert(local_err);
+ if (local_err) {
trace_multifd_send_error(p->id);
multifd_send_terminate_threads(local_err);
multifd_send_kick_main(p);
Abstract the multifd send packet logic into two phases: - multifd_send_prepare(): prepare the packet headers, with mutex - multifd_do_send(): do the send job finally, without mutex When at it, always allow the send thread to use Error* for detecting errors, dropping "int ret" altogether. One trivial change is the send thread now kicks the sem_sync within mutex critical section, but that shouldn't be a problem. Signed-off-by: Peter Xu <peterx@redhat.com> --- migration/multifd.c | 160 ++++++++++++++++++++++++++------------------ 1 file changed, 96 insertions(+), 64 deletions(-)