@@ -51,7 +51,6 @@ static bool can_parallelize = false;
* accompanied by a fence. It does not need to be atomic or be
* accessed under a lock.
*/
-static bool workers_must_exit = false;
static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
@@ -70,10 +69,20 @@ static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
void *fin_result, void *result_frags,
int index);
+
+static bool init_control(struct worker_control *control, int id,
+ struct worker_pool *pool);
+
+static void cleanup_control(struct worker_pool *pool, int id);
+
+static void free_controls(struct worker_pool *pool);
+
+static struct worker_control *alloc_controls(int size);
+
bool
-ovn_stop_parallel_processing(void)
+ovn_stop_parallel_processing(struct worker_pool *pool)
{
- return workers_must_exit;
+ return pool->workers_must_exit;
}
bool
@@ -92,11 +101,67 @@ ovn_can_parallelize_hashes(bool force_parallel)
return can_parallelize;
}
+
+void
+destroy_pool(struct worker_pool *pool) {
+ char sem_name[256];
+
+ free_controls(pool);
+ sem_close(pool->done);
+ sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
+ sem_unlink(sem_name);
+ free(pool);
+}
+
+bool
+ovn_resize_pool(struct worker_pool *pool, int size)
+{
+ int i;
+
+ ovs_assert(pool != NULL);
+
+ if (!size) {
+ size = pool_size;
+ }
+
+ ovs_mutex_lock(&init_mutex);
+
+ if (can_parallelize) {
+ free_controls(pool);
+ pool->size = size;
+
+ /* Allocate new control structures. */
+
+ pool->controls = alloc_controls(size);
+ pool->workers_must_exit = false;
+
+ for (i = 0; i < pool->size; i++) {
+ if (! init_control(&pool->controls[i], i, pool)) {
+ goto cleanup;
+ }
+ }
+ }
+ ovs_mutex_unlock(&init_mutex);
+ return true;
+cleanup:
+
+ /* Something went wrong when opening semaphores. In this case
+ * it is better to shut off parallel procesing altogether
+ */
+
+ VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
+ can_parallelize = false;
+ free_controls(pool);
+
+ ovs_mutex_unlock(&init_mutex);
+ return false;
+}
+
+
struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *))
+ovn_add_worker_pool(void *(*start)(void *), int size)
{
struct worker_pool *new_pool = NULL;
- struct worker_control *new_control;
bool test = false;
int i;
char sem_name[256];
@@ -113,38 +178,29 @@ ovn_add_worker_pool(void *(*start)(void *))
ovs_mutex_unlock(&init_mutex);
}
+ if (!size) {
+ size = pool_size;
+ }
+
ovs_mutex_lock(&init_mutex);
if (can_parallelize) {
new_pool = xmalloc(sizeof(struct worker_pool));
- new_pool->size = pool_size;
- new_pool->controls = NULL;
+ new_pool->size = size;
+ new_pool->start = start;
sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
if (new_pool->done == SEM_FAILED) {
goto cleanup;
}
- new_pool->controls =
- xmalloc(sizeof(struct worker_control) * new_pool->size);
+ new_pool->controls = alloc_controls(size);
+ new_pool->workers_must_exit = false;
for (i = 0; i < new_pool->size; i++) {
- new_control = &new_pool->controls[i];
- new_control->id = i;
- new_control->done = new_pool->done;
- new_control->data = NULL;
- ovs_mutex_init(&new_control->mutex);
- new_control->finished = ATOMIC_VAR_INIT(false);
- sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
- new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
- if (new_control->fire == SEM_FAILED) {
+ if (!init_control(&new_pool->controls[i], i, new_pool)) {
goto cleanup;
}
}
-
- for (i = 0; i < pool_size; i++) {
- new_pool->controls[i].worker =
- ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
- }
ovs_list_push_back(&worker_pools, &new_pool->list_node);
}
ovs_mutex_unlock(&init_mutex);
@@ -157,16 +213,7 @@ cleanup:
VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
can_parallelize = false;
- if (new_pool->controls) {
- for (i = 0; i < new_pool->size; i++) {
- if (new_pool->controls[i].fire != SEM_FAILED) {
- sem_close(new_pool->controls[i].fire);
- sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
- sem_unlink(sem_name);
- break; /* semaphores past this one are uninitialized */
- }
- }
- }
+ free_controls(new_pool);
if (new_pool->done != SEM_FAILED) {
sem_close(new_pool->done);
sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
@@ -176,7 +223,6 @@ cleanup:
return NULL;
}
-
/* Initializes 'hmap' as an empty hash table with mask N. */
void
ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
@@ -365,14 +411,84 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
}
}
+static bool
+init_control(struct worker_control *control, int id,
+ struct worker_pool *pool)
+{
+ char sem_name[256];
+ control->id = id;
+ control->done = pool->done;
+ control->data = NULL;
+ ovs_mutex_init(&control->mutex);
+ control->finished = ATOMIC_VAR_INIT(false);
+ sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
+ control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+ control->pool = pool;
+ control->worker = 0;
+ if (control->fire == SEM_FAILED) {
+ return false;
+ }
+ control->worker =
+ ovs_thread_create("worker pool helper", pool->start, control);
+ return true;
+}
+
static void
-worker_pool_hook(void *aux OVS_UNUSED) {
+cleanup_control(struct worker_pool *pool, int id)
+{
+ char sem_name[256];
+ struct worker_control *control = &pool->controls[id];
+
+ if (control->fire != SEM_FAILED) {
+ sem_close(control->fire);
+ sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
+ sem_unlink(sem_name);
+ }
+}
+
+static void
+free_controls(struct worker_pool *pool)
+{
int i;
+ if (pool->controls) {
+ pool->workers_must_exit = true;
+ for (i = 0; i < pool->size ; i++) {
+ if (pool->controls[i].fire != SEM_FAILED) {
+ sem_post(pool->controls[i].fire);
+ }
+ }
+ for (i = 0; i < pool->size ; i++) {
+ if (pool->controls[i].worker) {
+ pthread_join(pool->controls[i].worker, NULL);
+ pool->controls[i].worker = 0;
+ }
+ }
+ for (i = 0; i < pool->size; i++) {
+ cleanup_control(pool, i);
+ }
+ free(pool->controls);
+ pool->controls = NULL;
+ pool->workers_must_exit = false;
+ }
+}
+
+static struct worker_control *alloc_controls(int size)
+{
+ int i;
+ struct worker_control *controls =
+ xcalloc(sizeof(struct worker_control), size);
+
+ for (i = 0; i < size ; i++) {
+ controls[i].fire = SEM_FAILED;
+ }
+ return controls;
+}
+
+static void
+worker_pool_hook(void *aux OVS_UNUSED) {
static struct worker_pool *pool;
char sem_name[256];
- workers_must_exit = true;
-
/* All workers must honour the must_exit flag and check for it regularly.
* We can make it atomic and check it via atomics in workers, but that
* is not really necessary as it is set just once - when the program
@@ -383,17 +499,7 @@ worker_pool_hook(void *aux OVS_UNUSED) {
/* Wake up the workers after the must_exit flag has been set */
LIST_FOR_EACH (pool, list_node, &worker_pools) {
- for (i = 0; i < pool->size ; i++) {
- sem_post(pool->controls[i].fire);
- }
- for (i = 0; i < pool->size ; i++) {
- pthread_join(pool->controls[i].worker, NULL);
- }
- for (i = 0; i < pool->size ; i++) {
- sem_close(pool->controls[i].fire);
- sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
- sem_unlink(sem_name);
- }
+ free_controls(pool);
sem_close(pool->done);
sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
sem_unlink(sem_name);
@@ -83,6 +83,7 @@ struct worker_control {
void *data; /* Pointer to data to be processed. */
void *workload; /* back-pointer to the worker pool structure. */
pthread_t worker;
+ struct worker_pool *pool;
};
struct worker_pool {
@@ -90,16 +91,21 @@ struct worker_pool {
struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
struct worker_control *controls; /* "Handles" in this pool. */
sem_t *done; /* Work completion semaphorew. */
+ void *(*start)(void *); /* Work function. */
+ bool workers_must_exit; /* Pool to be destroyed flag. */
};
/* Add a worker pool for thread function start() which expects a pointer to
- * a worker_control structure as an argument. */
+ * a worker_control structure as an argument.
+ * If size is non-zero, it is used for pool sizing. If size is zero, pool
+ * size uses system defaults.
+ */
-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
/* Setting this to true will make all processing threads exit */
-bool ovn_stop_parallel_processing(void);
+bool ovn_stop_parallel_processing(struct worker_pool *pool);
/* Build a hmap pre-sized for size elements */
@@ -253,6 +259,10 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)
bool ovn_can_parallelize_hashes(bool force_parallel);
+void ovn_destroy_pool(struct worker_pool *pool);
+
+bool ovn_resize_pool(struct worker_pool *pool, int size);
+
/* Use the OVN library functions for stuff which OVS has not defined
* If OVS has defined these, they will still compile using the OVN
* local names, but will be dropped by the linker in favour of the OVS
@@ -263,9 +273,9 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
-#define stop_parallel_processing() ovn_stop_parallel_processing()
+#define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)
-#define add_worker_pool(start) ovn_add_worker_pool(start)
+#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)
#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
@@ -286,6 +296,9 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
+#define destroy_pool(pool) ovn_destroy_pool(pool)
+
+#define resize_pool(pool, size) ovn_resize_pool(pool, size)
#ifdef __clang__
@@ -12828,16 +12828,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
&lsi->actions);
}
-struct lflows_thread_pool {
- struct worker_pool *pool;
-};
-
-
static void *
build_lflows_thread(void *arg)
{
struct worker_control *control = (struct worker_control *) arg;
- struct lflows_thread_pool *workload;
struct lswitch_flow_build_info *lsi;
struct ovn_datapath *od;
@@ -12846,21 +12840,21 @@ build_lflows_thread(void *arg)
struct ovn_igmp_group *igmp_group;
int bnum;
- while (!stop_parallel_processing()) {
+
+ while (!stop_parallel_processing(control->pool)) {
wait_for_work(control);
- workload = (struct lflows_thread_pool *) control->workload;
lsi = (struct lswitch_flow_build_info *) control->data;
- if (stop_parallel_processing()) {
+ if (stop_parallel_processing(control->pool)) {
return NULL;
}
- if (lsi && workload) {
+ if (lsi) {
/* Iterate over bucket ThreadID, ThreadID+size, ... */
for (bnum = control->id;
bnum <= lsi->datapaths->mask;
- bnum += workload->pool->size)
+ bnum += control->pool->size)
{
HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
- if (stop_parallel_processing()) {
+ if (stop_parallel_processing(control->pool)) {
return NULL;
}
build_lswitch_and_lrouter_iterate_by_od(od, lsi);
@@ -12868,10 +12862,10 @@ build_lflows_thread(void *arg)
}
for (bnum = control->id;
bnum <= lsi->ports->mask;
- bnum += workload->pool->size)
+ bnum += control->pool->size)
{
HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
- if (stop_parallel_processing()) {
+ if (stop_parallel_processing(control->pool)) {
return NULL;
}
build_lswitch_and_lrouter_iterate_by_op(op, lsi);
@@ -12879,10 +12873,10 @@ build_lflows_thread(void *arg)
}
for (bnum = control->id;
bnum <= lsi->lbs->mask;
- bnum += workload->pool->size)
+ bnum += control->pool->size)
{
HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
- if (stop_parallel_processing()) {
+ if (stop_parallel_processing(control->pool)) {
return NULL;
}
build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
@@ -12900,11 +12894,11 @@ build_lflows_thread(void *arg)
}
for (bnum = control->id;
bnum <= lsi->igmp_groups->mask;
- bnum += workload->pool->size)
+ bnum += control->pool->size)
{
HMAP_FOR_EACH_IN_PARALLEL (
igmp_group, hmap_node, bnum, lsi->igmp_groups) {
- if (stop_parallel_processing()) {
+ if (stop_parallel_processing(control->pool)) {
return NULL;
}
build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
@@ -12919,24 +12913,14 @@ build_lflows_thread(void *arg)
}
static bool pool_init_done = false;
-static struct lflows_thread_pool *build_lflows_pool = NULL;
+static struct worker_pool *build_lflows_pool = NULL;
static void
init_lflows_thread_pool(void)
{
- int index;
-
if (!pool_init_done) {
- struct worker_pool *pool = add_worker_pool(build_lflows_thread);
+ build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
pool_init_done = true;
- if (pool) {
- build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
- build_lflows_pool->pool = pool;
- for (index = 0; index < build_lflows_pool->pool->size; index++) {
- build_lflows_pool->pool->controls[index].workload =
- build_lflows_pool;
- }
- }
}
}
@@ -12979,16 +12963,16 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
struct lswitch_flow_build_info *lsiv;
int index;
- lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
+ lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
if (use_logical_dp_groups) {
lflow_segs = NULL;
} else {
- lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
+ lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
}
/* Set up "work chunks" for each thread to work on. */
- for (index = 0; index < build_lflows_pool->pool->size; index++) {
+ for (index = 0; index < build_lflows_pool->size; index++) {
if (use_logical_dp_groups) {
/* if dp_groups are in use we lock a shared lflows hash
* on a per-bucket level instead of merging hash frags */
@@ -13010,17 +12994,17 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
ds_init(&lsiv[index].match);
ds_init(&lsiv[index].actions);
- build_lflows_pool->pool->controls[index].data = &lsiv[index];
+ build_lflows_pool->controls[index].data = &lsiv[index];
}
/* Run thread pool. */
if (use_logical_dp_groups) {
- run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
+ run_pool_callback(build_lflows_pool, NULL, NULL, noop_callback);
} else {
- run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+ run_pool_hash(build_lflows_pool, lflows, lflow_segs);
}
- for (index = 0; index < build_lflows_pool->pool->size; index++) {
+ for (index = 0; index < build_lflows_pool->size; index++) {
ds_destroy(&lsiv[index].match);
ds_destroy(&lsiv[index].actions);
}
@@ -1 +1 @@
-Subproject commit 748010ff304b7cd2c43f4eb98a554433f0df07f9
+Subproject commit 50e5523b9b2b154e5fafc5acdcdec85e9cc5a330