
[v6,10/14] migration: Add the core code for decompression

Message ID 1427099549-10633-11-git-send-email-liang.z.li@intel.com
State New

Commit Message

Li, Liang Z March 23, 2015, 8:32 a.m. UTC
Implement the core logic of multi-threaded decompression; with this
patch, decompression now works.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 50 ++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 48 insertions(+), 2 deletions(-)
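
For context, the diff below fills in per-thread DecompressParam state that was
introduced earlier in this series. The field names in this sketch are inferred
from the usage in this patch, not copied from the real definition:

    typedef struct DecompressParam {
        bool start;         /* set by the dispatcher, cleared by the worker */
        QemuMutex mutex;    /* protects 'start' together with 'cond' */
        QemuCond cond;      /* worker sleeps here until 'start' is set */
        void *des;          /* destination host page */
        uint8_t *compbuf;   /* compressed data copied in by the dispatcher */
        int len;            /* length of the compressed data */
    } DecompressParam;      /* QemuMutex/QemuCond come from "qemu/thread.h" */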

Comments

Juan Quintela March 25, 2015, 11:56 a.m. UTC | #1
Liang Li <liang.z.li@intel.com> wrote:
> Implement the core logic of multi-threaded decompression; with this
> patch, decompression now works.
>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  arch_init.c | 50 ++++++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 48 insertions(+), 2 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index b81acc9..6a0d709 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -888,6 +888,13 @@ static inline void start_compression(CompressParam *param)
>      qemu_mutex_unlock(&param->mutex);
>  }
>  
> +static inline void start_decompression(DecompressParam *param)
> +{
> +    qemu_mutex_lock(&param->mutex);
> +    param->start = true;

start protected by param->mutex

> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
>  
>  static uint64_t bytes_transferred;
>  
> @@ -1459,8 +1466,26 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>  
>  static void *do_data_decompress(void *opaque)
>  {
> +    DecompressParam *param = opaque;
> +    size_t pagesize;
> +
>      while (!quit_decomp_thread) {

quit_decomp_thread locking (or lack of it) is equivalent to that of quit_comp_thread (one explicit alternative is sketched after this message)



> -        /* To be done */
> +        qemu_mutex_lock(&param->mutex);
> +        while (!param->start && !quit_decomp_thread) {

start protected by param->mutex.


> +            qemu_cond_wait(&param->cond, &param->mutex);
> +            pagesize = TARGET_PAGE_SIZE;
> +            if (!quit_decomp_thread) {
> +                /* uncompress() may fail in some cases, especially
> +                 * when the page was dirtied while being compressed; that is
> +                 * not a problem because the dirty page will be retransferred
> +                 * and uncompress() won't break the data in other pages.
> +                 */
> +                uncompress((Bytef *)param->des, &pagesize,
> +                           (const Bytef *)param->compbuf, param->len);
> +            }
> +            param->start = false;
> +        }
> +        qemu_mutex_unlock(&param->mutex);
>      }
>  
>      return NULL;
> @@ -1492,6 +1517,11 @@ void migrate_decompress_threads_join(void)
>      quit_decomp_thread = true;
>      thread_count = migrate_decompress_threads();
>      for (i = 0; i < thread_count; i++) {
> +        qemu_mutex_lock(&decomp_param[i].mutex);
> +        qemu_cond_signal(&decomp_param[i].cond);
> +        qemu_mutex_unlock(&decomp_param[i].mutex);
> +    }
> +    for (i = 0; i < thread_count; i++) {
>          qemu_thread_join(decompress_threads + i);
>          qemu_mutex_destroy(&decomp_param[i].mutex);
>          qemu_cond_destroy(&decomp_param[i].cond);
> @@ -1508,7 +1538,23 @@ void migrate_decompress_threads_join(void)
>  static void decompress_data_with_multi_threads(uint8_t *compbuf,
>                                                 void *host, int len)
>  {
> -    /* To be done */
> +    int idx, thread_count;
> +
> +    thread_count = migrate_decompress_threads();
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (!decomp_param[idx].start) {

start not protected (read without holding the mutex; a possible fix is sketched after the reply below)

> +                memcpy(decomp_param[idx].compbuf, compbuf, len);
> +                decomp_param[idx].des = host;
> +                decomp_param[idx].len = len;
> +                start_decompression(&decomp_param[idx]);
> +                break;
> +            }
> +        }
> +        if (idx < thread_count) {
> +            break;
> +        }
> +    }
>  }
>  
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
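
On the quit_decomp_thread note above: the flag is a plain global that the
workers poll and that migrate_decompress_threads_join() writes, with no lock
around it, mirroring quit_comp_thread on the compression side. A minimal
sketch of one explicit alternative, using C11 atomics for illustration rather
than QEMU's own atomic helpers (names are illustrative, not the fix that was
eventually merged):

    #include <stdatomic.h>
    #include <stdbool.h>

    /* illustrative stand-in for the global quit flag */
    static atomic_bool decomp_quit = false;

    static void request_decomp_quit(void)
    {
        /* release: workers see everything written before the request */
        atomic_store_explicit(&decomp_quit, true, memory_order_release);
    }

    static bool decomp_quit_requested(void)
    {
        /* acquire: pairs with the release store above */
        return atomic_load_explicit(&decomp_quit, memory_order_acquire);
    }
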
Li, Liang Z March 26, 2015, 2:46 a.m. UTC | #2
> > +        qemu_mutex_lock(&param->mutex);
> > +        while (!param->start && !quit_decomp_thread) {
> 
> start protected by param->mutex.
> 
> 
> > +            qemu_cond_wait(&param->cond, &param->mutex);
> > +            pagesize = TARGET_PAGE_SIZE;
> > +            if (!quit_decomp_thread) {
> > +                /* uncompress() may fail in some cases, especially
> > +                 * when the page was dirtied while being compressed; that is
> > +                 * not a problem because the dirty page will be retransferred
> > +                 * and uncompress() won't break the data in other pages.
> > +                 */
> > +                uncompress((Bytef *)param->des, &pagesize,
> > +                           (const Bytef *)param->compbuf, param->len);
> > +            }
> > +            param->start = false;
> > +        }
> > +        qemu_mutex_unlock(&param->mutex);
> >      }
> >
> >      return NULL;
> > @@ -1492,6 +1517,11 @@ void migrate_decompress_threads_join(void)
> >      quit_decomp_thread = true;
> >      thread_count = migrate_decompress_threads();
> >      for (i = 0; i < thread_count; i++) {
> > +        qemu_mutex_lock(&decomp_param[i].mutex);
> > +        qemu_cond_signal(&decomp_param[i].cond);
> > +        qemu_mutex_unlock(&decomp_param[i].mutex);
> > +    }
> > +    for (i = 0; i < thread_count; i++) {
> >          qemu_thread_join(decompress_threads + i);
> >          qemu_mutex_destroy(&decomp_param[i].mutex);
> >          qemu_cond_destroy(&decomp_param[i].cond);
> > @@ -1508,7 +1538,23 @@ void migrate_decompress_threads_join(void)
> >  static void decompress_data_with_multi_threads(uint8_t *compbuf,
> >                                                 void *host, int len)
> > {
> > -    /* To be done */
> > +    int idx, thread_count;
> > +
> > +    thread_count = migrate_decompress_threads();
> > +    while (true) {
> > +        for (idx = 0; idx < thread_count; idx++) {
> > +            if (!decomp_param[idx].start) {
> 
> start not protected

Yes, it's incorrect; I will change it.

> > +                memcpy(decomp_param[idx].compbuf, compbuf, len);
> > +                decomp_param[idx].des = host;
> > +                decomp_param[idx].len = len;
> > +                start_decompression(&decomp_param[idx]);
> > +                break;
> > +            }
> > +        }
> > +        if (idx < thread_count) {
> > +            break;
> > +        }
> > +    }
> >  }
> >
> >  static int ram_load(QEMUFile *f, void *opaque, int version_id)
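
As agreed above, the v6 dispatcher inspects decomp_param[idx].start without
holding that worker's mutex. One possible shape of the fix, sketched with
qemu_mutex_trylock() (returns 0 when the lock is taken) so a busy worker is
simply skipped; this is an illustration of the locking pattern, not the code
that eventually landed:

    static void decompress_data_with_multi_threads(uint8_t *compbuf,
                                                   void *host, int len)
    {
        int idx, thread_count;

        thread_count = migrate_decompress_threads();
        while (true) {
            for (idx = 0; idx < thread_count; idx++) {
                /* only look at 'start' while holding the worker's mutex;
                 * a worker whose mutex is currently busy is skipped */
                if (qemu_mutex_trylock(&decomp_param[idx].mutex) == 0) {
                    if (!decomp_param[idx].start) {
                        memcpy(decomp_param[idx].compbuf, compbuf, len);
                        decomp_param[idx].des = host;
                        decomp_param[idx].len = len;
                        decomp_param[idx].start = true;
                        qemu_cond_signal(&decomp_param[idx].cond);
                        qemu_mutex_unlock(&decomp_param[idx].mutex);
                        break;
                    }
                    qemu_mutex_unlock(&decomp_param[idx].mutex);
                }
            }
            if (idx < thread_count) {
                break;
            }
        }
    }

Setting 'start' and signalling under the same lock replaces the separate
start_decompression() helper, which would otherwise try to re-take the mutex.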

Patch

diff --git a/arch_init.c b/arch_init.c
index b81acc9..6a0d709 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -888,6 +888,13 @@  static inline void start_compression(CompressParam *param)
     qemu_mutex_unlock(&param->mutex);
 }
 
+static inline void start_decompression(DecompressParam *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->start = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
 
 static uint64_t bytes_transferred;
 
@@ -1459,8 +1466,26 @@  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
 
 static void *do_data_decompress(void *opaque)
 {
+    DecompressParam *param = opaque;
+    size_t pagesize;
+
     while (!quit_decomp_thread) {
-        /* To be done */
+        qemu_mutex_lock(&param->mutex);
+        while (!param->start && !quit_decomp_thread) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+            pagesize = TARGET_PAGE_SIZE;
+            if (!quit_decomp_thread) {
+                /* uncompress() may fail in some cases, especially
+                 * when the page was dirtied while being compressed; that is
+                 * not a problem because the dirty page will be retransferred
+                 * and uncompress() won't break the data in other pages.
+                 */
+                uncompress((Bytef *)param->des, &pagesize,
+                           (const Bytef *)param->compbuf, param->len);
+            }
+            param->start = false;
+        }
+        qemu_mutex_unlock(&param->mutex);
     }
 
     return NULL;
@@ -1492,6 +1517,11 @@  void migrate_decompress_threads_join(void)
     quit_decomp_thread = true;
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&decomp_param[i].mutex);
+        qemu_cond_signal(&decomp_param[i].cond);
+        qemu_mutex_unlock(&decomp_param[i].mutex);
+    }
+    for (i = 0; i < thread_count; i++) {
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
@@ -1508,7 +1538,23 @@  void migrate_decompress_threads_join(void)
 static void decompress_data_with_multi_threads(uint8_t *compbuf,
                                                void *host, int len)
 {
-    /* To be done */
+    int idx, thread_count;
+
+    thread_count = migrate_decompress_threads();
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (!decomp_param[idx].start) {
+                memcpy(decomp_param[idx].compbuf, compbuf, len);
+                decomp_param[idx].des = host;
+                decomp_param[idx].len = len;
+                start_decompression(&decomp_param[idx]);
+                break;
+            }
+        }
+        if (idx < thread_count) {
+            break;
+        }
+    }
 }
 
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
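
For reference, the zlib call the worker relies on is
int uncompress(Bytef *dest, uLongf *destLen, const Bytef *source, uLong sourceLen),
which returns Z_OK on success and Z_DATA_ERROR, Z_BUF_ERROR or Z_MEM_ERROR on
failure. The patch intentionally ignores the return value, since a page that
was dirtied mid-compression will be retransmitted anyway; below is a small
standalone sketch of the same call with explicit checking, for callers that
want to log failures:

    #include <stdint.h>
    #include <stdio.h>
    #include <zlib.h>

    /* decompress one page-sized buffer; returns 0 on success, -1 on failure */
    static int decompress_page(uint8_t *dest, size_t page_size,
                               const uint8_t *compbuf, size_t comp_len)
    {
        uLongf dest_len = page_size;
        int ret = uncompress((Bytef *)dest, &dest_len,
                             (const Bytef *)compbuf, comp_len);
        if (ret != Z_OK || dest_len != page_size) {
            /* during migration this is harmless (the dirty page is resent),
             * but the error is still visible to callers that care */
            fprintf(stderr, "uncompress failed: %d\n", ret);
            return -1;
        }
        return 0;
    }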