
[v2,2/2] migration: Implement multiple compression threads

Message ID 1415272128-8273-3-git-send-email-liang.z.li@intel.com
State New

Commit Message

Li, Liang Z Nov. 6, 2014, 11:08 a.m. UTC
Instead of sending the guest memory directly, this solution compresses
RAM pages before sending; after receiving, the data is decompressed.
This feature can reduce the data transferred by about 60%, which is
very useful when the network bandwidth is limited, and it can also
reduce the migration time by about 70%. The feature is off by default;
see the document docs/multiple-compression-threads.txt for information
on how to use it.

Reviewed-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Li Liang <liang.z.li@intel.com>
---
 arch_init.c                   | 435 ++++++++++++++++++++++++++++++++++++++++--
 hmp-commands.hx               |  56 ++++++
 hmp.c                         |  57 ++++++
 hmp.h                         |   6 +
 include/migration/migration.h |  12 +-
 include/migration/qemu-file.h |   1 +
 migration.c                   |  99 ++++++++++
 monitor.c                     |  21 ++
 qapi-schema.json              |  88 ++++++++-
 qmp-commands.hx               | 131 +++++++++++++
 10 files changed, 890 insertions(+), 16 deletions(-)

Comments

Eric Blake Nov. 6, 2014, 12:57 p.m. UTC | #1
On 11/06/2014 12:08 PM, Li Liang wrote:
> Instead of sending the guest memory directly, this solution compress
> the ram page before sending, after receiving, the data will be
> decompressed.
> This feature can help to reduce the data transferred about
> 60%, this is very useful when the network bandwidth is limited,
> and the migration time can also be reduced about 70%. The
> feature is off by default, following the document
> docs/multiple-compression-threads.txt for information to use it.
> 
> Reviewed-by: Eric Blake <eblake@redhat.com>

Please DON'T add this line unless the author spelled it out (or if they
mentioned that it would be okay if you fix minor issues).  I
intentionally omitted a reviewed-by on v1:

https://lists.gnu.org/archive/html/qemu-devel/2014-11/msg00672.html

because I was not happy with the patch as it was presented and did not
think the work to fix it was trivial.  Furthermore, my review of v1 was
just over the interface, and not the entire patch; there are very likely
still bugs lurking in the .c files.  Once again, I'm going to limit my
review of v2 to the interface (at least in this email):

> Signed-off-by: Li Liang <liang.z.li@intel.com>
> ---

> +++ b/qapi-schema.json
> @@ -491,13 +491,17 @@
>  #          to enable the capability on the source VM. The feature is disabled by
>  #          default. (since 1.6)
>  #
> +# @compress: Using the multiple compression threads to accelerate live migration.
> +#          This feature can help to reduce the migration traffic, by sending
> +#          compressed pages. The feature is disabled by default. (since 2.3)
> +#
>  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
>  #          to speed up convergence of RAM migration. (since 1.6)
>  #
>  # Since: 1.2
>  ##
>  { 'enum': 'MigrationCapability',
> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>  

I'll repeat what I said on v1 (but this time, with some links to back it
up :)

We really need to avoid a proliferation of new commands; two per tunable
does not scale well.  I think now is the time to implement my earlier
suggestion of making MigrationCapability become THE resource for tunables:

https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html

> +++ b/qmp-commands.hx
> @@ -705,7 +705,138 @@ Example:
>  <- { "return": 67108864 }
>  
>  EQMP
> +{
> +        .name       = "migrate-set-compress-level",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
> +    },
> +
> +SQMP
> +migrate-set-compress-level
> +----------------------

Convention in this file is to have the --- line extended out to the
length of the text it is tied to (you are missing four bytes,
corresponding to the tail "evel")

> +
> +Set compress level to be used by compress migration, the compress level is an integer

s/compress level/the compression level/ (twice)

> +between 0 and 9

s/9/9, where 9 means try harder for smaller compression at the expense
of more CPU time/

> +
> +Arguments:
> +
> +- "value": compress level (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-level", "arguments": { "value": 536870912 } }

Umm, 536870912 is not an integer between 0 and 9.


> +SQMP
> +query-migrate-compress-level
> +------------------------

--- length

> +
> +Show compress level to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-level" }
> +<- { "return": 67108864 }

Ewww. Please no new interfaces that return raw ints.  Rather, return a
dictionary with one key/value pair holding the int.  Raw ints are not as
extensible as dictionaries.  Also, make the example realistic - 67108864
is not a valid compression level.

{ "return": { "level": 9 } }


> +migrate-set-compress-threads
> +----------------------

--- length

> +
> +Set compress thread count to be used by compress migration, the compress thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": compress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-threads", "arguments": { "value": 536870912 } }

Value out of range 1-255

> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
> +    },
> +
> +SQMP
> +query-migrate-compress-threads
> +------------------------

--- length

> +
> +Show compress thread count to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 67108864 }

out of range, raw int return

and so on in the rest of the patch (I'll quit calling it out, especially
if we switch over to my enhanced set-capabilities proposal)
Dr. David Alan Gilbert Nov. 6, 2014, 3:41 p.m. UTC | #2
* Li Liang (liang.z.li@intel.com) wrote:
> Instead of sending the guest memory directly, this solution compress
> the ram page before sending, after receiving, the data will be
> decompressed.
> This feature can help to reduce the data transferred about
> 60%, this is very useful when the network bandwidth is limited,
> and the migration time can also be reduced about 70%. The
> feature is off by default, following the document
> docs/multiple-compression-threads.txt for information to use it.

More technical comments below; but could you split the patch up a bit
more please - it's a bit daunting; probably with the commands in a separate
patch, and maybe split the compress stuff into one patch and the decompress
into another.

Another thing; I've not figured out how all of this gets cleaned up in
a migration_cancel or if the migration fails.

> Reviewed-by: Eric Blake <eblake@redhat.com>
> Signed-off-by: Li Liang <liang.z.li@intel.com>
> ---
>  arch_init.c                   | 435 ++++++++++++++++++++++++++++++++++++++++--
>  hmp-commands.hx               |  56 ++++++
>  hmp.c                         |  57 ++++++
>  hmp.h                         |   6 +
>  include/migration/migration.h |  12 +-
>  include/migration/qemu-file.h |   1 +
>  migration.c                   |  99 ++++++++++
>  monitor.c                     |  21 ++
>  qapi-schema.json              |  88 ++++++++-
>  qmp-commands.hx               | 131 +++++++++++++
>  10 files changed, 890 insertions(+), 16 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 88a5ba0..a27d87b 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -24,6 +24,7 @@
>  #include <stdint.h>
>  #include <stdarg.h>
>  #include <stdlib.h>
> +#include <zlib.h>
>  #ifndef _WIN32
>  #include <sys/types.h>
>  #include <sys/mman.h>
> @@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_CONTINUE 0x20
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
> +#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  
>  static struct defconfig_file {
>      const char *filename;
> @@ -332,6 +334,177 @@ static uint64_t migration_dirty_pages;
>  static uint32_t last_version;
>  static bool ram_bulk_stage;
>  

Magic 16 constants - why?

> +#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
> +#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16)
> +struct MigBuf {
> +    int buf_index;
> +    uint8_t buf[MIG_BUF_SIZE];
> +};
> +
> +typedef struct MigBuf MigBuf;
> +

These functions look like they're recreating stuff in QEMUFile - is there
no way to share anything?
> +static void migrate_put_byte(MigBuf *f, int v)
> +{
> +    f->buf[f->buf_index] = v;
> +    f->buf_index++;
> +}
> +
> +static void migrate_put_be16(MigBuf *f, unsigned int v)
> +{
> +    migrate_put_byte(f, v >> 8);
> +    migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be32(MigBuf *f, unsigned int v)
> +{
> +    migrate_put_byte(f, v >> 24);
> +    migrate_put_byte(f, v >> 16);
> +    migrate_put_byte(f, v >> 8);
> +    migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be64(MigBuf *f, uint64_t v)
> +{
> +    migrate_put_be32(f, v >> 32);
> +    migrate_put_be32(f, v);
> +}
> +

This feels like you're doing something very similar to 
the buffered file code that recently went in; could you
reuse qemu_bufopen or the QEMUSizedBuffer?
I think if you could use the qemu_buf somehow (maybe with
modifications?) then you could avoid a lot of the 'if'd
code below, because you'd always be working with a QEMUFile,
it would just be a different QEMUFile.

> +static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size)
> +{
> +    int l;
> +
> +    while (size > 0) {
> +        l = MIG_BUF_SIZE - f->buf_index;
> +        if (l > size) {
> +            l = size;
> +        }
> +        memcpy(f->buf + f->buf_index, buf, l);
> +        f->buf_index += l;
> +        buf += l;
> +        size -= l;
> +    }
> +}
> +
> +static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block,
> +        ram_addr_t offset, int cont, int flag)
> +{
> +    size_t size;
> +
> +    migrate_put_be64(f, offset | cont | flag);
> +    size = 8;
> +
> +    if (!cont) {
> +        migrate_put_byte(f, strlen(block->idstr));
> +        migrate_put_buffer(f, (uint8_t *)block->idstr,
> +                        strlen(block->idstr));
> +        size += 1 + strlen(block->idstr);
> +    }
> +    return size;
> +}
> +


> +static int migrate_qemu_add_compress(MigBuf *f,  const uint8_t *p,
> +        int size, int level)
> +{
> +    uLong  blen = COMPRESS_BUF_SIZE;
> +    if (compress2(f->buf + f->buf_index + sizeof(int), &blen, (Bytef *)p,
> +            size, level) != Z_OK) {
> +        error_report("Compress Failed!\n");
> +        return 0;
> +    }
> +    migrate_put_be32(f, blen);
> +    f->buf_index += blen;
> +    return blen + sizeof(int);
> +}

Please add a comment about what this is doing, and use size_t or
unsigned int for sizes.
Also error_report doesn't need the \n

> +enum {
> +    COM_DONE = 0,
> +    COM_START,
> +};
> +
> +static int  compress_thread_count;
> +static int  decompress_thread_count;
> +
> +struct compress_param {
> +    int state;
> +    MigBuf migbuf;
> +    RAMBlock *block;
> +    ram_addr_t offset;
> +    bool last_stage;
> +    int ret;
> +    int bytes_sent;
> +    uint8_t *p;
> +    int cont;
> +    bool bulk_stage;
> +};
> +
> +typedef struct compress_param compress_param;
> +compress_param *comp_param;
> +
> +struct decompress_param {
> +    int state;
> +    void *des;
> +    uint8 compbuf[COMPRESS_BUF_SIZE];
> +    int len;
> +};
> +typedef struct decompress_param decompress_param;
> +
> +static decompress_param *decomp_param;
> +bool incomming_migration_done;
> +static bool quit_thread;
> +
> +static int save_compress_ram_page(compress_param *param);
> +
> +
> +static void *do_data_compress(void *opaque)
> +{
> +    compress_param *param = opaque;
> +    while (!quit_thread) {
> +        if (param->state == COM_START) {
> +            save_compress_ram_page(param);
> +            param->state = COM_DONE;
> +         } else {
> +             g_usleep(1);

There has to be a better way than having your thread spin
with sleeps; qemu_event or semaphore or something?

> +         }
> +    }
> +
> +    return NULL;
> +}
> +
> +
> +void migrate_compress_threads_join(MigrationState *s)
> +{
> +    int i;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +    quit_thread = true;
> +    for (i = 0; i < compress_thread_count; i++) {
> +        qemu_thread_join(s->compress_thread + i);
> +    }
> +    g_free(s->compress_thread);
> +    g_free(comp_param);
> +    s->compress_thread = NULL;
> +    comp_param = NULL;
> +}
> +
> +void migrate_compress_threads_create(MigrationState *s)
> +{
> +    int i;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +    quit_thread = false;
> +    compress_thread_count = s->compress_thread_count;
> +    s->compress_thread = g_malloc0(sizeof(QemuThread)
> +        * s->compress_thread_count);
> +    comp_param = g_malloc0(sizeof(compress_param) * s->compress_thread_count);

You might need to be careful about how quit_thread and comp_param
are accessed by the migration thread and your individual compression threads,
especially on those architectures that don't do ordering etc.

> +    for (i = 0; i < s->compress_thread_count; i++) {
> +        qemu_thread_create(s->compress_thread + i, "compress",
> +            do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);
> +
> +    }
> +}
> +
>  /* Update the xbzrle cache to reflect a page that's been sent as all 0.
>   * The important thing is that a stale (not-yet-0'd) page be replaced
>   * by the new data.
> @@ -351,9 +524,10 @@ static void xbzrle_cache_zero_page(ram_addr_t current_addr)
>  
>  #define ENCODING_FLAG_XBZRLE 0x1
>  
> -static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
> +static int save_xbzrle_page(void *f, uint8_t **current_data,
>                              ram_addr_t current_addr, RAMBlock *block,
> -                            ram_addr_t offset, int cont, bool last_stage)
> +                            ram_addr_t offset, int cont, bool last_stage,
> +                            bool save_to_buf)
>  {
>      int encoded_len = 0, bytes_sent = -1;
>      uint8_t *prev_cached_page;
> @@ -401,10 +575,19 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
>      }
>  
>      /* Send XBZRLE based compressed page */
> -    bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_XBZRLE);
> -    qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
> -    qemu_put_be16(f, encoded_len);
> -    qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
> +    if (save_to_buf) {
> +        bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset,
> +            cont, RAM_SAVE_FLAG_XBZRLE);
> +        migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE);
> +        migrate_put_be16((MigBuf *)f, encoded_len);
> +        migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len);
> +    } else {
> +        bytes_sent = save_block_hdr((QEMUFile *)f, block, offset,
> +            cont, RAM_SAVE_FLAG_XBZRLE);
> +        qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE);
> +        qemu_put_be16((QEMUFile *)f, encoded_len);
> +        qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len);
> +    }

So this in particular is the thing where I think using a qemu_buf file/qsb
would help; all that if would disappear.

>      bytes_sent += encoded_len + 1 + 2;
>      acct_info.xbzrle_pages++;
>      acct_info.xbzrle_bytes += bytes_sent;
> @@ -609,7 +792,7 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>          xbzrle_cache_zero_page(current_addr);
>      } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
>          bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
> -                                      offset, cont, last_stage);
> +                                      offset, cont, last_stage, false);
>          if (!last_stage) {
>              /* Can't send this cached data async, since the cache page
>               * might get updated before it gets to the wire
> @@ -635,6 +818,90 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int save_compress_ram_page(compress_param *param)
> +{
> +    int bytes_sent = param->bytes_sent;
> +    int blen = COMPRESS_BUF_SIZE;
> +    int cont = param->cont;
> +    uint8_t *p = param->p;
> +    int ret = param->ret;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +    bool last_stage = param->last_stage;
> +    /* In doubt sent page as normal */
> +    XBZRLE_cache_lock();
> +    ram_addr_t current_addr = block->offset + offset;
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_sent > 0) {
> +                atomic_inc(&acct_info.norm_pages);
> +             } else if (bytes_sent == 0) {
> +                atomic_inc(&acct_info.dup_pages);
> +             }
> +        }
> +    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
> +        atomic_inc(&acct_info.dup_pages);
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, cont,
> +                             RAM_SAVE_FLAG_COMPRESS);
> +        migrate_put_byte(&param->migbuf, 0);
> +        bytes_sent++;
> +        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
> +         * page would be stale
> +         */
> +        xbzrle_cache_zero_page(current_addr);
> +    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
> +        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
> +                              offset, cont, last_stage, true);
> +    }
> +    XBZRLE_cache_unlock();
> +    /* XBZRLE overflow or normal page */

I wonder if it's worth the complexity of doing the zero check
and the xbzrle if you're already doing compression?  I assume
zlib is going to handle a zero page reasonably well anyway?

> +    if (bytes_sent == -1) {
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> +            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> +        blen = migrate_qemu_add_compress(&param->migbuf, p,
> +            TARGET_PAGE_SIZE, migrate_compress_level());
> +        bytes_sent += blen;
> +        atomic_inc(&acct_info.norm_pages);
> +    }
> +    return bytes_sent;
> +}
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +
> +    for (idx = 0; idx < compress_thread_count; idx++) {
> +        while (comp_param[idx].state != COM_DONE) {
> +            g_usleep(0);
> +        }

Again, some type of event/semaphore rather than busy sleeping;
and also I don't understand how the different threads keep everything
in order - can you add some comments (or maybe notes in the docs)
that explain how it all works?

> +        if (comp_param[idx].migbuf.buf_index > 0) {
> +            qemu_put_buffer(f, comp_param[idx].migbuf.buf,
> +                comp_param[idx].migbuf.buf_index);
> +            bytes_transferred += comp_param[idx].migbuf.buf_index;
> +            comp_param[idx].migbuf.buf_index = 0;
> +        }
> +    }
> +}

> +static inline void set_common_compress_params(compress_param *param,
> +    int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset,
> +    bool last_stage, int cont, uint8_t *p, bool bulk_stage)
> +{
> +    param->ret = ret;
> +    param->bytes_sent = bytes_sent;
> +    param->block = block;
> +    param->offset = offset;
> +    param->last_stage = last_stage;
> +    param->cont = cont;
> +    param->p = p;
> +    param->bulk_stage = bulk_stage;
> +}
> +
>  /*
>   * ram_find_and_save_block: Finds a page to send and sends it to f
>   *
> @@ -649,6 +916,8 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      bool complete_round = false;
>      int bytes_sent = 0;
>      MemoryRegion *mr;
> +    int cont, idx, ret, len = -1;
> +    uint8_t *p;
>  
>      if (!block)
>          block = QTAILQ_FIRST(&ram_list.blocks);
> @@ -667,14 +936,73 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>                  block = QTAILQ_FIRST(&ram_list.blocks);
>                  complete_round = true;
>                  ram_bulk_stage = false;
> +                if (migrate_use_xbzrle()) {
> +                    /* terminate the used thread at this point*/
> +                    flush_compressed_data(f);
> +                    quit_thread = true;
> +                }
>              }
>          } else {
> -            bytes_sent = ram_save_page(f, block, offset, last_stage);
> -
> -            /* if page is unmodified, continue to the next */
> -            if (bytes_sent > 0) {
> -                last_sent_block = block;
> -                break;
> +            if (!migrate_use_compress()) {
> +                bytes_sent = ram_save_page(f, block, offset, last_stage);
> +                /* if page is unmodified, continue to the next */
> +                if (bytes_sent > 0) {
> +                    last_sent_block = block;
> +                    break;
> +                }
> +            } else {
> +                cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +                p = memory_region_get_ram_ptr(block->mr) + offset;
> +                ret = ram_control_save_page(f, block->offset,
> +                           offset, TARGET_PAGE_SIZE, &len);
> +                if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
> +                    if (cont == 0) {
> +                        flush_compressed_data(f);
> +                    }
> +                    set_common_compress_params(&comp_param[0],
> +                        ret, len, block, offset, last_stage, cont,
> +                        p, ram_bulk_stage);
> +                    bytes_sent = save_compress_ram_page(&comp_param[0]);
> +                    if (bytes_sent > 0) {
> +                        qemu_put_buffer(f, comp_param[0].migbuf.buf,
> +                            comp_param[0].migbuf.buf_index);
> +                        comp_param[0].migbuf.buf_index = 0;
> +                        last_sent_block = block;
> +                        break;
> +                    }

Is there no way to move this down into your save_compress_ram_page?
When I split the code into ram_find_and_save_block and ram_save_page
a few months ago, it meant that 'ram_find_and_save_block' only really
did the work of finding what to send, and 'ram_save_page' figured out
everything to do with sending it; it would be nice to keep all
the details of sending it separate still.

Since ram_bulk_stage is a static global in this file, why bother passing
it into the 'compress_params'?  I think you could probably avoid a lot
of things like that.

> +                } else {
> +retry:
> +                    for (idx = 0; idx < compress_thread_count; idx++) {
> +                        if (comp_param[idx].state == COM_DONE) {
> +                            bytes_sent = comp_param[idx].migbuf.buf_index;
> +                            if (bytes_sent == 0) {
> +                                set_common_compress_params(&comp_param[idx],
> +                                    ret, len, block, offset, last_stage,
> +                                    cont, p, ram_bulk_stage);
> +                                comp_param[idx].state = COM_START;
> +                                bytes_sent = 1;
> +                                bytes_transferred -= 1;
> +                                break;
> +                            } else if (bytes_sent > 0) {
> +                                qemu_put_buffer(f, comp_param[idx].migbuf.buf,
> +                                    comp_param[idx].migbuf.buf_index);
> +                                comp_param[idx].migbuf.buf_index = 0;
> +                                set_common_compress_params(&comp_param[idx],
> +                                   ret, len, block, offset, last_stage,
> +                                   cont, p, ram_bulk_stage);
> +                                comp_param[idx].state = COM_START;
> +                                break;
> +                            }
> +                        }
> +                    }
> +                    if (idx < compress_thread_count) {
> +                        last_sent_block = block;
> +                        break;
> +                    } else {
> +                        g_usleep(0);
> +                        goto retry;
> +                    }

No; again this shouldn't be using usleep to do stuff between threads; do
stuff using proper safe thread ops, and probably a queue or something
that hands work off to the different threads.

> +                }
>              }
>          }
>      }
> @@ -684,7 +1012,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      return bytes_sent;
>  }
>  
> -static uint64_t bytes_transferred;
>  
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
> @@ -892,6 +1219,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>  
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>  
>      /*
> @@ -938,6 +1266,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
>  
> @@ -1038,10 +1367,61 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>      }
>  }
>  
> +QemuThread *decompress_threads;
> +
> +static void *do_data_decompress(void *opaque)
> +{
> +    decompress_param *param = opaque;
> +    while (incomming_migration_done == false) {
> +        if (param->state == COM_START) {
> +            uLong pagesize = TARGET_PAGE_SIZE;
> +            if (uncompress((Bytef *)param->des, &pagesize,
> +                    (const Bytef *)param->compbuf, param->len) != Z_OK) {
> +                error_report("Uncompress Failed!\n");

Again \n on error_report.

> +                break;
> +            }
> +            param->state = COM_DONE;
> +        } else {
> +            if (quit_thread) {
> +                break;
> +            }
> +            g_usleep(1);

and the usleep.

> +        }
> +    }
> +    return NULL;
> +}
> +
> +void migrate_decompress_threads_create(int count)
> +{
> +    int i;
> +    decompress_thread_count = count;
> +    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
> +    decomp_param = g_malloc0(sizeof(decompress_param) * count);
> +    quit_thread = false;
> +    for (i = 0; i < count; i++) {
> +        qemu_thread_create(decompress_threads + i, "decompress",
> +            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
> +void migrate_decompress_threads_join(void)
> +{
> +    int i;
> +    for (i = 0; i < decompress_thread_count; i++) {
> +        qemu_thread_join(decompress_threads + i);
> +    }
> +    g_free(decompress_threads);
> +    g_free(decomp_param);
> +    decompress_threads = NULL;
> +    decomp_param = NULL;
> +}
> +
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  {
>      int flags = 0, ret = 0;
>      static uint64_t seq_iter;
> +    int len = 0;
> +    uint8_t compbuf[COMPRESS_BUF_SIZE];
>  
>      seq_iter++;
>  
> @@ -1106,6 +1486,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
>              break;
>          case RAM_SAVE_FLAG_PAGE:
> +            quit_thread = true;
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
>                  error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> @@ -1115,6 +1496,32 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> +        case RAM_SAVE_FLAG_COMPRESS_PAGE:
> +            host = host_from_stream_offset(f, addr, flags);
> +            if (!host) {
> +                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> +                ret = -EINVAL;
> +                break;
> +            }
> +
> +            len = qemu_get_be32(f);
> +            qemu_get_buffer(f, compbuf, len);
> +            int idx;
> +retry:
> +            for (idx = 0; idx < decompress_thread_count; idx++) {
> +                if (decomp_param[idx].state == COM_DONE)  {
> +                    memcpy(decomp_param[idx].compbuf, compbuf, len);
> +                    decomp_param[idx].des = host;
> +                    decomp_param[idx].len = len;
> +                    decomp_param[idx].state = COM_START;
> +                    break;
> +                }
> +            }
> +            if (idx == decompress_thread_count) {
> +                g_usleep(0);
> +                goto retry;
> +            }
> +            break;

Same comments as above.

>          case RAM_SAVE_FLAG_XBZRLE:
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
> diff --git a/hmp-commands.hx b/hmp-commands.hx
> index e37bc8b..8b93bed 100644
> --- a/hmp-commands.hx
> +++ b/hmp-commands.hx
> @@ -941,6 +941,56 @@ Set cache size to @var{value} (in bytes) for xbzrle migrations.
>  ETEXI
>  
>      {
> +        .name       = "migrate_set_compress_level",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set compress level for compress migrations,"
> +                      "the level is a number between 0 and 9, 0 stands for "
> +                      "no compression.\n"
> +                      "1 stands for the fast compress speed while 9 stands for"
> +                      "the highest compress ratio.",
> +        .mhandler.cmd = hmp_migrate_set_compress_level,
> +    },
> +
> +STEXI
> +@item migrate_set_compress_level @var{value}
> +@findex migrate_set_compress_level
> +Set compress level to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
> +        .name       = "migrate_set_compress_threads",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set compress thread count for migrations. "
> +                      "a proper thread count will accelerate the migration speed,"
> +                      "the threads should be between 1 and the CPUS of your system",
> +        .mhandler.cmd = hmp_migrate_set_compress_threads,
> +    },
> +
> +STEXI
> +@item migrate_set_compress_threads @var{value}
> +@findex migrate_set_compress_threads
> +Set compress threads to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
> +        .name       = "migrate_set_decompress_threads",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set decompress thread count for migrations. "
> +                      "a proper thread count will accelerate the migration speed,"
> +                      "the threads should be between 1 and the CPUS of your system",
> +        .mhandler.cmd = hmp_migrate_set_decompress_threads,
> +    },
> +
> +STEXI
> +@item migrate_set_decompress_threads @var{value}
> +@findex migrate_set_decompress_threads
> +Set decompress threads to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
>          .name       = "migrate_set_speed",
>          .args_type  = "value:o",
>          .params     = "value",
> @@ -1766,6 +1816,12 @@ show migration status
>  show current migration capabilities
>  @item info migrate_cache_size
>  show current migration XBZRLE cache size
> +@item info migrate_compress_level
> +show current migration compress level
> +@item info migrate_compress_threads
> +show current migration compress threads
> +@item info migrate_decompress_threads
> +show current migration decompress threads
>  @item info balloon
>  show balloon information
>  @item info qtree
> diff --git a/hmp.c b/hmp.c
> index 63d7686..b1936a3 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -252,6 +252,24 @@ void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict)
>                     qmp_query_migrate_cache_size(NULL) >> 10);
>  }
>  
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "compress level: %" PRId64 "\n",
> +                   qmp_query_migrate_compress_level(NULL));
> +}
> +
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "compress threads: %" PRId64 "\n",
> +                   qmp_query_migrate_compress_threads(NULL));
> +}
> +
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "decompress threads: %" PRId64 "\n",
> +                   qmp_query_migrate_decompress_threads(NULL));
> +}
> +
>  void hmp_info_cpus(Monitor *mon, const QDict *qdict)
>  {
>      CpuInfoList *cpu_list, *cpu;
> @@ -1041,6 +1059,45 @@ void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict)
>      }
>  }
>  
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_compress_level(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_compress_threads(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_decompress_threads(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict)
>  {
>      int64_t value = qdict_get_int(qdict, "value");
> diff --git a/hmp.h b/hmp.h
> index 4bb5dca..b348806 100644
> --- a/hmp.h
> +++ b/hmp.h
> @@ -29,6 +29,9 @@ void hmp_info_mice(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict);
>  void hmp_info_cpus(Monitor *mon, const QDict *qdict);
>  void hmp_info_block(Monitor *mon, const QDict *qdict);
>  void hmp_info_blockstats(Monitor *mon, const QDict *qdict);
> @@ -64,6 +67,9 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict);
>  void hmp_set_password(Monitor *mon, const QDict *qdict);
>  void hmp_expire_password(Monitor *mon, const QDict *qdict);
>  void hmp_eject(Monitor *mon, const QDict *qdict);
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 3cb5ba8..03c8e0d 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -49,6 +49,9 @@ struct MigrationState
>      QemuThread thread;
>      QEMUBH *cleanup_bh;
>      QEMUFile *file;
> +    QemuThread *compress_thread;
> +    int compress_thread_count;
> +    int compress_level;
>  
>      int state;
>      MigrationParams params;
> @@ -64,6 +67,7 @@ struct MigrationState
>      int64_t dirty_sync_count;
>  };
>  
> +extern bool incomming_migration_done;
>  void process_incoming_migration(QEMUFile *f);
>  
>  void qemu_start_incoming_migration(const char *uri, Error **errp);
> @@ -107,6 +111,10 @@ bool migration_has_finished(MigrationState *);
>  bool migration_has_failed(MigrationState *);
>  MigrationState *migrate_get_current(void);
>  
> +void migrate_compress_threads_create(MigrationState *s);
> +void migrate_compress_threads_join(MigrationState *s);
> +void migrate_decompress_threads_create(int count);
> +void migrate_decompress_threads_join(void);
>  uint64_t ram_bytes_remaining(void);
>  uint64_t ram_bytes_transferred(void);
>  uint64_t ram_bytes_total(void);
> @@ -144,7 +152,7 @@ void migrate_del_blocker(Error *reason);
>  
>  bool migrate_rdma_pin_all(void);
>  bool migrate_zero_blocks(void);
> -
> +bool migrate_use_compress(void);
>  bool migrate_auto_converge(void);
>  
>  int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
> @@ -153,6 +161,8 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t *dst, int dlen);
>  
>  int migrate_use_xbzrle(void);
>  int64_t migrate_xbzrle_cache_size(void);
> +int migrate_compress_level(void);
> +int migrate_compress_threads(void);
>  
>  int64_t xbzrle_cache_resize(int64_t new_size);
>  
> diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
> index 401676b..431e6cc 100644
> --- a/include/migration/qemu-file.h
> +++ b/include/migration/qemu-file.h
> @@ -112,6 +112,7 @@ QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer *input);
>  int qemu_get_fd(QEMUFile *f);
>  int qemu_fclose(QEMUFile *f);
>  int64_t qemu_ftell(QEMUFile *f);
> +uint64_t qemu_add_compress(QEMUFile *f,  const uint8_t *p, int size);

Huh? I don't see the code for this anywhere?

>  void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
>  void qemu_put_byte(QEMUFile *f, int v);
>  /*
> diff --git a/migration.c b/migration.c
> index c49a05a..716de97 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -46,6 +46,12 @@ enum {
>  /* Migration XBZRLE default cache size */
>  #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
>  
> +/* Migration compress default thread count */
> +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
> +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
> +/*0: means nocompress, 1: best speed, ... 9: best compress ratio */
> +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
> +
>  static NotifierList migration_state_notifiers =
>      NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
>  
> @@ -60,6 +66,8 @@ MigrationState *migrate_get_current(void)
>          .bandwidth_limit = MAX_THROTTLE,
>          .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
>          .mbps = -1,
> +        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> +        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
>      };
>  
>      return &current_migration;
> @@ -101,6 +109,7 @@ static void process_incoming_migration_co(void *opaque)
>          error_report("load of migration failed: %s", strerror(-ret));
>          exit(EXIT_FAILURE);
>      }
> +    incomming_migration_done = true;
>      qemu_announce_self();
>  
>      /* Make sure all file formats flush their mutable metadata */
> @@ -116,10 +125,14 @@ static void process_incoming_migration_co(void *opaque)
>      } else {
>          runstate_set(RUN_STATE_PAUSED);
>      }
> +    migrate_decompress_threads_join();
>  }
>  
> +static int uncompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT;
>  void process_incoming_migration(QEMUFile *f)
>  {
> +    incomming_migration_done = false;
> +    migrate_decompress_threads_create(uncompress_thread_count);
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
>      int fd = qemu_get_fd(f);
>  
> @@ -302,6 +315,7 @@ static void migrate_fd_cleanup(void *opaque)
>          qemu_thread_join(&s->thread);
>          qemu_mutex_lock_iothread();
>  
> +        migrate_compress_threads_join(s);
>          qemu_fclose(s->file);
>          s->file = NULL;
>      }
> @@ -373,6 +387,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
>      int64_t bandwidth_limit = s->bandwidth_limit;
>      bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
>      int64_t xbzrle_cache_size = s->xbzrle_cache_size;
> +    int compress_level = s->compress_level;
> +    int compress_thread_count = s->compress_thread_count;
>  
>      memcpy(enabled_capabilities, s->enabled_capabilities,
>             sizeof(enabled_capabilities));
> @@ -383,6 +399,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
>             sizeof(enabled_capabilities));
>      s->xbzrle_cache_size = xbzrle_cache_size;
>  
> +    s->compress_level = compress_level;
> +    s->compress_thread_count = compress_thread_count;
>      s->bandwidth_limit = bandwidth_limit;
>      s->state = MIG_STATE_SETUP;
>      trace_migrate_set_state(MIG_STATE_SETUP);
> @@ -503,6 +521,59 @@ int64_t qmp_query_migrate_cache_size(Error **errp)
>      return migrate_xbzrle_cache_size();
>  }
>  
> +void qmp_migrate_set_compress_level(int64_t value, Error **errp)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    if (value > 9 || value < 0) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level",
> +                  "is invalid, please input a integer between 0 and 9. ");
> +        return;
> +    }
> +
> +    s->compress_level = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_level(Error **errp)
> +{
> +    return migrate_compress_level();
> +}
> +
> +void qmp_migrate_set_compress_threads(int64_t value, Error **errp)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    if (value > 255 || value < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
> +                  "is invalid, please input a integer between 1 and 255. ");
> +        return;
> +    }
> +
> +    s->compress_thread_count = value;
> +}
> +
> +void qmp_migrate_set_decompress_threads(int64_t value, Error **errp)
> +{
> +
> +    if (value > 255 || value < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
> +                  "is invalid, please input a integer between 1 and 255. ");
> +        return;
> +    }
> +
> +    uncompress_thread_count = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_threads(Error **errp)
> +{
> +    return migrate_compress_threads();
> +}
> +
> +int64_t qmp_query_migrate_decompress_threads(Error **errp)
> +{
> +    return uncompress_thread_count;
> +}
> +
>  void qmp_migrate_set_speed(int64_t value, Error **errp)
>  {
>      MigrationState *s;
> @@ -555,6 +626,33 @@ bool migrate_zero_blocks(void)
>      return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
>  }
>  
> +bool migrate_use_compress(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
> +}
> +
> +int migrate_compress_level(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_level;
> +}
> +
> +int migrate_compress_threads(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_thread_count;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>  
>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>                         QEMU_THREAD_JOINABLE);
> +    migrate_compress_threads_create(s);
>  }
> diff --git a/monitor.c b/monitor.c
> index 905d8cf..365547e 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -2865,6 +2865,27 @@ static mon_cmd_t info_cmds[] = {
>          .mhandler.cmd = hmp_info_migrate_cache_size,
>      },
>      {
> +        .name       = "migrate_compress_level",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration compress level",
> +        .mhandler.cmd = hmp_info_migrate_compress_level,
> +    },
> +    {
> +        .name       = "migrate_compress_threads",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration compress thread count",
> +        .mhandler.cmd = hmp_info_migrate_compress_threads,
> +    },
> +    {
> +        .name       = "migrate_decompress_threads",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration decompress thread count",
> +        .mhandler.cmd = hmp_info_migrate_decompress_threads,
> +    },
> +    {
>          .name       = "balloon",
>          .args_type  = "",
>          .params     = "",
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 24379ab..71a9e0f 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -491,13 +491,17 @@
>  #          to enable the capability on the source VM. The feature is disabled by
>  #          default. (since 1.6)
>  #
> +# @compress: Using the multiple compression threads to accelerate live migration.
> +#          This feature can help to reduce the migration traffic, by sending
> +#          compressed pages. The feature is disabled by default. (since 2.3)
> +#
>  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
>  #          to speed up convergence of RAM migration. (since 1.6)
>  #
>  # Since: 1.2
>  ##
>  { 'enum': 'MigrationCapability',
> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>  
>  ##
>  # @MigrationCapabilityStatus
> @@ -1382,6 +1386,88 @@
>  { 'command': 'query-migrate-cache-size', 'returns': 'int' }
>  
>  ##
> +# @migrate-set-compress-level
> +#
> +# Set compress level
> +#
> +# @value: compress level int
> +#
> +# The compress level will be an integer between 0 and 9.
> +# The compress level can be modified before and during ongoing migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-level
> +#
> +# query compress level
> +#
> +# Returns: compress level int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-level', 'returns': 'int' }
> +
> +##
> +# @migrate-set-compress-threads
> +#
> +# Set compress threads
> +#
> +# @value: compress threads int
> +#
> +# The compress thread count is an integer between 1 and 255.
> +# The compress level can be modified only before migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-threads
> +#
> +# query compress threads
> +#
> +# Returns: compress threads int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-threads', 'returns': 'int' }
> +
> +##
> +##
> +# @migrate-set-decompress-threads
> +#
> +# Set decompress threads
> +#
> +# @value: decompress threads int
> +#
> +# The decompress thread count is an integer between 1 and 255.
> +# The decompress level can be modified only before migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-decompress-threads
> +#
> +# query decompress threads
> +#
> +# Returns: decompress threads int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' }
> +
> +##
>  # @ObjectPropertyInfo:
>  #
>  # @name: the name of the property
> diff --git a/qmp-commands.hx b/qmp-commands.hx
> index 1abd619..b60fdab 100644
> --- a/qmp-commands.hx
> +++ b/qmp-commands.hx
> @@ -705,7 +705,138 @@ Example:
>  <- { "return": 67108864 }
>  
>  EQMP
> +{
> +        .name       = "migrate-set-compress-level",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
> +    },
> +
> +SQMP
> +migrate-set-compress-level
> +----------------------
> +
> +Set compress level to be used by compress migration, the compress level is an integer
> +between 0 and 9
> +
> +Arguments:
> +
> +- "value": compress level (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-level", "arguments": { "value": 536870912 } }
> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-level",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_level,
> +    },
> +
> +SQMP
> +query-migrate-compress-level
> +------------------------
> +
> +Show compress level to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-level" }
> +<- { "return": 67108864 }
> +
> +EQMP
> +{
> +        .name       = "migrate-set-compress-threads",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_threads,
> +    },
> +
> +SQMP
> +migrate-set-compress-threads
> +----------------------
> +
> +Set compress thread count to be used by compress migration, the compress thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": compress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-threads", "arguments": { "value": 536870912 } }
> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
> +    },
> +
> +SQMP
> +query-migrate-compress-threads
> +------------------------
> +
> +Show compress thread count to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 67108864 }
> +
> +EQMP
> +{
> +        .name       = "migrate-set-decompress-threads",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_decompress_threads,
> +    },
> +
> +SQMP
> +migrate-set-decompress-threads
> +----------------------
> +
> +Set decompress thread count to be used by compress migration, the decompress thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": decompress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-decompress-threads", "arguments": { "value": 536870912 } }
> +<- { "return": {} }
>  
> +EQMP
> +    {
> +        .name       = "query-migrate-decompress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_decompress_threads,
> +    },
> +
> +SQMP
> +query-migrate-decompress-threads
> +------------------------
> +
> +Show decompress thread count to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 67108864 }
> +
> +EQMP
>      {
>          .name       = "migrate_set_speed",
>          .args_type  = "value:o",
> -- 
> 1.9.1
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Zhang, Yang Z Nov. 21, 2014, 6:18 a.m. UTC | #3
Eric Blake wrote on 2014-11-06:

Hi Eric

Thanks for your review and comment.

> On 11/06/2014 12:08 PM, Li Liang wrote:
>> Instead of sending the guest memory directly, this solution compress
>> the ram page before sending, after receiving, the data will be
>> decompressed.
>> This feature can help to reduce the data transferred about 60%, this
>> is very useful when the network bandwidth is limited, and the
>> migration time can also be reduced about 70%. The feature is off by
>> default, following the document docs/multiple-compression-threads.txt
>> for information to use it.
>>
>> Reviewed-by: Eric Blake <eblake@redhat.com>
>
> Please DON'T add this line unless the author spelled it out (or if
> they mentioned that it would be okay if you fix minor issues).  I
> intentionally omitted a reviewed-by on v1:
>
> https://lists.gnu.org/archive/html/qemu-devel/2014-11/msg00672.html
>
> because I was not happy with the patch as it was presented and did not
> think the work to fix it was trivial.  Furthermore, my review of v1
> was just over the interface, and not the entire patch; there are very
> likely still bugs lurking in the .c files.  Once again, I'm going to
> limit my review of v2 to the interface (at least in this email):
>
>> Signed-off-by: Li Liang <liang.z.li@intel.com>
>> ---
>>
>> +++ b/qapi-schema.json
>> @@ -491,13 +491,17 @@
>>  #          to enable the capability on the source VM. The feature is
>>  disabled by #          default. (since 1.6) #
>> +# @compress: Using the multiple compression threads to accelerate
>> +live migration. +#          This feature can help to reduce the
>> migration traffic, by sending +#          compressed pages. The feature
>> is disabled by default. (since 2.3) +#  # @auto-converge: If enabled,
>> QEMU will automatically throttle down the
> guest
>>  #          to speed up convergence of RAM migration. (since 1.6)
>>  #
>>  # Since: 1.2
>>  ##
>>  { 'enum': 'MigrationCapability',
>> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks']
>> }
>> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
>> + 'compress'] }
>>
>
> I'll repeat what I said on v1 (but this time, with some links to back
> it up :)

Agree. Additional commands lead to complexity.

> We really need to avoid a proliferation of new commands, two per
> tunable does not scale well.  I think now is the time to implement my
> earlier suggestion at making MigrationCapability become THE resource for tunables:
>
> https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html

I see that you said you are trying to clean up this part. Have you done it yet? It would be helpful if you could provide some draft patches or guidance on this, since you are more experienced with this part than we are.

Best regards,
Yang
Li, Liang Z Nov. 21, 2014, 7:01 a.m. UTC | #4
> > +static void migrate_put_be32(MigBuf *f, unsigned int v)
> > +{
> > +    migrate_put_byte(f, v >> 24);
> > +    migrate_put_byte(f, v >> 16);
> > +    migrate_put_byte(f, v >> 8);
> > +    migrate_put_byte(f, v);
> > +}
> > +
> > +static void migrate_put_be64(MigBuf *f, uint64_t v)
> > +{
> > +    migrate_put_be32(f, v >> 32);
> > +    migrate_put_be32(f, v);
> > +}
> > +

> This feels like you're doing something very similar to 
> the buffered file code that recently went in; could you
> reuse qemu_bufopen or the QEMUSizedBuffer?
> I think if you could use the qemu_buf somehow (maybe with
> modifications?) then you could avoid a lot of the 'if'd
> code below, because you'd always be working with a QEMUFile,
> it would just be a different QEMUFile.

I will do it in the next version patch.  

> > +static void *do_data_compress(void *opaque)
> > +{
> > +    compress_param *param = opaque;
> > +    while (!quit_thread) {
> > +        if (param->state == COM_START) {
> > +            save_compress_ram_page(param);
> > +            param->state = COM_DONE;
> > +         } else {
> > +             g_usleep(1);
> 
> > There has to be a better way than heaving your thread spin
> > with sleeps; qemu_event or semaphore or something?

I will use QemuCond and QemuMutex  instead.
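For what it's worth, a minimal sketch of the worker loop without the g_usleep() busy-wait might look like the following (shown with raw pthreads for brevity; QEMU's QemuMutex/QemuCond are thin wrappers around these, and the CompressParam fields here are illustrative, not the actual patch structure):

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Illustrative per-thread parameter block: start/quit replace the
 * COM_START/COM_DONE polling state, and cond carries both "work is
 * ready" and "work is done" wakeups between the two threads. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    bool start;   /* set by the migration thread when a page is queued */
    bool quit;    /* set once at shutdown */
} CompressParam;

static void *do_data_compress_sketch(void *opaque)
{
    CompressParam *param = opaque;

    pthread_mutex_lock(&param->lock);
    for (;;) {
        /* sleep until there is work or we are asked to quit */
        while (!param->start && !param->quit) {
            pthread_cond_wait(&param->cond, &param->lock);
        }
        if (param->quit) {
            break;
        }
        param->start = false;
        pthread_mutex_unlock(&param->lock);
        /* save_compress_ram_page(param) would run here, lock dropped */
        pthread_mutex_lock(&param->lock);
        pthread_cond_signal(&param->cond);  /* report completion */
    }
    pthread_mutex_unlock(&param->lock);
    return NULL;
}
```

The producer side then waits on the same condition instead of spinning with g_usleep(0) in flush_compressed_data().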

> > +static int save_compress_ram_page(compress_param *param)
> > +{
> > +    int bytes_sent = param->bytes_sent;
> > +    int blen = COMPRESS_BUF_SIZE;
> > +    int cont = param->cont;
> > +    uint8_t *p = param->p;
> > +    int ret = param->ret;
> > +    RAMBlock *block = param->block;
> > +    ram_addr_t offset = param->offset;
> > +    bool last_stage = param->last_stage;
> > +    /* In doubt sent page as normal */
> > +    XBZRLE_cache_lock();
> > +    ram_addr_t current_addr = block->offset + offset;
> > +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> > +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> > +            if (bytes_sent > 0) {
> > +                atomic_inc(&acct_info.norm_pages);
> > +             } else if (bytes_sent == 0) {
> > +                atomic_inc(&acct_info.dup_pages);
> > +             }
> > +        }
> > +    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
> > +        atomic_inc(&acct_info.dup_pages);
> > +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, cont,
> > +                             RAM_SAVE_FLAG_COMPRESS);
> > +        migrate_put_byte(&param->migbuf, 0);
> > +        bytes_sent++;
> > +        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
> > +         * page would be stale
> > +         */
> > +        xbzrle_cache_zero_page(current_addr);
> > +    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
> > +        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
> > +                              offset, cont, last_stage, true);
> > +    }
> > +    XBZRLE_cache_unlock();
> > +    /* XBZRLE overflow or normal page */

> I wonder if it's worth the complexity of doing the zero check
> and the xbzrle if you're already doing compression?  I assume
> zlib is going to handle a zero page reasonably well anyway?

Yes, the tests show it is worthwhile: with the zero check the migration time is shorter. The reason for checking xbzrle is that I want compression to co-work with xbzrle; using xbzrle can further reduce the amount of data transferred.
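To illustrate why the zero check is still cheap with compression enabled, here is a hypothetical word-wide all-zero scan (QEMU's real is_zero_range() is more optimized; this is only a sketch, and it assumes len is a multiple of 8 for simplicity) — scanning a page this way is far cheaper than handing a zero page to zlib:

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Sketch of a zero-page check: compare the page 8 bytes at a time
 * and bail out on the first nonzero word. */
static bool buffer_is_zero_sketch(const void *buf, size_t len)
{
    const uint64_t *p = buf;
    size_t i;

    for (i = 0; i < len / sizeof(uint64_t); i++) {
        if (p[i]) {
            return false;
        }
    }
    return true;
}
```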

> > +static uint64_t bytes_transferred;
> > +
> > +static void flush_compressed_data(QEMUFile *f)
> > +{
> > +    int idx;
> > +    if (!migrate_use_compress()) {
> > +        return;
> > +    }
> > +
> > +    for (idx = 0; idx < compress_thread_count; idx++) {
> > +        while (comp_param[idx].state != COM_DONE) {
> > +            g_usleep(0);
> > +        }

> Again, some type of event/semaphore rather than busy sleeping;
> and also I don't understand how the different threads keep everything
> in order - can you add some comments (or maybe notes in the docs)
> that explain how it all works?

I will add some comments in next version.

> >              }
> >          } else {
> > -            bytes_sent = ram_save_page(f, block, offset, last_stage);
> > -
> > -            /* if page is unmodified, continue to the next */
> > -            if (bytes_sent > 0) {
> > -                last_sent_block = block;
> > -                break;
> > +            if (!migrate_use_compress()) {
> > +                bytes_sent = ram_save_page(f, block, offset, last_stage);
> > +                /* if page is unmodified, continue to the next */
> > +                if (bytes_sent > 0) {
> > +                    last_sent_block = block;
> > +                    break;
> > +                }
> > +            } else {
> > +                cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> > +                p = memory_region_get_ram_ptr(block->mr) + offset;
> > +                ret = ram_control_save_page(f, block->offset,
> > +                           offset, TARGET_PAGE_SIZE, &len);
> > +                if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
> > +                    if (cont == 0) {
> > +                        flush_compressed_data(f);
> > +                    }
> > +                    set_common_compress_params(&comp_param[0],
> > +                        ret, len, block, offset, last_stage, cont,
> > +                        p, ram_bulk_stage);
> > +                    bytes_sent = save_compress_ram_page(&comp_param[0]);
> > +                    if (bytes_sent > 0) {
> > +                        qemu_put_buffer(f, comp_param[0].migbuf.buf,
> > +                            comp_param[0].migbuf.buf_index);
> > +                        comp_param[0].migbuf.buf_index = 0;
> > +                        last_sent_block = block;
> > +                        break;
> > +                    }

> Is there no way to move this down into your save_compress_ram_page?
> When I split the code into ram_find_and_save_block and ram_save_page
> a few months ago, it meant that 'ram_find_and_save_block' only really
> did the work of finding what to send, and 'ram_save_page' figured out
> everything to do with sending it; it would be nice to keep all
> the details of sending it separate still.

I will rewrite the code.

> Since ram_bulk_stage is a static global in this file, why bother passing
> it into the 'compress_params'?  I think you could probably avoid a lot
> of things like that.

Passing ram_bulk_stage into compress_params is necessary for correctness: the global can change while a compression thread is still working on a queued page, so each thread must use the value captured when the page was handed to it.
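As a toy illustration of that point (the names here are made up, not from the patch): the migration thread snapshots the global into the per-thread parameter block at queue time, so a later change to the global cannot affect a page that was already queued.

```c
#include <stdbool.h>

/* Hypothetical global, standing in for ram_bulk_stage. */
static bool ram_bulk_stage_global = true;

/* Illustrative per-thread parameter block; the real compress_param
 * also carries the block, offset, page pointer, etc. */
typedef struct {
    bool bulk_stage;   /* snapshot of the global at queue time */
} CompressParamSketch;

static void queue_page(CompressParamSketch *param)
{
    /* Snapshot shared state while still on the migration thread,
     * before the compression thread starts using param. */
    param->bulk_stage = ram_bulk_stage_global;
}
```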

Liang
ChenLiang Nov. 21, 2014, 7:29 a.m. UTC | #5
On 2014/11/6 19:08, Li Liang wrote:

> Instead of sending the guest memory directly, this solution compress
> the ram page before sending, after receiving, the data will be
> decompressed.
> This feature can help to reduce the data transferred about
> 60%, this is very useful when the network bandwidth is limited,
> and the migration time can also be reduced about 70%. The
> feature is off by default, following the document
> docs/multiple-compression-threads.txt for information to use it.
> 
> Reviewed-by: Eric Blake <eblake@redhat.com>
> Signed-off-by: Li Liang <liang.z.li@intel.com>
> ---
>  arch_init.c                   | 435 ++++++++++++++++++++++++++++++++++++++++--
>  hmp-commands.hx               |  56 ++++++
>  hmp.c                         |  57 ++++++
>  hmp.h                         |   6 +
>  include/migration/migration.h |  12 +-
>  include/migration/qemu-file.h |   1 +
>  migration.c                   |  99 ++++++++++
>  monitor.c                     |  21 ++
>  qapi-schema.json              |  88 ++++++++-
>  qmp-commands.hx               | 131 +++++++++++++
>  10 files changed, 890 insertions(+), 16 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 88a5ba0..a27d87b 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -24,6 +24,7 @@
>  #include <stdint.h>
>  #include <stdarg.h>
>  #include <stdlib.h>
> +#include <zlib.h>
>  #ifndef _WIN32
>  #include <sys/types.h>
>  #include <sys/mman.h>
> @@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_CONTINUE 0x20
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
> +#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  
>  static struct defconfig_file {
>      const char *filename;
> @@ -332,6 +334,177 @@ static uint64_t migration_dirty_pages;
>  static uint32_t last_version;
>  static bool ram_bulk_stage;
>  
> +#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
> +#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16)
> +struct MigBuf {
> +    int buf_index;
> +    uint8_t buf[MIG_BUF_SIZE];
> +};
> +
> +typedef struct MigBuf MigBuf;
> +
> +static void migrate_put_byte(MigBuf *f, int v)
> +{
> +    f->buf[f->buf_index] = v;
> +    f->buf_index++;
> +}
> +
> +static void migrate_put_be16(MigBuf *f, unsigned int v)
> +{
> +    migrate_put_byte(f, v >> 8);
> +    migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be32(MigBuf *f, unsigned int v)
> +{
> +    migrate_put_byte(f, v >> 24);
> +    migrate_put_byte(f, v >> 16);
> +    migrate_put_byte(f, v >> 8);
> +    migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be64(MigBuf *f, uint64_t v)
> +{
> +    migrate_put_be32(f, v >> 32);
> +    migrate_put_be32(f, v);
> +}
> +
> +static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size)
> +{
> +    int l;
> +
> +    while (size > 0) {
> +        l = MIG_BUF_SIZE - f->buf_index;
> +        if (l > size) {
> +            l = size;
> +        }
> +        memcpy(f->buf + f->buf_index, buf, l);
> +        f->buf_index += l;
> +        buf += l;
> +        size -= l;
> +    }
> +}
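
The migrate_put_* helpers above serialize integers most-significant-byte first into a local buffer, mirroring the qemu_put_be* family. A standalone sketch of the same pattern (hypothetical names, plain C, no QEMU dependencies):

```c
#include <assert.h>
#include <stdint.h>

#define BUF_SIZE 64

/* Minimal stand-in for MigBuf: a byte buffer plus a write index. */
typedef struct {
    int idx;
    uint8_t buf[BUF_SIZE];
} ByteBuf;

static void put_byte(ByteBuf *b, int v)
{
    b->buf[b->idx++] = (uint8_t)v;
}

/* Big-endian: most significant byte goes on the wire first. */
static void put_be32(ByteBuf *b, uint32_t v)
{
    put_byte(b, v >> 24);
    put_byte(b, v >> 16);
    put_byte(b, v >> 8);
    put_byte(b, v);
}

/* The receiver reads the bytes back in the same order. */
static uint32_t get_be32(const ByteBuf *b, int off)
{
    return ((uint32_t)b->buf[off] << 24) | ((uint32_t)b->buf[off + 1] << 16) |
           ((uint32_t)b->buf[off + 2] << 8) | b->buf[off + 3];
}
```

Because both sides agree on byte order, the stream format is independent of host endianness.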
> +
> +static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block,
> +        ram_addr_t offset, int cont, int flag)
> +{
> +    size_t size;
> +
> +    migrate_put_be64(f, offset | cont | flag);
> +    size = 8;
> +
> +    if (!cont) {
> +        migrate_put_byte(f, strlen(block->idstr));
> +        migrate_put_buffer(f, (uint8_t *)block->idstr,
> +                        strlen(block->idstr));
> +        size += 1 + strlen(block->idstr);
> +    }
> +    return size;
> +}
> +
> +static int migrate_qemu_add_compress(MigBuf *f,  const uint8_t *p,
> +        int size, int level)
> +{
> +    uLong  blen = COMPRESS_BUF_SIZE;
> +    if (compress2(f->buf + f->buf_index + sizeof(int), &blen, (Bytef *)p,
> +            size, level) != Z_OK) {
> +        error_report("Compress Failed!");
> +        return 0;
> +    }
> +    migrate_put_be32(f, blen);
> +    f->buf_index += blen;
> +    return blen + sizeof(int);
> +}
> +
> +enum {
> +    COM_DONE = 0,
> +    COM_START,
> +};
> +
> +static int  compress_thread_count;
> +static int  decompress_thread_count;
> +
> +struct compress_param {
> +    int state;
> +    MigBuf migbuf;
> +    RAMBlock *block;
> +    ram_addr_t offset;
> +    bool last_stage;
> +    int ret;
> +    int bytes_sent;
> +    uint8_t *p;
> +    int cont;
> +    bool bulk_stage;
> +};
> +
> +typedef struct compress_param compress_param;
> +compress_param *comp_param;
> +
> +struct decompress_param {
> +    int state;
> +    void *des;
> +    uint8 compbuf[COMPRESS_BUF_SIZE];
> +    int len;
> +};
> +typedef struct decompress_param decompress_param;
> +
> +static decompress_param *decomp_param;
> +bool incomming_migration_done;
> +static bool quit_thread;
> +
> +static int save_compress_ram_page(compress_param *param);
> +
> +
> +static void *do_data_compress(void *opaque)
> +{
> +    compress_param *param = opaque;
> +    while (!quit_thread) {
> +        if (param->state == COM_START) {
> +            save_compress_ram_page(param);
> +            param->state = COM_DONE;
> +         } else {
> +             g_usleep(1);
> +         }
> +    }
> +
> +    return NULL;
> +}
> +
> +
> +void migrate_compress_threads_join(MigrationState *s)
> +{
> +    int i;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +    quit_thread = true;
> +    for (i = 0; i < compress_thread_count; i++) {
> +        qemu_thread_join(s->compress_thread + i);
> +    }
> +    g_free(s->compress_thread);
> +    g_free(comp_param);
> +    s->compress_thread = NULL;
> +    comp_param = NULL;
> +}
> +
> +void migrate_compress_threads_create(MigrationState *s)
> +{
> +    int i;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +    quit_thread = false;
> +    compress_thread_count = s->compress_thread_count;
> +    s->compress_thread = g_malloc0(sizeof(QemuThread)
> +        * s->compress_thread_count);
> +    comp_param = g_malloc0(sizeof(compress_param) * s->compress_thread_count);
> +    for (i = 0; i < s->compress_thread_count; i++) {
> +        qemu_thread_create(s->compress_thread + i, "compress",
> +            do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);
> +
> +    }
> +}
> +
>  /* Update the xbzrle cache to reflect a page that's been sent as all 0.
>   * The important thing is that a stale (not-yet-0'd) page be replaced
>   * by the new data.
> @@ -351,9 +524,10 @@ static void xbzrle_cache_zero_page(ram_addr_t current_addr)
>  
>  #define ENCODING_FLAG_XBZRLE 0x1
>  
> -static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
> +static int save_xbzrle_page(void *f, uint8_t **current_data,
>                              ram_addr_t current_addr, RAMBlock *block,
> -                            ram_addr_t offset, int cont, bool last_stage)
> +                            ram_addr_t offset, int cont, bool last_stage,
> +                            bool save_to_buf)
>  {
>      int encoded_len = 0, bytes_sent = -1;
>      uint8_t *prev_cached_page;
> @@ -401,10 +575,19 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
>      }
>  
>      /* Send XBZRLE based compressed page */
> -    bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_XBZRLE);
> -    qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
> -    qemu_put_be16(f, encoded_len);
> -    qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
> +    if (save_to_buf) {
> +        bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset,
> +            cont, RAM_SAVE_FLAG_XBZRLE);
> +        migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE);
> +        migrate_put_be16((MigBuf *)f, encoded_len);
> +        migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len);
> +    } else {
> +        bytes_sent = save_block_hdr((QEMUFile *)f, block, offset,
> +            cont, RAM_SAVE_FLAG_XBZRLE);
> +        qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE);
> +        qemu_put_be16((QEMUFile *)f, encoded_len);
> +        qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len);
> +    }
>      bytes_sent += encoded_len + 1 + 2;
>      acct_info.xbzrle_pages++;
>      acct_info.xbzrle_bytes += bytes_sent;
> @@ -609,7 +792,7 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>          xbzrle_cache_zero_page(current_addr);
>      } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
>          bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
> -                                      offset, cont, last_stage);
> +                                      offset, cont, last_stage, false);
>          if (!last_stage) {
>              /* Can't send this cached data async, since the cache page
>               * might get updated before it gets to the wire
> @@ -635,6 +818,90 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int save_compress_ram_page(compress_param *param)
> +{
> +    int bytes_sent = param->bytes_sent;
> +    int blen = COMPRESS_BUF_SIZE;
> +    int cont = param->cont;
> +    uint8_t *p = param->p;
> +    int ret = param->ret;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +    bool last_stage = param->last_stage;
> +    /* In doubt sent page as normal */
> +    XBZRLE_cache_lock();
> +    ram_addr_t current_addr = block->offset + offset;
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_sent > 0) {
> +                atomic_inc(&acct_info.norm_pages);
> +             } else if (bytes_sent == 0) {
> +                atomic_inc(&acct_info.dup_pages);
> +             }
> +        }
> +    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
> +        atomic_inc(&acct_info.dup_pages);
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, cont,
> +                             RAM_SAVE_FLAG_COMPRESS);
> +        migrate_put_byte(&param->migbuf, 0);
> +        bytes_sent++;
> +        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
> +         * page would be stale
> +         */
> +        xbzrle_cache_zero_page(current_addr);
> +    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
> +        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
> +                              offset, cont, last_stage, true);
> +    }
> +    XBZRLE_cache_unlock();
> +    /* XBZRLE overflow or normal page */
> +    if (bytes_sent == -1) {
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> +            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> +        blen = migrate_qemu_add_compress(&param->migbuf, p,
> +            TARGET_PAGE_SIZE, migrate_compress_level());
> +        bytes_sent += blen;
> +        atomic_inc(&acct_info.norm_pages);
> +    }
> +    return bytes_sent;
> +}
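
save_compress_ram_page() first checks is_zero_range() so an all-zero page is sent as a single byte rather than being compressed. A sketch of that check (a straightforward byte scan for illustration; QEMU's real is_zero_range is optimized):

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

enum { PAGE = 4096 };

/* True if every byte in the range is zero. */
static bool page_is_zero(const uint8_t *p, size_t len)
{
    for (size_t i = 0; i < len; i++) {
        if (p[i]) {
            return false;
        }
    }
    return true;
}
```

Zero pages are common in guests that have not yet touched all their RAM, which is a large part of why the bulk stage compresses so well.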
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +
> +    for (idx = 0; idx < compress_thread_count; idx++) {
> +        while (comp_param[idx].state != COM_DONE) {
> +            g_usleep(0);
> +        }
> +        if (comp_param[idx].migbuf.buf_index > 0) {
> +            qemu_put_buffer(f, comp_param[idx].migbuf.buf,
> +                comp_param[idx].migbuf.buf_index);
> +            bytes_transferred += comp_param[idx].migbuf.buf_index;
> +            comp_param[idx].migbuf.buf_index = 0;
> +        }
> +    }
> +}
> +
> +static inline void set_common_compress_params(compress_param *param,
> +    int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset,
> +    bool last_stage, int cont, uint8_t *p, bool bulk_stage)
> +{
> +    param->ret = ret;
> +    param->bytes_sent = bytes_sent;
> +    param->block = block;
> +    param->offset = offset;
> +    param->last_stage = last_stage;
> +    param->cont = cont;
> +    param->p = p;
> +    param->bulk_stage = bulk_stage;
> +}
> +
>  /*
>   * ram_find_and_save_block: Finds a page to send and sends it to f
>   *
> @@ -649,6 +916,8 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      bool complete_round = false;
>      int bytes_sent = 0;
>      MemoryRegion *mr;
> +    int cont, idx, ret, len = -1;
> +    uint8_t *p;
>  
>      if (!block)
>          block = QTAILQ_FIRST(&ram_list.blocks);
> @@ -667,14 +936,73 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>                  block = QTAILQ_FIRST(&ram_list.blocks);
>                  complete_round = true;
>                  ram_bulk_stage = false;
> +                if (migrate_use_xbzrle()) {
> +                    /* terminate the compress threads at this point */
> +                    flush_compressed_data(f);
> +                    quit_thread = true;
> +                }
>              }
>          } else {
> -            bytes_sent = ram_save_page(f, block, offset, last_stage);
> -
> -            /* if page is unmodified, continue to the next */
> -            if (bytes_sent > 0) {
> -                last_sent_block = block;
> -                break;
> +            if (!migrate_use_compress()) {
> +                bytes_sent = ram_save_page(f, block, offset, last_stage);
> +                /* if page is unmodified, continue to the next */
> +                if (bytes_sent > 0) {
> +                    last_sent_block = block;
> +                    break;
> +                }
> +            } else {
> +                cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +                p = memory_region_get_ram_ptr(block->mr) + offset;
> +                ret = ram_control_save_page(f, block->offset,
> +                           offset, TARGET_PAGE_SIZE, &len);
> +                if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
> +                    if (cont == 0) {
> +                        flush_compressed_data(f);
> +                    }
> +                    set_common_compress_params(&comp_param[0],
> +                        ret, len, block, offset, last_stage, cont,
> +                        p, ram_bulk_stage);
> +                    bytes_sent = save_compress_ram_page(&comp_param[0]);
> +                    if (bytes_sent > 0) {
> +                        qemu_put_buffer(f, comp_param[0].migbuf.buf,
> +                            comp_param[0].migbuf.buf_index);
> +                        comp_param[0].migbuf.buf_index = 0;
> +                        last_sent_block = block;
> +                        break;
> +                    }
> +                } else {
> +retry:
> +                    for (idx = 0; idx < compress_thread_count; idx++) {
> +                        if (comp_param[idx].state == COM_DONE) {
> +                            bytes_sent = comp_param[idx].migbuf.buf_index;
> +                            if (bytes_sent == 0) {
> +                                set_common_compress_params(&comp_param[idx],
> +                                    ret, len, block, offset, last_stage,
> +                                    cont, p, ram_bulk_stage);
> +                                comp_param[idx].state = COM_START;
> +                                bytes_sent = 1;
> +                                bytes_transferred -= 1;
> +                                break;
> +                            } else if (bytes_sent > 0) {
> +                                qemu_put_buffer(f, comp_param[idx].migbuf.buf,
> +                                    comp_param[idx].migbuf.buf_index);
> +                                comp_param[idx].migbuf.buf_index = 0;
> +                                set_common_compress_params(&comp_param[idx],
> +                                   ret, len, block, offset, last_stage,
> +                                   cont, p, ram_bulk_stage);
> +                                comp_param[idx].state = COM_START;
> +                                break;
> +                            }
> +                        }
> +                    }
> +                    if (idx < compress_thread_count) {
> +                        last_sent_block = block;
> +                        break;
> +                    } else {
> +                        g_usleep(0);
> +                        goto retry;
> +                    }
> +                }
>              }
>          }
>      }
> @@ -684,7 +1012,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      return bytes_sent;
>  }
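
The dispatch loop above hands each page to the first worker whose state is COM_DONE, first flushing any output that worker buffered for its previous page, and spins with g_usleep(0) when every worker is busy. The handshake can be sketched as (hypothetical, single-threaded illustration of the state machine only):

```c
enum { COM_DONE = 0, COM_START };

typedef struct {
    int state;
    int pending;   /* stands in for migbuf.buf_index: output not yet flushed */
} Worker;

/* Find an idle worker, flush its previous output, and start it on new work.
 * Returns the worker index, or -1 if all workers are busy (caller retries). */
static int dispatch(Worker *w, int count, int *flushed)
{
    for (int i = 0; i < count; i++) {
        if (w[i].state == COM_DONE) {
            if (w[i].pending > 0) {
                *flushed += w[i].pending;   /* qemu_put_buffer(...) in the patch */
                w[i].pending = 0;
            }
            w[i].state = COM_START;         /* the worker thread picks this up */
            return i;
        }
    }
    return -1;  /* all busy: the caller does g_usleep(0) and retries */
}
```

One consequence of this design is that a page's output is only written to the stream when its worker is next dispatched (or at flush_compressed_data), so compressed pages can reach the wire out of submission order within an iteration.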
>  
> -static uint64_t bytes_transferred;
>  
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
> @@ -892,6 +1219,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>  
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>  
>      /*
> @@ -938,6 +1266,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
>  
> @@ -1038,10 +1367,61 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>      }
>  }
>  
> +QemuThread *decompress_threads;
> +
> +static void *do_data_decompress(void *opaque)
> +{
> +    decompress_param *param = opaque;
> +    while (incomming_migration_done == false) {
> +        if (param->state == COM_START) {
> +            uLong pagesize = TARGET_PAGE_SIZE;
> +            if (uncompress((Bytef *)param->des, &pagesize,
> +                    (const Bytef *)param->compbuf, param->len) != Z_OK) {
> +                error_report("Uncompress Failed!");
> +                break;
> +            }
> +            param->state = COM_DONE;
> +        } else {
> +            if (quit_thread) {
> +                break;
> +            }
> +            g_usleep(1);
> +        }
> +    }
> +    return NULL;
> +}
> +
> +void migrate_decompress_threads_create(int count)
> +{
> +    int i;
> +    decompress_thread_count = count;
> +    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
> +    decomp_param = g_malloc0(sizeof(decompress_param) * count);
> +    quit_thread = false;
> +    for (i = 0; i < count; i++) {
> +        qemu_thread_create(decompress_threads + i, "decompress",
> +            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
> +void migrate_decompress_threads_join(void)
> +{
> +    int i;
> +    for (i = 0; i < decompress_thread_count; i++) {
> +        qemu_thread_join(decompress_threads + i);
> +    }
> +    g_free(decompress_threads);
> +    g_free(decomp_param);
> +    decompress_threads = NULL;
> +    decomp_param = NULL;
> +}
> +
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  {
>      int flags = 0, ret = 0;
>      static uint64_t seq_iter;
> +    int len = 0;
> +    uint8_t compbuf[COMPRESS_BUF_SIZE];
>  
>      seq_iter++;
>  
> @@ -1106,6 +1486,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
>              break;
>          case RAM_SAVE_FLAG_PAGE:
> +            quit_thread = true;
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
>                  error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> @@ -1115,6 +1496,32 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> +        case RAM_SAVE_FLAG_COMPRESS_PAGE:
> +            host = host_from_stream_offset(f, addr, flags);
> +            if (!host) {
> +                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> +                ret = -EINVAL;
> +                break;
> +            }
> +
> +            len = qemu_get_be32(f);
> +            qemu_get_buffer(f, compbuf, len);
> +            int idx;
> +retry:
> +            for (idx = 0; idx < decompress_thread_count; idx++) {
> +                if (decomp_param[idx].state == COM_DONE)  {
> +                    memcpy(decomp_param[idx].compbuf, compbuf, len);
> +                    decomp_param[idx].des = host;
> +                    decomp_param[idx].len = len;
> +                    decomp_param[idx].state = COM_START;
> +                    break;
> +                }
> +            }
> +            if (idx == decompress_thread_count) {
> +                g_usleep(0);
> +                goto retry;
> +            }
> +            break;
>          case RAM_SAVE_FLAG_XBZRLE:
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
> diff --git a/hmp-commands.hx b/hmp-commands.hx
> index e37bc8b..8b93bed 100644
> --- a/hmp-commands.hx
> +++ b/hmp-commands.hx
> @@ -941,6 +941,56 @@ Set cache size to @var{value} (in bytes) for xbzrle migrations.
>  ETEXI
>  
>      {
> +        .name       = "migrate_set_compress_level",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set the compress level for compressed migrations. "
> +                      "The level is a number between 0 and 9: 0 stands for "
> +                      "no compression, "
> +                      "1 stands for the fastest compress speed while 9 stands "
> +                      "for the highest compress ratio.",
> +        .mhandler.cmd = hmp_migrate_set_compress_level,
> +    },
> +
> +STEXI
> +@item migrate_set_compress_level @var{value}
> +@findex migrate_set_compress_level
> +Set compress level to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
> +        .name       = "migrate_set_compress_threads",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set the compress thread count for migrations. "
> +                      "A proper thread count can accelerate migration; "
> +                      "the count should be between 1 and the number of CPUs of your system",
> +        .mhandler.cmd = hmp_migrate_set_compress_threads,
> +    },
> +
> +STEXI
> +@item migrate_set_compress_threads @var{value}
> +@findex migrate_set_compress_threads
> +Set compress threads to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
> +        .name       = "migrate_set_decompress_threads",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set the decompress thread count for migrations. "
> +                      "A proper thread count can accelerate migration; "
> +                      "the count should be between 1 and the number of CPUs of your system",
> +        .mhandler.cmd = hmp_migrate_set_decompress_threads,
> +    },
> +
> +STEXI
> +@item migrate_set_decompress_threads @var{value}
> +@findex migrate_set_decompress_threads
> +Set decompress threads to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
>          .name       = "migrate_set_speed",
>          .args_type  = "value:o",
>          .params     = "value",
> @@ -1766,6 +1816,12 @@ show migration status
>  show current migration capabilities
>  @item info migrate_cache_size
>  show current migration XBZRLE cache size
> +@item info migrate_compress_level
> +show current migration compress level
> +@item info migrate_compress_threads
> +show current migration compress threads
> +@item info migrate_decompress_threads
> +show current migration decompress threads
>  @item info balloon
>  show balloon information
>  @item info qtree
> diff --git a/hmp.c b/hmp.c
> index 63d7686..b1936a3 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -252,6 +252,24 @@ void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict)
>                     qmp_query_migrate_cache_size(NULL) >> 10);
>  }
>  
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "compress level: %" PRId64 "\n",
> +                   qmp_query_migrate_compress_level(NULL));
> +}
> +
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "compress threads: %" PRId64 "\n",
> +                   qmp_query_migrate_compress_threads(NULL));
> +}
> +
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "decompress threads: %" PRId64 "\n",
> +                   qmp_query_migrate_decompress_threads(NULL));
> +}
> +
>  void hmp_info_cpus(Monitor *mon, const QDict *qdict)
>  {
>      CpuInfoList *cpu_list, *cpu;
> @@ -1041,6 +1059,45 @@ void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict)
>      }
>  }
>  
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_compress_level(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_compress_threads(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_decompress_threads(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict)
>  {
>      int64_t value = qdict_get_int(qdict, "value");
> diff --git a/hmp.h b/hmp.h
> index 4bb5dca..b348806 100644
> --- a/hmp.h
> +++ b/hmp.h
> @@ -29,6 +29,9 @@ void hmp_info_mice(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict);
>  void hmp_info_cpus(Monitor *mon, const QDict *qdict);
>  void hmp_info_block(Monitor *mon, const QDict *qdict);
>  void hmp_info_blockstats(Monitor *mon, const QDict *qdict);
> @@ -64,6 +67,9 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict);
>  void hmp_set_password(Monitor *mon, const QDict *qdict);
>  void hmp_expire_password(Monitor *mon, const QDict *qdict);
>  void hmp_eject(Monitor *mon, const QDict *qdict);
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 3cb5ba8..03c8e0d 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -49,6 +49,9 @@ struct MigrationState
>      QemuThread thread;
>      QEMUBH *cleanup_bh;
>      QEMUFile *file;
> +    QemuThread *compress_thread;
> +    int compress_thread_count;
> +    int compress_level;
>  
>      int state;
>      MigrationParams params;
> @@ -64,6 +67,7 @@ struct MigrationState
>      int64_t dirty_sync_count;
>  };
>  
> +extern bool incomming_migration_done;
>  void process_incoming_migration(QEMUFile *f);
>  
>  void qemu_start_incoming_migration(const char *uri, Error **errp);
> @@ -107,6 +111,10 @@ bool migration_has_finished(MigrationState *);
>  bool migration_has_failed(MigrationState *);
>  MigrationState *migrate_get_current(void);
>  
> +void migrate_compress_threads_create(MigrationState *s);
> +void migrate_compress_threads_join(MigrationState *s);
> +void migrate_decompress_threads_create(int count);
> +void migrate_decompress_threads_join(void);
>  uint64_t ram_bytes_remaining(void);
>  uint64_t ram_bytes_transferred(void);
>  uint64_t ram_bytes_total(void);
> @@ -144,7 +152,7 @@ void migrate_del_blocker(Error *reason);
>  
>  bool migrate_rdma_pin_all(void);
>  bool migrate_zero_blocks(void);
> -
> +bool migrate_use_compress(void);
>  bool migrate_auto_converge(void);
>  
>  int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
> @@ -153,6 +161,8 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t *dst, int dlen);
>  
>  int migrate_use_xbzrle(void);
>  int64_t migrate_xbzrle_cache_size(void);
> +int migrate_compress_level(void);
> +int migrate_compress_threads(void);
>  
>  int64_t xbzrle_cache_resize(int64_t new_size);
>  
> diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
> index 401676b..431e6cc 100644
> --- a/include/migration/qemu-file.h
> +++ b/include/migration/qemu-file.h
> @@ -112,6 +112,7 @@ QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer *input);
>  int qemu_get_fd(QEMUFile *f);
>  int qemu_fclose(QEMUFile *f);
>  int64_t qemu_ftell(QEMUFile *f);
> +uint64_t qemu_add_compress(QEMUFile *f,  const uint8_t *p, int size);
>  void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
>  void qemu_put_byte(QEMUFile *f, int v);
>  /*
> diff --git a/migration.c b/migration.c
> index c49a05a..716de97 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -46,6 +46,12 @@ enum {
>  /* Migration XBZRLE default cache size */
>  #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
>  
> +/* Migration compress default thread count */
> +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
> +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
> +/* 0: no compression, 1: best speed, ... 9: best compression ratio */
> +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
> +
>  static NotifierList migration_state_notifiers =
>      NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
>  
> @@ -60,6 +66,8 @@ MigrationState *migrate_get_current(void)
>          .bandwidth_limit = MAX_THROTTLE,
>          .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
>          .mbps = -1,
> +        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> +        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
>      };
>  
>      return &current_migration;
> @@ -101,6 +109,7 @@ static void process_incoming_migration_co(void *opaque)
>          error_report("load of migration failed: %s", strerror(-ret));
>          exit(EXIT_FAILURE);
>      }
> +    incomming_migration_done = true;
>      qemu_announce_self();
>  
>      /* Make sure all file formats flush their mutable metadata */
> @@ -116,10 +125,14 @@ static void process_incoming_migration_co(void *opaque)
>      } else {
>          runstate_set(RUN_STATE_PAUSED);
>      }
> +    migrate_decompress_threads_join();
>  }
>  
> +static int uncompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT;
>  void process_incoming_migration(QEMUFile *f)
>  {
> +    incomming_migration_done = false;
> +    migrate_decompress_threads_create(uncompress_thread_count);
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
>      int fd = qemu_get_fd(f);
>  
> @@ -302,6 +315,7 @@ static void migrate_fd_cleanup(void *opaque)
>          qemu_thread_join(&s->thread);
>          qemu_mutex_lock_iothread();
>  
> +        migrate_compress_threads_join(s);
>          qemu_fclose(s->file);
>          s->file = NULL;
>      }
> @@ -373,6 +387,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
>      int64_t bandwidth_limit = s->bandwidth_limit;
>      bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
>      int64_t xbzrle_cache_size = s->xbzrle_cache_size;
> +    int compress_level = s->compress_level;
> +    int compress_thread_count = s->compress_thread_count;
>  
>      memcpy(enabled_capabilities, s->enabled_capabilities,
>             sizeof(enabled_capabilities));
> @@ -383,6 +399,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
>             sizeof(enabled_capabilities));
>      s->xbzrle_cache_size = xbzrle_cache_size;
>  
> +    s->compress_level = compress_level;
> +    s->compress_thread_count = compress_thread_count;
>      s->bandwidth_limit = bandwidth_limit;
>      s->state = MIG_STATE_SETUP;
>      trace_migrate_set_state(MIG_STATE_SETUP);
> @@ -503,6 +521,59 @@ int64_t qmp_query_migrate_cache_size(Error **errp)
>      return migrate_xbzrle_cache_size();
>  }
>  
> +void qmp_migrate_set_compress_level(int64_t value, Error **errp)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    if (value > 9 || value < 0) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level",
> +                  "is invalid, please input an integer between 0 and 9.");
> +        return;
> +    }
> +
> +    s->compress_level = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_level(Error **errp)
> +{
> +    return migrate_compress_level();
> +}
> +
> +void qmp_migrate_set_compress_threads(int64_t value, Error **errp)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    if (value > 255 || value < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
> +                  "is invalid, please input an integer between 1 and 255.");
> +        return;
> +    }
> +
> +    s->compress_thread_count = value;
> +}
> +
> +void qmp_migrate_set_decompress_threads(int64_t value, Error **errp)
> +{
> +
> +    if (value > 255 || value < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "decompress thread count",
> +                  "is invalid, please input an integer between 1 and 255.");
> +        return;
> +    }
> +
> +    uncompress_thread_count = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_threads(Error **errp)
> +{
> +    return migrate_compress_threads();
> +}
> +
> +int64_t qmp_query_migrate_decompress_threads(Error **errp)
> +{
> +    return uncompress_thread_count;
> +}
> +
>  void qmp_migrate_set_speed(int64_t value, Error **errp)
>  {
>      MigrationState *s;
> @@ -555,6 +626,33 @@ bool migrate_zero_blocks(void)
>      return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
>  }
>  
> +bool migrate_use_compress(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
> +}
> +
> +int migrate_compress_level(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_level;
> +}
> +
> +int migrate_compress_threads(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_thread_count;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>  
>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>                         QEMU_THREAD_JOINABLE);
> +    migrate_compress_threads_create(s);


Don't create compress_threads unconditionally. It may be better to write:

if (!migrate_use_xbzrle()) {
    migrate_compress_threads_create(s);
}

BTW, this patch is too big to review. Splitting it into several patches would be welcome.

>  }
> diff --git a/monitor.c b/monitor.c
> index 905d8cf..365547e 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -2865,6 +2865,27 @@ static mon_cmd_t info_cmds[] = {
>          .mhandler.cmd = hmp_info_migrate_cache_size,
>      },
>      {
> +        .name       = "migrate_compress_level",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration compress level",
> +        .mhandler.cmd = hmp_info_migrate_compress_level,
> +    },
> +    {
> +        .name       = "migrate_compress_threads",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration compress thread count",
> +        .mhandler.cmd = hmp_info_migrate_compress_threads,
> +    },
> +    {
> +        .name       = "migrate_decompress_threads",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration decompress thread count",
> +        .mhandler.cmd = hmp_info_migrate_decompress_threads,
> +    },
> +    {
>          .name       = "balloon",
>          .args_type  = "",
>          .params     = "",
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 24379ab..71a9e0f 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -491,13 +491,17 @@
>  #          to enable the capability on the source VM. The feature is disabled by
>  #          default. (since 1.6)
>  #
> +# @compress: Using the multiple compression threads to accelerate live migration.
> +#          This feature can help to reduce the migration traffic, by sending
> +#          compressed pages. The feature is disabled by default. (since 2.3)
> +#
>  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
>  #          to speed up convergence of RAM migration. (since 1.6)
>  #
>  # Since: 1.2
>  ##
>  { 'enum': 'MigrationCapability',
> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>  
>  ##
>  # @MigrationCapabilityStatus
> @@ -1382,6 +1386,88 @@
>  { 'command': 'query-migrate-cache-size', 'returns': 'int' }
>  
>  ##
> +# @migrate-set-compress-level
> +#
> +# Set compress level
> +#
> +# @value: compress level int
> +#
> +# The compress level will be an integer between 0 and 9.
> +# The compress level can be modified before and during ongoing migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-level
> +#
> +# query compress level
> +#
> +# Returns: compress level int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-level', 'returns': 'int' }
> +
> +##
> +# @migrate-set-compress-threads
> +#
> +# Set compress threads
> +#
> +# @value: compress threads int
> +#
> +# The compress thread count is an integer between 1 and 255.
> +# The compress level can be modified only before migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-threads
> +#
> +# query compress threads
> +#
> +# Returns: compress threads int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-threads', 'returns': 'int' }
> +
> +##
> +# @migrate-set-decompress-threads
> +#
> +# Set decompress threads
> +#
> +# @value: decompress threads int
> +#
> +# The decompress thread count is an integer between 1 and 255.
> +# The decompress level can be modified only before migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-decompress-threads
> +#
> +# query decompress threads
> +#
> +# Returns: decompress threads int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' }
> +
> +##
>  # @ObjectPropertyInfo:
>  #
>  # @name: the name of the property
> diff --git a/qmp-commands.hx b/qmp-commands.hx
> index 1abd619..b60fdab 100644
> --- a/qmp-commands.hx
> +++ b/qmp-commands.hx
> @@ -705,7 +705,138 @@ Example:
>  <- { "return": 67108864 }
>  
>  EQMP
> +{
> +        .name       = "migrate-set-compress-level",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
> +    },
> +
> +SQMP
> +migrate-set-compress-level
> +----------------------
> +
> +Set compress level to be used by compress migration, the compress level is an integer
> +between 0 and 9
> +
> +Arguments:
> +
> +- "value": compress level (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-level", "arguments": { "value": 1 } }
> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-level",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_level,
> +    },
> +
> +SQMP
> +query-migrate-compress-level
> +------------------------
> +
> +Show compress level to be used by compress migration
> +
> +Returns a json-int: the current compress level
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-level" }
> +<- { "return": 1 }
> +
> +EQMP
> +{
> +        .name       = "migrate-set-compress-threads",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_threads,
> +    },
> +
> +SQMP
> +migrate-set-compress-threads
> +----------------------
> +
> +Set compress thread count to be used by compress migration, the compress thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": compress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-threads", "arguments": { "value": 8 } }
> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
> +    },
> +
> +SQMP
> +query-migrate-compress-threads
> +------------------------
> +
> +Show compress thread count to be used by compress migration
> +
> +Returns a json-int: the current compress thread count
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 8 }
> +
> +EQMP
> +{
> +        .name       = "migrate-set-decompress-threads",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_decompress_threads,
> +    },
> +
> +SQMP
> +migrate-set-decompress-threads
> +----------------------
> +
> +Set decompress thread count to be used by compress migration, the decompress thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": decompress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-decompress-threads", "arguments": { "value": 2 } }
> +<- { "return": {} }
>  
> +EQMP
> +    {
> +        .name       = "query-migrate-decompress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_decompress_threads,
> +    },
> +
> +SQMP
> +query-migrate-decompress-threads
> +------------------------
> +
> +Show decompress thread count to be used by compress migration
> +
> +Returns a json-int: the current decompress thread count
> +
> +Example:
> +
> +-> { "execute": "query-migrate-decompress-threads" }
> +<- { "return": 2 }
> +
> +EQMP
>      {
>          .name       = "migrate_set_speed",
>          .args_type  = "value:o",
Li, Liang Z Nov. 21, 2014, 7:38 a.m. UTC | #6
> > +int migrate_compress_threads(void)
> > +{
> > +    MigrationState *s;
> > +
> > +    s = migrate_get_current();
> > +
> > +    return s->compress_thread_count;
> > +}
> > +
> >  int migrate_use_xbzrle(void)
> >  {
> >      MigrationState *s;
> > @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
> >  
> >      qemu_thread_create(&s->thread, "migration", migration_thread, s,
> >                         QEMU_THREAD_JOINABLE);
> > +    migrate_compress_threads_create(s);


> don't create compress_threads always.
> It may be better:

> if (!migrate_use_xbzrle()) {
>     migrate_compress_threads_create(s);
> }

Thanks for your comments. In fact, multiple-thread compression can co-work with xbzrle, which helps to accelerate live migration.

> BTW, this patch is too big to review. Spliting it into some patch will be welcome.

I am doing it.
ChenLiang Nov. 21, 2014, 8:17 a.m. UTC | #7
On 2014/11/21 15:38, Li, Liang Z wrote:

>>> +int migrate_compress_threads(void)
>>> +{
>>> +    MigrationState *s;
>>> +
>>> +    s = migrate_get_current();
>>> +
>>> +    return s->compress_thread_count;
>>> +}
>>> +
>>>  int migrate_use_xbzrle(void)
>>>  {
>>>      MigrationState *s;
>>> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>>>  
>>>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>>>                         QEMU_THREAD_JOINABLE);
>>> +    migrate_compress_threads_create(s);
> 
> 
>> don't create compress_threads always.
>> It may be better:
> 
>> if (!migrate_use_xbzrle()) {
>>     migrate_compress_threads_create(s);
>> }
> 
> Thanks for your comments. In fact, multiple-thread compression can co-work with xbzrle, which helps to accelerate live migration.


Hmm, multiple-thread compression can't co-work with xbzrle. xbzrle needs to guarantee
that the cache at the source matches the destination's, but I don't see that below:

+    /* XBZRLE overflow or normal page */
+    if (bytes_sent == -1) {
+        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
+            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
+        blen = migrate_qemu_add_compress(&param->migbuf, p,
+            TARGET_PAGE_SIZE, migrate_compress_level());
+        bytes_sent += blen;
+        atomic_inc(&acct_info.norm_pages);

The code doesn't update the xbzrle cache at the source.

> 
>> BTW, this patch is too big to review. Spliting it into some patch will be welcome.
> 
> I am doing it.
> 
> 
> 
> 
>
Li, Liang Z Nov. 21, 2014, 8:35 a.m. UTC | #8
> Hmm, multiple-thread compression can't co-work with xbzrle. xbzrle needs to guarantee that the cache at the source matches the destination's, but I don't see that below:
>
> +    /* XBZRLE overflow or normal page */
> +    if (bytes_sent == -1) {
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> +            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> +        blen = migrate_qemu_add_compress(&param->migbuf, p,
> +            TARGET_PAGE_SIZE, migrate_compress_level());
> +        bytes_sent += blen;
> +        atomic_inc(&acct_info.norm_pages);
>
> The code doesn't update the xbzrle cache at the source.

It's updated here:

 +    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
 +        atomic_inc(&acct_info.dup_pages);
 +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, cont,
 +                             RAM_SAVE_FLAG_COMPRESS);
 +        migrate_put_byte(&param->migbuf, 0);
 +        bytes_sent++;
 +        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
 +         * page would be stale
 +         */
 +        xbzrle_cache_zero_page(current_addr);
 +    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
 +        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
 +                              offset, cont, last_stage, true);
 +    }
ChenLiang Nov. 21, 2014, 8:39 a.m. UTC | #10
On 2014/11/21 16:17, ChenLiang wrote:

> On 2014/11/21 15:38, Li, Liang Z wrote:
> 
>>>> +int migrate_compress_threads(void)
>>>> +{
>>>> +    MigrationState *s;
>>>> +
>>>> +    s = migrate_get_current();
>>>> +
>>>> +    return s->compress_thread_count;
>>>> +}
>>>> +
>>>>  int migrate_use_xbzrle(void)
>>>>  {
>>>>      MigrationState *s;
>>>> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>>>>  
>>>>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>>>>                         QEMU_THREAD_JOINABLE);
>>>> +    migrate_compress_threads_create(s);
>>
>>
>>> don't create compress_threads always.
>>> It may be better:
>>
>>> if (!migrate_use_xbzrle()) {
>>>     migrate_compress_threads_create(s);
>>> }
>>
>> Thanks for your comments. In fact, multiple-thread compression can co-work with xbzrle, which helps to accelerate live migration.
> 
> 
>> Hmm, multiple-thread compression can't co-work with xbzrle. xbzrle needs to guarantee
>> that the cache at the source matches the destination's, but I don't see that below:
> 
> +    /* XBZRLE overflow or normal page */
> +    if (bytes_sent == -1) {
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> +            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> +        blen = migrate_qemu_add_compress(&param->migbuf, p,
> +            TARGET_PAGE_SIZE, migrate_compress_level());
> +        bytes_sent += blen;
> +        atomic_inc(&acct_info.norm_pages);
> 
>> The code doesn't update the xbzrle cache at the source.
> 


Sorry, I made a mistake. :)

>>
>>> BTW, this patch is too big to review. Spliting it into some patch will be welcome.
>>
>> I am doing it.
>>
>>
>>
>>
>>
> 
> 
> 
> 
> 
>
Li, Liang Z Nov. 24, 2014, 2:25 a.m. UTC | #11
> >  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
> >  #          to speed up convergence of RAM migration. (since 1.6)
> >  #
> >  # Since: 1.2
> >  ##
> >  { 'enum': 'MigrationCapability',
> > -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
> > +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>
> I'll repeat what I said on v1 (but this time, with some links to back it up :)
>
> We really need to avoid a proliferation of new commands, two per tunable does not scale well.  I think now is the time to implement my earlier suggestion at making MigrationCapability become THE resource for tunables:
>
> https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html


Hi, Eric

       I have read your proposal, and I just want to verify that I understood it correctly. Take the 'compress-level' parameter for example: following your suggestion, should I implement a command 'set-migrate-capability compress-level 1', or 'set-migrate-parameter compress-level 1'? If it's the former, how do I keep HMP backward compatibility? As you know, the current HMP framework checks the parameter type, and an 'int' is processed differently from a 'bool'. If it's the latter, it seems a 'query-migrate-parameter' command should be provided for consistency, rather than query-migrate-capability.

Liang
Eric Blake Nov. 24, 2014, 5:16 p.m. UTC | #12
On 11/23/2014 07:25 PM, Li, Liang Z wrote:
>>>  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
>>>  #          to speed up convergence of RAM migration. (since 1.6)
>>>  #
>>>  # Since: 1.2
>>>  ##
>>>  { 'enum': 'MigrationCapability',
>>> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] 
>>> }
>>> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 
>>> + 'compress'] }
>>>  
> 
>> I'll repeat what I said on v1 (but this time, with some links to back it up :)
> 
>> We really need to avoid a proliferation of new commands, two per tunable does not scale well.  I think now is the time to implement my earlier suggestion at making MigrationCapability become THE resource for > tunables:

[please configure your mailer to wrap long lines]

> 
>> https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html
> 
> Hi, Eric
> 
>        I have read your proposal, and I just want to verify if I got it  exactly.  Take the 'compresss-level' parameter for example, according to you suggestion, should I implement a command 'set-migrate-capability compress-level 1', or  'set-migrate-parameter  compress-level 1' ?  if it's the former, how to keep the HMP back compatibility, as you know, the current HMP framework will check the parameter type, the 'int' will be processed differently from 'bool', ;  if it's the latter,  it seems like a ' query-migrate-paramer ' command should be provided to keep consistency, not query-migrate-capability.

HMP back-compat is NOT a problem we need to worry about; it's okay to
break the semantics if something else is easier to represent; it is only
QMP where we have to remain backwards compatible.  We already have
set-migrate-capability, so that seems like the command to extend,
instead of adding a new one.  On the other hand, if it is easier for you
to add a new HMP command that maps correctly to the underlying QMP
command, then that is fine, too.  The point of my proposal is that the
QMP command can use a union to provide the correct typing as needed,
without worrying about what HMP has to do to match that.
Li, Liang Z Dec. 8, 2014, 6:34 a.m. UTC | #13
>>  #
>>  # Since: 1.2
>>  ##
>>  { 'enum': 'MigrationCapability',
>> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
>> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>
> I'll repeat what I said on v1 (but this time, with some links to back it up :)
>
> We really need to avoid a proliferation of new commands, two per tunable does not scale well.  I think now is the time to implement my earlier suggestion at making MigrationCapability become THE resource for tunables:
>
> https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html


Hi Eric,

     I tried to follow your suggestion and implement a back-compat method that reuses 'migrate-set-capabilities' and 'query-migrate-capabilities', but I found I would have to change a lot of the current code to make it work, and I don't know how to deal with the HMP interface. So I added 'migrate-set-parameter' and 'query-migrate-parameter' interfaces to reduce the number of migration-tunable commands; they can handle any 'int' type parameter. Now 'compress-threads', 'decompress-threads', and 'compress-level' can be set/queried through these two interfaces.

{ 'enum': 'MigrationParameter',
  'data': ['compress-level', 'compress-threads', 'decompress-threads'] }

{ 'type': 'MigrationParameterStatus',
  'data': { 'parameter' : 'MigrationParameter', 'value' : 'int' } }

   I am not sure if it's a good solution, but it's much simpler and minimizes the changes to the current code. Is that OK?

Liang
Li, Liang Z Dec. 10, 2014, 8:23 a.m. UTC | #14
>>>  ##
>>>  { 'enum': 'MigrationCapability',
>>> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
>>> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>>
>> I'll repeat what I said on v1 (but this time, with some links to back it up :)
>>
>> We really need to avoid a proliferation of new commands, two per tunable does not scale well.  I think now is the time to implement my earlier suggestion at making MigrationCapability become THE resource for tunables:
>>
>> https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html
>
> Hi Eric,
>
>      I tried to follow your suggestion and implement a back-compat method that reuses 'migrate-set-capabilities' and 'query-migrate-capabilities', but I found I would have to change a lot of the current code to make it work, and I don't know how to deal with the HMP interface. So I added 'migrate-set-parameter' and 'query-migrate-parameter' interfaces to reduce the number of migration-tunable commands; they can handle any 'int' type parameter. Now 'compress-threads', 'decompress-threads', and 'compress-level' can be set/queried through these two interfaces.
>
> { 'enum': 'MigrationParameter',
>   'data': ['compress-level', 'compress-threads', 'decompress-threads'] }
>
> { 'type': 'MigrationParameterStatus',
>   'data': { 'parameter' : 'MigrationParameter', 'value' : 'int' } }
>
>    I am not sure if it's a good solution, but it's much simpler and minimizes the changes to the current code. Is that OK?


Hi Eric,

    What do you think about this solution?

Liang
diff mbox

Patch

diff --git a/arch_init.c b/arch_init.c
index 88a5ba0..a27d87b 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -24,6 +24,7 @@ 
 #include <stdint.h>
 #include <stdarg.h>
 #include <stdlib.h>
+#include <zlib.h>
 #ifndef _WIN32
 #include <sys/types.h>
 #include <sys/mman.h>
@@ -126,6 +127,7 @@  static uint64_t bitmap_sync_count;
 #define RAM_SAVE_FLAG_CONTINUE 0x20
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 
 static struct defconfig_file {
     const char *filename;
@@ -332,6 +334,177 @@  static uint64_t migration_dirty_pages;
 static uint32_t last_version;
 static bool ram_bulk_stage;
 
+#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
+#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16)
+struct MigBuf {
+    int buf_index;
+    uint8_t buf[MIG_BUF_SIZE];
+};
+
+typedef struct MigBuf MigBuf;
+
+static void migrate_put_byte(MigBuf *f, int v)
+{
+    f->buf[f->buf_index] = v;
+    f->buf_index++;
+}
+
+static void migrate_put_be16(MigBuf *f, unsigned int v)
+{
+    migrate_put_byte(f, v >> 8);
+    migrate_put_byte(f, v);
+}
+
+static void migrate_put_be32(MigBuf *f, unsigned int v)
+{
+    migrate_put_byte(f, v >> 24);
+    migrate_put_byte(f, v >> 16);
+    migrate_put_byte(f, v >> 8);
+    migrate_put_byte(f, v);
+}
+
+static void migrate_put_be64(MigBuf *f, uint64_t v)
+{
+    migrate_put_be32(f, v >> 32);
+    migrate_put_be32(f, v);
+}
+
+static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size)
+{
+    int l;
+
+    while (size > 0) {
+        l = MIG_BUF_SIZE - f->buf_index;
+        if (l > size) {
+            l = size;
+        }
+        memcpy(f->buf + f->buf_index, buf, l);
+        f->buf_index += l;
+        buf += l;
+        size -= l;
+    }
+}
+
+static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block,
+        ram_addr_t offset, int cont, int flag)
+{
+    size_t size;
+
+    migrate_put_be64(f, offset | cont | flag);
+    size = 8;
+
+    if (!cont) {
+        migrate_put_byte(f, strlen(block->idstr));
+        migrate_put_buffer(f, (uint8_t *)block->idstr,
+                        strlen(block->idstr));
+        size += 1 + strlen(block->idstr);
+    }
+    return size;
+}
+
+static int migrate_qemu_add_compress(MigBuf *f,  const uint8_t *p,
+        int size, int level)
+{
+    uLong  blen = COMPRESS_BUF_SIZE;
+    if (compress2(f->buf + f->buf_index + sizeof(int), &blen, (Bytef *)p,
+            size, level) != Z_OK) {
+        error_report("Compress Failed!\n");
+        return 0;
+    }
+    migrate_put_be32(f, blen);
+    f->buf_index += blen;
+    return blen + sizeof(int);
+}
+
+enum {
+    COM_DONE = 0,
+    COM_START,
+};
+
+static int  compress_thread_count;
+static int  decompress_thread_count;
+
+struct compress_param {
+    int state;
+    MigBuf migbuf;
+    RAMBlock *block;
+    ram_addr_t offset;
+    bool last_stage;
+    int ret;
+    int bytes_sent;
+    uint8_t *p;
+    int cont;
+    bool bulk_stage;
+};
+
+typedef struct compress_param compress_param;
+compress_param *comp_param;
+
+struct decompress_param {
+    int state;
+    void *des;
+    uint8_t compbuf[COMPRESS_BUF_SIZE];
+    int len;
+};
+typedef struct decompress_param decompress_param;
+
+static decompress_param *decomp_param;
+bool incomming_migration_done;
+static bool quit_thread;
+
+static int save_compress_ram_page(compress_param *param);
+
+
+static void *do_data_compress(void *opaque)
+{
+    compress_param *param = opaque;
+    while (!quit_thread) {
+        if (param->state == COM_START) {
+            save_compress_ram_page(param);
+            param->state = COM_DONE;
+         } else {
+             g_usleep(1);
+         }
+    }
+
+    return NULL;
+}
+
+
+void migrate_compress_threads_join(MigrationState *s)
+{
+    int i;
+    if (!migrate_use_compress()) {
+        return;
+    }
+    quit_thread = true;
+    for (i = 0; i < compress_thread_count; i++) {
+        qemu_thread_join(s->compress_thread + i);
+    }
+    g_free(s->compress_thread);
+    g_free(comp_param);
+    s->compress_thread = NULL;
+    comp_param = NULL;
+}
+
+void migrate_compress_threads_create(MigrationState *s)
+{
+    int i;
+    if (!migrate_use_compress()) {
+        return;
+    }
+    quit_thread = false;
+    compress_thread_count = s->compress_thread_count;
+    s->compress_thread = g_malloc0(sizeof(QemuThread)
+        * s->compress_thread_count);
+    comp_param = g_malloc0(sizeof(compress_param) * s->compress_thread_count);
+    for (i = 0; i < s->compress_thread_count; i++) {
+        qemu_thread_create(s->compress_thread + i, "compress",
+            do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);
+
+    }
+}
+
 /* Update the xbzrle cache to reflect a page that's been sent as all 0.
  * The important thing is that a stale (not-yet-0'd) page be replaced
  * by the new data.
@@ -351,9 +524,10 @@  static void xbzrle_cache_zero_page(ram_addr_t current_addr)
 
 #define ENCODING_FLAG_XBZRLE 0x1
 
-static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
+static int save_xbzrle_page(void *f, uint8_t **current_data,
                             ram_addr_t current_addr, RAMBlock *block,
-                            ram_addr_t offset, int cont, bool last_stage)
+                            ram_addr_t offset, int cont, bool last_stage,
+                            bool save_to_buf)
 {
     int encoded_len = 0, bytes_sent = -1;
     uint8_t *prev_cached_page;
@@ -401,10 +575,19 @@  static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
     }
 
     /* Send XBZRLE based compressed page */
-    bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_XBZRLE);
-    qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
-    qemu_put_be16(f, encoded_len);
-    qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
+    if (save_to_buf) {
+        bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset,
+            cont, RAM_SAVE_FLAG_XBZRLE);
+        migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE);
+        migrate_put_be16((MigBuf *)f, encoded_len);
+        migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len);
+    } else {
+        bytes_sent = save_block_hdr((QEMUFile *)f, block, offset,
+            cont, RAM_SAVE_FLAG_XBZRLE);
+        qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE);
+        qemu_put_be16((QEMUFile *)f, encoded_len);
+        qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len);
+    }
     bytes_sent += encoded_len + 1 + 2;
     acct_info.xbzrle_pages++;
     acct_info.xbzrle_bytes += bytes_sent;
@@ -609,7 +792,7 @@  static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
         xbzrle_cache_zero_page(current_addr);
     } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
         bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
-                                      offset, cont, last_stage);
+                                      offset, cont, last_stage, false);
         if (!last_stage) {
             /* Can't send this cached data async, since the cache page
              * might get updated before it gets to the wire
@@ -635,6 +818,90 @@  static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
     return bytes_sent;
 }
 
+static int save_compress_ram_page(compress_param *param)
+{
+    int bytes_sent = param->bytes_sent;
+    int blen = COMPRESS_BUF_SIZE;
+    int cont = param->cont;
+    uint8_t *p = param->p;
+    int ret = param->ret;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+    bool last_stage = param->last_stage;
+    /* In doubt sent page as normal */
+    XBZRLE_cache_lock();
+    ram_addr_t current_addr = block->offset + offset;
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_sent > 0) {
+                atomic_inc(&acct_info.norm_pages);
+             } else if (bytes_sent == 0) {
+                atomic_inc(&acct_info.dup_pages);
+             }
+        }
+    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+        atomic_inc(&acct_info.dup_pages);
+        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, cont,
+                             RAM_SAVE_FLAG_COMPRESS);
+        migrate_put_byte(&param->migbuf, 0);
+        bytes_sent++;
+        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
+         * page would be stale
+         */
+        xbzrle_cache_zero_page(current_addr);
+    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
+        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
+                              offset, cont, last_stage, true);
+    }
+    XBZRLE_cache_unlock();
+    /* XBZRLE overflow or normal page */
+    if (bytes_sent == -1) {
+        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
+            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
+        blen = migrate_qemu_add_compress(&param->migbuf, p,
+            TARGET_PAGE_SIZE, migrate_compress_level());
+        bytes_sent += blen;
+        atomic_inc(&acct_info.norm_pages);
+    }
+    return bytes_sent;
+}
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx;
+    if (!migrate_use_compress()) {
+        return;
+    }
+
+    for (idx = 0; idx < compress_thread_count; idx++) {
+        while (comp_param[idx].state != COM_DONE) {
+            g_usleep(0);
+        }
+        if (comp_param[idx].migbuf.buf_index > 0) {
+            qemu_put_buffer(f, comp_param[idx].migbuf.buf,
+                comp_param[idx].migbuf.buf_index);
+            bytes_transferred += comp_param[idx].migbuf.buf_index;
+            comp_param[idx].migbuf.buf_index = 0;
+        }
+    }
+}
+
+static inline void set_common_compress_params(compress_param *param,
+    int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset,
+    bool last_stage, int cont, uint8_t *p, bool bulk_stage)
+{
+    param->ret = ret;
+    param->bytes_sent = bytes_sent;
+    param->block = block;
+    param->offset = offset;
+    param->last_stage = last_stage;
+    param->cont = cont;
+    param->p = p;
+    param->bulk_stage = bulk_stage;
+}
+
 /*
  * ram_find_and_save_block: Finds a page to send and sends it to f
  *
@@ -649,6 +916,8 @@  static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
     bool complete_round = false;
     int bytes_sent = 0;
     MemoryRegion *mr;
+    int cont, idx, ret, len = -1;
+    uint8_t *p;
 
     if (!block)
         block = QTAILQ_FIRST(&ram_list.blocks);
@@ -667,14 +936,73 @@  static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
                 block = QTAILQ_FIRST(&ram_list.blocks);
                 complete_round = true;
                 ram_bulk_stage = false;
+                if (migrate_use_xbzrle()) {
+                    /* terminate the compression threads at this point */
+                    flush_compressed_data(f);
+                    quit_thread = true;
+                }
             }
         } else {
-            bytes_sent = ram_save_page(f, block, offset, last_stage);
-
-            /* if page is unmodified, continue to the next */
-            if (bytes_sent > 0) {
-                last_sent_block = block;
-                break;
+            if (!migrate_use_compress()) {
+                bytes_sent = ram_save_page(f, block, offset, last_stage);
+                /* if page is unmodified, continue to the next */
+                if (bytes_sent > 0) {
+                    last_sent_block = block;
+                    break;
+                }
+            } else {
+                cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+                p = memory_region_get_ram_ptr(block->mr) + offset;
+                ret = ram_control_save_page(f, block->offset,
+                           offset, TARGET_PAGE_SIZE, &len);
+                if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
+                    if (cont == 0) {
+                        flush_compressed_data(f);
+                    }
+                    set_common_compress_params(&comp_param[0],
+                        ret, len, block, offset, last_stage, cont,
+                        p, ram_bulk_stage);
+                    bytes_sent = save_compress_ram_page(&comp_param[0]);
+                    if (bytes_sent > 0) {
+                        qemu_put_buffer(f, comp_param[0].migbuf.buf,
+                            comp_param[0].migbuf.buf_index);
+                        comp_param[0].migbuf.buf_index = 0;
+                        last_sent_block = block;
+                        break;
+                    }
+                } else {
+retry:
+                    for (idx = 0; idx < compress_thread_count; idx++) {
+                        if (comp_param[idx].state == COM_DONE) {
+                            bytes_sent = comp_param[idx].migbuf.buf_index;
+                            if (bytes_sent == 0) {
+                                set_common_compress_params(&comp_param[idx],
+                                    ret, len, block, offset, last_stage,
+                                    cont, p, ram_bulk_stage);
+                                comp_param[idx].state = COM_START;
+                                bytes_sent = 1;
+                                bytes_transferred -= 1;
+                                break;
+                            } else if (bytes_sent > 0) {
+                                qemu_put_buffer(f, comp_param[idx].migbuf.buf,
+                                    comp_param[idx].migbuf.buf_index);
+                                comp_param[idx].migbuf.buf_index = 0;
+                                set_common_compress_params(&comp_param[idx],
+                                   ret, len, block, offset, last_stage,
+                                   cont, p, ram_bulk_stage);
+                                comp_param[idx].state = COM_START;
+                                break;
+                            }
+                        }
+                    }
+                    if (idx < compress_thread_count) {
+                        last_sent_block = block;
+                        break;
+                    } else {
+                        g_usleep(0);
+                        goto retry;
+                    }
+                }
             }
         }
     }
@@ -684,7 +1012,6 @@  static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
     return bytes_sent;
 }
 
-static uint64_t bytes_transferred;
 
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
@@ -892,6 +1219,7 @@  static int ram_save_iterate(QEMUFile *f, void *opaque)
         i++;
     }
 
+    flush_compressed_data(f);
     qemu_mutex_unlock_ramlist();
 
     /*
@@ -938,6 +1266,7 @@  static int ram_save_complete(QEMUFile *f, void *opaque)
         bytes_transferred += bytes_sent;
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
@@ -1038,10 +1367,61 @@  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+QemuThread *decompress_threads;
+
+static void *do_data_decompress(void *opaque)
+{
+    decompress_param *param = opaque;
+    while (!incomming_migration_done) {
+        if (param->state == COM_START) {
+            uLong pagesize = TARGET_PAGE_SIZE;
+            if (uncompress((Bytef *)param->des, &pagesize,
+                    (const Bytef *)param->compbuf, param->len) != Z_OK) {
+                error_report("uncompress failed");
+                break;
+            }
+            param->state = COM_DONE;
+        } else {
+            if (quit_thread) {
+                break;
+            }
+            g_usleep(1);
+        }
+    }
+    return NULL;
+}
+
+void migrate_decompress_threads_create(int count)
+{
+    int i;
+    decompress_thread_count = count;
+    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
+    decomp_param = g_malloc0(sizeof(decompress_param) * count);
+    quit_thread = false;
+    for (i = 0; i < count; i++) {
+        qemu_thread_create(decompress_threads + i, "decompress",
+            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
+    }
+}
+
+void migrate_decompress_threads_join(void)
+{
+    int i;
+    for (i = 0; i < decompress_thread_count; i++) {
+        qemu_thread_join(decompress_threads + i);
+    }
+    g_free(decompress_threads);
+    g_free(decomp_param);
+    decompress_threads = NULL;
+    decomp_param = NULL;
+}
+
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
 {
     int flags = 0, ret = 0;
     static uint64_t seq_iter;
+    int len = 0;
+    uint8_t compbuf[COMPRESS_BUF_SIZE];
 
     seq_iter++;
 
@@ -1106,6 +1486,7 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
             break;
         case RAM_SAVE_FLAG_PAGE:
+            quit_thread = true;
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {
                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
@@ -1115,6 +1496,32 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
 
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
+        case RAM_SAVE_FLAG_COMPRESS_PAGE:
+            host = host_from_stream_offset(f, addr, flags);
+            if (!host) {
+                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
+                ret = -EINVAL;
+                break;
+            }
+
+            len = qemu_get_be32(f);
+            qemu_get_buffer(f, compbuf, len);
+            int idx;
+retry:
+            for (idx = 0; idx < decompress_thread_count; idx++) {
+                if (decomp_param[idx].state == COM_DONE)  {
+                    memcpy(decomp_param[idx].compbuf, compbuf, len);
+                    decomp_param[idx].des = host;
+                    decomp_param[idx].len = len;
+                    decomp_param[idx].state = COM_START;
+                    break;
+                }
+            }
+            if (idx == decompress_thread_count) {
+                g_usleep(0);
+                goto retry;
+            }
+            break;
         case RAM_SAVE_FLAG_XBZRLE:
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {
diff --git a/hmp-commands.hx b/hmp-commands.hx
index e37bc8b..8b93bed 100644
--- a/hmp-commands.hx
+++ b/hmp-commands.hx
@@ -941,6 +941,56 @@  Set cache size to @var{value} (in bytes) for xbzrle migrations.
 ETEXI
 
     {
+        .name       = "migrate_set_compress_level",
+        .args_type  = "value:i",
+        .params     = "value",
+        .help       = "set compression level for compressed migration, "
+                      "an integer between 0 and 9; 0 stands for no "
+                      "compression, 1 for the fastest compression speed "
+                      "and 9 for the highest compression ratio.",
+        .mhandler.cmd = hmp_migrate_set_compress_level,
+    },
+
+STEXI
+@item migrate_set_compress_level @var{value}
+@findex migrate_set_compress_level
+Set compression level to @var{value} for compressed migration.
+ETEXI
+
+    {
+        .name       = "migrate_set_compress_threads",
+        .args_type  = "value:i",
+        .params     = "value",
+        .help       = "set the number of compression threads for migration. "
+                      "A proper thread count can accelerate the migration; "
+                      "it should be between 1 and the number of CPUs in "
+                      "your system.",
+        .mhandler.cmd = hmp_migrate_set_compress_threads,
+    },
+
+STEXI
+@item migrate_set_compress_threads @var{value}
+@findex migrate_set_compress_threads
+Set the number of compression threads to @var{value} for compressed migration.
+ETEXI
+
+    {
+        .name       = "migrate_set_decompress_threads",
+        .args_type  = "value:i",
+        .params     = "value",
+        .help       = "set the number of decompression threads for migration. "
+                      "A proper thread count can accelerate the migration; "
+                      "it should be between 1 and the number of CPUs in "
+                      "your system.",
+        .mhandler.cmd = hmp_migrate_set_decompress_threads,
+    },
+
+STEXI
+@item migrate_set_decompress_threads @var{value}
+@findex migrate_set_decompress_threads
+Set the number of decompression threads to @var{value} for compressed migration.
+ETEXI
+
+    {
         .name       = "migrate_set_speed",
         .args_type  = "value:o",
         .params     = "value",
@@ -1766,6 +1816,12 @@  show migration status
 show current migration capabilities
 @item info migrate_cache_size
 show current migration XBZRLE cache size
+@item info migrate_compress_level
+show current migration compress level
+@item info migrate_compress_threads
+show current migration compress threads
+@item info migrate_decompress_threads
+show current migration decompress threads
 @item info balloon
 show balloon information
 @item info qtree
diff --git a/hmp.c b/hmp.c
index 63d7686..b1936a3 100644
--- a/hmp.c
+++ b/hmp.c
@@ -252,6 +252,24 @@  void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict)
                    qmp_query_migrate_cache_size(NULL) >> 10);
 }
 
+void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "compress level: %" PRId64 "\n",
+                   qmp_query_migrate_compress_level(NULL));
+}
+
+void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "compress threads: %" PRId64 "\n",
+                   qmp_query_migrate_compress_threads(NULL));
+}
+
+void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "decompress threads: %" PRId64 "\n",
+                   qmp_query_migrate_decompress_threads(NULL));
+}
+
 void hmp_info_cpus(Monitor *mon, const QDict *qdict)
 {
     CpuInfoList *cpu_list, *cpu;
@@ -1041,6 +1059,45 @@  void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict)
     }
 }
 
+void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_compress_level(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
+void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_compress_threads(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
+void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_decompress_threads(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
 void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict)
 {
     int64_t value = qdict_get_int(qdict, "value");
diff --git a/hmp.h b/hmp.h
index 4bb5dca..b348806 100644
--- a/hmp.h
+++ b/hmp.h
@@ -29,6 +29,9 @@  void hmp_info_mice(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict);
 void hmp_info_cpus(Monitor *mon, const QDict *qdict);
 void hmp_info_block(Monitor *mon, const QDict *qdict);
 void hmp_info_blockstats(Monitor *mon, const QDict *qdict);
@@ -64,6 +67,9 @@  void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict);
 void hmp_set_password(Monitor *mon, const QDict *qdict);
 void hmp_expire_password(Monitor *mon, const QDict *qdict);
 void hmp_eject(Monitor *mon, const QDict *qdict);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3cb5ba8..03c8e0d 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -49,6 +49,9 @@  struct MigrationState
     QemuThread thread;
     QEMUBH *cleanup_bh;
     QEMUFile *file;
+    QemuThread *compress_thread;
+    int compress_thread_count;
+    int compress_level;
 
     int state;
     MigrationParams params;
@@ -64,6 +67,7 @@  struct MigrationState
     int64_t dirty_sync_count;
 };
 
+extern bool incomming_migration_done;
 void process_incoming_migration(QEMUFile *f);
 
 void qemu_start_incoming_migration(const char *uri, Error **errp);
@@ -107,6 +111,10 @@  bool migration_has_finished(MigrationState *);
 bool migration_has_failed(MigrationState *);
 MigrationState *migrate_get_current(void);
 
+void migrate_compress_threads_create(MigrationState *s);
+void migrate_compress_threads_join(MigrationState *s);
+void migrate_decompress_threads_create(int count);
+void migrate_decompress_threads_join(void);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_transferred(void);
 uint64_t ram_bytes_total(void);
@@ -144,7 +152,7 @@  void migrate_del_blocker(Error *reason);
 
 bool migrate_rdma_pin_all(void);
 bool migrate_zero_blocks(void);
-
+bool migrate_use_compress(void);
 bool migrate_auto_converge(void);
 
 int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
@@ -153,6 +161,8 @@  int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t *dst, int dlen);
 
 int migrate_use_xbzrle(void);
 int64_t migrate_xbzrle_cache_size(void);
+int migrate_compress_level(void);
+int migrate_compress_threads(void);
 
 int64_t xbzrle_cache_resize(int64_t new_size);
 
diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index 401676b..431e6cc 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -112,6 +112,7 @@  QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer *input);
 int qemu_get_fd(QEMUFile *f);
 int qemu_fclose(QEMUFile *f);
 int64_t qemu_ftell(QEMUFile *f);
+uint64_t qemu_add_compress(QEMUFile *f, const uint8_t *p, int size);
 void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
 void qemu_put_byte(QEMUFile *f, int v);
 /*
diff --git a/migration.c b/migration.c
index c49a05a..716de97 100644
--- a/migration.c
+++ b/migration.c
@@ -46,6 +46,12 @@  enum {
 /* Migration XBZRLE default cache size */
 #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
 
+/* Migration compress default thread count */
+#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
+#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
+/* 0: no compression, 1: best speed, ..., 9: best compression ratio */
+#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
+
 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
 
@@ -60,6 +66,8 @@  MigrationState *migrate_get_current(void)
         .bandwidth_limit = MAX_THROTTLE,
         .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
         .mbps = -1,
+        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
     };
 
     return &current_migration;
@@ -101,6 +109,7 @@  static void process_incoming_migration_co(void *opaque)
         error_report("load of migration failed: %s", strerror(-ret));
         exit(EXIT_FAILURE);
     }
+    incomming_migration_done = true;
     qemu_announce_self();
 
     /* Make sure all file formats flush their mutable metadata */
@@ -116,10 +125,14 @@  static void process_incoming_migration_co(void *opaque)
     } else {
         runstate_set(RUN_STATE_PAUSED);
     }
+    migrate_decompress_threads_join();
 }
 
+static int uncompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT;
 void process_incoming_migration(QEMUFile *f)
 {
+    incomming_migration_done = false;
+    migrate_decompress_threads_create(uncompress_thread_count);
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
     int fd = qemu_get_fd(f);
 
@@ -302,6 +315,7 @@  static void migrate_fd_cleanup(void *opaque)
         qemu_thread_join(&s->thread);
         qemu_mutex_lock_iothread();
 
+        migrate_compress_threads_join(s);
         qemu_fclose(s->file);
         s->file = NULL;
     }
@@ -373,6 +387,8 @@  static MigrationState *migrate_init(const MigrationParams *params)
     int64_t bandwidth_limit = s->bandwidth_limit;
     bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
     int64_t xbzrle_cache_size = s->xbzrle_cache_size;
+    int compress_level = s->compress_level;
+    int compress_thread_count = s->compress_thread_count;
 
     memcpy(enabled_capabilities, s->enabled_capabilities,
            sizeof(enabled_capabilities));
@@ -383,6 +399,8 @@  static MigrationState *migrate_init(const MigrationParams *params)
            sizeof(enabled_capabilities));
     s->xbzrle_cache_size = xbzrle_cache_size;
 
+    s->compress_level = compress_level;
+    s->compress_thread_count = compress_thread_count;
     s->bandwidth_limit = bandwidth_limit;
     s->state = MIG_STATE_SETUP;
     trace_migrate_set_state(MIG_STATE_SETUP);
@@ -503,6 +521,59 @@  int64_t qmp_query_migrate_cache_size(Error **errp)
     return migrate_xbzrle_cache_size();
 }
 
+void qmp_migrate_set_compress_level(int64_t value, Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+
+    if (value > 9 || value < 0) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level",
+                  "is invalid, please input an integer between 0 and 9");
+        return;
+    }
+
+    s->compress_level = value;
+}
+
+int64_t qmp_query_migrate_compress_level(Error **errp)
+{
+    return migrate_compress_level();
+}
+
+void qmp_migrate_set_compress_threads(int64_t value, Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+
+    if (value > 255 || value < 1) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
+                  "is invalid, please input an integer between 1 and 255");
+        return;
+    }
+
+    s->compress_thread_count = value;
+}
+
+void qmp_migrate_set_decompress_threads(int64_t value, Error **errp)
+{
+    if (value > 255 || value < 1) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE,
+                  "decompress thread count",
+                  "is invalid, please input an integer between 1 and 255");
+        return;
+    }
+
+    uncompress_thread_count = value;
+}
+
+int64_t qmp_query_migrate_compress_threads(Error **errp)
+{
+    return migrate_compress_threads();
+}
+
+int64_t qmp_query_migrate_decompress_threads(Error **errp)
+{
+    return uncompress_thread_count;
+}
+
 void qmp_migrate_set_speed(int64_t value, Error **errp)
 {
     MigrationState *s;
@@ -555,6 +626,33 @@  bool migrate_zero_blocks(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
 }
 
+bool migrate_use_compress(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
+}
+
+int migrate_compress_level(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->compress_level;
+}
+
+int migrate_compress_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->compress_thread_count;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
@@ -697,4 +795,5 @@  void migrate_fd_connect(MigrationState *s)
 
     qemu_thread_create(&s->thread, "migration", migration_thread, s,
                        QEMU_THREAD_JOINABLE);
+    migrate_compress_threads_create(s);
 }
diff --git a/monitor.c b/monitor.c
index 905d8cf..365547e 100644
--- a/monitor.c
+++ b/monitor.c
@@ -2865,6 +2865,27 @@  static mon_cmd_t info_cmds[] = {
         .mhandler.cmd = hmp_info_migrate_cache_size,
     },
     {
+        .name       = "migrate_compress_level",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show current migration compress level",
+        .mhandler.cmd = hmp_info_migrate_compress_level,
+    },
+    {
+        .name       = "migrate_compress_threads",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show current migration compress thread count",
+        .mhandler.cmd = hmp_info_migrate_compress_threads,
+    },
+    {
+        .name       = "migrate_decompress_threads",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show current migration decompress thread count",
+        .mhandler.cmd = hmp_info_migrate_decompress_threads,
+    },
+    {
         .name       = "balloon",
         .args_type  = "",
         .params     = "",
diff --git a/qapi-schema.json b/qapi-schema.json
index 24379ab..71a9e0f 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -491,13 +491,17 @@ 
 #          to enable the capability on the source VM. The feature is disabled by
 #          default. (since 1.6)
 #
+# @compress: Use multiple compression threads to accelerate live migration.
+#          This feature can help to reduce the migration traffic by sending
+#          compressed pages. The feature is disabled by default. (since 2.3)
+#
 # @auto-converge: If enabled, QEMU will automatically throttle down the guest
 #          to speed up convergence of RAM migration. (since 1.6)
 #
 # Since: 1.2
 ##
 { 'enum': 'MigrationCapability',
-  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
+  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
+           'compress'] }
 
 ##
 # @MigrationCapabilityStatus
@@ -1382,6 +1386,88 @@ 
 { 'command': 'query-migrate-cache-size', 'returns': 'int' }
 
 ##
+# @migrate-set-compress-level
+#
+# Set compress level
+#
+# @value: compress level int
+#
+# The compress level is an integer between 0 and 9.
+# The compress level can be modified before and during an ongoing migration.
+#
+# Returns: nothing on success
+#
+# Since: 2.3
+##
+{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-compress-level
+#
+# query compress level
+#
+# Returns: compress level int
+#
+# Since: 2.3
+##
+{ 'command': 'query-migrate-compress-level', 'returns': 'int' }
+
+##
+# @migrate-set-compress-threads
+#
+# Set compress threads
+#
+# @value: compress threads int
+#
+# The compress thread count is an integer between 1 and 255.
+# The compress thread count can be modified only before migration starts.
+#
+# Returns: nothing on success
+#
+# Since: 2.3
+##
+{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-compress-threads
+#
+# query compress threads
+#
+# Returns: compress threads int
+#
+# Since: 2.3
+##
+{ 'command': 'query-migrate-compress-threads', 'returns': 'int' }
+
+##
+# @migrate-set-decompress-threads
+#
+# Set decompress threads
+#
+# @value: decompress threads int
+#
+# The decompress thread count is an integer between 1 and 255.
+# The decompress thread count can be modified only before migration starts.
+#
+# Returns: nothing on success
+#
+# Since: 2.3
+##
+{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-decompress-threads
+#
+# query decompress threads
+#
+# Returns: decompress threads int
+#
+# Since: 2.3
+##
+{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' }
+
+##
 # @ObjectPropertyInfo:
 #
 # @name: the name of the property
diff --git a/qmp-commands.hx b/qmp-commands.hx
index 1abd619..b60fdab 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -705,7 +705,138 @@  Example:
 <- { "return": 67108864 }
 
 EQMP
+{
+        .name       = "migrate-set-compress-level",
+        .args_type  = "value:i",
+        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
+    },
+
+SQMP
+migrate-set-compress-level
+--------------------------
+
+Set the compress level to be used by compressed migration.  The compress level
+is an integer between 0 and 9.
+
+Arguments:
+
+- "value": compress level (json-int)
+
+Example:
+
+-> { "execute": "migrate-set-compress-level", "arguments": { "value": 1 } }
+<- { "return": {} }
+
+EQMP
+    {
+        .name       = "query-migrate-compress-level",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_level,
+    },
+
+SQMP
+query-migrate-compress-level
+----------------------------
+
+Show the compress level to be used by compressed migration
+
+Returns: the compress level (json-int)
+
+Example:
+
+-> { "execute": "query-migrate-compress-level" }
+<- { "return": 1 }
+
+EQMP
+{
+        .name       = "migrate-set-compress-threads",
+        .args_type  = "value:i",
+        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_threads,
+    },
+
+SQMP
+migrate-set-compress-threads
+----------------------------
+
+Set the compress thread count to be used by compressed migration.  The compress
+thread count is an integer between 1 and 255.
+
+Arguments:
+
+- "value": compress threads (json-int)
+
+Example:
+
+-> { "execute": "migrate-set-compress-threads", "arguments": { "value": 8 } }
+<- { "return": {} }
+
+EQMP
+    {
+        .name       = "query-migrate-compress-threads",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
+    },
+
+SQMP
+query-migrate-compress-threads
+------------------------------
+
+Show the compress thread count to be used by compressed migration
+
+Returns: the compress thread count (json-int)
+
+Example:
+
+-> { "execute": "query-migrate-compress-threads" }
+<- { "return": 8 }
+
+EQMP
+{
+        .name       = "migrate-set-decompress-threads",
+        .args_type  = "value:i",
+        .mhandler.cmd_new = qmp_marshal_input_migrate_set_decompress_threads,
+    },
+
+SQMP
+migrate-set-decompress-threads
+------------------------------
+
+Set the decompress thread count to be used by compressed migration.  The
+decompress thread count is an integer between 1 and 255.
+
+Arguments:
+
+- "value": decompress threads (json-int)
+
+Example:
+
+-> { "execute": "migrate-set-decompress-threads", "arguments": { "value": 2 } }
+<- { "return": {} }
 
+EQMP
+    {
+        .name       = "query-migrate-decompress-threads",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_migrate_decompress_threads,
+    },
+
+SQMP
+query-migrate-decompress-threads
+--------------------------------
+
+Show the decompress thread count to be used by compressed migration
+
+Returns: the decompress thread count (json-int)
+
+Example:
+
+-> { "execute": "query-migrate-decompress-threads" }
+<- { "return": 2 }
+
+EQMP
     {
         .name       = "migrate_set_speed",
         .args_type  = "value:o",