@@ -33,6 +33,7 @@
VLOG_DEFINE_THIS_MODULE(fasthmap);
+#ifndef OVS_HAS_PARALLEL_HMAP
static bool worker_pool_setup = false;
static bool workers_must_exit = false;
@@ -279,3 +280,5 @@ void ovn_run_pool_hash(
{
ovn_run_pool_callback(pool, result, result_frags, merge_hash_results);
}
+
+#endif
@@ -37,6 +37,7 @@
#include "lib/ovn-sb-idl.h"
#include "lib/ovn-util.h"
#include "lib/lb.h"
+#include "lib/fasthmap.h"
#include "ovn/actions.h"
#include "ovn/logical-fields.h"
#include "packets.h"
@@ -4174,6 +4175,34 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
/* If this option is 'true' northd will combine logical flows that differs by
* logical datapath only by creating a datapah group. */
static bool use_logical_dp_groups = false;
+static bool use_parallel_build = true;
+
+static struct ovs_mutex *slice_locks = NULL;
+
+/* Adds a row with the specified contents to the Logical_Flow table.
+ * Version to use when locking is required.
+ */
+static void
+do_ovn_lflow_add_locked(struct hmap *lflow_map, bool shared,
+                        struct ovn_datapath *od,
+                        uint32_t hash, struct ovn_lflow *lflow)
+{
+    /* Caller holds the slice lock covering 'hash' when built in parallel. */
+    struct ovn_lflow *old_lflow;
+
+    if (shared && use_logical_dp_groups) {
+        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
+        if (old_lflow) {
+            ovn_lflow_destroy(NULL, lflow);
+            hmapx_add(&old_lflow->od_group, od);
+            return;
+        }
+    }
+
+    hmapx_add(&lflow->od_group, od);
+    hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
+}
+
/* Adds a row with the specified contents to the Logical_Flow table. */
static void
@@ -4184,7 +4213,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
{
ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
- struct ovn_lflow *old_lflow, *lflow;
+ struct ovn_lflow *lflow;
uint32_t hash;
lflow = xmalloc(sizeof *lflow);
@@ -4196,17 +4225,14 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
                          ovn_lflow_hint(stage_hint), where);
     hash = ovn_lflow_hash(lflow);
-    if (shared && use_logical_dp_groups) {
-        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
-        if (old_lflow) {
-            ovn_lflow_destroy(NULL, lflow);
-            hmapx_add(&old_lflow->od_group, od);
-            return;
-        }
-    }
-    hmapx_add(&lflow->od_group, od);
-    hmap_insert(lflow_map, &lflow->hmap_node, hash);
+    if (use_logical_dp_groups && use_parallel_build) {
+        ovs_mutex_lock(&slice_locks[hash & lflow_map->mask]); /* hmap bucket */
+        do_ovn_lflow_add_locked(lflow_map, shared, od, hash, lflow);
+        ovs_mutex_unlock(&slice_locks[hash & lflow_map->mask]);
+    } else {
+        do_ovn_lflow_add_locked(lflow_map, shared, od, hash, lflow);
+    }
 }
