[RFC,6/7] migration: Split multifd pending_job into two booleans

Message ID 20231022201211.452861-7-peterx@redhat.com
State New
Series migration/multifd: quit unitifications and separate sync packet

Commit Message

Peter Xu Oct. 22, 2023, 8:12 p.m. UTC
Signed-off-by: Peter Xu <peterx@redhat.com>
---
 migration/multifd.h | 16 ++++++++++------
 migration/multifd.c | 33 +++++++++++++++++++++++----------
 2 files changed, 33 insertions(+), 16 deletions(-)

Comments

Fabiano Rosas Oct. 23, 2023, 3:15 p.m. UTC | #1
Peter Xu <peterx@redhat.com> writes:

> Signed-off-by: Peter Xu <peterx@redhat.com>
> ---
>  migration/multifd.h | 16 ++++++++++------
>  migration/multifd.c | 33 +++++++++++++++++++++++----------
>  2 files changed, 33 insertions(+), 16 deletions(-)
>
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 2acf400085..ddee7b8d8a 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -101,12 +101,16 @@ typedef struct {
>      uint32_t flags;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> -    /* thread has work to do */
> -    int pending_job;
> -    /* array of pages to sent.
> -     * The owner of 'pages' depends of 'pending_job' value:
> -     * pending_job == 0 -> migration_thread can use it.
> -     * pending_job != 0 -> multifd_channel can use it.
> +    /* thread has a request to sync all data */
> +    bool pending_sync;
> +    /* thread has something to send */
> +    bool pending_job;
> +    /*
> +     * Array of pages to send. The owner of 'pages' depends on
> +     * 'pending_job' value:
> +     *
> +     *   - true -> multifd_channel owns it.
> +     *   - false -> migration_thread owns it.
>       */
>      MultiFDPages_t *pages;
>  
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 8140520843..fe8d746ff9 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -425,7 +425,7 @@ static int multifd_send_pages(QEMUFile *f)
>          p = &multifd_send_state->params[i];
>          qemu_mutex_lock(&p->mutex);
>          if (!p->pending_job) {
> -            p->pending_job++;
> +            p->pending_job = true;
>              next_channel = (i + 1) % migrate_multifd_channels();
>              break;
>          }
> @@ -615,8 +615,7 @@ int multifd_send_sync_main(QEMUFile *f)
>  
>          qemu_mutex_lock(&p->mutex);
>          p->packet_num = multifd_send_state->packet_num++;
> -        p->flags |= MULTIFD_FLAG_SYNC;
> -        p->pending_job++;
> +        p->pending_sync = true;
>          qemu_mutex_unlock(&p->mutex);
>          qemu_sem_post(&p->sem);
>      }
> @@ -747,8 +746,6 @@ static void *multifd_send_thread(void *opaque)
>  
>          qemu_mutex_lock(&p->mutex);
>          if (p->pending_job) {
> -            bool need_sync = p->flags & MULTIFD_FLAG_SYNC;
> -
>              if (!multifd_send_prepare(p, &local_err)) {
>                  assert(local_err);
>                  qemu_mutex_unlock(&p->mutex);
> @@ -764,12 +761,27 @@ static void *multifd_send_thread(void *opaque)
>              qemu_mutex_lock(&p->mutex);
>  
>              /* Send successful, mark the task completed */
> -            p->pending_job--;
> +            p->pending_job = false;
> +
> +        } else if (p->pending_sync) {

Is your intention here to stop sending the SYNC along with the pages?
This would have to loop once more to send the sync.

Peter Xu Oct. 23, 2023, 3:52 p.m. UTC | #2
On Mon, Oct 23, 2023 at 12:15:49PM -0300, Fabiano Rosas wrote:
> > @@ -764,12 +761,27 @@ static void *multifd_send_thread(void *opaque)
> >              qemu_mutex_lock(&p->mutex);
> >  
> >              /* Send successful, mark the task completed */
> > -            p->pending_job--;
> > +            p->pending_job = false;
> > +
> > +        } else if (p->pending_sync) {
> 
> Is your intention here to stop sending the SYNC along with the pages?
> This would have to loop once more to send the sync.

My intention is to make it clear how we do SYNC, e.g., by keeping the main
thread from touching p->flags at all.

AFAIK we need to loop twice to send SYNC both before and after this patch;
the old code bumps pending_job for sync too, and kicks p->sem one more time
to guarantee that 2nd loop.

The major difference after this patch is that it's clear we send the pages
first in the 1st packet, then a separate SYNC packet as the 2nd.  I also
hope pending_sync is more readable.
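
To illustrate that flow, here is a minimal sketch under stated assumptions,
not the QEMU code: POSIX threads and unnamed semaphores (as on Linux) stand
in for the qemu_mutex_*/qemu_sem_* calls, and Channel, kick(),
run_handshake() and FLAG_SYNC are hypothetical names.  The main thread only
sets a boolean and posts sem; the channel thread alone sets the SYNC flag,
and it takes two loop iterations to emit the pages packet and then the SYNC
packet.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

#define FLAG_SYNC   1u   /* hypothetical stand-in for MULTIFD_FLAG_SYNC */
#define MAX_PACKETS 4

typedef struct {
    pthread_mutex_t mutex;
    sem_t sem;                   /* main thread kicks the channel */
    sem_t sem_sync;              /* channel reports that SYNC went out */
    bool pending_job;
    bool pending_sync;
    bool quit;
    unsigned flags;              /* written by the channel thread only */
    unsigned sent[MAX_PACKETS];  /* flags of each "packet", in send order */
    int nsent;
} Channel;

static void *channel_thread(void *opaque)
{
    Channel *p = opaque;

    for (;;) {
        sem_wait(&p->sem);
        pthread_mutex_lock(&p->mutex);
        if (p->quit) {
            pthread_mutex_unlock(&p->mutex);
            break;
        }
        /* pending_job is checked before pending_sync, as in the patch */
        if (p->pending_job) {
            assert(p->nsent < MAX_PACKETS);
            p->sent[p->nsent++] = p->flags;   /* pages packet, no SYNC */
            p->pending_job = false;
        } else if (p->pending_sync) {
            assert(p->nsent < MAX_PACKETS);
            p->flags |= FLAG_SYNC;
            p->sent[p->nsent++] = p->flags;   /* separate SYNC packet */
            p->flags = 0;                     /* model: flags reset per packet */
            p->pending_sync = false;
            sem_post(&p->sem_sync);
        }
        pthread_mutex_unlock(&p->mutex);
    }
    return NULL;
}

/* Main-thread side: set one request boolean under the mutex, then kick. */
static void kick(Channel *p, bool *request)
{
    pthread_mutex_lock(&p->mutex);
    *request = true;
    pthread_mutex_unlock(&p->mutex);
    sem_post(&p->sem);
}

/* Queue one batch of pages, request a SYNC, wait for it, then stop. */
static void run_handshake(Channel *p)
{
    pthread_t tid;

    *p = (Channel){0};
    pthread_mutex_init(&p->mutex, NULL);
    sem_init(&p->sem, 0, 0);
    sem_init(&p->sem_sync, 0, 0);
    pthread_create(&tid, NULL, channel_thread, p);

    kick(p, &p->pending_job);    /* pages queued */
    kick(p, &p->pending_sync);   /* SYNC requested; p->flags is not touched */
    sem_wait(&p->sem_sync);      /* returns once the SYNC packet was "sent" */
    kick(p, &p->quit);
    pthread_join(tid, NULL);

    sem_destroy(&p->sem);
    sem_destroy(&p->sem_sync);
    pthread_mutex_destroy(&p->mutex);
}
```

Calling run_handshake() on a Channel should leave two recorded packets: the
first with no flags set, the second with FLAG_SYNC.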

One thing I should have mentioned but didn't: we must handle pending_job
before pending_sync here, so that when we do SYNC we make sure all pages
have been sent.  IOW, the below:

  if (p->pending_sync) {
     ...
  } else if (p->pending_job) {
     ...
  }

would be buggy, because when pending_sync is requested with
pending_job==true, we could send SYNC before that batch of pages.
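
The ordering argument can be checked with a small single-threaded model.
This is a sketch only: struct chan, step_job_first(), step_sync_first() and
drain() are hypothetical names for illustration, and a "packet" here is just
an enum value rather than anything sent on the wire.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum packet { PKT_NONE, PKT_PAGES, PKT_SYNC };

struct chan {
    bool pending_job;    /* a batch of pages is queued */
    bool pending_sync;   /* a SYNC was requested */
};

/* One loop iteration with pending_job checked first (the patch's order). */
static enum packet step_job_first(struct chan *p)
{
    if (p->pending_job) {
        p->pending_job = false;
        return PKT_PAGES;
    } else if (p->pending_sync) {
        p->pending_sync = false;
        return PKT_SYNC;
    }
    return PKT_NONE;
}

/* One loop iteration with the checks swapped (the buggy order). */
static enum packet step_sync_first(struct chan *p)
{
    if (p->pending_sync) {
        p->pending_sync = false;
        return PKT_SYNC;
    } else if (p->pending_job) {
        p->pending_job = false;
        return PKT_PAGES;
    }
    return PKT_NONE;
}

/*
 * Run iterations until nothing is pending, recording the packet order.
 * The channel state is passed by value, so the caller's copy is untouched.
 */
static size_t drain(enum packet (*step)(struct chan *), struct chan p,
                    enum packet *out, size_t cap)
{
    size_t n = 0;

    assert(out != NULL);
    while (n < cap) {
        enum packet pkt = step(&p);
        if (pkt == PKT_NONE) {
            break;
        }
        out[n++] = pkt;
    }
    return n;
}
```

With both booleans set, draining with step_job_first() records PKT_PAGES
and then PKT_SYNC, while step_sync_first() records PKT_SYNC first, i.e. the
SYNC would go out before the queued pages.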

I'll add a comment block for it:

        /*
         * NOTE: we must handle pending_job before pending_sync, so as to
         * make sure SYNC packet will always cover all queued pages here.
         */
        if (p->pending_job) {

One thing I just noticed is that I forgot to write a commit message for
this patch.. my apologies.  I'm attaching a new version below, with the
commit message written and the comment squashed in.

Thanks,

===8<===

From c7636dffe0f58e42e5aa0028cd0a6208cc75dd46 Mon Sep 17 00:00:00 2001
From: Peter Xu <peterx@redhat.com>
Date: Sun, 22 Oct 2023 15:20:29 -0400
Subject: [PATCH] migration: Split multifd pending_job into two booleans

We used to have MultiFDSendParams.pending_job covering both sending data
and sending the SYNC message.  The SYNC part is tricky, because it directly
modifies p->flags and boosts pending_job even if there is already a request
pending.  That is the only case where pending_job can be larger than 1.

To make it clear, split the pending_job integer into two booleans:

  - pending_job:  keeps its name, a boolean showing we have data to send
  - pending_sync: a new boolean showing QEMU requests a SYNC message

With that, we can remove the only place where the main thread touches
p->flags; instead it simply sets pending_sync=true.  The multifd send
thread also does not need to peek at p->flags beforehand; it can
unconditionally kick p->sem_sync as long as it's a pending_sync request.

Signed-off-by: Peter Xu <peterx@redhat.com>
---
 migration/multifd.h | 16 ++++++++++------
 migration/multifd.c | 37 +++++++++++++++++++++++++++----------
 2 files changed, 37 insertions(+), 16 deletions(-)

diff --git a/migration/multifd.h b/migration/multifd.h
index 2acf400085..ddee7b8d8a 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -101,12 +101,16 @@ typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
-    /* thread has work to do */
-    int pending_job;
-    /* array of pages to sent.
-     * The owner of 'pages' depends of 'pending_job' value:
-     * pending_job == 0 -> migration_thread can use it.
-     * pending_job != 0 -> multifd_channel can use it.
+    /* thread has a request to sync all data */
+    bool pending_sync;
+    /* thread has something to send */
+    bool pending_job;
+    /*
+     * Array of pages to send. The owner of 'pages' depends on
+     * 'pending_job' value:
+     *
+     *   - true -> multifd_channel owns it.
+     *   - false -> migration_thread owns it.
      */
     MultiFDPages_t *pages;
 
diff --git a/migration/multifd.c b/migration/multifd.c
index 3f4fb6ad40..5d3571faa8 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -434,7 +434,7 @@ static int multifd_send_pages(QEMUFile *f)
         p = &multifd_send_state->params[i];
         qemu_mutex_lock(&p->mutex);
         if (!p->pending_job) {
-            p->pending_job++;
+            p->pending_job = true;
             next_channel = (i + 1) % migrate_multifd_channels();
             break;
         }
@@ -624,8 +624,7 @@ int multifd_send_sync_main(QEMUFile *f)
 
         qemu_mutex_lock(&p->mutex);
         p->packet_num = multifd_send_state->packet_num++;
-        p->flags |= MULTIFD_FLAG_SYNC;
-        p->pending_job++;
+        p->pending_sync = true;
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_post(&p->sem);
     }
@@ -755,9 +754,11 @@ static void *multifd_send_thread(void *opaque)
         }
 
         qemu_mutex_lock(&p->mutex);
+        /*
+         * NOTE: we must handle pending_job before pending_sync, so as to
+         * make sure SYNC packet will always cover all queued pages here.
+         */
         if (p->pending_job) {
-            bool need_sync = p->flags & MULTIFD_FLAG_SYNC;
-
             if (!multifd_send_prepare(p, &local_err)) {
                 assert(local_err);
                 qemu_mutex_unlock(&p->mutex);
@@ -773,12 +774,27 @@ static void *multifd_send_thread(void *opaque)
             qemu_mutex_lock(&p->mutex);
 
             /* Send successful, mark the task completed */
-            p->pending_job--;
+            p->pending_job = false;
+
+        } else if (p->pending_sync) {
+            p->flags |= MULTIFD_FLAG_SYNC;
+
+            if (!multifd_send_prepare(p, &local_err)) {
+                assert(local_err);
+                qemu_mutex_unlock(&p->mutex);
+                goto out;
+            }
 
-            /* If this is a sync task, we need one more kick */
-            if (need_sync) {
-                qemu_sem_post(&p->sem_sync);
+            /* Send the packets without mutex */
+            qemu_mutex_unlock(&p->mutex);
+            if (!multifd_do_send(p, &local_err)) {
+                assert(local_err);
+                goto out;
             }
+            qemu_mutex_lock(&p->mutex);
+
+            qemu_sem_post(&p->sem_sync);
+            p->pending_sync = false;
         }
         qemu_mutex_unlock(&p->mutex);
     }
@@ -941,7 +957,8 @@ int multifd_save_setup(Error **errp)
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
-        p->pending_job = 0;
+        p->pending_job = false;
+        p->pending_sync = false;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
         p->packet_len = sizeof(MultiFDPacket_t)

Patch

diff --git a/migration/multifd.h b/migration/multifd.h
index 2acf400085..ddee7b8d8a 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -101,12 +101,16 @@  typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
-    /* thread has work to do */
-    int pending_job;
-    /* array of pages to sent.
-     * The owner of 'pages' depends of 'pending_job' value:
-     * pending_job == 0 -> migration_thread can use it.
-     * pending_job != 0 -> multifd_channel can use it.
+    /* thread has a request to sync all data */
+    bool pending_sync;
+    /* thread has something to send */
+    bool pending_job;
+    /*
+     * Array of pages to send. The owner of 'pages' depends on
+     * 'pending_job' value:
+     *
+     *   - true -> multifd_channel owns it.
+     *   - false -> migration_thread owns it.
      */
     MultiFDPages_t *pages;
 
diff --git a/migration/multifd.c b/migration/multifd.c
index 8140520843..fe8d746ff9 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -425,7 +425,7 @@  static int multifd_send_pages(QEMUFile *f)
         p = &multifd_send_state->params[i];
         qemu_mutex_lock(&p->mutex);
         if (!p->pending_job) {
-            p->pending_job++;
+            p->pending_job = true;
             next_channel = (i + 1) % migrate_multifd_channels();
             break;
         }
@@ -615,8 +615,7 @@  int multifd_send_sync_main(QEMUFile *f)
 
         qemu_mutex_lock(&p->mutex);
         p->packet_num = multifd_send_state->packet_num++;
-        p->flags |= MULTIFD_FLAG_SYNC;
-        p->pending_job++;
+        p->pending_sync = true;
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_post(&p->sem);
     }
@@ -747,8 +746,6 @@  static void *multifd_send_thread(void *opaque)
 
         qemu_mutex_lock(&p->mutex);
         if (p->pending_job) {
-            bool need_sync = p->flags & MULTIFD_FLAG_SYNC;
-
             if (!multifd_send_prepare(p, &local_err)) {
                 assert(local_err);
                 qemu_mutex_unlock(&p->mutex);
@@ -764,12 +761,27 @@  static void *multifd_send_thread(void *opaque)
             qemu_mutex_lock(&p->mutex);
 
             /* Send successful, mark the task completed */
-            p->pending_job--;
+            p->pending_job = false;
+
+        } else if (p->pending_sync) {
+            p->flags |= MULTIFD_FLAG_SYNC;
+
+            if (!multifd_send_prepare(p, &local_err)) {
+                assert(local_err);
+                qemu_mutex_unlock(&p->mutex);
+                goto out;
+            }
 
-            /* If this is a sync task, we need one more kick */
-            if (need_sync) {
-                qemu_sem_post(&p->sem_sync);
+            /* Send the packets without mutex */
+            qemu_mutex_unlock(&p->mutex);
+            if (!multifd_do_send(p, &local_err)) {
+                assert(local_err);
+                goto out;
             }
+            qemu_mutex_lock(&p->mutex);
+
+            qemu_sem_post(&p->sem_sync);
+            p->pending_sync = false;
         }
         qemu_mutex_unlock(&p->mutex);
     }
@@ -932,7 +944,8 @@  int multifd_save_setup(Error **errp)
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
-        p->pending_job = 0;
+        p->pending_job = false;
+        p->pending_sync = false;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
         p->packet_len = sizeof(MultiFDPacket_t)