diff mbox series

[v2,20/29] migration/multifd: Add incoming QIOChannelFile support

Message ID 20231023203608.26370-21-farosas@suse.de
State New
Headers show
Series migration: File based migration with multifd and fixed-ram | expand

Commit Message

Fabiano Rosas Oct. 23, 2023, 8:35 p.m. UTC
On the receiving side we don't need to differentiate between main
channel and threads, so whichever channel is defined first gets to be
the main one. And since there are no packets, use the atomic channel
count to index into the params array.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
 migration/file.c      | 39 +++++++++++++++++++++++++++++----------
 migration/migration.c |  2 ++
 migration/multifd.c   |  7 ++++++-
 migration/multifd.h   |  1 +
 4 files changed, 38 insertions(+), 11 deletions(-)

Comments

Daniel P. Berrangé Oct. 25, 2023, 10:29 a.m. UTC | #1
On Mon, Oct 23, 2023 at 05:35:59PM -0300, Fabiano Rosas wrote:
> On the receiving side we don't need to differentiate between main
> channel and threads, so whichever channel is defined first gets to be
> the main one. And since there are no packets, use the atomic channel
> count to index into the params array.
> 
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
>  migration/file.c      | 39 +++++++++++++++++++++++++++++----------
>  migration/migration.c |  2 ++
>  migration/multifd.c   |  7 ++++++-
>  migration/multifd.h   |  1 +
>  4 files changed, 38 insertions(+), 11 deletions(-)
> 
> diff --git a/migration/file.c b/migration/file.c
> index 93b9b7bf5d..ad75225f43 100644
> --- a/migration/file.c
> +++ b/migration/file.c
> @@ -6,13 +6,15 @@
>   */
>  
>  #include "qemu/osdep.h"
> -#include "qemu/cutils.h"
>  #include "qapi/error.h"
> +#include "qemu/cutils.h"
> +#include "qemu/error-report.h"
>  #include "channel.h"
>  #include "file.h"
>  #include "migration.h"
>  #include "io/channel-file.h"
>  #include "io/channel-util.h"
> +#include "options.h"
>  #include "trace.h"
>  
>  #define OFFSET_OPTION ",offset="
> @@ -136,7 +138,8 @@ void file_start_incoming_migration(const char *filespec, Error **errp)
>      g_autofree char *filename = g_strdup(filespec);
>      QIOChannelFile *fioc = NULL;
>      uint64_t offset = 0;
> -    QIOChannel *ioc;
> +    int channels = 1;
> +    int i = 0, fd;
>  
>      trace_migration_file_incoming(filename);
>  
> @@ -146,16 +149,32 @@ void file_start_incoming_migration(const char *filespec, Error **errp)
>  
>      fioc = qio_channel_file_new_path(filename, O_RDONLY, 0, errp);
>      if (!fioc) {
> -        return;
> +        goto out;
> +    }
> +
> +    if (migrate_multifd()) {
> +        channels += migrate_multifd_channels();
>      }
>  
> -    ioc = QIO_CHANNEL(fioc);
> -    if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
> +    fd = fioc->fd;
> +
> +    do {
> +        QIOChannel *ioc = QIO_CHANNEL(fioc);
> +
> +        if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
> +            return;
> +        }
> +
> +        qio_channel_set_name(ioc, "migration-file-incoming");
> +        qio_channel_add_watch_full(ioc, G_IO_IN,
> +                                   file_accept_incoming_migration,
> +                                   NULL, NULL,
> +                                   g_main_context_get_thread_default());
> +    } while (++i < channels && (fioc = qio_channel_file_new_fd(fd)));

IIUC, this loop is failing to call qio_channel_io_seek to set
the offset on the last 'fioc' that is created.


With regards,
Daniel
Fabiano Rosas Oct. 25, 2023, 2:18 p.m. UTC | #2
Daniel P. Berrangé <berrange@redhat.com> writes:

> On Mon, Oct 23, 2023 at 05:35:59PM -0300, Fabiano Rosas wrote:
>> On the receiving side we don't need to differentiate between main
>> channel and threads, so whichever channel is defined first gets to be
>> the main one. And since there are no packets, use the atomic channel
>> count to index into the params array.
>> 
>> Signed-off-by: Fabiano Rosas <farosas@suse.de>
>> ---
>>  migration/file.c      | 39 +++++++++++++++++++++++++++++----------
>>  migration/migration.c |  2 ++
>>  migration/multifd.c   |  7 ++++++-
>>  migration/multifd.h   |  1 +
>>  4 files changed, 38 insertions(+), 11 deletions(-)
>> 
>> diff --git a/migration/file.c b/migration/file.c
>> index 93b9b7bf5d..ad75225f43 100644
>> --- a/migration/file.c
>> +++ b/migration/file.c
>> @@ -6,13 +6,15 @@
>>   */
>>  
>>  #include "qemu/osdep.h"
>> -#include "qemu/cutils.h"
>>  #include "qapi/error.h"
>> +#include "qemu/cutils.h"
>> +#include "qemu/error-report.h"
>>  #include "channel.h"
>>  #include "file.h"
>>  #include "migration.h"
>>  #include "io/channel-file.h"
>>  #include "io/channel-util.h"
>> +#include "options.h"
>>  #include "trace.h"
>>  
>>  #define OFFSET_OPTION ",offset="
>> @@ -136,7 +138,8 @@ void file_start_incoming_migration(const char *filespec, Error **errp)
>>      g_autofree char *filename = g_strdup(filespec);
>>      QIOChannelFile *fioc = NULL;
>>      uint64_t offset = 0;
>> -    QIOChannel *ioc;
>> +    int channels = 1;
>> +    int i = 0, fd;
>>  
>>      trace_migration_file_incoming(filename);
>>  
>> @@ -146,16 +149,32 @@ void file_start_incoming_migration(const char *filespec, Error **errp)
>>  
>>      fioc = qio_channel_file_new_path(filename, O_RDONLY, 0, errp);
>>      if (!fioc) {
>> -        return;
>> +        goto out;
>> +    }
>> +
>> +    if (migrate_multifd()) {
>> +        channels += migrate_multifd_channels();
>>      }
>>  
>> -    ioc = QIO_CHANNEL(fioc);
>> -    if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
>> +    fd = fioc->fd;
>> +
>> +    do {
>> +        QIOChannel *ioc = QIO_CHANNEL(fioc);
>> +
>> +        if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
>> +            return;
>> +        }
>> +
>> +        qio_channel_set_name(ioc, "migration-file-incoming");
>> +        qio_channel_add_watch_full(ioc, G_IO_IN,
>> +                                   file_accept_incoming_migration,
>> +                                   NULL, NULL,
>> +                                   g_main_context_get_thread_default());
>> +    } while (++i < channels && (fioc = qio_channel_file_new_fd(fd)));
>
> IIUC, this loop is failing to call qio_channel_io_seek to set
> the offset on the last 'fioc' that is created.
>

Ah, this is actually bogus. We don't need to offset the secondary
channels. That does nothing since we carry pointers to everything in the
fixed-ram header. This should be out of the loop.
Peter Xu Oct. 31, 2023, 9:28 p.m. UTC | #3
On Mon, Oct 23, 2023 at 05:35:59PM -0300, Fabiano Rosas wrote:
> On the receiving side we don't need to differentiate between main
> channel and threads, so whichever channel is defined first gets to be
> the main one. And since there are no packets, use the atomic channel
> count to index into the params array.
> 
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
>  migration/file.c      | 39 +++++++++++++++++++++++++++++----------
>  migration/migration.c |  2 ++
>  migration/multifd.c   |  7 ++++++-
>  migration/multifd.h   |  1 +
>  4 files changed, 38 insertions(+), 11 deletions(-)
> 
> diff --git a/migration/file.c b/migration/file.c
> index 93b9b7bf5d..ad75225f43 100644
> --- a/migration/file.c
> +++ b/migration/file.c
> @@ -6,13 +6,15 @@
>   */
>  
>  #include "qemu/osdep.h"
> -#include "qemu/cutils.h"
>  #include "qapi/error.h"
> +#include "qemu/cutils.h"
> +#include "qemu/error-report.h"
>  #include "channel.h"
>  #include "file.h"
>  #include "migration.h"
>  #include "io/channel-file.h"
>  #include "io/channel-util.h"
> +#include "options.h"
>  #include "trace.h"
>  
>  #define OFFSET_OPTION ",offset="
> @@ -136,7 +138,8 @@ void file_start_incoming_migration(const char *filespec, Error **errp)
>      g_autofree char *filename = g_strdup(filespec);
>      QIOChannelFile *fioc = NULL;
>      uint64_t offset = 0;
> -    QIOChannel *ioc;
> +    int channels = 1;
> +    int i = 0, fd;
>  
>      trace_migration_file_incoming(filename);
>  
> @@ -146,16 +149,32 @@ void file_start_incoming_migration(const char *filespec, Error **errp)
>  
>      fioc = qio_channel_file_new_path(filename, O_RDONLY, 0, errp);
>      if (!fioc) {
> -        return;
> +        goto out;

Can we already return with *errp set?  Why still need the error_report()?

> +    }
> +
> +    if (migrate_multifd()) {
> +        channels += migrate_multifd_channels();
>      }
>  
> -    ioc = QIO_CHANNEL(fioc);
> -    if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
> +    fd = fioc->fd;
> +
> +    do {
> +        QIOChannel *ioc = QIO_CHANNEL(fioc);
> +
> +        if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
> +            return;
> +        }
> +
> +        qio_channel_set_name(ioc, "migration-file-incoming");
> +        qio_channel_add_watch_full(ioc, G_IO_IN,
> +                                   file_accept_incoming_migration,
> +                                   NULL, NULL,
> +                                   g_main_context_get_thread_default());
> +    } while (++i < channels && (fioc = qio_channel_file_new_fd(fd)));
> +
> +out:
> +    if (!fioc) {
> +        error_report("Error creating migration incoming channel");
>          return;
>      }
> -    qio_channel_set_name(QIO_CHANNEL(ioc), "migration-file-incoming");
> -    qio_channel_add_watch_full(ioc, G_IO_IN,
> -                               file_accept_incoming_migration,
> -                               NULL, NULL,
> -                               g_main_context_get_thread_default());
>  }
> diff --git a/migration/migration.c b/migration/migration.c
> index ba806cea55..5fa726f6d4 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -756,6 +756,8 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
>          }
>  
>          default_channel = (channel_magic == cpu_to_be32(QEMU_VM_FILE_MAGIC));
> +    } else if (migrate_multifd() && migrate_fixed_ram()) {
> +        default_channel = multifd_recv_first_channel();

Is this check required?  IIUC you wanted to set default_channel only when
the 1st time trigger this function, but then IIUC that's exactly what:

        default_channel = !mis->from_src_file;

is about?

I think it may be clearer to add "migrate_multifd_packets()" too in the
previous "if" check to make sure fixed-ram won't peak it.

IIUC now it won't fall into that now only because file URI doesn't yet
support QIO_CHANNEL_FEATURE_READ_MSG_PEEK, however AFAIU it'll be fairly
easy to add it, and even more reasonable than a socket, when adding.

Fundamentally that trick can only work with multifd init packets, that
matches with what migrate_multifd_packets() means.

>      } else {
>          default_channel = !mis->from_src_file;
>      }
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 75a17ea8ab..ad51210f13 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -1242,6 +1242,11 @@ int multifd_load_setup(Error **errp)
>      return 0;
>  }
>  
> +bool multifd_recv_first_channel(void)
> +{
> +    return !multifd_recv_state;
> +}
> +
>  bool multifd_recv_all_channels_created(void)
>  {
>      int thread_count = migrate_multifd_channels();
> @@ -1284,7 +1289,7 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>          /* initial packet */
>          num_packets = 1;
>      } else {
> -        id = 0;
> +        id = qatomic_read(&multifd_recv_state->count);

I was quite confused on the previous "id=0" and now it answers..

Can we merge the two patches somehow?

>      }
>  
>      p = &multifd_recv_state->params[id];
> diff --git a/migration/multifd.h b/migration/multifd.h
> index a835643b48..a112ec7ac6 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -18,6 +18,7 @@ void multifd_save_cleanup(void);
>  int multifd_load_setup(Error **errp);
>  void multifd_load_cleanup(void);
>  void multifd_load_shutdown(void);
> +bool multifd_recv_first_channel(void);
>  bool multifd_recv_all_channels_created(void);
>  void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>  void multifd_recv_sync_main(void);
> -- 
> 2.35.3
>
diff mbox series

