diff mbox

[RFC,11/17] COLO ctl: implement colo checkpoint protocol

Message ID 1406125538-27992-12-git-send-email-yanghy@cn.fujitsu.com
State New
Headers show

Commit Message

Yang Hongyang July 23, 2014, 2:25 p.m. UTC
implement colo checkpoint protocol.

Checkpoint synchronizing points.

                  Primary                 Secondary
  NEW             @
                                          Suspend
  SUSPENDED                               @
                  Suspend&Save state
  SEND            @
                  Send state              Receive state
  RECEIVED                                @
                  Flush network           Load state
  LOADED                                  @
                  Resume                  Resume

                  Start Comparing
NOTE:
 1) '@' marks the side that sends the message
 2) Every sync-point is synchronized between the two sides with only
    one handshake (single direction) for low latency.
    If stricter synchronization is required, an opposite-direction
    sync-point should be added.
 3) Since sync-points are single-direction, the remote side may already
    have gone far ahead by the time this side receives the sync-point.

Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com>
---
 migration-colo.c | 268 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 262 insertions(+), 6 deletions(-)

Comments

Dr. David Alan Gilbert Aug. 1, 2014, 3:03 p.m. UTC | #1
* Yang Hongyang (yanghy@cn.fujitsu.com) wrote:
> implement colo checkpoint protocol.
> 
> Checkpoint synchronzing points.
> 
>                   Primary                 Secondary
>   NEW             @
>                                           Suspend
>   SUSPENDED                               @
>                   Suspend&Save state
>   SEND            @
>                   Send state              Receive state
>   RECEIVED                                @
>                   Flush network           Load state
>   LOADED                                  @
>                   Resume                  Resume
> 
>                   Start Comparing
> NOTE:
>  1) '@' who sends the message
>  2) Every sync-point is synchronized by two sides with only
>     one handshake(single direction) for low-latency.
>     If more strict synchronization is required, a opposite direction
>     sync-point should be added.
>  3) Since sync-points are single direction, the remote side may
>     go forward a lot when this side just receives the sync-point.
> 
> Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com>
> ---
>  migration-colo.c | 268 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 262 insertions(+), 6 deletions(-)
> 
> diff --git a/migration-colo.c b/migration-colo.c
> index 2699e77..a708872 100644
> --- a/migration-colo.c
> +++ b/migration-colo.c
> @@ -24,6 +24,41 @@
>   */
>  #define CHKPOINT_TIMER 10000
>  
> +enum {
> +    COLO_READY = 0x46,
> +
> +    /*
> +     * Checkpoint synchronzing points.
> +     *
> +     *                  Primary                 Secondary
> +     *  NEW             @
> +     *                                          Suspend
> +     *  SUSPENDED                               @
> +     *                  Suspend&Save state
> +     *  SEND            @
> +     *                  Send state              Receive state
> +     *  RECEIVED                                @
> +     *                  Flush network           Load state
> +     *  LOADED                                  @
> +     *                  Resume                  Resume
> +     *
> +     *                  Start Comparing
> +     * NOTE:
> +     * 1) '@' who sends the message
> +     * 2) Every sync-point is synchronized by two sides with only
> +     *    one handshake(single direction) for low-latency.
> +     *    If more strict synchronization is required, a opposite direction
> +     *    sync-point should be added.
> +     * 3) Since sync-points are single direction, the remote side may
> +     *    go forward a lot when this side just receives the sync-point.
> +     */
> +    COLO_CHECKPOINT_NEW,
> +    COLO_CHECKPOINT_SUSPENDED,
> +    COLO_CHECKPOINT_SEND,
> +    COLO_CHECKPOINT_RECEIVED,
> +    COLO_CHECKPOINT_LOADED,
> +};
> +
>  static QEMUBH *colo_bh;
>  
>  bool colo_supported(void)
> @@ -185,30 +220,161 @@ static const QEMUFileOps colo_read_ops = {
>      .close = colo_close,
>  };
>  
> +/* colo checkpoint control helper */
> +static bool is_master(void);
> +static bool is_slave(void);
> +
> +static void ctl_error_handler(void *opaque, int err)
> +{
> +    if (is_slave()) {
> +        /* TODO: determine whether we need to failover */
> +        /* FIXME: we will not failover currently, just kill slave */
> +        error_report("error: colo transmission failed!\n");
> +        exit(1);
> +    } else if (is_master()) {
> +        /* Master still alive, do not failover */
> +        error_report("error: colo transmission failed!\n");
> +        return;
> +    } else {
> +        error_report("COLO: Unexpected error happend!\n");
> +        exit(EXIT_FAILURE);
> +    }
> +}
> +
> +static int colo_ctl_put(QEMUFile *f, uint64_t request)
> +{
> +    int ret = 0;
> +
> +    qemu_put_be64(f, request);
> +    qemu_fflush(f);
> +
> +    ret = qemu_file_get_error(f);
> +    if (ret < 0) {
> +        ctl_error_handler(f, ret);
> +        return 1;
> +    }
> +
> +    return ret;
> +}
> +
> +static int colo_ctl_get_value(QEMUFile *f, uint64_t *value)
> +{
> +    int ret = 0;
> +    uint64_t temp;
> +
> +    temp = qemu_get_be64(f);
> +
> +    ret = qemu_file_get_error(f);
> +    if (ret < 0) {
> +        ctl_error_handler(f, ret);
> +        return 1;
> +    }
> +
> +    *value = temp;
> +    return 0;
> +}
> +
> +static int colo_ctl_get(QEMUFile *f, uint64_t require)
> +{
> +    int ret;
> +    uint64_t value;
> +
> +    ret = colo_ctl_get_value(f, &value);
> +    if (ret) {
> +        return ret;
> +    }
> +
> +    if (value != require) {
> +        error_report("unexpected state received!\n");

I find it useful to print the expected/received state to
be able to figure out what went wrong.

> +        exit(1);
> +    }
> +
> +    return ret;
> +}
> +
>  /* save */
>  
> -static __attribute__((unused)) bool is_master(void)
> +static bool is_master(void)
>  {
>      MigrationState *s = migrate_get_current();
>      return (s->state == MIG_STATE_COLO);
>  }
>  
> +static int do_colo_transaction(MigrationState *s, QEMUFile *control,
> +                               QEMUFile *trans)
> +{
> +    int ret;
> +
> +    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);

