Message ID | 20210107141921.1577-1-anton.ivanov@cambridgegreys.com |
---|---|
State | Superseded |
Headers | show |
Series | [ovs-dev,v9,1/3] ovn-libs: Add support for parallel processing | expand |
Bleep bloop. Greetings Anton Ivanov, I am a robot and I have tried out your patch. Thanks for your contribution. I encountered some error that I wasn't expecting. See the details below. checkpatch: WARNING: Line is 83 characters long (recommended limit is 79) #169 FILE: lib/fasthmap.c:128: ovs_thread_create("worker pool helper", start, &new_pool->controls[i]); ERROR: Improper whitespace around control block #377 FILE: lib/fasthmap.h:49: #define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \ Lines checked: 537, Warnings: 1, Errors: 1 Please check this out. If you feel there has been an error, please email aconole@redhat.com Thanks, 0-day Robot
On Thu, Jan 7, 2021 at 7:50 PM <anton.ivanov@cambridgegreys.com> wrote: > > From: Anton Ivanov <anton.ivanov@cambridgegreys.com> > > This adds a set of functions and macros intended to process > hashes in parallel. > > The principles of operation are documented in the fasthmap.h > > If these one day go into the OVS tree, the OVS tree versions > would be used in preference. > > Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com> Hi Anton, Thanks for spinning the v9. There seems to be some check patch warnings/errors. Can you please take a look. ovsrobot has run your patches in CI and looks like there are clang compilation errors - https://github.com/ovsrobot/ovn/tree/series_223190 and https://github.com/ovsrobot/ovn/runs/1663174911 Please take a look. *** clang -DHAVE_CONFIG_H -I. -I../.. -I ../../include -I ../../include -I ../../ovn -I ./include -I ../../lib -I ./lib -I /home/runner/work/ovn/ovn/ovs_src/include -I /home/runner/work/ovn/ovn/ovs_src/include -I /home/runner/work/ovn/ovn/ovs_src/lib -I /home/runner/work/ovn/ovn/ovs_src/lib -I /home/runner/work/ovn/ovn/ovs_src -I /home/runner/work/ovn/ovn/ovs_src -Wstrict-prototypes -Wall -Wextra -Wno-sign-compare -Wpointer-arith -Wformat -Wformat-security -Wswitch-enum -Wunused-parameter -Wbad-function-cast -Wcast-align -Wstrict-prototypes -Wold-style-definition -Wmissing-prototypes -Wmissing-field-initializers -Wthread-safety -fno-strict-aliasing -Wswitch-bool -Wlogical-not-parentheses -Wsizeof-array-argument -Wshift-negative-value -Qunused-arguments -Wshadow -Wno-null-pointer-arithmetic -Warray-bounds-pointer-arithmetic -fno-omit-frame-pointer -fno-common -Wno-error=unused-command-line-argument -fsanitize=address -Werror -Werror -g -O2 -MT ic/ovn-ic.o -MD -MP -MF $depbase.Tpo -c -o ic/ovn-ic.o ../../ic/ovn-ic.c &&\ 3723mv -f $depbase.Tpo $depbase.Po 3724../../northd/ovn-northd.c:4204:21: error: mutex 'slice_locks[hash % lflow_map->mask]' is not held on every path through here [-Werror,-Wthread-safety-analysis] 3725 old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash); 3726 ^ 3727../../northd/ovn-northd.c:4202:13: note: mutex acquired here 3728 ovs_mutex_lock(&slice_locks[hash % lflow_map->mask]); 3729 ^ 3730/home/runner/work/ovn/ovn/ovs_src/include/openvswitch/thread.h:71:9: note: expanded from macro 'ovs_mutex_lock' 3731 ovs_mutex_lock_at(mutex, OVS_SOURCE_LOCATOR) 3732 ^ 3733../../northd/ovn-northd.c:4209:17: error: releasing mutex 'slice_locks[hash % lflow_map->mask]' that was not held [-Werror,-Wthread-safety-analysis] 3734 ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]); 3735 ^ 3736../../northd/ovn-northd.c:4218:9: error: releasing mutex 'slice_locks[hash % lflow_map->mask]' that was not held [-Werror,-Wthread-safety-analysis] 3737 ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]); 3738 ^ 37393 errors generated. 3740make[2]: *** [northd/ovn-northd.o] Error 1 3741Makefile:1979: recipe for target 'northd/ovn-northd.o' failed 3742make[2]: *** Waiting for unfinished jobs.... 3743make[2]: Leaving directory '/home/runner/work/ovn/ovn/ovn-20.12.90/_build/sub' 3744make[1]: *** [all] Error 2 3745Makefile:1291: recipe for target 'all' failed 3746make[1]: Leaving directory '/home/runner/work/ovn/ovn/ovn-20.12.90/_build/sub' 3747Makefile:2460: recipe for target 'distcheck' failed 3748make: *** [distcheck] Error 1 3749+ cat '*/_build/sub/tests/testsuite.log' 3750cat: '*/_build/sub/tests/testsuite.log': No such file or directory 3751Error: Process completed with exit code 1. *** Thanks Numan > --- > lib/automake.mk | 2 + > lib/fasthmap.c | 281 ++++++++++++++++++++++++++++++++++++++++++++++++ > lib/fasthmap.h | 206 +++++++++++++++++++++++++++++++++++ > 3 files changed, 489 insertions(+) > create mode 100644 lib/fasthmap.c > create mode 100644 lib/fasthmap.h > > diff --git a/lib/automake.mk b/lib/automake.mk > index 250c7aefa..d7e4b20cf 100644 > --- a/lib/automake.mk > +++ b/lib/automake.mk > @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \ > lib/expr.c \ > lib/extend-table.h \ > lib/extend-table.c \ > + lib/fasthmap.h \ > + lib/fasthmap.c \ > lib/ip-mcast-index.c \ > lib/ip-mcast-index.h \ > lib/mcast-group-index.c \ > diff --git a/lib/fasthmap.c b/lib/fasthmap.c > new file mode 100644 > index 000000000..3096c90d3 > --- /dev/null > +++ b/lib/fasthmap.c > @@ -0,0 +1,281 @@ > +/* > + * Copyright (c) 2020 Red Hat, Inc. > + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc. > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at: > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +#include <config.h> > +#include <stdint.h> > +#include <string.h> > +#include <stdlib.h> > +#include <unistd.h> > +#include <semaphore.h> > +#include "fatal-signal.h" > +#include "util.h" > +#include "openvswitch/vlog.h" > +#include "openvswitch/hmap.h" > +#include "openvswitch/thread.h" > +#include "fasthmap.h" > +#include "ovs-atomic.h" > +#include "ovs-thread.h" > +#include "ovs-numa.h" > + > +VLOG_DEFINE_THIS_MODULE(fasthmap); > + > + > +static bool worker_pool_setup = false; > +static bool workers_must_exit = false; > +static bool can_parallelize = false; > + > +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools); > + > +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER; > + > +static int pool_size; > + > +static void worker_pool_hook(void *aux OVS_UNUSED) { > + int i; > + static struct worker_pool *pool; > + workers_must_exit = true; /* all workers must honour this flag */ > + atomic_thread_fence(memory_order_release); > + LIST_FOR_EACH (pool, list_node, &worker_pools) { > + for (i = 0; i < pool->size ; i++) { > + sem_post(&pool->controls[i].fire); > + } > + } > +} > + > +static void setup_worker_pools(void) { > + int cores, nodes; > + > + nodes = ovs_numa_get_n_numas(); > + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) { > + nodes = 1; > + } > + cores = ovs_numa_get_n_cores(); > + > + /* If there is no NUMA config, use 4 cores. > + * If there is NUMA config use half the cores on > + * one node so that the OS does not start pushing > + * threads to other nodes. > + */ > + if (cores == OVS_CORE_UNSPEC || cores <= 0) { > + /* If there is no NUMA we can try the ovs-threads routine. > + * It falls back to sysconf and/or affinity mask. > + */ > + cores = count_cpu_cores(); > + pool_size = cores; > + } else { > + pool_size = cores / nodes; > + } > + if (pool_size > 16) { > + pool_size = 16; > + } > + can_parallelize = (pool_size >= 3); > + fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true); > + worker_pool_setup = true; > +} > + > +bool ovn_cease_fire(void) > +{ > + return workers_must_exit; > +} > + > +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){ > + > + struct worker_pool *new_pool = NULL; > + struct worker_control *new_control; > + int i; > + > + ovs_mutex_lock(&init_mutex); > + > + if (!worker_pool_setup) { > + setup_worker_pools(); > + } > + > + if (can_parallelize) { > + new_pool = xmalloc(sizeof(struct worker_pool)); > + new_pool->size = pool_size; > + sem_init(&new_pool->done, 0, 0); > + > + ovs_list_push_back(&worker_pools, &new_pool->list_node); > + > + new_pool->controls = > + xmalloc(sizeof(struct worker_control) * new_pool->size); > + > + for (i = 0; i < new_pool->size; i++) { > + new_control = &new_pool->controls[i]; > + sem_init(&new_control->fire, 0, 0); > + new_control->id = i; > + new_control->done = &new_pool->done; > + new_control->data = NULL; > + ovs_mutex_init(&new_control->mutex); > + new_control->finished = ATOMIC_VAR_INIT(false); > + } > + > + for (i = 0; i < pool_size; i++) { > + ovs_thread_create("worker pool helper", start, &new_pool->controls[i]); > + } > + } > + ovs_mutex_unlock(&init_mutex); > + return new_pool; > +} > + > + > +/* Initializes 'hmap' as an empty hash table with mask N. */ > +void > +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask) > +{ > + size_t i; > + > + hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1)); > + hmap->one = NULL; > + hmap->mask = mask; > + hmap->n = 0; > + for (i = 0; i <= hmap->mask; i++) { > + hmap->buckets[i] = NULL; > + } > +} > + > +/* Initializes 'hmap' as an empty hash table of size X. > + * Intended for use in parallel processing so that all > + * fragments used to store results in a parallel job > + * are the same size. > + */ > +void > +ovn_fast_hmap_size_for(struct hmap *hmap, int size) > +{ > + size_t mask; > + mask = size / 2; > + mask |= mask >> 1; > + mask |= mask >> 2; > + mask |= mask >> 4; > + mask |= mask >> 8; > + mask |= mask >> 16; > +#if SIZE_MAX > UINT32_MAX > + mask |= mask >> 32; > +#endif > + > + /* If we need to dynamically allocate buckets we might as well allocate at > + * least 4 of them. */ > + mask |= (mask & 1) << 1; > + > + fast_hmap_init(hmap, mask); > +} > + > +/* Run a thread pool which uses a callback function to process results > + */ > + > +void ovn_run_pool_callback( > + struct worker_pool *pool, > + void *fin_result, > + void *result_frags, > + void (*helper_func)(struct worker_pool *pool, > + void *fin_result, void *result_frags, int index)) > +{ > + int index, completed; > + > + atomic_thread_fence(memory_order_release); > + > + for (index = 0; index < pool->size; index++) { > + sem_post(&pool->controls[index].fire); > + } > + > + completed = 0; > + > + do { > + bool test; > + sem_wait(&pool->done); > + for (index = 0; index < pool->size; index++) { > + test = true; > + if (atomic_compare_exchange_weak( > + &pool->controls[index].finished, > + &test, > + false)) { > + if (helper_func) { > + (helper_func)(pool, fin_result, result_frags, index); > + } > + completed++; > + pool->controls[index].data = NULL; > + } > + } > + } while (completed < pool->size); > +} > + > +/* Run a thread pool - basic, does not do results processing. > + */ > + > +void ovn_run_pool(struct worker_pool *pool) > +{ > + ovn_run_pool_callback(pool, NULL, NULL, NULL); > +} > + > +/* Brute force merge of a hashmap into another hashmap. > + * Intended for use in parallel processing. The destination > + * hashmap MUST be the same size as the one being merged. > + * > + * This can be achieved by pre-allocating them to correct size > + * and using hmap_insert_fast() instead of hmap_insert() > + */ > + > +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc) > +{ > + size_t i; > + > + ovs_assert(inc->mask == dest->mask); > + > + if (!inc->n) { > + /* Request to merge an empty frag, nothing to do */ > + return; > + } > + > + for (i = 0; i <= dest->mask; i++) { > + struct hmap_node **dest_bucket = &dest->buckets[i]; > + struct hmap_node **inc_bucket = &inc->buckets[i]; > + if (*inc_bucket != NULL) { > + struct hmap_node *last_node = *inc_bucket; > + while (last_node->next != NULL) { > + last_node = last_node->next; > + } > + last_node->next = *dest_bucket; > + *dest_bucket = *inc_bucket; > + *inc_bucket = NULL; > + } > + } > + dest->n += inc->n; > + inc->n = 0; > +} > + > +/* Run a thread pool which gathers results in an array > + * of hashes. Merge results. > + */ > + > +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED, > + void *fin_result, void *result_frags, int index) > +{ > + struct hmap *result = (struct hmap *)fin_result; > + struct hmap *res_frags = (struct hmap *)result_frags; > + > + fast_hmap_merge(result, &res_frags[index]); > + hmap_destroy(&res_frags[index]); > +} > + > + > +void ovn_run_pool_hash( > + struct worker_pool *pool, > + struct hmap *result, > + struct hmap *result_frags) > +{ > + ovn_run_pool_callback(pool, result, result_frags, merge_hash_results); > +} > diff --git a/lib/fasthmap.h b/lib/fasthmap.h > new file mode 100644 > index 000000000..2a28553d5 > --- /dev/null > +++ b/lib/fasthmap.h > @@ -0,0 +1,206 @@ > +/* > + * Copyright (c) 2020 Red Hat, Inc. > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at: > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +#ifndef OVN_PARALLEL_HMAP > +#define OVN_PARALLEL_HMAP 1 > + > +/* Process this include only if OVS does not supply parallel definitions > + */ > + > +#ifndef OVS_HAS_PARALLEL_HMAP > + > +/* if the parallel macros are defined by hmap.h or any other ovs define > + * we skip over the ovn specific definitions. > + */ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include <stdbool.h> > +#include <stdlib.h> > +#include <semaphore.h> > +#include "openvswitch/util.h" > +#include "openvswitch/hmap.h" > +#include "openvswitch/thread.h" > +#include "ovs-atomic.h" > + > +/* A version of the HMAP_FOR_EACH macro intended for iterating as part > + * of parallel processing. > + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE > + * and will iterate hash buckets ThreadID, ThreadID + step, > + * ThreadID + step * 2, etc. The actual macro accepts > + * ThreadID + step * i as the JOBID parameter. > + */ > + > +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \ > + for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \ > + (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \ > + || ((NODE = NULL), false); \ > + ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER)) > + > +/* We do not have a SAFE version of the macro, because the hash size is not > + * atomic and hash removal operations would need to be wrapped with > + * locks. This will defeat most of the benefits from doing anything in > + * parallel. > + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements, > + * each thread should store them in a temporary list result instead, merging > + * the lists into a combined result at the end */ > + > +/* Work "Handle" */ > + > +struct worker_control { > + int id; /* Used as a modulo when iterating over a hash. */ > + atomic_bool finished; /* Set to true after achunk of work is complete. */ > + sem_t fire; /* Work start semaphore - sem_post starts the worker. */ > + sem_t *done; /* Work completion semaphore - sem_post on completion. */ > + struct ovs_mutex mutex; /* Guards the data. */ > + void *data; /* Pointer to data to be processed. */ > + void *workload; /* back-pointer to the worker pool structure. */ > +}; > + > +struct worker_pool { > + int size; /* Number of threads in the pool. */ > + struct ovs_list list_node; /* List of pools - used in cleanup/exit. */ > + struct worker_control *controls; /* "Handles" in this pool. */ > + sem_t done; /* Work completion semaphorew. */ > +}; > + > +/* Add a worker pool for thread function start() which expects a pointer to > + * a worker_control structure as an argument. */ > + > +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)); > + > +/* Setting this to true will make all processing threads exit */ > + > +bool ovn_cease_fire(void); > + > +/* Build a hmap pre-sized for size elements */ > + > +void ovn_fast_hmap_size_for(struct hmap *hmap, int size); > + > +/* Build a hmap with a mask equals to size */ > + > +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size); > + > +/* Brute-force merge a hmap into hmap. > + * Dest and inc have to have the same mask. The merge is performed > + * by extending the element list for bucket N in the dest hmap with the list > + * from bucket N in inc. > + */ > + > +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc); > + > +/* Run a pool, without any default processing of results. > + */ > + > +void ovn_run_pool(struct worker_pool *pool); > + > +/* Run a pool, merge results from hash frags into a final hash result. > + * The hash frags must be pre-sized to the same size. > + */ > + > +void ovn_run_pool_hash(struct worker_pool *pool, > + struct hmap *result, struct hmap *result_frags); > + > +/* Run a pool, call a callback function to perform processing of results. > + */ > + > +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result, > + void *result_frags, > + void (*helper_func)(struct worker_pool *pool, > + void *fin_result, void *result_frags, int index)); > + > + > +/* Returns the first node in 'hmap' in the bucket in which the given 'hash' > + * would land, or a null pointer if that bucket is empty. */ > + > +static inline struct hmap_node * > +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num) > +{ > + return hmap->buckets[num]; > +} > + > +static inline struct hmap_node * > +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size) > +{ > + size_t i; > + for (i = start; i <= hmap->mask; i+= pool_size) { > + struct hmap_node *node = hmap->buckets[i]; > + if (node) { > + return node; > + } > + } > + return NULL; > +} > + > +/* Returns the first node in 'hmap', as expected by thread with job_id > + * for parallel processing in arbitrary order, or a null pointer if > + * the slice of 'hmap' for that job_id is empty. */ > +static inline struct hmap_node * > +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size) > +{ > + return parallel_hmap_next__(hmap, job_id, pool_size); > +} > + > +/* Returns the next node in the slice of 'hmap' following 'node', > + * in arbitrary order, or a * null pointer if 'node' is the last node in > + * the 'hmap' slice. > + * > + */ > +static inline struct hmap_node * > +parallel_hmap_next(const struct hmap *hmap, > + const struct hmap_node *node, ssize_t pool_size) > +{ > + return (node->next > + ? node->next > + : parallel_hmap_next__(hmap, > + (node->hash & hmap->mask) + pool_size, pool_size)); > +} > + > +/* Use the OVN library functions for stuff which OVS has not defined > + * If OVS has defined these, they will still compile using the OVN > + * local names, but will be dropped by the linker in favour of the OVS > + * supplied functions. > + */ > + > +#define cease_fire() ovn_cease_fire() > + > +#define add_worker_pool(start) ovn_add_worker_pool(start) > + > +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size) > + > +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size) > + > +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc) > + > +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc) > + > +#define ovn_run_pool(pool) ovn_run_pool(pool) > + > +#define run_pool_hash(pool, result, result_frags) \ > + ovn_run_pool_hash(pool, result, result_frags) > + > +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \ > + ovn_run_pool_callback(pool, fin_result, result_frags, helper_func) > + > +#ifdef __cplusplus > +} > +#endif > + > +#endif > + > +#endif /* lib/fasthmap.h */ > -- > 2.20.1 > > _______________________________________________ > dev mailing list > dev@openvswitch.org > https://mail.openvswitch.org/mailman/listinfo/ovs-dev >
On 08/01/2021 18:33, Numan Siddique wrote: > On Thu, Jan 7, 2021 at 7:50 PM <anton.ivanov@cambridgegreys.com> wrote: >> From: Anton Ivanov <anton.ivanov@cambridgegreys.com> >> >> This adds a set of functions and macros intended to process >> hashes in parallel. >> >> The principles of operation are documented in the fasthmap.h >> >> If these one day go into the OVS tree, the OVS tree versions >> would be used in preference. >> >> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com> > Hi Anton, > > Thanks for spinning the v9. > > There seems to be some check patch warnings/errors. Can you please take a look. I will fix the calloc() one, me a culpa. It is not possible to create a FOR_EACH define in a way which does not produce a patch check error. I tried at some point to feed it one of the defines from the hmap.h in the OVS source as "new material" and it barfed on it. If anyone has a suggestion how to alter it so it passes the check - I will be happy to accept them. Otherwise, syntactically it is valid. It also uses the same style as the OVS ones in hmap.h, list.h, etc. The CLANG looks like a mis-identification. The lock is held only if shared=true, dp_groups=true, parallel=true and is released with the same conditions in place. The release is in two places because there is a return inside the if. However, looking at the code this does not handle correctly locking for shared = false, dp_groups=true, parallel=true. At present, that will allow a hmap insert without a lock, which may end up as memory corruption. I will fix that along with making CLANG happy and resubmit. A. > > ovsrobot has run your patches in CI and looks like there are clang > compilation errors - > https://github.com/ovsrobot/ovn/tree/series_223190 > and https://github.com/ovsrobot/ovn/runs/1663174911 > > Please take a look. > > *** > clang -DHAVE_CONFIG_H -I. -I../.. -I ../../include -I ../../include -I > ../../ovn -I ./include -I ../../lib -I ./lib -I > /home/runner/work/ovn/ovn/ovs_src/include -I > /home/runner/work/ovn/ovn/ovs_src/include -I > /home/runner/work/ovn/ovn/ovs_src/lib -I > /home/runner/work/ovn/ovn/ovs_src/lib -I > /home/runner/work/ovn/ovn/ovs_src -I /home/runner/work/ovn/ovn/ovs_src > -Wstrict-prototypes -Wall -Wextra -Wno-sign-compare -Wpointer-arith > -Wformat -Wformat-security -Wswitch-enum -Wunused-parameter > -Wbad-function-cast -Wcast-align -Wstrict-prototypes > -Wold-style-definition -Wmissing-prototypes > -Wmissing-field-initializers -Wthread-safety -fno-strict-aliasing > -Wswitch-bool -Wlogical-not-parentheses -Wsizeof-array-argument > -Wshift-negative-value -Qunused-arguments -Wshadow > -Wno-null-pointer-arithmetic -Warray-bounds-pointer-arithmetic > -fno-omit-frame-pointer -fno-common > -Wno-error=unused-command-line-argument -fsanitize=address -Werror > -Werror -g -O2 -MT ic/ovn-ic.o -MD -MP -MF $depbase.Tpo -c -o > ic/ovn-ic.o ../../ic/ovn-ic.c &&\ > 3723mv -f $depbase.Tpo $depbase.Po > 3724../../northd/ovn-northd.c:4204:21: error: mutex 'slice_locks[hash > % lflow_map->mask]' is not held on every path through here > [-Werror,-Wthread-safety-analysis] > 3725 old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash); > 3726 ^ > 3727../../northd/ovn-northd.c:4202:13: note: mutex acquired here > 3728 ovs_mutex_lock(&slice_locks[hash % lflow_map->mask]); > 3729 ^ > 3730/home/runner/work/ovn/ovn/ovs_src/include/openvswitch/thread.h:71:9: > note: expanded from macro 'ovs_mutex_lock' > 3731 ovs_mutex_lock_at(mutex, OVS_SOURCE_LOCATOR) > 3732 ^ > 3733../../northd/ovn-northd.c:4209:17: error: releasing mutex > 'slice_locks[hash % lflow_map->mask]' that was not held > [-Werror,-Wthread-safety-analysis] > 3734 ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]); > 3735 ^ > 3736../../northd/ovn-northd.c:4218:9: error: releasing mutex > 'slice_locks[hash % lflow_map->mask]' that was not held > [-Werror,-Wthread-safety-analysis] > 3737 ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]); > 3738 ^ > 37393 errors generated. > 3740make[2]: *** [northd/ovn-northd.o] Error 1 > 3741Makefile:1979: recipe for target 'northd/ovn-northd.o' failed > 3742make[2]: *** Waiting for unfinished jobs.... > 3743make[2]: Leaving directory > '/home/runner/work/ovn/ovn/ovn-20.12.90/_build/sub' > 3744make[1]: *** [all] Error 2 > 3745Makefile:1291: recipe for target 'all' failed > 3746make[1]: Leaving directory > '/home/runner/work/ovn/ovn/ovn-20.12.90/_build/sub' > 3747Makefile:2460: recipe for target 'distcheck' failed > 3748make: *** [distcheck] Error 1 > 3749+ cat '*/_build/sub/tests/testsuite.log' > 3750cat: '*/_build/sub/tests/testsuite.log': No such file or directory > 3751Error: Process completed with exit code 1. > *** > > Thanks > Numan > >> --- >> lib/automake.mk | 2 + >> lib/fasthmap.c | 281 ++++++++++++++++++++++++++++++++++++++++++++++++ >> lib/fasthmap.h | 206 +++++++++++++++++++++++++++++++++++ >> 3 files changed, 489 insertions(+) >> create mode 100644 lib/fasthmap.c >> create mode 100644 lib/fasthmap.h >> >> diff --git a/lib/automake.mk b/lib/automake.mk >> index 250c7aefa..d7e4b20cf 100644 >> --- a/lib/automake.mk >> +++ b/lib/automake.mk >> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \ >> lib/expr.c \ >> lib/extend-table.h \ >> lib/extend-table.c \ >> + lib/fasthmap.h \ >> + lib/fasthmap.c \ >> lib/ip-mcast-index.c \ >> lib/ip-mcast-index.h \ >> lib/mcast-group-index.c \ >> diff --git a/lib/fasthmap.c b/lib/fasthmap.c >> new file mode 100644 >> index 000000000..3096c90d3 >> --- /dev/null >> +++ b/lib/fasthmap.c >> @@ -0,0 +1,281 @@ >> +/* >> + * Copyright (c) 2020 Red Hat, Inc. >> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc. >> + * >> + * Licensed under the Apache License, Version 2.0 (the "License"); >> + * you may not use this file except in compliance with the License. >> + * You may obtain a copy of the License at: >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> + >> +#include <config.h> >> +#include <stdint.h> >> +#include <string.h> >> +#include <stdlib.h> >> +#include <unistd.h> >> +#include <semaphore.h> >> +#include "fatal-signal.h" >> +#include "util.h" >> +#include "openvswitch/vlog.h" >> +#include "openvswitch/hmap.h" >> +#include "openvswitch/thread.h" >> +#include "fasthmap.h" >> +#include "ovs-atomic.h" >> +#include "ovs-thread.h" >> +#include "ovs-numa.h" >> + >> +VLOG_DEFINE_THIS_MODULE(fasthmap); >> + >> + >> +static bool worker_pool_setup = false; >> +static bool workers_must_exit = false; >> +static bool can_parallelize = false; >> + >> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools); >> + >> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER; >> + >> +static int pool_size; >> + >> +static void worker_pool_hook(void *aux OVS_UNUSED) { >> + int i; >> + static struct worker_pool *pool; >> + workers_must_exit = true; /* all workers must honour this flag */ >> + atomic_thread_fence(memory_order_release); >> + LIST_FOR_EACH (pool, list_node, &worker_pools) { >> + for (i = 0; i < pool->size ; i++) { >> + sem_post(&pool->controls[i].fire); >> + } >> + } >> +} >> + >> +static void setup_worker_pools(void) { >> + int cores, nodes; >> + >> + nodes = ovs_numa_get_n_numas(); >> + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) { >> + nodes = 1; >> + } >> + cores = ovs_numa_get_n_cores(); >> + >> + /* If there is no NUMA config, use 4 cores. >> + * If there is NUMA config use half the cores on >> + * one node so that the OS does not start pushing >> + * threads to other nodes. >> + */ >> + if (cores == OVS_CORE_UNSPEC || cores <= 0) { >> + /* If there is no NUMA we can try the ovs-threads routine. >> + * It falls back to sysconf and/or affinity mask. >> + */ >> + cores = count_cpu_cores(); >> + pool_size = cores; >> + } else { >> + pool_size = cores / nodes; >> + } >> + if (pool_size > 16) { >> + pool_size = 16; >> + } >> + can_parallelize = (pool_size >= 3); >> + fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true); >> + worker_pool_setup = true; >> +} >> + >> +bool ovn_cease_fire(void) >> +{ >> + return workers_must_exit; >> +} >> + >> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){ >> + >> + struct worker_pool *new_pool = NULL; >> + struct worker_control *new_control; >> + int i; >> + >> + ovs_mutex_lock(&init_mutex); >> + >> + if (!worker_pool_setup) { >> + setup_worker_pools(); >> + } >> + >> + if (can_parallelize) { >> + new_pool = xmalloc(sizeof(struct worker_pool)); >> + new_pool->size = pool_size; >> + sem_init(&new_pool->done, 0, 0); >> + >> + ovs_list_push_back(&worker_pools, &new_pool->list_node); >> + >> + new_pool->controls = >> + xmalloc(sizeof(struct worker_control) * new_pool->size); >> + >> + for (i = 0; i < new_pool->size; i++) { >> + new_control = &new_pool->controls[i]; >> + sem_init(&new_control->fire, 0, 0); >> + new_control->id = i; >> + new_control->done = &new_pool->done; >> + new_control->data = NULL; >> + ovs_mutex_init(&new_control->mutex); >> + new_control->finished = ATOMIC_VAR_INIT(false); >> + } >> + >> + for (i = 0; i < pool_size; i++) { >> + ovs_thread_create("worker pool helper", start, &new_pool->controls[i]); >> + } >> + } >> + ovs_mutex_unlock(&init_mutex); >> + return new_pool; >> +} >> + >> + >> +/* Initializes 'hmap' as an empty hash table with mask N. */ >> +void >> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask) >> +{ >> + size_t i; >> + >> + hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1)); >> + hmap->one = NULL; >> + hmap->mask = mask; >> + hmap->n = 0; >> + for (i = 0; i <= hmap->mask; i++) { >> + hmap->buckets[i] = NULL; >> + } >> +} >> + >> +/* Initializes 'hmap' as an empty hash table of size X. >> + * Intended for use in parallel processing so that all >> + * fragments used to store results in a parallel job >> + * are the same size. >> + */ >> +void >> +ovn_fast_hmap_size_for(struct hmap *hmap, int size) >> +{ >> + size_t mask; >> + mask = size / 2; >> + mask |= mask >> 1; >> + mask |= mask >> 2; >> + mask |= mask >> 4; >> + mask |= mask >> 8; >> + mask |= mask >> 16; >> +#if SIZE_MAX > UINT32_MAX >> + mask |= mask >> 32; >> +#endif >> + >> + /* If we need to dynamically allocate buckets we might as well allocate at >> + * least 4 of them. */ >> + mask |= (mask & 1) << 1; >> + >> + fast_hmap_init(hmap, mask); >> +} >> + >> +/* Run a thread pool which uses a callback function to process results >> + */ >> + >> +void ovn_run_pool_callback( >> + struct worker_pool *pool, >> + void *fin_result, >> + void *result_frags, >> + void (*helper_func)(struct worker_pool *pool, >> + void *fin_result, void *result_frags, int index)) >> +{ >> + int index, completed; >> + >> + atomic_thread_fence(memory_order_release); >> + >> + for (index = 0; index < pool->size; index++) { >> + sem_post(&pool->controls[index].fire); >> + } >> + >> + completed = 0; >> + >> + do { >> + bool test; >> + sem_wait(&pool->done); >> + for (index = 0; index < pool->size; index++) { >> + test = true; >> + if (atomic_compare_exchange_weak( >> + &pool->controls[index].finished, >> + &test, >> + false)) { >> + if (helper_func) { >> + (helper_func)(pool, fin_result, result_frags, index); >> + } >> + completed++; >> + pool->controls[index].data = NULL; >> + } >> + } >> + } while (completed < pool->size); >> +} >> + >> +/* Run a thread pool - basic, does not do results processing. >> + */ >> + >> +void ovn_run_pool(struct worker_pool *pool) >> +{ >> + ovn_run_pool_callback(pool, NULL, NULL, NULL); >> +} >> + >> +/* Brute force merge of a hashmap into another hashmap. >> + * Intended for use in parallel processing. The destination >> + * hashmap MUST be the same size as the one being merged. >> + * >> + * This can be achieved by pre-allocating them to correct size >> + * and using hmap_insert_fast() instead of hmap_insert() >> + */ >> + >> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc) >> +{ >> + size_t i; >> + >> + ovs_assert(inc->mask == dest->mask); >> + >> + if (!inc->n) { >> + /* Request to merge an empty frag, nothing to do */ >> + return; >> + } >> + >> + for (i = 0; i <= dest->mask; i++) { >> + struct hmap_node **dest_bucket = &dest->buckets[i]; >> + struct hmap_node **inc_bucket = &inc->buckets[i]; >> + if (*inc_bucket != NULL) { >> + struct hmap_node *last_node = *inc_bucket; >> + while (last_node->next != NULL) { >> + last_node = last_node->next; >> + } >> + last_node->next = *dest_bucket; >> + *dest_bucket = *inc_bucket; >> + *inc_bucket = NULL; >> + } >> + } >> + dest->n += inc->n; >> + inc->n = 0; >> +} >> + >> +/* Run a thread pool which gathers results in an array >> + * of hashes. Merge results. >> + */ >> + >> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED, >> + void *fin_result, void *result_frags, int index) >> +{ >> + struct hmap *result = (struct hmap *)fin_result; >> + struct hmap *res_frags = (struct hmap *)result_frags; >> + >> + fast_hmap_merge(result, &res_frags[index]); >> + hmap_destroy(&res_frags[index]); >> +} >> + >> + >> +void ovn_run_pool_hash( >> + struct worker_pool *pool, >> + struct hmap *result, >> + struct hmap *result_frags) >> +{ >> + ovn_run_pool_callback(pool, result, result_frags, merge_hash_results); >> +} >> diff --git a/lib/fasthmap.h b/lib/fasthmap.h >> new file mode 100644 >> index 000000000..2a28553d5 >> --- /dev/null >> +++ b/lib/fasthmap.h >> @@ -0,0 +1,206 @@ >> +/* >> + * Copyright (c) 2020 Red Hat, Inc. >> + * >> + * Licensed under the Apache License, Version 2.0 (the "License"); >> + * you may not use this file except in compliance with the License. >> + * You may obtain a copy of the License at: >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> + >> +#ifndef OVN_PARALLEL_HMAP >> +#define OVN_PARALLEL_HMAP 1 >> + >> +/* Process this include only if OVS does not supply parallel definitions >> + */ >> + >> +#ifndef OVS_HAS_PARALLEL_HMAP >> + >> +/* if the parallel macros are defined by hmap.h or any other ovs define >> + * we skip over the ovn specific definitions. >> + */ >> + >> +#ifdef __cplusplus >> +extern "C" { >> +#endif >> + >> +#include <stdbool.h> >> +#include <stdlib.h> >> +#include <semaphore.h> >> +#include "openvswitch/util.h" >> +#include "openvswitch/hmap.h" >> +#include "openvswitch/thread.h" >> +#include "ovs-atomic.h" >> + >> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part >> + * of parallel processing. >> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE >> + * and will iterate hash buckets ThreadID, ThreadID + step, >> + * ThreadID + step * 2, etc. The actual macro accepts >> + * ThreadID + step * i as the JOBID parameter. >> + */ >> + >> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \ >> + for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \ >> + (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \ >> + || ((NODE = NULL), false); \ >> + ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER)) >> + >> +/* We do not have a SAFE version of the macro, because the hash size is not >> + * atomic and hash removal operations would need to be wrapped with >> + * locks. This will defeat most of the benefits from doing anything in >> + * parallel. >> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements, >> + * each thread should store them in a temporary list result instead, merging >> + * the lists into a combined result at the end */ >> + >> +/* Work "Handle" */ >> + >> +struct worker_control { >> + int id; /* Used as a modulo when iterating over a hash. */ >> + atomic_bool finished; /* Set to true after achunk of work is complete. */ >> + sem_t fire; /* Work start semaphore - sem_post starts the worker. */ >> + sem_t *done; /* Work completion semaphore - sem_post on completion. */ >> + struct ovs_mutex mutex; /* Guards the data. */ >> + void *data; /* Pointer to data to be processed. */ >> + void *workload; /* back-pointer to the worker pool structure. */ >> +}; >> + >> +struct worker_pool { >> + int size; /* Number of threads in the pool. */ >> + struct ovs_list list_node; /* List of pools - used in cleanup/exit. */ >> + struct worker_control *controls; /* "Handles" in this pool. */ >> + sem_t done; /* Work completion semaphorew. */ >> +}; >> + >> +/* Add a worker pool for thread function start() which expects a pointer to >> + * a worker_control structure as an argument. */ >> + >> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)); >> + >> +/* Setting this to true will make all processing threads exit */ >> + >> +bool ovn_cease_fire(void); >> + >> +/* Build a hmap pre-sized for size elements */ >> + >> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size); >> + >> +/* Build a hmap with a mask equals to size */ >> + >> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size); >> + >> +/* Brute-force merge a hmap into hmap. >> + * Dest and inc have to have the same mask. The merge is performed >> + * by extending the element list for bucket N in the dest hmap with the list >> + * from bucket N in inc. >> + */ >> + >> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc); >> + >> +/* Run a pool, without any default processing of results. >> + */ >> + >> +void ovn_run_pool(struct worker_pool *pool); >> + >> +/* Run a pool, merge results from hash frags into a final hash result. >> + * The hash frags must be pre-sized to the same size. >> + */ >> + >> +void ovn_run_pool_hash(struct worker_pool *pool, >> + struct hmap *result, struct hmap *result_frags); >> + >> +/* Run a pool, call a callback function to perform processing of results. >> + */ >> + >> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result, >> + void *result_frags, >> + void (*helper_func)(struct worker_pool *pool, >> + void *fin_result, void *result_frags, int index)); >> + >> + >> +/* Returns the first node in 'hmap' in the bucket in which the given 'hash' >> + * would land, or a null pointer if that bucket is empty. */ >> + >> +static inline struct hmap_node * >> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num) >> +{ >> + return hmap->buckets[num]; >> +} >> + >> +static inline struct hmap_node * >> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size) >> +{ >> + size_t i; >> + for (i = start; i <= hmap->mask; i+= pool_size) { >> + struct hmap_node *node = hmap->buckets[i]; >> + if (node) { >> + return node; >> + } >> + } >> + return NULL; >> +} >> + >> +/* Returns the first node in 'hmap', as expected by thread with job_id >> + * for parallel processing in arbitrary order, or a null pointer if >> + * the slice of 'hmap' for that job_id is empty. */ >> +static inline struct hmap_node * >> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size) >> +{ >> + return parallel_hmap_next__(hmap, job_id, pool_size); >> +} >> + >> +/* Returns the next node in the slice of 'hmap' following 'node', >> + * in arbitrary order, or a * null pointer if 'node' is the last node in >> + * the 'hmap' slice. >> + * >> + */ >> +static inline struct hmap_node * >> +parallel_hmap_next(const struct hmap *hmap, >> + const struct hmap_node *node, ssize_t pool_size) >> +{ >> + return (node->next >> + ? node->next >> + : parallel_hmap_next__(hmap, >> + (node->hash & hmap->mask) + pool_size, pool_size)); >> +} >> + >> +/* Use the OVN library functions for stuff which OVS has not defined >> + * If OVS has defined these, they will still compile using the OVN >> + * local names, but will be dropped by the linker in favour of the OVS >> + * supplied functions. >> + */ >> + >> +#define cease_fire() ovn_cease_fire() >> + >> +#define add_worker_pool(start) ovn_add_worker_pool(start) >> + >> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size) >> + >> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size) >> + >> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc) >> + >> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc) >> + >> +#define ovn_run_pool(pool) ovn_run_pool(pool) >> + >> +#define run_pool_hash(pool, result, result_frags) \ >> + ovn_run_pool_hash(pool, result, result_frags) >> + >> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \ >> + ovn_run_pool_callback(pool, fin_result, result_frags, helper_func) >> + >> +#ifdef __cplusplus >> +} >> +#endif >> + >> +#endif >> + >> +#endif /* lib/fasthmap.h */ >> -- >> 2.20.1 >> >> _______________________________________________ >> dev mailing list >> dev@openvswitch.org >> https://mail.openvswitch.org/mailman/listinfo/ovs-dev >>
diff --git a/lib/automake.mk b/lib/automake.mk index 250c7aefa..d7e4b20cf 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \ lib/expr.c \ lib/extend-table.h \ lib/extend-table.c \ + lib/fasthmap.h \ + lib/fasthmap.c \ lib/ip-mcast-index.c \ lib/ip-mcast-index.h \ lib/mcast-group-index.c \ diff --git a/lib/fasthmap.c b/lib/fasthmap.c new file mode 100644 index 000000000..3096c90d3 --- /dev/null +++ b/lib/fasthmap.c @@ -0,0 +1,281 @@ +/* + * Copyright (c) 2020 Red Hat, Inc. + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include <stdint.h> +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <semaphore.h> +#include "fatal-signal.h" +#include "util.h" +#include "openvswitch/vlog.h" +#include "openvswitch/hmap.h" +#include "openvswitch/thread.h" +#include "fasthmap.h" +#include "ovs-atomic.h" +#include "ovs-thread.h" +#include "ovs-numa.h" + +VLOG_DEFINE_THIS_MODULE(fasthmap); + + +static bool worker_pool_setup = false; +static bool workers_must_exit = false; +static bool can_parallelize = false; + +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools); + +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER; + +static int pool_size; + +static void worker_pool_hook(void *aux OVS_UNUSED) { + int i; + static struct worker_pool *pool; + workers_must_exit = true; /* all workers must honour this flag */ + atomic_thread_fence(memory_order_release); + LIST_FOR_EACH (pool, list_node, &worker_pools) { + for (i = 0; i < pool->size ; i++) { + sem_post(&pool->controls[i].fire); + } + } +} + +static void setup_worker_pools(void) { + int cores, nodes; + + nodes = ovs_numa_get_n_numas(); + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) { + nodes = 1; + } + cores = ovs_numa_get_n_cores(); + + /* If there is no NUMA config, use 4 cores. + * If there is NUMA config use half the cores on + * one node so that the OS does not start pushing + * threads to other nodes. + */ + if (cores == OVS_CORE_UNSPEC || cores <= 0) { + /* If there is no NUMA we can try the ovs-threads routine. + * It falls back to sysconf and/or affinity mask. + */ + cores = count_cpu_cores(); + pool_size = cores; + } else { + pool_size = cores / nodes; + } + if (pool_size > 16) { + pool_size = 16; + } + can_parallelize = (pool_size >= 3); + fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true); + worker_pool_setup = true; +} + +bool ovn_cease_fire(void) +{ + return workers_must_exit; +} + +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){ + + struct worker_pool *new_pool = NULL; + struct worker_control *new_control; + int i; + + ovs_mutex_lock(&init_mutex); + + if (!worker_pool_setup) { + setup_worker_pools(); + } + + if (can_parallelize) { + new_pool = xmalloc(sizeof(struct worker_pool)); + new_pool->size = pool_size; + sem_init(&new_pool->done, 0, 0); + + ovs_list_push_back(&worker_pools, &new_pool->list_node); + + new_pool->controls = + xmalloc(sizeof(struct worker_control) * new_pool->size); + + for (i = 0; i < new_pool->size; i++) { + new_control = &new_pool->controls[i]; + sem_init(&new_control->fire, 0, 0); + new_control->id = i; + new_control->done = &new_pool->done; + new_control->data = NULL; + ovs_mutex_init(&new_control->mutex); + new_control->finished = ATOMIC_VAR_INIT(false); + } + + for (i = 0; i < pool_size; i++) { + ovs_thread_create("worker pool helper", start, &new_pool->controls[i]); + } + } + ovs_mutex_unlock(&init_mutex); + return new_pool; +} + + +/* Initializes 'hmap' as an empty hash table with mask N. */ +void +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask) +{ + size_t i; + + hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1)); + hmap->one = NULL; + hmap->mask = mask; + hmap->n = 0; + for (i = 0; i <= hmap->mask; i++) { + hmap->buckets[i] = NULL; + } +} + +/* Initializes 'hmap' as an empty hash table of size X. + * Intended for use in parallel processing so that all + * fragments used to store results in a parallel job + * are the same size. + */ +void +ovn_fast_hmap_size_for(struct hmap *hmap, int size) +{ + size_t mask; + mask = size / 2; + mask |= mask >> 1; + mask |= mask >> 2; + mask |= mask >> 4; + mask |= mask >> 8; + mask |= mask >> 16; +#if SIZE_MAX > UINT32_MAX + mask |= mask >> 32; +#endif + + /* If we need to dynamically allocate buckets we might as well allocate at + * least 4 of them. */ + mask |= (mask & 1) << 1; + + fast_hmap_init(hmap, mask); +} + +/* Run a thread pool which uses a callback function to process results + */ + +void ovn_run_pool_callback( + struct worker_pool *pool, + void *fin_result, + void *result_frags, + void (*helper_func)(struct worker_pool *pool, + void *fin_result, void *result_frags, int index)) +{ + int index, completed; + + atomic_thread_fence(memory_order_release); + + for (index = 0; index < pool->size; index++) { + sem_post(&pool->controls[index].fire); + } + + completed = 0; + + do { + bool test; + sem_wait(&pool->done); + for (index = 0; index < pool->size; index++) { + test = true; + if (atomic_compare_exchange_weak( + &pool->controls[index].finished, + &test, + false)) { + if (helper_func) { + (helper_func)(pool, fin_result, result_frags, index); + } + completed++; + pool->controls[index].data = NULL; + } + } + } while (completed < pool->size); +} + +/* Run a thread pool - basic, does not do results processing. + */ + +void ovn_run_pool(struct worker_pool *pool) +{ + ovn_run_pool_callback(pool, NULL, NULL, NULL); +} + +/* Brute force merge of a hashmap into another hashmap. + * Intended for use in parallel processing. The destination + * hashmap MUST be the same size as the one being merged. + * + * This can be achieved by pre-allocating them to correct size + * and using hmap_insert_fast() instead of hmap_insert() + */ + +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc) +{ + size_t i; + + ovs_assert(inc->mask == dest->mask); + + if (!inc->n) { + /* Request to merge an empty frag, nothing to do */ + return; + } + + for (i = 0; i <= dest->mask; i++) { + struct hmap_node **dest_bucket = &dest->buckets[i]; + struct hmap_node **inc_bucket = &inc->buckets[i]; + if (*inc_bucket != NULL) { + struct hmap_node *last_node = *inc_bucket; + while (last_node->next != NULL) { + last_node = last_node->next; + } + last_node->next = *dest_bucket; + *dest_bucket = *inc_bucket; + *inc_bucket = NULL; + } + } + dest->n += inc->n; + inc->n = 0; +} + +/* Run a thread pool which gathers results in an array + * of hashes. Merge results. + */ + +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED, + void *fin_result, void *result_frags, int index) +{ + struct hmap *result = (struct hmap *)fin_result; + struct hmap *res_frags = (struct hmap *)result_frags; + + fast_hmap_merge(result, &res_frags[index]); + hmap_destroy(&res_frags[index]); +} + + +void ovn_run_pool_hash( + struct worker_pool *pool, + struct hmap *result, + struct hmap *result_frags) +{ + ovn_run_pool_callback(pool, result, result_frags, merge_hash_results); +} diff --git a/lib/fasthmap.h b/lib/fasthmap.h new file mode 100644 index 000000000..2a28553d5 --- /dev/null +++ b/lib/fasthmap.h @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2020 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OVN_PARALLEL_HMAP +#define OVN_PARALLEL_HMAP 1 + +/* Process this include only if OVS does not supply parallel definitions + */ + +#ifndef OVS_HAS_PARALLEL_HMAP + +/* if the parallel macros are defined by hmap.h or any other ovs define + * we skip over the ovn specific definitions. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <stdbool.h> +#include <stdlib.h> +#include <semaphore.h> +#include "openvswitch/util.h" +#include "openvswitch/hmap.h" +#include "openvswitch/thread.h" +#include "ovs-atomic.h" + +/* A version of the HMAP_FOR_EACH macro intended for iterating as part + * of parallel processing. + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE + * and will iterate hash buckets ThreadID, ThreadID + step, + * ThreadID + step * 2, etc. The actual macro accepts + * ThreadID + step * i as the JOBID parameter. + */ + +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \ + for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \ + (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \ + || ((NODE = NULL), false); \ + ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER)) + +/* We do not have a SAFE version of the macro, because the hash size is not + * atomic and hash removal operations would need to be wrapped with + * locks. This will defeat most of the benefits from doing anything in + * parallel. + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements, + * each thread should store them in a temporary list result instead, merging + * the lists into a combined result at the end */ + +/* Work "Handle" */ + +struct worker_control { + int id; /* Used as a modulo when iterating over a hash. */ + atomic_bool finished; /* Set to true after achunk of work is complete. */ + sem_t fire; /* Work start semaphore - sem_post starts the worker. */ + sem_t *done; /* Work completion semaphore - sem_post on completion. */ + struct ovs_mutex mutex; /* Guards the data. */ + void *data; /* Pointer to data to be processed. */ + void *workload; /* back-pointer to the worker pool structure. */ +}; + +struct worker_pool { + int size; /* Number of threads in the pool. */ + struct ovs_list list_node; /* List of pools - used in cleanup/exit. */ + struct worker_control *controls; /* "Handles" in this pool. */ + sem_t done; /* Work completion semaphorew. */ +}; + +/* Add a worker pool for thread function start() which expects a pointer to + * a worker_control structure as an argument. */ + +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)); + +/* Setting this to true will make all processing threads exit */ + +bool ovn_cease_fire(void); + +/* Build a hmap pre-sized for size elements */ + +void ovn_fast_hmap_size_for(struct hmap *hmap, int size); + +/* Build a hmap with a mask equals to size */ + +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size); + +/* Brute-force merge a hmap into hmap. + * Dest and inc have to have the same mask. The merge is performed + * by extending the element list for bucket N in the dest hmap with the list + * from bucket N in inc. + */ + +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc); + +/* Run a pool, without any default processing of results. + */ + +void ovn_run_pool(struct worker_pool *pool); + +/* Run a pool, merge results from hash frags into a final hash result. + * The hash frags must be pre-sized to the same size. + */ + +void ovn_run_pool_hash(struct worker_pool *pool, + struct hmap *result, struct hmap *result_frags); + +/* Run a pool, call a callback function to perform processing of results. + */ + +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result, + void *result_frags, + void (*helper_func)(struct worker_pool *pool, + void *fin_result, void *result_frags, int index)); + + +/* Returns the first node in 'hmap' in the bucket in which the given 'hash' + * would land, or a null pointer if that bucket is empty. */ + +static inline struct hmap_node * +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num) +{ + return hmap->buckets[num]; +} + +static inline struct hmap_node * +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size) +{ + size_t i; + for (i = start; i <= hmap->mask; i+= pool_size) { + struct hmap_node *node = hmap->buckets[i]; + if (node) { + return node; + } + } + return NULL; +} + +/* Returns the first node in 'hmap', as expected by thread with job_id + * for parallel processing in arbitrary order, or a null pointer if + * the slice of 'hmap' for that job_id is empty. */ +static inline struct hmap_node * +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size) +{ + return parallel_hmap_next__(hmap, job_id, pool_size); +} + +/* Returns the next node in the slice of 'hmap' following 'node', + * in arbitrary order, or a * null pointer if 'node' is the last node in + * the 'hmap' slice. + * + */ +static inline struct hmap_node * +parallel_hmap_next(const struct hmap *hmap, + const struct hmap_node *node, ssize_t pool_size) +{ + return (node->next + ? node->next + : parallel_hmap_next__(hmap, + (node->hash & hmap->mask) + pool_size, pool_size)); +} + +/* Use the OVN library functions for stuff which OVS has not defined + * If OVS has defined these, they will still compile using the OVN + * local names, but will be dropped by the linker in favour of the OVS + * supplied functions. + */ + +#define cease_fire() ovn_cease_fire() + +#define add_worker_pool(start) ovn_add_worker_pool(start) + +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size) + +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size) + +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc) + +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc) + +#define ovn_run_pool(pool) ovn_run_pool(pool) + +#define run_pool_hash(pool, result, result_frags) \ + ovn_run_pool_hash(pool, result, result_frags) + +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \ + ovn_run_pool_callback(pool, fin_result, result_frags, helper_func) + +#ifdef __cplusplus +} +#endif + +#endif + +#endif /* lib/fasthmap.h */