Message ID | 1406125538-27992-12-git-send-email-yanghy@cn.fujitsu.com |
---|---|
State | New |
Headers | show |
* Yang Hongyang (yanghy@cn.fujitsu.com) wrote: > implement colo checkpoint protocol. > > Checkpoint synchronzing points. > > Primary Secondary > NEW @ > Suspend > SUSPENDED @ > Suspend&Save state > SEND @ > Send state Receive state > RECEIVED @ > Flush network Load state > LOADED @ > Resume Resume > > Start Comparing > NOTE: > 1) '@' who sends the message > 2) Every sync-point is synchronized by two sides with only > one handshake(single direction) for low-latency. > If more strict synchronization is required, a opposite direction > sync-point should be added. > 3) Since sync-points are single direction, the remote side may > go forward a lot when this side just receives the sync-point. > > Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com> > --- > migration-colo.c | 268 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- > 1 file changed, 262 insertions(+), 6 deletions(-) > > diff --git a/migration-colo.c b/migration-colo.c > index 2699e77..a708872 100644 > --- a/migration-colo.c > +++ b/migration-colo.c > @@ -24,6 +24,41 @@ > */ > #define CHKPOINT_TIMER 10000 > > +enum { > + COLO_READY = 0x46, > + > + /* > + * Checkpoint synchronzing points. > + * > + * Primary Secondary > + * NEW @ > + * Suspend > + * SUSPENDED @ > + * Suspend&Save state > + * SEND @ > + * Send state Receive state > + * RECEIVED @ > + * Flush network Load state > + * LOADED @ > + * Resume Resume > + * > + * Start Comparing > + * NOTE: > + * 1) '@' who sends the message > + * 2) Every sync-point is synchronized by two sides with only > + * one handshake(single direction) for low-latency. > + * If more strict synchronization is required, a opposite direction > + * sync-point should be added. > + * 3) Since sync-points are single direction, the remote side may > + * go forward a lot when this side just receives the sync-point. 
> + */ > + COLO_CHECKPOINT_NEW, > + COLO_CHECKPOINT_SUSPENDED, > + COLO_CHECKPOINT_SEND, > + COLO_CHECKPOINT_RECEIVED, > + COLO_CHECKPOINT_LOADED, > +}; > + > static QEMUBH *colo_bh; > > bool colo_supported(void) > @@ -185,30 +220,161 @@ static const QEMUFileOps colo_read_ops = { > .close = colo_close, > }; > > +/* colo checkpoint control helper */ > +static bool is_master(void); > +static bool is_slave(void); > + > +static void ctl_error_handler(void *opaque, int err) > +{ > + if (is_slave()) { > + /* TODO: determine whether we need to failover */ > + /* FIXME: we will not failover currently, just kill slave */ > + error_report("error: colo transmission failed!\n"); > + exit(1); > + } else if (is_master()) { > + /* Master still alive, do not failover */ > + error_report("error: colo transmission failed!\n"); > + return; > + } else { > + error_report("COLO: Unexpected error happend!\n"); > + exit(EXIT_FAILURE); > + } > +} > + > +static int colo_ctl_put(QEMUFile *f, uint64_t request) > +{ > + int ret = 0; > + > + qemu_put_be64(f, request); > + qemu_fflush(f); > + > + ret = qemu_file_get_error(f); > + if (ret < 0) { > + ctl_error_handler(f, ret); > + return 1; > + } > + > + return ret; > +} > + > +static int colo_ctl_get_value(QEMUFile *f, uint64_t *value) > +{ > + int ret = 0; > + uint64_t temp; > + > + temp = qemu_get_be64(f); > + > + ret = qemu_file_get_error(f); > + if (ret < 0) { > + ctl_error_handler(f, ret); > + return 1; > + } > + > + *value = temp; > + return 0; > +} > + > +static int colo_ctl_get(QEMUFile *f, uint64_t require) > +{ > + int ret; > + uint64_t value; > + > + ret = colo_ctl_get_value(f, &value); > + if (ret) { > + return ret; > + } > + > + if (value != require) { > + error_report("unexpected state received!\n"); I find it useful to print the expected/received state to be able to figure out what went wrong. 
> + exit(1); > + } > + > + return ret; > +} > + > /* save */ > > -static __attribute__((unused)) bool is_master(void) > +static bool is_master(void) > { > MigrationState *s = migrate_get_current(); > return (s->state == MIG_STATE_COLO); > } > > +static int do_colo_transaction(MigrationState *s, QEMUFile *control, > + QEMUFile *trans) > +{ > + int ret; > + > + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW); > + if (ret) { > + goto out; > + } > + > + ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED); What happens at this point if the slave just doesn't respond? (i.e. the socket doesn't drop - you just don't get the byte). > + if (ret) { > + goto out; > + } > + > + /* TODO: suspend and save vm state to colo buffer */ > + > + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND); > + if (ret) { > + goto out; > + } > + > + /* TODO: send vmstate to slave */ > + > + ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED); > + if (ret) { > + goto out; > + } > + > + /* TODO: Flush network etc. */ > + > + ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED); > + if (ret) { > + goto out; > + } > + > + /* TODO: resume master */ > + > +out: > + return ret; > +} > + > static void *colo_thread(void *opaque) > { > MigrationState *s = opaque; > int dev_hotplug = qdev_hotplug, wait_cp = 0; > int64_t start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); > int64_t current_time; > + QEMUFile *colo_control = NULL, *colo_trans = NULL; > + int ret; > > if (colo_compare_init() < 0) { > error_report("Init colo compare error\n"); > goto out; > } > > + colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb"); > + if (!colo_control) { > + error_report("open colo_control failed\n"); > + goto out; > + } In my postcopy world I'm trying to abstract this type of thing into a 'return path' so that the QEMUFile can implement it however it wants and you don't need to assume it's a socket. But I'm still fighting some of those details. 
Dave > + > qdev_hotplug = 0; > > colo_buffer_init(); > > + /* > + * Wait for slave finish loading vm states and enter COLO > + * restore. > + */ > + ret = colo_ctl_get(colo_control, COLO_READY); > + if (ret) { > + goto out; > + } > + > while (s->state == MIG_STATE_COLO) { > /* wait for a colo checkpoint */ > wait_cp = colo_compare(); > @@ -230,13 +396,33 @@ static void *colo_thread(void *opaque) > > /* start a colo checkpoint */ > > - /*TODO: COLO save */ > + /* open colo buffer for write */ > + colo_trans = qemu_fopen_ops(&colo_buffer, &colo_write_ops); > + if (!colo_trans) { > + error_report("open colo buffer failed\n"); > + goto out; > + } > > + if (do_colo_transaction(s, colo_control, colo_trans)) { > + goto out; > + } > + > + qemu_fclose(colo_trans); > + colo_trans = NULL; > start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); > } > > out: > + if (colo_trans) { > + qemu_fclose(colo_trans); > + } > + > colo_buffer_destroy(); > + > + if (colo_control) { > + qemu_fclose(colo_control); > + } > + > colo_compare_destroy(); > > if (s->state != MIG_STATE_ERROR) { > @@ -281,7 +467,7 @@ void colo_init_checkpointer(MigrationState *s) > > static Coroutine *colo; > > -static __attribute__((unused)) bool is_slave(void) > +static bool is_slave(void) > { > return colo != NULL; > } > @@ -293,13 +479,32 @@ static __attribute__((unused)) bool is_slave(void) > */ > static int slave_wait_new_checkpoint(QEMUFile *f) > { > - /* TODO: wait checkpoint start command from master */ > - return 1; > + int fd = qemu_get_fd(f); > + int ret; > + uint64_t cmd; > + > + yield_until_fd_readable(fd); > + > + ret = colo_ctl_get_value(f, &cmd); > + if (ret) { > + return 1; > + } > + > + if (cmd == COLO_CHECKPOINT_NEW) { > + return 0; > + } else { > + /* Unexpected data received */ > + ctl_error_handler(f, ret); > + return 1; > + } > } > > void colo_process_incoming_checkpoints(QEMUFile *f) > { > + int fd = qemu_get_fd(f); > int dev_hotplug = qdev_hotplug; > + QEMUFile *ctl = NULL; > + int ret; > > if 
(!restore_use_colo()) { > return; > @@ -310,18 +515,69 @@ void colo_process_incoming_checkpoints(QEMUFile *f) > colo = qemu_coroutine_self(); > assert(colo != NULL); > > + ctl = qemu_fopen_socket(fd, "wb"); > + if (!ctl) { > + error_report("can't open incoming channel\n"); > + goto out; > + } > + > colo_buffer_init(); > > + ret = colo_ctl_put(ctl, COLO_READY); > + if (ret) { > + goto out; > + } > + > + /* TODO: in COLO mode, slave is runing, so start the vm */ > + > while (true) { > if (slave_wait_new_checkpoint(f)) { > break; > } > > - /* TODO: COLO restore */ > + /* start colo checkpoint */ > + > + /* TODO: suspend guest */ > + > + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED); > + if (ret) { > + goto out; > + } > + > + /* TODO: open colo buffer for read */ > + > + ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND); > + if (ret) { > + goto out; > + } > + > + /* TODO: read migration data into colo buffer */ > + > + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED); > + if (ret) { > + goto out; > + } > + > + /* TODO: load vm state */ > + > + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED); > + if (ret) { > + goto out; > + } > + > + /* TODO: resume guest */ > + > + /* TODO: close colo buffer */ > } > > +out: > colo_buffer_destroy(); > colo = NULL; > + > + if (ctl) { > + qemu_fclose(ctl); > + } > + > restore_exit_colo(); > > qdev_hotplug = dev_hotplug; > -- > 1.9.1 > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
在 08/01/2014 11:03 PM, Dr. David Alan Gilbert 写道: > * Yang Hongyang (yanghy@cn.fujitsu.com) wrote: >> implement colo checkpoint protocol. >> >> Checkpoint synchronzing points. >> >> Primary Secondary >> NEW @ >> Suspend >> SUSPENDED @ >> Suspend&Save state >> SEND @ >> Send state Receive state >> RECEIVED @ >> Flush network Load state >> LOADED @ >> Resume Resume >> >> Start Comparing >> NOTE: >> 1) '@' who sends the message >> 2) Every sync-point is synchronized by two sides with only >> one handshake(single direction) for low-latency. >> If more strict synchronization is required, a opposite direction >> sync-point should be added. >> 3) Since sync-points are single direction, the remote side may >> go forward a lot when this side just receives the sync-point. >> >> Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com> >> --- >> migration-colo.c | 268 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- >> 1 file changed, 262 insertions(+), 6 deletions(-) >> >> diff --git a/migration-colo.c b/migration-colo.c >> index 2699e77..a708872 100644 >> --- a/migration-colo.c >> +++ b/migration-colo.c >> @@ -24,6 +24,41 @@ >> */ >> #define CHKPOINT_TIMER 10000 >> >> +enum { >> + COLO_READY = 0x46, >> + >> + /* >> + * Checkpoint synchronzing points. >> + * >> + * Primary Secondary >> + * NEW @ >> + * Suspend >> + * SUSPENDED @ >> + * Suspend&Save state >> + * SEND @ >> + * Send state Receive state >> + * RECEIVED @ >> + * Flush network Load state >> + * LOADED @ >> + * Resume Resume >> + * >> + * Start Comparing >> + * NOTE: >> + * 1) '@' who sends the message >> + * 2) Every sync-point is synchronized by two sides with only >> + * one handshake(single direction) for low-latency. >> + * If more strict synchronization is required, a opposite direction >> + * sync-point should be added. >> + * 3) Since sync-points are single direction, the remote side may >> + * go forward a lot when this side just receives the sync-point. 
>> + */ >> + COLO_CHECKPOINT_NEW, >> + COLO_CHECKPOINT_SUSPENDED, >> + COLO_CHECKPOINT_SEND, >> + COLO_CHECKPOINT_RECEIVED, >> + COLO_CHECKPOINT_LOADED, >> +}; >> + >> static QEMUBH *colo_bh; >> >> bool colo_supported(void) >> @@ -185,30 +220,161 @@ static const QEMUFileOps colo_read_ops = { >> .close = colo_close, >> }; >> >> +/* colo checkpoint control helper */ >> +static bool is_master(void); >> +static bool is_slave(void); >> + >> +static void ctl_error_handler(void *opaque, int err) >> +{ >> + if (is_slave()) { >> + /* TODO: determine whether we need to failover */ >> + /* FIXME: we will not failover currently, just kill slave */ >> + error_report("error: colo transmission failed!\n"); >> + exit(1); >> + } else if (is_master()) { >> + /* Master still alive, do not failover */ >> + error_report("error: colo transmission failed!\n"); >> + return; >> + } else { >> + error_report("COLO: Unexpected error happend!\n"); >> + exit(EXIT_FAILURE); >> + } >> +} >> + >> +static int colo_ctl_put(QEMUFile *f, uint64_t request) >> +{ >> + int ret = 0; >> + >> + qemu_put_be64(f, request); >> + qemu_fflush(f); >> + >> + ret = qemu_file_get_error(f); >> + if (ret < 0) { >> + ctl_error_handler(f, ret); >> + return 1; >> + } >> + >> + return ret; >> +} >> + >> +static int colo_ctl_get_value(QEMUFile *f, uint64_t *value) >> +{ >> + int ret = 0; >> + uint64_t temp; >> + >> + temp = qemu_get_be64(f); >> + >> + ret = qemu_file_get_error(f); >> + if (ret < 0) { >> + ctl_error_handler(f, ret); >> + return 1; >> + } >> + >> + *value = temp; >> + return 0; >> +} >> + >> +static int colo_ctl_get(QEMUFile *f, uint64_t require) >> +{ >> + int ret; >> + uint64_t value; >> + >> + ret = colo_ctl_get_value(f, &value); >> + if (ret) { >> + return ret; >> + } >> + >> + if (value != require) { >> + error_report("unexpected state received!\n"); > > I find it useful to print the expected/received state to > be able to figure out what went wrong. Good idea! 
> >> + exit(1); >> + } >> + >> + return ret; >> +} >> + >> /* save */ >> >> -static __attribute__((unused)) bool is_master(void) >> +static bool is_master(void) >> { >> MigrationState *s = migrate_get_current(); >> return (s->state == MIG_STATE_COLO); >> } >> >> +static int do_colo_transaction(MigrationState *s, QEMUFile *control, >> + QEMUFile *trans) >> +{ >> + int ret; >> + >> + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW); >> + if (ret) { >> + goto out; >> + } >> + >> + ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED); > > What happens at this point if the slave just doesn't respond? > (i.e. the socket doesn't drop - you just don't get the byte). If the socket return bytes that were not expected, exit. If socket return error, do some cleanup and quit COLO process. refer to: colo_ctl_get() and colo_ctl_get_value() > >> + if (ret) { >> + goto out; >> + } >> + >> + /* TODO: suspend and save vm state to colo buffer */ >> + >> + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND); >> + if (ret) { >> + goto out; >> + } >> + >> + /* TODO: send vmstate to slave */ >> + >> + ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED); >> + if (ret) { >> + goto out; >> + } >> + >> + /* TODO: Flush network etc. 
*/ >> + >> + ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED); >> + if (ret) { >> + goto out; >> + } >> + >> + /* TODO: resume master */ >> + >> +out: >> + return ret; >> +} >> + >> static void *colo_thread(void *opaque) >> { >> MigrationState *s = opaque; >> int dev_hotplug = qdev_hotplug, wait_cp = 0; >> int64_t start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); >> int64_t current_time; >> + QEMUFile *colo_control = NULL, *colo_trans = NULL; >> + int ret; >> >> if (colo_compare_init() < 0) { >> error_report("Init colo compare error\n"); >> goto out; >> } >> >> + colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb"); >> + if (!colo_control) { >> + error_report("open colo_control failed\n"); >> + goto out; >> + } > > In my postcopy world I'm trying to abstract this type of thing into a 'return path' > so that the QEMUFile can implement it however it wants and you don't > need to assume it's a socket. But I'm still fighting some of those details. > > Dave > >> + >> qdev_hotplug = 0; >> >> colo_buffer_init(); >> >> + /* >> + * Wait for slave finish loading vm states and enter COLO >> + * restore. 
>> + */ >> + ret = colo_ctl_get(colo_control, COLO_READY); >> + if (ret) { >> + goto out; >> + } >> + >> while (s->state == MIG_STATE_COLO) { >> /* wait for a colo checkpoint */ >> wait_cp = colo_compare(); >> @@ -230,13 +396,33 @@ static void *colo_thread(void *opaque) >> >> /* start a colo checkpoint */ >> >> - /*TODO: COLO save */ >> + /* open colo buffer for write */ >> + colo_trans = qemu_fopen_ops(&colo_buffer, &colo_write_ops); >> + if (!colo_trans) { >> + error_report("open colo buffer failed\n"); >> + goto out; >> + } >> >> + if (do_colo_transaction(s, colo_control, colo_trans)) { >> + goto out; >> + } >> + >> + qemu_fclose(colo_trans); >> + colo_trans = NULL; >> start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); >> } >> >> out: >> + if (colo_trans) { >> + qemu_fclose(colo_trans); >> + } >> + >> colo_buffer_destroy(); >> + >> + if (colo_control) { >> + qemu_fclose(colo_control); >> + } >> + >> colo_compare_destroy(); >> >> if (s->state != MIG_STATE_ERROR) { >> @@ -281,7 +467,7 @@ void colo_init_checkpointer(MigrationState *s) >> >> static Coroutine *colo; >> >> -static __attribute__((unused)) bool is_slave(void) >> +static bool is_slave(void) >> { >> return colo != NULL; >> } >> @@ -293,13 +479,32 @@ static __attribute__((unused)) bool is_slave(void) >> */ >> static int slave_wait_new_checkpoint(QEMUFile *f) >> { >> - /* TODO: wait checkpoint start command from master */ >> - return 1; >> + int fd = qemu_get_fd(f); >> + int ret; >> + uint64_t cmd; >> + >> + yield_until_fd_readable(fd); >> + >> + ret = colo_ctl_get_value(f, &cmd); >> + if (ret) { >> + return 1; >> + } >> + >> + if (cmd == COLO_CHECKPOINT_NEW) { >> + return 0; >> + } else { >> + /* Unexpected data received */ >> + ctl_error_handler(f, ret); >> + return 1; >> + } >> } >> >> void colo_process_incoming_checkpoints(QEMUFile *f) >> { >> + int fd = qemu_get_fd(f); >> int dev_hotplug = qdev_hotplug; >> + QEMUFile *ctl = NULL; >> + int ret; >> >> if (!restore_use_colo()) { >> return; >> @@ -310,18 
+515,69 @@ void colo_process_incoming_checkpoints(QEMUFile *f) >> colo = qemu_coroutine_self(); >> assert(colo != NULL); >> >> + ctl = qemu_fopen_socket(fd, "wb"); >> + if (!ctl) { >> + error_report("can't open incoming channel\n"); >> + goto out; >> + } >> + >> colo_buffer_init(); >> >> + ret = colo_ctl_put(ctl, COLO_READY); >> + if (ret) { >> + goto out; >> + } >> + >> + /* TODO: in COLO mode, slave is runing, so start the vm */ >> + >> while (true) { >> if (slave_wait_new_checkpoint(f)) { >> break; >> } >> >> - /* TODO: COLO restore */ >> + /* start colo checkpoint */ >> + >> + /* TODO: suspend guest */ >> + >> + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED); >> + if (ret) { >> + goto out; >> + } >> + >> + /* TODO: open colo buffer for read */ >> + >> + ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND); >> + if (ret) { >> + goto out; >> + } >> + >> + /* TODO: read migration data into colo buffer */ >> + >> + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED); >> + if (ret) { >> + goto out; >> + } >> + >> + /* TODO: load vm state */ >> + >> + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED); >> + if (ret) { >> + goto out; >> + } >> + >> + /* TODO: resume guest */ >> + >> + /* TODO: close colo buffer */ >> } >> >> +out: >> colo_buffer_destroy(); >> colo = NULL; >> + >> + if (ctl) { >> + qemu_fclose(ctl); >> + } >> + >> restore_exit_colo(); >> >> qdev_hotplug = dev_hotplug; >> -- >> 1.9.1 >> > -- > Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK > . >
* Hongyang Yang (yanghy@cn.fujitsu.com) wrote: > > > 在 08/01/2014 11:03 PM, Dr. David Alan Gilbert 写道: > >* Yang Hongyang (yanghy@cn.fujitsu.com) wrote: <snip> > >>+static int do_colo_transaction(MigrationState *s, QEMUFile *control, > >>+ QEMUFile *trans) > >>+{ > >>+ int ret; > >>+ > >>+ ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW); > >>+ if (ret) { > >>+ goto out; > >>+ } > >>+ > >>+ ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED); > > > >What happens at this point if the slave just doesn't respond? > >(i.e. the socket doesn't drop - you just don't get the byte). > > If the socket return bytes that were not expected, exit. If > socket return error, do some cleanup and quit COLO process. > refer to: colo_ctl_get() and colo_ctl_get_value() But what happens if the slave just doesn't respond at all; e.g. if the slave host loses power, it'll take a while (many seconds) before the socket times out. Dave -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
在 09/12/2014 07:17 PM, Dr. David Alan Gilbert 写道: > * Hongyang Yang (yanghy@cn.fujitsu.com) wrote: >> >> >> 在 08/01/2014 11:03 PM, Dr. David Alan Gilbert 写道: >>> * Yang Hongyang (yanghy@cn.fujitsu.com) wrote: > > <snip> > >>>> +static int do_colo_transaction(MigrationState *s, QEMUFile *control, >>>> + QEMUFile *trans) >>>> +{ >>>> + int ret; >>>> + >>>> + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW); >>>> + if (ret) { >>>> + goto out; >>>> + } >>>> + >>>> + ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED); >>> >>> What happens at this point if the slave just doesn't respond? >>> (i.e. the socket doesn't drop - you just don't get the byte). >> >> If the socket return bytes that were not expected, exit. If >> socket return error, do some cleanup and quit COLO process. >> refer to: colo_ctl_get() and colo_ctl_get_value() > > But what happens if the slave just doesn't respond at all; e.g. > if the slave host loses power, it'll take a while (many seconds) > before the socket will timeout. It will wait until the call returns a timeout error, and then do some cleanup and quit the COLO process. Is there a better way to handle this? > > Dave > -- > Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK > . >
* Hongyang Yang (yanghy@cn.fujitsu.com) wrote: > > > 在 09/12/2014 07:17 PM, Dr. David Alan Gilbert 写道: > >* Hongyang Yang (yanghy@cn.fujitsu.com) wrote: > >> > >> > >>在 08/01/2014 11:03 PM, Dr. David Alan Gilbert 写道: > >>>* Yang Hongyang (yanghy@cn.fujitsu.com) wrote: > > > ><snip> > > > >>>>+static int do_colo_transaction(MigrationState *s, QEMUFile *control, > >>>>+ QEMUFile *trans) > >>>>+{ > >>>>+ int ret; > >>>>+ > >>>>+ ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW); > >>>>+ if (ret) { > >>>>+ goto out; > >>>>+ } > >>>>+ > >>>>+ ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED); > >>> > >>>What happens at this point if the slave just doesn't respond? > >>>(i.e. the socket doesn't drop - you just don't get the byte). > >> > >>If the socket return bytes that were not expected, exit. If > >>socket return error, do some cleanup and quit COLO process. > >>refer to: colo_ctl_get() and colo_ctl_get_value() > > > >But what happens if the slave just doesn't respond at all; e.g. > >if the slave host loses power, it'll take a while (many seconds) > >before the socket will timeout. > > It will wait until the call returns timeout error, and then do some > cleanup and quit COLO process. If it was to wait here for ~30 seconds for the timeout, what would happen to the primary? Would it be stopped from sending any network traffic for those 30 seconds - I think that's too long to fail over. > There may be better way to handle this? In postcopy I always take reads coming back from the destination in a separate thread, because that thread can't block the main thread going out (I originally did that using async reads but the thread is nicer). You could also use something like a poll() with a shorter timeout to however long you are happy for COLO to go before it fails. Dave -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff --git a/migration-colo.c b/migration-colo.c index 2699e77..a708872 100644 --- a/migration-colo.c +++ b/migration-colo.c @@ -24,6 +24,41 @@ */ #define CHKPOINT_TIMER 10000 +enum { + COLO_READY = 0x46, + + /* + * Checkpoint synchronzing points. + * + * Primary Secondary + * NEW @ + * Suspend + * SUSPENDED @ + * Suspend&Save state + * SEND @ + * Send state Receive state + * RECEIVED @ + * Flush network Load state + * LOADED @ + * Resume Resume + * + * Start Comparing + * NOTE: + * 1) '@' who sends the message + * 2) Every sync-point is synchronized by two sides with only + * one handshake(single direction) for low-latency. + * If more strict synchronization is required, a opposite direction + * sync-point should be added. + * 3) Since sync-points are single direction, the remote side may + * go forward a lot when this side just receives the sync-point. + */ + COLO_CHECKPOINT_NEW, + COLO_CHECKPOINT_SUSPENDED, + COLO_CHECKPOINT_SEND, + COLO_CHECKPOINT_RECEIVED, + COLO_CHECKPOINT_LOADED, +}; + static QEMUBH *colo_bh; bool colo_supported(void) @@ -185,30 +220,161 @@ static const QEMUFileOps colo_read_ops = { .close = colo_close, }; +/* colo checkpoint control helper */ +static bool is_master(void); +static bool is_slave(void); + +static void ctl_error_handler(void *opaque, int err) +{ + if (is_slave()) { + /* TODO: determine whether we need to failover */ + /* FIXME: we will not failover currently, just kill slave */ + error_report("error: colo transmission failed!\n"); + exit(1); + } else if (is_master()) { + /* Master still alive, do not failover */ + error_report("error: colo transmission failed!\n"); + return; + } else { + error_report("COLO: Unexpected error happend!\n"); + exit(EXIT_FAILURE); + } +} + +static int colo_ctl_put(QEMUFile *f, uint64_t request) +{ + int ret = 0; + + qemu_put_be64(f, request); + qemu_fflush(f); + + ret = qemu_file_get_error(f); + if (ret < 0) { + ctl_error_handler(f, ret); + return 1; + } + + return ret; +} + +static int 
colo_ctl_get_value(QEMUFile *f, uint64_t *value) +{ + int ret = 0; + uint64_t temp; + + temp = qemu_get_be64(f); + + ret = qemu_file_get_error(f); + if (ret < 0) { + ctl_error_handler(f, ret); + return 1; + } + + *value = temp; + return 0; +} + +static int colo_ctl_get(QEMUFile *f, uint64_t require) +{ + int ret; + uint64_t value; + + ret = colo_ctl_get_value(f, &value); + if (ret) { + return ret; + } + + if (value != require) { + error_report("unexpected state received!\n"); + exit(1); + } + + return ret; +} + /* save */ -static __attribute__((unused)) bool is_master(void) +static bool is_master(void) { MigrationState *s = migrate_get_current(); return (s->state == MIG_STATE_COLO); } +static int do_colo_transaction(MigrationState *s, QEMUFile *control, + QEMUFile *trans) +{ + int ret; + + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW); + if (ret) { + goto out; + } + + ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED); + if (ret) { + goto out; + } + + /* TODO: suspend and save vm state to colo buffer */ + + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND); + if (ret) { + goto out; + } + + /* TODO: send vmstate to slave */ + + ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED); + if (ret) { + goto out; + } + + /* TODO: Flush network etc. 
*/ + + ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED); + if (ret) { + goto out; + } + + /* TODO: resume master */ + +out: + return ret; +} + static void *colo_thread(void *opaque) { MigrationState *s = opaque; int dev_hotplug = qdev_hotplug, wait_cp = 0; int64_t start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); int64_t current_time; + QEMUFile *colo_control = NULL, *colo_trans = NULL; + int ret; if (colo_compare_init() < 0) { error_report("Init colo compare error\n"); goto out; } + colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb"); + if (!colo_control) { + error_report("open colo_control failed\n"); + goto out; + } + qdev_hotplug = 0; colo_buffer_init(); + /* + * Wait for slave finish loading vm states and enter COLO + * restore. + */ + ret = colo_ctl_get(colo_control, COLO_READY); + if (ret) { + goto out; + } + while (s->state == MIG_STATE_COLO) { /* wait for a colo checkpoint */ wait_cp = colo_compare(); @@ -230,13 +396,33 @@ static void *colo_thread(void *opaque) /* start a colo checkpoint */ - /*TODO: COLO save */ + /* open colo buffer for write */ + colo_trans = qemu_fopen_ops(&colo_buffer, &colo_write_ops); + if (!colo_trans) { + error_report("open colo buffer failed\n"); + goto out; + } + if (do_colo_transaction(s, colo_control, colo_trans)) { + goto out; + } + + qemu_fclose(colo_trans); + colo_trans = NULL; start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); } out: + if (colo_trans) { + qemu_fclose(colo_trans); + } + colo_buffer_destroy(); + + if (colo_control) { + qemu_fclose(colo_control); + } + colo_compare_destroy(); if (s->state != MIG_STATE_ERROR) { @@ -281,7 +467,7 @@ void colo_init_checkpointer(MigrationState *s) static Coroutine *colo; -static __attribute__((unused)) bool is_slave(void) +static bool is_slave(void) { return colo != NULL; } @@ -293,13 +479,32 @@ static __attribute__((unused)) bool is_slave(void) */ static int slave_wait_new_checkpoint(QEMUFile *f) { - /* TODO: wait checkpoint start command from master */ - return 1; + int 
fd = qemu_get_fd(f); + int ret; + uint64_t cmd; + + yield_until_fd_readable(fd); + + ret = colo_ctl_get_value(f, &cmd); + if (ret) { + return 1; + } + + if (cmd == COLO_CHECKPOINT_NEW) { + return 0; + } else { + /* Unexpected data received */ + ctl_error_handler(f, ret); + return 1; + } } void colo_process_incoming_checkpoints(QEMUFile *f) { + int fd = qemu_get_fd(f); int dev_hotplug = qdev_hotplug; + QEMUFile *ctl = NULL; + int ret; if (!restore_use_colo()) { return; @@ -310,18 +515,69 @@ void colo_process_incoming_checkpoints(QEMUFile *f) colo = qemu_coroutine_self(); assert(colo != NULL); + ctl = qemu_fopen_socket(fd, "wb"); + if (!ctl) { + error_report("can't open incoming channel\n"); + goto out; + } + colo_buffer_init(); + ret = colo_ctl_put(ctl, COLO_READY); + if (ret) { + goto out; + } + + /* TODO: in COLO mode, slave is runing, so start the vm */ + while (true) { if (slave_wait_new_checkpoint(f)) { break; } - /* TODO: COLO restore */ + /* start colo checkpoint */ + + /* TODO: suspend guest */ + + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED); + if (ret) { + goto out; + } + + /* TODO: open colo buffer for read */ + + ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND); + if (ret) { + goto out; + } + + /* TODO: read migration data into colo buffer */ + + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED); + if (ret) { + goto out; + } + + /* TODO: load vm state */ + + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED); + if (ret) { + goto out; + } + + /* TODO: resume guest */ + + /* TODO: close colo buffer */ } +out: colo_buffer_destroy(); colo = NULL; + + if (ctl) { + qemu_fclose(ctl); + } + restore_exit_colo(); qdev_hotplug = dev_hotplug;
Implement COLO checkpoint protocol. Checkpoint synchronizing points. Primary Secondary NEW @ Suspend SUSPENDED @ Suspend&Save state SEND @ Send state Receive state RECEIVED @ Flush network Load state LOADED @ Resume Resume Start Comparing NOTE: 1) '@' marks who sends the message 2) Every sync-point is synchronized by two sides with only one handshake (single direction) for low-latency. If more strict synchronization is required, an opposite direction sync-point should be added. 3) Since sync-points are single direction, the remote side may go forward a lot when this side just receives the sync-point. Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com> --- migration-colo.c | 268 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 262 insertions(+), 6 deletions(-)