Message ID: 20210330091647.24470-2-anton.ivanov@cambridgegreys.com
State: Changes Requested
Series: [ovs-dev,v16,1/3] ovn-libs: Add support for parallel processing
On Tue, Mar 30, 2021 at 2:47 PM <anton.ivanov@cambridgegreys.com> wrote:
>
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>
> Datapaths, ports, igmp groups and load balancers can now
> be iterated over in parallel in order to speed up the lflow
> generation. This decreases the time needed to generate the
> logical flows by a factor of 4+ on a 6 core/12 thread CPU
> without datapath groups - from 0.8-1 microseconds per flow
> down to 0.2-0.3 microseconds per flow on average.
>
> The decrease in time to compute lflows with datapath groups
> enabled is ~2 times for the same hardware - from an average of
> 2.4 microseconds per flow to 1.2 microseconds per flow.
>
> Tested on an 8 node, 400 pod K8 simulation resulting
> in > 6K flows.
>
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Hi Anton,

I tested on my setup applying the first 2 patches of this series.
I don't see any crashes now, and all the tests pass. Great!

However, the compilation is failing with clang:

***
../northd/ovn-northd.c:7336:25: error: incompatible integer to pointer conversion passing 'uint64_t' (aka 'unsigned long') to parameter of type 'uint64_t *' (aka 'unsigned long *') [-Werror,-Wint-conversion]
                        (uint64_t) &mcast_sw_info->table_size,
                        ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/home/nusiddiq/workspace_cpp/ovn-org/ovn-for-reviews/ovn/ovs/lib/ovs-atomic-clang.h:57:50: note: expanded from macro 'atomic_compare_exchange_strong'
    atomic_compare_exchange_strong_explicit(DST, EXP, SRC, \
                                                 ^~~
/home/nusiddiq/workspace_cpp/ovn-org/ovn-for-reviews/ovn/ovs/lib/ovs-atomic-clang.h:61:47: note: expanded from macro 'atomic_compare_exchange_strong_explicit'
    __c11_atomic_compare_exchange_strong(DST, EXP, SRC, ORD1, ORD2)
                                              ^~~
../northd/ovn-northd.c:7352:25: error: passing 'int64_t *' (aka 'long *') to parameter of type 'uint64_t *' (aka 'unsigned long *') converts between pointers to integer types with different sign [-Werror,-Wpointer-sign]
                        &mcast_sw_info->table_size,
                        ^~~~~~~~~~~~~~~~~~~~~~~~~~
/home/nusiddiq/workspace_cpp/ovn-org/ovn-for-reviews/ovn/ovs/lib/ovs-atomic-clang.h:57:50: note: expanded from macro 'atomic_compare_exchange_strong'
    atomic_compare_exchange_strong_explicit(DST, EXP, SRC, \
                                                 ^~~
/home/nusiddiq/workspace_cpp/ovn-org/ovn-for-reviews/ovn/ovs/lib/ovs-atomic-clang.h:61:47: note: expanded from macro 'atomic_compare_exchange_strong_explicit'
    __c11_atomic_compare_exchange_strong(DST, EXP, SRC, ORD1, ORD2)
***

I fixed it manually by casting to (uint64_t *) and all the tests passed
for me.

Thanks
Numan

> ---
>  northd/ovn-northd.c | 363 ++++++++++++++++++++++++++++++++++++--------
>  1 file changed, 301 insertions(+), 62 deletions(-)
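For concreteness, a minimal sketch of the fix Numan describes, assuming it lands at the v4 call site flagged by clang; only the (uint64_t *) cast itself comes from the review, and the surrounding lines are quoted from the patch below:

    /* mcast_sw_info->table_size is an int64_t, so its address is cast to
     * uint64_t * to match the expected-value parameter of
     * atomic_compare_exchange_strong() in ovs-atomic-clang.h. */
    if (atomic_compare_exchange_strong(
                &mcast_sw_info->active_v4_flows,
                (uint64_t *) &mcast_sw_info->table_size,
                mcast_sw_info->table_size)) {
        return;
    }

The same cast would apply at the IPv6 call site (ovn-northd.c:7352 in the clang output).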
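Background for the diff below: when datapath groups are enabled, all worker threads insert into one shared lflows hash, and the inserts are serialized by per-hash-row locks rather than one global mutex. A standalone sketch of that pattern follows, with made-up names; the real helpers are struct hashrow_locks with lock_hash_row()/unlock_hash_row() from lib/ovn-parallel-hmap.h.

    /* Standalone sketch (not the OVN implementation) of per-hash-row
     * locking: a small array of mutexes is indexed by the insertion
     * hash, so writers only contend when they hit the same row. */
    #include <pthread.h>
    #include <stdint.h>

    #define N_ROW_LOCKS 64          /* must be a power of two */

    struct row_locks {
        pthread_mutex_t locks[N_ROW_LOCKS];
    };

    static void
    row_locks_init(struct row_locks *rl)
    {
        for (int i = 0; i < N_ROW_LOCKS; i++) {
            pthread_mutex_init(&rl->locks[i], NULL);
        }
    }

    static void
    lock_row(struct row_locks *rl, uint32_t hash)
    {
        pthread_mutex_lock(&rl->locks[hash & (N_ROW_LOCKS - 1)]);
    }

    static void
    unlock_row(struct row_locks *rl, uint32_t hash)
    {
        pthread_mutex_unlock(&rl->locks[hash & (N_ROW_LOCKS - 1)]);
    }

    /* Example: serialize an insertion keyed by its hash. */
    int
    main(void)
    {
        struct row_locks rl;
        uint32_t hash = 0x9e3779b9;   /* hash of some flow, for illustration */

        row_locks_init(&rl);
        lock_row(&rl, hash);
        /* ... insert into the shared hash map here ... */
        unlock_row(&rl, hash);
        return 0;
    }

Two writers only contend when their insertion hashes land on the same row, so the common case stays uncontended.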
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 57df62b92..eb5cbf832 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -39,6 +39,7 @@
 #include "lib/ovn-util.h"
 #include "lib/lb.h"
 #include "memory.h"
+#include "lib/ovn-parallel-hmap.h"
 #include "ovn/actions.h"
 #include "ovn/features.h"
 #include "ovn/logical-fields.h"
@@ -539,10 +540,10 @@ struct mcast_switch_info {
                                 * be received for queries that were sent out.
                                 */
 
-    uint32_t active_v4_flows;   /* Current number of active IPv4 multicast
+    atomic_uint64_t active_v4_flows;   /* Current number of active IPv4 multicast
                                 * flows.
                                 */
-    uint32_t active_v6_flows;   /* Current number of active IPv6 multicast
+    atomic_uint64_t active_v6_flows;   /* Current number of active IPv6 multicast
                                 * flows.
                                 */
 };
@@ -1001,8 +1002,8 @@ init_mcast_info_for_switch_datapath(struct ovn_datapath *od)
         smap_get_ullong(&od->nbs->other_config, "mcast_query_max_response",
                         OVN_MCAST_DEFAULT_QUERY_MAX_RESPONSE_S);
 
-    mcast_sw_info->active_v4_flows = 0;
-    mcast_sw_info->active_v6_flows = 0;
+    mcast_sw_info->active_v4_flows = ATOMIC_VAR_INIT(0);
+    mcast_sw_info->active_v6_flows = ATOMIC_VAR_INIT(0);
 }
 
 static void
@@ -4067,6 +4068,34 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
 /* If this option is 'true' northd will combine logical flows that differ by
  * logical datapath only by creating a datapath group. */
 static bool use_logical_dp_groups = false;
+static bool use_parallel_build = true;
+
+static struct hashrow_locks lflow_locks;
+
+/* Adds a row with the specified contents to the Logical_Flow table.
+ * Version to use when locking is required.
+ */
+static void
+do_ovn_lflow_add(struct hmap *lflow_map, bool shared,
+                 struct ovn_datapath *od,
+                 uint32_t hash, struct ovn_lflow *lflow)
+{
+
+    struct ovn_lflow *old_lflow;
+
+    if (shared && use_logical_dp_groups) {
+        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
+        if (old_lflow) {
+            ovn_lflow_destroy(NULL, lflow);
+            hmapx_add(&old_lflow->od_group, od);
+            return;
+        }
+    }
+
+    hmapx_add(&lflow->od_group, od);
+    hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
+}
+
 
 /* Adds a row with the specified contents to the Logical_Flow table. */
 static void
@@ -4077,7 +4106,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
 {
     ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
 
-    struct ovn_lflow *old_lflow, *lflow;
+    struct ovn_lflow *lflow;
     uint32_t hash;
 
     lflow = xmalloc(sizeof *lflow);
@@ -4089,17 +4118,14 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
                    ovn_lflow_hint(stage_hint), where);
 
     hash = ovn_lflow_hash(lflow);
-    if (shared && use_logical_dp_groups) {
-        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
-        if (old_lflow) {
-            ovn_lflow_destroy(NULL, lflow);
-            hmapx_add(&old_lflow->od_group, od);
-            return;
-        }
-    }
 
-    hmapx_add(&lflow->od_group, od);
-    hmap_insert(lflow_map, &lflow->hmap_node, hash);
+    if (use_logical_dp_groups && use_parallel_build) {
+        lock_hash_row(&lflow_locks, hash);
+        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
+        unlock_hash_row(&lflow_locks, hash);
+    } else {
+        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
+    }
 }
 
 /* Adds a row with the specified contents to the Logical_Flow table. */
@@ -7285,6 +7311,8 @@ build_lswitch_ip_mcast_igmp_mld(struct ovn_igmp_group *igmp_group,
                                 struct ds *actions,
                                 struct ds *match)
 {
+    uint64_t dummy;
+
     if (igmp_group->datapath) {
 
         ds_clear(match);
@@ -7303,10 +7331,13 @@ build_lswitch_ip_mcast_igmp_mld(struct ovn_igmp_group *igmp_group,
                 return;
             }
 
-            if (mcast_sw_info->active_v4_flows >= mcast_sw_info->table_size) {
+            if (atomic_compare_exchange_strong(
+                        &mcast_sw_info->active_v4_flows,
+                        &mcast_sw_info->table_size,
+                        mcast_sw_info->table_size)) {
                 return;
             }
-            mcast_sw_info->active_v4_flows++;
+            atomic_add(&mcast_sw_info->active_v4_flows, 1, &dummy);
             ds_put_format(match, "eth.mcast && ip4 && ip4.dst == %s ",
                           igmp_group->mcgroup.name);
         } else {
@@ -7316,10 +7347,13 @@ build_lswitch_ip_mcast_igmp_mld(struct ovn_igmp_group *igmp_group,
             if (ipv6_is_all_hosts(&igmp_group->address)) {
                 return;
             }
-            if (mcast_sw_info->active_v6_flows >= mcast_sw_info->table_size) {
+            if (atomic_compare_exchange_strong(
+                        &mcast_sw_info->active_v6_flows,
+                        &mcast_sw_info->table_size,
+                        mcast_sw_info->table_size)) {
                 return;
             }
-            mcast_sw_info->active_v6_flows++;
+            atomic_add(&mcast_sw_info->active_v6_flows, 1, &dummy);
             ds_put_format(match, "eth.mcast && ip6 && ip6.dst == %s ",
                           igmp_group->mcgroup.name);
         }
@@ -7347,6 +7381,8 @@ build_lswitch_ip_mcast_igmp_mld(struct ovn_igmp_group *igmp_group,
     }
 }
 
+static struct ovs_mutex mcgroup_mutex = OVS_MUTEX_INITIALIZER;
+
 /* Ingress table 19: Destination lookup, unicast handling (priority 50), */
 static void
 build_lswitch_ip_unicast_lookup(struct ovn_port *op,
@@ -7385,7 +7421,9 @@ build_lswitch_ip_unicast_lookup(struct ovn_port *op,
                                       &op->nbsp->header_);
         } else if (!strcmp(op->nbsp->addresses[i], "unknown")) {
             if (lsp_is_enabled(op->nbsp)) {
+                ovs_mutex_lock(&mcgroup_mutex);
                 ovn_multicast_add(mcgroups, &mc_unknown, op);
+                ovs_mutex_unlock(&mcgroup_mutex);
                 op->od->has_unknown = true;
             }
         } else if (is_dynamic_lsp_address(op->nbsp->addresses[i])) {
@@ -7947,6 +7985,8 @@ route_hash(struct parsed_route *route)
                        (uint32_t)route->plen);
 }
 
+static struct ovs_mutex bfd_lock = OVS_MUTEX_INITIALIZER;
+
 /* Parse and validate the route. Return the parsed route if successful.
  * Otherwise return NULL. */
 static struct parsed_route *
@@ -7999,6 +8039,7 @@ parsed_routes_add(struct ovs_list *routes,
 
         bfd_e = bfd_port_lookup(bfd_connections, nb_bt->logical_port,
                                 nb_bt->dst_ip);
+        ovs_mutex_lock(&bfd_lock);
         if (bfd_e) {
             bfd_e->ref = true;
         }
@@ -8008,8 +8049,10 @@ parsed_routes_add(struct ovs_list *routes,
         }
 
         if (!strcmp(nb_bt->status, "down")) {
+            ovs_mutex_unlock(&bfd_lock);
             return NULL;
         }
+        ovs_mutex_unlock(&bfd_lock);
     }
 
     struct parsed_route *pr = xzalloc(sizeof *pr);
@@ -11770,7 +11813,9 @@ build_lswitch_and_lrouter_iterate_by_od(struct ovn_datapath *od,
     build_lswitch_arp_nd_responder_default(od, lsi->lflows);
     build_lswitch_dns_lookup_and_response(od, lsi->lflows);
     build_lswitch_dhcp_and_dns_defaults(od, lsi->lflows);
+
     build_lswitch_destination_lookup_bmcast(od, lsi->lflows, &lsi->actions);
+
     build_lswitch_output_port_sec_od(od, lsi->lflows);
 
     /* Build Logical Router Flows. */
@@ -11799,6 +11844,7 @@ build_lswitch_and_lrouter_iterate_by_od(struct ovn_datapath *od,
 /* Helper function to combine all lflow generation which is iterated by port.
  */
 
+
 static void
 build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                         struct lswitch_flow_build_info *lsi)
@@ -11814,7 +11860,7 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                            lsi->ports,
                                            &lsi->actions,
                                            &lsi->match);
-    build_lswitch_dhcp_options_and_response(op,lsi->lflows);
+    build_lswitch_dhcp_options_and_response(op, lsi->lflows);
     build_lswitch_external_port(op, lsi->lflows);
     build_lswitch_ip_unicast_lookup(op, lsi->lflows, lsi->mcgroups,
                                     &lsi->actions, &lsi->match);
@@ -11842,6 +11888,124 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                   &lsi->actions);
 }
 
+struct lflows_thread_pool {
+    struct worker_pool *pool;
+};
+
+
+static void *build_lflows_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct lflows_thread_pool *workload;
+    struct lswitch_flow_build_info *lsi;
+
+    struct ovn_datapath *od;
+    struct ovn_port *op;
+    struct ovn_northd_lb *lb;
+    struct ovn_igmp_group *igmp_group;
+    int bnum;
+
+    while (!stop_parallel_processing()) {
+        wait_for_work(control);
+        workload = (struct lflows_thread_pool *) control->workload;
+        lsi = (struct lswitch_flow_build_info *) control->data;
+        if (stop_parallel_processing()) {
+            return NULL;
+        }
+        if (lsi && workload) {
+            /* Iterate over bucket ThreadID, ThreadID+size, ... */
+            for (bnum = control->id;
+                 bnum <= lsi->datapaths->mask;
+                 bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
+                }
+            }
+            for (bnum = control->id;
+                 bnum <= lsi->ports->mask;
+                 bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
+                }
+            }
+            for (bnum = control->id;
+                 bnum <= lsi->lbs->mask;
+                 bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
+                                                         &lsi->match,
+                                                         &lsi->actions);
+                }
+            }
+            for (bnum = control->id;
+                 bnum <= lsi->igmp_groups->mask;
+                 bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                        igmp_group, hmap_node, bnum, lsi->igmp_groups) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
+                                                    &lsi->match,
+                                                    &lsi->actions);
+                }
+            }
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+static bool pool_init_done = false;
+static struct lflows_thread_pool *build_lflows_pool = NULL;
+
+static void init_lflows_thread_pool(void)
+{
+    int index;
+
+    if (!pool_init_done) {
+        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
+        pool_init_done = true;
+        if (pool) {
+            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
+            build_lflows_pool->pool = pool;
+            for (index = 0; index < build_lflows_pool->pool->size; index++) {
+                build_lflows_pool->pool->controls[index].workload =
+                    build_lflows_pool;
+            }
+        }
+    }
+}
+
+/* TODO: replace hard cutoffs by configurable via commands. These are
+ * temporary defines to determine single-thread to multi-thread processing
+ * cutoff.
+ * Setting to 1 forces "all parallel" lflow build.
+ */
+
+static void
+noop_callback(struct worker_pool *pool OVS_UNUSED,
+              void *fin_result OVS_UNUSED,
+              void *result_frags OVS_UNUSED,
+              int index OVS_UNUSED)
+{
+    /* Do nothing */
+}
+
+
 static void
 build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                                 struct hmap *port_groups, struct hmap *lflows,
@@ -11850,53 +12014,114 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                                 struct shash *meter_groups, struct hmap *lbs,
                                 struct hmap *bfd_connections)
 {
-    struct ovn_datapath *od;
-    struct ovn_port *op;
-    struct ovn_northd_lb *lb;
-    struct ovn_igmp_group *igmp_group;
 
     char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
 
-    struct lswitch_flow_build_info lsi = {
-        .datapaths = datapaths,
-        .ports = ports,
-        .port_groups = port_groups,
-        .lflows = lflows,
-        .mcgroups = mcgroups,
-        .igmp_groups = igmp_groups,
-        .meter_groups = meter_groups,
-        .lbs = lbs,
-        .bfd_connections = bfd_connections,
-        .svc_check_match = svc_check_match,
-        .match = DS_EMPTY_INITIALIZER,
-        .actions = DS_EMPTY_INITIALIZER,
-    };
-
-    /* Combined build - all lflow generation from lswitch and lrouter
-     * will move here and will be reogranized by iterator type.
-     */
-    HMAP_FOR_EACH (od, key_node, datapaths) {
-        build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
-    }
-    HMAP_FOR_EACH (op, key_node, ports) {
-        build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
-    }
-    HMAP_FOR_EACH (lb, hmap_node, lbs) {
-        build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
-                                             &lsi.actions,
-                                             &lsi.match);
-    }
-    HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
-        build_lswitch_ip_mcast_igmp_mld(igmp_group,
-                                        lsi.lflows,
-                                        &lsi.actions,
-                                        &lsi.match);
+    if (use_parallel_build) {
+        init_lflows_thread_pool();
+        if (!can_parallelize_hashes(false)) {
+            use_parallel_build = false;
+        }
     }
-    free(svc_check_match);
 
-    ds_destroy(&lsi.match);
-    ds_destroy(&lsi.actions);
+    if (use_parallel_build) {
+        struct hmap *lflow_segs;
+        struct lswitch_flow_build_info *lsiv;
+        int index;
+
+        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
+        if (use_logical_dp_groups) {
+            lflow_segs = NULL;
+        } else {
+            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
+        }
+
+        /* Set up "work chunks" for each thread to work on. */
+
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            if (use_logical_dp_groups) {
+                /* if dp_groups are in use we lock a shared lflows hash
+                 * on a per-bucket level instead of merging hash frags */
+                lsiv[index].lflows = lflows;
+            } else {
+                fast_hmap_init(&lflow_segs[index], lflows->mask);
+                lsiv[index].lflows = &lflow_segs[index];
+            }
+
+            lsiv[index].datapaths = datapaths;
+            lsiv[index].ports = ports;
+            lsiv[index].port_groups = port_groups;
+            lsiv[index].mcgroups = mcgroups;
+            lsiv[index].igmp_groups = igmp_groups;
+            lsiv[index].meter_groups = meter_groups;
+            lsiv[index].lbs = lbs;
+            lsiv[index].bfd_connections = bfd_connections;
+            lsiv[index].svc_check_match = svc_check_match;
+            ds_init(&lsiv[index].match);
+            ds_init(&lsiv[index].actions);
+
+            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+        }
+
+        /* Run thread pool. */
+        if (use_logical_dp_groups) {
+            run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
+        } else {
+            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+        }
+
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            ds_destroy(&lsiv[index].match);
+            ds_destroy(&lsiv[index].actions);
+        }
+        free(lflow_segs);
+        free(lsiv);
+    } else {
+        struct ovn_datapath *od;
+        struct ovn_port *op;
+        struct ovn_northd_lb *lb;
+        struct ovn_igmp_group *igmp_group;
+        struct lswitch_flow_build_info lsi = {
+            .datapaths = datapaths,
+            .ports = ports,
+            .port_groups = port_groups,
+            .lflows = lflows,
+            .mcgroups = mcgroups,
+            .igmp_groups = igmp_groups,
+            .meter_groups = meter_groups,
+            .lbs = lbs,
+            .bfd_connections = bfd_connections,
+            .svc_check_match = svc_check_match,
+            .match = DS_EMPTY_INITIALIZER,
+            .actions = DS_EMPTY_INITIALIZER,
+        };
+
+        /* Combined build - all lflow generation from lswitch and lrouter
+         * will move here and will be reogranized by iterator type.
+         */
+        HMAP_FOR_EACH (od, key_node, datapaths) {
+            build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
+        }
+        HMAP_FOR_EACH (op, key_node, ports) {
+            build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
+        }
+        HMAP_FOR_EACH (lb, hmap_node, lbs) {
+            build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
+                                                 &lsi.actions,
+                                                 &lsi.match);
+        }
+        HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
+            build_lswitch_ip_mcast_igmp_mld(igmp_group,
+                                            lsi.lflows,
+                                            &lsi.actions,
+                                            &lsi.match);
+        }
+
+        ds_destroy(&lsi.match);
+        ds_destroy(&lsi.actions);
+    }
+
+    free(svc_check_match);
     build_lswitch_flows(datapaths, lflows);
 }
 
@@ -11967,6 +12192,8 @@ ovn_sb_set_lflow_logical_dp_group(
     sbrec_logical_flow_set_logical_dp_group(sbflow, dpg->dp_group);
 }
 
+static ssize_t max_seen_lflow_size = 128;
+
 /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
  * constructing their contents based on the OVN_NB database. */
 static void
@@ -11976,13 +12203,21 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
              struct shash *meter_groups,
             struct hmap *lbs, struct hmap *bfd_connections)
 {
-    struct hmap lflows = HMAP_INITIALIZER(&lflows);
+    struct hmap lflows;
 
+    fast_hmap_size_for(&lflows, max_seen_lflow_size);
+    if (use_parallel_build) {
+        update_hashrow_locks(&lflows, &lflow_locks);
+    }
     build_lswitch_and_lrouter_flows(datapaths, ports,
                                     port_groups, &lflows, mcgroups,
                                     igmp_groups, meter_groups, lbs,
                                     bfd_connections);
 
+    if (hmap_count(&lflows) > max_seen_lflow_size) {
+        max_seen_lflow_size = hmap_count(&lflows);
+    }
+
     /* Collecting all unique datapath groups. */
     struct hmap dp_groups = HMAP_INITIALIZER(&dp_groups);
     struct hmapx single_dp_lflows = HMAPX_INITIALIZER(&single_dp_lflows);
@@ -13784,6 +14019,9 @@ main(int argc, char *argv[])
 
     daemonize_complete();
 
+    init_hash_row_locks(&lflow_locks);
+    use_parallel_build = can_parallelize_hashes(false);
+
     /* We want to detect (almost) all changes to the ovn-nb db. */
     struct ovsdb_idl_loop ovnnb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
         ovsdb_idl_create(ovnnb_db, &nbrec_idl_class, true, true));
@@ -14052,6 +14290,7 @@ main(int argc, char *argv[])
     exiting = false;
     state.had_lock = false;
     state.paused = false;
+
     while (!exiting) {
         memory_run();
         if (memory_should_report()) {
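The work-sharing scheme in build_lflows_thread() above stripes hash buckets across threads: thread T visits buckets T, T + pool->size, T + 2 * pool->size, and so on, so no two threads ever read the same bucket. Below is a self-contained sketch of just that striping, using a plain pthread harness in place of OVN's worker pool; all names are illustrative.

    /* Standalone illustration (not OVN code) of bucket striping.
     * N_BUCKETS stands in for hmap->mask + 1, N_THREADS for pool->size,
     * and args->id for control->id in build_lflows_thread(). */
    #include <pthread.h>
    #include <stdio.h>

    #define N_BUCKETS 16
    #define N_THREADS 4

    struct stripe_args {
        int id;                 /* thread index within the pool */
    };

    static void *
    stripe_worker(void *arg)
    {
        struct stripe_args *a = arg;

        /* Visit bucket ThreadID, ThreadID + size, ... exactly as the
         * patch does (bnum <= mask is equivalent to bnum < n_buckets). */
        for (int bnum = a->id; bnum < N_BUCKETS; bnum += N_THREADS) {
            printf("thread %d handles bucket %d\n", a->id, bnum);
        }
        return NULL;
    }

    int
    main(void)
    {
        pthread_t threads[N_THREADS];
        struct stripe_args args[N_THREADS];

        for (int i = 0; i < N_THREADS; i++) {
            args[i].id = i;
            pthread_create(&threads[i], NULL, stripe_worker, &args[i]);
        }
        for (int i = 0; i < N_THREADS; i++) {
            pthread_join(threads[i], NULL);
        }
        return 0;
    }

Because the stripes are disjoint, the reads need no locking; only inserts into a shared destination hash (the dp-groups path above) take the per-row locks.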