[v6,08/14] migration: Add the core code of multi-thread compression

Message ID 1427099549-10633-9-git-send-email-liang.z.li@intel.com
State New

Commit Message

Li, Liang Z March 23, 2015, 8:32 a.m. UTC
Implement the core logic of the multiple thread compression. At this
point, multiple thread compression can't co-work with xbzrle yet.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 177 insertions(+), 7 deletions(-)

Comments

Juan Quintela March 25, 2015, 11:47 a.m. UTC | #1
Liang Li <liang.z.li@intel.com> wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>

> ---
>  arch_init.c | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 177 insertions(+), 7 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index 48cae22..9f63c0f 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -355,12 +355,33 @@ static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
>  static uint8_t *compressed_data_buf;
>  
> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_comp_thread) {
> +    CompressParam *param = opaque;
>  
> -    /* To be done */
What is the difference with changing this loop to:


> +    while (!quit_comp_thread) {

Here we don't have quit_comp_thread protected by anything.

> +        qemu_mutex_lock(&param->mutex);
> +        /* Re-check the quit_comp_thread in case of
> +         * terminate_compression_threads is called just before
> +         * qemu_mutex_lock(&param->mutex) and after
> +         * while(!quit_comp_thread), re-check it here can make
> +         * sure the compression thread terminate as expected.
> +         */
> +        while (!param->start && !quit_comp_thread) {

Here and in the next use it is protected by param->mutex, but param is per
compression thread, so it is not really protected.
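
A mutex only excludes other holders of that same mutex, so the per-thread lock
does nothing for the global flag; schematically (an illustrative interleaving,
not code from the patch):

    /* compression thread 0 */
    qemu_mutex_lock(&comp_param[0].mutex);
    while (!comp_param[0].start && !quit_comp_thread) { /* ... */ }

    /* compression thread 1, concurrently, under a different mutex */
    qemu_mutex_lock(&comp_param[1].mutex);
    while (!comp_param[1].start && !quit_comp_thread) { /* ... */ }

    /* migration thread: writes the flag holding neither mutex */
    quit_comp_thread = true;

Each thread holds a different lock, so the accesses to quit_comp_thread are
completely unsynchronized.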

> +            qemu_cond_wait(&param->cond, &param->mutex);
> +        }
> +        if (!quit_comp_thread) {
> +            do_compress_ram_page(param);
> +        }
> +        param->start = false;

param->start is protected by param->mutex everywhere

> +        qemu_mutex_unlock(&param->mutex);
>  
> +        qemu_mutex_lock(comp_done_lock);
> +        param->done = true;

param->done protected by comp_done_lock

> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);
>      }
>  
>      return NULL;
> @@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
>  
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_comp_thread = true;
> +    int idx, thread_count;
>  
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_comp_thread = true;

quit_comp_thread not protected again.

> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_mutex_lock(&comp_param[idx].mutex);
> +        qemu_cond_signal(&comp_param[idx].cond);
> +        qemu_mutex_unlock(&comp_param[idx].mutex);
> +    }
>  }
>  
>  void migrate_compress_threads_join(void)
> @@ -420,6 +447,7 @@ void migrate_compress_threads_create(void)
>           * it's ops to empty.
>           */
>          comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> +        comp_param[i].done = true;
>          qemu_mutex_init(&comp_param[i].mutex);
>          qemu_cond_init(&comp_param[i].cond);
>          qemu_thread_create(compress_threads + i, "compress",
> @@ -829,6 +857,97 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>      return pages;
>  }
>  
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
> +
> +    bytes_sent = save_page_header(param->file, block, offset |
> +                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    param->done = false;

Not protected (well, its caller has protected it with comp_done_lock).

> +    qemu_mutex_lock(&param->mutex);
> +    param->start = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (!comp_param[idx].done) {

done is not protected here.

> +            qemu_mutex_lock(comp_done_lock);
> +            while (!comp_param[idx].done && !quit_comp_thread) {


Now, it is under comp_done_lock.  But none of its other uses is
protected by it.

And here done is protected by comp_done_cond


> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }
> +        if (!quit_comp_thread) {

Here, it is unprotected again.

> +            len = qemu_put_qemu_file(f, comp_param[idx].file);
> +            bytes_transferred += len;
> +        }
> +    }
> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset,
> +                                           uint64_t *bytes_transferred)
> +{
> +    int idx, thread_count, bytes_xmit = -1, pages = -1;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (comp_param[idx].done) {
> +                bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);
> +                pages = 1;
> +                *bytes_transferred += bytes_xmit;
> +                break;
> +            }
> +        }
> +        if (pages > 0) {
> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return pages;
> +}
> +
>  /**
>   * ram_save_compressed_page: compress the given page and send it to the stream
>   *
> @@ -845,8 +964,59 @@ static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      uint64_t *bytes_transferred)
>  {
>      int pages = -1;
> +    uint64_t bytes_xmit;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
> +    int ret;
> +
> +    p = memory_region_get_ram_ptr(mr) + offset;
>  
> -    /* To be done*/
> +    bytes_xmit = 0;
> +    ret = ram_control_save_page(f, block->offset,
> +                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
> +    if (bytes_xmit) {
> +        *bytes_transferred += bytes_xmit;
> +        pages = 1;
> +    }
> +    if (block == last_sent_block) {
> +        offset |= RAM_SAVE_FLAG_CONTINUE;
> +    }
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_xmit > 0) {
> +                acct_info.norm_pages++;
> +            } else if (bytes_xmit == 0) {
> +                acct_info.dup_pages++;
> +            }
> +        }
> +    } else {
> +        /* When starting the process of a new block, the first page of
> +         * the block should be sent out before other pages in the same
> +         * block, and all the pages in last block should have been sent
> +         * out, keeping this order is important, because the 'cont' flag
> +         * is used to avoid resending the block name.
> +         */
> +        if (block != last_sent_block) {
> +            flush_compressed_data(f);
> +            pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +            if (pages == -1) {
> +                set_compress_params(&comp_param[0], block, offset);
> +                /* Use the qemu thread to compress the data to make sure the
> +                 * first page is sent out before other pages
> +                 */
> +                bytes_xmit = do_compress_ram_page(&comp_param[0]);
> +                qemu_put_qemu_file(f, comp_param[0].file);
> +                *bytes_transferred += bytes_xmit;
> +                pages = 1;
> +            }
> +        } else {
> +            pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +            if (pages == -1) {
> +                pages = compress_page_with_multi_thread(f, block, offset,
> +                                                        bytes_transferred);
> +            }
> +        }
> +    }
>  
>      return pages;
>  }
> @@ -914,8 +1084,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
>      return pages;
>  }
>  
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1129,6 +1297,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          }
>          i++;
>      }
> +    flush_compressed_data(f);
>      rcu_read_unlock();
>  
>      /*
> @@ -1170,6 +1339,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          }
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
Li, Liang Z March 26, 2015, 2:37 a.m. UTC | #2
> > --- a/arch_init.c
> > +++ b/arch_init.c
> > @@ -355,12 +355,33 @@ static DecompressParam *decomp_param;  static
> > QemuThread *decompress_threads;  static uint8_t
> *compressed_data_buf;
> >
> > +static int do_compress_ram_page(CompressParam *param);
> > +
> >  static void *do_data_compress(void *opaque)  {
> > -    while (!quit_comp_thread) {
> > +    CompressParam *param = opaque;
> >
> > -    /* To be done */
> What is the difference with changing this loop to:
> 
> 
> > +    while (!quit_comp_thread) {
> 
> Here we don't have quit_comp_thread protected by anything.

Yes, adding a lock to protect quit_comp_thread is not hard and can make the
code cleaner, but the lock will hurt performance. So I chose not to protect
it with a lock or anything else.

Is there any problem with accessing quit_comp_thread without protection?
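
For illustration of the hazard being asked about, a minimal standalone sketch
(not code from this patch): with no lock, atomic, or barrier, nothing obliges
the store to become visible to the readers.

    #include <stdbool.h>

    /* Written by one thread, read by others. The compiler may
     * legally cache the flag in a register, so the loop below can
     * be compiled as if it were while (true), and weakly ordered
     * CPUs may delay visibility of the store arbitrarily. */
    static bool quit;

    static void *worker(void *opaque)
    {
        while (!quit) {
            /* compress pages ... */
        }
        return NULL;
    }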

> 
> > +        qemu_mutex_lock(&param->mutex);
> > +        /* Re-check the quit_comp_thread in case of
> > +         * terminate_compression_threads is called just before
> > +         * qemu_mutex_lock(&param->mutex) and after
> > +         * while(!quit_comp_thread), re-check it here can make
> > +         * sure the compression thread terminate as expected.
> > +         */
> > +        while (!param->start && !quit_comp_thread) {
> 
> Here and in the next use it is protected by param->mutex, but param is per
> compression thread, so it is not really protected.
> 
> > +            qemu_cond_wait(&param->cond, &param->mutex);
> > +        }
> > +        if (!quit_comp_thread) {
> > +            do_compress_ram_page(param);
> > +        }
> > +        param->start = false;
> 
> param->start is protected by param->mutex everywhere
> 
> > +        qemu_mutex_unlock(&param->mutex);
> >
> > +        qemu_mutex_lock(comp_done_lock);
> > +        param->done = true;
> 
> param->done protected by comp_done_lock
> 
> > +        qemu_cond_signal(comp_done_cond);
> > +        qemu_mutex_unlock(comp_done_lock);
> >      }
> >
> >      return NULL;
> > @@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
> >
> >  static inline void terminate_compression_threads(void)
> >  {
> > -    quit_comp_thread = true;
> > +    int idx, thread_count;
> >
> > -    /* To be done */
> > +    thread_count = migrate_compress_threads();
> > +    quit_comp_thread = true;
> 
> quit_comp_thread not protected again.
> 
> > +    for (idx = 0; idx < thread_count; idx++) {
> > +        qemu_mutex_lock(&comp_param[idx].mutex);
> > +        qemu_cond_signal(&comp_param[idx].cond);
> > +        qemu_mutex_unlock(&comp_param[idx].mutex);
> > +    }
> >  }
> >
> >  void migrate_compress_threads_join(void)
> > @@ -420,6 +447,7 @@ void migrate_compress_threads_create(void)
> >           * it's ops to empty.
> >           */
> >          comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> > +        comp_param[i].done = true;
> >          qemu_mutex_init(&comp_param[i].mutex);
> >          qemu_cond_init(&comp_param[i].cond);
> >          qemu_thread_create(compress_threads + i, "compress", @@
> > -829,6 +857,97 @@ static int ram_save_page(QEMUFile *f, RAMBlock*
> block, ram_addr_t offset,
> >      return pages;
> >  }
> >
> > +static int do_compress_ram_page(CompressParam *param) {
> > +    int bytes_sent, blen;
> > +    uint8_t *p;
> > +    RAMBlock *block = param->block;
> > +    ram_addr_t offset = param->offset;
> > +
> > +    p = memory_region_get_ram_ptr(block->mr) + (offset &
> > + TARGET_PAGE_MASK);
> > +
> > +    bytes_sent = save_page_header(param->file, block, offset |
> > +                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
> > +    blen = qemu_put_compression_data(param->file, p,
> TARGET_PAGE_SIZE,
> > +                                     migrate_compress_level());
> > +    bytes_sent += blen;
> > +    atomic_inc(&acct_info.norm_pages);
> > +
> > +    return bytes_sent;
> > +}
> > +
> > +static inline void start_compression(CompressParam *param) {
> > +    param->done = false;
> 
> Not protected (well, its caller has protected it with comp_done_lock).
> 
> > +    qemu_mutex_lock(&param->mutex);
> > +    param->start = true;
> > +    qemu_cond_signal(&param->cond);
> > +    qemu_mutex_unlock(&param->mutex); }
> > +
> > +
> > +static uint64_t bytes_transferred;
> > +
> > +static void flush_compressed_data(QEMUFile *f) {
> > +    int idx, len, thread_count;
> > +
> > +    if (!migrate_use_compression()) {
> > +        return;
> > +    }
> > +    thread_count = migrate_compress_threads();
> > +    for (idx = 0; idx < thread_count; idx++) {
> > +        if (!comp_param[idx].done) {
> 
> done is not protected here.

My intention is to do some optimization.

> 
> > +            qemu_mutex_lock(comp_done_lock);
> > +            while (!comp_param[idx].done && !quit_comp_thread) {
> 
> 
> Now, it is under comp_done_lock.  But none of its other uses is protected by
> it.
> 
> And here done is protected by comp_done_cond
> 
> 
> > +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> > +            }
> > +            qemu_mutex_unlock(comp_done_lock);
> > +        }
> > +        if (!quit_comp_thread) {
> 
> Here, it is unprotected again.

I have tried the way that you suggested in the comments on the version 5 patch,
but I found the performance dropped a lot. With my way, the migration can finish
in 20s, but after the change, it takes 30+s. Maybe there was something that I
did incorrectly.

So, in my implementation, I tried to avoid using the lock as much as I can.
Juan Quintela March 26, 2015, 10:27 a.m. UTC | #3
"Li, Liang Z" <liang.z.li@intel.com> wrote:
>> > --- a/arch_init.c
>> > +++ b/arch_init.c
>> > @@ -355,12 +355,33 @@ static DecompressParam *decomp_param;  static
>> > QemuThread *decompress_threads;  static uint8_t
>> *compressed_data_buf;
>> >
>> > +static int do_compress_ram_page(CompressParam *param);
>> > +
>> >  static void *do_data_compress(void *opaque)  {
>> > -    while (!quit_comp_thread) {
>> > +    CompressParam *param = opaque;
>> >
>> > -    /* To be done */
>> What is the difference with changing this loop to:
>> 
>> 
>> > +    while (!quit_comp_thread) {
>> 
>> Here we don't have quit_comp_thread protected by anything.
>
> Yes, adding a lock to protect quit_comp_thread is not hard and can make the
> code cleaner, but the lock will hurt performance. So I chose not to protect
> it with a lock or anything else.
>
> Is there any problem with accessing quit_comp_thread without protection?

You are not using atomic operations and you are using it from several
threads, so yes.  I still think that just adding another bool to the
params struct should be enough, and shouldn't affect performance much
(you are updating/looking at ->start in nearly all the places that you
touch it).
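
A sketch of that direction (the field layout is assumed here from how the
patch uses CompressParam; only 'quit' is new, and it only lands in a later
revision):

    typedef struct CompressParam {
        bool start;         /* protected by 'mutex' */
        bool quit;          /* new: also protected by 'mutex' */
        bool done;          /* protected by comp_done_lock */
        QEMUFile *file;
        QemuMutex mutex;
        QemuCond cond;
        RAMBlock *block;
        ram_addr_t offset;
    } CompressParam;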


>
>> 
>> > +        qemu_mutex_lock(&param->mutex);
>> > +        /* Re-check the quit_comp_thread in case of
>> > +         * terminate_compression_threads is called just before
>> > +         * qemu_mutex_lock(&param->mutex) and after
>> > +         * while(!quit_comp_thread), re-check it here can make
>> > +         * sure the compression thread terminate as expected.
>> > +         */
>> > +        while (!param->start && !quit_comp_thread) {
>> 
>> Here and in the next use it is protected by param->mutex, but param is per
>> compression thread, so it is not really protected.
>> 
>> > +            qemu_cond_wait(&param->cond, &param->mutex);
>> > +        }
>> > +        if (!quit_comp_thread) {
>> > +            do_compress_ram_page(param);
>> > +        }
>> > +        param->start = false;
>> 
>> param->start is protected by param->mutex everywhere
>> 
>> > +        qemu_mutex_unlock(&param->mutex);
>> >
>> > +        qemu_mutex_lock(comp_done_lock);
>> > +        param->done = true;
>> 
>> param->done protected by comp_done_lock
>> 
>> > +        qemu_cond_signal(comp_done_cond);
>> > +        qemu_mutex_unlock(comp_done_lock);
>> >      }
>> >
>> >      return NULL;
>> > @@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
>> >
>> >  static inline void terminate_compression_threads(void)
>> >  {
>> > -    quit_comp_thread = true;
>> > +    int idx, thread_count;
>> >
>> > -    /* To be done */
>> > +    thread_count = migrate_compress_threads();
>> > +    quit_comp_thread = true;
>> 
>> quit_comp_thread not protected again.
>> 
>> > +    for (idx = 0; idx < thread_count; idx++) {
>> > +        qemu_mutex_lock(&comp_param[idx].mutex);
>> > +        qemu_cond_signal(&comp_param[idx].cond);
>> > +        qemu_mutex_unlock(&comp_param[idx].mutex);
>> > +    }
>> >  }
>> >
>> >  void migrate_compress_threads_join(void)
>> > @@ -420,6 +447,7 @@ void migrate_compress_threads_create(void)
>> >           * it's ops to empty.
>> >           */
>> >          comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
>> > +        comp_param[i].done = true;
>> >          qemu_mutex_init(&comp_param[i].mutex);
>> >          qemu_cond_init(&comp_param[i].cond);
>> >          qemu_thread_create(compress_threads + i, "compress", @@
>> > -829,6 +857,97 @@ static int ram_save_page(QEMUFile *f, RAMBlock*
>> block, ram_addr_t offset,
>> >      return pages;
>> >  }
>> >
>> > +static int do_compress_ram_page(CompressParam *param) {
>> > +    int bytes_sent, blen;
>> > +    uint8_t *p;
>> > +    RAMBlock *block = param->block;
>> > +    ram_addr_t offset = param->offset;
>> > +
>> > +    p = memory_region_get_ram_ptr(block->mr) + (offset &
>> > + TARGET_PAGE_MASK);
>> > +
>> > +    bytes_sent = save_page_header(param->file, block, offset |
>> > +                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
>> > +    blen = qemu_put_compression_data(param->file, p,
>> TARGET_PAGE_SIZE,
>> > +                                     migrate_compress_level());
>> > +    bytes_sent += blen;
>> > +    atomic_inc(&acct_info.norm_pages);
>> > +
>> > +    return bytes_sent;
>> > +}
>> > +
>> > +static inline void start_compression(CompressParam *param) {
>> > +    param->done = false;
>> 
>> Not protected (well, its caller has protected it with comp_done_lock).
>> 
>> > +    qemu_mutex_lock(&param->mutex);
>> > +    param->start = true;
>> > +    qemu_cond_signal(&param->cond);
>> > +    qemu_mutex_unlock(&param->mutex); }
>> > +
>> > +
>> > +static uint64_t bytes_transferred;
>> > +
>> > +static void flush_compressed_data(QEMUFile *f) {
>> > +    int idx, len, thread_count;
>> > +
>> > +    if (!migrate_use_compression()) {
>> > +        return;
>> > +    }
>> > +    thread_count = migrate_compress_threads();
>> > +    for (idx = 0; idx < thread_count; idx++) {
>> > +        if (!comp_param[idx].done) {
>> 
>> done is not protected here.
>
> My intention is to do some optimization.

If you decide to use a shared variable unprotected, I think that you
are the one who has to prove that the access is ok even unprotected.
Also, remember that not everything is x86.  I am pretty sure that other
architectures have a more relaxed memory model where this kind of
"optimization" is not valid.


>> 
>> > +            qemu_mutex_lock(comp_done_lock);
>> > +            while (!comp_param[idx].done && !quit_comp_thread) {
>> 
>> 
>> Now, it is under comp_done_lock.  But none of its other uses is protected by
>> it.
>> 
>> And here done is protected by comp_done_cond
>> 
>> 
>> > +                qemu_cond_wait(comp_done_cond, comp_done_lock);
>> > +            }
>> > +            qemu_mutex_unlock(comp_done_lock);
>> > +        }
>> > +        if (!quit_comp_thread) {
>> 
>> Here, it is unprotected again.
>
> I have tried the way that you suggested in the comments on the version 5
> patch, but I found the performance dropped a lot. With my way, the
> migration can finish in 20s, but after the change, it takes 30+s.
> Maybe there was something that I did incorrectly.
>
> So, in my implementation, I tried to avoid using the lock as much as I can.

Do you have it handy to take a look?

Thanks, Juan.
Li, Liang Z March 27, 2015, 2:59 a.m. UTC | #4
> >> > --- a/arch_init.c
> >> > +++ b/arch_init.c
> >> > @@ -355,12 +355,33 @@ static DecompressParam *decomp_param;
> static
> >> > QemuThread *decompress_threads;  static uint8_t
> >> *compressed_data_buf;
> >> >
> >> > +static int do_compress_ram_page(CompressParam *param);
> >> > +
> >> >  static void *do_data_compress(void *opaque)  {
> >> > -    while (!quit_comp_thread) {
> >> > +    CompressParam *param = opaque;
> >> >
> >> > -    /* To be done */
> >> What is the difference with changing this loop to:
> >>
> >>
> >> > +    while (!quit_comp_thread) {
> >>
> >> Here we don't have quit_comp_thread protected by anything.
> >
> > Yes, adding a lock to protect quit_comp_thread is not hard and can make
> > the code cleaner, but the lock will hurt performance. So I chose not to
> > protect it with a lock or anything else.
> >
> > Is there any problem with accessing quit_comp_thread without protection?
> 
> You are not using atomic operations and you are using it from several threads,
> so yes.  

Even though quit_comp_thread is used from several threads without protection,
I think the current code can work correctly. The only thing that we care about
is that when doing migrate cancel, all the compression threads and the
migration thread can terminate as expected.

For the compression threads, no matter which point of the code is executing,
the thread can terminate after terminate_compression_threads() is called.

For the migration thread, the only place that could block it from terminating
is the flush_compressed_data() function, and I think it can terminate too.

Please point out if I am wrong.

  
> I still think that just adding another bool to the params struct should be
> enough, and shouldn't affect performance much (you are updating/looking at
> ->start in nearly all the places that you touch it).
> 

Not all of them are near ->start: quit_comp_thread is also checked in the
flush_compressed_data() function. If we add another bool to the params struct,
we can protect it with param->mutex, but in flush_compressed_data() we would
still need to take the mutex before accessing it, which is inefficient.

Maybe we can change quit_comp_thread to an int and access it with atomic_read
and atomic_set, making all the operations on it atomic. Is that OK? The
precondition is that it's really necessary.
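
For reference, that alternative would look roughly like this with the
atomic_read/atomic_set helpers from qemu/atomic.h (a sketch of the idea only;
the thread below ends up settling on a per-thread quit flag instead):

    #include "qemu/atomic.h"

    static int quit_comp_thread;

    /* migration thread, on cancel: */
    atomic_set(&quit_comp_thread, 1);

    /* compression threads and flush_compressed_data(): */
    if (atomic_read(&quit_comp_thread)) {
        /* terminate / stop waiting */
    }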

> >
> >>
> >> > +        qemu_mutex_lock(&param->mutex);
> >> > +        /* Re-check the quit_comp_thread in case of
> >> > +         * terminate_compression_threads is called just before
> >> > +         * qemu_mutex_lock(&param->mutex) and after
> >> > +         * while(!quit_comp_thread), re-check it here can make
> >> > +         * sure the compression thread terminate as expected.
> >> > +         */
> >> > +        while (!param->start && !quit_comp_thread) {
> >>
> >> Here and in the next use it is protected by param->mutex, but param is per
> >> compression thread, so it is not really protected.
> >>
> >> > +            qemu_cond_wait(&param->cond, &param->mutex);
> >> > +        }
> >> > +        if (!quit_comp_thread) {
> >> > +            do_compress_ram_page(param);
> >> > +        }
> >> > +        param->start = false;
> >>
> >> param->start is protected by param->mutex everywhere
> >>
> >> > +        qemu_mutex_unlock(&param->mutex);
> >> >
> >> > +        qemu_mutex_lock(comp_done_lock);
> >> > +        param->done = true;
> >>
> >> param->done protected by comp_done_lock
> >>
> >> > +        qemu_cond_signal(comp_done_cond);
> >> > +        qemu_mutex_unlock(comp_done_lock);
> >> >      }
> >> >
> >> >      return NULL;
> >> > @@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
> >> >
> >> >  static inline void terminate_compression_threads(void)
> >> >  {
> >> > -    quit_comp_thread = true;
> >> > +    int idx, thread_count;
> >> >
> >> > -    /* To be done */
> >> > +    thread_count = migrate_compress_threads();
> >> > +    quit_comp_thread = true;
> >>
> >> quit_comp_thread not protected again.
> >>
> >> > +    for (idx = 0; idx < thread_count; idx++) {
> >> > +        qemu_mutex_lock(&comp_param[idx].mutex);
> >> > +        qemu_cond_signal(&comp_param[idx].cond);
> >> > +        qemu_mutex_unlock(&comp_param[idx].mutex);
> >> > +    }
> >> >  }
> >> >
> >> >  void migrate_compress_threads_join(void)
> >> > @@ -420,6 +447,7 @@ void migrate_compress_threads_create(void)
> >> >           * it's ops to empty.
> >> >           */
> >> >          comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> >> > +        comp_param[i].done = true;
> >> >          qemu_mutex_init(&comp_param[i].mutex);
> >> >          qemu_cond_init(&comp_param[i].cond);
> >> >          qemu_thread_create(compress_threads + i, "compress", @@
> >> > -829,6 +857,97 @@ static int ram_save_page(QEMUFile *f, RAMBlock*
> >> block, ram_addr_t offset,
> >> >      return pages;
> >> >  }
> >> >
> >> > +static int do_compress_ram_page(CompressParam *param) {
> >> > +    int bytes_sent, blen;
> >> > +    uint8_t *p;
> >> > +    RAMBlock *block = param->block;
> >> > +    ram_addr_t offset = param->offset;
> >> > +
> >> > +    p = memory_region_get_ram_ptr(block->mr) + (offset &
> >> > + TARGET_PAGE_MASK);
> >> > +
> >> > +    bytes_sent = save_page_header(param->file, block, offset |
> >> > +                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
> >> > +    blen = qemu_put_compression_data(param->file, p,
> >> TARGET_PAGE_SIZE,
> >> > +                                     migrate_compress_level());
> >> > +    bytes_sent += blen;
> >> > +    atomic_inc(&acct_info.norm_pages);
> >> > +
> >> > +    return bytes_sent;
> >> > +}
> >> > +
> >> > +static inline void start_compression(CompressParam *param) {
> >> > +    param->done = false;
> >>
> >> Not protected (well, its caller has protected it with comp_done_lock).
> >>
> >> > +    qemu_mutex_lock(&param->mutex);
> >> > +    param->start = true;
> >> > +    qemu_cond_signal(&param->cond);
> >> > +    qemu_mutex_unlock(&param->mutex); }
> >> > +
> >> > +
> >> > +static uint64_t bytes_transferred;
> >> > +
> >> > +static void flush_compressed_data(QEMUFile *f) {
> >> > +    int idx, len, thread_count;
> >> > +
> >> > +    if (!migrate_use_compression()) {
> >> > +        return;
> >> > +    }
> >> > +    thread_count = migrate_compress_threads();
> >> > +    for (idx = 0; idx < thread_count; idx++) {
> >> > +        if (!comp_param[idx].done) {
> >>
> >> done is not protected here.
> >
> > My intention is to do some optimization.
> 
> If you decide to use a shared variable unprotected, I think that you are the
> one who has to prove that the access is ok even unprotected.
> Also, remember that not everything is x86.  I am pretty sure that other
> architectures have a more relaxed memory model where this kind of
> "optimization" is not valid.

I didn't know this; I will remove the "optimization" code.

> >>
> >> > +            qemu_mutex_lock(comp_done_lock);
> >> > +            while (!comp_param[idx].done && !quit_comp_thread) {
> >>
> >>
> >> Now, it is under comp_done_lock.  But none of its other uses is
> >> protected by it.
> >>
> >> And here done is protected by comp_done_cond
> >>
> >>
> >> > +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> >> > +            }
> >> > +            qemu_mutex_unlock(comp_done_lock);
> >> > +        }
> >> > +        if (!quit_comp_thread) {
> >>
> >> Here, it is unprotected again.
> >
> > I have tried the way that you suggested in the comments on the version 5
> > patch, but I found the performance dropped a lot. With my way, the
> > migration can finish in 20s, but after the change, it takes 30+s.
> > Maybe there was something that I did incorrectly.
> >
> > So, in my implementation, I tried to avoid using the lock as much as I can.
> 
> Do you have it handy to take a look?
> 
> Thanks, Juan.
Juan Quintela March 27, 2015, 10:47 a.m. UTC | #5
Liang Li <liang.z.li@intel.com> wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>


Coming back to this, as we now have the full code.

> ---
>  arch_init.c | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 177 insertions(+), 7 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index 48cae22..9f63c0f 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -355,12 +355,33 @@ static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
>  static uint8_t *compressed_data_buf;
>  
> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_comp_thread) {
> +    CompressParam *param = opaque;
>  
> -    /* To be done */
Change this line to

> +    while (!quit_comp_thread) {

  while (true) {

> +        qemu_mutex_lock(&param->mutex);
> +        /* Re-check the quit_comp_thread in case of
> +         * terminate_compression_threads is called just before
> +         * qemu_mutex_lock(&param->mutex) and after
> +         * while(!quit_comp_thread), re-check it here can make
> +         * sure the compression thread terminate as expected.
> +         */
Change this

> +        while (!param->start && !quit_comp_thread) {

to

while (!param->start && !param->quit) {

> +            qemu_cond_wait(&param->cond, &param->mutex);
> +        }

And this

> +        if (!quit_comp_thread) {

to

      if (!param->quit) {
> +            do_compress_ram_page(param);
> +        }

Take care here to exit the loop correctly.
Notice that the only case where we are not going to take the lock is the
last iteration, so I think the optimization doesn't gain us anything (in
this place), no?
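
Assembled, the suggested loop would look roughly like this (a sketch; the
explicit break on param->quit is one way to exit correctly, filled in here
as an assumption):

    static void *do_data_compress(void *opaque)
    {
        CompressParam *param = opaque;

        while (true) {
            qemu_mutex_lock(&param->mutex);
            while (!param->start && !param->quit) {
                qemu_cond_wait(&param->cond, &param->mutex);
            }
            if (param->quit) {
                qemu_mutex_unlock(&param->mutex);
                break;                 /* terminate as requested */
            }
            do_compress_ram_page(param);
            param->start = false;
            qemu_mutex_unlock(&param->mutex);

            qemu_mutex_lock(comp_done_lock);
            param->done = true;
            qemu_cond_signal(comp_done_cond);
            qemu_mutex_unlock(comp_done_lock);
        }

        return NULL;
    }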

> +        param->start = false;
> +        qemu_mutex_unlock(&param->mutex);
>  
> +        qemu_mutex_lock(comp_done_lock);
> +        param->done = true;
> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);
>      }


>  
>      return NULL;
> @@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
>  
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_comp_thread = true;
> +    int idx, thread_count;
>  
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_comp_thread = true;


> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_mutex_lock(&comp_param[idx].mutex);
Add this
           comp_param[idx].quit = true;


And from now on, quit_comp_thread is only used in the migration thread, so it
should be safe to use, no?

flush_compressed_data() is only ever called from the migration thread, so
no lock is needed there either.
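
Under that scheme, terminate_compression_threads() would become roughly (a
sketch; comp_param[idx].quit is the new per-thread flag suggested above):

    static inline void terminate_compression_threads(void)
    {
        int idx, thread_count = migrate_compress_threads();

        quit_comp_thread = true;   /* now only touched by the migration thread */
        for (idx = 0; idx < thread_count; idx++) {
            qemu_mutex_lock(&comp_param[idx].mutex);
            comp_param[idx].quit = true;
            qemu_cond_signal(&comp_param[idx].cond);
            qemu_mutex_unlock(&comp_param[idx].mutex);
        }
    }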

> +        qemu_cond_signal(&comp_param[idx].cond);
> +        qemu_mutex_unlock(&comp_param[idx].mutex);
> +    }
>  }
>  
>  void migrate_compress_threads_join(void)
> @@ -420,6 +447,7 @@ void migrate_compress_threads_create(void)
>           * it's ops to empty.
>           */
>          comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> +        comp_param[i].done = true;
>          qemu_mutex_init(&comp_param[i].mutex);
>          qemu_cond_init(&comp_param[i].cond);
>          qemu_thread_create(compress_threads + i, "compress",
> @@ -829,6 +857,97 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>      return pages;
>  }
>  
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
> +
> +    bytes_sent = save_page_header(param->file, block, offset |
> +                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    param->done = false;
> +    qemu_mutex_lock(&param->mutex);
> +    param->start = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (!comp_param[idx].done) {
> +            qemu_mutex_lock(comp_done_lock);
> +            while (!comp_param[idx].done && !quit_comp_thread) {
> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }
> +        if (!quit_comp_thread) {
> +            len = qemu_put_qemu_file(f, comp_param[idx].file);
> +            bytes_transferred += len;
> +        }
> +    }
> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset,
> +                                           uint64_t *bytes_transferred)
> +{
> +    int idx, thread_count, bytes_xmit = -1, pages = -1;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (comp_param[idx].done) {
> +                bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);
> +                pages = 1;
> +                *bytes_transferred += bytes_xmit;
> +                break;
> +            }
> +        }
> +        if (pages > 0) {
> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return pages;
> +}
> +
>  /**
>   * ram_save_compressed_page: compress the given page and send it to the stream
>   *
> @@ -845,8 +964,59 @@ static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      uint64_t *bytes_transferred)
>  {
>      int pages = -1;
> +    uint64_t bytes_xmit;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
> +    int ret;
> +
> +    p = memory_region_get_ram_ptr(mr) + offset;
>  
> -    /* To be done*/
> +    bytes_xmit = 0;
> +    ret = ram_control_save_page(f, block->offset,
> +                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
> +    if (bytes_xmit) {
> +        *bytes_transferred += bytes_xmit;
> +        pages = 1;
> +    }
> +    if (block == last_sent_block) {
> +        offset |= RAM_SAVE_FLAG_CONTINUE;
> +    }
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_xmit > 0) {
> +                acct_info.norm_pages++;
> +            } else if (bytes_xmit == 0) {
> +                acct_info.dup_pages++;
> +            }
> +        }
> +    } else {
> +        /* When starting the process of a new block, the first page of
> +         * the block should be sent out before other pages in the same
> +         * block, and all the pages in last block should have been sent
> +         * out, keeping this order is important, because the 'cont' flag
> +         * is used to avoid resending the block name.
> +         */
> +        if (block != last_sent_block) {
> +            flush_compressed_data(f);
> +            pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +            if (pages == -1) {
> +                set_compress_params(&comp_param[0], block, offset);
> +                /* Use the qemu thread to compress the data to make sure the
> +                 * first page is sent out before other pages
> +                 */
> +                bytes_xmit = do_compress_ram_page(&comp_param[0]);
> +                qemu_put_qemu_file(f, comp_param[0].file);
> +                *bytes_transferred += bytes_xmit;
> +                pages = 1;
> +            }
> +        } else {
> +            pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +            if (pages == -1) {
> +                pages = compress_page_with_multi_thread(f, block, offset,
> +                                                        bytes_transferred);
> +            }
> +        }
> +    }
>  
>      return pages;
>  }
> @@ -914,8 +1084,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
>      return pages;
>  }
>  
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1129,6 +1297,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          }
>          i++;
>      }
> +    flush_compressed_data(f);
>      rcu_read_unlock();
>  
>      /*
> @@ -1170,6 +1339,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          }
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
Li, Liang Z March 28, 2015, 6:11 a.m. UTC | #6
> -
> >  1 file changed, 177 insertions(+), 7 deletions(-)
> >
> > diff --git a/arch_init.c b/arch_init.c index 48cae22..9f63c0f 100644
> > --- a/arch_init.c
> > +++ b/arch_init.c
> > @@ -355,12 +355,33 @@ static DecompressParam *decomp_param;  static
> > QemuThread *decompress_threads;  static uint8_t
> *compressed_data_buf;
> >
> > +static int do_compress_ram_page(CompressParam *param);
> > +
> >  static void *do_data_compress(void *opaque)  {
> > -    while (!quit_comp_thread) {
> > +    CompressParam *param = opaque;
> >
> > -    /* To be done */
> Change this line to
> 
> > +    while (!quit_comp_thread) {
> 
>   while (true) {
> 
> > +        qemu_mutex_lock(&param->mutex);
> > +        /* Re-check the quit_comp_thread in case of
> > +         * terminate_compression_threads is called just before
> > +         * qemu_mutex_lock(&param->mutex) and after
> > +         * while(!quit_comp_thread), re-check it here can make
> > +         * sure the compression thread terminate as expected.
> > +         */
> Change this
> 
> > +        while (!param->start && !quit_comp_thread) {
> 
> to
> 
> while (!param->start && !param->quit) {
> 
> > +            qemu_cond_wait(&param->cond, &param->mutex);
> > +        }
> 
> And this
> 
> > +        if (!quit_comp_thread) {
> 
> to
> 
>       if (!param->quit) {
> > +            do_compress_ram_page(param);
> > +        }
> 
> Take care here to exit the loop correctly.
> Notice that the only case where we are not going to take the lock is the last
> iteration, so I think the optimization doesn't gain us anything (in this place), no?
> 
> > +        param->start = false;
> > +        qemu_mutex_unlock(&param->mutex);
> >
> > +        qemu_mutex_lock(comp_done_lock);
> > +        param->done = true;
> > +        qemu_cond_signal(comp_done_cond);
> > +        qemu_mutex_unlock(comp_done_lock);
> >      }
> 
> 
> >
> >      return NULL;
> > @@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
> >
> >  static inline void terminate_compression_threads(void)
> >  {
> > -    quit_comp_thread = true;
> > +    int idx, thread_count;
> >
> > -    /* To be done */
> > +    thread_count = migrate_compress_threads();
> > +    quit_comp_thread = true;
> 
> 
> > +    for (idx = 0; idx < thread_count; idx++) {
> > +        qemu_mutex_lock(&comp_param[idx].mutex);
> Add this
>            comp_param[idx].quit = true;
> 
> 
> And from now on, quit_comp_thread is only used in the migration thread, so it
> should be safe to use, no?
> 
> flush_compressed_data() is only ever called from the migration thread, so no
> lock is needed there either.

Now that the lock is not needed in flush_compressed_data(), it looks good to me.
I will change the code according to your suggestion.

And could you ask the related maintainers to review the other parts of my patches,
especially the 3 patches related to the QMP and HMP interfaces?

Thanks, Juan!

Liang

Patch

diff --git a/arch_init.c b/arch_init.c
index 48cae22..9f63c0f 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -355,12 +355,33 @@  static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
 static uint8_t *compressed_data_buf;
 
+static int do_compress_ram_page(CompressParam *param);
+
 static void *do_data_compress(void *opaque)
 {
-    while (!quit_comp_thread) {
+    CompressParam *param = opaque;
 
-    /* To be done */
+    while (!quit_comp_thread) {
+        qemu_mutex_lock(&param->mutex);
+        /* Re-check the quit_comp_thread in case of
+         * terminate_compression_threads is called just before
+         * qemu_mutex_lock(&param->mutex) and after
+         * while(!quit_comp_thread), re-check it here can make
+         * sure the compression thread terminate as expected.
+         */
+        while (!param->start && !quit_comp_thread) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+        }
+        if (!quit_comp_thread) {
+            do_compress_ram_page(param);
+        }
+        param->start = false;
+        qemu_mutex_unlock(&param->mutex);
 
+        qemu_mutex_lock(comp_done_lock);
+        param->done = true;
+        qemu_cond_signal(comp_done_cond);
+        qemu_mutex_unlock(comp_done_lock);
     }
 
     return NULL;
@@ -368,9 +389,15 @@  static void *do_data_compress(void *opaque)
 
 static inline void terminate_compression_threads(void)
 {
-    quit_comp_thread = true;
+    int idx, thread_count;
 
-    /* To be done */
+    thread_count = migrate_compress_threads();
+    quit_comp_thread = true;
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_mutex_lock(&comp_param[idx].mutex);
+        qemu_cond_signal(&comp_param[idx].cond);
+        qemu_mutex_unlock(&comp_param[idx].mutex);
+    }
 }
 
 void migrate_compress_threads_join(void)
@@ -420,6 +447,7 @@  void migrate_compress_threads_create(void)
          * it's ops to empty.
          */
         comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
+        comp_param[i].done = true;
         qemu_mutex_init(&comp_param[i].mutex);
         qemu_cond_init(&comp_param[i].cond);
         qemu_thread_create(compress_threads + i, "compress",
@@ -829,6 +857,97 @@  static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
     return pages;
 }
 
+static int do_compress_ram_page(CompressParam *param)
+{
+    int bytes_sent, blen;
+    uint8_t *p;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+
+    p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
+
+    bytes_sent = save_page_header(param->file, block, offset |
+                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
+    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+                                     migrate_compress_level());
+    bytes_sent += blen;
+    atomic_inc(&acct_info.norm_pages);
+
+    return bytes_sent;
+}
+
+static inline void start_compression(CompressParam *param)
+{
+    param->done = false;
+    qemu_mutex_lock(&param->mutex);
+    param->start = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx, len, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        if (!comp_param[idx].done) {
+            qemu_mutex_lock(comp_done_lock);
+            while (!comp_param[idx].done && !quit_comp_thread) {
+                qemu_cond_wait(comp_done_cond, comp_done_lock);
+            }
+            qemu_mutex_unlock(comp_done_lock);
+        }
+        if (!quit_comp_thread) {
+            len = qemu_put_qemu_file(f, comp_param[idx].file);
+            bytes_transferred += len;
+        }
+    }
+}
+
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+                                       ram_addr_t offset)
+{
+    param->block = block;
+    param->offset = offset;
+}
+
+static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
+                                           ram_addr_t offset,
+                                           uint64_t *bytes_transferred)
+{
+    int idx, thread_count, bytes_xmit = -1, pages = -1;
+
+    thread_count = migrate_compress_threads();
+    qemu_mutex_lock(comp_done_lock);
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (comp_param[idx].done) {
+                bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
+                set_compress_params(&comp_param[idx], block, offset);
+                start_compression(&comp_param[idx]);
+                pages = 1;
+                *bytes_transferred += bytes_xmit;
+                break;
+            }
+        }
+        if (pages > 0) {
+            break;
+        } else {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+    }
+    qemu_mutex_unlock(comp_done_lock);
+
+    return pages;
+}
+
 /**
  * ram_save_compressed_page: compress the given page and send it to the stream
  *
@@ -845,8 +964,59 @@  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
                                     uint64_t *bytes_transferred)
 {
     int pages = -1;
+    uint64_t bytes_xmit;
+    MemoryRegion *mr = block->mr;
+    uint8_t *p;
+    int ret;
+
+    p = memory_region_get_ram_ptr(mr) + offset;
 
-    /* To be done*/
+    bytes_xmit = 0;
+    ret = ram_control_save_page(f, block->offset,
+                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
+    if (bytes_xmit) {
+        *bytes_transferred += bytes_xmit;
+        pages = 1;
+    }
+    if (block == last_sent_block) {
+        offset |= RAM_SAVE_FLAG_CONTINUE;
+    }
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_xmit > 0) {
+                acct_info.norm_pages++;
+            } else if (bytes_xmit == 0) {
+                acct_info.dup_pages++;
+            }
+        }
+    } else {
+        /* When starting the process of a new block, the first page of
+         * the block should be sent out before other pages in the same
+         * block, and all the pages in last block should have been sent
+         * out, keeping this order is important, because the 'cont' flag
+         * is used to avoid resending the block name.
+         */
+        if (block != last_sent_block) {
+            flush_compressed_data(f);
+            pages = save_zero_page(f, block, offset, p, bytes_transferred);
+            if (pages == -1) {
+                set_compress_params(&comp_param[0], block, offset);
+                /* Use the qemu thread to compress the data to make sure the
+                 * first page is sent out before other pages
+                 */
+                bytes_xmit = do_compress_ram_page(&comp_param[0]);
+                qemu_put_qemu_file(f, comp_param[0].file);
+                *bytes_transferred += bytes_xmit;
+                pages = 1;
+            }
+        } else {
+            pages = save_zero_page(f, block, offset, p, bytes_transferred);
+            if (pages == -1) {
+                pages = compress_page_with_multi_thread(f, block, offset,
+                                                        bytes_transferred);
+            }
+        }
+    }
 
     return pages;
 }
@@ -914,8 +1084,6 @@  static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
     return pages;
 }
 
-static uint64_t bytes_transferred;
-
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
     uint64_t pages = size / TARGET_PAGE_SIZE;
@@ -1129,6 +1297,7 @@  static int ram_save_iterate(QEMUFile *f, void *opaque)
         }
         i++;
     }
+    flush_compressed_data(f);
     rcu_read_unlock();
 
     /*
@@ -1170,6 +1339,7 @@  static int ram_save_complete(QEMUFile *f, void *opaque)
         }
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();