Patch

diff --git a/migration/file.c b/migration/file.c
index 93b9b7bf5d..ad75225f43 100644
--- a/migration/file.c
+++ b/migration/file.c
@@ -6,13 +6,15 @@ 
  */
 
 #include "qemu/osdep.h"
-#include "qemu/cutils.h"
 #include "qapi/error.h"
+#include "qemu/cutils.h"
+#include "qemu/error-report.h"
 #include "channel.h"
 #include "file.h"
 #include "migration.h"
 #include "io/channel-file.h"
 #include "io/channel-util.h"
+#include "options.h"
 #include "trace.h"
 
 #define OFFSET_OPTION ",offset="
@@ -136,7 +138,8 @@  void file_start_incoming_migration(const char *filespec, Error **errp)
     g_autofree char *filename = g_strdup(filespec);
     QIOChannelFile *fioc = NULL;
     uint64_t offset = 0;
-    QIOChannel *ioc;
+    int channels = 1;
+    int i = 0, fd;
 
     trace_migration_file_incoming(filename);
 
@@ -146,16 +149,32 @@  void file_start_incoming_migration(const char *filespec, Error **errp)
 
     fioc = qio_channel_file_new_path(filename, O_RDONLY, 0, errp);
     if (!fioc) {
-        return;
+        goto out;
+    }
+
+    if (migrate_multifd()) {
+        channels += migrate_multifd_channels();
     }
 
-    ioc = QIO_CHANNEL(fioc);
-    if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
+    fd = fioc->fd;
+
+    do {
+        QIOChannel *ioc = QIO_CHANNEL(fioc);
+
+        if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
+            return;
+        }
+
+        qio_channel_set_name(ioc, "migration-file-incoming");
+        qio_channel_add_watch_full(ioc, G_IO_IN,
+                                   file_accept_incoming_migration,
+                                   NULL, NULL,
+                                   g_main_context_get_thread_default());
+    } while (++i < channels && (fioc = qio_channel_file_new_fd(fd)));
+
+out:
+    if (!fioc) {
+        error_report("Error creating migration incoming channel");
         return;
     }
-    qio_channel_set_name(QIO_CHANNEL(ioc), "migration-file-incoming");
-    qio_channel_add_watch_full(ioc, G_IO_IN,
-                               file_accept_incoming_migration,
-                               NULL, NULL,
-                               g_main_context_get_thread_default());
 }
diff --git a/migration/migration.c b/migration/migration.c
index ba806cea55..5fa726f6d4 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -756,6 +756,8 @@  void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
         }
 
         default_channel = (channel_magic == cpu_to_be32(QEMU_VM_FILE_MAGIC));
+    } else if (migrate_multifd() && migrate_fixed_ram()) {
+        default_channel = multifd_recv_first_channel();
     } else {
         default_channel = !mis->from_src_file;
     }
diff --git a/migration/multifd.c b/migration/multifd.c
index 75a17ea8ab..ad51210f13 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -1242,6 +1242,11 @@  int multifd_load_setup(Error **errp)
     return 0;
 }
 
+bool multifd_recv_first_channel(void)
+{
+    return !multifd_recv_state;
+}
+
 bool multifd_recv_all_channels_created(void)
 {
     int thread_count = migrate_multifd_channels();
@@ -1284,7 +1289,7 @@  void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
         /* initial packet */
         num_packets = 1;
     } else {
-        id = 0;
+        id = qatomic_read(&multifd_recv_state->count);
     }
 
     p = &multifd_recv_state->params[id];
diff --git a/migration/multifd.h b/migration/multifd.h
index a835643b48..a112ec7ac6 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -18,6 +18,7 @@  void multifd_save_cleanup(void);
 int multifd_load_setup(Error **errp);
 void multifd_load_cleanup(void);
 void multifd_load_shutdown(void);
+bool multifd_recv_first_channel(void);
 bool multifd_recv_all_channels_created(void);
 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
 void multifd_recv_sync_main(void);