Message ID | 20200925095807.19358-4-anton.ivanov@cambridgegreys.com |
---|---|
State | Superseded |
Headers | show |
Series | [ovs-dev,v4,1/9] ovn-libs: Add support for parallel processing | expand |
On 9/25/20 5:58 AM, anton.ivanov@cambridgegreys.com wrote: > From: Anton Ivanov <anton.ivanov@cambridgegreys.com> > > 1. Add support for parallel lflow build. > 2. Move combined lflow generation to be build in parallel. > > Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com> > --- > northd/ovn-northd.c | 197 ++++++++++++++++++++++++++++++++++++++------ > 1 file changed, 171 insertions(+), 26 deletions(-) > > diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c > index 66b6c2985..225f4ca8e 100644 > --- a/northd/ovn-northd.c > +++ b/northd/ovn-northd.c > @@ -48,6 +48,7 @@ > #include "unixctl.h" > #include "util.h" > #include "uuid.h" > +#include "fasthmap.h" > #include "openvswitch/vlog.h" > > VLOG_DEFINE_THIS_MODULE(ovn_northd); > @@ -4189,7 +4190,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od, > ovn_lflow_init(lflow, od, stage, priority, > xstrdup(match), xstrdup(actions), > ovn_lflow_hint(stage_hint), where); > - hmap_insert(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow)); > + hmap_insert_fast(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow)); > } > > /* Adds a row with the specified contents to the Logical_Flow table. */ > @@ -11331,6 +11332,8 @@ struct lswitch_flow_build_info { > > /* Helper function to combine all lflow generation which is iterated by > * datapath. > + * Invoked by parallel build over a "chunk" of work or by single threaded > + * build over a chunk which is initialized to contain "all" work. > */ > > static void > @@ -11357,6 +11360,8 @@ build_lswitch_and_lrouter_iterate_by_od( > } > > /* Helper function to combine all lflow generation which is iterated by port. > + * Invoked by parallel build over a "chunk" of work or by single threaded > + * build over a chunk which is initialized to contain "all" work. > */ > > static void > @@ -11379,6 +11384,85 @@ build_lswitch_and_lrouter_iterate_by_op( > &lsi->actions); > } > > +struct lflows_thread_pool { > + struct worker_pool *pool; > +}; > + > +static void *build_lflows_thread(void *arg) { > + struct worker_control *control = (struct worker_control *) arg; > + struct lflows_thread_pool *workload; > + struct lswitch_flow_build_info *lsi; > + > + struct ovn_datapath *od; > + struct ovn_port *op; > + int bnum; > + > + while (!seize_fire()) { > + sem_wait(&control->fire); > + workload = (struct lflows_thread_pool *) control->workload; > + lsi = (struct lswitch_flow_build_info *) control->data; > + if (lsi && workload) { > + /* Iterate over bucket ThreadID, ThreadID+size, ... */ > + for (bnum = control->id; > + bnum <= lsi->datapaths->mask; > + bnum += workload->pool->size) > + { > + HMAP_FOR_EACH_IN_PARALLEL ( > + od, key_node, bnum, lsi->datapaths) { > + if (seize_fire()) { > + return NULL; > + } > + build_lswitch_and_lrouter_iterate_by_od(od, lsi); > + } > + } > + for (bnum = control->id; > + bnum <= lsi->ports->mask; > + bnum += workload->pool->size) > + { > + HMAP_FOR_EACH_IN_PARALLEL ( > + op, key_node, bnum, lsi->ports) { > + if (seize_fire()) { > + return NULL; > + } > + build_lswitch_and_lrouter_iterate_by_op(op, lsi); > + } > + } > + atomic_store_relaxed(&control->finished, true); > + atomic_thread_fence(memory_order_release); > + } > + sem_post(control->done); > + } > + return NULL; > +} > + > +static struct lflows_thread_pool *build_lflows_pool = NULL; > + > +static void init_lflows_thread_pool(void) > +{ > + int index; > + > + if (!build_lflows_pool) { > + build_lflows_pool = > + xmalloc(sizeof(struct lflows_thread_pool)); > + build_lflows_pool->pool = > + add_worker_pool(build_lflows_thread); > + > + for (index = 0; index < build_lflows_pool->pool->size; index++) { > + build_lflows_pool->pool->controls[index].workload = > + build_lflows_pool; > + } > + } > +} > + > +/* TODO: replace hard cutoffs by configurable via commands. These are > + * temporary defines to determine single-thread to multi-thread processing > + * cutoff. > + * Setting to 1 forces "all parallel" lflow build. > + */ > + > +#define OD_CUTOFF 1 > +#define OP_CUTOFF 1 > + > static void > build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports, > struct hmap *port_groups, struct hmap *lflows, > @@ -11386,35 +11470,89 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports, > struct shash *meter_groups, > struct hmap *lbs) > { > + char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac); > > - struct ovn_datapath *od; > - struct ovn_port *op; > + if (hmap_count(datapaths) > OD_CUTOFF || hmap_count(ports) > OP_CUTOFF) { > > - char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac); > + struct hmap *lflow_segs; > + struct lswitch_flow_build_info *lsiv; > + int index; > > - struct lswitch_flow_build_info lsi = { > - .datapaths = datapaths, > - .ports = ports, > - .port_groups = port_groups, > - .lflows = lflows, > - .mcgroups = mcgroups, > - .igmp_groups = igmp_groups, > - .meter_groups = meter_groups, > - .lbs = lbs, > - .svc_check_match = svc_check_match, > - .match = DS_EMPTY_INITIALIZER, > - .actions = DS_EMPTY_INITIALIZER, > - }; > + init_lflows_thread_pool(); > + lsiv = xmalloc( > + sizeof(struct lswitch_flow_build_info) * > + build_lflows_pool->pool->size); > + lflow_segs = xmalloc( > + sizeof(struct hmap) * build_lflows_pool->pool->size); > > - /* Combined build - all lflow generation from lswitch and lrouter > - * will move here and will be reogranized by iterator type. > - */ > - HMAP_FOR_EACH (od, key_node, datapaths) { > - build_lswitch_and_lrouter_iterate_by_od(od, &lsi); > - } > - HMAP_FOR_EACH (op, key_node, ports) { > - build_lswitch_and_lrouter_iterate_by_op(op, &lsi); > + /* Set up "work chunks" for each thread to work on. */ > + > + for (index = 0; index < build_lflows_pool->pool->size; index++) { > + fast_hmap_init(&lflow_segs[index], lflows->mask); > + > + lsiv[index].datapaths = datapaths; > + lsiv[index].ports = ports; > + lsiv[index].port_groups = port_groups; > + lsiv[index].lflows = &lflow_segs[index]; > + lsiv[index].mcgroups = mcgroups; > + lsiv[index].igmp_groups = igmp_groups; > + lsiv[index].meter_groups = meter_groups; > + lsiv[index].lbs = lbs; > + lsiv[index].svc_check_match = svc_check_match; > + > + /* This cast is needed. While you can initialize without > + * casting, you cannot assign a struct at once without a cast > + * and that is what DS_EMPTY_INITALIZER returns at present. */ > + > + lsiv[index].match = (struct ds) DS_EMPTY_INITIALIZER; > + lsiv[index].actions = (struct ds) DS_EMPTY_INITIALIZER; You can avoid the cast (and the need for the comment) by calling ds_init() instead of using the initializer macro. > + > + build_lflows_pool->pool->controls[index].data = &lsiv[index]; > + } > + > + /* Run thread pool. */ > + > + run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs); > + > + for (index = 0; index < build_lflows_pool->pool->size; index++) { > + ds_destroy(&lsiv[index].match); > + ds_destroy(&lsiv[index].actions); > + } > + > + free(lflow_segs); > + free(lsiv); > + } else { > + struct ovn_datapath *od; > + struct ovn_port *op; > + > + struct lswitch_flow_build_info lsi = { > + .datapaths = datapaths, > + .ports = ports, > + .port_groups = port_groups, > + .lflows = lflows, > + .mcgroups = mcgroups, > + .igmp_groups = igmp_groups, > + .meter_groups = meter_groups, > + .lbs = lbs, > + .svc_check_match = svc_check_match, > + .match = DS_EMPTY_INITIALIZER, > + .actions = DS_EMPTY_INITIALIZER, > + }; > + > + > + /* Converged build - all lflow generation from lswitch and lrouter > + * will move here and will be reogranized by iterator type. > + */ > + HMAP_FOR_EACH (od, key_node, datapaths) { > + build_lswitch_and_lrouter_iterate_by_od(od, &lsi); > + } > + HMAP_FOR_EACH (op, key_node, ports) { > + build_lswitch_and_lrouter_iterate_by_op(op, &lsi); > + } > + ds_destroy(&lsi.match); > + ds_destroy(&lsi.actions); > } > + > free(svc_check_match); > > /* Legacy lswitch build - to be migrated. */ > @@ -11425,6 +11563,7 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports, > build_lrouter_flows(datapaths, ports, lflows, meter_groups, lbs); > } > > +static ssize_t max_seen_lflow_size = 128; > > /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database, > * constructing their contents based on the OVN_NB database. */ > @@ -11435,12 +11574,18 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths, > struct shash *meter_groups, > struct hmap *lbs) > { > - struct hmap lflows = HMAP_INITIALIZER(&lflows); > + struct hmap lflows; > + > + fast_hmap_size_for(&lflows, max_seen_lflow_size); > > build_lswitch_and_lrouter_flows(datapaths, ports, > port_groups, &lflows, mcgroups, > igmp_groups, meter_groups, lbs); > > + if (hmap_count(&lflows) > max_seen_lflow_size) { > + max_seen_lflow_size = hmap_count(&lflows); > + } > + > /* Push changes to the Logical_Flow table to database. */ > const struct sbrec_logical_flow *sbflow, *next_sbflow; > SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) { >
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c index 66b6c2985..225f4ca8e 100644 --- a/northd/ovn-northd.c +++ b/northd/ovn-northd.c @@ -48,6 +48,7 @@ #include "unixctl.h" #include "util.h" #include "uuid.h" +#include "fasthmap.h" #include "openvswitch/vlog.h" VLOG_DEFINE_THIS_MODULE(ovn_northd); @@ -4189,7 +4190,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od, ovn_lflow_init(lflow, od, stage, priority, xstrdup(match), xstrdup(actions), ovn_lflow_hint(stage_hint), where); - hmap_insert(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow)); + hmap_insert_fast(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow)); } /* Adds a row with the specified contents to the Logical_Flow table. */ @@ -11331,6 +11332,8 @@ struct lswitch_flow_build_info { /* Helper function to combine all lflow generation which is iterated by * datapath. + * Invoked by parallel build over a "chunk" of work or by single threaded + * build over a chunk which is initialized to contain "all" work. */ static void @@ -11357,6 +11360,8 @@ build_lswitch_and_lrouter_iterate_by_od( } /* Helper function to combine all lflow generation which is iterated by port. + * Invoked by parallel build over a "chunk" of work or by single threaded + * build over a chunk which is initialized to contain "all" work. */ static void @@ -11379,6 +11384,85 @@ build_lswitch_and_lrouter_iterate_by_op( &lsi->actions); } +struct lflows_thread_pool { + struct worker_pool *pool; +}; + +static void *build_lflows_thread(void *arg) { + struct worker_control *control = (struct worker_control *) arg; + struct lflows_thread_pool *workload; + struct lswitch_flow_build_info *lsi; + + struct ovn_datapath *od; + struct ovn_port *op; + int bnum; + + while (!seize_fire()) { + sem_wait(&control->fire); + workload = (struct lflows_thread_pool *) control->workload; + lsi = (struct lswitch_flow_build_info *) control->data; + if (lsi && workload) { + /* Iterate over bucket ThreadID, ThreadID+size, ... */ + for (bnum = control->id; + bnum <= lsi->datapaths->mask; + bnum += workload->pool->size) + { + HMAP_FOR_EACH_IN_PARALLEL ( + od, key_node, bnum, lsi->datapaths) { + if (seize_fire()) { + return NULL; + } + build_lswitch_and_lrouter_iterate_by_od(od, lsi); + } + } + for (bnum = control->id; + bnum <= lsi->ports->mask; + bnum += workload->pool->size) + { + HMAP_FOR_EACH_IN_PARALLEL ( + op, key_node, bnum, lsi->ports) { + if (seize_fire()) { + return NULL; + } + build_lswitch_and_lrouter_iterate_by_op(op, lsi); + } + } + atomic_store_relaxed(&control->finished, true); + atomic_thread_fence(memory_order_release); + } + sem_post(control->done); + } + return NULL; +} + +static struct lflows_thread_pool *build_lflows_pool = NULL; + +static void init_lflows_thread_pool(void) +{ + int index; + + if (!build_lflows_pool) { + build_lflows_pool = + xmalloc(sizeof(struct lflows_thread_pool)); + build_lflows_pool->pool = + add_worker_pool(build_lflows_thread); + + for (index = 0; index < build_lflows_pool->pool->size; index++) { + build_lflows_pool->pool->controls[index].workload = + build_lflows_pool; + } + } +} + +/* TODO: replace hard cutoffs by configurable via commands. These are + * temporary defines to determine single-thread to multi-thread processing + * cutoff. + * Setting to 1 forces "all parallel" lflow build. + */ + +#define OD_CUTOFF 1 +#define OP_CUTOFF 1 + static void build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports, struct hmap *port_groups, struct hmap *lflows, @@ -11386,35 +11470,89 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports, struct shash *meter_groups, struct hmap *lbs) { + char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac); - struct ovn_datapath *od; - struct ovn_port *op; + if (hmap_count(datapaths) > OD_CUTOFF || hmap_count(ports) > OP_CUTOFF) { - char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac); + struct hmap *lflow_segs; + struct lswitch_flow_build_info *lsiv; + int index; - struct lswitch_flow_build_info lsi = { - .datapaths = datapaths, - .ports = ports, - .port_groups = port_groups, - .lflows = lflows, - .mcgroups = mcgroups, - .igmp_groups = igmp_groups, - .meter_groups = meter_groups, - .lbs = lbs, - .svc_check_match = svc_check_match, - .match = DS_EMPTY_INITIALIZER, - .actions = DS_EMPTY_INITIALIZER, - }; + init_lflows_thread_pool(); + lsiv = xmalloc( + sizeof(struct lswitch_flow_build_info) * + build_lflows_pool->pool->size); + lflow_segs = xmalloc( + sizeof(struct hmap) * build_lflows_pool->pool->size); - /* Combined build - all lflow generation from lswitch and lrouter - * will move here and will be reogranized by iterator type. - */ - HMAP_FOR_EACH (od, key_node, datapaths) { - build_lswitch_and_lrouter_iterate_by_od(od, &lsi); - } - HMAP_FOR_EACH (op, key_node, ports) { - build_lswitch_and_lrouter_iterate_by_op(op, &lsi); + /* Set up "work chunks" for each thread to work on. */ + + for (index = 0; index < build_lflows_pool->pool->size; index++) { + fast_hmap_init(&lflow_segs[index], lflows->mask); + + lsiv[index].datapaths = datapaths; + lsiv[index].ports = ports; + lsiv[index].port_groups = port_groups; + lsiv[index].lflows = &lflow_segs[index]; + lsiv[index].mcgroups = mcgroups; + lsiv[index].igmp_groups = igmp_groups; + lsiv[index].meter_groups = meter_groups; + lsiv[index].lbs = lbs; + lsiv[index].svc_check_match = svc_check_match; + + /* This cast is needed. While you can initialize without + * casting, you cannot assign a struct at once without a cast + * and that is what DS_EMPTY_INITALIZER returns at present. */ + + lsiv[index].match = (struct ds) DS_EMPTY_INITIALIZER; + lsiv[index].actions = (struct ds) DS_EMPTY_INITIALIZER; + + build_lflows_pool->pool->controls[index].data = &lsiv[index]; + } + + /* Run thread pool. */ + + run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs); + + for (index = 0; index < build_lflows_pool->pool->size; index++) { + ds_destroy(&lsiv[index].match); + ds_destroy(&lsiv[index].actions); + } + + free(lflow_segs); + free(lsiv); + } else { + struct ovn_datapath *od; + struct ovn_port *op; + + struct lswitch_flow_build_info lsi = { + .datapaths = datapaths, + .ports = ports, + .port_groups = port_groups, + .lflows = lflows, + .mcgroups = mcgroups, + .igmp_groups = igmp_groups, + .meter_groups = meter_groups, + .lbs = lbs, + .svc_check_match = svc_check_match, + .match = DS_EMPTY_INITIALIZER, + .actions = DS_EMPTY_INITIALIZER, + }; + + + /* Converged build - all lflow generation from lswitch and lrouter + * will move here and will be reogranized by iterator type. + */ + HMAP_FOR_EACH (od, key_node, datapaths) { + build_lswitch_and_lrouter_iterate_by_od(od, &lsi); + } + HMAP_FOR_EACH (op, key_node, ports) { + build_lswitch_and_lrouter_iterate_by_op(op, &lsi); + } + ds_destroy(&lsi.match); + ds_destroy(&lsi.actions); } + free(svc_check_match); /* Legacy lswitch build - to be migrated. */ @@ -11425,6 +11563,7 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports, build_lrouter_flows(datapaths, ports, lflows, meter_groups, lbs); } +static ssize_t max_seen_lflow_size = 128; /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database, * constructing their contents based on the OVN_NB database. */ @@ -11435,12 +11574,18 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths, struct shash *meter_groups, struct hmap *lbs) { - struct hmap lflows = HMAP_INITIALIZER(&lflows); + struct hmap lflows; + + fast_hmap_size_for(&lflows, max_seen_lflow_size); build_lswitch_and_lrouter_flows(datapaths, ports, port_groups, &lflows, mcgroups, igmp_groups, meter_groups, lbs); + if (hmap_count(&lflows) > max_seen_lflow_size) { + max_seen_lflow_size = hmap_count(&lflows); + } + /* Push changes to the Logical_Flow table to database. */ const struct sbrec_logical_flow *sbflow, *next_sbflow; SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {