Message ID | 20231022201211.452861-7-peterx@redhat.com |
---|---|
State | New |
Headers | show |
Series | migration/multifd: quit unitifications and separate sync packet | expand |
Peter Xu <peterx@redhat.com> writes: > Signed-off-by: Peter Xu <peterx@redhat.com> > --- > migration/multifd.h | 16 ++++++++++------ > migration/multifd.c | 33 +++++++++++++++++++++++---------- > 2 files changed, 33 insertions(+), 16 deletions(-) > > diff --git a/migration/multifd.h b/migration/multifd.h > index 2acf400085..ddee7b8d8a 100644 > --- a/migration/multifd.h > +++ b/migration/multifd.h > @@ -101,12 +101,16 @@ typedef struct { > uint32_t flags; > /* global number of generated multifd packets */ > uint64_t packet_num; > - /* thread has work to do */ > - int pending_job; > - /* array of pages to sent. > - * The owner of 'pages' depends of 'pending_job' value: > - * pending_job == 0 -> migration_thread can use it. > - * pending_job != 0 -> multifd_channel can use it. > + /* thread has a request to sync all data */ > + bool pending_sync; > + /* thread has something to send */ > + bool pending_job; > + /* > + * Array of pages to sent. The owner of 'pages' depends of > + * 'pending_job' value: > + * > + * - true -> multifd_channel owns it. > + * - false -> migration_thread owns it. > */ > MultiFDPages_t *pages; > > diff --git a/migration/multifd.c b/migration/multifd.c > index 8140520843..fe8d746ff9 100644 > --- a/migration/multifd.c > +++ b/migration/multifd.c > @@ -425,7 +425,7 @@ static int multifd_send_pages(QEMUFile *f) > p = &multifd_send_state->params[i]; > qemu_mutex_lock(&p->mutex); > if (!p->pending_job) { > - p->pending_job++; > + p->pending_job = true; > next_channel = (i + 1) % migrate_multifd_channels(); > break; > } > @@ -615,8 +615,7 @@ int multifd_send_sync_main(QEMUFile *f) > > qemu_mutex_lock(&p->mutex); > p->packet_num = multifd_send_state->packet_num++; > - p->flags |= MULTIFD_FLAG_SYNC; > - p->pending_job++; > + p->pending_sync = true; > qemu_mutex_unlock(&p->mutex); > qemu_sem_post(&p->sem); > } > @@ -747,8 +746,6 @@ static void *multifd_send_thread(void *opaque) > > qemu_mutex_lock(&p->mutex); > if (p->pending_job) { > - bool need_sync = p->flags & MULTIFD_FLAG_SYNC; > - > if (!multifd_send_prepare(p, &local_err)) { > assert(local_err); > qemu_mutex_unlock(&p->mutex); > @@ -764,12 +761,27 @@ static void *multifd_send_thread(void *opaque) > qemu_mutex_lock(&p->mutex); > > /* Send successful, mark the task completed */ > - p->pending_job--; > + p->pending_job = false; > + > + } else if (p->pending_sync) { Is your intention here to stop sending the SYNC along with the pages? This would have to loop once more to send the sync.
On Mon, Oct 23, 2023 at 12:15:49PM -0300, Fabiano Rosas wrote: > > @@ -764,12 +761,27 @@ static void *multifd_send_thread(void *opaque) > > qemu_mutex_lock(&p->mutex); > > > > /* Send successful, mark the task completed */ > > - p->pending_job--; > > + p->pending_job = false; > > + > > + } else if (p->pending_sync) { > > Is your intention here to stop sending the SYNC along with the pages? > This would have to loop once more to send the sync. My intention is to be clear on how we do SYNC, e.g., avoid main thread touching p->flags at all. AFAIK we'll need to loop twice either before or after this patch to send SYNC; the old code boosts pending_job for sync too, and kick one more time upon p->sem to guarantee that 2nd loop. The major difference after this patch is, it'll be clear we send the pages first in the 1st packet, then another SYNC packet as the 2nd. Also I hope the pending_sync is more readable too.. One thing I should have mentioned but I didn't: we must handle pending_job before pending_sync here, so that when we do SYNC we make sure all pages will be sent. IOW, below: if (p->pending_sync) { ... } else if (p->pending_job) { ... } should be buggy, because when pending_sync requested with job==true, we can send SYNC before that batch of pages. I'll add a comment block for it: /* * NOTE: we must handle pending_job before pending_sync, so as to * make sure SYNC packet will always cover all queued pages here. */ if (p->pending_job) { One thing I just notice is I forgot to write commit message for this patch.. my apologies. Let me attach a new version here with commit message written, and with the comment squashed in, attached. Thanks, ===8<=== From c7636dffe0f58e42e5aa0028cd0a6208cc75dd46 Mon Sep 17 00:00:00 2001 From: Peter Xu <peterx@redhat.com> Date: Sun, 22 Oct 2023 15:20:29 -0400 Subject: [PATCH] migration: Split multifd pending_job into two booleans We used to have MultiFDSendParams.pending_job covering both sending data and sending SYNC message. The send SYNC message part is tricky, because it directly modifies p->flags, boost pending_job even if there is a request. It makes it the only chance where pending_job can be larger than 1. To make it clear, split the pending_job integer into two booleans: - pending_job: keep its own name, a boolean to show we have data to send - pending_sync: a new boolean shows QEMU requests a SYNC message to send With that, we can remove the only place that main thread will touch p->flags, instead it simply sets pending_sync==true. Multifd send thread also does not need to peek p->flags before hand, it can unconditionally kick p->sem_sync as long as it's a pending_sync request. Signed-off-by: Peter Xu <peterx@redhat.com> --- migration/multifd.h | 16 ++++++++++------ migration/multifd.c | 37 +++++++++++++++++++++++++++---------- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index 2acf400085..ddee7b8d8a 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -101,12 +101,16 @@ typedef struct { uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; - /* thread has work to do */ - int pending_job; - /* array of pages to sent. - * The owner of 'pages' depends of 'pending_job' value: - * pending_job == 0 -> migration_thread can use it. - * pending_job != 0 -> multifd_channel can use it. + /* thread has a request to sync all data */ + bool pending_sync; + /* thread has something to send */ + bool pending_job; + /* + * Array of pages to sent. The owner of 'pages' depends of + * 'pending_job' value: + * + * - true -> multifd_channel owns it. + * - false -> migration_thread owns it. */ MultiFDPages_t *pages; diff --git a/migration/multifd.c b/migration/multifd.c index 3f4fb6ad40..5d3571faa8 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -434,7 +434,7 @@ static int multifd_send_pages(QEMUFile *f) p = &multifd_send_state->params[i]; qemu_mutex_lock(&p->mutex); if (!p->pending_job) { - p->pending_job++; + p->pending_job = true; next_channel = (i + 1) % migrate_multifd_channels(); break; } @@ -624,8 +624,7 @@ int multifd_send_sync_main(QEMUFile *f) qemu_mutex_lock(&p->mutex); p->packet_num = multifd_send_state->packet_num++; - p->flags |= MULTIFD_FLAG_SYNC; - p->pending_job++; + p->pending_sync = true; qemu_mutex_unlock(&p->mutex); qemu_sem_post(&p->sem); } @@ -755,9 +754,11 @@ static void *multifd_send_thread(void *opaque) } qemu_mutex_lock(&p->mutex); + /* + * NOTE: we must handle pending_job before pending_sync, so as to + * make sure SYNC packet will always cover all queued pages here. + */ if (p->pending_job) { - bool need_sync = p->flags & MULTIFD_FLAG_SYNC; - if (!multifd_send_prepare(p, &local_err)) { assert(local_err); qemu_mutex_unlock(&p->mutex); @@ -773,12 +774,27 @@ static void *multifd_send_thread(void *opaque) qemu_mutex_lock(&p->mutex); /* Send successful, mark the task completed */ - p->pending_job--; + p->pending_job = false; + + } else if (p->pending_sync) { + p->flags |= MULTIFD_FLAG_SYNC; + + if (!multifd_send_prepare(p, &local_err)) { + assert(local_err); + qemu_mutex_unlock(&p->mutex); + goto out; + } - /* If this is a sync task, we need one more kick */ - if (need_sync) { - qemu_sem_post(&p->sem_sync); + /* Send the packets without mutex */ + qemu_mutex_unlock(&p->mutex); + if (!multifd_do_send(p, &local_err)) { + assert(local_err); + goto out; } + qemu_mutex_lock(&p->mutex); + + qemu_sem_post(&p->sem_sync); + p->pending_sync = false; } qemu_mutex_unlock(&p->mutex); } @@ -941,7 +957,8 @@ int multifd_save_setup(Error **errp) qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); qemu_sem_init(&p->sem_sync, 0); - p->pending_job = 0; + p->pending_job = false; + p->pending_sync = false; p->id = i; p->pages = multifd_pages_init(page_count); p->packet_len = sizeof(MultiFDPacket_t)
diff --git a/migration/multifd.h b/migration/multifd.h index 2acf400085..ddee7b8d8a 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -101,12 +101,16 @@ typedef struct { uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; - /* thread has work to do */ - int pending_job; - /* array of pages to sent. - * The owner of 'pages' depends of 'pending_job' value: - * pending_job == 0 -> migration_thread can use it. - * pending_job != 0 -> multifd_channel can use it. + /* thread has a request to sync all data */ + bool pending_sync; + /* thread has something to send */ + bool pending_job; + /* + * Array of pages to sent. The owner of 'pages' depends of + * 'pending_job' value: + * + * - true -> multifd_channel owns it. + * - false -> migration_thread owns it. */ MultiFDPages_t *pages; diff --git a/migration/multifd.c b/migration/multifd.c index 8140520843..fe8d746ff9 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -425,7 +425,7 @@ static int multifd_send_pages(QEMUFile *f) p = &multifd_send_state->params[i]; qemu_mutex_lock(&p->mutex); if (!p->pending_job) { - p->pending_job++; + p->pending_job = true; next_channel = (i + 1) % migrate_multifd_channels(); break; } @@ -615,8 +615,7 @@ int multifd_send_sync_main(QEMUFile *f) qemu_mutex_lock(&p->mutex); p->packet_num = multifd_send_state->packet_num++; - p->flags |= MULTIFD_FLAG_SYNC; - p->pending_job++; + p->pending_sync = true; qemu_mutex_unlock(&p->mutex); qemu_sem_post(&p->sem); } @@ -747,8 +746,6 @@ static void *multifd_send_thread(void *opaque) qemu_mutex_lock(&p->mutex); if (p->pending_job) { - bool need_sync = p->flags & MULTIFD_FLAG_SYNC; - if (!multifd_send_prepare(p, &local_err)) { assert(local_err); qemu_mutex_unlock(&p->mutex); @@ -764,12 +761,27 @@ static void *multifd_send_thread(void *opaque) qemu_mutex_lock(&p->mutex); /* Send successful, mark the task completed */ - p->pending_job--; + p->pending_job = false; + + } else if (p->pending_sync) { + p->flags |= MULTIFD_FLAG_SYNC; + + if (!multifd_send_prepare(p, &local_err)) { + assert(local_err); + qemu_mutex_unlock(&p->mutex); + goto out; + } - /* If this is a sync task, we need one more kick */ - if (need_sync) { - qemu_sem_post(&p->sem_sync); + /* Send the packets without mutex */ + qemu_mutex_unlock(&p->mutex); + if (!multifd_do_send(p, &local_err)) { + assert(local_err); + goto out; } + qemu_mutex_lock(&p->mutex); + + qemu_sem_post(&p->sem_sync); + p->pending_sync = false; } qemu_mutex_unlock(&p->mutex); } @@ -932,7 +944,8 @@ int multifd_save_setup(Error **errp) qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); qemu_sem_init(&p->sem_sync, 0); - p->pending_job = 0; + p->pending_job = false; + p->pending_sync = false; p->id = i; p->pages = multifd_pages_init(page_count); p->packet_len = sizeof(MultiFDPacket_t)
Signed-off-by: Peter Xu <peterx@redhat.com> --- migration/multifd.h | 16 ++++++++++------ migration/multifd.c | 33 +++++++++++++++++++++++---------- 2 files changed, 33 insertions(+), 16 deletions(-)