@@ -59,6 +59,7 @@
#include "unixctl.h"
#include "util.h"
#include "uuid.h"
+#include "ovs-thread.h"
#include "openvswitch/vlog.h"
VLOG_DEFINE_THIS_MODULE(ovn_northd);
@@ -4369,7 +4370,26 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
static bool use_logical_dp_groups = false;
static bool use_parallel_build = true;
-static struct hashrow_locks lflow_locks;
+static struct ovs_rwlock flowtable_lock;
+
+static void ovn_make_multi_lflow(struct ovn_lflow *old_lflow,
+ struct ovn_datapath *od,
+ struct lflow_state *lflow_map,
+ uint32_t hash)
+{
+ hmapx_add(&old_lflow->od_group, od);
+ hmap_remove(&lflow_map->single_od, &old_lflow->hmap_node);
+ if (use_parallel_build) {
+ hmap_insert_fast(&lflow_map->multiple_od, &old_lflow->hmap_node, hash);
+ } else {
+ hmap_insert(&lflow_map->multiple_od, &old_lflow->hmap_node, hash);
+ }
+}
+
+#ifdef __clang__
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wthread-safety"
+#endif
/* Adds a row with the specified contents to the Logical_Flow table.
* Version to use when locking is required.
@@ -4385,57 +4405,133 @@ do_ovn_lflow_add(struct lflow_state *lflow_map, struct ovn_datapath *od,
struct ovn_lflow *old_lflow;
struct ovn_lflow *lflow;
+    /* Fast Path.
+     * See if we can get away without writing - grab a rdlock and check
+     * whether the flow can be handled with as little work as possible.
+     */
+
if (use_logical_dp_groups) {
- old_lflow = do_ovn_lflow_find(&lflow_map->single_od, NULL, stage,
- priority, match,
+ if (use_parallel_build) {
+ ovs_rwlock_rdlock(&flowtable_lock);
+ }
+ old_lflow = do_ovn_lflow_find(&lflow_map->single_od,
+ NULL, stage, priority, match,
actions, ctrl_meter, hash);
if (old_lflow) {
- hmapx_add(&old_lflow->od_group, od);
- /* Found, different, od count went up. Move to multiple od. */
- if (hmapx_count(&old_lflow->od_group) > 1) {
- hmap_remove(&lflow_map->single_od, &old_lflow->hmap_node);
+ if (!hmapx_contains(&old_lflow->od_group, od)) {
+ /* od not in od_group, we need to add it and move to
+ * multiple. */
if (use_parallel_build) {
- hmap_insert_fast(&lflow_map->multiple_od,
- &old_lflow->hmap_node, hash);
- } else {
- hmap_insert(&lflow_map->multiple_od,
- &old_lflow->hmap_node, hash);
+ /* Upgrade the lock to write, we are likely to
+ * modify data. */
+ ovs_rwlock_unlock(&flowtable_lock);
+ ovs_rwlock_wrlock(&flowtable_lock);
+
+ /* Check if someone got ahead of us and the flow is already
+ * in multiple. */
+ if (!hmap_contains(&lflow_map->single_od,
+ &old_lflow->hmap_node)) {
+ /* Someone did get ahead of us, add the od to the
+ * group. */
+ hmapx_add(&old_lflow->od_group, od);
+ goto done_update_unlock;
+ }
}
+ ovn_make_multi_lflow(old_lflow, od, lflow_map, hash);
+ goto done_update_unlock;
}
- } else {
- /* Not found, lookup in multiple od. */
+ }
+ if (!old_lflow) {
+ /* Not found in single, lookup in multiple od. */
old_lflow = do_ovn_lflow_find(&lflow_map->multiple_od, NULL,
stage, priority, match,
actions, ctrl_meter, hash);
if (old_lflow) {
- hmapx_add(&old_lflow->od_group, od);
+ if (!hmapx_contains(&old_lflow->od_group, od)) {
+ if (use_parallel_build) {
+                        /* Upgrade lock to write. */
+ ovs_rwlock_unlock(&flowtable_lock);
+ ovs_rwlock_wrlock(&flowtable_lock);
+ }
+ hmapx_add(&old_lflow->od_group, od);
+ }
}
}
+done_update_unlock:
+ if (use_parallel_build) {
+ ovs_rwlock_unlock(&flowtable_lock);
+ }
if (old_lflow) {
return;
}
}
- lflow = xmalloc(sizeof *lflow);
- /* While adding new logical flows we're not setting single datapath, but
- * collecting a group. 'od' will be updated later for all flows with only
- * one datapath in a group, so it could be hashed correctly. */
- ovn_lflow_init(lflow, NULL, stage, priority,
- xstrdup(match), xstrdup(actions),
- io_port ? xstrdup(io_port) : NULL,
- nullable_xstrdup(ctrl_meter),
- ovn_lflow_hint(stage_hint), where);
- hmapx_add(&lflow->od_group, od);
-
- /* Insert "fresh" lflows into single_od. */
+    /* Slow Path.
+     * We could not get away with a minimal, mostly read-only amount of
+     * work; take the write lock and try to insert (may end up repeating
+     * some of the work done under the read lock). */
+ if (use_logical_dp_groups && use_parallel_build) {
+ ovs_rwlock_wrlock(&flowtable_lock);
+ }
if (!use_parallel_build) {
+ lflow = xmalloc(sizeof *lflow);
+ /* While adding new logical flows we are not setting single datapath,
+ * but collecting a group. 'od' will be updated later for all flows
+ * with only one datapath in a group, so it could be hashed correctly.
+ */
+ ovn_lflow_init(lflow, NULL, stage, priority,
+ xstrdup(match), xstrdup(actions),
+ io_port ? xstrdup(io_port) : NULL,
+ nullable_xstrdup(ctrl_meter),
+ ovn_lflow_hint(stage_hint), where);
+ hmapx_add(&lflow->od_group, od);
hmap_insert(&lflow_map->single_od, &lflow->hmap_node, hash);
} else {
+ if (use_logical_dp_groups) {
+ /* Search again in case someone else got here first. */
+ old_lflow = do_ovn_lflow_find(&lflow_map->single_od,
+ NULL, stage, priority, match,
+ actions, ctrl_meter, hash);
+ if (old_lflow) {
+ if (!hmapx_contains(&old_lflow->od_group, od)) {
+ ovn_make_multi_lflow(old_lflow, od, lflow_map, hash);
+ }
+ goto done_add_unlock;
+ }
+ /* Unlikely, but possible, more than one thread got here
+             * ahead of us while we were waiting to acquire a write lock. */
+ old_lflow = do_ovn_lflow_find(&lflow_map->multiple_od, NULL,
+ stage, priority, match,
+ actions, ctrl_meter, hash);
+ if (old_lflow) {
+ hmapx_add(&old_lflow->od_group, od);
+ goto done_add_unlock;
+ }
+ }
+ lflow = xmalloc(sizeof *lflow);
+ /* While adding new logical flows we're not setting single datapath,
+ * but collecting a group. 'od' will be updated later for all
+         * flows with only one datapath in a group, so it could be hashed
+ * correctly. */
+ ovn_lflow_init(lflow, NULL, stage, priority,
+ xstrdup(match), xstrdup(actions),
+ io_port ? xstrdup(io_port) : NULL,
+ nullable_xstrdup(ctrl_meter),
+ ovn_lflow_hint(stage_hint), where);
+ hmapx_add(&lflow->od_group, od);
hmap_insert_fast(&lflow_map->single_od, &lflow->hmap_node, hash);
}
+done_add_unlock:
+ if (use_logical_dp_groups && use_parallel_build) {
+ ovs_rwlock_unlock(&flowtable_lock);
+ }
}
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+
/* Adds a row with the specified contents to the Logical_Flow table. */
static void
ovn_lflow_add_at(struct lflow_state *lflow_map, struct ovn_datapath *od,
@@ -4453,15 +4549,8 @@ ovn_lflow_add_at(struct lflow_state *lflow_map, struct ovn_datapath *od,
priority, match,
actions);
- if (use_logical_dp_groups && use_parallel_build) {
- lock_hash_row(&lflow_locks, hash);
- do_ovn_lflow_add(lflow_map, od, hash, stage, priority, match,
- actions, io_port, stage_hint, where, ctrl_meter);
- unlock_hash_row(&lflow_locks, hash);
- } else {
- do_ovn_lflow_add(lflow_map, od, hash, stage, priority, match,
- actions, io_port, stage_hint, where, ctrl_meter);
- }
+ do_ovn_lflow_add(lflow_map, od, hash, stage, priority, match,
+ actions, io_port, stage_hint, where, ctrl_meter);
}
/* Adds a row with the specified contents to the Logical_Flow table. */
@@ -4555,15 +4644,9 @@ hmap_safe_remove(struct hmap *hmap, struct hmap_node *node)
static void
remove_lflow_from_lflows(struct lflow_state *lflows, struct ovn_lflow *lflow)
{
- if (use_logical_dp_groups && use_parallel_build) {
- lock_hash_row(&lflow_locks, lflow->hmap_node.hash);
- }
if (!hmap_safe_remove(&lflows->multiple_od, &lflow->hmap_node)) {
hmap_remove(&lflows->single_od, &lflow->hmap_node);
}
- if (use_logical_dp_groups && use_parallel_build) {
- unlock_hash_row(&lflow_locks, lflow->hmap_node.hash);
- }
}
static void
@@ -12967,7 +13050,7 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
}
}
- if (use_parallel_build && (!use_logical_dp_groups)) {
+ if (use_parallel_build) {
struct lflow_state *lflow_segs;
struct lswitch_flow_build_info *lsiv;
int index;
@@ -13197,6 +13280,8 @@ reconcile_lflow(struct ovn_lflow *lflow, struct northd_context *ctx,
ovn_lflow_destroy(lflows, lflow);
}
+static bool needs_init = true;
+
/* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
* constructing their contents based on the OVN_NB database. */
static void
@@ -13211,8 +13296,15 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
fast_hmap_size_for(&lflows.single_od, max_seen_lflow_size);
fast_hmap_size_for(&lflows.multiple_od, max_seen_lflow_size);
- if (use_parallel_build && use_logical_dp_groups) {
- update_hashrow_locks(&lflows.single_od, &lflow_locks);
+ if (use_parallel_build && use_logical_dp_groups && needs_init) {
+ ovs_rwlock_init(&flowtable_lock);
+ /* This happens on first run with parallel+dp_groups.
+ * db_run will re-read use_parallel_build from config and
+ * reset it. This way we get correct sizing for
+ * parallel + dp_groups by doing one single threaded run
+ * on the first iteration. */
+ use_parallel_build = false;
+ needs_init = false;
}
@@ -15139,7 +15231,6 @@ main(int argc, char *argv[])
daemonize_complete();
- init_hash_row_locks(&lflow_locks);
use_parallel_build = can_parallelize_hashes(false);
/* We want to detect (almost) all changes to the ovn-nb db. */