@@ -33,6 +33,7 @@
#include "ovs-thread.h"
#include "ovs-numa.h"
#include "random.h"
+#include "unixctl.h"
VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
@@ -46,6 +47,7 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
*/
static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
static bool can_parallelize = false;
+static bool should_parallelize = false;
/* This is set only in the process of exit and the set is
* accompanied by a fence. It does not need to be atomic or be
@@ -85,6 +87,19 @@ ovn_stop_parallel_processing(struct worker_pool *pool)
return pool->workers_must_exit;
}
+bool
+ovn_set_parallel_processing(bool enable)
+{
+ should_parallelize = enable;
+ return can_parallelize;
+}
+
+bool
+ovn_get_parallel_processing(void)
+{
+ return can_parallelize && should_parallelize;
+}
+
bool
ovn_can_parallelize_hashes(bool force_parallel)
{
@@ -110,6 +125,7 @@ destroy_pool(struct worker_pool *pool) {
sem_close(pool->done);
sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
sem_unlink(sem_name);
+ free(pool->name);
free(pool);
}
@@ -120,6 +136,10 @@ ovn_resize_pool(struct worker_pool *pool, int size)
ovs_assert(pool != NULL);
+ if (!pool->is_mutable) {
+ return false;
+ }
+
if (!size) {
size = pool_size;
}
@@ -159,7 +179,8 @@ cleanup:
struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *), int size)
+ovn_add_worker_pool(void *(*start)(void *), int size, char *name,
+ bool is_mutable)
{
struct worker_pool *new_pool = NULL;
bool test = false;
@@ -187,6 +208,8 @@ ovn_add_worker_pool(void *(*start)(void *), int size)
new_pool = xmalloc(sizeof(struct worker_pool));
new_pool->size = size;
new_pool->start = start;
+ new_pool->is_mutable = is_mutable;
+ new_pool->name = xstrdup(name);
sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
if (new_pool->done == SEM_FAILED) {
@@ -219,6 +242,7 @@ cleanup:
sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
sem_unlink(sem_name);
}
+ free(new_pool->name);
ovs_mutex_unlock(&init_mutex);
return NULL;
}
@@ -267,13 +291,9 @@ ovn_fast_hmap_size_for(struct hmap *hmap, int size)
/* Run a thread pool which uses a callback function to process results
*/
void
-ovn_run_pool_callback(struct worker_pool *pool,
- void *fin_result, void *result_frags,
- void (*helper_func)(struct worker_pool *pool,
- void *fin_result,
- void *result_frags, int index))
+ovn_start_pool(struct worker_pool *pool)
{
- int index, completed;
+ int index;
/* Ensure that all worker threads see the same data as the
* main thread.
@@ -284,8 +304,19 @@ ovn_run_pool_callback(struct worker_pool *pool,
for (index = 0; index < pool->size; index++) {
sem_post(pool->controls[index].fire);
}
+}
+
- completed = 0;
+/* Run a thread pool which uses a callback function to process results
+ */
+void
+ovn_complete_pool_callback(struct worker_pool *pool,
+ void *fin_result, void *result_frags,
+ void (*helper_func)(struct worker_pool *pool,
+ void *fin_result,
+ void *result_frags, int index))
+{
+ int index, completed = 0;
do {
bool test;
@@ -327,6 +358,18 @@ ovn_run_pool_callback(struct worker_pool *pool,
}
} while (completed < pool->size);
}
+/* Run a thread pool which uses a callback function to process results
+ */
+void
+ovn_run_pool_callback(struct worker_pool *pool,
+ void *fin_result, void *result_frags,
+ void (*helper_func)(struct worker_pool *pool,
+ void *fin_result,
+ void *result_frags, int index))
+{
+ start_pool(pool);
+ complete_pool_callback(pool, fin_result, result_frags, helper_func);
+}
/* Run a thread pool - basic, does not do results processing.
*/
@@ -373,6 +416,28 @@ ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
inc->n = 0;
}
+/* Run a thread pool which gathers results in an array
+ * of hashes. Merge results.
+ */
+void
+ovn_complete_pool_hash(struct worker_pool *pool,
+ struct hmap *result,
+ struct hmap *result_frags)
+{
+ complete_pool_callback(pool, result, result_frags, merge_hash_results);
+}
+
+/* Run a thread pool which gathers results in an array of lists.
+ * Merge results.
+ */
+void
+ovn_complete_pool_list(struct worker_pool *pool,
+ struct ovs_list *result,
+ struct ovs_list *result_frags)
+{
+ complete_pool_callback(pool, result, result_frags, merge_list_results);
+}
+
/* Run a thread pool which gathers results in an array
* of hashes. Merge results.
*/
@@ -486,7 +551,7 @@ static struct worker_control *alloc_controls(int size)
static void
worker_pool_hook(void *aux OVS_UNUSED) {
- static struct worker_pool *pool;
+ struct worker_pool *pool;
char sem_name[256];
/* All workers must honour the must_exit flag and check for it regularly.
@@ -564,4 +629,130 @@ merge_hash_results(struct worker_pool *pool OVS_UNUSED,
hmap_destroy(&res_frags[index]);
}
+static void
+ovn_thread_pool_resize_pool(struct unixctl_conn *conn, int argc OVS_UNUSED,
+ const char *argv[], void *unused OVS_UNUSED)
+{
+
+ struct worker_pool *pool;
+ int value;
+
+ if (!str_to_int(argv[2], 10, &value)) {
+ unixctl_command_reply_error(conn, "invalid argument");
+ return;
+ }
+
+ if (value > 0) {
+ pool_size = value;
+ }
+ LIST_FOR_EACH (pool, list_node, &worker_pools) {
+ if (strcmp(pool->name, argv[1]) == 0) {
+ resize_pool(pool, value);
+ unixctl_command_reply_error(conn, NULL);
+ }
+ }
+ unixctl_command_reply_error(conn, "pool not found");
+}
+
+static void
+ovn_thread_pool_list_pools(struct unixctl_conn *conn, int argc OVS_UNUSED,
+ const char *argv[] OVS_UNUSED,
+ void *unused OVS_UNUSED)
+{
+
+ char *reply = NULL;
+ char *new_reply;
+ char buf[256];
+ struct worker_pool *pool;
+
+ LIST_FOR_EACH (pool, list_node, &worker_pools) {
+ snprintf(buf, 255, "%s : %d\n", pool->name, pool->size);
+ if (reply) {
+ new_reply = xmalloc(strlen(reply) + strlen(buf) + 1);
+ ovs_strlcpy(new_reply, reply, strlen(reply));
+ strcat(new_reply, buf);
+ free(reply);
+ }
+ reply = new_reply;
+ }
+ unixctl_command_reply(conn, reply);
+}
+
+static void
+ovn_thread_pool_set_parallel_on(struct unixctl_conn *conn, int argc OVS_UNUSED,
+ const char *argv[], void *unused OVS_UNUSED)
+{
+ int value;
+ bool result;
+ if (!str_to_int(argv[1], 10, &value)) {
+ unixctl_command_reply_error(conn, "invalid argument");
+ return;
+ }
+
+ if (!ovn_can_parallelize_hashes(true)) {
+ unixctl_command_reply_error(conn, "cannot enable parallel processing");
+ return;
+ }
+
+ if (value > 0) {
+ /* Change default pool size */
+ ovs_mutex_lock(&init_mutex);
+ pool_size = value;
+ ovs_mutex_unlock(&init_mutex);
+ }
+
+ result = ovn_set_parallel_processing(true);
+ unixctl_command_reply(conn, result ? "enabled" : "disabled");
+}
+
+static void
+ovn_thread_pool_set_parallel_off(struct unixctl_conn *conn,
+ int argc OVS_UNUSED,
+ const char *argv[] OVS_UNUSED,
+ void *unused OVS_UNUSED)
+{
+ ovn_set_parallel_processing(false);
+ unixctl_command_reply(conn, NULL);
+}
+
+static void
+ovn_thread_pool_parallel_status(struct unixctl_conn *conn, int argc OVS_UNUSED,
+ const char *argv[] OVS_UNUSED,
+ void *unused OVS_UNUSED)
+{
+ char status[256];
+
+ sprintf(status, "%s, default pool size %d",
+ get_parallel_processing() ? "active" : "inactive",
+ pool_size);
+
+ unixctl_command_reply(conn, status);
+}
+
+void
+ovn_parallel_thread_pools_init(void)
+{
+ bool test = false;
+
+ if (atomic_compare_exchange_strong(
+ &initial_pool_setup,
+ &test,
+ true)) {
+ ovs_mutex_lock(&init_mutex);
+ setup_worker_pools(false);
+ ovs_mutex_unlock(&init_mutex);
+ }
+
+ unixctl_command_register("thread-pool/set-parallel-on", "N", 1, 1,
+ ovn_thread_pool_set_parallel_on, NULL);
+ unixctl_command_register("thread-pool/set-parallel-off", "", 0, 0,
+ ovn_thread_pool_set_parallel_off, NULL);
+ unixctl_command_register("thread-pool/status", "", 0, 0,
+ ovn_thread_pool_parallel_status, NULL);
+ unixctl_command_register("thread-pool/list", "", 0, 0,
+ ovn_thread_pool_list_pools, NULL);
+ unixctl_command_register("thread-pool/reload-pool", "Pool Threads", 2, 2,
+ ovn_thread_pool_resize_pool, NULL);
+}
+
#endif
@@ -33,6 +33,7 @@ extern "C" {
#include "openvswitch/hmap.h"
#include "openvswitch/thread.h"
#include "ovs-atomic.h"
+#include "unixctl.h"
/* Process this include only if OVS does not supply parallel definitions
*/
@@ -93,6 +94,8 @@ struct worker_pool {
sem_t *done; /* Work completion semaphorew. */
void *(*start)(void *); /* Work function. */
bool workers_must_exit; /* Pool to be destroyed flag. */
+ char *name; /* Name to be used in cli commands */
+ bool is_mutable; /* Can the pool be reloaded with different params */
};
/* Add a worker pool for thread function start() which expects a pointer to
@@ -101,7 +104,9 @@ struct worker_pool {
* size uses system defaults.
*/
-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *),
+ int size, char *name,
+ bool is_mutable);
/* Setting this to true will make all processing threads exit */
@@ -149,6 +154,35 @@ void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
void *fin_result, void *result_frags, int index));
+/* Start a pool. Do not wait for any results. They will be collected
+ * using the _complete_ functions.
+ */
+void ovn_start_pool(struct worker_pool *pool);
+
+/* Complete a pool run started using start_pool();
+ * Merge results from hash frags into a final hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_complete_pool_hash(struct worker_pool *pool,
+ struct hmap *result, struct hmap *result_frags);
+
+/* Complete a pool run started using start_pool();
+ * Merge results from list frags into a final list result.
+ */
+
+void ovn_complete_pool_list(struct worker_pool *pool,
+ struct ovs_list *result, struct ovs_list *result_frags);
+
+/* Complete a pool run started using start_pool();
+ * Call a callback function to perform processing of results.
+ */
+
+void ovn_complete_pool_callback(struct worker_pool *pool, void *fin_result,
+ void *result_frags,
+ void (*helper_func)(struct worker_pool *pool,
+ void *fin_result, void *result_frags, int index));
+
/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
* would land, or a null pointer if that bucket is empty. */
@@ -259,10 +293,16 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)
bool ovn_can_parallelize_hashes(bool force_parallel);
+bool ovn_set_parallel_processing(bool enable);
+
+bool ovn_get_parallel_processing(void);
+
void ovn_destroy_pool(struct worker_pool *pool);
bool ovn_resize_pool(struct worker_pool *pool, int size);
+void ovn_parallel_thread_pools_init(void);
+
/* Use the OVN library functions for stuff which OVS has not defined
* If OVS has defined these, they will still compile using the OVN
* local names, but will be dropped by the linker in favour of the OVS
@@ -273,9 +313,16 @@ bool ovn_resize_pool(struct worker_pool *pool, int size);
#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
+#define set_parallel_processing(enable) ovn_set_parallel_processing(enable)
+
+#define get_parallel_processing() ovn_get_parallel_processing()
+
+#define enable_parallel_processing() ovn_enable_parallel_processing()
+
#define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)
-#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)
+#define add_worker_pool(start, size, name, is_mutable) \
+ ovn_add_worker_pool(start, size, name, is_mutable)
#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
@@ -296,10 +343,22 @@ bool ovn_resize_pool(struct worker_pool *pool, int size);
#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
+#define start_pool(pool) ovn_start_pool(pool)
+
+#define complete_pool_hash(pool, result, result_frags) \
+ ovn_complete_pool_hash(pool, result, result_frags)
+
+#define complete_pool_list(pool, result, result_frags) \
+ ovn_complete_pool_list(pool, result, result_frags)
+
+#define complete_pool_callback(pool, fin_result, result_frags, helper_func) \
+ ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func)
+
#define destroy_pool(pool) ovn_destroy_pool(pool)
#define resize_pool(pool, size) ovn_resize_pool(pool, size)
+#define parallel_thread_pools_init() ovn_parallel_thread_pools_init()
#ifdef __clang__
#pragma clang diagnostic pop
@@ -4358,7 +4358,6 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
/* If this option is 'true' northd will combine logical flows that differ by
* logical datapath only by creating a datapath group. */
static bool use_logical_dp_groups = false;
-static bool use_parallel_build = true;
static struct hashrow_locks lflow_locks;
@@ -4395,7 +4394,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
nullable_xstrdup(ctrl_meter),
ovn_lflow_hint(stage_hint), where);
hmapx_add(&lflow->od_group, od);
- if (!use_parallel_build) {
+ if (!get_parallel_processing()) {
hmap_insert(lflow_map, &lflow->hmap_node, hash);
} else {
hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
@@ -4414,7 +4413,7 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,
struct ovn_lflow *lflow;
ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
- if (use_logical_dp_groups && use_parallel_build) {
+ if (use_logical_dp_groups && get_parallel_processing()) {
lock_hash_row(&lflow_locks, hash);
lflow = do_ovn_lflow_add(lflow_map, od, hash, stage, priority, match,
actions, io_port, stage_hint, where,
@@ -4454,7 +4453,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,
return false;
}
- if (use_parallel_build) {
+ if (get_parallel_processing()) {
lock_hash_row(&lflow_locks, hash);
hmapx_add(&lflow_ref->od_group, od);
unlock_hash_row(&lflow_locks, hash);
@@ -12919,7 +12918,8 @@ static void
init_lflows_thread_pool(void)
{
if (!pool_init_done) {
- build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
+ build_lflows_pool = add_worker_pool(build_lflows_thread, 0,
+ "lflows", true);
pool_init_done = true;
}
}
@@ -12951,14 +12951,11 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
- if (use_parallel_build) {
+ if (get_parallel_processing()) {
init_lflows_thread_pool();
- if (!can_parallelize_hashes(false)) {
- use_parallel_build = false;
- }
}
- if (use_parallel_build) {
+ if (get_parallel_processing()) {
struct hmap *lflow_segs;
struct lswitch_flow_build_info *lsiv;
int index;
@@ -13154,7 +13151,7 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
struct hmap lflows;
fast_hmap_size_for(&lflows, max_seen_lflow_size);
- if (use_parallel_build) {
+ if (get_parallel_processing()) {
update_hashrow_locks(&lflows, &lflow_locks);
}
build_lswitch_and_lrouter_flows(datapaths, ports,
@@ -14226,10 +14223,6 @@ ovnnb_db_run(struct northd_context *ctx,
northd_probe_interval_nb = get_probe_interval(ovnnb_db, nb);
northd_probe_interval_sb = get_probe_interval(ovnsb_db, nb);
- use_parallel_build =
- (smap_get_bool(&nb->options, "use_parallel_build", false) &&
- can_parallelize_hashes(false));
-
use_logical_dp_groups = smap_get_bool(&nb->options,
"use_logical_dp_groups", true);
use_ct_inv_match = smap_get_bool(&nb->options,
@@ -15144,8 +15137,9 @@ main(int argc, char *argv[])
daemonize_complete();
+ ovn_parallel_thread_pools_init();
+
init_hash_row_locks(&lflow_locks);
- use_parallel_build = can_parallelize_hashes(false);
/* We want to detect (almost) all changes to the ovn-nb db. */
struct ovsdb_idl_loop ovnnb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
@@ -179,6 +179,18 @@ ovn_start_northd() {
test -d "$ovs_base/$name" || mkdir "$ovs_base/$name"
as $name start_daemon $NORTHD_TYPE $northd_args -vjsonrpc \
--ovnnb-db=$OVN_NB_DB --ovnsb-db=$OVN_SB_DB
+ if test -z "$USE_PARALLEL_THREADS" ; then
+ USE_PARALLEL_THREADS=0
+ fi
+
+ if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
+ case ${NORTHD_TYPE:=ovn-northd} in
+ ovn-northd) ovs-appctl --timeout=10 --target northd$suffix/ovn-northd \
+ thread-pool/set-parallel-on $USE_PARALLEL_THREADS
+ ;;
+ esac
+ fi
+
}
# ovn_start [--backup-northd=none|paused] [AZ]
@@ -252,10 +264,6 @@ ovn_start () {
else
ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
fi
-
- if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
- ovn-nbctl set NB_Global . options:use_parallel_build=true
- fi
}
# Interconnection networks.