What happens at this point if the slave just doesn't respond?
(i.e. the socket doesn't drop - you just don't get the byte).

> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* TODO: suspend and save vm state to colo buffer */
> +
> +    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* TODO: send vmstate to slave */
> +
> +    ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED);
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* TODO: Flush network etc. */
> +
> +    ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED);
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* TODO: resume master */
> +
> +out:
> +    return ret;
> +}
> +
>  static void *colo_thread(void *opaque)
>  {
>      MigrationState *s = opaque;
>      int dev_hotplug = qdev_hotplug, wait_cp = 0;
>      int64_t start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>      int64_t current_time;
> +    QEMUFile *colo_control = NULL, *colo_trans = NULL;
> +    int ret;
>  
>      if (colo_compare_init() < 0) {
>          error_report("Init colo compare error\n");
>          goto out;
>      }
>  
> +    colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb");
> +    if (!colo_control) {
> +        error_report("open colo_control failed\n");
> +        goto out;
> +    }

In my postcopy world I'm trying to abstract this type of thing into a 'return path'
so that the QEMUFile can implement it however it wants and you don't
need to assume it's a socket.  But I'm still fighting some of those details.

Dave

> +
>      qdev_hotplug = 0;
>  
>      colo_buffer_init();
>  
> +    /*
> +     * Wait for slave finish loading vm states and enter COLO
> +     * restore.
> +     */
> +    ret = colo_ctl_get(colo_control, COLO_READY);
> +    if (ret) {
> +        goto out;
> +    }
> +
>      while (s->state == MIG_STATE_COLO) {
>          /* wait for a colo checkpoint */
>          wait_cp = colo_compare();
> @@ -230,13 +396,33 @@ static void *colo_thread(void *opaque)
>  
>          /* start a colo checkpoint */
>  
> -        /*TODO: COLO save */
> +        /* open colo buffer for write */
> +        colo_trans = qemu_fopen_ops(&colo_buffer, &colo_write_ops);
> +        if (!colo_trans) {
> +            error_report("open colo buffer failed\n");
> +            goto out;
> +        }
>  
> +        if (do_colo_transaction(s, colo_control, colo_trans)) {
> +            goto out;
> +        }
> +
> +        qemu_fclose(colo_trans);
> +        colo_trans = NULL;
>          start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>      }
>  
>  out:
> +    if (colo_trans) {
> +        qemu_fclose(colo_trans);
> +    }
> +
>      colo_buffer_destroy();
> +
> +    if (colo_control) {
> +        qemu_fclose(colo_control);
> +    }
> +
>      colo_compare_destroy();
>  
>      if (s->state != MIG_STATE_ERROR) {
> @@ -281,7 +467,7 @@ void colo_init_checkpointer(MigrationState *s)
>  
>  static Coroutine *colo;
>  
> -static __attribute__((unused)) bool is_slave(void)
> +static bool is_slave(void)
>  {
>      return colo != NULL;
>  }
> @@ -293,13 +479,32 @@ static __attribute__((unused)) bool is_slave(void)
>   */
>  static int slave_wait_new_checkpoint(QEMUFile *f)
>  {
> -    /* TODO: wait checkpoint start command from master */
> -    return 1;
> +    int fd = qemu_get_fd(f);
> +    int ret;
> +    uint64_t cmd;
> +
> +    yield_until_fd_readable(fd);
> +
> +    ret = colo_ctl_get_value(f, &cmd);
> +    if (ret) {
> +        return 1;
> +    }
> +
> +    if (cmd == COLO_CHECKPOINT_NEW) {
> +        return 0;
> +    } else {
> +        /* Unexpected data received */
> +        ctl_error_handler(f, ret);
> +        return 1;
> +    }
>  }
>  
>  void colo_process_incoming_checkpoints(QEMUFile *f)
>  {
> +    int fd = qemu_get_fd(f);
>      int dev_hotplug = qdev_hotplug;
> +    QEMUFile *ctl = NULL;
> +    int ret;
>  
>      if (!restore_use_colo()) {
>          return;
> @@ -310,18 +515,69 @@ void colo_process_incoming_checkpoints(QEMUFile *f)
>      colo = qemu_coroutine_self();
>      assert(colo != NULL);
>  
> +    ctl = qemu_fopen_socket(fd, "wb");
> +    if (!ctl) {
> +        error_report("can't open incoming channel\n");
> +        goto out;
> +    }
> +
>      colo_buffer_init();
>  
> +    ret = colo_ctl_put(ctl, COLO_READY);
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* TODO: in COLO mode, slave is runing, so start the vm */
> +
>      while (true) {
>          if (slave_wait_new_checkpoint(f)) {
>              break;
>          }
>  
> -        /* TODO: COLO restore */
> +        /* start colo checkpoint */
> +
> +        /* TODO: suspend guest */
> +
> +        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED);
> +        if (ret) {
> +            goto out;
> +        }
> +
> +        /* TODO: open colo buffer for read */
> +
> +        ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND);
> +        if (ret) {
> +            goto out;
> +        }
> +
> +        /* TODO: read migration data into colo buffer */
> +
> +        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED);
> +        if (ret) {
> +            goto out;
> +        }
> +
> +        /* TODO: load vm state */
> +
> +        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
> +        if (ret) {
> +            goto out;
> +        }
> +
> +        /* TODO: resume guest */
> +
> +        /* TODO: close colo buffer */
>      }
>  
> +out:
>      colo_buffer_destroy();
>      colo = NULL;
> +
> +    if (ctl) {
> +        qemu_fclose(ctl);
> +    }
> +
>      restore_exit_colo();
>  
>      qdev_hotplug = dev_hotplug;
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Yang Hongyang Sept. 12, 2014, 6:20 a.m. UTC | #2
在 08/01/2014 11:03 PM, Dr. David Alan Gilbert 写道:
> * Yang Hongyang (yanghy@cn.fujitsu.com) wrote:
>> implement colo checkpoint protocol.
>>
>> Checkpoint synchronzing points.
>>
>>                    Primary                 Secondary
>>    NEW             @
>>                                            Suspend
>>    SUSPENDED                               @
>>                    Suspend&Save state
>>    SEND            @
>>                    Send state              Receive state
>>    RECEIVED                                @
>>                    Flush network           Load state
>>    LOADED                                  @
>>                    Resume                  Resume
>>
>>                    Start Comparing
>> NOTE:
>>   1) '@' who sends the message
>>   2) Every sync-point is synchronized by two sides with only
>>      one handshake(single direction) for low-latency.
>>      If more strict synchronization is required, a opposite direction
>>      sync-point should be added.
>>   3) Since sync-points are single direction, the remote side may
>>      go forward a lot when this side just receives the sync-point.
>>
>> Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com>
>> ---
>>   migration-colo.c | 268 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
>>   1 file changed, 262 insertions(+), 6 deletions(-)
>>
>> diff --git a/migration-colo.c b/migration-colo.c
>> index 2699e77..a708872 100644
>> --- a/migration-colo.c
>> +++ b/migration-colo.c
>> @@ -24,6 +24,41 @@
>>    */
>>   #define CHKPOINT_TIMER 10000
>>
>> +enum {
>> +    COLO_READY = 0x46,
>> +
>> +    /*
>> +     * Checkpoint synchronzing points.
>> +     *
>> +     *                  Primary                 Secondary
>> +     *  NEW             @
>> +     *                                          Suspend
>> +     *  SUSPENDED                               @
>> +     *                  Suspend&Save state
>> +     *  SEND            @
>> +     *                  Send state              Receive state
>> +     *  RECEIVED                                @
>> +     *                  Flush network           Load state
>> +     *  LOADED                                  @
>> +     *                  Resume                  Resume
>> +     *
>> +     *                  Start Comparing
>> +     * NOTE:
>> +     * 1) '@' who sends the message
>> +     * 2) Every sync-point is synchronized by two sides with only
>> +     *    one handshake(single direction) for low-latency.
>> +     *    If more strict synchronization is required, a opposite direction
>> +     *    sync-point should be added.
>> +     * 3) Since sync-points are single direction, the remote side may
>> +     *    go forward a lot when this side just receives the sync-point.
>> +     */
>> +    COLO_CHECKPOINT_NEW,
>> +    COLO_CHECKPOINT_SUSPENDED,
>> +    COLO_CHECKPOINT_SEND,
>> +    COLO_CHECKPOINT_RECEIVED,
>> +    COLO_CHECKPOINT_LOADED,
>> +};
>> +
>>   static QEMUBH *colo_bh;
>>
>>   bool colo_supported(void)
>> @@ -185,30 +220,161 @@ static const QEMUFileOps colo_read_ops = {
>>       .close = colo_close,
>>   };
>>
>> +/* colo checkpoint control helper */
>> +static bool is_master(void);
>> +static bool is_slave(void);
>> +
>> +static void ctl_error_handler(void *opaque, int err)
>> +{
>> +    if (is_slave()) {
>> +        /* TODO: determine whether we need to failover */
>> +        /* FIXME: we will not failover currently, just kill slave */
>> +        error_report("error: colo transmission failed!\n");
>> +        exit(1);
>> +    } else if (is_master()) {
>> +        /* Master still alive, do not failover */
>> +        error_report("error: colo transmission failed!\n");
>> +        return;
>> +    } else {
>> +        error_report("COLO: Unexpected error happend!\n");
>> +        exit(EXIT_FAILURE);
>> +    }
>> +}
>> +
>> +static int colo_ctl_put(QEMUFile *f, uint64_t request)
>> +{
>> +    int ret = 0;
>> +
>> +    qemu_put_be64(f, request);
>> +    qemu_fflush(f);
>> +
>> +    ret = qemu_file_get_error(f);
>> +    if (ret < 0) {
>> +        ctl_error_handler(f, ret);
>> +        return 1;
>> +    }
>> +
>> +    return ret;
>> +}
>> +
>> +static int colo_ctl_get_value(QEMUFile *f, uint64_t *value)
>> +{
>> +    int ret = 0;
>> +    uint64_t temp;
>> +
>> +    temp = qemu_get_be64(f);
>> +
>> +    ret = qemu_file_get_error(f);
>> +    if (ret < 0) {
>> +        ctl_error_handler(f, ret);
>> +        return 1;
>> +    }
>> +
>> +    *value = temp;
>> +    return 0;
>> +}
>> +
>> +static int colo_ctl_get(QEMUFile *f, uint64_t require)
>> +{
>> +    int ret;
>> +    uint64_t value;
>> +
>> +    ret = colo_ctl_get_value(f, &value);
>> +    if (ret) {
>> +        return ret;
>> +    }
>> +
>> +    if (value != require) {
>> +        error_report("unexpected state received!\n");
>
> I find it useful to print the expected/received state to
> be able to figure out what went wrong.

Good idea!

>
>> +        exit(1);
>> +    }
>> +
>> +    return ret;
>> +}
>> +
>>   /* save */
>>
>> -static __attribute__((unused)) bool is_master(void)
>> +static bool is_master(void)
>>   {
>>       MigrationState *s = migrate_get_current();
>>       return (s->state == MIG_STATE_COLO);
>>   }
>>
>> +static int do_colo_transaction(MigrationState *s, QEMUFile *control,
>> +                               QEMUFile *trans)
>> +{
>> +    int ret;
>> +
>> +    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
>> +    if (ret) {
>> +        goto out;
>> +    }
>> +
>> +    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);
>
> What happens at this point if the slave just doesn't respond?
> (i.e. the socket doesn't drop - you just don't get the byte).

