[RFC,v3,11/30] migration/multifd: Allow multifd without packets

Message ID 20231127202612.23012-12-farosas@suse.de
State New
Series migration: File based migration with multifd and fixed-ram

Commit Message

Fabiano Rosas Nov. 27, 2023, 8:25 p.m. UTC
For the upcoming support for the new 'fixed-ram' migration stream
format, we cannot use multifd packets because each write into the
ramblock section in the migration file is expected to contain only the
guest pages. They are written at their respective offsets relative to
the ramblock section header.

There is no space for the packet information, and the expected gains
from the new approach come partly from being able to write the pages
sequentially without extraneous data in between.

The new format also doesn't need the packets: all necessary
information can be taken from the standard migration headers, with
some (future) changes to the multifd code.

Use the presence of the fixed-ram capability to decide whether to send
packets. For now this has no effect as fixed-ram cannot yet be enabled
with multifd.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
- moved more of the packet code under use_packets
---
 migration/multifd.c | 138 +++++++++++++++++++++++++++-----------------
 migration/options.c |   5 ++
 migration/options.h |   1 +
 3 files changed, 91 insertions(+), 53 deletions(-)
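
To illustrate the constraint described in the commit message, here is
a rough sketch (not code from this series; "fd" and
"ramblock_hdr_offset" are hypothetical names) contrasting the two
write paths:

    /* Stream format: the packet header travels as iov[0] of the same
     * writev call and the pages follow it on the wire. */
    p->iov[0].iov_base = p->packet;
    p->iov[0].iov_len = p->packet_len;
    qio_channel_writev_all(p->c, p->iov, p->iovs_num, &local_err);

    /* Fixed-ram format: every page must land at its own fixed offset
     * in the migration file, so there is no room for a per-write
     * packet header in between. */
    for (i = 0; i < p->normal_num; i++) {
        pwrite(fd, p->pages->block->host + p->normal[i], p->page_size,
               ramblock_hdr_offset + p->normal[i]);
    }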

Comments

Peter Xu Jan. 15, 2024, 11:51 a.m. UTC | #1
On Mon, Nov 27, 2023 at 05:25:53PM -0300, Fabiano Rosas wrote:
> For the upcoming support for the new 'fixed-ram' migration stream
> format, we cannot use multifd packets because each write into the
> ramblock section in the migration file is expected to contain only the
> guest pages. They are written at their respective offsets relative to
> the ramblock section header.
> 
> There is no space for the packet information, and the expected gains
> from the new approach come partly from being able to write the pages
> sequentially without extraneous data in between.
> 
> The new format also doesn't need the packets: all necessary
> information can be taken from the standard migration headers, with
> some (future) changes to the multifd code.
> 
> Use the presence of the fixed-ram capability to decide whether to send
> packets. For now this has no effect as fixed-ram cannot yet be enabled
> with multifd.
> 
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
> - moved more of the packet code under use_packets
> ---
>  migration/multifd.c | 138 +++++++++++++++++++++++++++-----------------
>  migration/options.c |   5 ++
>  migration/options.h |   1 +
>  3 files changed, 91 insertions(+), 53 deletions(-)
> 
> diff --git a/migration/multifd.c b/migration/multifd.c
> index ec58c58082..9625640d61 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -654,18 +654,22 @@ static void *multifd_send_thread(void *opaque)
>      Error *local_err = NULL;
>      int ret = 0;
>      bool use_zero_copy_send = migrate_zero_copy_send();
> +    bool use_packets = migrate_multifd_packets();
>  
>      thread = migration_threads_add(p->name, qemu_get_thread_id());
>  
>      trace_multifd_send_thread_start(p->id);
>      rcu_register_thread();
>  
> -    if (multifd_send_initial_packet(p, &local_err) < 0) {
> -        ret = -1;
> -        goto out;
> +    if (use_packets) {
> +        if (multifd_send_initial_packet(p, &local_err) < 0) {
> +            ret = -1;
> +            goto out;
> +        }
> +
> +        /* initial packet */
> +        p->num_packets = 1;
>      }
> -    /* initial packet */
> -    p->num_packets = 1;
>  
>      while (true) {
>          qemu_sem_post(&multifd_send_state->channels_ready);
> @@ -677,11 +681,10 @@ static void *multifd_send_thread(void *opaque)
>          qemu_mutex_lock(&p->mutex);
>  
>          if (p->pending_job) {
> -            uint64_t packet_num = p->packet_num;
>              uint32_t flags;
>              p->normal_num = 0;
>  
> -            if (use_zero_copy_send) {
> +            if (!use_packets || use_zero_copy_send) {
>                  p->iovs_num = 0;
>              } else {
>                  p->iovs_num = 1;
> @@ -699,16 +702,20 @@ static void *multifd_send_thread(void *opaque)
>                      break;
>                  }
>              }
> -            multifd_send_fill_packet(p);
> +
> +            if (use_packets) {
> +                multifd_send_fill_packet(p);
> +                p->num_packets++;
> +            }
> +
>              flags = p->flags;
>              p->flags = 0;
> -            p->num_packets++;
>              p->total_normal_pages += p->normal_num;
>              p->pages->num = 0;
>              p->pages->block = NULL;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
> +            trace_multifd_send(p->id, p->packet_num, p->normal_num, flags,
>                                 p->next_packet_size);
>  
>              if (use_zero_copy_send) {
> @@ -718,7 +725,7 @@ static void *multifd_send_thread(void *opaque)
>                  if (ret != 0) {
>                      break;
>                  }
> -            } else {
> +            } else if (use_packets) {
>                  /* Send header using the same writev call */
>                  p->iov[0].iov_len = p->packet_len;
>                  p->iov[0].iov_base = p->packet;
> @@ -904,6 +911,7 @@ int multifd_save_setup(Error **errp)
>  {
>      int thread_count;
>      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> +    bool use_packets = migrate_multifd_packets();
>      uint8_t i;
>  
>      if (!migrate_multifd()) {
> @@ -928,14 +936,20 @@ int multifd_save_setup(Error **errp)
>          p->pending_job = 0;
>          p->id = i;
>          p->pages = multifd_pages_init(page_count);
> -        p->packet_len = sizeof(MultiFDPacket_t)
> -                      + sizeof(uint64_t) * page_count;
> -        p->packet = g_malloc0(p->packet_len);
> -        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> -        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
> +
> +        if (use_packets) {
> +            p->packet_len = sizeof(MultiFDPacket_t)
> +                          + sizeof(uint64_t) * page_count;
> +            p->packet = g_malloc0(p->packet_len);
> +            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> +            p->packet->version = cpu_to_be32(MULTIFD_VERSION);
> +
> +            /* We need one extra place for the packet header */
> +            p->iov = g_new0(struct iovec, page_count + 1);
> +        } else {
> +            p->iov = g_new0(struct iovec, page_count);
> +        }
>          p->name = g_strdup_printf("multifdsend_%d", i);
> -        /* We need one extra place for the packet header */
> -        p->iov = g_new0(struct iovec, page_count + 1);
>          p->normal = g_new0(ram_addr_t, page_count);
>          p->page_size = qemu_target_page_size();
>          p->page_count = page_count;
> @@ -1067,7 +1081,7 @@ void multifd_recv_sync_main(void)
>  {
>      int i;
>  
> -    if (!migrate_multifd()) {
> +    if (!migrate_multifd() || !migrate_multifd_packets()) {
>          return;
>      }
>      for (i = 0; i < migrate_multifd_channels(); i++) {

This noops the recv sync when use_packets=0, makes sense.

How about multifd_send_sync_main()?  Should we do the same?
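
A minimal sketch of the same gating on the send side, for illustration
(the function's signature and remaining body are elided here):

    /* at the top of multifd_send_sync_main(): without packets there
     * is no SYNC flag to send, so the sync becomes a no-op as well */
    if (!migrate_multifd() || !migrate_multifd_packets()) {
        return 0;
    }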

> @@ -1094,38 +1108,44 @@ static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *p = opaque;
>      Error *local_err = NULL;
> +    bool use_packets = migrate_multifd_packets();
>      int ret;
>  
>      trace_multifd_recv_thread_start(p->id);
>      rcu_register_thread();
>  
>      while (true) {
> -        uint32_t flags;
> +        uint32_t flags = 0;
> +        p->normal_num = 0;
>  
>          if (p->quit) {
>              break;
>          }
>  
> -        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> -                                       p->packet_len, &local_err);
> -        if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
> -            break;
> -        }
> +        if (use_packets) {
> +            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> +                                           p->packet_len, &local_err);
> +            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
> +                break;
> +            }
> +
> +            qemu_mutex_lock(&p->mutex);
> +            ret = multifd_recv_unfill_packet(p, &local_err);
> +            if (ret) {
> +                qemu_mutex_unlock(&p->mutex);
> +                break;
> +            }
> +            p->num_packets++;
> +
> +            flags = p->flags;
> +            /* recv methods don't know how to handle the SYNC flag */
> +            p->flags &= ~MULTIFD_FLAG_SYNC;
> +            trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
> +                               p->next_packet_size);
>  
> -        qemu_mutex_lock(&p->mutex);
> -        ret = multifd_recv_unfill_packet(p, &local_err);
> -        if (ret) {
> -            qemu_mutex_unlock(&p->mutex);
> -            break;
> +            p->total_normal_pages += p->normal_num;
>          }
>  
> -        flags = p->flags;
> -        /* recv methods don't know how to handle the SYNC flag */
> -        p->flags &= ~MULTIFD_FLAG_SYNC;
> -        trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
> -                           p->next_packet_size);
> -        p->num_packets++;
> -        p->total_normal_pages += p->normal_num;
>          qemu_mutex_unlock(&p->mutex);
>  
>          if (p->normal_num) {
> @@ -1135,7 +1155,7 @@ static void *multifd_recv_thread(void *opaque)
>              }
>          }
>  
> -        if (flags & MULTIFD_FLAG_SYNC) {
> +        if (use_packets && (flags & MULTIFD_FLAG_SYNC)) {
>              qemu_sem_post(&multifd_recv_state->sem_sync);
>              qemu_sem_wait(&p->sem_sync);
>          }
> @@ -1159,6 +1179,7 @@ int multifd_load_setup(Error **errp)
>  {
>      int thread_count;
>      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> +    bool use_packets = migrate_multifd_packets();
>      uint8_t i;
>  
>      /*
> @@ -1183,9 +1204,12 @@ int multifd_load_setup(Error **errp)
>          qemu_sem_init(&p->sem_sync, 0);
>          p->quit = false;
>          p->id = i;
> -        p->packet_len = sizeof(MultiFDPacket_t)
> -                      + sizeof(uint64_t) * page_count;
> -        p->packet = g_malloc0(p->packet_len);
> +
> +        if (use_packets) {
> +            p->packet_len = sizeof(MultiFDPacket_t)
> +                + sizeof(uint64_t) * page_count;
> +            p->packet = g_malloc0(p->packet_len);
> +        }
>          p->name = g_strdup_printf("multifdrecv_%d", i);
>          p->iov = g_new0(struct iovec, page_count);
>          p->normal = g_new0(ram_addr_t, page_count);
> @@ -1231,18 +1255,27 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>  {
>      MultiFDRecvParams *p;
>      Error *local_err = NULL;
> -    int id;
> +    bool use_packets = migrate_multifd_packets();
> +    int id, num_packets = 0;
>  
> -    id = multifd_recv_initial_packet(ioc, &local_err);
> -    if (id < 0) {
> -        multifd_recv_terminate_threads(local_err);
> -        error_propagate_prepend(errp, local_err,
> -                                "failed to receive packet"
> -                                " via multifd channel %d: ",
> -                                qatomic_read(&multifd_recv_state->count));
> -        return;
> +    if (use_packets) {
> +        id = multifd_recv_initial_packet(ioc, &local_err);
> +        if (id < 0) {
> +            multifd_recv_terminate_threads(local_err);
> +            error_propagate_prepend(errp, local_err,
> +                                    "failed to receive packet"
> +                                    " via multifd channel %d: ",
> +                                    qatomic_read(&multifd_recv_state->count));
> +            return;
> +        }
> +        trace_multifd_recv_new_channel(id);
> +
> +        /* initial packet */
> +        num_packets = 1;
> +    } else {
> +        /* next patch gives this a meaningful value */
> +        id = 0;
>      }
> -    trace_multifd_recv_new_channel(id);
>  
>      p = &multifd_recv_state->params[id];
>      if (p->c != NULL) {
> @@ -1253,9 +1286,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>          return;
>      }
>      p->c = ioc;
> +    p->num_packets = num_packets;
>      object_ref(OBJECT(ioc));
> -    /* initial packet */
> -    p->num_packets = 1;
>  
>      p->running = true;
>      qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> diff --git a/migration/options.c b/migration/options.c
> index 775428a8a5..10730b13ba 100644
> --- a/migration/options.c
> +++ b/migration/options.c
> @@ -385,6 +385,11 @@ bool migrate_multifd_flush_after_each_section(void)
>      return s->multifd_flush_after_each_section;
>  }
>  
> +bool migrate_multifd_packets(void)

Maybe multifd_use_packets()?  Dropping the migrate_ prefix as this is not a
global API but multifd-only.  Then, if multifd_packets() reads too weird or
unclear, adding "use" makes it clear.

> +{
> +    return !migrate_fixed_ram();
> +}
> +
>  bool migrate_postcopy(void)
>  {
>      return migrate_postcopy_ram() || migrate_dirty_bitmaps();
> diff --git a/migration/options.h b/migration/options.h
> index 8680a10b79..8a19d6939c 100644
> --- a/migration/options.h
> +++ b/migration/options.h
> @@ -56,6 +56,7 @@ bool migrate_zero_copy_send(void);
>   */
>  
>  bool migrate_multifd_flush_after_each_section(void);
> +bool migrate_multifd_packets(void);
>  bool migrate_postcopy(void);
>  bool migrate_rdma(void);
>  bool migrate_tls(void);
> -- 
> 2.35.3
>
Fabiano Rosas Jan. 15, 2024, 6:39 p.m. UTC | #2
Peter Xu <peterx@redhat.com> writes:

> On Mon, Nov 27, 2023 at 05:25:53PM -0300, Fabiano Rosas wrote:
>> [...]
>> @@ -1067,7 +1081,7 @@ void multifd_recv_sync_main(void)
>>  {
>>      int i;
>>  
>> -    if (!migrate_multifd()) {
>> +    if (!migrate_multifd() || !migrate_multifd_packets()) {
>>          return;
>>      }
>>      for (i = 0; i < migrate_multifd_channels(); i++) {
>
> This noops the recv sync when use_packets=0, makes sense.
>
> How about multifd_send_sync_main()?  Should we do the same?
>

It seems it got lost during a rebase.

>> [...]
>> +bool migrate_multifd_packets(void)
>
> Maybe multifd_use_packets()?  Dropping the migrate_ prefix as this is not a
> global API but multifd-only.  Then, if multifd_packets() reads too weird or
> unclear, adding "use" makes it clear.
>

We removed all the instances of migrate_use_* from the migration code
recently. I'm not sure we should reintroduce them; it seems like a step
back.

We're setting 'use_packets = migrate_multifd_packets()' in most places,
so I guess 'use_packets = multifd_packets()' wouldn't be too bad.

Peter Xu Jan. 15, 2024, 11:01 p.m. UTC | #3
On Mon, Jan 15, 2024 at 03:39:29PM -0300, Fabiano Rosas wrote:
> > Maybe multifd_use_packets()?  Dropping the migrate_ prefix as this is not a
> > global API but multifd-only.  Then, if multifd_packets() reads too weird or
> > unclear, adding "use" makes it clear.
> 
> We removed all the instances of migrate_use_* from the migration code
> recently. I'm not sure we should reintroduce them; it seems like a step
> back.
> 
> We're setting 'use_packets = migrate_multifd_packets()' in most places,
> so I guess 'use_packets = multifd_packets()' wouldn't be too bad.

I actually prefer keeping "_use_" all over the place because it's
clearer to me. :) And I don't see much benefit in saving three chars.  Try
"git grep _use_ | wc -l" in both QEMU / Linux; it reports 275 / 4680
respectively.

But yeah that's trivial, multifd_packets() is still okay.

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index ec58c58082..9625640d61 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -654,18 +654,22 @@  static void *multifd_send_thread(void *opaque)
     Error *local_err = NULL;
     int ret = 0;
     bool use_zero_copy_send = migrate_zero_copy_send();
+    bool use_packets = migrate_multifd_packets();
 
     thread = migration_threads_add(p->name, qemu_get_thread_id());
 
     trace_multifd_send_thread_start(p->id);
     rcu_register_thread();
 
-    if (multifd_send_initial_packet(p, &local_err) < 0) {
-        ret = -1;
-        goto out;
+    if (use_packets) {
+        if (multifd_send_initial_packet(p, &local_err) < 0) {
+            ret = -1;
+            goto out;
+        }
+
+        /* initial packet */
+        p->num_packets = 1;
     }
-    /* initial packet */
-    p->num_packets = 1;
 
     while (true) {
         qemu_sem_post(&multifd_send_state->channels_ready);
@@ -677,11 +681,10 @@  static void *multifd_send_thread(void *opaque)
         qemu_mutex_lock(&p->mutex);
 
         if (p->pending_job) {
-            uint64_t packet_num = p->packet_num;
             uint32_t flags;
             p->normal_num = 0;
 
-            if (use_zero_copy_send) {
+            if (!use_packets || use_zero_copy_send) {
                 p->iovs_num = 0;
             } else {
                 p->iovs_num = 1;
@@ -699,16 +702,20 @@  static void *multifd_send_thread(void *opaque)
                     break;
                 }
             }
-            multifd_send_fill_packet(p);
+
+            if (use_packets) {
+                multifd_send_fill_packet(p);
+                p->num_packets++;
+            }
+
             flags = p->flags;
             p->flags = 0;
-            p->num_packets++;
             p->total_normal_pages += p->normal_num;
             p->pages->num = 0;
             p->pages->block = NULL;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
+            trace_multifd_send(p->id, p->packet_num, p->normal_num, flags,
                                p->next_packet_size);
 
             if (use_zero_copy_send) {
@@ -718,7 +725,7 @@  static void *multifd_send_thread(void *opaque)
                 if (ret != 0) {
                     break;
                 }
-            } else {
+            } else if (use_packets) {
                 /* Send header using the same writev call */
                 p->iov[0].iov_len = p->packet_len;
                 p->iov[0].iov_base = p->packet;
@@ -904,6 +911,7 @@  int multifd_save_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    bool use_packets = migrate_multifd_packets();
     uint8_t i;
 
     if (!migrate_multifd()) {
@@ -928,14 +936,20 @@  int multifd_save_setup(Error **errp)
         p->pending_job = 0;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
-        p->packet_len = sizeof(MultiFDPacket_t)
-                      + sizeof(uint64_t) * page_count;
-        p->packet = g_malloc0(p->packet_len);
-        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
-        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+
+        if (use_packets) {
+            p->packet_len = sizeof(MultiFDPacket_t)
+                          + sizeof(uint64_t) * page_count;
+            p->packet = g_malloc0(p->packet_len);
+            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+            p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+
+            /* We need one extra place for the packet header */
+            p->iov = g_new0(struct iovec, page_count + 1);
+        } else {
+            p->iov = g_new0(struct iovec, page_count);
+        }
         p->name = g_strdup_printf("multifdsend_%d", i);
-        /* We need one extra place for the packet header */
-        p->iov = g_new0(struct iovec, page_count + 1);
         p->normal = g_new0(ram_addr_t, page_count);
         p->page_size = qemu_target_page_size();
         p->page_count = page_count;
@@ -1067,7 +1081,7 @@  void multifd_recv_sync_main(void)
 {
     int i;
 
-    if (!migrate_multifd()) {
+    if (!migrate_multifd() || !migrate_multifd_packets()) {
         return;
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -1094,38 +1108,44 @@  static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
     Error *local_err = NULL;
+    bool use_packets = migrate_multifd_packets();
     int ret;
 
     trace_multifd_recv_thread_start(p->id);
     rcu_register_thread();
 
     while (true) {
-        uint32_t flags;
+        uint32_t flags = 0;
+        p->normal_num = 0;
 
         if (p->quit) {
             break;
         }
 
-        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
-                                       p->packet_len, &local_err);
-        if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
-            break;
-        }
+        if (use_packets) {
+            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                           p->packet_len, &local_err);
+            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
+                break;
+            }
+
+            qemu_mutex_lock(&p->mutex);
+            ret = multifd_recv_unfill_packet(p, &local_err);
+            if (ret) {
+                qemu_mutex_unlock(&p->mutex);
+                break;
+            }
+            p->num_packets++;
+
+            flags = p->flags;
+            /* recv methods don't know how to handle the SYNC flag */
+            p->flags &= ~MULTIFD_FLAG_SYNC;
+            trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
+                               p->next_packet_size);
 
-        qemu_mutex_lock(&p->mutex);
-        ret = multifd_recv_unfill_packet(p, &local_err);
-        if (ret) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
+            p->total_normal_pages += p->normal_num;
         }
 
-        flags = p->flags;
-        /* recv methods don't know how to handle the SYNC flag */
-        p->flags &= ~MULTIFD_FLAG_SYNC;
-        trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
-                           p->next_packet_size);
-        p->num_packets++;
-        p->total_normal_pages += p->normal_num;
         qemu_mutex_unlock(&p->mutex);
 
         if (p->normal_num) {
@@ -1135,7 +1155,7 @@  static void *multifd_recv_thread(void *opaque)
             }
         }
 
-        if (flags & MULTIFD_FLAG_SYNC) {
+        if (use_packets && (flags & MULTIFD_FLAG_SYNC)) {
             qemu_sem_post(&multifd_recv_state->sem_sync);
             qemu_sem_wait(&p->sem_sync);
         }
@@ -1159,6 +1179,7 @@  int multifd_load_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    bool use_packets = migrate_multifd_packets();
     uint8_t i;
 
     /*
@@ -1183,9 +1204,12 @@  int multifd_load_setup(Error **errp)
         qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->id = i;
-        p->packet_len = sizeof(MultiFDPacket_t)
-                      + sizeof(uint64_t) * page_count;
-        p->packet = g_malloc0(p->packet_len);
+
+        if (use_packets) {
+            p->packet_len = sizeof(MultiFDPacket_t)
+                + sizeof(uint64_t) * page_count;
+            p->packet = g_malloc0(p->packet_len);
+        }
         p->name = g_strdup_printf("multifdrecv_%d", i);
         p->iov = g_new0(struct iovec, page_count);
         p->normal = g_new0(ram_addr_t, page_count);
@@ -1231,18 +1255,27 @@  void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
 {
     MultiFDRecvParams *p;
     Error *local_err = NULL;
-    int id;
+    bool use_packets = migrate_multifd_packets();
+    int id, num_packets = 0;
 
-    id = multifd_recv_initial_packet(ioc, &local_err);
-    if (id < 0) {
-        multifd_recv_terminate_threads(local_err);
-        error_propagate_prepend(errp, local_err,
-                                "failed to receive packet"
-                                " via multifd channel %d: ",
-                                qatomic_read(&multifd_recv_state->count));
-        return;
+    if (use_packets) {
+        id = multifd_recv_initial_packet(ioc, &local_err);
+        if (id < 0) {
+            multifd_recv_terminate_threads(local_err);
+            error_propagate_prepend(errp, local_err,
+                                    "failed to receive packet"
+                                    " via multifd channel %d: ",
+                                    qatomic_read(&multifd_recv_state->count));
+            return;
+        }
+        trace_multifd_recv_new_channel(id);
+
+        /* initial packet */
+        num_packets = 1;
+    } else {
+        /* next patch gives this a meaningful value */
+        id = 0;
     }
-    trace_multifd_recv_new_channel(id);
 
     p = &multifd_recv_state->params[id];
     if (p->c != NULL) {
@@ -1253,9 +1286,8 @@  void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
         return;
     }
     p->c = ioc;
+    p->num_packets = num_packets;
     object_ref(OBJECT(ioc));
-    /* initial packet */
-    p->num_packets = 1;
 
     p->running = true;
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
diff --git a/migration/options.c b/migration/options.c
index 775428a8a5..10730b13ba 100644
--- a/migration/options.c
+++ b/migration/options.c
@@ -385,6 +385,11 @@  bool migrate_multifd_flush_after_each_section(void)
     return s->multifd_flush_after_each_section;
 }
 
+bool migrate_multifd_packets(void)
+{
+    return !migrate_fixed_ram();
+}
+
 bool migrate_postcopy(void)
 {
     return migrate_postcopy_ram() || migrate_dirty_bitmaps();
diff --git a/migration/options.h b/migration/options.h
index 8680a10b79..8a19d6939c 100644
--- a/migration/options.h
+++ b/migration/options.h
@@ -56,6 +56,7 @@  bool migrate_zero_copy_send(void);
  */
 
 bool migrate_multifd_flush_after_each_section(void);
+bool migrate_multifd_packets(void);
 bool migrate_postcopy(void);
 bool migrate_rdma(void);
 bool migrate_tls(void);