/* Adds a row with the specified contents to the Logical_Flow table. */
@@ -7348,6 +7374,8 @@ build_lswitch_ip_mcast_igmp_mld(struct ovn_igmp_group *igmp_group,
}
}
+static struct ovs_mutex mcgroup_mutex = OVS_MUTEX_INITIALIZER;
+
/* Ingress table 19: Destination lookup, unicast handling (priority 50), */
static void
build_lswitch_ip_unicast_lookup(struct ovn_port *op,
@@ -7386,7 +7414,9 @@ build_lswitch_ip_unicast_lookup(struct ovn_port *op,
&op->nbsp->header_);
} else if (!strcmp(op->nbsp->addresses[i], "unknown")) {
if (lsp_is_enabled(op->nbsp)) {
+ ovs_mutex_lock(&mcgroup_mutex);
ovn_multicast_add(mcgroups, &mc_unknown, op);
+ ovs_mutex_unlock(&mcgroup_mutex);
op->od->has_unknown = true;
}
} else if (is_dynamic_lsp_address(op->nbsp->addresses[i])) {
@@ -11676,6 +11706,125 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
&lsi->match, &lsi->actions);
}
+struct lflows_thread_pool {
+    struct worker_pool *pool;
+};
+
+static void *build_lflows_thread(void *arg) {
+    struct worker_control *control = (struct worker_control *) arg;
+    struct lflows_thread_pool *workload;
+    struct lswitch_flow_build_info *lsi;
+
+    struct ovn_datapath *od;
+    struct ovn_port *op;
+    struct ovn_northd_lb *lb;
+    struct ovn_igmp_group *igmp_group;
+    int bnum;
+
+    while (!cease_fire()) {
+        sem_wait(&control->fire);
+        workload = (struct lflows_thread_pool *) control->workload;
+        lsi = (struct lswitch_flow_build_info *) control->data;
+        if (lsi && workload) {
+            /* Iterate over bucket ThreadID, ThreadID+size, ... */
+            for (bnum = control->id;
+                 bnum <= lsi->datapaths->mask;
+                 bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                    od, key_node, bnum, lsi->datapaths) {
+                    if (cease_fire()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
+                }
+            }
+            for (bnum = control->id;
+                 bnum <= lsi->ports->mask;
+                 bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                    op, key_node, bnum, lsi->ports) {
+                    if (cease_fire()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
+                }
+            }
+            for (bnum = control->id;
+                 bnum <= lsi->lbs->mask;
+                 bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                    lb, hmap_node, bnum, lsi->lbs) {
+                    if (cease_fire()) {
+                        return NULL;
+                    }
+                    build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
+                                                         &lsi->actions,
+                                                         &lsi->match);
+                }
+            }
+            for (bnum = control->id;
+                 bnum <= lsi->igmp_groups->mask;
+                 bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                    igmp_group, hmap_node, bnum, lsi->igmp_groups) {
+                    if (cease_fire()) {
+                        return NULL;
+                    }
+                    build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
+                                                    &lsi->actions,
+                                                    &lsi->match);
+                }
+            }
+            atomic_thread_fence(memory_order_release); /* before publish */
+            atomic_store_relaxed(&control->finished, true);
+        }
+        sem_post(control->done);
+    }
+    return NULL;
+}
+
+static bool pool_init_done = false;
+static struct lflows_thread_pool *build_lflows_pool = NULL;
+
+static void init_lflows_thread_pool(void)
+{
+    int index;
+
+    if (!pool_init_done) {
+        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
+        pool_init_done = true;
+        if (!pool) {
+            use_parallel_build = false; /* pool failed; build serially */
+        } else {
+            build_lflows_pool = xmalloc(sizeof *build_lflows_pool);
+            build_lflows_pool->pool = pool;
+            for (index = 0; index < build_lflows_pool->pool->size; index++) {
+                build_lflows_pool->pool->controls[index].workload = build_lflows_pool;
+            }
+        }
+    }
+}
+
+/* TODO: replace hard cutoffs by configurable via commands. These are
+ * temporary defines to determine single-thread to multi-thread processing
+ * cutoff.
+ * Setting to 1 forces "all parallel" lflow build.
+ */
+
+static void
+noop_callback(struct worker_pool *pool OVS_UNUSED,
+ void *fin_result OVS_UNUSED,
+ void *result_frags OVS_UNUSED,
+ int index OVS_UNUSED)
+{
+ /* Do nothing */
+}
+
+
static void
build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
struct hmap *port_groups, struct hmap *lflows,
@@ -11684,53 +11833,111 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
struct shash *meter_groups, struct hmap *lbs,
struct hmap *bfd_connections)
{
- struct ovn_datapath *od;
- struct ovn_port *op;
- struct ovn_northd_lb *lb;
- struct ovn_igmp_group *igmp_group;
char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
- struct lswitch_flow_build_info lsi = {
- .datapaths = datapaths,
- .ports = ports,
- .port_groups = port_groups,
- .lflows = lflows,
- .mcgroups = mcgroups,
- .igmp_groups = igmp_groups,
- .meter_groups = meter_groups,
- .lbs = lbs,
- .bfd_connections = bfd_connections,
- .svc_check_match = svc_check_match,
- .match = DS_EMPTY_INITIALIZER,
- .actions = DS_EMPTY_INITIALIZER,
- };
+ if (use_parallel_build) {
+ init_lflows_thread_pool();
+ struct hmap *lflow_segs;
+ struct lswitch_flow_build_info *lsiv;
+ int index;
+
+ lsiv = xmalloc(
+ sizeof(struct lswitch_flow_build_info) *
+ build_lflows_pool->pool->size);
+ if (use_logical_dp_groups) {
+ lflow_segs = NULL;
+ } else {
+ lflow_segs = xmalloc(
+ sizeof(struct hmap) * build_lflows_pool->pool->size);
+ }
- /* Combined build - all lflow generation from lswitch and lrouter
- * will move here and will be reogranized by iterator type.
- */
- HMAP_FOR_EACH (od, key_node, datapaths) {
- build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
- }
- HMAP_FOR_EACH (op, key_node, ports) {
- build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
- }
- HMAP_FOR_EACH (lb, hmap_node, lbs) {
- build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
- &lsi.actions,
- &lsi.match);
- }
- HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
- build_lswitch_ip_mcast_igmp_mld(igmp_group,
- lsi.lflows,
- &lsi.actions,
- &lsi.match);
- }
- free(svc_check_match);
+ /* Set up "work chunks" for each thread to work on. */
- ds_destroy(&lsi.match);
- ds_destroy(&lsi.actions);
+ for (index = 0; index < build_lflows_pool->pool->size; index++) {
+ if (use_logical_dp_groups) {
+ /* if dp_groups are in use we lock a shared lflows hash
+ * on a per-bucket level instead of merging hash frags */
+ lsiv[index].lflows = lflows;
+ } else {
+ fast_hmap_init(&lflow_segs[index], lflows->mask);
+ lsiv[index].lflows = &lflow_segs[index];
+ }
+
+ lsiv[index].datapaths = datapaths;
+ lsiv[index].ports = ports;
+ lsiv[index].port_groups = port_groups;
+ lsiv[index].mcgroups = mcgroups;
+ lsiv[index].igmp_groups = igmp_groups;
+ lsiv[index].meter_groups = meter_groups;
+ lsiv[index].lbs = lbs;
+ lsiv[index].bfd_connections = bfd_connections;
+ lsiv[index].svc_check_match = svc_check_match;
+ ds_init(&lsiv[index].match);
+ ds_init(&lsiv[index].actions);
+
+ build_lflows_pool->pool->controls[index].data = &lsiv[index];
+ }
+
+ /* Run thread pool. */
+ if (use_logical_dp_groups) {
+ run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
+ } else {
+ run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+ }
+
+ for (index = 0; index < build_lflows_pool->pool->size; index++) {
+ ds_destroy(&lsiv[index].match);
+ ds_destroy(&lsiv[index].actions);
+ }
+ free(lflow_segs);
+ free(lsiv);
+ } else {
+ struct ovn_datapath *od;
+ struct ovn_port *op;
+ struct ovn_northd_lb *lb;
+ struct ovn_igmp_group *igmp_group;
+ struct lswitch_flow_build_info lsi = {
+ .datapaths = datapaths,
+ .ports = ports,
+ .port_groups = port_groups,
+ .lflows = lflows,
+ .mcgroups = mcgroups,
+ .igmp_groups = igmp_groups,
+ .meter_groups = meter_groups,
+ .lbs = lbs,
+ .bfd_connections = bfd_connections,
+ .svc_check_match = svc_check_match,
+ .match = DS_EMPTY_INITIALIZER,
+ .actions = DS_EMPTY_INITIALIZER,
+ };
+
+ /* Combined build - all lflow generation from lswitch and lrouter
+ * will move here and will be reogranized by iterator type.
+ */
+ HMAP_FOR_EACH (od, key_node, datapaths) {
+ build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
+ }
+ HMAP_FOR_EACH (op, key_node, ports) {
+ build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
+ }
+ HMAP_FOR_EACH (lb, hmap_node, lbs) {
+ build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
+ &lsi.actions,
+ &lsi.match);
+ }
+ HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
+ build_lswitch_ip_mcast_igmp_mld(igmp_group,
+ lsi.lflows,
+ &lsi.actions,
+ &lsi.match);
+ }
+
+ ds_destroy(&lsi.match);
+ ds_destroy(&lsi.actions);
+ }
+ free(svc_check_match);
build_lswitch_flows(datapaths, lflows);
}
@@ -11801,6 +12008,25 @@ ovn_sb_set_lflow_logical_dp_group(
sbrec_logical_flow_set_logical_dp_group(sbflow, dpg->dp_group);
}
+static ssize_t max_seen_lflow_size = 128;
+
+static size_t recent_lflow_map_mask = 0;
+
+/* (Re)build one mutex per lflow hash bucket when the map geometry changes. */
+static void update_lock_array(struct hmap *lflows)
+{
+    size_t i;
+
+    if (recent_lflow_map_mask != lflows->mask) {
+        free(slice_locks); /* free(NULL) is a no-op; no guard needed */
+        slice_locks = xcalloc(lflows->mask + 1, sizeof(struct ovs_mutex));
+        recent_lflow_map_mask = lflows->mask;
+        for (i = 0; i <= lflows->mask; i++) {
+            ovs_mutex_init(&slice_locks[i]);
+        }
+    }
+}
+
/* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
* constructing their contents based on the OVN_NB database. */
static void
@@ -11810,13 +12036,21 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
struct shash *meter_groups,
struct hmap *lbs, struct hmap *bfd_connections)
{
- struct hmap lflows = HMAP_INITIALIZER(&lflows);
+ struct hmap lflows;
+ fast_hmap_size_for(&lflows, max_seen_lflow_size);
+ if (use_parallel_build) {
+ update_lock_array(&lflows);
+ }
build_lswitch_and_lrouter_flows(datapaths, ports,
port_groups, &lflows, mcgroups,
igmp_groups, meter_groups, lbs,
bfd_connections);
+ if (hmap_count(&lflows) > max_seen_lflow_size) {
+ max_seen_lflow_size = hmap_count(&lflows);
+ }
+
/* Collecting all unique datapath groups. */
struct hmap dp_groups = HMAP_INITIALIZER(&dp_groups);
struct hmapx single_dp_lflows = HMAPX_INITIALIZER(&single_dp_lflows);