If the socket returns bytes that were not expected, exit. If the
socket returns an error, do some cleanup and quit the COLO process.
Refer to colo_ctl_get() and colo_ctl_get_value().

>
>> +    if (ret) {
>> +        goto out;
>> +    }
>> +
>> +    /* TODO: suspend and save vm state to colo buffer */
>> +
>> +    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
>> +    if (ret) {
>> +        goto out;
>> +    }
>> +
>> +    /* TODO: send vmstate to slave */
>> +
>> +    ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED);
>> +    if (ret) {
>> +        goto out;
>> +    }
>> +
>> +    /* TODO: Flush network etc. */
>> +
>> +    ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED);
>> +    if (ret) {
>> +        goto out;
>> +    }
>> +
>> +    /* TODO: resume master */
>> +
>> +out:
>> +    return ret;
>> +}
>> +
>>   static void *colo_thread(void *opaque)
>>   {
>>       MigrationState *s = opaque;
>>       int dev_hotplug = qdev_hotplug, wait_cp = 0;
>>       int64_t start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>>       int64_t current_time;
>> +    QEMUFile *colo_control = NULL, *colo_trans = NULL;
>> +    int ret;
>>
>>       if (colo_compare_init() < 0) {
>>           error_report("Init colo compare error\n");
>>           goto out;
>>       }
>>
>> +    colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb");
>> +    if (!colo_control) {
>> +        error_report("open colo_control failed\n");
>> +        goto out;
>> +    }
>
> In my postcopy world I'm trying to abstract this type of thing into a 'return path'
> so that the QEMUFile can implement it however it wants and you don't
> need to assume it's a socket.  But I'm still fighting some of those details.
>
> Dave
>
>> +
>>       qdev_hotplug = 0;
>>
>>       colo_buffer_init();
>>
>> +    /*
>> +     * Wait for slave finish loading vm states and enter COLO
>> +     * restore.
>> +     */
>> +    ret = colo_ctl_get(colo_control, COLO_READY);
>> +    if (ret) {
>> +        goto out;
>> +    }
>> +
>>       while (s->state == MIG_STATE_COLO) {
>>           /* wait for a colo checkpoint */
>>           wait_cp = colo_compare();
>> @@ -230,13 +396,33 @@ static void *colo_thread(void *opaque)
>>
>>           /* start a colo checkpoint */
>>
>> -        /*TODO: COLO save */
>> +        /* open colo buffer for write */
>> +        colo_trans = qemu_fopen_ops(&colo_buffer, &colo_write_ops);
>> +        if (!colo_trans) {
>> +            error_report("open colo buffer failed\n");
>> +            goto out;
>> +        }
>>
>> +        if (do_colo_transaction(s, colo_control, colo_trans)) {
>> +            goto out;
>> +        }
>> +
>> +        qemu_fclose(colo_trans);
>> +        colo_trans = NULL;
>>           start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>>       }
>>
>>   out:
>> +    if (colo_trans) {
>> +        qemu_fclose(colo_trans);
>> +    }
>> +
>>       colo_buffer_destroy();
>> +
>> +    if (colo_control) {
>> +        qemu_fclose(colo_control);
>> +    }
>> +
>>       colo_compare_destroy();
>>
>>       if (s->state != MIG_STATE_ERROR) {
>> @@ -281,7 +467,7 @@ void colo_init_checkpointer(MigrationState *s)
>>
>>   static Coroutine *colo;
>>
>> -static __attribute__((unused)) bool is_slave(void)
>> +static bool is_slave(void)
>>   {
>>       return colo != NULL;
>>   }
>> @@ -293,13 +479,32 @@ static __attribute__((unused)) bool is_slave(void)
>>    */
>>   static int slave_wait_new_checkpoint(QEMUFile *f)
>>   {
>> -    /* TODO: wait checkpoint start command from master */
>> -    return 1;
>> +    int fd = qemu_get_fd(f);
>> +    int ret;
>> +    uint64_t cmd;
>> +
>> +    yield_until_fd_readable(fd);
>> +
>> +    ret = colo_ctl_get_value(f, &cmd);
>> +    if (ret) {
>> +        return 1;
>> +    }
>> +
>> +    if (cmd == COLO_CHECKPOINT_NEW) {
>> +        return 0;
>> +    } else {
>> +        /* Unexpected data received */
>> +        ctl_error_handler(f, ret);
>> +        return 1;
>> +    }
>>   }
>>
>>   void colo_process_incoming_checkpoints(QEMUFile *f)
>>   {
>> +    int fd = qemu_get_fd(f);
>>       int dev_hotplug = qdev_hotplug;
>> +    QEMUFile *ctl = NULL;
>> +    int ret;
>>
>>       if (!restore_use_colo()) {
>>           return;
>> @@ -310,18 +515,69 @@ void colo_process_incoming_checkpoints(QEMUFile *f)
>>       colo = qemu_coroutine_self();
>>       assert(colo != NULL);
>>
>> +    ctl = qemu_fopen_socket(fd, "wb");
>> +    if (!ctl) {
>> +        error_report("can't open incoming channel\n");
>> +        goto out;
>> +    }
>> +
>>       colo_buffer_init();
>>
>> +    ret = colo_ctl_put(ctl, COLO_READY);
>> +    if (ret) {
>> +        goto out;
>> +    }
>> +
>> +    /* TODO: in COLO mode, slave is runing, so start the vm */
>> +
>>       while (true) {
>>           if (slave_wait_new_checkpoint(f)) {
>>               break;
>>           }
>>
>> -        /* TODO: COLO restore */
>> +        /* start colo checkpoint */
>> +
>> +        /* TODO: suspend guest */
>> +
>> +        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED);
>> +        if (ret) {
>> +            goto out;
>> +        }
>> +
>> +        /* TODO: open colo buffer for read */
>> +
>> +        ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND);
>> +        if (ret) {
>> +            goto out;
>> +        }
>> +
>> +        /* TODO: read migration data into colo buffer */
>> +
>> +        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED);
>> +        if (ret) {
>> +            goto out;
>> +        }
>> +
>> +        /* TODO: load vm state */
>> +
>> +        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
>> +        if (ret) {
>> +            goto out;
>> +        }
>> +
>> +        /* TODO: resume guest */
>> +
>> +        /* TODO: close colo buffer */
>>       }
>>
>> +out:
>>       colo_buffer_destroy();
>>       colo = NULL;
>> +
>> +    if (ctl) {
>> +        qemu_fclose(ctl);
>> +    }
>> +
>>       restore_exit_colo();
>>
>>       qdev_hotplug = dev_hotplug;
>> --
>> 1.9.1
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> .
>
Dr. David Alan Gilbert Sept. 12, 2014, 11:17 a.m. UTC | #3
* Hongyang Yang (yanghy@cn.fujitsu.com) wrote:
> 
> 
> 在 08/01/2014 11:03 PM, Dr. David Alan Gilbert 写道:
> >* Yang Hongyang (yanghy@cn.fujitsu.com) wrote:

<snip>

> >>+static int do_colo_transaction(MigrationState *s, QEMUFile *control,
> >>+                               QEMUFile *trans)
> >>+{
> >>+    int ret;
> >>+
> >>+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
> >>+    if (ret) {
> >>+        goto out;
> >>+    }
> >>+
> >>+    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);
> >
> >What happens at this point if the slave just doesn't respond?
> >(i.e. the socket doesn't drop - you just don't get the byte).
> 
> If the socket return bytes that were not expected, exit. If
> socket return error, do some cleanup and quit COLO process.
> refer to: colo_ctl_get() and colo_ctl_get_value()

But what happens if the slave just doesn't respond at all; e.g.
if the slave host loses power, it'll take a while (many seconds)
before the socket will timeout.

Dave
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Yang Hongyang Sept. 12, 2014, 11:40 a.m. UTC | #4
在 09/12/2014 07:17 PM, Dr. David Alan Gilbert 写道:
> * Hongyang Yang (yanghy@cn.fujitsu.com) wrote:
>>
>>
>> 在 08/01/2014 11:03 PM, Dr. David Alan Gilbert 写道:
>>> * Yang Hongyang (yanghy@cn.fujitsu.com) wrote:
>
> <snip>
>
>>>> +static int do_colo_transaction(MigrationState *s, QEMUFile *control,
>>>> +                               QEMUFile *trans)
>>>> +{
>>>> +    int ret;
>>>> +
>>>> +    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
>>>> +    if (ret) {
>>>> +        goto out;
>>>> +    }
>>>> +
>>>> +    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);
>>>
>>> What happens at this point if the slave just doesn't respond?
>>> (i.e. the socket doesn't drop - you just don't get the byte).
>>
>> If the socket return bytes that were not expected, exit. If
>> socket return error, do some cleanup and quit COLO process.
>> refer to: colo_ctl_get() and colo_ctl_get_value()
>
> But what happens if the slave just doesn't respond at all; e.g.
> if the slave host loses power, it'll take a while (many seconds)
> before the socket will timeout.

It will wait until the call returns a timeout error, and then do some
cleanup and quit the COLO process. Is there a better way to handle
this?

>
> Dave
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> .
>
Dr. David Alan Gilbert Sept. 12, 2014, 11:57 a.m. UTC | #5
* Hongyang Yang (yanghy@cn.fujitsu.com) wrote:
> 
> 
> 在 09/12/2014 07:17 PM, Dr. David Alan Gilbert 写道:
> >* Hongyang Yang (yanghy@cn.fujitsu.com) wrote:
> >>
> >>
> >>在 08/01/2014 11:03 PM, Dr. David Alan Gilbert 写道:
> >>>* Yang Hongyang (yanghy@cn.fujitsu.com) wrote:
> >
> ><snip>
> >
> >>>>+static int do_colo_transaction(MigrationState *s, QEMUFile *control,
> >>>>+                               QEMUFile *trans)
> >>>>+{
> >>>>+    int ret;
> >>>>+
> >>>>+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
> >>>>+    if (ret) {
> >>>>+        goto out;
> >>>>+    }
> >>>>+
> >>>>+    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);
> >>>
> >>>What happens at this point if the slave just doesn't respond?
> >>>(i.e. the socket doesn't drop - you just don't get the byte).
> >>
> >>If the socket return bytes that were not expected, exit. If
> >>socket return error, do some cleanup and quit COLO process.
> >>refer to: colo_ctl_get() and colo_ctl_get_value()
> >
> >But what happens if the slave just doesn't respond at all; e.g.
> >if the slave host loses power, it'll take a while (many seconds)
> >before the socket will timeout.
> 
> It will wait until the call returns timeout error, and then do some
> cleanup and quit COLO process.

If it were to wait here for ~30 seconds for the timeout, what would happen
to the primary? Would it be stopped from sending any network traffic
for those 30 seconds? I think that's too long to fail over.

> There may be better way to handle this?

In postcopy I always handle reads coming back from the destination
in a separate thread, so that a blocked read can't stall the main thread's
outgoing traffic (I originally did that using async reads, but the thread
is nicer).  You could also use something like poll() with a timeout
set to however long you are happy for COLO to go before it fails.

Dave
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox

Patch

diff --git a/migration-colo.c b/migration-colo.c
index 2699e77..a708872 100644
--- a/migration-colo.c
+++ b/migration-colo.c
@@ -24,6 +24,41 @@ 
  */
 #define CHKPOINT_TIMER 10000
 
+enum {
+    COLO_READY = 0x46,
+
+    /*
+     * Checkpoint synchronzing points.
+     *
+     *                  Primary                 Secondary
+     *  NEW             @
+     *                                          Suspend
+     *  SUSPENDED                               @
+     *                  Suspend&Save state
+     *  SEND            @
+     *                  Send state              Receive state
+     *  RECEIVED                                @
+     *                  Flush network           Load state
+     *  LOADED                                  @
+     *                  Resume                  Resume
+     *
+     *                  Start Comparing
+     * NOTE:
+     * 1) '@' who sends the message
+     * 2) Every sync-point is synchronized by two sides with only
+     *    one handshake(single direction) for low-latency.
+     *    If more strict synchronization is required, a opposite direction
+     *    sync-point should be added.
+     * 3) Since sync-points are single direction, the remote side may
+     *    go forward a lot when this side just receives the sync-point.
+     */
+    COLO_CHECKPOINT_NEW,
+    COLO_CHECKPOINT_SUSPENDED,
+    COLO_CHECKPOINT_SEND,
+    COLO_CHECKPOINT_RECEIVED,
+    COLO_CHECKPOINT_LOADED,
+};
+
 static QEMUBH *colo_bh;
 
 bool colo_supported(void)
@@ -185,30 +220,161 @@  static const QEMUFileOps colo_read_ops = {
     .close = colo_close,
 };
 
+/* colo checkpoint control helper */
+static bool is_master(void);
+static bool is_slave(void);
+
+static void ctl_error_handler(void *opaque, int err)
+{
+    if (is_slave()) {
+        /* TODO: determine whether we need to failover */
+        /* FIXME: we will not failover currently, just kill slave */
+        error_report("error: colo transmission failed!\n");
+        exit(1);
+    } else if (is_master()) {
+        /* Master still alive, do not failover */
+        error_report("error: colo transmission failed!\n");
+        return;
+    } else {
+        error_report("COLO: Unexpected error happend!\n");
+        exit(EXIT_FAILURE);
+    }
+}
+
+static int colo_ctl_put(QEMUFile *f, uint64_t request)
+{
+    int ret = 0;
+
+    qemu_put_be64(f, request);
+    qemu_fflush(f);
+
+    ret = qemu_file_get_error(f);
+    if (ret < 0) {
+        ctl_error_handler(f, ret);
+        return 1;
+    }
+
+    return ret;
+}
+
+static int colo_ctl_get_value(QEMUFile *f, uint64_t *value)
+{
+    int ret = 0;
+    uint64_t temp;
+
+    temp = qemu_get_be64(f);
+
+    ret = qemu_file_get_error(f);
+    if (ret < 0) {
+        ctl_error_handler(f, ret);
+        return 1;
+    }
+
+    *value = temp;
+    return 0;
+}
+
+static int colo_ctl_get(QEMUFile *f, uint64_t require)
+{
+    int ret;
+    uint64_t value;
+
+    ret = colo_ctl_get_value(f, &value);
+    if (ret) {
+        return ret;
+    }
+
+    if (value != require) {
+        error_report("unexpected state received!\n");
+        exit(1);
+    }
+
+    return ret;
+}
+
 /* save */
 
-static __attribute__((unused)) bool is_master(void)
+static bool is_master(void)
 {
     MigrationState *s = migrate_get_current();
     return (s->state == MIG_STATE_COLO);
 }
 
+static int do_colo_transaction(MigrationState *s, QEMUFile *control,
+                               QEMUFile *trans)
+{
+    int ret;
+
+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
+    if (ret) {
+        goto out;
+    }
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: suspend and save vm state to colo buffer */
+
+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: send vmstate to slave */
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: Flush network etc. */
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: resume master */
+
+out:
+    return ret;
+}
+
 static void *colo_thread(void *opaque)
 {
     MigrationState *s = opaque;
     int dev_hotplug = qdev_hotplug, wait_cp = 0;
     int64_t start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
     int64_t current_time;
+    QEMUFile *colo_control = NULL, *colo_trans = NULL;
+    int ret;
 
     if (colo_compare_init() < 0) {
         error_report("Init colo compare error\n");
         goto out;
     }
 
+    colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb");
+    if (!colo_control) {
+        error_report("open colo_control failed\n");
+        goto out;
+    }
+
     qdev_hotplug = 0;
 
     colo_buffer_init();
 
+    /*
+     * Wait for slave finish loading vm states and enter COLO
+     * restore.
+     */
+    ret = colo_ctl_get(colo_control, COLO_READY);
+    if (ret) {
+        goto out;
+    }
+
     while (s->state == MIG_STATE_COLO) {
         /* wait for a colo checkpoint */
         wait_cp = colo_compare();
@@ -230,13 +396,33 @@  static void *colo_thread(void *opaque)
 
         /* start a colo checkpoint */
 
-        /*TODO: COLO save */
+        /* open colo buffer for write */
+        colo_trans = qemu_fopen_ops(&colo_buffer, &colo_write_ops);
+        if (!colo_trans) {
+            error_report("open colo buffer failed\n");
+            goto out;
+        }
 
+        if (do_colo_transaction(s, colo_control, colo_trans)) {
+            goto out;
+        }
+
+        qemu_fclose(colo_trans);
+        colo_trans = NULL;
         start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
     }
 
 out:
+    if (colo_trans) {
+        qemu_fclose(colo_trans);
+    }
+
     colo_buffer_destroy();
+
+    if (colo_control) {
+        qemu_fclose(colo_control);
+    }
+
     colo_compare_destroy();
 
     if (s->state != MIG_STATE_ERROR) {
@@ -281,7 +467,7 @@  void colo_init_checkpointer(MigrationState *s)
 
 static Coroutine *colo;
 
-static __attribute__((unused)) bool is_slave(void)
+static bool is_slave(void)
 {
     return colo != NULL;
 }
@@ -293,13 +479,32 @@  static __attribute__((unused)) bool is_slave(void)
  */
 static int slave_wait_new_checkpoint(QEMUFile *f)
 {
-    /* TODO: wait checkpoint start command from master */
-    return 1;
+    int fd = qemu_get_fd(f);
+    int ret;
+    uint64_t cmd;
+
+    yield_until_fd_readable(fd);
+
+    ret = colo_ctl_get_value(f, &cmd);
+    if (ret) {
+        return 1;
+    }
+
+    if (cmd == COLO_CHECKPOINT_NEW) {
+        return 0;
+    } else {
+        /* Unexpected data received */
+        ctl_error_handler(f, ret);
+        return 1;
+    }
 }
 
 void colo_process_incoming_checkpoints(QEMUFile *f)
 {
+    int fd = qemu_get_fd(f);
     int dev_hotplug = qdev_hotplug;
+    QEMUFile *ctl = NULL;
+    int ret;
 
     if (!restore_use_colo()) {
         return;
@@ -310,18 +515,69 @@  void colo_process_incoming_checkpoints(QEMUFile *f)
     colo = qemu_coroutine_self();
     assert(colo != NULL);
 
+    ctl = qemu_fopen_socket(fd, "wb");
+    if (!ctl) {
+        error_report("can't open incoming channel\n");
+        goto out;
+    }
+
     colo_buffer_init();
 
+    ret = colo_ctl_put(ctl, COLO_READY);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: in COLO mode, slave is running, so start the vm */
+
     while (true) {
         if (slave_wait_new_checkpoint(f)) {
             break;
         }
 
-        /* TODO: COLO restore */
+        /* start colo checkpoint */
+
+        /* TODO: suspend guest */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: open colo buffer for read */
+
+        ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: read migration data into colo buffer */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: load vm state */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: resume guest */
+
+        /* TODO: close colo buffer */
     }
 
+out:
     colo_buffer_destroy();
     colo = NULL;
+
+    if (ctl) {
+        qemu_fclose(ctl);
+    }
+
     restore_exit_colo();
 
     qdev_hotplug = dev_hotplug;