
[RFC,5/7] migration: Modularize multifd send threads with a few helpers

Message ID 20231022201211.452861-6-peterx@redhat.com
State New
Series migration/multifd: quit unitifications and separate sync packet

Commit Message

Peter Xu Oct. 22, 2023, 8:12 p.m. UTC
Abstract the multifd send packet logic into two phases:

  - multifd_send_prepare(): prepare the packet headers, with mutex
  - multifd_do_send(): do the send job finally, without mutex

While at it, always let the send thread use Error* for reporting
errors, dropping "int ret" altogether.
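
To illustrate the convention, here is a minimal, self-contained sketch;
the Error type and error_setg() below are toy stand-ins for QEMU's error
API, kept deliberately simple for the example:

  #include <stdbool.h>
  #include <stdio.h>
  #include <stdlib.h>

  typedef struct Error { char msg[128]; } Error;

  /* Toy stand-in for QEMU's error_setg(): allocate and fill *errp */
  static void error_setg(Error **errp, const char *msg)
  {
      if (errp) {
          *errp = malloc(sizeof(Error));
          snprintf((*errp)->msg, sizeof((*errp)->msg), "%s", msg);
      }
  }

  /*
   * Returns true on success; on failure returns false and sets *errp,
   * so the caller can assert(local_err) instead of tracking "int ret".
   */
  static bool send_one_packet(int payload, Error **errp)
  {
      if (payload < 0) {
          error_setg(errp, "invalid payload");
          return false;
      }
      printf("sent %d\n", payload);
      return true;
  }

  int main(void)
  {
      Error *local_err = NULL;

      if (!send_one_packet(-1, &local_err)) {
          fprintf(stderr, "error: %s\n", local_err->msg);
          free(local_err);
          return 1;
      }
      return 0;
  }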

One trivial change is that the send thread now kicks the sem_sync within
the mutex critical section, but that shouldn't be a problem.
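
For reference, the resulting shape of the send loop (the two phases, plus
the sem_sync kick inside the critical section) as a minimal pthread
sketch; SendChannel and its fields are simplified stand-ins for the
patch's MultiFDSendParams code, not the actual implementation:

  #include <pthread.h>
  #include <semaphore.h>
  #include <stdbool.h>

  typedef struct {
      pthread_mutex_t mutex;
      sem_t sem_sync;
      int pending_job;
      bool need_sync;
  } SendChannel;

  static void prepare_packet(SendChannel *c) { (void)c; /* fill headers: needs mutex */ }
  static void do_send(SendChannel *c)        { (void)c; /* blocking I/O: no mutex */ }

  static void send_one(SendChannel *c)
  {
      pthread_mutex_lock(&c->mutex);
      bool need_sync = c->need_sync;   /* latch the flag before prepare clears it */
      prepare_packet(c);               /* phase 1: with mutex */
      pthread_mutex_unlock(&c->mutex);

      do_send(c);                      /* phase 2: without mutex */

      pthread_mutex_lock(&c->mutex);
      c->pending_job--;                /* send succeeded, retire the job */
      if (need_sync) {
          sem_post(&c->sem_sync);      /* kicked inside the critical section */
      }
      pthread_mutex_unlock(&c->mutex);
  }

  int main(void)
  {
      SendChannel c = { .pending_job = 1, .need_sync = true };
      pthread_mutex_init(&c.mutex, NULL);
      sem_init(&c.sem_sync, 0, 0);
      send_one(&c);
      return 0;
  }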

Signed-off-by: Peter Xu <peterx@redhat.com>
---
 migration/multifd.c | 160 ++++++++++++++++++++++++++------------------
 1 file changed, 96 insertions(+), 64 deletions(-)

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index 9d458914a9..8140520843 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -640,13 +640,89 @@  int multifd_send_sync_main(QEMUFile *f)
     return 0;
 }
 
+/*
+ * Returns true on success, false otherwise (with errp set).  Caller must
+ * hold p->mutex.
+ */
+static bool multifd_send_prepare(MultiFDSendParams *p, Error **errp)
+{
+    bool use_zero_copy_send = migrate_zero_copy_send();
+    uint64_t packet_num = p->packet_num;
+    uint32_t flags;
+    int ret;
+
+    p->normal_num = 0;
+
+    if (use_zero_copy_send) {
+        p->iovs_num = 0;
+    } else {
+        p->iovs_num = 1;
+    }
+
+    for (int i = 0; i < p->pages->num; i++) {
+        p->normal[p->normal_num] = p->pages->offset[i];
+        p->normal_num++;
+    }
+
+    if (p->normal_num) {
+        ret = multifd_send_state->ops->send_prepare(p, errp);
+        if (ret != 0) {
+            return false;
+        }
+    }
+    multifd_send_fill_packet(p);
+    flags = p->flags;
+    p->flags = 0;
+    p->num_packets++;
+    p->total_normal_pages += p->normal_num;
+    p->pages->num = 0;
+    p->pages->block = NULL;
+
+    trace_multifd_send(p->id, packet_num, p->normal_num, flags,
+                       p->next_packet_size);
+
+    return true;
+}
+
+/* Returns true on success, false otherwise (with errp set) */
+static bool multifd_do_send(MultiFDSendParams *p, Error **errp)
+{
+    bool use_zero_copy_send = migrate_zero_copy_send();
+    int ret;
+
+    if (use_zero_copy_send) {
+        /* Send header first, without zerocopy */
+        ret = qio_channel_write_all(p->c, (void *)p->packet,
+                                    p->packet_len, errp);
+        if (ret != 0) {
+            return false;
+        }
+    } else {
+        /* Send header using the same writev call */
+        p->iov[0].iov_len = p->packet_len;
+        p->iov[0].iov_base = p->packet;
+    }
+
+    ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
+                                      0, p->write_flags, errp);
+    if (ret != 0) {
+        return false;
+    }
+
+    stat64_add(&mig_stats.multifd_bytes,
+               p->next_packet_size + p->packet_len);
+    stat64_add(&mig_stats.transferred,
+               p->next_packet_size + p->packet_len);
+    p->next_packet_size = 0;
+
+    return true;
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     MigrationThread *thread = NULL;
     Error *local_err = NULL;
-    int ret = 0;
-    bool use_zero_copy_send = migrate_zero_copy_send();
 
     thread = migration_threads_add(p->name, qemu_get_thread_id());
 
@@ -654,9 +730,10 @@  static void *multifd_send_thread(void *opaque)
     rcu_register_thread();
 
     if (multifd_send_initial_packet(p, &local_err) < 0) {
-        ret = -1;
+        assert(local_err);
         goto out;
     }
+
     /* initial packet */
     p->num_packets = 1;
 
@@ -667,83 +744,38 @@  static void *multifd_send_thread(void *opaque)
         if (qatomic_read(&multifd_send_state->exiting)) {
             break;
         }
-        qemu_mutex_lock(&p->mutex);
 
+        qemu_mutex_lock(&p->mutex);
         if (p->pending_job) {
-            uint64_t packet_num = p->packet_num;
-            uint32_t flags;
-            p->normal_num = 0;
-
-            if (use_zero_copy_send) {
-                p->iovs_num = 0;
-            } else {
-                p->iovs_num = 1;
-            }
+            bool need_sync = p->flags & MULTIFD_FLAG_SYNC;
 
-            for (int i = 0; i < p->pages->num; i++) {
-                p->normal[p->normal_num] = p->pages->offset[i];
-                p->normal_num++;
+            if (!multifd_send_prepare(p, &local_err)) {
+                assert(local_err);
+                qemu_mutex_unlock(&p->mutex);
+                goto out;
             }
 
-            if (p->normal_num) {
-                ret = multifd_send_state->ops->send_prepare(p, &local_err);
-                if (ret != 0) {
-                    qemu_mutex_unlock(&p->mutex);
-                    break;
-                }
-            }
-            multifd_send_fill_packet(p);
-            flags = p->flags;
-            p->flags = 0;
-            p->num_packets++;
-            p->total_normal_pages += p->normal_num;
-            p->pages->num = 0;
-            p->pages->block = NULL;
+            /* Send the packet without holding the mutex */
             qemu_mutex_unlock(&p->mutex);
-
-            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
-                               p->next_packet_size);
-
-            if (use_zero_copy_send) {
-                /* Send header first, without zerocopy */
-                ret = qio_channel_write_all(p->c, (void *)p->packet,
-                                            p->packet_len, &local_err);
-                if (ret != 0) {
-                    break;
-                }
-            } else {
-                /* Send header using the same writev call */
-                p->iov[0].iov_len = p->packet_len;
-                p->iov[0].iov_base = p->packet;
+            if (!multifd_do_send(p, &local_err)) {
+                assert(local_err);
+                goto out;
             }
-
-            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
-                                              0, p->write_flags, &local_err);
-            if (ret != 0) {
-                break;
-            }
-
-            stat64_add(&mig_stats.multifd_bytes,
-                       p->next_packet_size + p->packet_len);
-            stat64_add(&mig_stats.transferred,
-                       p->next_packet_size + p->packet_len);
-            p->next_packet_size = 0;
             qemu_mutex_lock(&p->mutex);
+
+            /* Send successful, mark the task completed */
             p->pending_job--;
-            qemu_mutex_unlock(&p->mutex);
 
-            if (flags & MULTIFD_FLAG_SYNC) {
+            /* If this is a sync task, we need one more kick */
+            if (need_sync) {
                 qemu_sem_post(&p->sem_sync);
             }
-        } else {
-            qemu_mutex_unlock(&p->mutex);
-            /* sometimes there are spurious wakeups */
         }
+        qemu_mutex_unlock(&p->mutex);
     }
 
 out:
-    if (ret) {
-        assert(local_err);
+    if (local_err) {
         trace_multifd_send_error(p->id);
         multifd_send_terminate_threads(local_err);
         multifd_send_kick_main(p);