From patchwork Mon Mar  1 13:04:25 2021
X-Patchwork-Submitter: Anton Ivanov <anton.ivanov@cambridgegreys.com>
X-Patchwork-Id: 1445564
From: anton.ivanov@cambridgegreys.com
To: ovs-dev@openvswitch.org
Date: Mon,  1 Mar 2021 13:04:25 +0000
Message-Id: <20210301130427.21069-1-anton.ivanov@cambridgegreys.com>
X-Mailer: git-send-email 2.20.1
Cc: i.maximets@ovn.org, Anton Ivanov <anton.ivanov@cambridgegreys.com>
Subject: [ovs-dev] [OVN Patch v15 1/3] ovn-libs: Add support for parallel processing

From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

This adds a set of functions and macros intended to process hashes in
parallel.
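
As a quick illustrative sketch (the worker function name is
hypothetical; see ovn-parallel-hmap.h below for the actual API), the
intended call flow is:

    if (can_parallelize_hashes(false)) {
        /* Spawns pool-size threads running my_worker(). */
        struct worker_pool *pool = add_worker_pool(my_worker);

        /* Hand each worker its slice via pool->controls[i].data,
         * then fire all workers and merge their result fragments. */
        run_pool_hash(pool, &result, result_frags);
    }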
The principles of operation are documented in ovn-parallel-hmap.h.

If these one day go into the OVS tree, the OVS tree versions will be
used in preference.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/automake.mk         |   2 +
 lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
 lib/ovn-parallel-hmap.h | 301 ++++++++++++++++++++++++++
 3 files changed, 758 insertions(+)
 create mode 100644 lib/ovn-parallel-hmap.c
 create mode 100644 lib/ovn-parallel-hmap.h

diff --git a/lib/automake.mk b/lib/automake.mk
index 250c7aefa..781be2109 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
 	lib/expr.c \
 	lib/extend-table.h \
 	lib/extend-table.c \
+	lib/ovn-parallel-hmap.h \
+	lib/ovn-parallel-hmap.c \
 	lib/ip-mcast-index.c \
 	lib/ip-mcast-index.h \
 	lib/mcast-group-index.c \
diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
new file mode 100644
index 000000000..e83ae23cb
--- /dev/null
+++ b/lib/ovn-parallel-hmap.c
@@ -0,0 +1,455 @@
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdint.h>
+#include <string.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <semaphore.h>
+#include "fatal-signal.h"
+#include "util.h"
+#include "openvswitch/vlog.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "ovn-parallel-hmap.h"
+#include "ovs-atomic.h"
+#include "ovs-thread.h"
+#include "ovs-numa.h"
+#include "random.h"
+
+VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
+
+#ifndef OVS_HAS_PARALLEL_HMAP
+
+#define WORKER_SEM_NAME "%x-%p-%x"
+#define MAIN_SEM_NAME "%x-%p-main"
+
+/* One-time setup is guarded by a compare-and-exchange on
+ * initial_pool_setup; can_parallelize is written only with
+ * init_mutex held and does not need to be atomic.
+ */
+
+static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
+static bool can_parallelize = false;
+
+/* This is set only in the process of exit, and the write is
+ * accompanied by a fence. It does not need to be atomic or to be
+ * accessed under a lock.
+ */
+
+static bool workers_must_exit = false;
+
+static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
+
+static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
+
+static int pool_size;
+
+static int sembase;
+
+static void worker_pool_hook(void *aux OVS_UNUSED);
+static void setup_worker_pools(bool force);
+static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
+                               void *fin_result, void *result_frags,
+                               int index);
+static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
+                               void *fin_result, void *result_frags,
+                               int index);
+
+bool ovn_stop_parallel_processing(void)
+{
+    return workers_must_exit;
+}
+
+bool ovn_can_parallelize_hashes(bool force_parallel)
+{
+    bool test = false;
+
+    if (atomic_compare_exchange_strong(
+            &initial_pool_setup,
+            &test,
+            true)) {
+        ovs_mutex_lock(&init_mutex);
+        setup_worker_pools(force_parallel);
+        ovs_mutex_unlock(&init_mutex);
+    }
+    return can_parallelize;
+}
+
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *))
+{
+    struct worker_pool *new_pool = NULL;
+    struct worker_control *new_control;
+    bool test = false;
+    int i;
+    char sem_name[256];
+
+    /* Belt and braces - initialize the pool system just in case it
+     * is not yet initialized.
+     */
+
+    if (atomic_compare_exchange_strong(
+            &initial_pool_setup,
+            &test,
+            true)) {
+        ovs_mutex_lock(&init_mutex);
+        setup_worker_pools(false);
+        ovs_mutex_unlock(&init_mutex);
+    }
+
+    ovs_mutex_lock(&init_mutex);
+    if (can_parallelize) {
+        new_pool = xmalloc(sizeof(struct worker_pool));
+        new_pool->size = pool_size;
+        new_pool->controls = NULL;
+        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
+        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+        if (new_pool->done == SEM_FAILED) {
+            goto cleanup;
+        }
+
+        new_pool->controls =
+            xmalloc(sizeof(struct worker_control) * new_pool->size);
+
+        for (i = 0; i < new_pool->size; i++) {
+            new_control = &new_pool->controls[i];
+            new_control->id = i;
+            new_control->done = new_pool->done;
+            new_control->data = NULL;
+            ovs_mutex_init(&new_control->mutex);
+            new_control->finished = ATOMIC_VAR_INIT(false);
+            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
+            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+            if (new_control->fire == SEM_FAILED) {
+                goto cleanup;
+            }
+        }
+
+        for (i = 0; i < pool_size; i++) {
+            ovs_thread_create("worker pool helper", start,
+                              &new_pool->controls[i]);
+        }
+        ovs_list_push_back(&worker_pools, &new_pool->list_node);
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return new_pool;
+
+cleanup:
+
+    /* Something went wrong when opening semaphores. In this case
+     * it is better to shut off parallel processing altogether.
+     */
+
+    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
+    can_parallelize = false;
+    if (new_pool->controls) {
+        for (i = 0; i < new_pool->size; i++) {
+            if (new_pool->controls[i].fire != SEM_FAILED) {
+                sem_close(new_pool->controls[i].fire);
+                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
+                sem_unlink(sem_name);
+            } else {
+                break; /* Semaphores past this one are uninitialized. */
+            }
+        }
+    }
+    if (new_pool->done != SEM_FAILED) {
+        sem_close(new_pool->done);
+        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
+        sem_unlink(sem_name);
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return NULL;
+}
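+
+/* Illustrative sketch (all names hypothetical): the expected shape of a
+ * thread function passed to add_worker_pool(). A worker blocks on its
+ * "fire" semaphore, processes its data slice, signals completion, and
+ * exits when asked to:
+ *
+ *    static void *my_worker(void *arg)
+ *    {
+ *        struct worker_control *control = arg;
+ *
+ *        while (!stop_parallel_processing()) {
+ *            wait_for_work(control);
+ *            if (stop_parallel_processing()) {
+ *                break;
+ *            }
+ *            if (control->data) {
+ *                process_my_slice(control);    (hypothetical)
+ *            }
+ *            post_completed_work(control);
+ *        }
+ *        return NULL;
+ *    }
+ */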
+
+/* Initializes 'hmap' as an empty hash table with the given 'mask'.
+ */
+void
+ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
+{
+    size_t i;
+
+    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
+    hmap->one = NULL;
+    hmap->mask = mask;
+    hmap->n = 0;
+    for (i = 0; i <= hmap->mask; i++) {
+        hmap->buckets[i] = NULL;
+    }
+}
+
+/* Initializes 'hmap' as an empty hash table pre-sized for 'size'
+ * elements. Intended for use in parallel processing so that all
+ * fragments used to store results in a parallel job
+ * are the same size.
+ */
+void
+ovn_fast_hmap_size_for(struct hmap *hmap, int size)
+{
+    size_t mask;
+    mask = size / 2;
+    mask |= mask >> 1;
+    mask |= mask >> 2;
+    mask |= mask >> 4;
+    mask |= mask >> 8;
+    mask |= mask >> 16;
+#if SIZE_MAX > UINT32_MAX
+    mask |= mask >> 32;
+#endif
+
+    /* If we need to dynamically allocate buckets we might as well
+     * allocate at least 4 of them. */
+    mask |= (mask & 1) << 1;
+
+    fast_hmap_init(hmap, mask);
+}
+
+/* Run a thread pool which uses a callback function to process results.
+ */
+void ovn_run_pool_callback(struct worker_pool *pool,
+                           void *fin_result, void *result_frags,
+                           void (*helper_func)(struct worker_pool *pool,
+                                               void *fin_result,
+                                               void *result_frags,
+                                               int index))
+{
+    int index, completed;
+
+    /* Ensure that all worker threads see the same data as the
+     * main thread.
+     */
+
+    atomic_thread_fence(memory_order_acq_rel);
+
+    /* Start workers. */
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        /* Note - we do not loop on the semaphore until it reaches
+         * zero, but on pool size/remaining workers.
+         * This is by design. If the inner loop can handle
+         * completion for more than one worker within an iteration,
+         * it will do so to ensure no additional iterations and
+         * waits once all of them are done.
+         *
+         * This may result in us having an initial positive value
+         * of the semaphore when the pool is invoked the next time.
+         * This is harmless - the loop will spin up a couple of times
+         * doing nothing while the workers are processing their data
+         * slices.
+         */
+        wait_for_work_completion(pool);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            /* If the worker has marked its data chunk as complete,
+             * invoke the helper function to combine the results of
+             * this worker into the main result.
+             *
+             * The worker must invoke an appropriate memory fence
+             * (most likely acq_rel) to ensure that the main thread
+             * sees all of the results produced by the worker.
+             */
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                if (helper_func) {
+                    (helper_func)(pool, fin_result, result_frags, index);
+                }
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Run a thread pool - basic, does not do results processing.
+ */
+void ovn_run_pool(struct worker_pool *pool)
+{
+    run_pool_callback(pool, NULL, NULL, NULL);
+}
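+
+/* Illustrative sketch (hypothetical names): a helper_func for
+ * run_pool_callback() is handed the final result, the array of
+ * per-worker result fragments, and the index of the worker that has
+ * just finished - e.g. to accumulate per-worker counters, where each
+ * worker has written counts[control->id]:
+ *
+ *    static void merge_counts(struct worker_pool *pool OVS_UNUSED,
+ *                             void *fin_result, void *result_frags,
+ *                             int index)
+ *    {
+ *        uint64_t *total = fin_result;
+ *        uint64_t *counts = result_frags;
+ *
+ *        *total += counts[index];
+ *    }
+ *
+ *    run_pool_callback(pool, &total, counts, merge_counts);
+ */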
+
+/* Brute force merge of a hashmap into another hashmap.
+ * Intended for use in parallel processing. The destination
+ * hashmap MUST be the same size as the one being merged.
+ *
+ * This can be achieved by pre-allocating them to the correct size
+ * and using hmap_insert_fast() instead of hmap_insert().
+ */
+void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
+{
+    size_t i;
+
+    ovs_assert(inc->mask == dest->mask);
+
+    if (!inc->n) {
+        /* Request to merge an empty frag, nothing to do. */
+        return;
+    }
+
+    for (i = 0; i <= dest->mask; i++) {
+        struct hmap_node **dest_bucket = &dest->buckets[i];
+        struct hmap_node **inc_bucket = &inc->buckets[i];
+        if (*inc_bucket != NULL) {
+            struct hmap_node *last_node = *inc_bucket;
+            while (last_node->next != NULL) {
+                last_node = last_node->next;
+            }
+            last_node->next = *dest_bucket;
+            *dest_bucket = *inc_bucket;
+            *inc_bucket = NULL;
+        }
+    }
+    dest->n += inc->n;
+    inc->n = 0;
+}
+
+/* Run a thread pool which gathers results in an array
+ * of hashes. Merge results.
+ */
+void ovn_run_pool_hash(
+    struct worker_pool *pool,
+    struct hmap *result,
+    struct hmap *result_frags)
+{
+    run_pool_callback(pool, result, result_frags, merge_hash_results);
+}
+
+/* Run a thread pool which gathers results in an array of lists.
+ * Merge results.
+ */
+void ovn_run_pool_list(
+    struct worker_pool *pool,
+    struct ovs_list *result,
+    struct ovs_list *result_frags)
+{
+    run_pool_callback(pool, result, result_frags, merge_list_results);
+}
+
+/* Resize the per-row lock array to match the current size of 'lflows'. */
+void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
+{
+    int i;
+    if (hrl->mask != lflows->mask) {
+        if (hrl->row_locks) {
+            free(hrl->row_locks);
+        }
+        hrl->row_locks = xcalloc(lflows->mask + 1,
+                                 sizeof(struct ovs_mutex));
+        hrl->mask = lflows->mask;
+        for (i = 0; i <= lflows->mask; i++) {
+            ovs_mutex_init(&hrl->row_locks[i]);
+        }
+    }
+}
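+
+/* Illustrative sketch: parallel writers to a shared hash size the lock
+ * array once in the main thread, then wrap each fast insert in the
+ * per-row lock for the node's hash ('node' is hypothetical):
+ *
+ *    struct hashrow_locks hrl;
+ *
+ *    init_hash_row_locks(&hrl);
+ *    update_hashrow_locks(lflows, &hrl);       (main thread, before run)
+ *
+ *    lock_hash_row(&hrl, hash);                (in a worker)
+ *    hmap_insert_fast(lflows, &node->hmap_node, hash);
+ *    unlock_hash_row(&hrl, hash);
+ */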
+
+static void worker_pool_hook(void *aux OVS_UNUSED) {
+    int i;
+    struct worker_pool *pool;
+    char sem_name[256];
+
+    workers_must_exit = true;
+
+    /* All workers must honour the must_exit flag and check for it regularly.
+     * We can make it atomic and check it via atomics in workers, but that
+     * is not really necessary as it is set just once - when the program
+     * terminates. So we use a fence which is invoked before exiting instead.
+     */
+    atomic_thread_fence(memory_order_acq_rel);
+
+    /* Wake up the workers after the must_exit flag has been set. */
+
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        for (i = 0; i < pool->size ; i++) {
+            sem_post(pool->controls[i].fire);
+        }
+        for (i = 0; i < pool->size ; i++) {
+            sem_close(pool->controls[i].fire);
+            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
+            sem_unlink(sem_name);
+        }
+        sem_close(pool->done);
+        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
+        sem_unlink(sem_name);
+    }
+}
+
+static void setup_worker_pools(bool force) {
+    int cores, nodes;
+
+    nodes = ovs_numa_get_n_numas();
+    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
+        nodes = 1;
+    }
+    cores = ovs_numa_get_n_cores();
+
+    /* If there is no NUMA config, use all available cores.
+     * If there is NUMA config, use the cores of a single node so
+     * that the OS does not start pushing threads to other nodes.
+     */
+    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
+        /* If there is no NUMA info we can try the ovs-threads routine.
+         * It falls back to sysconf and/or the affinity mask.
+         */
+        cores = count_cpu_cores();
+        pool_size = cores;
+    } else {
+        pool_size = cores / nodes;
+    }
+    if ((pool_size < 4) && force) {
+        pool_size = 4;
+    }
+    can_parallelize = (pool_size >= 3);
+    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
+    sembase = random_uint32();
+}
+
+static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
+                               void *fin_result, void *result_frags,
+                               int index)
+{
+    struct ovs_list *result = (struct ovs_list *)fin_result;
+    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
+
+    if (!ovs_list_is_empty(&res_frags[index])) {
+        ovs_list_splice(result->next,
+                        ovs_list_front(&res_frags[index]),
+                        &res_frags[index]);
+    }
+}
+
+static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
+                               void *fin_result, void *result_frags,
+                               int index)
+{
+    struct hmap *result = (struct hmap *)fin_result;
+    struct hmap *res_frags = (struct hmap *)result_frags;
+
+    fast_hmap_merge(result, &res_frags[index]);
+    hmap_destroy(&res_frags[index]);
+}
+
+#endif /* !OVS_HAS_PARALLEL_HMAP */
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
new file mode 100644
index 000000000..8db61eaba
--- /dev/null
+++ b/lib/ovn-parallel-hmap.h
@@ -0,0 +1,301 @@
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVN_PARALLEL_HMAP
+#define OVN_PARALLEL_HMAP 1
+
+/* If the parallel macros are defined by hmap.h or any other OVS include,
+ * we skip over the OVN-specific definitions.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <semaphore.h>
+#include <errno.h>
+#include "openvswitch/util.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "ovs-atomic.h"
+
+/* Process this include only if OVS does not supply parallel definitions.
+ */
+
+#ifdef OVS_HAS_PARALLEL_HMAP
+
+#include "parallel-hmap.h"
+
+#else
+
+#ifdef __clang__
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wthread-safety"
+#endif
+
+/* A version of the HMAP_FOR_EACH macro intended for iterating as part
+ * of parallel processing.
+ * Each worker thread has a different ThreadID in the range of
+ * 0..POOL_SIZE - 1 and will iterate hash buckets ThreadID,
+ * ThreadID + step, ThreadID + step * 2, etc. The actual macro accepts
+ * ThreadID + step * i as the JOBID parameter.
+ */
+
+#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
+   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
+        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
+        || ((NODE = NULL), false); \
+        ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
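+
+/* Illustrative sketch (hypothetical names): a worker with ThreadID
+ * control->id visits the buckets of its slice by stepping through the
+ * bucket numbers pool_size at a time:
+ *
+ *    size_t job;
+ *
+ *    for (job = control->id; job <= input->mask; job += pool_size) {
+ *        HMAP_FOR_EACH_IN_PARALLEL (node, hmap_node, job, input) {
+ *            consume(node);                    (hypothetical)
+ *        }
+ *    }
+ */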
+
+/* We do not have a SAFE version of the macro, because the hash size is not
+ * atomic and hash removal operations would need to be wrapped with
+ * locks. This will defeat most of the benefits from doing anything in
+ * parallel.
+ * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
+ * each thread should store them in a temporary list result instead, merging
+ * the lists into a combined result at the end. */
+
+/* Work "Handle". */
+
+struct worker_control {
+    int id; /* Used as a modulo when iterating over a hash. */
+    atomic_bool finished; /* Set to true after a chunk of work is complete. */
+    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
+    sem_t *done; /* Work completion semaphore - sem_post on completion. */
+    struct ovs_mutex mutex; /* Guards the data. */
+    void *data; /* Pointer to the data to be processed. */
+    void *workload; /* Back-pointer to the worker pool structure. */
+};
+
+struct worker_pool {
+    int size;   /* Number of threads in the pool. */
+    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
+    struct worker_control *controls; /* "Handles" in this pool. */
+    sem_t *done; /* Work completion semaphore. */
+};
+
+/* Add a worker pool for thread function start() which expects a pointer
+ * to a worker_control structure as an argument. */
+
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+
+/* Returns true if all processing threads should exit. */
+
+bool ovn_stop_parallel_processing(void);
+
+/* Build a hmap pre-sized for 'size' elements. */
+
+void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
+
+/* Build a hmap with a mask equal to 'size'. */
+
+void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
+
+/* Brute-force merge one hmap into another.
+ * Dest and inc have to have the same mask. The merge is performed
+ * by extending the element list for bucket N in the dest hmap with the list
+ * from bucket N in inc.
+ */
+
+void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
+
+/* Run a pool, without any default processing of results.
+ */
+
+void ovn_run_pool(struct worker_pool *pool);
+
+/* Run a pool, merge results from hash frags into a final hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_run_pool_hash(struct worker_pool *pool,
+                       struct hmap *result, struct hmap *result_frags);
+
+/* Run a pool, merge results from list frags into a final list result.
+ */
+
+void ovn_run_pool_list(struct worker_pool *pool,
+                       struct ovs_list *result,
+                       struct ovs_list *result_frags);
+
+/* Run a pool, call a callback function to perform processing of results.
+ */
+
+void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
+                           void *result_frags,
+                           void (*helper_func)(struct worker_pool *pool,
+                                               void *fin_result,
+                                               void *result_frags,
+                                               int index));
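+
+/* Illustrative sketch ('frags' and 'max_result_size' are hypothetical):
+ * for run_pool_hash() the caller pre-sizes one fragment per worker with
+ * the same mask as the final result, points each worker at its fragment,
+ * and then runs the pool, which merges and destroys the fragments:
+ *
+ *    struct hmap result, *frags;
+ *    int i;
+ *
+ *    fast_hmap_size_for(&result, max_result_size);
+ *    frags = xmalloc(sizeof *frags * pool->size);
+ *    for (i = 0; i < pool->size; i++) {
+ *        fast_hmap_init(&frags[i], result.mask);
+ *        pool->controls[i].data = &frags[i];
+ *    }
+ *    run_pool_hash(pool, &result, frags);
+ *    free(frags);
+ */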
+
+/* Returns the first node in 'hmap' in bucket number 'num', or a null
+ * pointer if that bucket is empty. */
+
+static inline struct hmap_node *
+hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
+{
+    return hmap->buckets[num];
+}
+
+static inline struct hmap_node *
+parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
+{
+    size_t i;
+    for (i = start; i <= hmap->mask; i += pool_size) {
+        struct hmap_node *node = hmap->buckets[i];
+        if (node) {
+            return node;
+        }
+    }
+    return NULL;
+}
+
+/* Returns the first node in 'hmap', as expected by the thread with 'job_id'
+ * for parallel processing in arbitrary order, or a null pointer if
+ * the slice of 'hmap' for that job_id is empty. */
+static inline struct hmap_node *
+parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
+{
+    return parallel_hmap_next__(hmap, job_id, pool_size);
+}
+
+/* Returns the next node in the slice of 'hmap' following 'node',
+ * in arbitrary order, or a null pointer if 'node' is the last node in
+ * the 'hmap' slice.
+ */
+static inline struct hmap_node *
+parallel_hmap_next(const struct hmap *hmap,
+                   const struct hmap_node *node, ssize_t pool_size)
+{
+    return (node->next
+            ? node->next
+            : parallel_hmap_next__(hmap,
+                                   (node->hash & hmap->mask) + pool_size,
+                                   pool_size));
+}
+
+static inline void post_completed_work(struct worker_control *control)
+{
+    atomic_thread_fence(memory_order_acq_rel);
+    atomic_store_relaxed(&control->finished, true);
+    sem_post(control->done);
+}
+
+static inline void wait_for_work(struct worker_control *control)
+{
+    int ret;
+
+    do {
+        ret = sem_wait(control->fire);
+    } while ((ret == -1) && (errno == EINTR));
+    ovs_assert(ret == 0);
+}
+
+static inline void wait_for_work_completion(struct worker_pool *pool)
+{
+    int ret;
+
+    do {
+        ret = sem_wait(pool->done);
+    } while ((ret == -1) && (errno == EINTR));
+    ovs_assert(ret == 0);
+}
+
+/* Hash per-row locking support - to be used only in conjunction
+ * with fast hash inserts. Normal hash inserts may resize the hash,
+ * rendering the locking invalid.
+ */
+
+struct hashrow_locks {
+    ssize_t mask;
+    struct ovs_mutex *row_locks;
+};
+
+/* Update a hash row locks structure to match the current hash size. */
+
+void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
+
+/* Lock a hash row. */
+
+static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
+{
+    ovs_mutex_lock(&hrl->row_locks[hash & hrl->mask]);
+}
+
+/* Unlock a hash row. */
+
+static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
+{
+    ovs_mutex_unlock(&hrl->row_locks[hash & hrl->mask]);
+}
+
+/* Init the row locks structure. */
+
+static inline void init_hash_row_locks(struct hashrow_locks *hrl)
+{
+    hrl->mask = 0;
+    hrl->row_locks = NULL;
+}
+
+bool ovn_can_parallelize_hashes(bool force_parallel);
+
+/* Use the OVN library functions for stuff which OVS has not defined.
+ * If OVS has defined these, they will still compile using the OVN
+ * local names, but will be dropped by the linker in favour of the OVS
+ * supplied functions.
+ */
+
+#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
+
+#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
+
+#define stop_parallel_processing() ovn_stop_parallel_processing()
+
+#define add_worker_pool(start) ovn_add_worker_pool(start)
+
+#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
+
+#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
+
+#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
+
+#define hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
+
+#define run_pool(pool) ovn_run_pool(pool)
+
+#define run_pool_hash(pool, result, result_frags) \
+    ovn_run_pool_hash(pool, result, result_frags)
+
+#define run_pool_list(pool, result, result_frags) \
+    ovn_run_pool_list(pool, result, result_frags)
+
+#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
+    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
+
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+
+#endif /* !OVS_HAS_PARALLEL_HMAP */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* OVN_PARALLEL_HMAP */
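
A minimal end-to-end sketch of the list-based variant, illustrative
only (my_worker is the hypothetical thread function shown in the
comments above, and error handling is elided):

    struct worker_pool *pool;
    struct ovs_list result, *frags;
    int i;

    if (can_parallelize_hashes(false)) {
        pool = add_worker_pool(my_worker);

        ovs_list_init(&result);
        frags = xmalloc(sizeof *frags * pool->size);
        for (i = 0; i < pool->size; i++) {
            ovs_list_init(&frags[i]);
            pool->controls[i].data = &frags[i];
        }
        run_pool_list(pool, &result, frags);
        free(frags);
    }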