@@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
lib/expr.c \
lib/extend-table.h \
lib/extend-table.c \
+ lib/ovn-parallel-hmap.h \
+ lib/ovn-parallel-hmap.c \
lib/ip-mcast-index.c \
lib/ip-mcast-index.h \
lib/mcast-group-index.c \
new file mode 100644
@@ -0,0 +1,412 @@
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdint.h>
+#include <string.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <semaphore.h>
+#include "fatal-signal.h"
+#include "util.h"
+#include "openvswitch/vlog.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "ovn-parallel-hmap.h"
+#include "ovs-atomic.h"
+#include "ovs-thread.h"
+#include "ovs-numa.h"
+#include "random.h"
+
+VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
+
+#ifndef OVS_HAS_PARALLEL_HMAP
+
+/* printf-style templates for the names of the POSIX named semaphores.
+ * WORKER_SEM_NAME is formatted with (sembase, pool pointer, worker index),
+ * MAIN_SEM_NAME with (sembase, pool pointer).
+ * NOTE(review): POSIX leaves the meaning of a semaphore name without a
+ * leading '/' implementation-defined - confirm on non-glibc platforms. */
+#define WORKER_SEM_NAME "%x-%p-%x"
+#define MAIN_SEM_NAME "%x-%p-main"
+
+/* 'initial_pool_setup' is flipped exactly once, with a compare-and-exchange,
+ * by whichever of ovn_can_parallelize_hashes()/ovn_add_worker_pool() gets
+ * there first; that caller then runs setup_worker_pools() with 'init_mutex'
+ * held.  'can_parallelize' is a plain bool written by setup_worker_pools().
+ */
+
+static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
+static bool can_parallelize = false;
+
+/* This is set only in the process of exit (by worker_pool_hook()) and the
+ * set is accompanied by a fence. It is neither atomic nor accessed under a
+ * lock.
+ */
+
+static bool workers_must_exit = false;
+
+/* Every pool created by ovn_add_worker_pool(); walked by worker_pool_hook(). */
+static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
+
+/* Serialises setup_worker_pools() and pool creation. */
+static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
+
+/* Number of worker threads per pool, computed by setup_worker_pools(). */
+static int pool_size;
+
+/* Random value (see setup_worker_pools()) mixed into semaphore names. */
+static int sembase;
+
+/* Exit hook, registered by setup_worker_pools() through
+ * fatal_signal_add_hook().  Raises the 'workers_must_exit' flag, wakes every
+ * worker of every pool so that it can observe the flag, and then closes and
+ * unlinks all named semaphores owned by the pools. */
+static void
+worker_pool_hook(void *aux OVS_UNUSED)
+{
+    struct worker_pool *pool;
+    char sem_name[256];
+    int i;
+
+    workers_must_exit = true;
+
+    /* All workers must honour the must_exit flag and check for it regularly.
+     * We can make it atomic and check it via atomics in workers, but that
+     * is not really necessary as it is set just once - when the program
+     * terminates. So we use a fence which is invoked before exiting instead.
+     */
+    atomic_thread_fence(memory_order_acq_rel);
+
+    /* Wake up the workers after the must_exit flag has been set. */
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        for (i = 0; i < pool->size; i++) {
+            sem_post(pool->controls[i].fire);
+        }
+        for (i = 0; i < pool->size; i++) {
+            sem_close(pool->controls[i].fire);
+            /* Rebuild the name used at sem_open() time so that the
+             * semaphore does not outlive the process. */
+            snprintf(sem_name, sizeof sem_name, WORKER_SEM_NAME,
+                     sembase, pool, i);
+            sem_unlink(sem_name);
+        }
+        sem_close(pool->done);
+        snprintf(sem_name, sizeof sem_name, MAIN_SEM_NAME, sembase, pool);
+        sem_unlink(sem_name);
+    }
+}
+
+/* One-time sizing of the worker pools.
+ *
+ * When the NUMA layer reports a usable core count, the pool gets the cores
+ * of one node (cores / nodes) so that the OS does not start pushing threads
+ * to other nodes.  Otherwise the count comes from the ovs-thread helper
+ * count_cpu_cores().  The pool is capped at 16 threads and parallel
+ * processing is only enabled for 3 or more.
+ *
+ * Also registers the exit hook and picks the random base used in semaphore
+ * names. */
+static void
+setup_worker_pools(void)
+{
+    int n_numas = ovs_numa_get_n_numas();
+    int n_cores;
+
+    if (n_numas == OVS_NUMA_UNSPEC || n_numas <= 0) {
+        n_numas = 1;
+    }
+
+    n_cores = ovs_numa_get_n_cores();
+    if (n_cores != OVS_CORE_UNSPEC && n_cores > 0) {
+        pool_size = n_cores / n_numas;
+    } else {
+        /* No NUMA information: fall back to the ovs-thread routine. */
+        n_cores = count_cpu_cores();
+        pool_size = n_cores;
+    }
+
+    if (pool_size > 16) {
+        pool_size = 16;
+    }
+
+    can_parallelize = (pool_size >= 3);
+    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
+    sembase = random_uint32();
+}
+
+/* Returns true once the exit hook has asked all worker threads to stop.
+ * Worker loops are expected to poll this after being woken up. */
+bool
+ovn_stop_parallel_processing(void)
+{
+    const bool must_exit = workers_must_exit;
+
+    return must_exit;
+}
+
+/* Returns true if this host has enough cores for parallel hash processing.
+ *
+ * The first call (here or in ovn_add_worker_pool()) performs the one-time
+ * pool setup.  The whole check runs under 'init_mutex': flipping
+ * 'initial_pool_setup' before taking the lock would let a second caller see
+ * the flag already set and read 'can_parallelize' before
+ * setup_worker_pools() has computed it. */
+bool
+ovn_can_parallelize_hashes(void)
+{
+    bool test = false;
+    bool result;
+
+    ovs_mutex_lock(&init_mutex);
+    if (atomic_compare_exchange_strong(&initial_pool_setup, &test, true)) {
+        setup_worker_pools();
+    }
+    result = can_parallelize;
+    ovs_mutex_unlock(&init_mutex);
+
+    return result;
+}
+
+/* Creates a pool of 'pool_size' worker threads, each running 'start' with a
+ * pointer to its own struct worker_control as argument.
+ *
+ * Returns the new pool, or NULL if parallel processing is not available on
+ * this host or if a semaphore could not be opened.  In the latter case
+ * everything that was set up so far is released and parallel processing is
+ * switched off, so that callers do not keep retrying.
+ *
+ * The pool is linked into 'worker_pools' only once it is fully initialised,
+ * so that worker_pool_hook() never walks a half-built pool. */
+struct worker_pool *
+ovn_add_worker_pool(void *(*start)(void *))
+{
+    struct worker_pool *new_pool = NULL;
+    struct worker_control *new_control;
+    char sem_name[256];
+    bool test = false;
+    int n_ready = 0;        /* Controls whose semaphore and mutex exist. */
+    int i;
+
+    ovs_mutex_lock(&init_mutex);
+
+    /* One-time setup, done under the lock so that 'can_parallelize' is never
+     * read before it has been computed. */
+    if (atomic_compare_exchange_strong(&initial_pool_setup, &test, true)) {
+        setup_worker_pools();
+    }
+
+    if (!can_parallelize) {
+        goto out;
+    }
+
+    new_pool = xmalloc(sizeof *new_pool);
+    new_pool->size = pool_size;
+    new_pool->controls = xmalloc(sizeof *new_pool->controls * new_pool->size);
+
+    snprintf(sem_name, sizeof sem_name, MAIN_SEM_NAME, sembase, new_pool);
+    new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+    if (new_pool->done == SEM_FAILED) {
+        goto error;
+    }
+
+    for (i = 0; i < new_pool->size; i++) {
+        new_control = &new_pool->controls[i];
+        snprintf(sem_name, sizeof sem_name, WORKER_SEM_NAME,
+                 sembase, new_pool, i);
+        new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+        if (new_control->fire == SEM_FAILED) {
+            goto error;
+        }
+        new_control->id = i;
+        new_control->done = new_pool->done;
+        new_control->data = NULL;
+        new_control->workload = new_pool;
+        ovs_mutex_init(&new_control->mutex);
+        /* ATOMIC_VAR_INIT() is only valid in a declaration; an object that
+         * already exists is initialised with atomic_init(). */
+        atomic_init(&new_control->finished, false);
+        n_ready++;
+    }
+
+    ovs_list_push_back(&worker_pools, &new_pool->list_node);
+
+    for (i = 0; i < new_pool->size; i++) {
+        ovs_thread_create("worker pool helper", start,
+                          &new_pool->controls[i]);
+    }
+
+out:
+    ovs_mutex_unlock(&init_mutex);
+    return new_pool;
+
+error:
+    /* 'sem_name' still holds the name that failed; log before reusing it. */
+    VLOG_WARN("failed to open semaphore %s (%s), "
+              "parallel processing disabled",
+              sem_name, ovs_strerror(errno));
+    can_parallelize = false;
+
+    for (i = 0; i < n_ready; i++) {
+        ovs_mutex_destroy(&new_pool->controls[i].mutex);
+        sem_close(new_pool->controls[i].fire);
+        snprintf(sem_name, sizeof sem_name, WORKER_SEM_NAME,
+                 sembase, new_pool, i);
+        sem_unlink(sem_name);
+    }
+    if (new_pool->done != SEM_FAILED) {
+        sem_close(new_pool->done);
+        snprintf(sem_name, sizeof sem_name, MAIN_SEM_NAME, sembase, new_pool);
+        sem_unlink(sem_name);
+    }
+    free(new_pool->controls);
+    free(new_pool);
+
+    ovs_mutex_unlock(&init_mutex);
+    return NULL;
+}
+
+
+/* Sets up 'hmap' as an empty hash table whose bucket mask is 'mask', i.e.
+ * with 'mask' + 1 heap-allocated buckets, all of them empty. */
+void
+ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
+{
+    size_t bucket;
+
+    hmap->buckets = xmalloc(sizeof *hmap->buckets * (mask + 1));
+    hmap->one = NULL;
+    hmap->mask = mask;
+    hmap->n = 0;
+
+    bucket = 0;
+    while (bucket <= hmap->mask) {
+        hmap->buckets[bucket] = NULL;
+        bucket++;
+    }
+}
+
+/* Sets up 'hmap' as an empty hash table sized for 'size' elements.
+ *
+ * Intended for parallel processing, where every fragment that collects the
+ * results of a parallel job must end up with the same number of buckets. */
+void
+ovn_fast_hmap_size_for(struct hmap *hmap, int size)
+{
+    size_t mask = size / 2;
+    unsigned int shift;
+
+    /* Smear the highest set bit into every lower bit position, which turns
+     * 'mask' into the form 2**k - 1. */
+    for (shift = 1; shift < sizeof mask * 8; shift *= 2) {
+        mask |= mask >> shift;
+    }
+
+    /* If we need to dynamically allocate buckets we might as well allocate
+     * at least 4 of them. */
+    mask |= (mask & 1) << 1;
+
+    fast_hmap_init(hmap, mask);
+}
+
+/* Runs one round of work on 'pool' and post-processes each worker's result.
+ *
+ * Every worker is released through its 'fire' semaphore.  The caller then
+ * blocks on the pool's 'done' semaphore and, each time it is woken up, scans
+ * all workers; for every worker whose 'finished' flag is set it clears the
+ * flag, calls 'helper_func' (if not NULL) with 'fin_result', 'result_frags'
+ * and the worker's index, and resets the worker's 'data' pointer.  Returns
+ * once all 'pool->size' workers have been accounted for.
+ */
+void
+ovn_run_pool_callback(struct worker_pool *pool,
+                      void *fin_result, void *result_frags,
+                      void (*helper_func)(struct worker_pool *pool,
+                                          void *fin_result,
+                                          void *result_frags, int index))
+{
+    int index, completed;
+
+    /* Ensure that all worker threads see the same data as the
+     * main thread.
+     */
+    atomic_thread_fence(memory_order_acq_rel);
+
+    /* Start workers. */
+    for (index = 0; index < pool->size; index++) {
+        sem_post(pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        /* Note - we do not loop on semaphore until it reaches
+         * zero, but on pool size/remaining workers.
+         * This is by design. If the inner loop can handle
+         * completion for more than one worker within an iteration
+         * it will do so to ensure no additional iterations and
+         * waits once all of them are done.
+         *
+         * This may result in us having an initial positive value
+         * of the semaphore when the pool is invoked the next time.
+         * This is harmless - the loop will spin up a couple of times
+         * doing nothing while the workers are processing their data
+         * slices.
+         */
+        sem_wait(pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            /* If the worker has marked its data chunk as complete,
+             * invoke the helper function to combine the results of
+             * this worker into the main result.
+             *
+             * The worker must invoke an appropriate memory fence
+             * (most likely acq_rel) to ensure that the main thread
+             * sees all of the results produced by the worker.
+             *
+             * The strong form is required here: a weak compare-exchange
+             * may fail spuriously, which would skip a worker that has
+             * finished.  As each sem_post() is consumed by exactly one
+             * sem_wait(), a skipped worker could leave this loop blocked
+             * forever on a semaphore nobody posts again.
+             */
+            if (atomic_compare_exchange_strong(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                if (helper_func) {
+                    (helper_func)(pool, fin_result, result_frags, index);
+                }
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Runs one round of work on 'pool' and waits for every worker to finish.
+ * No result post-processing is done: this is ovn_run_pool_callback() with
+ * no result buffers and no helper. */
+void
+ovn_run_pool(struct worker_pool *pool)
+{
+    void *const no_result = NULL;
+
+    run_pool_callback(pool, no_result, no_result, NULL);
+}
+
+/* Moves every node of 'inc' into 'dest', bucket by bucket, leaving 'inc'
+ * empty.  Intended for use in parallel processing.
+ *
+ * Both maps MUST have the same mask (asserted), which is what guarantees
+ * that bucket N of 'inc' holds exactly the nodes that belong in bucket N of
+ * 'dest'.  This can be achieved by pre-allocating them to the correct size
+ * and using hmap_insert_fast() instead of hmap_insert().
+ */
+void
+ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
+{
+    size_t bucket;
+
+    ovs_assert(inc->mask == dest->mask);
+
+    if (inc->n == 0) {
+        /* Nothing to merge from an empty fragment. */
+        return;
+    }
+
+    for (bucket = 0; bucket <= dest->mask; bucket++) {
+        struct hmap_node *head = inc->buckets[bucket];
+        struct hmap_node *tail;
+
+        if (head == NULL) {
+            continue;
+        }
+
+        /* Find the end of the incoming chain ... */
+        for (tail = head; tail->next != NULL; tail = tail->next) {
+            continue;
+        }
+
+        /* ... and put the whole chain in front of the destination's. */
+        tail->next = dest->buckets[bucket];
+        dest->buckets[bucket] = head;
+        inc->buckets[bucket] = NULL;
+    }
+
+    dest->n += inc->n;
+    inc->n = 0;
+}
+
+/* Result helper for ovn_run_pool_hash(): 'fin_result' is the final hmap and
+ * 'result_frags' an array of per-worker hmaps.  Merges the fragment of
+ * worker 'index' into the final hmap and then destroys the fragment. */
+static void
+merge_hash_results(struct worker_pool *pool OVS_UNUSED,
+                   void *fin_result, void *result_frags,
+                   int index)
+{
+    struct hmap *frag = (struct hmap *) result_frags + index;
+    struct hmap *merged = fin_result;
+
+    fast_hmap_merge(merged, frag);
+    hmap_destroy(frag);
+}
+
+
+/* Runs 'pool' with workers that collect their results in the array of hmaps
+ * 'result_frags' (one per worker); each fragment is merged into 'result' as
+ * soon as its worker is seen to have finished. */
+void
+ovn_run_pool_hash(struct worker_pool *pool,
+                  struct hmap *result,
+                  struct hmap *result_frags)
+{
+    run_pool_callback(pool, result, result_frags, merge_hash_results);
+}
+
+/* Result helper for ovn_run_pool_list(): 'fin_result' is the final list and
+ * 'result_frags' an array of per-worker lists.  Splices all elements of the
+ * fragment of worker 'index' into the front of the final list; an empty
+ * fragment is left alone. */
+static void
+merge_list_results(struct worker_pool *pool OVS_UNUSED,
+                   void *fin_result, void *result_frags,
+                   int index)
+{
+    struct ovs_list *merged = fin_result;
+    struct ovs_list *frag = (struct ovs_list *) result_frags + index;
+
+    if (ovs_list_is_empty(frag)) {
+        return;
+    }
+    ovs_list_splice(merged->next, ovs_list_front(frag), frag);
+}
+
+
+/* Runs 'pool' with workers that collect their results in the array of lists
+ * 'result_frags' (one per worker); each fragment is spliced into 'result' as
+ * soon as its worker is seen to have finished. */
+void
+ovn_run_pool_list(struct worker_pool *pool,
+                  struct ovs_list *result,
+                  struct ovs_list *result_frags)
+{
+    run_pool_callback(pool, result, result_frags, merge_list_results);
+}
+
+/* Makes 'hrl' hold exactly one mutex per bucket of 'lflows'.
+ *
+ * Nothing is done when 'hrl' already has locks for the current mask.
+ * Otherwise the old array (if any) is freed and a new one with
+ * 'lflows->mask' + 1 initialised mutexes is allocated.
+ *
+ * The "has locks" test matters: init_hash_row_locks() leaves 'hrl' with
+ * mask 0 and no array, which would otherwise compare equal to an hmap whose
+ * mask is 0 and leave 'row_locks' NULL.
+ *
+ * No row lock of 'hrl' may be held while this is called. */
+void
+ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
+{
+    size_t n_rows;
+    size_t i;
+
+    if (hrl->row_locks && hrl->mask == lflows->mask) {
+        return;
+    }
+
+    n_rows = lflows->mask + 1;
+
+    free(hrl->row_locks);
+    hrl->row_locks = xcalloc(n_rows, sizeof *hrl->row_locks);
+    hrl->mask = lflows->mask;
+    for (i = 0; i < n_rows; i++) {
+        ovs_mutex_init(&hrl->row_locks[i]);
+    }
+}
+
+#endif
new file mode 100644
@@ -0,0 +1,285 @@
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVN_PARALLEL_HMAP
+#define OVN_PARALLEL_HMAP 1
+
+/* if the parallel macros are defined by hmap.h or any other ovs define
+ * we skip over the ovn specific definitions.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <errno.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <semaphore.h>
+#include "openvswitch/util.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "ovs-atomic.h"
+
+/* Process this include only if OVS does not supply parallel definitions
+ */
+
+#ifdef OVS_HAS_PARALLEL_HMAP
+
+#include "parallel-hmap.h"
+
+#else
+
+
+#ifdef __clang__
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wthread-safety"
+#endif
+
+
+/* A version of the HMAP_FOR_EACH macro intended for iterating as part
+ * of parallel processing.
+ *
+ * The macro itself walks the nodes of a single bucket: bucket number JOBID
+ * of HMAP.  Each worker thread has a different ThreadID in the range
+ * 0..POOL_SIZE - 1 and is expected to call the macro once per bucket of its
+ * slice, i.e. for buckets ThreadID, ThreadID + step, ThreadID + step * 2,
+ * etc., passing ThreadID + step * i as the JOBID parameter.
+ *
+ * NODE is set to NULL when the walk runs to completion.
+ */
+
+#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
+ for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
+ (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
+ || ((NODE = NULL), false); \
+ ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
+
+/* We do not have a SAFE version of the macro, because the hash size is not
+ * atomic and hash removal operations would need to be wrapped with
+ * locks. This will defeat most of the benefits from doing anything in
+ * parallel.
+ * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
+ * each thread should store them in a temporary list result instead, merging
+ * the lists into a combined result at the end */
+
+/* Work "Handle": the per-thread state shared between one worker thread and
+ * the thread that runs the pool. */
+
+struct worker_control {
+ int id; /* Used as a modulo when iterating over a hash. */
+ atomic_bool finished; /* Set to true after a chunk of work is complete. */
+ sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
+ sem_t *done; /* Work completion semaphore - sem_post on completion. */
+ struct ovs_mutex mutex; /* Guards the data. */
+ void *data; /* Pointer to data to be processed. */
+ void *workload; /* Back-pointer to the worker pool structure. */
+};
+
+/* A set of worker threads that are started and waited for together. */
+struct worker_pool {
+ int size; /* Number of threads in the pool. */
+ struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
+ struct worker_control *controls; /* "Handles" in this pool. */
+ sem_t *done; /* Work completion semaphore. */
+};
+
+/* Add a worker pool for thread function start() which expects a pointer to
+ * a worker_control structure as an argument. */
+
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+
+/* Returns true once all processing threads have been asked to exit. */
+
+bool ovn_stop_parallel_processing(void);
+
+/* Build a hmap pre-sized for size elements */
+
+void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
+
+/* Build a hmap whose bucket mask equals 'size' (i.e. 'size' + 1 buckets). */
+
+void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
+
+/* Brute-force merge a hmap into hmap.
+ * Dest and inc have to have the same mask. The merge is performed
+ * by extending the element list for bucket N in the dest hmap with the list
+ * from bucket N in inc.
+ */
+
+void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
+
+/* Run a pool, without any default processing of results.
+ */
+
+void ovn_run_pool(struct worker_pool *pool);
+
+/* Run a pool, merge results from hash frags into a final hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_run_pool_hash(struct worker_pool *pool,
+ struct hmap *result, struct hmap *result_frags);
+/* Run a pool, merge results from list frags into a final list result.
+ */
+
+void ovn_run_pool_list(struct worker_pool *pool,
+ struct ovs_list *result, struct ovs_list *result_frags);
+
+/* Run a pool, call a callback function to perform processing of results.
+ */
+
+void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
+ void *result_frags,
+ void (*helper_func)(struct worker_pool *pool,
+ void *fin_result, void *result_frags, int index));
+
+
+/* Returns the first node of bucket number 'num' of 'hmap', or a null
+ * pointer if that bucket is empty.  'num' is a bucket index, not a hash,
+ * and is not range-checked. */
+
+static inline struct hmap_node *
+hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
+{
+    struct hmap_node *const head = hmap->buckets[num];
+
+    return head;
+}
+
+/* Returns the first node of the first non-empty bucket among 'start',
+ * 'start' + 'pool_size', 'start' + 2 * 'pool_size', ... of 'hmap', or a null
+ * pointer if all of those buckets are empty. */
+static inline struct hmap_node *
+parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
+{
+    size_t bucket = start;
+
+    while (bucket <= hmap->mask) {
+        struct hmap_node *head = hmap->buckets[bucket];
+
+        if (head) {
+            return head;
+        }
+        bucket += pool_size;
+    }
+    return NULL;
+}
+
+/* Returns the first node of the slice of 'hmap' that belongs to the thread
+ * with 'job_id' (buckets job_id, job_id + pool_size, ...), in arbitrary
+ * order, or a null pointer if that slice is empty. */
+static inline struct hmap_node *
+parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
+{
+    const size_t first_bucket = job_id;
+
+    return parallel_hmap_next__(hmap, first_bucket, pool_size);
+}
+
+/* Returns the node that follows 'node' in its slice of 'hmap', in arbitrary
+ * order, or a null pointer if 'node' is the last node of the slice.
+ *
+ * The rest of the current bucket is exhausted first; after that the search
+ * continues 'pool_size' buckets further on.
+ */
+static inline struct hmap_node *
+parallel_hmap_next(const struct hmap *hmap,
+                   const struct hmap_node *node, ssize_t pool_size)
+{
+    if (node->next) {
+        return node->next;
+    }
+    return parallel_hmap_next__(hmap,
+                                (node->hash & hmap->mask) + pool_size,
+                                pool_size);
+}
+
+/* Called by a worker thread when it has finished its chunk of work.
+ * The order of the three steps matters and must not be changed. */
+static inline void post_completed_work(struct worker_control *control)
+{
+ /* Fence first, so that the results produced by this worker are ordered
+  * before the 'finished' flag that announces them. */
+ atomic_thread_fence(memory_order_acq_rel);
+ atomic_store_relaxed(&control->finished, true);
+ /* Wake up the thread waiting on the pool's completion semaphore. */
+ sem_post(control->done);
+}
+
+/* Blocks the calling worker thread until its 'fire' semaphore is posted.
+ *
+ * sem_wait() fails with EINTR when a signal handler interrupts it; in that
+ * case nothing has been posted, so the wait is restarted instead of letting
+ * the worker run without any work queued.  Any other failure ends the wait,
+ * as before. */
+static inline void wait_for_work(struct worker_control *control)
+{
+    int ret;
+
+    do {
+        ret = sem_wait(control->fire);
+    } while (ret == -1 && errno == EINTR);
+}
+
+/* Hash per-row locking support - to be used only in conjunction
+ * with fast hash inserts. Normal hash inserts may resize the hash
+ * rendering the locking invalid.
+ *
+ * 'mask' is the mask of the hmap the locks were last sized for and
+ * 'row_locks' is an array of 'mask' + 1 mutexes (NULL until
+ * ovn_update_hashrow_locks() allocates it).
+ */
+
+struct hashrow_locks {
+ ssize_t mask;
+ struct ovs_mutex *row_locks;
+};
+
+/* Update an hash row locks structure to match the current hash size */
+
+void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
+
+/* Lock the row (bucket) of the hash table that 'hash' falls into.
+ *
+ * The row is selected with 'hash & mask', the same mapping parallel_hmap_next()
+ * uses to find a node's bucket.  A modulo by the mask would map
+ * two hashes that share a bucket to different locks, would never use the
+ * last lock and would divide by zero for a mask of 0. */
+
+static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
+{
+    ovs_mutex_lock(&hrl->row_locks[hash & (size_t) hrl->mask]);
+}
+
+/* Unlock the row (bucket) of the hash table that 'hash' falls into.  Must
+ * select the row exactly as lock_hash_row() does. */
+
+static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
+{
+    ovs_mutex_unlock(&hrl->row_locks[hash & (size_t) hrl->mask]);
+}
+/* Puts 'hrl' into its initial state: no lock array yet and a mask of 0. */
+
+static inline void init_hash_row_locks(struct hashrow_locks *hrl)
+{
+    hrl->row_locks = NULL;
+    hrl->mask = 0;
+}
+
+bool ovn_can_parallelize_hashes(void);
+
+/* Use the OVN library functions for stuff which OVS has not defined
+ * If OVS has defined these, they will still compile using the OVN
+ * local names, but will be dropped by the linker in favour of the OVS
+ * supplied functions.
+ *
+ * Each macro maps the short, OVS-style name onto the ovn_-prefixed function
+ * declared in this header.
+ */
+
+#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
+
+#define can_parallelize_hashes() ovn_can_parallelize_hashes()
+
+#define stop_parallel_processing() ovn_stop_parallel_processing()
+
+#define add_worker_pool(start) ovn_add_worker_pool(start)
+
+#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
+
+#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
+
+#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
+
+/* NOTE(review): no ovn_hmap_merge() is declared in this header; using this
+ * macro needs a definition from elsewhere - confirm or drop. */
+#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
+
+/* Short alias, like all the others (this used to be a self-referential
+ * "ovn_run_pool" definition, which aliased nothing). */
+#define run_pool(pool) ovn_run_pool(pool)
+
+#define run_pool_hash(pool, result, result_frags) \
+    ovn_run_pool_hash(pool, result, result_frags)
+
+#define run_pool_list(pool, result, result_frags) \
+    ovn_run_pool_list(pool, result, result_frags)
+
+#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
+    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
+
+
+
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+
+#endif /* lib/ovn-parallel-hmap.h */