Message ID | 20240131103111.306523-4-peterx@redhat.com |
---|---|
State | New |
Series | migration/multifd: Refactor ->send_prepare() and cleanups |
peterx@redhat.com writes:

> From: Peter Xu <peterx@redhat.com>
>
> Multifd send side has two fields to indicate error quits:
>
>   - MultiFDSendParams.quit
>   - &multifd_send_state->exiting
>
> Merge them into the global one.  The replacement is done by changing all
> p->quit checks into the global var check.  The global check doesn't need
> any lock.
>
> A few more things done on top of this altogether:
>
>   - multifd_send_terminate_threads()
>
>     Moving the xchg() of &multifd_send_state->exiting upper, so as to cover
>     the tracepoint, migrate_set_error() and migrate_set_state().

Good.

>
>   - multifd_send_sync_main()
>
>     In the 2nd loop, add one more check over the global var to make sure we
>     don't keep the looping if QEMU already decided to quit.

Yes, also because we don't necessarily enter at multifd_send_pages()
every time.

>
>   - multifd_tls_outgoing_handshake()
>
>     Use multifd_send_terminate_threads() to set the error state.  That has
>     a benefit of updating MigrationState.error to that error too, so we can
>     persist that 1st error we hit in that specific channel.

Makes sense.

>
>   - multifd_new_send_channel_async()
>
>     Take similar approach like above, drop the migrate_set_error() because
>     multifd_send_terminate_threads() already covers that.  Unwrap the helper
>     multifd_new_send_channel_cleanup() along the way; not really needed.
> > Signed-off-by: Peter Xu <peterx@redhat.com> > --- > migration/multifd.h | 2 -- > migration/multifd.c | 85 ++++++++++++++++++--------------------------- > 2 files changed, 33 insertions(+), 54 deletions(-) > > diff --git a/migration/multifd.h b/migration/multifd.h > index 35d11f103c..7c040cb85a 100644 > --- a/migration/multifd.h > +++ b/migration/multifd.h > @@ -95,8 +95,6 @@ typedef struct { > QemuMutex mutex; > /* is this channel thread running */ > bool running; > - /* should this thread finish */ > - bool quit; > /* multifd flags for each packet */ > uint32_t flags; > /* global number of generated multifd packets */ > diff --git a/migration/multifd.c b/migration/multifd.c > index b8d2c96533..2c98023d67 100644 > --- a/migration/multifd.c > +++ b/migration/multifd.c > @@ -372,6 +372,11 @@ struct { > MultiFDMethods *ops; > } *multifd_send_state; > > +static bool multifd_send_should_exit(void) > +{ > + return qatomic_read(&multifd_send_state->exiting); > +} > + > /* > * The migration thread can wait on either of the two semaphores. 
This > * function can be used to kick the main thread out of waiting on either of > @@ -409,7 +414,7 @@ static int multifd_send_pages(void) > MultiFDSendParams *p = NULL; /* make happy gcc */ > MultiFDPages_t *pages = multifd_send_state->pages; > > - if (qatomic_read(&multifd_send_state->exiting)) { > + if (multifd_send_should_exit()) { > return -1; > } > > @@ -421,14 +426,11 @@ static int multifd_send_pages(void) > */ > next_channel %= migrate_multifd_channels(); > for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { > - p = &multifd_send_state->params[i]; > - > - qemu_mutex_lock(&p->mutex); > - if (p->quit) { > - error_report("%s: channel %d has already quit!", __func__, i); > - qemu_mutex_unlock(&p->mutex); > + if (multifd_send_should_exit()) { > return -1; > } > + p = &multifd_send_state->params[i]; > + qemu_mutex_lock(&p->mutex); > if (!p->pending_job) { > p->pending_job++; > next_channel = (i + 1) % migrate_multifd_channels(); Hm, I'm not sure it's correct to check 'exiting' outside of the lock. While it is an atomic operation, it is not atomic in relation to pending_job... ... looking closer, it seems that we can do what you suggest because p->pending_job is not touched by the multifd_send_thread in case of error, which means this function will indeed miss the 'exiting' flag, but pending_job > 0 means it will loop to the next channel and _then_ it will see the 'exiting' flag. > @@ -483,6 +485,16 @@ static void multifd_send_terminate_threads(Error *err) > { > int i; > > + /* > + * We don't want to exit each threads twice. Depending on where > + * we get the error, or if there are two independent errors in two > + * threads at the same time, we can end calling this function > + * twice. 
> + */ > + if (qatomic_xchg(&multifd_send_state->exiting, 1)) { > + return; > + } > + > trace_multifd_send_terminate_threads(err != NULL); > > if (err) { > @@ -497,26 +509,13 @@ static void multifd_send_terminate_threads(Error *err) > } > } > > - /* > - * We don't want to exit each threads twice. Depending on where > - * we get the error, or if there are two independent errors in two > - * threads at the same time, we can end calling this function > - * twice. > - */ > - if (qatomic_xchg(&multifd_send_state->exiting, 1)) { > - return; > - } > - > for (i = 0; i < migrate_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > - qemu_mutex_lock(&p->mutex); > - p->quit = true; Now that you removed this, we decoupled kicking the threads from setting the exit/error, so this function could be split in two. We could set the exiting flag at the places the error occurred (multifd threads, thread creation, etc) and "terminate the threads" at multifd_save_cleanup(). That second part we already do actually: void multifd_save_cleanup(void) { ... multifd_send_terminate_threads(NULL); ^see? for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; if (p->running) { qemu_thread_join(&p->thread); } } ... } I think there's no reason anymore for the channels to kick each other. They would all be waiting at p->sem and multifd_send_cleanup() would kick + join them. 
> qemu_sem_post(&p->sem); > if (p->c) { > qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); > } > - qemu_mutex_unlock(&p->mutex); > } > } > > @@ -615,16 +614,13 @@ int multifd_send_sync_main(void) > for (i = 0; i < migrate_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > - trace_multifd_send_sync_main_signal(p->id); > - > - qemu_mutex_lock(&p->mutex); > - > - if (p->quit) { > - error_report("%s: channel %d has already quit", __func__, i); > - qemu_mutex_unlock(&p->mutex); > + if (multifd_send_should_exit()) { > return -1; > } > > + trace_multifd_send_sync_main_signal(p->id); > + > + qemu_mutex_lock(&p->mutex); > p->packet_num = multifd_send_state->packet_num++; > p->flags |= MULTIFD_FLAG_SYNC; > p->pending_job++; > @@ -634,6 +630,10 @@ int multifd_send_sync_main(void) > for (i = 0; i < migrate_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > + if (multifd_send_should_exit()) { > + return -1; > + } > + > qemu_sem_wait(&multifd_send_state->channels_ready); > trace_multifd_send_sync_main_wait(p->id); > qemu_sem_wait(&p->sem_sync); > @@ -671,7 +671,7 @@ static void *multifd_send_thread(void *opaque) > qemu_sem_post(&multifd_send_state->channels_ready); > qemu_sem_wait(&p->sem); > > - if (qatomic_read(&multifd_send_state->exiting)) { > + if (multifd_send_should_exit()) { > break; > } > qemu_mutex_lock(&p->mutex); > @@ -786,12 +786,7 @@ static void multifd_tls_outgoing_handshake(QIOTask *task, > > trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err)); > > - migrate_set_error(migrate_get_current(), err); > - /* > - * Error happen, mark multifd_send_thread status as 'quit' although it > - * is not created, and then tell who pay attention to me. 
> - */ > - p->quit = true; > + multifd_send_terminate_threads(err); > multifd_send_kick_main(p); > error_free(err); > } > @@ -857,22 +852,6 @@ static bool multifd_channel_connect(MultiFDSendParams *p, > return true; > } > > -static void multifd_new_send_channel_cleanup(MultiFDSendParams *p, > - QIOChannel *ioc, Error *err) > -{ > - migrate_set_error(migrate_get_current(), err); > - /* Error happen, we need to tell who pay attention to me */ > - multifd_send_kick_main(p); > - /* > - * Although multifd_send_thread is not created, but main migration > - * thread need to judge whether it is running, so we need to mark > - * its status. > - */ > - p->quit = true; > - object_unref(OBJECT(ioc)); > - error_free(err); > -} > - > static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) > { > MultiFDSendParams *p = opaque; > @@ -889,7 +868,10 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) > } > > trace_multifd_new_send_channel_async_error(p->id, local_err); > - multifd_new_send_channel_cleanup(p, ioc, local_err); > + multifd_send_terminate_threads(local_err); > + multifd_send_kick_main(p); > + object_unref(OBJECT(ioc)); > + error_free(local_err); > } > > static void multifd_new_send_channel_create(gpointer opaque) > @@ -921,7 +903,6 @@ int multifd_save_setup(Error **errp) > qemu_mutex_init(&p->mutex); > qemu_sem_init(&p->sem, 0); > qemu_sem_init(&p->sem_sync, 0); > - p->quit = false; > p->pending_job = 0; > p->id = i; > p->pages = multifd_pages_init(page_count);
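For readers following the review, here is a minimal standalone sketch of the two ideas in this patch: a lock-free "should exit" check on one global flag, and an exchange placed first so that only the first caller records the error and kicks the channels. It uses C11 <stdatomic.h> in place of QEMU's qatomic_read()/qatomic_xchg(), and every name in it (struct send_state, send_should_exit(), send_terminate(), the kicks counter) is a hypothetical stand-in, not QEMU code.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for multifd_send_state; not QEMU's real struct. */
struct send_state {
    atomic_int exiting;      /* global "we are quitting" flag */
    const char *first_error; /* stand-in for MigrationState.error */
    int kicks;               /* how many times the channels were kicked */
};

/* Lock-free check, analogous to multifd_send_should_exit(). */
static bool send_should_exit(struct send_state *s)
{
    assert(s != NULL);
    return atomic_load(&s->exiting) != 0;
}

/*
 * Analogous to multifd_send_terminate_threads() after this patch: the
 * exchange comes first, so only the first caller records the error and
 * kicks the channels.  Returns true if this call did the work.
 */
static bool send_terminate(struct send_state *s, const char *err)
{
    assert(s != NULL);
    if (atomic_exchange(&s->exiting, 1)) {
        return false;        /* somebody else already started quitting */
    }
    if (err) {
        s->first_error = err;
    }
    s->kicks++;              /* the real code posts p->sem per channel */
    return true;
}
```

In this model a second error reported by another channel neither overwrites the first error nor kicks the channels again, which is the behaviour the moved xchg() is meant to guarantee.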
On Wed, Jan 31, 2024 at 12:05:08PM -0300, Fabiano Rosas wrote: > peterx@redhat.com writes: > > > From: Peter Xu <peterx@redhat.com> > > > > Multifd send side has two fields to indicate error quits: > > > > - MultiFDSendParams.quit > > - &multifd_send_state->exiting > > > > Merge them into the global one. The replacement is done by changing all > > p->quit checks into the global var check. The global check doesn't need > > any lock. > > > > A few more things done on top of this altogether: > > > > - multifd_send_terminate_threads() > > > > Moving the xchg() of &multifd_send_state->exiting upper, so as to cover > > the tracepoint, migrate_set_error() and migrate_set_state(). > > Good. > > > > > - multifd_send_sync_main() > > > > In the 2nd loop, add one more check over the global var to make sure we > > don't keep the looping if QEMU already decided to quit. > > Yes, also because we don't necessarily enter at multifd_send_page() > every time. > > > > > - multifd_tls_outgoing_handshake() > > > > Use multifd_send_terminate_threads() to set the error state. That has > > a benefit of updating MigrationState.error to that error too, so we can > > persist that 1st error we hit in that specific channel. > > Makes sense. > > > > > - multifd_new_send_channel_async() > > > > Take similar approach like above, drop the migrate_set_error() because > > multifd_send_terminate_threads() already covers that. Unwrap the helper > > multifd_new_send_channel_cleanup() along the way; not really needed. 
> > > > Signed-off-by: Peter Xu <peterx@redhat.com> > > --- > > migration/multifd.h | 2 -- > > migration/multifd.c | 85 ++++++++++++++++++--------------------------- > > 2 files changed, 33 insertions(+), 54 deletions(-) > > > > diff --git a/migration/multifd.h b/migration/multifd.h > > index 35d11f103c..7c040cb85a 100644 > > --- a/migration/multifd.h > > +++ b/migration/multifd.h > > @@ -95,8 +95,6 @@ typedef struct { > > QemuMutex mutex; > > /* is this channel thread running */ > > bool running; > > - /* should this thread finish */ > > - bool quit; > > /* multifd flags for each packet */ > > uint32_t flags; > > /* global number of generated multifd packets */ > > diff --git a/migration/multifd.c b/migration/multifd.c > > index b8d2c96533..2c98023d67 100644 > > --- a/migration/multifd.c > > +++ b/migration/multifd.c > > @@ -372,6 +372,11 @@ struct { > > MultiFDMethods *ops; > > } *multifd_send_state; > > > > +static bool multifd_send_should_exit(void) > > +{ > > + return qatomic_read(&multifd_send_state->exiting); > > +} > > + > > /* > > * The migration thread can wait on either of the two semaphores. 
This > > * function can be used to kick the main thread out of waiting on either of > > @@ -409,7 +414,7 @@ static int multifd_send_pages(void) > > MultiFDSendParams *p = NULL; /* make happy gcc */ > > MultiFDPages_t *pages = multifd_send_state->pages; > > > > - if (qatomic_read(&multifd_send_state->exiting)) { > > + if (multifd_send_should_exit()) { > > return -1; > > } > > > > @@ -421,14 +426,11 @@ static int multifd_send_pages(void) > > */ > > next_channel %= migrate_multifd_channels(); > > for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { > > - p = &multifd_send_state->params[i]; > > - > > - qemu_mutex_lock(&p->mutex); > > - if (p->quit) { > > - error_report("%s: channel %d has already quit!", __func__, i); > > - qemu_mutex_unlock(&p->mutex); > > + if (multifd_send_should_exit()) { > > return -1; > > } > > + p = &multifd_send_state->params[i]; > > + qemu_mutex_lock(&p->mutex); > > if (!p->pending_job) { > > p->pending_job++; > > next_channel = (i + 1) % migrate_multifd_channels(); > > Hm, I'm not sure it's correct to check 'exiting' outside of the > lock. While it is an atomic operation, it is not atomic in relation to > pending_job... > > ... looking closer, it seems that we can do what you suggest because > p->pending_job is not touched by the multifd_send_thread in case of > error, which means this function will indeed miss the 'exiting' flag, > but pending_job > 0 means it will loop to the next channel and _then_ it > will see the 'exiting' flag. It could still be the last channel we iterate, then IIUC we can still try to assign a job to a thread even if a concurrent error is set there. However IMHO it's okay; the error in the sender thread should ultimately set migrate_set_error() and the main thread should detect that in the migration loop, then we'll still quit. The extra queued job shouldn't matter, IMHO. 
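A single-threaded sketch of the channel-selection loop under discussion may help picture where the flag is sampled. It is only a model under stated assumptions: NUM_CHANNELS, struct channel, struct send_state and send_pages() are hypothetical names, C11 atomics replace qatomic_read(), and the per-channel mutex is reduced to a comment. It cannot reproduce the race itself; it only shows that the flag is re-read once per iteration, before the channel is inspected.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define NUM_CHANNELS 3           /* arbitrary value for the sketch */

struct channel {
    int pending_job;             /* non-zero while the channel is busy */
};

struct send_state {
    atomic_int exiting;
    struct channel params[NUM_CHANNELS];
};

/*
 * Pick the next idle channel round-robin and queue one job on it.
 * Returns the channel index, or -1 if the exit flag was observed.
 * The real function first waits on a semaphore until some channel is
 * ready, so its loop terminates; this model assumes at least one
 * channel is idle or that the flag is set.
 */
static int send_pages(struct send_state *s, int *next_channel)
{
    assert(s != NULL && next_channel != NULL);

    if (atomic_load(&s->exiting)) {
        return -1;
    }

    *next_channel %= NUM_CHANNELS;
    for (int i = *next_channel;; i = (i + 1) % NUM_CHANNELS) {
        /* Re-check on every iteration, outside any per-channel lock. */
        if (atomic_load(&s->exiting)) {
            return -1;
        }
        /* The real code takes p->mutex around the block below. */
        if (!s->params[i].pending_job) {
            s->params[i].pending_job++;
            *next_channel = (i + 1) % NUM_CHANNELS;
            return i;
        }
    }
}
```

If the flag flips right after the check, one extra job can still be queued on the current channel, which is the case described above as harmless because the main thread notices the error later in the migration loop.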
> > > @@ -483,6 +485,16 @@ static void multifd_send_terminate_threads(Error *err) > > { > > int i; > > > > + /* > > + * We don't want to exit each threads twice. Depending on where > > + * we get the error, or if there are two independent errors in two > > + * threads at the same time, we can end calling this function > > + * twice. > > + */ > > + if (qatomic_xchg(&multifd_send_state->exiting, 1)) { > > + return; > > + } > > + > > trace_multifd_send_terminate_threads(err != NULL); > > > > if (err) { > > @@ -497,26 +509,13 @@ static void multifd_send_terminate_threads(Error *err) > > } > > } > > > > - /* > > - * We don't want to exit each threads twice. Depending on where > > - * we get the error, or if there are two independent errors in two > > - * threads at the same time, we can end calling this function > > - * twice. > > - */ > > - if (qatomic_xchg(&multifd_send_state->exiting, 1)) { > > - return; > > - } > > - > > for (i = 0; i < migrate_multifd_channels(); i++) { > > MultiFDSendParams *p = &multifd_send_state->params[i]; > > > > - qemu_mutex_lock(&p->mutex); > > - p->quit = true; > > Now that you removed this, we decoupled kicking the threads from setting > the exit/error, so this function could be split in two. > > We could set the exiting flag at the places the error occurred (multifd > threads, thread creation, etc) and "terminate the threads" at > multifd_save_cleanup(). That second part we already do actually: > > void multifd_save_cleanup(void) { > ... > multifd_send_terminate_threads(NULL); > ^see? > for (i = 0; i < migrate_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > if (p->running) { > qemu_thread_join(&p->thread); > } > } > ... > } > > I think there's no reason anymore for the channels to kick each > other. They would all be waiting at p->sem and multifd_send_cleanup() > would kick + join them. Sounds good here. 
I'll attach one patch like this, feel free to have an early look: ===== From f9a3d63d5cca0068daaea4c72392803f4b29dcb5 Mon Sep 17 00:00:00 2001 From: Peter Xu <peterx@redhat.com> Date: Thu, 1 Feb 2024 17:01:54 +0800 Subject: [PATCH] migration/multifd: Split multifd_send_terminate_threads() Split multifd_send_terminate_threads() into two functions: - multifd_send_set_error(): used when an error happened on the sender side, set error and quit state only - multifd_send_terminate_threads(): used only by the main thread to kick all multifd send threads out of sleep, for the last recycling. Use multifd_send_set_error() in the three old call sites where only the error will be set. Use multifd_send_terminate_threads() in the last one where the main thread will kick the multifd threads at last in multifd_save_cleanup(). Both helpers will need to set quitting=1. Suggested-by: Fabiano Rosas <farosas@suse.de> Signed-off-by: Peter Xu <peterx@redhat.com> --- migration/multifd.c | 27 ++++++++++++++++++--------- migration/trace-events | 2 +- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/migration/multifd.c b/migration/multifd.c index c71e74b101..95dc29c8c7 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -536,10 +536,9 @@ int multifd_queue_page(RAMBlock *block, ram_addr_t offset) return 1; } -static void multifd_send_terminate_threads(Error *err) +/* Multifd send side hit an error; remember it and prepare to quit */ +static void multifd_send_set_error(Error *err) { - int i; - /* * We don't want to exit each threads twice. 
Depending on where * we get the error, or if there are two independent errors in two @@ -550,8 +549,6 @@ static void multifd_send_terminate_threads(Error *err) return; } - trace_multifd_send_terminate_threads(err != NULL); - if (err) { MigrationState *s = migrate_get_current(); migrate_set_error(s, err); @@ -563,7 +560,19 @@ static void multifd_send_terminate_threads(Error *err) MIGRATION_STATUS_FAILED); } } +} + +static void multifd_send_terminate_threads(void) +{ + int i; + + trace_multifd_send_terminate_threads(); + /* + * Tell everyone we're quitting. No xchg() needed here; we simply + * always set it. + */ + qatomic_set(&multifd_send_state->exiting, 1); for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; @@ -586,7 +595,7 @@ void multifd_save_cleanup(void) if (!migrate_multifd()) { return; } - multifd_send_terminate_threads(NULL); + multifd_send_terminate_threads(); for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; @@ -778,7 +787,7 @@ out: if (ret) { assert(local_err); trace_multifd_send_error(p->id); - multifd_send_terminate_threads(local_err); + multifd_send_set_error(local_err); multifd_send_kick_main(p); error_free(local_err); } @@ -814,7 +823,7 @@ static void multifd_tls_outgoing_handshake(QIOTask *task, trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err)); - multifd_send_terminate_threads(err); + multifd_send_set_error(err); multifd_send_kick_main(p); error_free(err); } @@ -896,7 +905,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) } trace_multifd_new_send_channel_async_error(p->id, local_err); - multifd_send_terminate_threads(local_err); + multifd_send_set_error(local_err); multifd_send_kick_main(p); object_unref(OBJECT(ioc)); error_free(local_err); diff --git a/migration/trace-events b/migration/trace-events index de4a743c8a..298ad2b0dd 100644 --- a/migration/trace-events +++ 
b/migration/trace-events @@ -141,7 +141,7 @@ multifd_send_error(uint8_t id) "channel %u" multifd_send_sync_main(long packet_num) "packet num %ld" multifd_send_sync_main_signal(uint8_t id) "channel %u" multifd_send_sync_main_wait(uint8_t id) "channel %u" -multifd_send_terminate_threads(bool error) "error %d" +multifd_send_terminate_threads(void) "" multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 multifd_send_thread_start(uint8_t id) "%u" multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s"
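The split proposed in this follow-up patch can be modelled in a few lines. The sketch below is an illustration only, assuming C11 atomics in place of qatomic_xchg()/qatomic_set() and a plain counter in place of qemu_sem_post(); NUM_CHANNELS, struct channel, struct send_state, send_set_error() and send_terminate_threads() are hypothetical names chosen to mirror the patch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define NUM_CHANNELS 4           /* arbitrary value for the sketch */

struct channel {
    int sem_posts;               /* stand-in for qemu_sem_post(&p->sem) */
};

struct send_state {
    atomic_int exiting;
    const char *error;           /* stand-in for MigrationState.error */
    struct channel params[NUM_CHANNELS];
};

/* Error path: remember the first error and mark the exit, nothing else. */
static void send_set_error(struct send_state *s, const char *err)
{
    assert(s != NULL);
    if (atomic_exchange(&s->exiting, 1)) {
        return;                  /* an earlier error already won */
    }
    if (err) {
        s->error = err;
    }
}

/* Cleanup path, main thread only: always set the flag, then kick all. */
static void send_terminate_threads(struct send_state *s)
{
    assert(s != NULL);
    atomic_store(&s->exiting, 1);    /* no exchange: must not return early */
    for (size_t i = 0; i < NUM_CHANNELS; i++) {
        s->params[i].sem_posts++;
    }
}
```

The unconditional store matters in this model: if the cleanup path reused the exchange guard, it would return early whenever an error had already set the flag, and no channel would be woken up.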
Peter Xu <peterx@redhat.com> writes: > On Wed, Jan 31, 2024 at 12:05:08PM -0300, Fabiano Rosas wrote: >> peterx@redhat.com writes: >> >> > From: Peter Xu <peterx@redhat.com> >> > >> > Multifd send side has two fields to indicate error quits: >> > >> > - MultiFDSendParams.quit >> > - &multifd_send_state->exiting >> > >> > Merge them into the global one. The replacement is done by changing all >> > p->quit checks into the global var check. The global check doesn't need >> > any lock. >> > >> > A few more things done on top of this altogether: >> > >> > - multifd_send_terminate_threads() >> > >> > Moving the xchg() of &multifd_send_state->exiting upper, so as to cover >> > the tracepoint, migrate_set_error() and migrate_set_state(). >> >> Good. >> >> > >> > - multifd_send_sync_main() >> > >> > In the 2nd loop, add one more check over the global var to make sure we >> > don't keep the looping if QEMU already decided to quit. >> >> Yes, also because we don't necessarily enter at multifd_send_page() >> every time. >> >> > >> > - multifd_tls_outgoing_handshake() >> > >> > Use multifd_send_terminate_threads() to set the error state. That has >> > a benefit of updating MigrationState.error to that error too, so we can >> > persist that 1st error we hit in that specific channel. >> >> Makes sense. >> >> > >> > - multifd_new_send_channel_async() >> > >> > Take similar approach like above, drop the migrate_set_error() because >> > multifd_send_terminate_threads() already covers that. Unwrap the helper >> > multifd_new_send_channel_cleanup() along the way; not really needed. 
>> > >> > Signed-off-by: Peter Xu <peterx@redhat.com> >> > --- >> > migration/multifd.h | 2 -- >> > migration/multifd.c | 85 ++++++++++++++++++--------------------------- >> > 2 files changed, 33 insertions(+), 54 deletions(-) >> > >> > diff --git a/migration/multifd.h b/migration/multifd.h >> > index 35d11f103c..7c040cb85a 100644 >> > --- a/migration/multifd.h >> > +++ b/migration/multifd.h >> > @@ -95,8 +95,6 @@ typedef struct { >> > QemuMutex mutex; >> > /* is this channel thread running */ >> > bool running; >> > - /* should this thread finish */ >> > - bool quit; >> > /* multifd flags for each packet */ >> > uint32_t flags; >> > /* global number of generated multifd packets */ >> > diff --git a/migration/multifd.c b/migration/multifd.c >> > index b8d2c96533..2c98023d67 100644 >> > --- a/migration/multifd.c >> > +++ b/migration/multifd.c >> > @@ -372,6 +372,11 @@ struct { >> > MultiFDMethods *ops; >> > } *multifd_send_state; >> > >> > +static bool multifd_send_should_exit(void) >> > +{ >> > + return qatomic_read(&multifd_send_state->exiting); >> > +} >> > + >> > /* >> > * The migration thread can wait on either of the two semaphores. 
This >> > * function can be used to kick the main thread out of waiting on either of >> > @@ -409,7 +414,7 @@ static int multifd_send_pages(void) >> > MultiFDSendParams *p = NULL; /* make happy gcc */ >> > MultiFDPages_t *pages = multifd_send_state->pages; >> > >> > - if (qatomic_read(&multifd_send_state->exiting)) { >> > + if (multifd_send_should_exit()) { >> > return -1; >> > } >> > >> > @@ -421,14 +426,11 @@ static int multifd_send_pages(void) >> > */ >> > next_channel %= migrate_multifd_channels(); >> > for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { >> > - p = &multifd_send_state->params[i]; >> > - >> > - qemu_mutex_lock(&p->mutex); >> > - if (p->quit) { >> > - error_report("%s: channel %d has already quit!", __func__, i); >> > - qemu_mutex_unlock(&p->mutex); >> > + if (multifd_send_should_exit()) { >> > return -1; >> > } >> > + p = &multifd_send_state->params[i]; >> > + qemu_mutex_lock(&p->mutex); >> > if (!p->pending_job) { >> > p->pending_job++; >> > next_channel = (i + 1) % migrate_multifd_channels(); >> >> Hm, I'm not sure it's correct to check 'exiting' outside of the >> lock. While it is an atomic operation, it is not atomic in relation to >> pending_job... >> >> ... looking closer, it seems that we can do what you suggest because >> p->pending_job is not touched by the multifd_send_thread in case of >> error, which means this function will indeed miss the 'exiting' flag, >> but pending_job > 0 means it will loop to the next channel and _then_ it >> will see the 'exiting' flag. > > It could still be the last channel we iterate, then IIUC we can still try > to assign a job to a thread even if a concurrent error is set there. > > However IMHO it's okay; the error in the sender thread should ultimately > set migrate_set_error() and the main thread should detect that in the > migration loop, then we'll still quit. The extra queued job shouldn't > matter, IMHO. 
> >> >> > @@ -483,6 +485,16 @@ static void multifd_send_terminate_threads(Error *err) >> > { >> > int i; >> > >> > + /* >> > + * We don't want to exit each threads twice. Depending on where >> > + * we get the error, or if there are two independent errors in two >> > + * threads at the same time, we can end calling this function >> > + * twice. >> > + */ >> > + if (qatomic_xchg(&multifd_send_state->exiting, 1)) { >> > + return; >> > + } >> > + >> > trace_multifd_send_terminate_threads(err != NULL); >> > >> > if (err) { >> > @@ -497,26 +509,13 @@ static void multifd_send_terminate_threads(Error *err) >> > } >> > } >> > >> > - /* >> > - * We don't want to exit each threads twice. Depending on where >> > - * we get the error, or if there are two independent errors in two >> > - * threads at the same time, we can end calling this function >> > - * twice. >> > - */ >> > - if (qatomic_xchg(&multifd_send_state->exiting, 1)) { >> > - return; >> > - } >> > - >> > for (i = 0; i < migrate_multifd_channels(); i++) { >> > MultiFDSendParams *p = &multifd_send_state->params[i]; >> > >> > - qemu_mutex_lock(&p->mutex); >> > - p->quit = true; >> >> Now that you removed this, we decoupled kicking the threads from setting >> the exit/error, so this function could be split in two. >> >> We could set the exiting flag at the places the error occurred (multifd >> threads, thread creation, etc) and "terminate the threads" at >> multifd_save_cleanup(). That second part we already do actually: >> >> void multifd_save_cleanup(void) { >> ... >> multifd_send_terminate_threads(NULL); >> ^see? >> for (i = 0; i < migrate_multifd_channels(); i++) { >> MultiFDSendParams *p = &multifd_send_state->params[i]; >> >> if (p->running) { >> qemu_thread_join(&p->thread); >> } >> } >> ... >> } >> >> I think there's no reason anymore for the channels to kick each >> other. They would all be waiting at p->sem and multifd_send_cleanup() >> would kick + join them. > > Sounds good here. 
> > I'll attach one patch like this, feel free to have an early look: > > ===== > > From f9a3d63d5cca0068daaea4c72392803f4b29dcb5 Mon Sep 17 00:00:00 2001 > From: Peter Xu <peterx@redhat.com> > Date: Thu, 1 Feb 2024 17:01:54 +0800 > Subject: [PATCH] migration/multifd: Split multifd_send_terminate_threads() > > Split multifd_send_terminate_threads() into two functions: > > - multifd_send_set_error(): used when an error happened on the sender > side, set error and quit state only > > - multifd_send_terminate_threads(): used only by the main thread to kick > all multifd send threads out of sleep, for the last recycling. > > Use multifd_send_set_error() in the three old call sites where only the > error will be set. > > Use multifd_send_terminate_threads() in the last one where the main thread > will kick the multifd threads at last in multifd_save_cleanup(). > > Both helpers will need to set quitting=1. > > Suggested-by: Fabiano Rosas <farosas@suse.de> > Signed-off-by: Peter Xu <peterx@redhat.com> New patch looks good. > --- > migration/multifd.c | 27 ++++++++++++++++++--------- > migration/trace-events | 2 +- > 2 files changed, 19 insertions(+), 10 deletions(-) > > diff --git a/migration/multifd.c b/migration/multifd.c > index c71e74b101..95dc29c8c7 100644 > --- a/migration/multifd.c > +++ b/migration/multifd.c > @@ -536,10 +536,9 @@ int multifd_queue_page(RAMBlock *block, ram_addr_t offset) > return 1; > } > > -static void multifd_send_terminate_threads(Error *err) > +/* Multifd send side hit an error; remember it and prepare to quit */ > +static void multifd_send_set_error(Error *err) > { > - int i; > - > /* > * We don't want to exit each threads twice. 
Depending on where > * we get the error, or if there are two independent errors in two > @@ -550,8 +549,6 @@ static void multifd_send_terminate_threads(Error *err) > return; > } > > - trace_multifd_send_terminate_threads(err != NULL); > - > if (err) { > MigrationState *s = migrate_get_current(); > migrate_set_error(s, err); > @@ -563,7 +560,19 @@ static void multifd_send_terminate_threads(Error *err) > MIGRATION_STATUS_FAILED); > } > } > +} > + > +static void multifd_send_terminate_threads(void) > +{ > + int i; > + > + trace_multifd_send_terminate_threads(); > > + /* > + * Tell everyone we're quitting. No xchg() needed here; we simply > + * always set it. > + */ > + qatomic_set(&multifd_send_state->exiting, 1); > for (i = 0; i < migrate_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > @@ -586,7 +595,7 @@ void multifd_save_cleanup(void) > if (!migrate_multifd()) { > return; > } > - multifd_send_terminate_threads(NULL); > + multifd_send_terminate_threads(); > for (i = 0; i < migrate_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; We could then move the qemu_thread_join loop into multifd_send_terminate_threads(). 
(and fix all the bugs we have so that we only progress past multifd_send_terminate_threads() once all threads have exited and no more thread is going to spawn) > > @@ -778,7 +787,7 @@ out: > if (ret) { > assert(local_err); > trace_multifd_send_error(p->id); > - multifd_send_terminate_threads(local_err); > + multifd_send_set_error(local_err); > multifd_send_kick_main(p); > error_free(local_err); > } > @@ -814,7 +823,7 @@ static void multifd_tls_outgoing_handshake(QIOTask *task, > > trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err)); > > - multifd_send_terminate_threads(err); > + multifd_send_set_error(err); > multifd_send_kick_main(p); > error_free(err); > } > @@ -896,7 +905,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) > } > > trace_multifd_new_send_channel_async_error(p->id, local_err); > - multifd_send_terminate_threads(local_err); > + multifd_send_set_error(local_err); > multifd_send_kick_main(p); > object_unref(OBJECT(ioc)); > error_free(local_err); > diff --git a/migration/trace-events b/migration/trace-events > index de4a743c8a..298ad2b0dd 100644 > --- a/migration/trace-events > +++ b/migration/trace-events > @@ -141,7 +141,7 @@ multifd_send_error(uint8_t id) "channel %u" > multifd_send_sync_main(long packet_num) "packet num %ld" > multifd_send_sync_main_signal(uint8_t id) "channel %u" > multifd_send_sync_main_wait(uint8_t id) "channel %u" > -multifd_send_terminate_threads(bool error) "error %d" > +multifd_send_terminate_threads(void) "" > multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 > multifd_send_thread_start(uint8_t id) "%u" > multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s" > -- > 2.43.0
On Thu, Feb 01, 2024 at 10:30:19AM -0300, Fabiano Rosas wrote:
> > @@ -586,7 +595,7 @@ void multifd_save_cleanup(void)
> >      if (!migrate_multifd()) {
> >          return;
> >      }
> > -    multifd_send_terminate_threads(NULL);
> > +    multifd_send_terminate_threads();
> >      for (i = 0; i < migrate_multifd_channels(); i++) {
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
>
> We could then move the qemu_thread_join loop into
> multifd_send_terminate_threads().

Sure, I can do that.  While at it, I found that I should probably clean up
more things in this function and provide small helpers.

I think I'll leave this patch as is and append one more patch to do that.

>
> (and fix all the bugs we have so that we only progress past
> multifd_send_terminate_threads() once all threads have exited and no
> more thread is going to spawn)

I guess this will still take some effort.  I hope that we can prevent some
threads from being created at all, for either the async or the TLS case.

For now, when I'm doing the cleanup, I'll add a TODO for this too.

I'll repost a new version of the whole set today.
diff --git a/migration/multifd.h b/migration/multifd.h
index 35d11f103c..7c040cb85a 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -95,8 +95,6 @@ typedef struct {
     QemuMutex mutex;
     /* is this channel thread running */
     bool running;
-    /* should this thread finish */
-    bool quit;
     /* multifd flags for each packet */
     uint32_t flags;
     /* global number of generated multifd packets */
diff --git a/migration/multifd.c b/migration/multifd.c
index b8d2c96533..2c98023d67 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -372,6 +372,11 @@ struct {
     MultiFDMethods *ops;
 } *multifd_send_state;
 
+static bool multifd_send_should_exit(void)
+{
+    return qatomic_read(&multifd_send_state->exiting);
+}
+
 /*
  * The migration thread can wait on either of the two semaphores. This
  * function can be used to kick the main thread out of waiting on either of
@@ -409,7 +414,7 @@ static int multifd_send_pages(void)
     MultiFDSendParams *p = NULL; /* make happy gcc */
     MultiFDPages_t *pages = multifd_send_state->pages;
 
-    if (qatomic_read(&multifd_send_state->exiting)) {
+    if (multifd_send_should_exit()) {
         return -1;
     }
 
@@ -421,14 +426,11 @@ static int multifd_send_pages(void)
      */
     next_channel %= migrate_multifd_channels();
     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
-        p = &multifd_send_state->params[i];
-
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            error_report("%s: channel %d has already quit!", __func__, i);
-            qemu_mutex_unlock(&p->mutex);
+        if (multifd_send_should_exit()) {
             return -1;
         }
+        p = &multifd_send_state->params[i];
+        qemu_mutex_lock(&p->mutex);
         if (!p->pending_job) {
             p->pending_job++;
             next_channel = (i + 1) % migrate_multifd_channels();
@@ -483,6 +485,16 @@ static void multifd_send_terminate_threads(Error *err)
 {
     int i;
 
+    /*
+     * We don't want to exit each threads twice. Depending on where
+     * we get the error, or if there are two independent errors in two
+     * threads at the same time, we can end calling this function
+     * twice.
+     */
+    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
+        return;
+    }
+
     trace_multifd_send_terminate_threads(err != NULL);
 
     if (err) {
@@ -497,26 +509,13 @@ static void multifd_send_terminate_threads(Error *err)
         }
     }
 
-    /*
-     * We don't want to exit each threads twice. Depending on where
-     * we get the error, or if there are two independent errors in two
-     * threads at the same time, we can end calling this function
-     * twice.
-     */
-    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
-        return;
-    }
-
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
-        qemu_mutex_lock(&p->mutex);
-        p->quit = true;
         qemu_sem_post(&p->sem);
         if (p->c) {
             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
         }
-        qemu_mutex_unlock(&p->mutex);
     }
 }
 
@@ -615,16 +614,13 @@ int multifd_send_sync_main(void)
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
-        trace_multifd_send_sync_main_signal(p->id);
-
-        qemu_mutex_lock(&p->mutex);
-
-        if (p->quit) {
-            error_report("%s: channel %d has already quit", __func__, i);
-            qemu_mutex_unlock(&p->mutex);
+        if (multifd_send_should_exit()) {
             return -1;
         }
 
+        trace_multifd_send_sync_main_signal(p->id);
+
+        qemu_mutex_lock(&p->mutex);
         p->packet_num = multifd_send_state->packet_num++;
         p->flags |= MULTIFD_FLAG_SYNC;
         p->pending_job++;
@@ -634,6 +630,10 @@ int multifd_send_sync_main(void)
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
+        if (multifd_send_should_exit()) {
+            return -1;
+        }
+
         qemu_sem_wait(&multifd_send_state->channels_ready);
         trace_multifd_send_sync_main_wait(p->id);
         qemu_sem_wait(&p->sem_sync);
@@ -671,7 +671,7 @@ static void *multifd_send_thread(void *opaque)
         qemu_sem_post(&multifd_send_state->channels_ready);
         qemu_sem_wait(&p->sem);
 
-        if (qatomic_read(&multifd_send_state->exiting)) {
+        if (multifd_send_should_exit()) {
             break;
         }
         qemu_mutex_lock(&p->mutex);
@@ -786,12 +786,7 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
 
     trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
 
-    migrate_set_error(migrate_get_current(), err);
-    /*
-     * Error happen, mark multifd_send_thread status as 'quit' although it
-     * is not created, and then tell who pay attention to me.
-     */
-    p->quit = true;
+    multifd_send_terminate_threads(err);
     multifd_send_kick_main(p);
     error_free(err);
 }
@@ -857,22 +852,6 @@ static bool multifd_channel_connect(MultiFDSendParams *p,
     return true;
 }
 
-static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
-                                             QIOChannel *ioc, Error *err)
-{
-    migrate_set_error(migrate_get_current(), err);
-    /* Error happen, we need to tell who pay attention to me */
-    multifd_send_kick_main(p);
-    /*
-     * Although multifd_send_thread is not created, but main migration
-     * thread need to judge whether it is running, so we need to mark
-     * its status.
-     */
-    p->quit = true;
-    object_unref(OBJECT(ioc));
-    error_free(err);
-}
-
 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 {
     MultiFDSendParams *p = opaque;
@@ -889,7 +868,10 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
     }
 
     trace_multifd_new_send_channel_async_error(p->id, local_err);
-    multifd_new_send_channel_cleanup(p, ioc, local_err);
+    multifd_send_terminate_threads(local_err);
+    multifd_send_kick_main(p);
+    object_unref(OBJECT(ioc));
+    error_free(local_err);
 }
 
 static void multifd_new_send_channel_create(gpointer opaque)
@@ -921,7 +903,6 @@ int multifd_save_setup(Error **errp)
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
-        p->quit = false;
         p->pending_job = 0;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
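The core of the patch above is a single global exit flag that is read without a lock and set exactly once through an atomic exchange. As a rough illustration of that pattern outside QEMU, here is a minimal sketch using C11 `<stdatomic.h>` in place of qatomic_read() and qatomic_xchg(). The names `should_exit()`, `terminate_once()` and `teardown_runs` are hypothetical and exist only for this example.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for multifd_send_state->exiting. */
static atomic_int exiting;

/* Counts how often the teardown body really ran (example bookkeeping). */
static int teardown_runs;

/* Lock-free check, similar in spirit to multifd_send_should_exit(). */
static bool should_exit(void)
{
    return atomic_load(&exiting) != 0;
}

/*
 * The exchange returns the previous value, so only the first caller
 * sees 0 and continues. Later or concurrent callers see 1 and return
 * early. Returns true if this call performed the teardown.
 */
static bool terminate_once(void)
{
    if (atomic_exchange(&exiting, 1)) {
        return false;
    }
    /* Placeholder for recording the error and waking the channels. */
    teardown_runs++;
    return true;
}
```

Doing the exchange before any other work, as the patch does, means that everything after the guard (in QEMU's case the tracepoint, migrate_set_error() and migrate_set_state()) runs for the first caller only.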