From patchwork Tue Sep 21 15:48:26 2021
X-Patchwork-Submitter: Anton Ivanov
X-Patchwork-Id: 1530786
From: anton.ivanov@cambridgegreys.com
To: ovs-dev@openvswitch.org
Date: Tue, 21 Sep 2021 16:48:26 +0100
Message-Id: <20210921154827.25940-1-anton.ivanov@cambridgegreys.com>
Cc: i.maximets@ovn.org, Anton Ivanov
Subject: [ovs-dev] [OVN Patch v4 1/2] Make changes to the parallel processing API to allow pool sizing

From: Anton Ivanov

1. Make pool size user-definable.
2. Expose pool destruction.
3. Make pools resizable at runtime.
4. Split pool start and completion to allow background execution.
5. Add a simplified API for SAFE walking of a single hash.
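As an illustration, the split start/completion API lets a pool run in the
background while the caller does other work. A rough sketch against this
patch's API; the worker function, the foreground work, and the result hmaps
are hypothetical placeholders, only the pool calls come from the patch:

    #include "lib/ovn-parallel-hmap.h"

    /* Hypothetical worker function and foreground work, not part of
     * this patch. */
    extern void *my_thread_func(void *arg);
    extern void do_foreground_work(void);

    static void
    example_background_run(struct hmap *result, struct hmap *result_frags)
    {
        /* Size 0 selects the system default pool size. */
        struct worker_pool *pool = add_worker_pool(my_thread_func, 0);

        if (!pool) {
            return;             /* parallel processing unavailable */
        }

        start_pool(pool);       /* fire the workers and return at once */
        do_foreground_work();   /* overlap other work with the pool run */

        /* Join the workers and merge the per-thread hash fragments. */
        complete_pool_hash(pool, result, result_frags);

        /* Pools can now also be resized or torn down at runtime. */
        resize_pool(pool, 8);
        destroy_pool(pool);
    }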
Signed-off-by: Anton Ivanov
---
 lib/ovn-parallel-hmap.c | 290 +++++++++++++++++++++++++++++++---------
 lib/ovn-parallel-hmap.h |  77 ++++++++++-
 northd/northd.c         |  72 +++-------
 3 files changed, 321 insertions(+), 118 deletions(-)

diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
index b8c7ac786..1b3883441 100644
--- a/lib/ovn-parallel-hmap.c
+++ b/lib/ovn-parallel-hmap.c
@@ -51,7 +51,6 @@ static bool can_parallelize = false;
  * accompanied by a fence. It does not need to be atomic or be
  * accessed under a lock. */
-static bool workers_must_exit = false;

 static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);

@@ -70,10 +69,27 @@ static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
                                void *fin_result, void *result_frags,
                                int index);
+
+static bool init_control(struct worker_control *control, int id,
+                         struct worker_pool *pool);
+
+static void cleanup_control(struct worker_pool *pool, int id);
+
+static void free_controls(struct worker_pool *pool);
+
+static struct worker_control *alloc_controls(int size);
+
+static void *standard_helper_thread(void *arg);
+
+struct worker_pool *ovn_add_standard_pool(int size)
+{
+    return add_worker_pool(standard_helper_thread, size);
+}
+
 bool
-ovn_stop_parallel_processing(void)
+ovn_stop_parallel_processing(struct worker_pool *pool)
 {
-    return workers_must_exit;
+    return pool->workers_must_exit;
 }

 bool
@@ -92,11 +108,67 @@ ovn_can_parallelize_hashes(bool force_parallel)
     return can_parallelize;
 }

+
+void
+destroy_pool(struct worker_pool *pool) {
+    char sem_name[256];
+
+    free_controls(pool);
+    sem_close(pool->done);
+    sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
+    sem_unlink(sem_name);
+    free(pool);
+}
+
+bool
+ovn_resize_pool(struct worker_pool *pool, int size)
+{
+    int i;
+
+    ovs_assert(pool != NULL);
+
+    if (!size) {
+        size = pool_size;
+    }
+
+    ovs_mutex_lock(&init_mutex);
+
+    if (can_parallelize) {
+        free_controls(pool);
+        pool->size = size;
+
+        /* Allocate new control structures. */
+
+        pool->controls = alloc_controls(size);
+        pool->workers_must_exit = false;
+
+        for (i = 0; i < pool->size; i++) {
+            if (!init_control(&pool->controls[i], i, pool)) {
+                goto cleanup;
+            }
+        }
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return true;
+cleanup:
+
+    /* Something went wrong when opening semaphores. In this case
+     * it is better to shut off parallel processing altogether.
+     */
+
+    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
+    can_parallelize = false;
+    free_controls(pool);
+
+    ovs_mutex_unlock(&init_mutex);
+    return false;
+}
+
+
 struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *))
+ovn_add_worker_pool(void *(*start)(void *), int size)
 {
     struct worker_pool *new_pool = NULL;
-    struct worker_control *new_control;
     bool test = false;
     int i;
     char sem_name[256];
@@ -113,38 +185,29 @@ ovn_add_worker_pool(void *(*start)(void *))
         ovs_mutex_unlock(&init_mutex);
     }

+    if (!size) {
+        size = pool_size;
+    }
+
     ovs_mutex_lock(&init_mutex);
     if (can_parallelize) {
         new_pool = xmalloc(sizeof(struct worker_pool));
-        new_pool->size = pool_size;
-        new_pool->controls = NULL;
+        new_pool->size = size;
+        new_pool->start = start;
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
         if (new_pool->done == SEM_FAILED) {
             goto cleanup;
         }

-        new_pool->controls =
-            xmalloc(sizeof(struct worker_control) * new_pool->size);
+        new_pool->controls = alloc_controls(size);
+        new_pool->workers_must_exit = false;

         for (i = 0; i < new_pool->size; i++) {
-            new_control = &new_pool->controls[i];
-            new_control->id = i;
-            new_control->done = new_pool->done;
-            new_control->data = NULL;
-            ovs_mutex_init(&new_control->mutex);
-            new_control->finished = ATOMIC_VAR_INIT(false);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
-            if (new_control->fire == SEM_FAILED) {
+            if (!init_control(&new_pool->controls[i], i, new_pool)) {
                 goto cleanup;
             }
         }
-
-        for (i = 0; i < pool_size; i++) {
-            new_pool->controls[i].worker =
-                ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
-        }
         ovs_list_push_back(&worker_pools, &new_pool->list_node);
     }
     ovs_mutex_unlock(&init_mutex);
@@ -157,16 +220,7 @@ cleanup:

     VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
     can_parallelize = false;
-    if (new_pool->controls) {
-        for (i = 0; i < new_pool->size; i++) {
-            if (new_pool->controls[i].fire != SEM_FAILED) {
-                sem_close(new_pool->controls[i].fire);
-                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-                sem_unlink(sem_name);
-                break; /* semaphores past this one are uninitialized */
-            }
-        }
-    }
+    free_controls(new_pool);
     if (new_pool->done != SEM_FAILED) {
         sem_close(new_pool->done);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
@@ -176,7 +230,6 @@ cleanup:
     return NULL;
 }

-
 /* Initializes 'hmap' as an empty hash table with mask N. */
 void
 ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
@@ -221,13 +274,9 @@ ovn_fast_hmap_size_for(struct hmap *hmap, int size)

 /* Run a thread pool which uses a callback function to process results */
 void
-ovn_run_pool_callback(struct worker_pool *pool,
-                      void *fin_result, void *result_frags,
-                      void (*helper_func)(struct worker_pool *pool,
-                                          void *fin_result,
-                                          void *result_frags, int index))
+ovn_start_pool(struct worker_pool *pool)
 {
-    int index, completed;
+    int index;

     /* Ensure that all worker threads see the same data as the
      * main thread.
@@ -238,8 +287,20 @@ ovn_run_pool_callback(struct worker_pool *pool,
     for (index = 0; index < pool->size; index++) {
         sem_post(pool->controls[index].fire);
     }
+}

-    completed = 0;
+/* Complete a thread pool which uses a callback function to process results
+ */
+void
+ovn_complete_pool_callback(struct worker_pool *pool,
+                           void *fin_result, void *result_frags,
+                           void (*helper_func)(struct worker_pool *pool,
+                                               void *fin_result,
+                                               void *result_frags,
+                                               int index))
+{
+    int index;
+    int completed = 0;

     do {
         bool test;
@@ -282,6 +343,19 @@ ovn_run_pool_callback(struct worker_pool *pool,
     } while (completed < pool->size);
 }

+/* Complete a thread pool which uses a callback function to process results
+ */
+void
+ovn_run_pool_callback(struct worker_pool *pool,
+                      void *fin_result, void *result_frags,
+                      void (*helper_func)(struct worker_pool *pool,
+                                          void *fin_result,
+                                          void *result_frags, int index))
+{
+    ovn_start_pool(pool);
+    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func);
+}
+
 /* Run a thread pool - basic, does not do results processing.
  */
 void
@@ -350,29 +424,99 @@ ovn_run_pool_list(struct worker_pool *pool,
 }

 void
-ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
+ovn_update_hashrow_locks(struct hmap *target, struct hashrow_locks *hrl)
 {
     int i;
-    if (hrl->mask != lflows->mask) {
+    if (hrl->mask != target->mask) {
         if (hrl->row_locks) {
             free(hrl->row_locks);
         }
-        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
-        hrl->mask = lflows->mask;
-        for (i = 0; i <= lflows->mask; i++) {
+        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), target->mask + 1);
+        hrl->mask = target->mask;
+        for (i = 0; i <= target->mask; i++) {
             ovs_mutex_init(&hrl->row_locks[i]);
         }
     }
 }

+static bool
+init_control(struct worker_control *control, int id,
+             struct worker_pool *pool)
+{
+    char sem_name[256];
+    control->id = id;
+    control->done = pool->done;
+    control->data = NULL;
+    ovs_mutex_init(&control->mutex);
+    control->finished = ATOMIC_VAR_INIT(false);
+    sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
+    control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+    control->pool = pool;
+    control->worker = 0;
+    if (control->fire == SEM_FAILED) {
+        return false;
+    }
+    control->worker =
+        ovs_thread_create("worker pool helper", pool->start, control);
+    return true;
+}
+
 static void
-worker_pool_hook(void *aux OVS_UNUSED) {
+cleanup_control(struct worker_pool *pool, int id)
+{
+    char sem_name[256];
+    struct worker_control *control = &pool->controls[id];
+
+    if (control->fire != SEM_FAILED) {
+        sem_close(control->fire);
+        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
+        sem_unlink(sem_name);
+    }
+}
+
+static void
+free_controls(struct worker_pool *pool)
+{
+    int i;
+    if (pool->controls) {
+        pool->workers_must_exit = true;
+        for (i = 0; i < pool->size ; i++) {
+            if (pool->controls[i].fire != SEM_FAILED) {
+                sem_post(pool->controls[i].fire);
+            }
+        }
+        for (i = 0; i < pool->size ; i++) {
+            if (pool->controls[i].worker) {
+                pthread_join(pool->controls[i].worker, NULL);
+                pool->controls[i].worker = 0;
+            }
+        }
+        for (i = 0; i < pool->size; i++) {
+            cleanup_control(pool, i);
+        }
+        free(pool->controls);
+        pool->controls = NULL;
+        pool->workers_must_exit = false;
+    }
+}
+
+static struct worker_control *alloc_controls(int size)
+{
     int i;
+    struct worker_control *controls =
+        xcalloc(sizeof(struct worker_control), size);
+
+    for (i = 0; i < size ; i++) {
+        controls[i].fire = SEM_FAILED;
+    }
+    return controls;
+}
+
+static void
+worker_pool_hook(void *aux OVS_UNUSED) {
     static struct worker_pool *pool;
     char sem_name[256];

-    workers_must_exit = true;
-
     /* All workers must honour the must_exit flag and check for it regularly.
      * We can make it atomic and check it via atomics in workers, but that
      * is not really necessary as it is set just once - when the program
@@ -383,17 +527,7 @@ worker_pool_hook(void *aux OVS_UNUSED) {

     /* Wake up the workers after the must_exit flag has been set */
     LIST_FOR_EACH (pool, list_node, &worker_pools) {
-        for (i = 0; i < pool->size ; i++) {
-            sem_post(pool->controls[i].fire);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            pthread_join(pool->controls[i].worker, NULL);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            sem_close(pool->controls[i].fire);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
-            sem_unlink(sem_name);
-        }
+        free_controls(pool);
         sem_close(pool->done);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
         sem_unlink(sem_name);
@@ -458,4 +592,40 @@ merge_hash_results(struct worker_pool *pool OVS_UNUSED,
     hmap_destroy(&res_frags[index]);
 }

+static void *
+standard_helper_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct helper_data *hd;
+    int bnum;
+    struct hmap_node *element, *next;
+
+    while (!stop_parallel_processing(control->pool)) {
+        wait_for_work(control);
+        hd = (struct helper_data *) control->data;
+        if (stop_parallel_processing(control->pool)) {
+            return NULL;
+        }
+        if (hd) {
+            for (bnum = control->id; bnum <= hd->target->mask;
+                 bnum += control->pool->size)
+            {
+                element = hmap_first_in_bucket_num(hd->target, bnum);
+                while (element) {
+                    if (stop_parallel_processing(control->pool)) {
+                        return NULL;
+                    }
+                    next = element->next;
+                    hd->element_func(element, hd->target, hd->data);
+                    element = next;
+                }
+            }
+
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+
 #endif
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
index 2df132ea8..4cdd5c4e5 100644
--- a/lib/ovn-parallel-hmap.h
+++ b/lib/ovn-parallel-hmap.h
@@ -83,6 +83,7 @@ struct worker_control {
     void *data; /* Pointer to data to be processed. */
     void *workload; /* back-pointer to the worker pool structure. */
     pthread_t worker;
+    struct worker_pool *pool;
 };

 struct worker_pool {
@@ -90,16 +91,31 @@
     struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
     struct worker_control *controls; /* "Handles" in this pool. */
     sem_t *done; /* Work completion semaphorew. */
+    void *(*start)(void *); /* Work function. */
+    bool workers_must_exit; /* Pool to be destroyed flag. */
+};
+
+
+struct helper_data {
+    struct hmap *target;
+    void (*element_func)(struct hmap_node *element, struct hmap *target,
+                         void *data);
+    void *data;
 };

 /* Add a worker pool for thread function start() which expects a pointer to
- * a worker_control structure as an argument. */
+ * a worker_control structure as an argument.
+ * If size is non-zero, it is used for pool sizing. If size is zero, pool
+ * size uses system defaults.
+ */

-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
+
+struct worker_pool *ovn_add_standard_pool(int size);

 /* Setting this to true will make all processing threads exit */

-bool ovn_stop_parallel_processing(void);
+bool ovn_stop_parallel_processing(struct worker_pool *pool);

 /* Build a hmap pre-sized for size elements */

@@ -143,6 +159,35 @@ void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
                            void *fin_result, void *result_frags, int index));

+/* Start a pool in background
+ */
+
+void ovn_start_pool(struct worker_pool *pool);
+
+/* Complete a background pool run, merge results from hash frags into a final
+ * hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_complete_pool_hash(struct worker_pool *pool,
+                            struct hmap *result, struct hmap *result_frags);
+
+/* Complete a background pool run, merge results from list frags into a final
+ * list result.
+ */
+
+void ovn_complete_pool_list(struct worker_pool *pool,
+                            struct ovs_list *result, struct ovs_list *result_frags);
+
+/* Complete a background pool run, call a callback function to perform
+ * processing of results.
+ */
+
+void ovn_complete_pool_callback(struct worker_pool *pool, void *fin_result,
+                                void *result_frags,
+                                void (*helper_func)(struct worker_pool *pool,
+                                void *fin_result, void *result_frags, int index));
+
+
 /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
  * would land, or a null pointer if that bucket is empty. */

@@ -230,7 +275,7 @@ struct hashrow_locks {

 /* Update an hash row locks structure to match the current hash size */

-void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
+void ovn_update_hashrow_locks(struct hmap *, struct hashrow_locks *);

 /* Lock a hash row */
 static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
@@ -253,6 +298,10 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)

 bool ovn_can_parallelize_hashes(bool force_parallel);

+void ovn_destroy_pool(struct worker_pool *pool);
+
+bool ovn_resize_pool(struct worker_pool *pool, int size);
+
 /* Use the OVN library functions for stuff which OVS has not defined
  * If OVS has defined these, they will still compile using the OVN
  * local names, but will be dropped by the linker in favour of the OVS
@@ -263,9 +312,11 @@ bool ovn_can_parallelize_hashes(bool force_parallel);

 #define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)

-#define stop_parallel_processing() ovn_stop_parallel_processing()
+#define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)
+
+#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)

-#define add_worker_pool(start) ovn_add_worker_pool(start)
+#define add_standard_pool(start, size) ovn_add_standard_pool(start, size)

 #define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)

@@ -286,6 +337,20 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
 #define run_pool_callback(pool, fin_result, result_frags, helper_func) \
     ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)

+#define start_pool(pool) ovn_start_pool(pool)
+
+#define complete_pool_hash(pool, result, result_frags) \
+    ovn_complete_pool_hash(pool, result, result_frags)
+
+#define complete_pool_list(pool, result, result_frags) \
+    ovn_complete_pool_list(pool, result, result_frags)
+
+#define complete_pool_callback(pool, fin_result, result_frags, helper_func) \
+    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func)
+
+#define destroy_pool(pool) ovn_destroy_pool(pool)
+
+#define resize_pool(pool, size) ovn_resize_pool(pool, size)

 #ifdef __clang__
diff --git a/northd/northd.c b/northd/northd.c
index d1b87891c..7724d27e9 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -12871,16 +12871,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                         &lsi->actions);
 }

-struct lflows_thread_pool {
-    struct worker_pool *pool;
-};
-
-
 static void *
 build_lflows_thread(void *arg)
 {
     struct worker_control *control = (struct worker_control *) arg;
-    struct lflows_thread_pool *workload;
     struct lswitch_flow_build_info *lsi;

     struct ovn_datapath *od;
@@ -12889,21 +12883,20 @@ build_lflows_thread(void *arg)
     struct ovn_igmp_group *igmp_group;
     int bnum;

-    while (!stop_parallel_processing()) {
+    while (!stop_parallel_processing(control->pool)) {
         wait_for_work(control);
-        workload = (struct lflows_thread_pool *) control->workload;
         lsi = (struct lswitch_flow_build_info *) control->data;
-        if (stop_parallel_processing()) {
+        if (stop_parallel_processing(control->pool)) {
             return NULL;
         }
-        if (lsi && workload) {
+        if (lsi) {
             /* Iterate over bucket ThreadID, ThreadID+size, ... */
             for (bnum = control->id;
                  bnum <= lsi->datapaths->mask;
-                 bnum += workload->pool->size)
+                 bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_and_lrouter_iterate_by_od(od, lsi);
@@ -12911,10 +12904,10 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                  bnum <= lsi->ports->mask;
-                 bnum += workload->pool->size)
+                 bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_and_lrouter_iterate_by_op(op, lsi);
@@ -12922,10 +12915,10 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                  bnum <= lsi->lbs->mask;
-                 bnum += workload->pool->size)
+                 bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
@@ -12943,11 +12936,11 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                  bnum <= lsi->igmp_groups->mask;
-                 bnum += workload->pool->size)
+                 bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (
                         igmp_group, hmap_node, bnum, lsi->igmp_groups) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
@@ -12962,43 +12955,18 @@ build_lflows_thread(void *arg)
 }

 static bool pool_init_done = false;
-static struct lflows_thread_pool *build_lflows_pool = NULL;
+static struct worker_pool *build_lflows_pool = NULL;

 static void
 init_lflows_thread_pool(void)
 {
-    int index;
     if (!pool_init_done) {
-        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
+        build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
         pool_init_done = true;
-        if (pool) {
-            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
-            build_lflows_pool->pool = pool;
-            for (index = 0; index < build_lflows_pool->pool->size; index++) {
-                build_lflows_pool->pool->controls[index].workload =
-                    build_lflows_pool;
-            }
-        }
     }
 }

 /* TODO: replace hard cutoffs by configurable via commands. These are
  * temporary defines to determine single-thread to multi-thread processing
  * cutoff.
  * Setting to 1 forces "all parallel" lflow build.
  */

-static void
-noop_callback(struct worker_pool *pool OVS_UNUSED,
-              void *fin_result OVS_UNUSED,
-              void *result_frags OVS_UNUSED,
-              int index OVS_UNUSED)
-{
-    /* Do nothing */
-}
-
-
 static void
 build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                                 struct hmap *port_groups, struct hmap *lflows,
@@ -13022,16 +12990,16 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
         struct lswitch_flow_build_info *lsiv;
         int index;

-        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
+        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
         if (use_logical_dp_groups) {
             lflow_segs = NULL;
         } else {
-            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
+            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
         }

         /* Set up "work chunks" for each thread to work on. */

-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             if (use_logical_dp_groups) {
                 /* if dp_groups are in use we lock a shared lflows hash
                  * on a per-bucket level instead of merging hash frags */
@@ -13053,17 +13021,17 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
             ds_init(&lsiv[index].match);
             ds_init(&lsiv[index].actions);

-            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+            build_lflows_pool->controls[index].data = &lsiv[index];
         }

         /* Run thread pool. */
         if (use_logical_dp_groups) {
-            run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
+            run_pool_callback(build_lflows_pool, NULL, NULL, NULL);
         } else {
-            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
         }

-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             ds_destroy(&lsiv[index].match);
             ds_destroy(&lsiv[index].actions);
         }
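To illustrate the simplified single-hash walk that patch 1 introduces (item 5
of the commit message): each worker owns the buckets bnum = id, id + size,
id + 2*size, ..., so a callback that only touches its own element needs no
locking. In the sketch below, only ovn_add_standard_pool(), struct
helper_data, and run_pool_callback() come from the patch; the element type
and callback are hypothetical:

    #include "lib/ovn-parallel-hmap.h"

    struct my_elem {                    /* hypothetical element type */
        struct hmap_node node;
        bool visited;
    };

    /* Runs once per element. Workers own disjoint buckets, so writes
     * confined to the element itself are race-free. */
    static void
    clear_visited(struct hmap_node *element, struct hmap *target OVS_UNUSED,
                  void *data OVS_UNUSED)
    {
        struct my_elem *e = CONTAINER_OF(element, struct my_elem, node);
        e->visited = false;
    }

    static void
    parallel_clear(struct hmap *target)
    {
        struct worker_pool *pool = ovn_add_standard_pool(0);
        struct helper_data hd = {
            .target = target,
            .element_func = clear_visited,
            .data = NULL,
        };
        int i;

        if (!pool) {
            return;                     /* parallelization unavailable */
        }
        for (i = 0; i < pool->size; i++) {
            pool->controls[i].data = &hd;
        }
        /* A NULL helper runs the pool to completion without result
         * merging, as the dp-groups path in northd.c above does. */
        run_pool_callback(pool, NULL, NULL, NULL);
    }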
From patchwork Tue Sep 21 15:48:27 2021
X-Patchwork-Submitter: Anton Ivanov
X-Patchwork-Id: 1530785
From: anton.ivanov@cambridgegreys.com
To: ovs-dev@openvswitch.org
Date: Tue, 21 Sep 2021 16:48:27 +0100
Message-Id: <20210921154827.25940-2-anton.ivanov@cambridgegreys.com>
In-Reply-To: <20210921154827.25940-1-anton.ivanov@cambridgegreys.com>
References: <20210921154827.25940-1-anton.ivanov@cambridgegreys.com>
Cc: i.maximets@ovn.org, Anton Ivanov
Subject: [ovs-dev] [OVN Patch v4 2/2] Add support for configuring parallelization via unixctl

From: Anton Ivanov

libs: add configuration support to parallel-hmap.[c,h]
northd: add support for configuring parallelization to northd

Signed-off-by: Anton Ivanov
---
 lib/ovn-parallel-hmap.c | 185 ++++++++++++++++++++++++++++++++++++++--
 lib/ovn-parallel-hmap.h |  63 +++++++++++++-
 northd/northd.c         |  30 +++----
 northd/northd.h         |   2 -
 northd/ovn-northd.c     |   5 +-
 tests/ovn-macros.at     |  16 +++-
 6 files changed, 263 insertions(+), 38 deletions(-)
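The unixctl commands registered by this patch can be exercised along these
lines (command names and argument counts are taken from the registrations
below; the daemon target and the thread counts are illustrative):

    ovs-appctl -t ovn-northd thread-pool/set-parallel-on 4
    ovs-appctl -t ovn-northd thread-pool/status
    ovs-appctl -t ovn-northd thread-pool/list
    ovs-appctl -t ovn-northd thread-pool/reload-pool lflows 8
    ovs-appctl -t ovn-northd thread-pool/set-parallel-off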
diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
index 1b3883441..6a6488a17 100644
--- a/lib/ovn-parallel-hmap.c
+++ b/lib/ovn-parallel-hmap.c
@@ -33,6 +33,7 @@
 #include "ovs-thread.h"
 #include "ovs-numa.h"
 #include "random.h"
+#include "unixctl.h"

 VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);

@@ -46,6 +47,7 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
  */
 static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
 static bool can_parallelize = false;
+static bool should_parallelize = false;

 /* This is set only in the process of exit and the set is
  * accompanied by a fence. It does not need to be atomic or be
@@ -83,7 +85,7 @@ static void *standard_helper_thread(void *arg);

 struct worker_pool *ovn_add_standard_pool(int size)
 {
-    return add_worker_pool(standard_helper_thread, size);
+    return add_worker_pool(standard_helper_thread, size, "default", true);
 }

 bool
@@ -92,6 +94,19 @@ ovn_stop_parallel_processing(struct worker_pool *pool)
     return pool->workers_must_exit;
 }

+bool
+ovn_set_parallel_processing(bool enable)
+{
+    should_parallelize = enable;
+    return can_parallelize;
+}
+
+bool
+ovn_get_parallel_processing(void)
+{
+    return can_parallelize && should_parallelize;
+}
+
 bool
 ovn_can_parallelize_hashes(bool force_parallel)
 {
@@ -117,6 +132,7 @@ destroy_pool(struct worker_pool *pool) {
     sem_close(pool->done);
     sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
     sem_unlink(sem_name);
+    free(pool->name);
     free(pool);
 }

@@ -127,6 +143,10 @@ ovn_resize_pool(struct worker_pool *pool, int size)

     ovs_assert(pool != NULL);

+    if (!pool->is_mutable) {
+        return false;
+    }
+
     if (!size) {
         size = pool_size;
     }
@@ -166,7 +186,8 @@ cleanup:


 struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *), int size)
+ovn_add_worker_pool(void *(*start)(void *), int size, char *name,
+                    bool is_mutable)
 {
     struct worker_pool *new_pool = NULL;
     bool test = false;
@@ -194,6 +215,8 @@ ovn_add_worker_pool(void *(*start)(void *), int size)
         new_pool = xmalloc(sizeof(struct worker_pool));
         new_pool->size = size;
         new_pool->start = start;
+        new_pool->is_mutable = is_mutable;
+        new_pool->name = xstrdup(name);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
         if (new_pool->done == SEM_FAILED) {
@@ -226,6 +249,7 @@ cleanup:
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         sem_unlink(sem_name);
     }
+    free(new_pool->name);
     ovs_mutex_unlock(&init_mutex);
     return NULL;
 }
@@ -342,8 +366,7 @@ ovn_complete_pool_callback(struct worker_pool *pool,
         }
     } while (completed < pool->size);
 }
-
-/* Complete a thread pool which uses a callback function to process results
+/* Run a thread pool which uses a callback function to process results
  */
 void
 ovn_run_pool_callback(struct worker_pool *pool,
@@ -352,8 +375,8 @@ ovn_run_pool_callback(struct worker_pool *pool,
                                           void *fin_result,
                                           void *result_frags, int index))
 {
-    ovn_start_pool(pool);
-    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func);
+    start_pool(pool);
+    complete_pool_callback(pool, fin_result, result_frags, helper_func);
 }

 /* Run a thread pool - basic, does not do results processing.
@@ -401,6 +424,28 @@ ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
     inc->n = 0;
 }

+/* Run a thread pool which gathers results in an array
+ * of hashes. Merge results.
+ */
+void
+ovn_complete_pool_hash(struct worker_pool *pool,
+                       struct hmap *result,
+                       struct hmap *result_frags)
+{
+    complete_pool_callback(pool, result, result_frags, merge_hash_results);
+}
+
+/* Run a thread pool which gathers results in an array of lists.
+ * Merge results.
+ */
+void
+ovn_complete_pool_list(struct worker_pool *pool,
+                       struct ovs_list *result,
+                       struct ovs_list *result_frags)
+{
+    complete_pool_callback(pool, result, result_frags, merge_list_results);
+}
+
 /* Run a thread pool which gathers results in an array
  * of hashes. Merge results.
  */
@@ -514,7 +559,7 @@ static struct worker_control *alloc_controls(int size)
 static void
 worker_pool_hook(void *aux OVS_UNUSED) {
-    static struct worker_pool *pool;
+    struct worker_pool *pool;
     char sem_name[256];

     /* All workers must honour the must_exit flag and check for it regularly.
@@ -628,4 +673,130 @@ standard_helper_thread(void *arg)
 }

+static void
+ovn_thread_pool_resize_pool(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                            const char *argv[], void *unused OVS_UNUSED)
+{
+
+    struct worker_pool *pool;
+    int value;
+
+    if (!str_to_int(argv[2], 10, &value)) {
+        unixctl_command_reply_error(conn, "invalid argument");
+        return;
+    }
+
+    if (value > 0) {
+        pool_size = value;
+    }
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        if (strcmp(pool->name, argv[1]) == 0) {
+            resize_pool(pool, value);
+            unixctl_command_reply_error(conn, NULL);
+        }
+    }
+    unixctl_command_reply_error(conn, "pool not found");
+}
+
+static void
+ovn_thread_pool_list_pools(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                           const char *argv[] OVS_UNUSED,
+                           void *unused OVS_UNUSED)
+{
+
+    char *reply = NULL;
+    char *new_reply;
+    char buf[256];
+    struct worker_pool *pool;
+
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        snprintf(buf, 255, "%s : %d\n", pool->name, pool->size);
+        if (reply) {
+            new_reply = xmalloc(strlen(reply) + strlen(buf) + 1);
+            ovs_strlcpy(new_reply, reply, strlen(reply));
+            strcat(new_reply, buf);
+            free(reply);
+        }
+        reply = new_reply;
+    }
+    unixctl_command_reply(conn, reply);
+}
+
+static void
+ovn_thread_pool_set_parallel_on(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                                const char *argv[], void *unused OVS_UNUSED)
+{
+    int value;
+    bool result;
+    if (!str_to_int(argv[1], 10, &value)) {
+        unixctl_command_reply_error(conn, "invalid argument");
+        return;
+    }
+
+    if (!ovn_can_parallelize_hashes(true)) {
+        unixctl_command_reply_error(conn, "cannot enable parallel processing");
+        return;
+    }
+
+    if (value > 0) {
+        /* Change default pool size */
+        ovs_mutex_lock(&init_mutex);
+        pool_size = value;
+        ovs_mutex_unlock(&init_mutex);
+    }
+
+    result = ovn_set_parallel_processing(true);
+    unixctl_command_reply(conn, result ? "enabled" : "disabled");
+}
+
+static void
+ovn_thread_pool_set_parallel_off(struct unixctl_conn *conn,
+                                 int argc OVS_UNUSED,
+                                 const char *argv[] OVS_UNUSED,
+                                 void *unused OVS_UNUSED)
+{
+    ovn_set_parallel_processing(false);
+    unixctl_command_reply(conn, NULL);
+}
+
+static void
+ovn_thread_pool_parallel_status(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                                const char *argv[] OVS_UNUSED,
+                                void *unused OVS_UNUSED)
+{
+    char status[256];
+
+    sprintf(status, "%s, default pool size %d",
+            get_parallel_processing() ? "active" : "inactive",
+            pool_size);
+
+    unixctl_command_reply(conn, status);
+}
+
+void
+ovn_parallel_thread_pools_init(void)
+{
+    bool test = false;
+
+    if (atomic_compare_exchange_strong(
+            &initial_pool_setup,
+            &test,
+            true)) {
+        ovs_mutex_lock(&init_mutex);
+        setup_worker_pools(false);
+        ovs_mutex_unlock(&init_mutex);
+    }
+
+    unixctl_command_register("thread-pool/set-parallel-on", "N", 1, 1,
+                             ovn_thread_pool_set_parallel_on, NULL);
+    unixctl_command_register("thread-pool/set-parallel-off", "", 0, 0,
+                             ovn_thread_pool_set_parallel_off, NULL);
+    unixctl_command_register("thread-pool/status", "", 0, 0,
+                             ovn_thread_pool_parallel_status, NULL);
+    unixctl_command_register("thread-pool/list", "", 0, 0,
+                             ovn_thread_pool_list_pools, NULL);
+    unixctl_command_register("thread-pool/reload-pool", "Pool Threads", 2, 2,
+                             ovn_thread_pool_resize_pool, NULL);
+}
+
 #endif
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
index 4cdd5c4e5..9c0a69cb1 100644
--- a/lib/ovn-parallel-hmap.h
+++ b/lib/ovn-parallel-hmap.h
@@ -33,6 +33,7 @@ extern "C" {
 #include "openvswitch/hmap.h"
 #include "openvswitch/thread.h"
 #include "ovs-atomic.h"
+#include "unixctl.h"

 /* Process this include only if OVS does not supply parallel definitions
  */
@@ -93,6 +94,8 @@ struct worker_pool {
     sem_t *done; /* Work completion semaphorew. */
     void *(*start)(void *); /* Work function. */
     bool workers_must_exit; /* Pool to be destroyed flag. */
+    char *name; /* Name to be used in cli commands */
+    bool is_mutable; /* Can the pool be reloaded with different params */
 };

@@ -109,7 +112,9 @@ struct helper_data {
  * size uses system defaults.
  */

-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *),
+                                        int size, char *name,
+                                        bool is_mutable);

 struct worker_pool *ovn_add_standard_pool(int size);

@@ -188,6 +193,35 @@ void ovn_complete_pool_callback(struct worker_pool *pool, void *fin_result,
                                 void *fin_result, void *result_frags, int index));

+/* Start a pool. Do not wait for any results. They will be collected
+ * using the _complete_ functions.
+ */
+void ovn_start_pool(struct worker_pool *pool);
+
+/* Complete a pool run started using start_pool();
+ * Merge results from hash frags into a final hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_complete_pool_hash(struct worker_pool *pool,
+                            struct hmap *result, struct hmap *result_frags);
+
+/* Complete a pool run started using start_pool();
+ * Merge results from list frags into a final list result.
+ */
+
+void ovn_complete_pool_list(struct worker_pool *pool,
+                            struct ovs_list *result, struct ovs_list *result_frags);
+
+/* Complete a pool run started using start_pool();
+ * Call a callback function to perform processing of results.
+ */
+
+void ovn_complete_pool_callback(struct worker_pool *pool, void *fin_result,
+                                void *result_frags,
+                                void (*helper_func)(struct worker_pool *pool,
+                                void *fin_result, void *result_frags, int index));
+
 /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
  * would land, or a null pointer if that bucket is empty.
  */
@@ -298,10 +332,16 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)

 bool ovn_can_parallelize_hashes(bool force_parallel);

+bool ovn_set_parallel_processing(bool enable);
+
+bool ovn_get_parallel_processing(void);
+
 void ovn_destroy_pool(struct worker_pool *pool);

 bool ovn_resize_pool(struct worker_pool *pool, int size);

+void ovn_parallel_thread_pools_init(void);
+
 /* Use the OVN library functions for stuff which OVS has not defined
  * If OVS has defined these, they will still compile using the OVN
  * local names, but will be dropped by the linker in favour of the OVS
@@ -312,9 +352,16 @@ bool ovn_resize_pool(struct worker_pool *pool, int size);

 #define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)

+#define set_parallel_processing(enable) ovn_set_parallel_processing(enable)
+
+#define get_parallel_processing() ovn_get_parallel_processing()
+
+#define enable_parallel_processing() ovn_enable_parallel_processing()
+
 #define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)

-#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)
+#define add_worker_pool(start, size, name, is_mutable) \
+    ovn_add_worker_pool(start, size, name, is_mutable)

 #define add_standard_pool(start, size) ovn_add_standard_pool(start, size)

@@ -339,6 +386,17 @@ bool ovn_resize_pool(struct worker_pool *pool, int size);

 #define start_pool(pool) ovn_start_pool(pool)

+#define complete_pool_hash(pool, result, result_frags) \
+    ovn_complete_pool_hash(pool, result, result_frags)
+
+#define complete_pool_list(pool, result, result_frags) \
+    ovn_complete_pool_list(pool, result, result_frags)
+
+#define complete_pool_callback(pool, fin_result, result_frags, helper_func) \
+    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func)
+
+#define start_pool(pool) ovn_start_pool(pool)
+
 #define complete_pool_hash(pool, result, result_frags) \
     ovn_complete_pool_hash(pool, result, result_frags)

@@ -352,6 +410,7 @@ bool ovn_resize_pool(struct worker_pool *pool, int size);

 #define resize_pool(pool, size) ovn_resize_pool(pool, size)

+#define parallel_thread_pools_init() ovn_parallel_thread_pools_init()

 #ifdef __clang__
 #pragma clang diagnostic pop
diff --git a/northd/northd.c b/northd/northd.c
index 7724d27e9..d6401fe62 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -4279,7 +4279,6 @@ ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_datapath *od,
 /* If this option is 'true' northd will combine logical flows that differ by
  * logical datapath only by creating a datapath group. */
 static bool use_logical_dp_groups = false;
-static bool use_parallel_build = true;

 static void
 ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
@@ -4298,7 +4297,7 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
     lflow->ctrl_meter = ctrl_meter;
     lflow->dpg = NULL;
     lflow->where = where;
-    if (use_parallel_build && use_logical_dp_groups) {
+    if (get_parallel_processing() && use_logical_dp_groups) {
         ovs_mutex_init(&lflow->odg_lock);
     }
 }
@@ -4370,7 +4369,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
                    nullable_xstrdup(ctrl_meter),
                    ovn_lflow_hint(stage_hint), where);
     hmapx_add(&lflow->od_group, od);
-    if (!use_parallel_build) {
+    if (!get_parallel_processing()) {
         hmap_insert(lflow_map, &lflow->hmap_node, hash);
     } else {
         hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
@@ -4441,7 +4440,7 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,
     struct ovn_lflow *lflow;
     ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
-    if (use_logical_dp_groups && use_parallel_build) {
+    if (use_logical_dp_groups && get_parallel_processing()) {
         lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
                                     match, actions, io_port, stage_hint, where,
                                     ctrl_meter);
@@ -4479,7 +4478,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,
         return false;
     }

-    if (use_parallel_build && use_logical_dp_groups) {
+    if (get_parallel_processing() && use_logical_dp_groups) {
         ovs_mutex_lock(&lflow_ref->odg_lock);
         hmapx_add(&lflow_ref->od_group, od);
         ovs_mutex_unlock(&lflow_ref->odg_lock);
@@ -12962,7 +12961,8 @@ init_lflows_thread_pool(void)
 {
     if (!pool_init_done) {
-        build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
+        build_lflows_pool = add_worker_pool(build_lflows_thread, 0,
+                                            "lflows", true);
         pool_init_done = true;
     }
 }
@@ -12978,14 +12978,11 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,

     char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);

-    if (use_parallel_build) {
+    if (get_parallel_processing()) {
         init_lflows_thread_pool();
-        if (!can_parallelize_hashes(false)) {
-            use_parallel_build = false;
-        }
     }

-    if (use_parallel_build) {
+    if (get_parallel_processing()) {
         struct hmap *lflow_segs;
         struct lswitch_flow_build_info *lsiv;
         int index;
@@ -13185,19 +13182,19 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
         if (reset_parallel) {
             /* Parallel build was disabled before, we need to
              * re-enable it. */
-            use_parallel_build = true;
+            set_parallel_processing(true);
             reset_parallel = false;
         }

         fast_hmap_size_for(&lflows, max_seen_lflow_size);

-        if (use_parallel_build && use_logical_dp_groups &&
+        if (get_parallel_processing() && use_logical_dp_groups &&
             needs_parallel_init) {
             ovs_rwlock_init(&flowtable_lock);
             needs_parallel_init = false;
             /* Disable parallel build on first run with dp_groups
              * to determine the correct sizing of hashes. */
-            use_parallel_build = false;
+            set_parallel_processing(false);
             reset_parallel = true;
         }
         build_lswitch_and_lrouter_flows(datapaths, ports,
@@ -14279,10 +14276,6 @@ ovnnb_db_run(struct northd_context *ctx,
     ovsdb_idl_set_probe_interval(ctx->ovnnb_idl, northd_probe_interval_nb);
     ovsdb_idl_set_probe_interval(ctx->ovnsb_idl, northd_probe_interval_sb);

-    use_parallel_build =
-        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
-         can_parallelize_hashes(false));
-
     use_logical_dp_groups = smap_get_bool(&nb->options,
                                           "use_logical_dp_groups", true);
     use_ct_inv_match = smap_get_bool(&nb->options,
@@ -14652,7 +14645,6 @@ ovn_db_run(struct northd_context *ctx,
     ovs_list_init(&lr_list);
     hmap_init(&datapaths);
     hmap_init(&ports);
-    use_parallel_build = ctx->use_parallel_build;

     int64_t start_time = time_wall_msec();
     stopwatch_start(OVNNB_DB_RUN_STOPWATCH_NAME, time_msec());
diff --git a/northd/northd.h b/northd/northd.h
index ffa2bbb4e..5cbd183ef 100644
--- a/northd/northd.h
+++ b/northd/northd.h
@@ -27,8 +27,6 @@ struct northd_context {
     struct ovsdb_idl_index *sbrec_ha_chassis_grp_by_name;
     struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp;
     struct ovsdb_idl_index *sbrec_ip_mcast_by_dp;
-
-    bool use_parallel_build;
 };

 void
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 42c0ad644..fb1268661 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -65,8 +65,6 @@ static const char *ssl_private_key_file;
 static const char *ssl_certificate_file;
 static const char *ssl_ca_cert_file;

-static bool use_parallel_build = true;
-
 static const char *rbac_chassis_auth[] =
     {"name"};
 static const char *rbac_chassis_update[] =
@@ -622,7 +620,7 @@ main(int argc, char *argv[])

     daemonize_complete();

-    use_parallel_build = can_parallelize_hashes(false);
+    ovn_parallel_thread_pools_init();

     /* We want to detect (almost) all changes to the ovn-nb db. */
     struct ovsdb_idl_loop ovnnb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
@@ -941,7 +939,6 @@ main(int argc, char *argv[])
         .sbrec_ha_chassis_grp_by_name = sbrec_ha_chassis_grp_by_name,
         .sbrec_mcast_group_by_name_dp = sbrec_mcast_group_by_name_dp,
         .sbrec_ip_mcast_by_dp = sbrec_ip_mcast_by_dp,
-        .use_parallel_build = use_parallel_build,
     };

     if (!state.had_lock && ovsdb_idl_has_lock(ovnsb_idl_loop.idl)) {
diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
index f06f2e68e..958ce18b0 100644
--- a/tests/ovn-macros.at
+++ b/tests/ovn-macros.at
@@ -179,6 +179,18 @@ ovn_start_northd() {
     test -d "$ovs_base/$name" || mkdir "$ovs_base/$name"
     as $name start_daemon $NORTHD_TYPE $northd_args -vjsonrpc \
         --ovnnb-db=$OVN_NB_DB --ovnsb-db=$OVN_SB_DB
+    if test -z "$USE_PARALLEL_THREADS" ; then
+        USE_PARALLEL_THREADS=0
+    fi
+
+    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
+        case ${NORTHD_TYPE:=ovn-northd} in
+        ovn-northd) ovs-appctl --timeout=10 --target northd$suffix/ovn-northd \
+                        thread-pool/set-parallel-on $USE_PARALLEL_THREADS
+            ;;
+        esac
+    fi
+
 }

 # ovn_start [--backup-northd=none|paused] [AZ]
@@ -252,10 +264,6 @@ ovn_start () {
     else
         ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
     fi
-
-    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
-        ovn-nbctl set NB_Global . options:use_parallel_build=true
-    fi
 }

 # Interconnection networks.
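With the ovn-macros.at hooks above, a testsuite run can opt in to parallel
northd via the environment; for instance (the make invocation is an
illustrative assumption, only the two variables come from the macros):

    $ NORTHD_USE_PARALLELIZATION=yes USE_PARALLEL_THREADS=4 make check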