Message ID | 1418347746-15829-3-git-send-email-liang.z.li@intel.com |
---|---|
State | New |
Headers | show |
* Liang Li (liang.z.li@intel.com) wrote: > Signed-off-by: Liang Li <liang.z.li@intel.com> > Signed-off-by: Yang Zhang <yang.z.zhang@intel.com> > --- > arch_init.c | 78 ++++++++++++++++++++++++++++++++++++++++++- > include/migration/migration.h | 9 +++++ > migration.c | 38 +++++++++++++++++++++ > 3 files changed, 124 insertions(+), 1 deletion(-) Just a few minor comments fix below. > diff --git a/arch_init.c b/arch_init.c > index 7680d28..a988ec2 100644 > --- a/arch_init.c > +++ b/arch_init.c > @@ -332,6 +332,67 @@ static uint64_t migration_dirty_pages; > static uint32_t last_version; > static bool ram_bulk_stage; > > +struct compress_param { > + /* To be done */ > +}; > +typedef struct compress_param compress_param; The struct types should be CompressParam rather than compress_param (like QemuThread - see the CODING_STYLE). > +static compress_param *comp_param; > +static bool quit_thread; > + > +static void *do_data_compress(void *opaque) > +{ > + while (!quit_thread) { > + > + /* To be done */ > + > + } > + return NULL; > +} Initially I was worried if these needed atomics or barriers for the 'quit_thread' - but later you add mutexs/cond that I think should be enough. > + > +static inline void terminate_compression_threads(void) > +{ > + quit_thread = true; > + /* To be done */ > +} > + > +void migrate_compress_threads_join(MigrationState *s) > +{ > + int i, thread_count; > + > + if (!migrate_use_compression()) { > + return; > + } > + terminate_compression_threads(); > + thread_count = migrate_compress_threads(); > + for (i = 0; i < thread_count; i++) { > + qemu_thread_join(s->compress_thread + i); > + } > + g_free(s->compress_thread); > + g_free(comp_param); > + s->compress_thread = NULL; > + comp_param = NULL; > +} > + > +void migrate_compress_threads_create(MigrationState *s) > +{ > + int i, thread_count; > + > + if (!migrate_use_compression()) { > + return; > + } > + quit_thread = false; > + thread_count = migrate_compress_threads(); > + s->compress_thread = g_malloc0(sizeof(QemuThread) > + * thread_count); > + comp_param = g_malloc0(sizeof(compress_param) * thread_count); I think g_new0 is better for that. > + for (i = 0; i < thread_count; i++) { > + qemu_thread_create(s->compress_thread + i, "compress", > + do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE); > + > + } > +} > + > /* Update the xbzrle cache to reflect a page that's been sent as all 0. > * The important thing is that a stale (not-yet-0'd) page be replaced > * by the new data. > @@ -643,6 +704,16 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset, > return bytes_sent; > } > > +static int ram_save_compressed_page(QEMUFile *f, RAMBlock* block, > + ram_addr_t offset, bool last_stage) > +{ > + int bytes_sent = 0; > + > + /* To be done*/ > + > + return bytes_sent; > +} > + > /* > * ram_find_and_save_block: Finds a page to send and sends it to f > * > @@ -677,7 +748,12 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage) > ram_bulk_stage = false; > } > } else { > - bytes_sent = ram_save_page(f, block, offset, last_stage); > + if (migrate_use_compression()) { > + bytes_sent = ram_save_compressed_page(f, > + block, offset, last_stage); > + } else { > + bytes_sent = ram_save_page(f, block, offset, last_stage); > + } > > /* if page is unmodified, continue to the next */ > if (bytes_sent > 0) { > diff --git a/include/migration/migration.h b/include/migration/migration.h > index 3cb5ba8..daf6c81 100644 > --- a/include/migration/migration.h > +++ b/include/migration/migration.h > @@ -49,6 +49,9 @@ struct MigrationState > QemuThread thread; > QEMUBH *cleanup_bh; > QEMUFile *file; > + QemuThread *compress_thread; > + int compress_thread_count; > + int compress_level; OK, these get replaced later anyway by the parameter stuff. > > int state; > MigrationParams params; > @@ -107,6 +110,8 @@ bool migration_has_finished(MigrationState *); > bool migration_has_failed(MigrationState *); > MigrationState *migrate_get_current(void); > > +void migrate_compress_threads_create(MigrationState *s); > +void migrate_compress_threads_join(MigrationState *s); > uint64_t ram_bytes_remaining(void); > uint64_t ram_bytes_transferred(void); > uint64_t ram_bytes_total(void); > @@ -156,6 +161,10 @@ int64_t migrate_xbzrle_cache_size(void); > > int64_t xbzrle_cache_resize(int64_t new_size); > > +bool migrate_use_compression(void); > +int migrate_compress_level(void); > +int migrate_compress_threads(void); > + > void ram_control_before_iterate(QEMUFile *f, uint64_t flags); > void ram_control_after_iterate(QEMUFile *f, uint64_t flags); > void ram_control_load_hook(QEMUFile *f, uint64_t flags); > diff --git a/migration.c b/migration.c > index c49a05a..402daae 100644 > --- a/migration.c > +++ b/migration.c > @@ -43,6 +43,11 @@ enum { > #define BUFFER_DELAY 100 > #define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY) > > +/* Default compression thread count */ > +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8 > +/*0: means nocompress, 1: best speed, ... 9: best compress ratio */ > +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1 > + > /* Migration XBZRLE default cache size */ > #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024) > > @@ -60,6 +65,8 @@ MigrationState *migrate_get_current(void) > .bandwidth_limit = MAX_THROTTLE, > .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE, > .mbps = -1, > + .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT, > + .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL, > }; > > return ¤t_migration; > @@ -302,6 +309,7 @@ static void migrate_fd_cleanup(void *opaque) > qemu_thread_join(&s->thread); > qemu_mutex_lock_iothread(); > > + migrate_compress_threads_join(s); > qemu_fclose(s->file); > s->file = NULL; > } > @@ -373,6 +381,8 @@ static MigrationState *migrate_init(const MigrationParams *params) > int64_t bandwidth_limit = s->bandwidth_limit; > bool enabled_capabilities[MIGRATION_CAPABILITY_MAX]; > int64_t xbzrle_cache_size = s->xbzrle_cache_size; > + int compress_level = s->compress_level; > + int compress_thread_count = s->compress_thread_count; > > memcpy(enabled_capabilities, s->enabled_capabilities, > sizeof(enabled_capabilities)); > @@ -383,6 +393,8 @@ static MigrationState *migrate_init(const MigrationParams *params) > sizeof(enabled_capabilities)); > s->xbzrle_cache_size = xbzrle_cache_size; > > + s->compress_level = compress_level; > + s->compress_thread_count = compress_thread_count; > s->bandwidth_limit = bandwidth_limit; > s->state = MIG_STATE_SETUP; > trace_migrate_set_state(MIG_STATE_SETUP); > @@ -555,6 +567,31 @@ bool migrate_zero_blocks(void) > return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS]; > } > > +bool migrate_use_compression(void) > +{ > + /* Disable compression before the series of patches are applied */ > + return false; > +} > + > +int migrate_compress_level(void) > +{ > + MigrationState *s; > + > + s = migrate_get_current(); > + > + return s->compress_level; > +} > + > +int migrate_compress_threads(void) > +{ > + MigrationState *s; > + > + s = migrate_get_current(); > + > + return s->compress_thread_count; > +} > + > + > int migrate_use_xbzrle(void) > { > MigrationState *s; > @@ -695,6 +732,7 @@ void migrate_fd_connect(MigrationState *s) > /* Notify before starting migration thread */ > notifier_list_notify(&migration_state_notifiers, s); > > + migrate_compress_threads_create(s); > qemu_thread_create(&s->thread, "migration", migration_thread, s, > QEMU_THREAD_JOINABLE); > } > -- > 1.8.3.1 > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On 12/11/2014 06:28 PM, Liang Li wrote: > Signed-off-by: Liang Li <liang.z.li@intel.com> > Signed-off-by: Yang Zhang <yang.z.zhang@intel.com> > --- > +void migrate_compress_threads_create(MigrationState *s) > +{ > + int i, thread_count; > + > + if (!migrate_use_compression()) { > + return; > + } > + quit_thread = false; > + thread_count = migrate_compress_threads(); > + s->compress_thread = g_malloc0(sizeof(QemuThread) > + * thread_count); Theoretically unsafe (well, unsafe if thread_count were unbounded, although it looks like you artificially cap it at 255 later in the series); better would be: s->compress_thread = g_new0(QemuThread, thread_count) because that catches potential multiplication overflow. > + comp_param = g_malloc0(sizeof(compress_param) * thread_count); Likewise. > > +static int ram_save_compressed_page(QEMUFile *f, RAMBlock* block, Spacing is off on the second '*'. > + ram_addr_t offset, bool last_stage) Indentation is off.
diff --git a/arch_init.c b/arch_init.c index 7680d28..a988ec2 100644 --- a/arch_init.c +++ b/arch_init.c @@ -332,6 +332,67 @@ static uint64_t migration_dirty_pages; static uint32_t last_version; static bool ram_bulk_stage; +struct compress_param { + /* To be done */ +}; +typedef struct compress_param compress_param; + +static compress_param *comp_param; +static bool quit_thread; + +static void *do_data_compress(void *opaque) +{ + while (!quit_thread) { + + /* To be done */ + + } + return NULL; +} + +static inline void terminate_compression_threads(void) +{ + quit_thread = true; + /* To be done */ +} + +void migrate_compress_threads_join(MigrationState *s) +{ + int i, thread_count; + + if (!migrate_use_compression()) { + return; + } + terminate_compression_threads(); + thread_count = migrate_compress_threads(); + for (i = 0; i < thread_count; i++) { + qemu_thread_join(s->compress_thread + i); + } + g_free(s->compress_thread); + g_free(comp_param); + s->compress_thread = NULL; + comp_param = NULL; +} + +void migrate_compress_threads_create(MigrationState *s) +{ + int i, thread_count; + + if (!migrate_use_compression()) { + return; + } + quit_thread = false; + thread_count = migrate_compress_threads(); + s->compress_thread = g_malloc0(sizeof(QemuThread) + * thread_count); + comp_param = g_malloc0(sizeof(compress_param) * thread_count); + for (i = 0; i < thread_count; i++) { + qemu_thread_create(s->compress_thread + i, "compress", + do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE); + + } +} + /* Update the xbzrle cache to reflect a page that's been sent as all 0. * The important thing is that a stale (not-yet-0'd) page be replaced * by the new data. @@ -643,6 +704,16 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset, return bytes_sent; } +static int ram_save_compressed_page(QEMUFile *f, RAMBlock* block, + ram_addr_t offset, bool last_stage) +{ + int bytes_sent = 0; + + /* To be done*/ + + return bytes_sent; +} + /* * ram_find_and_save_block: Finds a page to send and sends it to f * @@ -677,7 +748,12 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage) ram_bulk_stage = false; } } else { - bytes_sent = ram_save_page(f, block, offset, last_stage); + if (migrate_use_compression()) { + bytes_sent = ram_save_compressed_page(f, + block, offset, last_stage); + } else { + bytes_sent = ram_save_page(f, block, offset, last_stage); + } /* if page is unmodified, continue to the next */ if (bytes_sent > 0) { diff --git a/include/migration/migration.h b/include/migration/migration.h index 3cb5ba8..daf6c81 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -49,6 +49,9 @@ struct MigrationState QemuThread thread; QEMUBH *cleanup_bh; QEMUFile *file; + QemuThread *compress_thread; + int compress_thread_count; + int compress_level; int state; MigrationParams params; @@ -107,6 +110,8 @@ bool migration_has_finished(MigrationState *); bool migration_has_failed(MigrationState *); MigrationState *migrate_get_current(void); +void migrate_compress_threads_create(MigrationState *s); +void migrate_compress_threads_join(MigrationState *s); uint64_t ram_bytes_remaining(void); uint64_t ram_bytes_transferred(void); uint64_t ram_bytes_total(void); @@ -156,6 +161,10 @@ int64_t migrate_xbzrle_cache_size(void); int64_t xbzrle_cache_resize(int64_t new_size); +bool migrate_use_compression(void); +int migrate_compress_level(void); +int migrate_compress_threads(void); + void ram_control_before_iterate(QEMUFile *f, uint64_t flags); void ram_control_after_iterate(QEMUFile *f, uint64_t flags); void ram_control_load_hook(QEMUFile *f, uint64_t flags); diff --git a/migration.c b/migration.c index c49a05a..402daae 100644 --- a/migration.c +++ b/migration.c @@ -43,6 +43,11 @@ enum { #define BUFFER_DELAY 100 #define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY) +/* Default compression thread count */ +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8 +/*0: means nocompress, 1: best speed, ... 9: best compress ratio */ +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1 + /* Migration XBZRLE default cache size */ #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024) @@ -60,6 +65,8 @@ MigrationState *migrate_get_current(void) .bandwidth_limit = MAX_THROTTLE, .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE, .mbps = -1, + .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT, + .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL, }; return ¤t_migration; @@ -302,6 +309,7 @@ static void migrate_fd_cleanup(void *opaque) qemu_thread_join(&s->thread); qemu_mutex_lock_iothread(); + migrate_compress_threads_join(s); qemu_fclose(s->file); s->file = NULL; } @@ -373,6 +381,8 @@ static MigrationState *migrate_init(const MigrationParams *params) int64_t bandwidth_limit = s->bandwidth_limit; bool enabled_capabilities[MIGRATION_CAPABILITY_MAX]; int64_t xbzrle_cache_size = s->xbzrle_cache_size; + int compress_level = s->compress_level; + int compress_thread_count = s->compress_thread_count; memcpy(enabled_capabilities, s->enabled_capabilities, sizeof(enabled_capabilities)); @@ -383,6 +393,8 @@ static MigrationState *migrate_init(const MigrationParams *params) sizeof(enabled_capabilities)); s->xbzrle_cache_size = xbzrle_cache_size; + s->compress_level = compress_level; + s->compress_thread_count = compress_thread_count; s->bandwidth_limit = bandwidth_limit; s->state = MIG_STATE_SETUP; trace_migrate_set_state(MIG_STATE_SETUP); @@ -555,6 +567,31 @@ bool migrate_zero_blocks(void) return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS]; } +bool migrate_use_compression(void) +{ + /* Disable compression before the series of patches are applied */ + return false; +} + +int migrate_compress_level(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->compress_level; +} + +int migrate_compress_threads(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->compress_thread_count; +} + + int migrate_use_xbzrle(void) { MigrationState *s; @@ -695,6 +732,7 @@ void migrate_fd_connect(MigrationState *s) /* Notify before starting migration thread */ notifier_list_notify(&migration_state_notifiers, s); + migrate_compress_threads_create(s); qemu_thread_create(&s->thread, "migration", migration_thread, s, QEMU_THREAD_JOINABLE); }