Message ID | 1422875149-13198-6-git-send-email-liang.z.li@intel.com |
---|---|
State | New |
Headers | show |
* Liang Li (liang.z.li@intel.com) wrote: > Define the data structure and variables used to do multiple thread > compression, and add the code to initialize and free them. > > Signed-off-by: Liang Li <liang.z.li@intel.com> > Signed-off-by: Yang Zhang <yang.z.zhang@intel.com> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com> > --- > arch_init.c | 34 +++++++++++++++++++++++++++++++++- > 1 file changed, 33 insertions(+), 1 deletion(-) > > diff --git a/arch_init.c b/arch_init.c > index ed34eb3..87c4947 100644 > --- a/arch_init.c > +++ b/arch_init.c > @@ -335,7 +335,12 @@ static uint32_t last_version; > static bool ram_bulk_stage; > > struct CompressParam { > - /* To be done */ > + bool busy; > + QEMUFile *file; > + QemuMutex mutex; > + QemuCond cond; > + RAMBlock *block; > + ram_addr_t offset; > }; > typedef struct CompressParam CompressParam; > > @@ -345,6 +350,14 @@ struct DecompressParam { > typedef struct DecompressParam DecompressParam; > > static CompressParam *comp_param; > +/* comp_done_cond is used to wake up the migration thread when > + * one of the compression threads has finished the compression. > + * comp_done_lock is used to co-work with comp_done_cond. > + */ > +static QemuMutex *comp_done_lock; > +static QemuCond *comp_done_cond; > +/* The empty QEMUFileOps will be used by file in CompressParam */ > +static const QEMUFileOps empty_ops = { }; > static bool quit_thread; > static DecompressParam *decomp_param; > static QemuThread *decompress_threads; > @@ -379,11 +392,20 @@ void migrate_compress_threads_join(MigrationState *s) > thread_count = migrate_compress_threads(); > for (i = 0; i < thread_count; i++) { > qemu_thread_join(s->compress_thread + i); > + qemu_fclose(comp_param[i].file); > + qemu_mutex_destroy(&comp_param[i].mutex); > + qemu_cond_destroy(&comp_param[i].cond); > } > + qemu_mutex_destroy(comp_done_lock); > + qemu_cond_destroy(comp_done_cond); > g_free(s->compress_thread); > g_free(comp_param); > + g_free(comp_done_cond); > + g_free(comp_done_lock); > s->compress_thread = NULL; > comp_param = NULL; > + comp_done_cond = NULL; > + comp_done_lock = NULL; > } > > void migrate_compress_threads_create(MigrationState *s) > @@ -397,7 +419,17 @@ void migrate_compress_threads_create(MigrationState *s) > thread_count = migrate_compress_threads(); > s->compress_thread = g_new0(QemuThread, thread_count); > comp_param = g_new0(CompressParam, thread_count); > + comp_done_cond = g_new0(QemuCond, 1); > + comp_done_lock = g_new0(QemuMutex, 1); > + qemu_cond_init(comp_done_cond); > + qemu_mutex_init(comp_done_lock); > for (i = 0; i < thread_count; i++) { > + /* com_param[i].file is just used as a dummy buffer to save data, set > + * it's ops to empty. > + */ > + comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); > + qemu_mutex_init(&comp_param[i].mutex); > + qemu_cond_init(&comp_param[i].cond); > qemu_thread_create(s->compress_thread + i, "compress", > do_data_compress, comp_param + i, > QEMU_THREAD_JOINABLE); > -- > 1.9.1 > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff --git a/arch_init.c b/arch_init.c index ed34eb3..87c4947 100644 --- a/arch_init.c +++ b/arch_init.c @@ -335,7 +335,12 @@ static uint32_t last_version; static bool ram_bulk_stage; struct CompressParam { - /* To be done */ + bool busy; + QEMUFile *file; + QemuMutex mutex; + QemuCond cond; + RAMBlock *block; + ram_addr_t offset; }; typedef struct CompressParam CompressParam; @@ -345,6 +350,14 @@ struct DecompressParam { typedef struct DecompressParam DecompressParam; static CompressParam *comp_param; +/* comp_done_cond is used to wake up the migration thread when + * one of the compression threads has finished the compression. + * comp_done_lock is used to co-work with comp_done_cond. + */ +static QemuMutex *comp_done_lock; +static QemuCond *comp_done_cond; +/* The empty QEMUFileOps will be used by file in CompressParam */ +static const QEMUFileOps empty_ops = { }; static bool quit_thread; static DecompressParam *decomp_param; static QemuThread *decompress_threads; @@ -379,11 +392,20 @@ void migrate_compress_threads_join(MigrationState *s) thread_count = migrate_compress_threads(); for (i = 0; i < thread_count; i++) { qemu_thread_join(s->compress_thread + i); + qemu_fclose(comp_param[i].file); + qemu_mutex_destroy(&comp_param[i].mutex); + qemu_cond_destroy(&comp_param[i].cond); } + qemu_mutex_destroy(comp_done_lock); + qemu_cond_destroy(comp_done_cond); g_free(s->compress_thread); g_free(comp_param); + g_free(comp_done_cond); + g_free(comp_done_lock); s->compress_thread = NULL; comp_param = NULL; + comp_done_cond = NULL; + comp_done_lock = NULL; } void migrate_compress_threads_create(MigrationState *s) @@ -397,7 +419,17 @@ void migrate_compress_threads_create(MigrationState *s) thread_count = migrate_compress_threads(); s->compress_thread = g_new0(QemuThread, thread_count); comp_param = g_new0(CompressParam, thread_count); + comp_done_cond = g_new0(QemuCond, 1); + comp_done_lock = g_new0(QemuMutex, 1); + qemu_cond_init(comp_done_cond); + qemu_mutex_init(comp_done_lock); for (i = 0; i < thread_count; i++) { + /* com_param[i].file is just used as a dummy buffer to save data, set + * it's ops to empty. + */ + comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); + qemu_mutex_init(&comp_param[i].mutex); + qemu_cond_init(&comp_param[i].cond); qemu_thread_create(s->compress_thread + i, "compress", do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);