| Message ID | 20210114164155.939-3-anton.ivanov@cambridgegreys.com |
| --- | --- |
| State | Superseded |
| Series | [ovs-dev,v11,1/4] ovn-libs: Add support for parallel processing |
On Thu, Jan 14, 2021 at 10:12 PM <anton.ivanov@cambridgegreys.com> wrote:

> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>
> Datapaths, ports, igmp groups and load balancers can now
> be iterated over in parallel in order to speed up the lflow
> generation. This decreases the time needed to generate the
> logical flows by a factor of 4+ on a 6 core/12 thread CPU
> without datapath groups - from 0.8-1 microseconds per flow
> down to 0.2-0.3 microseconds per flow on average.
>
> The decrease in time to compute lflows with datapath groups
> enabled is ~2 times for the same hardware - from an average of
> 2.4 microseconds per flow to 1.2 microseconds per flow.
>
> Tested on an 8 node, 400 pod K8 simulation resulting
> in > 6K flows.
>
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>

Please see below a few minor comments.

Thanks
Numan

> ---
>  lib/fasthmap.c      |   3 +
>  northd/ovn-northd.c | 340 +++++++++++++++++++++++++++++++++++++-------
>  2 files changed, 290 insertions(+), 53 deletions(-)
>
> diff --git a/lib/fasthmap.c b/lib/fasthmap.c
> index 2be93e6a8..24acb278b 100644
> --- a/lib/fasthmap.c
> +++ b/lib/fasthmap.c
> @@ -33,6 +33,7 @@
>
>  VLOG_DEFINE_THIS_MODULE(fasthmap);
>
> +#ifndef OVS_HAS_PARALLEL_HMAP
>
>  /* These are accessed under mutex inside ovn_add_worker_pool().
>   * They do not need to be atomic.
> @@ -324,3 +325,5 @@ void ovn_run_pool_hash(
>  {
>      ovn_run_pool_callback(pool, result, result_frags, merge_hash_results);
>  }
> +
> +#endif
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index dda033543..5e1d129d5 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -37,6 +37,7 @@
>  #include "lib/ovn-sb-idl.h"
>  #include "lib/ovn-util.h"
>  #include "lib/lb.h"
> +#include "lib/fasthmap.h"
>  #include "ovn/actions.h"
>  #include "ovn/logical-fields.h"
>  #include "packets.h"
> @@ -4174,6 +4175,34 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
>  /* If this option is 'true' northd will combine logical flows that differs by
>   * logical datapath only by creating a datapah group. */
>  static bool use_logical_dp_groups = false;
> +static bool use_parallel_build = true;
> +
> +static struct ovs_mutex *slice_locks = NULL;
> +
> +/* Adds a row with the specified contents to the Logical_Flow table.
> + * Version to use when locking is required.
> + */
> +static void
> +do_ovn_lflow_add_locked(struct hmap *lflow_map, bool shared,
> +                        struct ovn_datapath *od,
> +                        uint32_t hash, struct ovn_lflow *lflow)
> +{
> +
> +    struct ovn_lflow *old_lflow;
> +
> +    if (shared && use_logical_dp_groups) {
> +        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
> +        if (old_lflow) {
> +            ovn_lflow_destroy(NULL, lflow);
> +            hmapx_add(&old_lflow->od_group, od);
> +            return;
> +        }
> +    }
> +
> +    hmapx_add(&lflow->od_group, od);
> +    hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
> +}
> +
>
>  /* Adds a row with the specified contents to the Logical_Flow table. */
>  static void
> @@ -4184,7 +4213,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
>  {
>      ovs_assert(ovn_stage_to_datapath_type(stage) ==
>                 ovn_datapath_get_type(od));
>
> -    struct ovn_lflow *old_lflow, *lflow;
> +    struct ovn_lflow *lflow;
>      uint32_t hash;
>
>      lflow = xmalloc(sizeof *lflow);
> @@ -4196,17 +4225,14 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
>                     ovn_lflow_hint(stage_hint), where);
>
>      hash = ovn_lflow_hash(lflow);
> -    if (shared && use_logical_dp_groups) {
> -        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
> -        if (old_lflow) {
> -            ovn_lflow_destroy(NULL, lflow);
> -            hmapx_add(&old_lflow->od_group, od);
> -            return;
> -        }
> -    }
>
> -    hmapx_add(&lflow->od_group, od);
> -    hmap_insert(lflow_map, &lflow->hmap_node, hash);
> +    if (use_logical_dp_groups && use_parallel_build) {
> +        ovs_mutex_lock(&slice_locks[hash % lflow_map->mask]);
> +        do_ovn_lflow_add_locked(lflow_map, shared, od, hash, lflow);
> +        ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]);
> +    } else {
> +        do_ovn_lflow_add_locked(lflow_map, shared, od, hash, lflow);
> +    }
>  }
>
>  /* Adds a row with the specified contents to the Logical_Flow table. */
> @@ -7348,6 +7374,8 @@ build_lswitch_ip_mcast_igmp_mld(struct ovn_igmp_group *igmp_group,
>      }
>  }
>
> +static struct ovs_mutex mcgroup_mutex = OVS_MUTEX_INITIALIZER;
> +
>  /* Ingress table 19: Destination lookup, unicast handling (priority 50), */
>  static void
>  build_lswitch_ip_unicast_lookup(struct ovn_port *op,
> @@ -7386,7 +7414,9 @@ build_lswitch_ip_unicast_lookup(struct ovn_port *op,
>                                      &op->nbsp->header_);
>          } else if (!strcmp(op->nbsp->addresses[i], "unknown")) {
>              if (lsp_is_enabled(op->nbsp)) {
> +                ovs_mutex_lock(&mcgroup_mutex);
>                  ovn_multicast_add(mcgroups, &mc_unknown, op);
> +                ovs_mutex_unlock(&mcgroup_mutex);
>                  op->od->has_unknown = true;
>              }
>          } else if (is_dynamic_lsp_address(op->nbsp->addresses[i])) {
> @@ -11676,6 +11706,125 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
>                                              &lsi->match, &lsi->actions);
>  }
>
> +struct lflows_thread_pool {
> +    struct worker_pool *pool;
> +};
> +
> +static void *build_lflows_thread(void *arg) {
> +    struct worker_control *control = (struct worker_control *) arg;
> +    struct lflows_thread_pool *workload;
> +    struct lswitch_flow_build_info *lsi;
> +
> +    struct ovn_datapath *od;
> +    struct ovn_port *op;
> +    struct ovn_northd_lb *lb;
> +    struct ovn_igmp_group *igmp_group;
> +    int bnum;
> +
> +    while (!cease_fire()) {
> +        sem_wait(&control->fire);
> +        workload = (struct lflows_thread_pool *) control->workload;
> +        lsi = (struct lswitch_flow_build_info *) control->data;
> +        if (lsi && workload) {
> +            /* Iterate over bucket ThreadID, ThreadID+size, ... */
> +            for (bnum = control->id;
> +                    bnum <= lsi->datapaths->mask;
> +                    bnum += workload->pool->size)
> +            {
> +                HMAP_FOR_EACH_IN_PARALLEL (
> +                        od, key_node, bnum, lsi->datapaths) {
> +                    if (cease_fire()) {
> +                        return NULL;
> +                    }
> +                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
> +                }
> +            }
> +            for (bnum = control->id;
> +                    bnum <= lsi->ports->mask;
> +                    bnum += workload->pool->size)
> +            {
> +                HMAP_FOR_EACH_IN_PARALLEL (
> +                        op, key_node, bnum, lsi->ports) {
> +                    if (cease_fire()) {
> +                        return NULL;
> +                    }
> +                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
> +                }
> +            }
> +            for (bnum = control->id;
> +                    bnum <= lsi->lbs->mask;
> +                    bnum += workload->pool->size)
> +            {
> +                HMAP_FOR_EACH_IN_PARALLEL (
> +                        lb, hmap_node, bnum, lsi->lbs) {
> +                    if (cease_fire()) {
> +                        return NULL;
> +                    }
> +                    build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
> +                                                         &lsi->match,
> +                                                         &lsi->actions);
> +                }
> +            }
> +            for (bnum = control->id;
> +                    bnum <= lsi->igmp_groups->mask;
> +                    bnum += workload->pool->size)
> +            {
> +                HMAP_FOR_EACH_IN_PARALLEL (
> +                        igmp_group, hmap_node, bnum, lsi->igmp_groups) {
> +                    if (cease_fire()) {
> +                        return NULL;
> +                    }
> +                    build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
> +                                                    &lsi->match,
> +                                                    &lsi->actions);
> +                }
> +            }
> +            atomic_store_relaxed(&control->finished, true);
> +            atomic_thread_fence(memory_order_acq_rel);
> +        }
> +        sem_post(control->done);
> +    }
> +    return NULL;
> +}
> +
> +static bool pool_init_done = false;
> +static struct lflows_thread_pool *build_lflows_pool = NULL;
> +
> +static void init_lflows_thread_pool(void)
> +{
> +    int index;
> +
> +    if (!pool_init_done) {
> +        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
> +        pool_init_done = true;
> +        if (pool) {
> +            build_lflows_pool =
> +                xmalloc(sizeof(struct lflows_thread_pool));

As per the coding-style guidelines, using the expression is preferred over
the data type in the sizeof operator -
https://github.com/ovn-org/ovn/blob/master/Documentation/internals/contributing/coding-style.rst

I would suggest changing the above to -

    build_lflows_pool = xmalloc(sizeof *build_lflows_pool);

I would suggest doing the same for a few other places in the patch series.

> +            build_lflows_pool->pool = pool;
> +            for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +                build_lflows_pool->pool->controls[index].workload =
> +                    build_lflows_pool;
> +            }
> +        }
> +    }
> +}
> +
> +/* TODO: replace hard cutoffs by configurable via commands. These are
> + * temporary defines to determine single-thread to multi-thread processing
> + * cutoff.
> + * Setting to 1 forces "all parallel" lflow build.
> + */
> +
> +static void
> +noop_callback(struct worker_pool *pool OVS_UNUSED,
> +              void *fin_result OVS_UNUSED,
> +              void *result_frags OVS_UNUSED,
> +              int index OVS_UNUSED)
> +{
> +    /* Do nothing */
> +}
> +
> +
>  static void
>  build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
>                                  struct hmap *port_groups, struct hmap *lflows,
> @@ -11684,53 +11833,111 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
>                                  struct shash *meter_groups, struct hmap *lbs,
>                                  struct hmap *bfd_connections)
>  {
> -    struct ovn_datapath *od;
> -    struct ovn_port *op;
> -    struct ovn_northd_lb *lb;
> -    struct ovn_igmp_group *igmp_group;
>
>      char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>
> -    struct lswitch_flow_build_info lsi = {
> -        .datapaths = datapaths,
> -        .ports = ports,
> -        .port_groups = port_groups,
> -        .lflows = lflows,
> -        .mcgroups = mcgroups,
> -        .igmp_groups = igmp_groups,
> -        .meter_groups = meter_groups,
> -        .lbs = lbs,
> -        .bfd_connections = bfd_connections,
> -        .svc_check_match = svc_check_match,
> -        .match = DS_EMPTY_INITIALIZER,
> -        .actions = DS_EMPTY_INITIALIZER,
> -    };
> +    if (use_parallel_build) {
> +        init_lflows_thread_pool();
> +        struct hmap *lflow_segs;
> +        struct lswitch_flow_build_info *lsiv;
> +        int index;
> +
> +        lsiv = xmalloc(
> +            sizeof(struct lswitch_flow_build_info) *
> +            build_lflows_pool->pool->size);
> +        if (use_logical_dp_groups) {
> +            lflow_segs = NULL;
> +        } else {
> +            lflow_segs = xmalloc(
> +                sizeof(struct hmap) * build_lflows_pool->pool->size);

Same here -

    lflow_segs = xmalloc(
        sizeof *lflow_segs * build_lflows_pool->pool->size);

> +        }
>
> -    /* Combined build - all lflow generation from lswitch and lrouter
> -     * will move here and will be reogranized by iterator type.
> -     */
> -    HMAP_FOR_EACH (od, key_node, datapaths) {
> -        build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
> -    }
> -    HMAP_FOR_EACH (op, key_node, ports) {
> -        build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
> -    }
> -    HMAP_FOR_EACH (lb, hmap_node, lbs) {
> -        build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
> -                                             &lsi.actions,
> -                                             &lsi.match);
> -    }
> -    HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
> -        build_lswitch_ip_mcast_igmp_mld(igmp_group,
> -                                        lsi.lflows,
> -                                        &lsi.actions,
> -                                        &lsi.match);
> -    }
> -    free(svc_check_match);
> +        /* Set up "work chunks" for each thread to work on. */
>
> -    ds_destroy(&lsi.match);
> -    ds_destroy(&lsi.actions);
> +        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +            if (use_logical_dp_groups) {
> +                /* if dp_groups are in use we lock a shared lflows hash
> +                 * on a per-bucket level instead of merging hash frags */
> +                lsiv[index].lflows = lflows;
> +            } else {
> +                fast_hmap_init(&lflow_segs[index], lflows->mask);
> +                lsiv[index].lflows = &lflow_segs[index];
> +            }
> +
> +            lsiv[index].datapaths = datapaths;
> +            lsiv[index].ports = ports;
> +            lsiv[index].port_groups = port_groups;
> +            lsiv[index].mcgroups = mcgroups;
> +            lsiv[index].igmp_groups = igmp_groups;
> +            lsiv[index].meter_groups = meter_groups;
> +            lsiv[index].lbs = lbs;
> +            lsiv[index].bfd_connections = bfd_connections;
> +            lsiv[index].svc_check_match = svc_check_match;
> +            ds_init(&lsiv[index].match);
> +            ds_init(&lsiv[index].actions);
> +
> +            build_lflows_pool->pool->controls[index].data = &lsiv[index];
> +        }
> +
> +        /* Run thread pool. */
> +        if (use_logical_dp_groups) {
> +            run_pool_callback(build_lflows_pool->pool, NULL, NULL,
> +                              noop_callback);
> +        } else {
> +            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
> +        }
> +
> +        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +            ds_destroy(&lsiv[index].match);
> +            ds_destroy(&lsiv[index].actions);
> +        }
> +        free(lflow_segs);
> +        free(lsiv);
> +    } else {
> +        struct ovn_datapath *od;
> +        struct ovn_port *op;
> +        struct ovn_northd_lb *lb;
> +        struct ovn_igmp_group *igmp_group;
> +        struct lswitch_flow_build_info lsi = {
> +            .datapaths = datapaths,
> +            .ports = ports,
> +            .port_groups = port_groups,
> +            .lflows = lflows,
> +            .mcgroups = mcgroups,
> +            .igmp_groups = igmp_groups,
> +            .meter_groups = meter_groups,
> +            .lbs = lbs,
> +            .bfd_connections = bfd_connections,
> +            .svc_check_match = svc_check_match,
> +            .match = DS_EMPTY_INITIALIZER,
> +            .actions = DS_EMPTY_INITIALIZER,
> +        };
> +
> +        /* Combined build - all lflow generation from lswitch and lrouter
> +         * will move here and will be reogranized by iterator type.
> +         */
> +        HMAP_FOR_EACH (od, key_node, datapaths) {
> +            build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
> +        }
> +        HMAP_FOR_EACH (op, key_node, ports) {
> +            build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
> +        }
> +        HMAP_FOR_EACH (lb, hmap_node, lbs) {
> +            build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
> +                                                 &lsi.actions,
> +                                                 &lsi.match);
> +        }
> +        HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
> +            build_lswitch_ip_mcast_igmp_mld(igmp_group,
> +                                            lsi.lflows,
> +                                            &lsi.actions,
> +                                            &lsi.match);
> +        }
> +
> +        ds_destroy(&lsi.match);
> +        ds_destroy(&lsi.actions);
> +    }
>
> +    free(svc_check_match);
>      build_lswitch_flows(datapaths, lflows);
>  }
>
> @@ -11801,6 +12008,25 @@ ovn_sb_set_lflow_logical_dp_group(
>      sbrec_logical_flow_set_logical_dp_group(sbflow, dpg->dp_group);
>  }
>
> +static ssize_t max_seen_lflow_size = 128;
> +
> +static ssize_t recent_lflow_map_mask = 0;
> +
> +static void update_lock_array(struct hmap *lflows)
> +{
> +    int i;
> +    if (recent_lflow_map_mask != lflows->mask) {
> +        if (slice_locks) {
> +            free(slice_locks);
> +        }
> +        slice_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
> +        recent_lflow_map_mask = lflows->mask;
> +        for (i = 0; i <= lflows->mask; i++) {
> +            ovs_mutex_init(&slice_locks[i]);
> +        }
> +    }
> +}
> +
>  /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
>   * constructing their contents based on the OVN_NB database. */
>  static void
> @@ -11810,13 +12036,21 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
>               struct shash *meter_groups,
>               struct hmap *lbs, struct hmap *bfd_connections)
>  {
> -    struct hmap lflows = HMAP_INITIALIZER(&lflows);
> +    struct hmap lflows;
>
> +    fast_hmap_size_for(&lflows, max_seen_lflow_size);
> +    if (use_parallel_build) {
> +        update_lock_array(&lflows);
> +    }
>      build_lswitch_and_lrouter_flows(datapaths, ports,
>                                      port_groups, &lflows, mcgroups,
>                                      igmp_groups, meter_groups, lbs,
>                                      bfd_connections);
>
> +    if (hmap_count(&lflows) > max_seen_lflow_size) {
> +        max_seen_lflow_size = hmap_count(&lflows);
> +    }
> +
>      /* Collecting all unique datapath groups. */
>      struct hmap dp_groups = HMAP_INITIALIZER(&dp_groups);
>      struct hmapx single_dp_lflows = HMAPX_INITIALIZER(&single_dp_lflows);
> --
> 2.20.1
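The locking scheme the patch adds around ovn_lflow_add_at() is lock striping:
rather than serializing all threads on one global mutex, update_lock_array()
keeps one ovs_mutex per hash slice of the shared lflows map, and each insert
takes only the mutex selected from the flow's hash, so inserts landing in
different slices proceed in parallel. build_lflows() also pre-sizes the map
from the previous run's flow count, so the mask (and hence the lock array)
stays stable while the workers run. Below is a minimal, self-contained sketch
of the pattern in plain C with pthreads; the table, item type, and
power-of-two slice count are illustrative stand-ins, not OVN's actual types:

    /* Lock-striping sketch: one mutex per hash slice, so concurrent
     * inserts only contend when they target the same slice. */
    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define N_SLICES 64                 /* Power of two, like an hmap. */
    #define SLICE_MASK (N_SLICES - 1)

    struct item {
        uint32_t hash;
        struct item *next;              /* Chained within one slice. */
    };

    static struct item *slices[N_SLICES];
    static pthread_mutex_t slice_locks[N_SLICES];

    static void
    striped_insert(struct item *it)
    {
        uint32_t slice = it->hash & SLICE_MASK;   /* Pick the slice... */

        pthread_mutex_lock(&slice_locks[slice]);  /* ...lock only it... */
        it->next = slices[slice];
        slices[slice] = it;                       /* ...and insert. */
        pthread_mutex_unlock(&slice_locks[slice]);
    }

    int
    main(void)
    {
        for (int i = 0; i < N_SLICES; i++) {
            pthread_mutex_init(&slice_locks[i], NULL);
        }

        struct item *it = malloc(sizeof *it);     /* sizeof *expr, per the
                                                   * style comment above. */
        it->hash = 0xdeadbeef;
        striped_insert(it);
        printf("inserted into slice %u\n", it->hash & SLICE_MASK);
        return 0;
    }

The sketch picks the lock with hash & (N_SLICES - 1), the same computation a
power-of-two hash table uses to pick a bucket, so the lock taken always guards
the chain being modified.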
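The other half of the design is how build_lflows_thread() splits the input
hmaps without any read-side locking: worker "id" of a pool with "size"
threads strides over buckets id, id + size, id + 2*size, ..., so the workers
cover every bucket exactly once and no two workers ever visit the same one.
A standalone sketch of that stride, with illustrative names in place of OVN's
worker-pool API:

    /* Bucket-striding sketch: N threads partition a bucketed table by
     * each starting at its own id and stepping by the pool size. */
    #include <stdio.h>

    #define N_BUCKETS 16    /* mask + 1 in hmap terms. */

    static void
    walk_buckets(int id, int size)
    {
        for (int bnum = id; bnum < N_BUCKETS; bnum += size) {
            /* A real worker would run HMAP_FOR_EACH_IN_PARALLEL here. */
            printf("thread %d handles bucket %d\n", id, bnum);
        }
    }

    int
    main(void)
    {
        int n_threads = 4;
        for (int id = 0; id < n_threads; id++) {
            walk_buckets(id, n_threads);
        }
        return 0;
    }

Because the partition is a pure function of (id, size), the threads need no
shared cursor or work queue; the only synchronization left is on the output
side, which is where the slice locks (or the per-thread hash fragments merged
by run_pool_hash()) come in.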